You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/05/07 19:13:14 UTC
svn commit: r772713 - in
/incubator/cassandra/trunk/src/java/org/apache/cassandra: db/ net/ service/
Author: jbellis
Date: Thu May 7 17:13:05 2009
New Revision: 772713
URL: http://svn.apache.org/viewvc?rev=772713&view=rev
Log:
clean up read/write path more; include message id in logging so we can trace what
happens to an individual op. patch by jbellis.
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java?rev=772713&r1=772712&r2=772713&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java Thu May 7 17:13:05 2009
@@ -45,6 +45,16 @@
dib.reset(bytes, bytes.length);
return serializer.deserialize(new DataInputStream(dib));
}
+
+ public String toString()
+ {
+ return "RangeCommand(" +
+ "table='" + table + '\'' +
+ ", startWith='" + startWith + '\'' +
+ ", stopAt='" + stopAt + '\'' +
+ ", maxResults=" + maxResults +
+ ')';
+ }
}
class RangeCommandSerializer implements ICompactSerializer<RangeCommand>
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=772713&r1=772712&r2=772713&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java Thu May 7 17:13:05 2009
@@ -95,7 +95,7 @@
System.arraycopy(readCtx.bufOut_.getData(), 0, bytes, 0, bytes.length);
Message response = message.getReply(StorageService.getLocalStorageEndPoint(), bytes);
- logger_.debug("Read key " + readCommand.key + "; sending response to " + message.getFrom());
+ logger_.debug("Read key " + readCommand.key + "; sending response to " + message.getMessageId() + "@" + message.getFrom());
MessagingService.getMessagingInstance().sendOneWay(response, message.getFrom());
/* Do read repair if header of the message says so */
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java?rev=772713&r1=772712&r2=772713&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java Thu May 7 17:13:05 2009
@@ -167,7 +167,6 @@
public byte[] digest()
{
- long start = System.currentTimeMillis();
Set<String> cfamilies = columnFamilies_.keySet();
byte[] xorHash = ArrayUtils.EMPTY_BYTE_ARRAY;
for (String cFamily : cfamilies)
@@ -181,7 +180,6 @@
xorHash = FBUtilities.xor(xorHash, columnFamilies_.get(cFamily).digest());
}
}
- logger_.info("DIGEST TIME: " + (System.currentTimeMillis() - start) + " ms.");
return xorHash;
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=772713&r1=772712&r2=772713&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Thu May 7 17:13:05 2009
@@ -91,7 +91,7 @@
WriteResponse response = new WriteResponse(rm.table(), rm.key(), true);
Message responseMessage = WriteResponse.makeWriteResponseMessage(message, response);
- logger_.debug("Mutation applied in " + (end - start) + "ms. Sending response to " + message.getFrom() + " for key :" + rm.key());
+ logger_.debug("Mutation applied in " + (end - start) + "ms. Sending response to " + message.getMessageId() + "@" + message.getFrom());
MessagingService.getMessagingInstance().sendOneWay(responseMessage, message.getFrom());
}
catch (IOException e)
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java?rev=772713&r1=772712&r2=772713&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java Thu May 7 17:13:05 2009
@@ -34,7 +34,7 @@
IAsyncCallback cb = MessagingService.getRegisteredCallback(messageId);
if ( cb != null )
{
- logger_.info("Processing response on a callback from " + message.getFrom());
+ logger_.info("Processing response on a callback from " + message.getMessageId() + "@" + message.getFrom());
cb.response(message);
}
else
@@ -42,7 +42,7 @@
IAsyncResult ar = MessagingService.getAsyncResult(messageId);
if ( ar != null )
{
- logger_.info("Processing response on an async result from " + message.getFrom());
+ logger_.info("Processing response on an async result from " + message.getMessageId() + "@" + message.getFrom());
ar.result(message);
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java?rev=772713&r1=772712&r2=772713&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java Thu May 7 17:13:05 2009
@@ -508,7 +508,7 @@
throw new InvalidRequestException("range queries may only be performed against an order-preserving partitioner");
}
- return StorageProxy.getRange(new RangeCommand(tablename, startWith, stopAt, maxResults));
+ return StorageProxy.getKeyRange(new RangeCommand(tablename, startWith, stopAt, maxResults));
}
/*
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java?rev=772713&r1=772712&r2=772713&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java Thu May 7 17:13:05 2009
@@ -37,6 +37,7 @@
import org.apache.cassandra.utils.ICachetable;
import org.apache.cassandra.utils.LogUtil;
import org.apache.log4j.Logger;
+import org.apache.commons.lang.StringUtils;
class ConsistencyManager implements Runnable
@@ -49,7 +50,6 @@
public void response(Message msg)
{
- logger_.debug("Received reponse : " + msg.toString());
responses_.add(msg);
if ( responses_.size() == ConsistencyManager.this.replicas_.size() )
handleDigestResponses();
@@ -91,11 +91,10 @@
/* Add the local storage endpoint to the replicas_ list */
replicas_.add(StorageService.getLocalStorageEndPoint());
IAsyncCallback responseHandler = new DataRepairHandler(ConsistencyManager.this.replicas_.size(), readResponseResolver);
- String table = DatabaseDescriptor.getTables().get(0);
ReadCommand readCommand = constructReadMessage(false);
- // ReadMessage readMessage = new ReadMessage(table, row_.key(), columnFamily_);
Message message = readCommand.makeReadMessage();
- MessagingService.getMessagingInstance().sendRR(message, replicas_.toArray( new EndPoint[0] ), responseHandler);
+ logger_.debug("Performing read repair for " + readCommand_.key + " to " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + "]");
+ MessagingService.getMessagingInstance().sendRR(message, replicas_.toArray(new EndPoint[replicas_.size()]), responseHandler);
}
}
@@ -140,8 +139,7 @@
}
catch ( DigestMismatchException ex )
{
- logger_.info("We should not be coming here under any circumstances ...");
- logger_.info(LogUtil.throwableToString(ex));
+ throw new RuntimeException(ex);
}
}
}
@@ -161,17 +159,16 @@
public void run()
{
- logger_.debug(" Run the consistency checks for " + readCommand_.getColumnFamilyName());
ReadCommand readCommandDigestOnly = constructReadMessage(true);
try
{
- Message messageDigestOnly = readCommandDigestOnly.makeReadMessage();
- IAsyncCallback digestResponseHandler = new DigestResponseHandler();
- MessagingService.getMessagingInstance().sendRR(messageDigestOnly, replicas_.toArray(new EndPoint[replicas_.size()]), digestResponseHandler);
+ Message message = readCommandDigestOnly.makeReadMessage();
+ logger_.debug("Reading consistency digest for " + readCommand_.key + " from " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + "]");
+ MessagingService.getMessagingInstance().sendRR(message, replicas_.toArray(new EndPoint[replicas_.size()]), new DigestResponseHandler());
}
- catch ( IOException ex )
+ catch (IOException ex)
{
- logger_.info(LogUtil.throwableToString(ex));
+ throw new RuntimeException(ex);
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=772713&r1=772712&r2=772713&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java Thu May 7 17:13:05 2009
@@ -214,7 +214,6 @@
public void onChange(EndPoint endpoint, EndPointState epState)
{
- logger_.debug("CHANGE IN STATE FOR @ StorageLoadBalancer " + endpoint);
// load information for this specified endpoint for load balancing
ApplicationState loadInfoState = epState.getApplicationState(LoadDisseminator.loadInfo_);
if ( loadInfoState != null )
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=772713&r1=772712&r2=772713&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Thu May 7 17:13:05 2009
@@ -117,15 +117,17 @@
Map<EndPoint, EndPoint> endpointMap = StorageService.instance().getNStorageEndPointMap(rm.key());
// TODO: throw a thrift exception if we do not have N nodes
Map<EndPoint, Message> messageMap = createWriteMessages(rm, endpointMap);
- logger.debug("insert writing key " + rm.key() + " to [" + StringUtils.join(messageMap.keySet(), ", ") + "]");
for (Map.Entry<EndPoint, Message> entry : messageMap.entrySet())
{
- MessagingService.getMessagingInstance().sendOneWay(entry.getValue(), entry.getKey());
+ Message message = entry.getValue();
+ EndPoint endpoint = entry.getKey();
+ logger.debug("insert writing key " + rm.key() + " to " + message.getMessageId() + "@" + endpoint);
+ MessagingService.getMessagingInstance().sendOneWay(message, endpoint);
}
}
catch (IOException e)
{
- throw new RuntimeException(e);
+ throw new RuntimeException("error inserting key " + rm.key(), e);
}
finally
{
@@ -151,7 +153,7 @@
DatabaseDescriptor.getReplicationFactor(),
new WriteResponseResolver());
EndPoint[] endpoints = StorageService.instance().getNStorageEndPoint(rm.key());
- logger.debug("insertBlocking writing key " + rm.key() + " to [" + StringUtils.join(endpoints, ", ") + "]");
+ logger.debug("insertBlocking writing key " + rm.key() + " to " + message.getMessageId() + "@[" + StringUtils.join(endpoints, ", ") + "]");
// TODO: throw a thrift exception if we do not have N nodes
MessagingService.getMessagingInstance().sendRR(message, endpoints, quorumResponseHandler);
@@ -160,7 +162,7 @@
}
catch (Exception e)
{
- logger.error(e);
+ logger.error("error writing key " + rm.key(), e);
throw new UnavailableException();
}
finally
@@ -240,8 +242,8 @@
{
EndPoint endPoint = StorageService.instance().findSuitableEndPoint(command.key);
assert endPoint != null;
- logger.debug("weakreadremote reading " + command + " from " + endPoint);
Message message = command.makeReadMessage();
+ logger.debug("weakreadremote reading " + command + " from " + message.getMessageId() + "@" + endPoint);
message.addHeader(ReadCommand.DO_REPAIR, ReadCommand.DO_REPAIR.getBytes());
IAsyncResult iar = MessagingService.getMessagingInstance().sendRR(message, endPoint);
byte[] body;
@@ -251,8 +253,8 @@
}
catch (TimeoutException e)
{
- throw new RuntimeException(e);
- // TODO retry to a different endpoint
+ throw new RuntimeException("error reading key " + command.key, e);
+ // TODO retry to a different endpoint?
}
DataInputBuffer bufIn = new DataInputBuffer();
bufIn.reset(body, body.length);
@@ -329,7 +331,7 @@
}
catch (IOException ex)
{
- throw new RuntimeException(ex);
+ throw new RuntimeException("error touching key " + key, ex);
}
finally
{
@@ -461,12 +463,14 @@
*/
endPoints[0] = dataPoint;
messages[0] = message;
+ logger.debug("strongread reading data for " + command + " from " + message.getMessageId() + "@" + dataPoint);
for (int i = 1; i < endPoints.length; i++)
{
- endPoints[i] = endpointList.get(i - 1);
+ EndPoint digestPoint = endpointList.get(i - 1);
+ endPoints[i] = digestPoint;
messages[i] = messageDigestOnly;
+ logger.debug("strongread reading digest for " + command + " from " + messageDigestOnly.getMessageId() + "@" + digestPoint);
}
- logger.debug("strongread reading " + command + " from " + StringUtils.join(endPoints, ", "));
try
{
@@ -495,7 +499,7 @@
catch (DigestMismatchException e)
{
// TODO should this be a thrift exception?
- throw new RuntimeException(e);
+ throw new RuntimeException("digest mismatch reading key " + command.key, e);
}
}
}
@@ -592,7 +596,7 @@
}
catch (TimeoutException e)
{
- throw new RuntimeException(e);
+ throw new RuntimeException("timeout reading keys " + StringUtils.join(rows.keySet(), ", "), e);
}
return rows;
}
@@ -660,7 +664,7 @@
return row;
}
- static List<String> getRange(RangeCommand command)
+ static List<String> getKeyRange(RangeCommand command)
{
long startTime = System.currentTimeMillis();
try
@@ -675,7 +679,7 @@
}
catch (Exception e)
{
- throw new RuntimeException(e);
+ throw new RuntimeException("error reading keyrange " + command, e);
}
finally
{