You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/03/27 03:19:35 UTC
svn commit: r758977 - in /incubator/cassandra/trunk/src/org/apache/cassandra/db: RowMutationVerbHandler.java WriteResponse.java
Author: jbellis
Date: Fri Mar 27 02:19:35 2009
New Revision: 758977
URL: http://svn.apache.org/viewvc?rev=758977&view=rev
Log:
send back response so blocking calls can work
Modified:
incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutationVerbHandler.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/WriteResponse.java
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=758977&r1=758976&r2=758977&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutationVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutationVerbHandler.java Fri Mar 27 02:19:35 2009
@@ -46,17 +46,13 @@
protected Row row_ = new Row();
protected DataInputBuffer buffer_ = new DataInputBuffer();
}
-
- private static Logger logger_ = Logger.getLogger(RowMutationVerbHandler.class);
+
+ private static Logger logger_ = Logger.getLogger(RowMutationVerbHandler.class);
/* We use this so that we can reuse the same row mutation context for the mutation. */
private static ThreadLocal<RowMutationContext> tls_ = new InheritableThreadLocal<RowMutationContext>();
-
+
public void doVerb(Message message)
{
- /* For DEBUG only. Printing queue length */
- logger_.info( "ROW MUTATION STAGE: " + StageManager.getStageTaskCount(StorageService.mutationStage_) );
- /* END DEBUG */
-
byte[] bytes = (byte[])message.getMessageBody()[0];
/* Obtain a Row Mutation Context from TLS */
RowMutationContext rowMutationCtx = tls_.get();
@@ -65,51 +61,46 @@
rowMutationCtx = new RowMutationContext();
tls_.set(rowMutationCtx);
}
-
- rowMutationCtx.buffer_.reset(bytes, bytes.length);
-
+
+ rowMutationCtx.buffer_.reset(bytes, bytes.length);
+
try
{
- RowMutationMessage rmMsg = RowMutationMessage.serializer().deserialize(rowMutationCtx.buffer_);
- RowMutation rm = rmMsg.getRowMutation();
+ RowMutation rm = RowMutation.serializer().deserialize(rowMutationCtx.buffer_);
+ logger_.debug("Applying " + rm);
+
/* Check if there were any hints in this message */
- byte[] hintedBytes = message.getHeader(RowMutationMessage.hint_);
+ byte[] hintedBytes = message.getHeader(RowMutationMessage.hint_);
if ( hintedBytes != null && hintedBytes.length > 0 )
{
EndPoint hint = EndPoint.fromBytes(hintedBytes);
+ logger_.debug("Adding hint for " + hint);
/* add necessary hints to this mutation */
- try
- {
- RowMutation hintedMutation = new RowMutation(rm.table(), HintedHandOffManager.key_);
- hintedMutation.addHints(rm.key() + ":" + hint.getHost());
- hintedMutation.apply();
- }
- catch ( ColumnFamilyNotDefinedException ex )
- {
- logger_.debug(LogUtil.throwableToString(ex));
- }
+ RowMutation hintedMutation = new RowMutation(rm.table(), HintedHandOffManager.key_);
+ hintedMutation.addHints(rm.key() + ":" + hint.getHost());
+ hintedMutation.apply();
}
-
- long start = System.currentTimeMillis();
-
+
+ long start = System.currentTimeMillis();
+
rowMutationCtx.row_.key(rm.key());
rm.apply(rowMutationCtx.row_);
-
- long end = System.currentTimeMillis();
- logger_.info("ROW MUTATION APPLY: " + (end - start) + " ms.");
-
- /*WriteResponseMessage writeResponseMessage = new WriteResponseMessage(rm.table(), rm.key(), true);
- Message response = message.getReply( StorageService.getLocalStorageEndPoint(), new Object[]{writeResponseMessage} );
- logger_.debug("Sending teh response to " + message.getFrom() + " for key :" + rm.key());
- MessagingService.getMessagingInstance().sendOneWay(response, message.getFrom()); */
- }
- catch( ColumnFamilyNotDefinedException ex )
+
+ long end = System.currentTimeMillis();
+
+ WriteResponse response = new WriteResponse(rm.table(), rm.key(), true);
+ Message responseMessage = WriteResponse.makeWriteResponseMessage(message, response);
+ logger_.debug("Mutation applied in " + (end - start) + "ms. Sending response to " + message.getFrom() + " for key :" + rm.key());
+ MessagingService.getMessagingInstance().sendOneWay(responseMessage, message.getFrom());
+ }
+ catch(ColumnFamilyNotDefinedException ex)
{
- logger_.debug(LogUtil.throwableToString(ex));
- }
- catch ( IOException e )
+ // TODO shouldn't this be checked before it's sent to us?
+ logger_.warn("column family not defined, and no way to tell the client", ex);
+ }
+ catch (IOException e)
{
- logger_.debug(LogUtil.throwableToString(e));
- }
+ logger_.error("Error in row mutation", e);
+ }
}
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/WriteResponse.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/WriteResponse.java?rev=758977&r1=758976&r2=758977&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/WriteResponse.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/WriteResponse.java Fri Mar 27 02:19:35 2009
@@ -38,40 +38,26 @@
* key in a table
* Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
*/
-public class WriteResponse implements Serializable
+public class WriteResponse
{
-private static ICompactSerializer<WriteResponse> serializer_;
+ private static WriteResponseSerializer serializer_ = new WriteResponseSerializer();
- static
- {
- serializer_ = new WriteResponseSerializer();
- }
-
- static ICompactSerializer<WriteResponse> serializer()
+ public static WriteResponseSerializer serializer()
{
return serializer_;
}
-
- public static Message makeWriteResponseMessage(WriteResponse writeResponse) throws IOException
+
+ public static Message makeWriteResponseMessage(Message original, WriteResponse writeResponseMessage) throws IOException
{
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream( bos );
- WriteResponse.serializer().serialize(writeResponse, dos);
- Message message = new Message(StorageService.getLocalStorageEndPoint(), MessagingService.responseStage_, MessagingService.responseVerbHandler_, new Object[]{bos.toByteArray()});
- return message;
+ WriteResponse.serializer().serialize(writeResponseMessage, dos);
+ return original.getReply(StorageService.getLocalStorageEndPoint(), bos.toByteArray());
}
-
- @XmlElement(name = "Table")
- private String table_;
-
- @XmlElement(name = "key")
- private String key_;
-
- @XmlElement(name = "Status")
- private boolean status_;
-
- private WriteResponse() {
- }
+
+ private final String table_;
+ private final String key_;
+ private final boolean status_;
public WriteResponse(String table, String key, boolean bVal) {
table_ = table;
@@ -79,36 +65,36 @@
status_ = bVal;
}
- public String table()
+ public String table()
{
return table_;
}
- public String key()
+ public String key()
{
return key_;
}
-
+
public boolean isSuccess()
{
return status_;
}
-}
-class WriteResponseSerializer implements ICompactSerializer<WriteResponse>
-{
- public void serialize(WriteResponse wm, DataOutputStream dos) throws IOException
- {
- dos.writeUTF(wm.table());
- dos.writeUTF(wm.key());
- dos.writeBoolean(wm.isSuccess());
- }
-
- public WriteResponse deserialize(DataInputStream dis) throws IOException
+ public static class WriteResponseSerializer implements ICompactSerializer<WriteResponse>
{
- String table = dis.readUTF();
- String key = dis.readUTF();
- boolean status = dis.readBoolean();
- return new WriteResponse(table, key, status);
+ public void serialize(WriteResponse wm, DataOutputStream dos) throws IOException
+ {
+ dos.writeUTF(wm.table());
+ dos.writeUTF(wm.key());
+ dos.writeBoolean(wm.isSuccess());
+ }
+
+ public WriteResponse deserialize(DataInputStream dis) throws IOException
+ {
+ String table = dis.readUTF();
+ String key = dis.readUTF();
+ boolean status = dis.readBoolean();
+ return new WriteResponse(table, key, status);
+ }
}
-}
\ No newline at end of file
+}