You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/05/07 19:13:14 UTC

svn commit: r772713 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: db/ net/ service/

Author: jbellis
Date: Thu May  7 17:13:05 2009
New Revision: 772713

URL: http://svn.apache.org/viewvc?rev=772713&view=rev
Log:
clean up read/write path more; include message id in logging so we can trace what
happens to an individual op.  patch by jbellis.

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java?rev=772713&r1=772712&r2=772713&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java Thu May  7 17:13:05 2009
@@ -45,6 +45,16 @@
         dib.reset(bytes, bytes.length);
         return serializer.deserialize(new DataInputStream(dib));
     }
+
+    public String toString()
+    {
+        return "RangeCommand(" +
+               "table='" + table + '\'' +
+               ", startWith='" + startWith + '\'' +
+               ", stopAt='" + stopAt + '\'' +
+               ", maxResults=" + maxResults +
+               ')';
+    }
 }
 
 class RangeCommandSerializer implements ICompactSerializer<RangeCommand>

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=772713&r1=772712&r2=772713&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java Thu May  7 17:13:05 2009
@@ -95,7 +95,7 @@
             System.arraycopy(readCtx.bufOut_.getData(), 0, bytes, 0, bytes.length);
 
             Message response = message.getReply(StorageService.getLocalStorageEndPoint(), bytes);
-            logger_.debug("Read key " + readCommand.key + "; sending response to " + message.getFrom());
+            logger_.debug("Read key " + readCommand.key + "; sending response to " + message.getMessageId() + "@" + message.getFrom());
             MessagingService.getMessagingInstance().sendOneWay(response, message.getFrom());
 
             /* Do read repair if header of the message says so */

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java?rev=772713&r1=772712&r2=772713&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java Thu May  7 17:13:05 2009
@@ -167,7 +167,6 @@
 
     public byte[] digest()
     {
-        long start = System.currentTimeMillis();
         Set<String> cfamilies = columnFamilies_.keySet();
         byte[] xorHash = ArrayUtils.EMPTY_BYTE_ARRAY;
         for (String cFamily : cfamilies)
@@ -181,7 +180,6 @@
                 xorHash = FBUtilities.xor(xorHash, columnFamilies_.get(cFamily).digest());
             }
         }
-        logger_.info("DIGEST TIME: " + (System.currentTimeMillis() - start) + " ms.");
         return xorHash;
     }
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=772713&r1=772712&r2=772713&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Thu May  7 17:13:05 2009
@@ -91,7 +91,7 @@
 
             WriteResponse response = new WriteResponse(rm.table(), rm.key(), true);
             Message responseMessage = WriteResponse.makeWriteResponseMessage(message, response);
-            logger_.debug("Mutation applied in " + (end - start) + "ms.  Sending response to " +  message.getFrom() + " for key :" + rm.key());
+            logger_.debug("Mutation applied in " + (end - start) + "ms.  Sending response to " + message.getMessageId() + "@" + message.getFrom());
             MessagingService.getMessagingInstance().sendOneWay(responseMessage, message.getFrom());
         }
         catch (IOException e)

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java?rev=772713&r1=772712&r2=772713&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java Thu May  7 17:13:05 2009
@@ -34,7 +34,7 @@
         IAsyncCallback cb = MessagingService.getRegisteredCallback(messageId);
         if ( cb != null )
         {
-            logger_.info("Processing response on a callback from " + message.getFrom());
+            logger_.info("Processing response on a callback from " + message.getMessageId() + "@" + message.getFrom());
             cb.response(message);
         }
         else
@@ -42,7 +42,7 @@
             IAsyncResult ar = MessagingService.getAsyncResult(messageId);
             if ( ar != null )
             {
-                logger_.info("Processing response on an async result from " + message.getFrom());
+                logger_.info("Processing response on an async result from " + message.getMessageId() + "@" + message.getFrom());
                 ar.result(message);
             }
         }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java?rev=772713&r1=772712&r2=772713&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java Thu May  7 17:13:05 2009
@@ -508,7 +508,7 @@
             throw new InvalidRequestException("range queries may only be performed against an order-preserving partitioner");
         }
 
-        return StorageProxy.getRange(new RangeCommand(tablename, startWith, stopAt, maxResults));
+        return StorageProxy.getKeyRange(new RangeCommand(tablename, startWith, stopAt, maxResults));
     }
 
     /*

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java?rev=772713&r1=772712&r2=772713&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java Thu May  7 17:13:05 2009
@@ -37,6 +37,7 @@
 import org.apache.cassandra.utils.ICachetable;
 import org.apache.cassandra.utils.LogUtil;
 import org.apache.log4j.Logger;
+import org.apache.commons.lang.StringUtils;
 
 
 class ConsistencyManager implements Runnable
@@ -49,7 +50,6 @@
 		
 		public void response(Message msg)
 		{
-			logger_.debug("Received reponse : " + msg.toString());
 			responses_.add(msg);
 			if ( responses_.size() == ConsistencyManager.this.replicas_.size() )
 				handleDigestResponses();
@@ -91,11 +91,10 @@
             /* Add the local storage endpoint to the replicas_ list */
             replicas_.add(StorageService.getLocalStorageEndPoint());
 			IAsyncCallback responseHandler = new DataRepairHandler(ConsistencyManager.this.replicas_.size(), readResponseResolver);	
-			String table = DatabaseDescriptor.getTables().get(0);
             ReadCommand readCommand = constructReadMessage(false);
-			// ReadMessage readMessage = new ReadMessage(table, row_.key(), columnFamily_);
             Message message = readCommand.makeReadMessage();
-			MessagingService.getMessagingInstance().sendRR(message, replicas_.toArray( new EndPoint[0] ), responseHandler);			
+            logger_.debug("Performing read repair for " + readCommand_.key + " to " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + "]");
+			MessagingService.getMessagingInstance().sendRR(message, replicas_.toArray(new EndPoint[replicas_.size()]), responseHandler);
 		}
 	}
 	
@@ -140,8 +139,7 @@
 			}
 			catch ( DigestMismatchException ex )
 			{
-				logger_.info("We should not be coming here under any circumstances ...");
-				logger_.info(LogUtil.throwableToString(ex));
+				throw new RuntimeException(ex);
 			}
 		}
 	}
@@ -161,17 +159,16 @@
 
 	public void run()
 	{
-		logger_.debug(" Run the consistency checks for " + readCommand_.getColumnFamilyName());		
         ReadCommand readCommandDigestOnly = constructReadMessage(true);
 		try
 		{
-			Message messageDigestOnly = readCommandDigestOnly.makeReadMessage();
-			IAsyncCallback digestResponseHandler = new DigestResponseHandler();
-			MessagingService.getMessagingInstance().sendRR(messageDigestOnly, replicas_.toArray(new EndPoint[replicas_.size()]), digestResponseHandler);
+			Message message = readCommandDigestOnly.makeReadMessage();
+            logger_.debug("Reading consistency digest for " + readCommand_.key + " from " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + "]");
+            MessagingService.getMessagingInstance().sendRR(message, replicas_.toArray(new EndPoint[replicas_.size()]), new DigestResponseHandler());
 		}
-		catch ( IOException ex )
+		catch (IOException ex)
 		{
-			logger_.info(LogUtil.throwableToString(ex));
+			throw new RuntimeException(ex);
 		}
 	}
     

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=772713&r1=772712&r2=772713&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java Thu May  7 17:13:05 2009
@@ -214,7 +214,6 @@
 
     public void onChange(EndPoint endpoint, EndPointState epState)
     {
-        logger_.debug("CHANGE IN STATE FOR @ StorageLoadBalancer " + endpoint);
         // load information for this specified endpoint for load balancing 
         ApplicationState loadInfoState = epState.getApplicationState(LoadDisseminator.loadInfo_);
         if ( loadInfoState != null )

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=772713&r1=772712&r2=772713&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Thu May  7 17:13:05 2009
@@ -117,15 +117,17 @@
 			Map<EndPoint, EndPoint> endpointMap = StorageService.instance().getNStorageEndPointMap(rm.key());
 			// TODO: throw a thrift exception if we do not have N nodes
 			Map<EndPoint, Message> messageMap = createWriteMessages(rm, endpointMap);
-            logger.debug("insert writing key " + rm.key() + " to [" + StringUtils.join(messageMap.keySet(), ", ") + "]");
 			for (Map.Entry<EndPoint, Message> entry : messageMap.entrySet())
 			{
-				MessagingService.getMessagingInstance().sendOneWay(entry.getValue(), entry.getKey());
+                Message message = entry.getValue();
+                EndPoint endpoint = entry.getKey();
+                logger.debug("insert writing key " + rm.key() + " to " + message.getMessageId() + "@" + endpoint);
+                MessagingService.getMessagingInstance().sendOneWay(message, endpoint);
 			}
 		}
         catch (IOException e)
         {
-            throw new RuntimeException(e);
+            throw new RuntimeException("error inserting key " + rm.key(), e);
         }
         finally
         {
@@ -151,7 +153,7 @@
                     DatabaseDescriptor.getReplicationFactor(),
                     new WriteResponseResolver());
             EndPoint[] endpoints = StorageService.instance().getNStorageEndPoint(rm.key());
-            logger.debug("insertBlocking writing key " + rm.key() + " to [" + StringUtils.join(endpoints, ", ") + "]");
+            logger.debug("insertBlocking writing key " + rm.key() + " to " + message.getMessageId() + "@[" + StringUtils.join(endpoints, ", ") + "]");
             // TODO: throw a thrift exception if we do not have N nodes
 
             MessagingService.getMessagingInstance().sendRR(message, endpoints, quorumResponseHandler);
@@ -160,7 +162,7 @@
         }
         catch (Exception e)
         {
-            logger.error(e);
+            logger.error("error writing key " + rm.key(), e);
             throw new UnavailableException();
         }
         finally
@@ -240,8 +242,8 @@
     {
         EndPoint endPoint = StorageService.instance().findSuitableEndPoint(command.key);
         assert endPoint != null;
-        logger.debug("weakreadremote reading " + command + " from " + endPoint);
         Message message = command.makeReadMessage();
+        logger.debug("weakreadremote reading " + command + " from " + message.getMessageId() + "@" + endPoint);
         message.addHeader(ReadCommand.DO_REPAIR, ReadCommand.DO_REPAIR.getBytes());
         IAsyncResult iar = MessagingService.getMessagingInstance().sendRR(message, endPoint);
         byte[] body;
@@ -251,8 +253,8 @@
         }
         catch (TimeoutException e)
         {
-            throw new RuntimeException(e);
-            // TODO retry to a different endpoint
+            throw new RuntimeException("error reading key " + command.key, e);
+            // TODO retry to a different endpoint?
         }
         DataInputBuffer bufIn = new DataInputBuffer();
         bufIn.reset(body, body.length);
@@ -329,7 +331,7 @@
         }
         catch (IOException ex)
         {
-            throw new RuntimeException(ex);
+            throw new RuntimeException("error touching key " + key, ex);
         }
         finally
         {
@@ -461,12 +463,14 @@
         */
         endPoints[0] = dataPoint;
         messages[0] = message;
+        logger.debug("strongread reading data for " + command + " from " + message.getMessageId() + "@" + dataPoint);
         for (int i = 1; i < endPoints.length; i++)
         {
-            endPoints[i] = endpointList.get(i - 1);
+            EndPoint digestPoint = endpointList.get(i - 1);
+            endPoints[i] = digestPoint;
             messages[i] = messageDigestOnly;
+            logger.debug("strongread reading digest for " + command + " from " + messageDigestOnly.getMessageId() + "@" + digestPoint);
         }
-        logger.debug("strongread reading " + command + " from " + StringUtils.join(endPoints, ", "));
 
         try
         {
@@ -495,7 +499,7 @@
                 catch (DigestMismatchException e)
                 {
                     // TODO should this be a thrift exception?
-                    throw new RuntimeException(e);
+                    throw new RuntimeException("digest mismatch reading key " + command.key, e);
                 }
             }
         }
@@ -592,7 +596,7 @@
         }
         catch (TimeoutException e)
         {
-            throw new RuntimeException(e);
+            throw new RuntimeException("timeout reading keys " + StringUtils.join(rows.keySet(), ", "), e);
         }
         return rows;
     }
@@ -660,7 +664,7 @@
         return row;
     }
 
-    static List<String> getRange(RangeCommand command)
+    static List<String> getKeyRange(RangeCommand command)
     {
         long startTime = System.currentTimeMillis();
         try
@@ -675,7 +679,7 @@
         }
         catch (Exception e)
         {
-            throw new RuntimeException(e);
+            throw new RuntimeException("error reading keyrange " + command, e);
         }
         finally
         {