You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/03/27 03:19:35 UTC

svn commit: r758977 - in /incubator/cassandra/trunk/src/org/apache/cassandra/db: RowMutationVerbHandler.java WriteResponse.java

Author: jbellis
Date: Fri Mar 27 02:19:35 2009
New Revision: 758977

URL: http://svn.apache.org/viewvc?rev=758977&view=rev
Log:
send back response so blocking calls can work

Modified:
    incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutationVerbHandler.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/WriteResponse.java

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=758977&r1=758976&r2=758977&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutationVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutationVerbHandler.java Fri Mar 27 02:19:35 2009
@@ -46,17 +46,13 @@
         protected Row row_ = new Row();
         protected DataInputBuffer buffer_ = new DataInputBuffer();
     }
-    
-    private static Logger logger_ = Logger.getLogger(RowMutationVerbHandler.class);     
+
+    private static Logger logger_ = Logger.getLogger(RowMutationVerbHandler.class);
     /* We use this so that we can reuse the same row mutation context for the mutation. */
     private static ThreadLocal<RowMutationContext> tls_ = new InheritableThreadLocal<RowMutationContext>();
-    
+
     public void doVerb(Message message)
     {
-        /* For DEBUG only. Printing queue length */                         
-        logger_.info( "ROW MUTATION STAGE: " + StageManager.getStageTaskCount(StorageService.mutationStage_) );
-        /* END DEBUG */
-            
         byte[] bytes = (byte[])message.getMessageBody()[0];
         /* Obtain a Row Mutation Context from TLS */
         RowMutationContext rowMutationCtx = tls_.get();
@@ -65,51 +61,46 @@
             rowMutationCtx = new RowMutationContext();
             tls_.set(rowMutationCtx);
         }
-                
-        rowMutationCtx.buffer_.reset(bytes, bytes.length);        
-        
+
+        rowMutationCtx.buffer_.reset(bytes, bytes.length);
+
         try
         {
-            RowMutationMessage rmMsg = RowMutationMessage.serializer().deserialize(rowMutationCtx.buffer_);
-            RowMutation rm = rmMsg.getRowMutation();
+            RowMutation rm = RowMutation.serializer().deserialize(rowMutationCtx.buffer_);
+            logger_.debug("Applying " + rm);
+
             /* Check if there were any hints in this message */
-            byte[] hintedBytes = message.getHeader(RowMutationMessage.hint_);            
+            byte[] hintedBytes = message.getHeader(RowMutationMessage.hint_);
             if ( hintedBytes != null && hintedBytes.length > 0 )
             {
             	EndPoint hint = EndPoint.fromBytes(hintedBytes);
+                logger_.debug("Adding hint for " + hint);
                 /* add necessary hints to this mutation */
-                try
-                {
-                	RowMutation hintedMutation = new RowMutation(rm.table(), HintedHandOffManager.key_);
-                	hintedMutation.addHints(rm.key() + ":" + hint.getHost());
-                	hintedMutation.apply();
-                }
-                catch ( ColumnFamilyNotDefinedException ex )
-                {
-                    logger_.debug(LogUtil.throwableToString(ex));
-                }
+                RowMutation hintedMutation = new RowMutation(rm.table(), HintedHandOffManager.key_);
+                hintedMutation.addHints(rm.key() + ":" + hint.getHost());
+                hintedMutation.apply();
             }
-            
-            long start = System.currentTimeMillis(); 
-            
+
+            long start = System.currentTimeMillis();
+
             rowMutationCtx.row_.key(rm.key());
             rm.apply(rowMutationCtx.row_);
-            
-            long end = System.currentTimeMillis();                       
-            logger_.info("ROW MUTATION APPLY: " + (end - start) + " ms.");
-            
-            /*WriteResponseMessage writeResponseMessage = new WriteResponseMessage(rm.table(), rm.key(), true);
-            Message response = message.getReply( StorageService.getLocalStorageEndPoint(), new Object[]{writeResponseMessage} );
-            logger_.debug("Sending teh response to " +  message.getFrom() + " for key :" + rm.key());
-            MessagingService.getMessagingInstance().sendOneWay(response, message.getFrom());  */                    
-        }         
-        catch( ColumnFamilyNotDefinedException ex )
+
+            long end = System.currentTimeMillis();
+
+            WriteResponse response = new WriteResponse(rm.table(), rm.key(), true);
+            Message responseMessage = WriteResponse.makeWriteResponseMessage(message, response);
+            logger_.debug("Mutation applied in " + (end - start) + "ms.  Sending response to " +  message.getFrom() + " for key :" + rm.key());
+            MessagingService.getMessagingInstance().sendOneWay(responseMessage, message.getFrom());
+        }
+        catch(ColumnFamilyNotDefinedException ex)
         {
-            logger_.debug(LogUtil.throwableToString(ex));
-        }        
-        catch ( IOException e )
+            // TODO shouldn't this be checked before it's sent to us?
+            logger_.warn("column family not defined, and no way to tell the client", ex);
+        }
+        catch (IOException e)
         {
-            logger_.debug(LogUtil.throwableToString(e));            
-        }        
+            logger_.error("Error in row mutation", e);
+        }
     }
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/WriteResponse.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/WriteResponse.java?rev=758977&r1=758976&r2=758977&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/WriteResponse.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/WriteResponse.java Fri Mar 27 02:19:35 2009
@@ -38,40 +38,26 @@
  * key in a table
  * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
  */
-public class WriteResponse implements Serializable
+public class WriteResponse 
 {
-private static ICompactSerializer<WriteResponse> serializer_;
+    private static WriteResponseSerializer serializer_ = new WriteResponseSerializer();
 
-    static
-    {
-        serializer_ = new WriteResponseSerializer();
-    }
-
-    static ICompactSerializer<WriteResponse> serializer()
+    public static WriteResponseSerializer serializer()
     {
         return serializer_;
     }
-	
-    public static Message makeWriteResponseMessage(WriteResponse writeResponse) throws IOException
+
+    public static Message makeWriteResponseMessage(Message original, WriteResponse writeResponseMessage) throws IOException
     {
     	ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream( bos );
-        WriteResponse.serializer().serialize(writeResponse, dos);
-        Message message = new Message(StorageService.getLocalStorageEndPoint(), MessagingService.responseStage_, MessagingService.responseVerbHandler_, new Object[]{bos.toByteArray()});         
-        return message;
+        WriteResponse.serializer().serialize(writeResponseMessage, dos);
+        return original.getReply(StorageService.getLocalStorageEndPoint(), bos.toByteArray());
     }
-    
-	@XmlElement(name = "Table")
-	private String table_;
-
-	@XmlElement(name = "key")
-	private String key_;
-	
-	@XmlElement(name = "Status")
-	private boolean status_;
-	
-	private WriteResponse() {
-	}
+
+	private final String table_;
+	private final String key_;
+	private final boolean status_;
 
 	public WriteResponse(String table, String key, boolean bVal) {
 		table_ = table;
@@ -79,36 +65,36 @@
 		status_ = bVal;
 	}
 
-	public String table() 
+	public String table()
 	{
 		return table_;
 	}
 
-	public String key() 
+	public String key()
 	{
 		return key_;
 	}
-	
+
 	public boolean isSuccess()
 	{
 		return status_;
 	}
-}
 
-class WriteResponseSerializer implements ICompactSerializer<WriteResponse>
-{
-	public void serialize(WriteResponse wm, DataOutputStream dos) throws IOException
-	{
-		dos.writeUTF(wm.table());
-		dos.writeUTF(wm.key());
-		dos.writeBoolean(wm.isSuccess());
-	}
-	
-    public WriteResponse deserialize(DataInputStream dis) throws IOException
+    public static class WriteResponseSerializer implements ICompactSerializer<WriteResponse>
     {
-    	String table = dis.readUTF();
-    	String key = dis.readUTF();
-    	boolean status = dis.readBoolean();
-    	return new WriteResponse(table, key, status);
+        public void serialize(WriteResponse wm, DataOutputStream dos) throws IOException
+        {
+            dos.writeUTF(wm.table());
+            dos.writeUTF(wm.key());
+            dos.writeBoolean(wm.isSuccess());
+        }
+
+        public WriteResponse deserialize(DataInputStream dis) throws IOException
+        {
+            String table = dis.readUTF();
+            String key = dis.readUTF();
+            boolean status = dis.readBoolean();
+            return new WriteResponse(table, key, status);
+        }
     }
-}
\ No newline at end of file
+}