You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/02/08 19:24:50 UTC
svn commit: r1068504 [1/3] - in /cassandra/trunk: ./
interface/thrift/gen-java/org/apache/cassandra/thrift/
src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/dht/
src/java/org/apache/cassandra/gms/
src/java/org/apache/cassandra/io/sstable...
Author: jbellis
Date: Tue Feb 8 18:24:48 2011
New Revision: 1068504
URL: http://svn.apache.org/viewvc?rev=1068504&view=rev
Log:
merge from 0.7
Added:
cassandra/trunk/src/java/org/apache/cassandra/utils/StatusLogger.java
- copied unchanged from r1068454, cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/StatusLogger.java
Modified:
cassandra/trunk/ (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed)
cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsAnnounceVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/CacheWriter.java
cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
cassandra/trunk/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java
cassandra/trunk/src/java/org/apache/cassandra/net/CacheingMessageProducer.java
cassandra/trunk/src/java/org/apache/cassandra/net/Header.java
cassandra/trunk/src/java/org/apache/cassandra/net/IVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/net/sink/IMessageSink.java
cassandra/trunk/src/java/org/apache/cassandra/net/sink/SinkManager.java
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java
cassandra/trunk/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java
cassandra/trunk/test/data/serialization/0.7/db.RangeSliceCommand.bin
cassandra/trunk/test/data/serialization/0.7/db.Row.bin
cassandra/trunk/test/data/serialization/0.7/db.RowMutation.bin
cassandra/trunk/test/data/serialization/0.7/db.SliceByNamesReadCommand.bin
cassandra/trunk/test/data/serialization/0.7/db.SliceFromReadCommand.bin
cassandra/trunk/test/data/serialization/0.7/db.Truncation.bin
cassandra/trunk/test/data/serialization/0.7/db.migration.Keyspace1.bin
cassandra/trunk/test/data/serialization/0.7/db.migration.Keyspace2.bin
cassandra/trunk/test/data/serialization/0.7/db.migration.Keyspace3.bin
cassandra/trunk/test/data/serialization/0.7/db.migration.Keyspace4.bin
cassandra/trunk/test/data/serialization/0.7/db.migration.Keyspace5.bin
cassandra/trunk/test/data/serialization/0.7/gms.EndpointState.bin
cassandra/trunk/test/data/serialization/0.7/service.TreeRequest.bin
cassandra/trunk/test/data/serialization/0.7/service.TreeResponse.bin
cassandra/trunk/test/data/serialization/0.7/streaming.StreamReply.bin
cassandra/trunk/test/data/serialization/0.7/streaming.StreamRequestMessage.bin
cassandra/trunk/test/data/serialization/0.7/utils.BloomFilter.bin
cassandra/trunk/test/data/serialization/0.7/utils.LegacyBloomFilter.bin
cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java
Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Feb 8 18:24:48 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1068009
-/cassandra/branches/cassandra-0.7:1026516-1068028,1068497
+/cassandra/branches/cassandra-0.7:1026516-1068454,1068497
/cassandra/branches/cassandra-0.7.0:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Feb 8 18:24:48 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1068009
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1068028,1068497
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1068454,1068497
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Feb 8 18:24:48 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1068009
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1068028,1068497
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1068454,1068497
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Feb 8 18:24:48 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1068009
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1068028,1068497
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1068454,1068497
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Feb 8 18:24:48 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1068009
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1068028,1068497
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1068454,1068497
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Feb 8 18:24:48 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1068009
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1068028,1068497
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1068454,1068497
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java Tue Feb 8 18:24:48 2011
@@ -32,7 +32,7 @@ public class BinaryVerbHandler implement
{
private static Logger logger_ = LoggerFactory.getLogger(BinaryVerbHandler.class);
- public void doVerb(Message message)
+ public void doVerb(Message message, String id)
{
byte[] bytes = message.getMessageBody();
ByteArrayInputStream buffer = new ByteArrayInputStream(bytes);
@@ -45,8 +45,8 @@ public class BinaryVerbHandler implement
WriteResponse response = new WriteResponse(rm.getTable(), rm.key(), true);
Message responseMessage = WriteResponse.makeWriteResponseMessage(message, response);
if (logger_.isDebugEnabled())
- logger_.debug("binary " + rm + " applied. Sending response to " + message.getMessageId() + "@" + message.getFrom());
- MessagingService.instance().sendOneWay(responseMessage, message.getFrom());
+ logger_.debug("binary " + rm + " applied. Sending response to " + id + "@" + message.getFrom());
+ MessagingService.instance().sendReply(responseMessage, id, message.getFrom());
}
catch (Exception e)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java Tue Feb 8 18:24:48 2011
@@ -41,7 +41,7 @@ public class CounterMutationVerbHandler
{
private static Logger logger = LoggerFactory.getLogger(CounterMutationVerbHandler.class);
- public void doVerb(Message message)
+ public void doVerb(Message message, String id)
{
byte[] bytes = message.getMessageBody();
ByteArrayInputStream buffer = new ByteArrayInputStream(bytes);
@@ -56,7 +56,7 @@ public class CounterMutationVerbHandler
StorageProxy.applyCounterMutationOnLeader(cm);
WriteResponse response = new WriteResponse(cm.getTable(), cm.key(), true);
Message responseMessage = WriteResponse.makeWriteResponseMessage(message, response);
- MessagingService.instance().sendOneWay(responseMessage, message.getFrom());
+ MessagingService.instance().sendReply(responseMessage, id, message.getFrom());
}
catch (UnavailableException e)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsAnnounceVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsAnnounceVerbHandler.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsAnnounceVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsAnnounceVerbHandler.java Tue Feb 8 18:24:48 2011
@@ -28,7 +28,7 @@ public class DefinitionsAnnounceVerbHand
{
/** someone is announcing their schema version. */
- public void doVerb(Message message)
+ public void doVerb(Message message, String id)
{
UUID theirVersion = UUID.fromString(new String(message.getMessageBody()));
MigrationManager.rectify(theirVersion, message.getFrom());
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java Tue Feb 8 18:24:48 2011
@@ -42,7 +42,7 @@ public class DefinitionsUpdateResponseVe
private static final Logger logger = LoggerFactory.getLogger(DefinitionsUpdateResponseVerbHandler.class);
/** someone sent me their data definitions */
- public void doVerb(final Message message)
+ public void doVerb(final Message message, String id)
{
try
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Tue Feb 8 18:24:48 2011
@@ -260,23 +260,29 @@ public class HintedHandOffManager implem
private void deliverHintsToEndpoint(InetAddress endpoint) throws IOException, DigestMismatchException, InvalidRequestException, TimeoutException, InterruptedException
{
- logger_.info("Checking remote schema before delivering hints");
- int waited = waitForSchemaAgreement(endpoint);
- // sleep a random amount to stagger handoff delivery from different replicas.
- // (if we had to wait, then gossiper randomness took care of that for us already.)
- if (waited == 0) {
- int sleep = new Random().nextInt(60000);
- logger_.info("Sleeping {}ms to stagger hint delivery", sleep);
- Thread.sleep(sleep);
+ try
+ {
+ logger_.info("Checking remote schema before delivering hints");
+ int waited = waitForSchemaAgreement(endpoint);
+ // sleep a random amount to stagger handoff delivery from different replicas.
+ // (if we had to wait, then gossiper randomness took care of that for us already.)
+ if (waited == 0) {
+ int sleep = new Random().nextInt(60000);
+ logger_.info("Sleeping {}ms to stagger hint delivery", sleep);
+ Thread.sleep(sleep);
+ }
+ if (!Gossiper.instance.getEndpointStateForEndpoint(endpoint).isAlive())
+ {
+ logger_.info("Endpoint {} died before hint delivery, aborting", endpoint);
+ return;
+ }
}
- if (!Gossiper.instance.getEndpointStateForEndpoint(endpoint).isAlive())
+ finally
{
- logger_.info("Endpoint {} died before hint delivery, aborting", endpoint);
- return;
+ queuedDeliveries.remove(endpoint);
}
- logger_.info("Started hinted handoff for endpoint " + endpoint);
- queuedDeliveries.remove(endpoint);
+ logger_.info("Started hinted handoff for endpoint " + endpoint);
// 1. Get the key of the endpoint we need to handoff
// 2. For each column read the list of rows: subcolumns are KS + SEPARATOR + CF
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java Tue Feb 8 18:24:48 2011
@@ -33,7 +33,7 @@ public class ReadRepairVerbHandler imple
{
private static Logger logger_ = LoggerFactory.getLogger(ReadRepairVerbHandler.class);
- public void doVerb(Message message)
+ public void doVerb(Message message, String id)
{
byte[] body = message.getMessageBody();
ByteArrayInputStream buffer = new ByteArrayInputStream(body);
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java Tue Feb 8 18:24:48 2011
@@ -45,7 +45,7 @@ public class ReadVerbHandler implements
/* We use this so that we can reuse readcontext objects */
private static ThreadLocal<ReadVerbHandler.ReadContext> tls_ = new InheritableThreadLocal<ReadVerbHandler.ReadContext>();
- public void doVerb(Message message)
+ public void doVerb(Message message, String id)
{
byte[] body = message.getMessageBody();
/* Obtain a Read Context from TLS */
@@ -79,8 +79,8 @@ public class ReadVerbHandler implements
Message response = message.getReply(FBUtilities.getLocalAddress(), bytes, message.getVersion());
if (logger_.isDebugEnabled())
logger_.debug(String.format("Read key %s; sending response to %s@%s",
- ByteBufferUtil.bytesToHex(command.key), message.getMessageId(), message.getFrom()));
- MessagingService.instance().sendOneWay(response, message.getFrom());
+ ByteBufferUtil.bytesToHex(command.key), id, message.getFrom()));
+ MessagingService.instance().sendReply(response, id, message.getFrom());
}
catch (IOException ex)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Tue Feb 8 18:24:48 2011
@@ -42,7 +42,7 @@ public class RowMutationVerbHandler impl
{
private static Logger logger_ = LoggerFactory.getLogger(RowMutationVerbHandler.class);
- public void doVerb(Message message)
+ public void doVerb(Message message, String id)
{
try
{
@@ -77,8 +77,8 @@ public class RowMutationVerbHandler impl
WriteResponse response = new WriteResponse(rm.getTable(), rm.key(), true);
Message responseMessage = WriteResponse.makeWriteResponseMessage(message, response);
if (logger_.isDebugEnabled())
- logger_.debug(rm + " applied. Sending response to " + message.getMessageId() + "@" + message.getFrom());
- MessagingService.instance().sendOneWay(responseMessage, message.getFrom());
+ logger_.debug(rm + " applied. Sending response to " + id + "@" + message.getFrom());
+ MessagingService.instance().sendReply(responseMessage, id, message.getFrom());
}
catch (IOException e)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java Tue Feb 8 18:24:48 2011
@@ -31,10 +31,10 @@ public class SchemaCheckVerbHandler impl
{
private final Logger logger = LoggerFactory.getLogger(SchemaCheckVerbHandler.class);
- public void doVerb(Message message)
+ public void doVerb(Message message, String id)
{
logger.debug("Received schema check request.");
Message response = message.getInternalReply(DatabaseDescriptor.getDefsVersion().toString().getBytes(), message.getVersion());
- MessagingService.instance().sendOneWay(response, message.getFrom());
+ MessagingService.instance().sendReply(response, id, message.getFrom());
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/TruncateVerbHandler.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/TruncateVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/TruncateVerbHandler.java Tue Feb 8 18:24:48 2011
@@ -40,7 +40,7 @@ public class TruncateVerbHandler impleme
{
private static Logger logger = LoggerFactory.getLogger(TruncateVerbHandler.class);
- public void doVerb(Message message)
+ public void doVerb(Message message, String id)
{
byte[] bytes = message.getMessageBody();
ByteArrayInputStream buffer = new ByteArrayInputStream(bytes);
@@ -76,9 +76,8 @@ public class TruncateVerbHandler impleme
TruncateResponse response = new TruncateResponse(t.keyspace, t.columnFamily, true);
Message responseMessage = TruncateResponse.makeTruncateResponseMessage(message, response);
- logger.debug("{} applied. Sending response to {}@{} ",
- new Object[]{t, message.getMessageId(), message.getFrom()});
- MessagingService.instance().sendOneWay(responseMessage, message.getFrom());
+ logger.debug("{} applied. Sending response to {}@{} ", new Object[]{ t, id, message.getFrom()});
+ MessagingService.instance().sendReply(responseMessage, id, message.getFrom());
}
catch (IOException e)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Tue Feb 8 18:24:48 2011
@@ -255,12 +255,12 @@ public class BootStrapper
public static class BootstrapTokenVerbHandler implements IVerbHandler
{
- public void doVerb(Message message)
+ public void doVerb(Message message, String id)
{
StorageService ss = StorageService.instance;
String tokenString = StorageService.getPartitioner().getTokenFactory().toString(ss.getBootstrapToken());
Message response = message.getInternalReply(tokenString.getBytes(Charsets.UTF_8), message.getVersion());
- MessagingService.instance().sendOneWay(response, message.getFrom());
+ MessagingService.instance().sendReply(response, id, message.getFrom());
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java Tue Feb 8 18:24:48 2011
@@ -37,7 +37,7 @@ public class GossipDigestAck2VerbHandler
{
private static Logger logger_ = LoggerFactory.getLogger(GossipDigestAck2VerbHandler.class);
- public void doVerb(Message message)
+ public void doVerb(Message message, String id)
{
InetAddress from = message.getFrom();
if (logger_.isTraceEnabled())
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java Tue Feb 8 18:24:48 2011
@@ -40,7 +40,7 @@ public class GossipDigestAckVerbHandler
{
private static Logger logger_ = LoggerFactory.getLogger(GossipDigestAckVerbHandler.class);
- public void doVerb(Message message)
+ public void doVerb(Message message, String id)
{
InetAddress from = message.getFrom();
if (logger_.isTraceEnabled())
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java Tue Feb 8 18:24:48 2011
@@ -39,7 +39,7 @@ public class GossipDigestSynVerbHandler
{
private static Logger logger_ = LoggerFactory.getLogger( GossipDigestSynVerbHandler.class);
- public void doVerb(Message message)
+ public void doVerb(Message message, String id)
{
InetAddress from = message.getFrom();
if (logger_.isTraceEnabled())
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/CacheWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/CacheWriter.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/CacheWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/CacheWriter.java Tue Feb 8 18:24:48 2011
@@ -45,7 +45,7 @@ public class CacheWriter<K, V> implement
logger.debug("Saving {}", path);
File tmpFile = File.createTempFile(path.getName(), null, path.getParentFile());
- BufferedRandomAccessFile out = new BufferedRandomAccessFile(tmpFile, "w", BufferedRandomAccessFile.DEFAULT_BUFFER_SIZE, true);
+ BufferedRandomAccessFile out = new BufferedRandomAccessFile(tmpFile, "rw", BufferedRandomAccessFile.DEFAULT_BUFFER_SIZE, true);
try
{
for (K key : keys)
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java Tue Feb 8 18:24:48 2011
@@ -63,7 +63,7 @@ public class BufferedRandomAccessFile ex
// or in directIO() method to the DEFAULT_DIRECT_BUFFER_SIZE
private long maxBufferSize;
- // constant, used for caching purpose, -1 if file is open in "w" mode
+ // constant, used for caching purpose, -1 if file is open in "rw" mode
// otherwise this will hold cached file length
private final long fileLength;
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java Tue Feb 8 18:24:48 2011
@@ -22,6 +22,7 @@ package org.apache.cassandra.io.util;
import java.io.DataInput;
+import java.io.IOError;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
@@ -242,7 +243,7 @@ class ColumnIterator implements Iterator
}
catch (IOException e)
{
- return null;
+ throw new IOError(e);
}
}
@@ -263,7 +264,7 @@ class ColumnIterator implements Iterator
{
public IColumn setValue(IColumn value)
{
- return null;
+ throw new UnsupportedOperationException();
}
public IColumn getValue()
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/CacheingMessageProducer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/CacheingMessageProducer.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/CacheingMessageProducer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/CacheingMessageProducer.java Tue Feb 8 18:24:48 2011
@@ -8,8 +8,7 @@ public class CacheingMessageProducer imp
{
private final MessageProducer prod;
private final Map<Integer, Message> messages = new HashMap<Integer, Message>();
- private String messageId = null;
-
+
public CacheingMessageProducer(MessageProducer prod)
{
this.prod = prod;
@@ -21,10 +20,6 @@ public class CacheingMessageProducer imp
if (msg == null)
{
msg = prod.getMessage(version);
- if (messageId == null)
- messageId = msg.getMessageId();
- // it is important that both messages have the same id for callback processing.
- msg.setId(messageId);
messages.put(version, msg);
}
return msg;
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/Header.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/Header.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/Header.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/Header.java Tue Feb 8 18:24:48 2011
@@ -33,8 +33,7 @@ import org.apache.cassandra.service.Stor
public class Header
{
private static ICompactSerializer<Header> serializer_;
- private static AtomicInteger idGen_ = new AtomicInteger(0);
-
+
static
{
serializer_ = new HeaderSerializer();
@@ -48,31 +47,23 @@ public class Header
private final InetAddress from_;
// TODO STAGE can be determined from verb
private final StorageService.Verb verb_;
- private String messageId_;
protected Map<String, byte[]> details_ = new Hashtable<String, byte[]>();
- Header(String id, InetAddress from, StorageService.Verb verb)
+ Header(InetAddress from, StorageService.Verb verb)
{
- assert id != null;
assert from != null;
assert verb != null;
- messageId_ = id;
from_ = from;
verb_ = verb;
}
-
- Header(String id, InetAddress from, StorageService.Verb verb, Map<String, byte[]> details)
+
+ Header(InetAddress from, StorageService.Verb verb, Map<String, byte[]> details)
{
- this(id, from, verb);
+ this(from, verb);
details_ = details;
}
- Header(InetAddress from, StorageService.Verb verb)
- {
- this(Integer.toString(idGen_.incrementAndGet()), from, verb);
- }
-
InetAddress getFrom()
{
return from_;
@@ -83,16 +74,6 @@ public class Header
return verb_;
}
- void setMessageId(String id)
- {
- messageId_ = id;
- }
-
- String getMessageId()
- {
- return messageId_;
- }
-
byte[] getDetail(String key)
{
return details_.get(key);
@@ -113,7 +94,6 @@ class HeaderSerializer implements ICompa
{
public void serialize(Header t, DataOutputStream dos, int version) throws IOException
{
- dos.writeUTF(t.getMessageId());
CompactEndpointSerializationHelper.serialize(t.getFrom(), dos);
dos.writeInt(t.getVerb().ordinal());
@@ -133,7 +113,6 @@ class HeaderSerializer implements ICompa
public Header deserialize(DataInputStream dis, int version) throws IOException
{
- String id = dis.readUTF();
InetAddress from = CompactEndpointSerializationHelper.deserialize(dis);
int verbOrdinal = dis.readInt();
@@ -149,7 +128,7 @@ class HeaderSerializer implements ICompa
details.put(key, bytes);
}
- return new Header(id, from, StorageService.VERBS[verbOrdinal], details);
+ return new Header(from, StorageService.VERBS[verbOrdinal], details);
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/IVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/IVerbHandler.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/IVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/IVerbHandler.java Tue Feb 8 18:24:48 2011
@@ -31,8 +31,9 @@ public interface IVerbHandler
* class was registered by a call to MessagingService.registerVerbHandlers).
* Note that the caller should not be holding any locks when calling this method
* because the implementation may be synchronized.
- *
- * @param message - incoming message that needs handling.
+ *
+ * @param message - incoming message that needs handling.
+ * @param id
*/
- public void doVerb(Message message);
+ public void doVerb(Message message, String id);
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Tue Feb 8 18:24:48 2011
@@ -104,8 +104,10 @@ public class IncomingTcpConnection exten
else
{
// todo: need to be aware of message version.
- Message message = Message.serializer().deserialize(new DataInputStream(new ByteArrayInputStream(contentBytes)), version);
- MessagingService.instance().receive(message);
+ DataInputStream dis = new DataInputStream(new ByteArrayInputStream(contentBytes));
+ String id = dis.readUTF();
+ Message message = Message.serializer().deserialize(dis, version);
+ MessagingService.instance().receive(message, id);
}
}
// prepare to read the next message
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/Message.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/Message.java Tue Feb 8 18:24:48 2011
@@ -60,12 +60,7 @@ public class Message
{
this(new Header(from, verb), body, version);
}
-
- public void setId(String messageId)
- {
- header_.setMessageId(messageId);
- }
-
+
public byte[] getHeader(String key)
{
return header_.getDetail(key);
@@ -106,21 +101,17 @@ public class Message
return header_.getVerb();
}
- public String getMessageId()
- {
- return header_.getMessageId();
- }
-
// TODO should take byte[] + length so we don't have to copy to a byte[] of exactly the right len
+ // TODO make static
public Message getReply(InetAddress from, byte[] body, int version)
{
- Header header = new Header(getMessageId(), from, StorageService.Verb.REQUEST_RESPONSE);
+ Header header = new Header(from, StorageService.Verb.REQUEST_RESPONSE);
return new Message(header, body, version);
}
public Message getInternalReply(byte[] body, int version)
{
- Header header = new Header(getMessageId(), FBUtilities.getLocalAddress(), StorageService.Verb.INTERNAL_RESPONSE);
+ Header header = new Header(FBUtilities.getLocalAddress(), StorageService.Verb.INTERNAL_RESPONSE);
return new Message(header, body, version);
}
@@ -128,9 +119,7 @@ public class Message
{
StringBuilder sbuf = new StringBuilder("");
String separator = System.getProperty("line.separator");
- sbuf.append("ID:" + getMessageId())
- .append(separator)
- .append("FROM:" + getFrom())
+ sbuf.append("FROM:" + getFrom())
.append(separator)
.append("TYPE:" + getMessageType())
.append(separator)
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java Tue Feb 8 18:24:48 2011
@@ -28,18 +28,20 @@ public class MessageDeliveryTask impleme
{
private static final Logger logger_ = LoggerFactory.getLogger(MessageDeliveryTask.class);
- private Message message_;
- private final long constructionTime_ = System.currentTimeMillis();
+ private Message message;
+ private final long constructionTime = System.currentTimeMillis();
+ private final String id;
- public MessageDeliveryTask(Message message)
+ public MessageDeliveryTask(Message message, String id)
{
assert message != null;
- message_ = message;
+ this.message = message;
+ this.id = id;
}
public void run()
{
- StorageService.Verb verb = message_.getVerb();
+ StorageService.Verb verb = message.getVerb();
switch (verb)
{
case BINARY:
@@ -48,7 +50,7 @@ public class MessageDeliveryTask impleme
case RANGE_SLICE:
case READ_REPAIR:
case REQUEST_RESPONSE:
- if (System.currentTimeMillis() > constructionTime_ + DatabaseDescriptor.getRpcTimeout())
+ if (System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getRpcTimeout())
{
MessagingService.instance().incrementDroppedMessages(verb);
return;
@@ -67,6 +69,6 @@ public class MessageDeliveryTask impleme
IVerbHandler verbHandler = MessagingService.instance().getVerbHandler(verb);
assert verbHandler != null : "unknown verb " + verb;
- verbHandler.doVerb(message_);
+ verbHandler.doVerb(message, id);
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Tue Feb 8 18:24:48 2011
@@ -34,9 +34,6 @@ import javax.management.MBeanServer;
import javax.management.ObjectName;
import com.google.common.base.Function;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Multimap;
-import org.apache.cassandra.gms.Gossiper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,20 +42,18 @@ import org.apache.cassandra.concurrent.S
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.EncryptionOptions;
+import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.locator.ILatencySubscriber;
import org.apache.cassandra.net.io.SerializerType;
import org.apache.cassandra.net.sink.SinkManager;
import org.apache.cassandra.security.SSLFactory;
import org.apache.cassandra.security.streaming.SSLFileStreamTask;
-import org.apache.cassandra.service.GCInspector;
import org.apache.cassandra.service.ReadCallback;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.FileStreamTask;
import org.apache.cassandra.streaming.StreamHeader;
-import org.apache.cassandra.utils.ExpiringMap;
-import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.SimpleCondition;
+import org.apache.cassandra.utils.*;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
public final class MessagingService implements MessagingServiceMBean
@@ -281,6 +276,13 @@ public final class MessagingService impl
assert previous == null;
}
+ private static AtomicInteger idGen = new AtomicInteger(0);
+ // TODO make these integers to avoid unnecessary int -> string -> int conversions
+ private static String nextId()
+ {
+ return Integer.toString(idGen.incrementAndGet());
+ }
+
/**
* Send a message to a given endpoint. This method specifies a callback
* which is invoked with the actual response.
@@ -291,12 +293,22 @@ public final class MessagingService impl
* suggest that a timeout occurred to the invoker of the send().
* @return an reference to message id used to match with the result
*/
- public String sendRR(Message message, InetAddress to, IAsyncCallback cb)
+ public String sendRR(Message message, InetAddress to, IMessageCallback cb)
+ {
+ String id = nextId();
+ addCallback(cb, id, to);
+ sendOneWay(message, id, to);
+ return id;
+ }
+
+ public void sendOneWay(Message message, InetAddress to)
{
- String messageId = message.getMessageId();
- addCallback(cb, messageId, to);
- sendOneWay(message, to);
- return messageId;
+ sendOneWay(message, nextId(), to);
+ }
+
+ public void sendReply(Message message, String id, InetAddress to)
+ {
+ sendOneWay(message, id, to);
}
/**
@@ -325,17 +337,20 @@ public final class MessagingService impl
* @param message messages to be sent.
* @param to endpoint to which the message needs to be sent
*/
- public void sendOneWay(Message message, InetAddress to)
+ private void sendOneWay(Message message, String id, InetAddress to)
{
+ if (logger_.isDebugEnabled())
+ logger_.debug(FBUtilities.getLocalAddress() + " sending " + message.getVerb() + " to " + id + "@" + to);
+
// do local deliveries
if ( message.getFrom().equals(to) )
{
- receive(message);
+ receive(message, id);
return;
}
// message sinks are a testing hook
- Message processedMessage = SinkManager.processClientMessage(message, to);
+ Message processedMessage = SinkManager.processClientMessage(message, id, to);
if (processedMessage == null)
{
return;
@@ -349,6 +364,7 @@ public final class MessagingService impl
try
{
DataOutputBuffer buffer = new DataOutputBuffer();
+ buffer.writeUTF(id);
Message.serializer().serialize(message, buffer, message.getVersion());
data = buffer.getData();
}
@@ -366,8 +382,7 @@ public final class MessagingService impl
public IAsyncResult sendRR(Message message, InetAddress to)
{
IAsyncResult iar = new AsyncResult();
- addCallback(iar, message.getMessageId(), to);
- sendOneWay(message, to);
+ sendRR(message, to, iar);
return iar;
}
@@ -418,13 +433,13 @@ public final class MessagingService impl
logger_.info("Shutdown complete (no further commands will be processed)");
}
- public void receive(Message message)
+ public void receive(Message message, String id)
{
- message = SinkManager.processServerMessage(message);
+ message = SinkManager.processServerMessage(message, id);
if (message == null)
return;
- Runnable runnable = new MessageDeliveryTask(message);
+ Runnable runnable = new MessageDeliveryTask(message, id);
ExecutorService stage = StageManager.getStage(message.getMessageType());
assert stage != null : "No stage for message type " + message.getMessageType();
stage.execute(runnable);
@@ -553,7 +568,7 @@ public final class MessagingService impl
}
if (logTpstats)
- GCInspector.instance.logStats();
+ StatusLogger.log();
}
private static class SocketThread extends Thread
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java Tue Feb 8 18:24:48 2011
@@ -19,8 +19,6 @@
package org.apache.cassandra.net;
import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,14 +29,13 @@ public class ResponseVerbHandler impleme
{
private static final Logger logger_ = LoggerFactory.getLogger( ResponseVerbHandler.class );
- public void doVerb(Message message)
+ public void doVerb(Message message, String id)
{
- String messageId = message.getMessageId();
- double age = System.currentTimeMillis() - MessagingService.instance().getRegisteredCallbackAge(messageId);
- Pair<InetAddress, IMessageCallback> pair = MessagingService.instance().removeRegisteredCallback(messageId);
+ double age = System.currentTimeMillis() - MessagingService.instance().getRegisteredCallbackAge(id);
+ Pair<InetAddress, IMessageCallback> pair = MessagingService.instance().removeRegisteredCallback(id);
if (pair == null)
{
- logger_.debug("Callback already removed for {}", messageId);
+ logger_.debug("Callback already removed for {}", id);
return;
}
@@ -48,13 +45,13 @@ public class ResponseVerbHandler impleme
if (cb instanceof IAsyncCallback)
{
if (logger_.isDebugEnabled())
- logger_.debug("Processing response on a callback from " + message.getMessageId() + "@" + message.getFrom());
+ logger_.debug("Processing response on a callback from " + id + "@" + message.getFrom());
((IAsyncCallback) cb).response(message);
}
else
{
if (logger_.isDebugEnabled())
- logger_.debug("Processing response on an async result from " + message.getMessageId() + "@" + message.getFrom());
+ logger_.debug("Processing response on an async result from " + id + "@" + message.getFrom());
((IAsyncResult) cb).result(message);
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/sink/IMessageSink.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/sink/IMessageSink.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/sink/IMessageSink.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/sink/IMessageSink.java Tue Feb 8 18:24:48 2011
@@ -24,5 +24,5 @@ import org.apache.cassandra.net.Message;
public interface IMessageSink
{
- public Message handleMessage(Message message, InetAddress to);
+ public Message handleMessage(Message message, String id, InetAddress to);
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/sink/SinkManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/sink/SinkManager.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/sink/SinkManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/sink/SinkManager.java Tue Feb 8 18:24:48 2011
@@ -38,28 +38,28 @@ public class SinkManager
sinks.clear();
}
- public static Message processClientMessage(Message message, InetAddress to)
+ public static Message processClientMessage(Message message, String id, InetAddress to)
{
if (sinks.isEmpty())
return message;
for (IMessageSink ms : sinks)
{
- message = ms.handleMessage(message, to);
+ message = ms.handleMessage(message, id, to);
if (message == null)
return null;
}
return message;
}
- public static Message processServerMessage(Message message)
+ public static Message processServerMessage(Message message, String id)
{
if (sinks.isEmpty())
return message;
for (IMessageSink ms : sinks)
{
- message = ms.handleMessage(message, null);
+ message = ms.handleMessage(message, id, null);
if (message == null)
return null;
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Tue Feb 8 18:24:48 2011
@@ -23,7 +23,6 @@ import java.net.InetAddress;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.*;
-import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -580,7 +579,7 @@ public class AntiEntropyService
/**
* Trigger a validation compaction which will return the tree upon completion.
*/
- public void doVerb(Message message)
+ public void doVerb(Message message, String id)
{
byte[] bytes = message.getMessageBody();
@@ -650,7 +649,7 @@ public class AntiEntropyService
}
}
- public void doVerb(Message message)
+ public void doVerb(Message message, String id)
{
byte[] bytes = message.getMessageBody();
DataInputStream buffer = new DataInputStream(new ByteArrayInputStream(bytes));
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java Tue Feb 8 18:24:48 2011
@@ -26,19 +26,13 @@ import java.lang.reflect.InvocationTarge
import java.lang.reflect.Method;
import java.util.*;
import java.util.concurrent.TimeUnit;
-import javax.management.JMX;
import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
-import com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.concurrent.IExecutorMBean;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.CompactionManager;
-import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.StatusLogger;
public class GCInspector
{
@@ -50,7 +44,6 @@ public class GCInspector
public static final GCInspector instance = new GCInspector();
private HashMap<String, Long> gctimes = new HashMap<String, Long>();
- private final MBeanServer server = ManagementFactory.getPlatformMBeanServer();
List<Object> beans = new ArrayList<Object>(); // these are instances of com.sun.management.GarbageCollectorMXBean
@@ -133,63 +126,12 @@ public class GCInspector
logger.info(st);
else if (logger.isDebugEnabled())
logger.debug(st);
+
if (gcw.getDuration() > MIN_DURATION_TPSTATS)
- {
- logStats();
- }
+ StatusLogger.log();
}
}
- public void logStats()
- {
- // everything from o.a.c.concurrent
- logger.info(String.format("%-25s%10s%10s", "Pool Name", "Active", "Pending"));
- Set<ObjectName> request, internal;
- try
- {
- request = server.queryNames(new ObjectName("org.apache.cassandra.request:type=*"), null);
- internal = server.queryNames(new ObjectName("org.apache.cassandra.internal:type=*"), null);
- }
- catch (MalformedObjectNameException e)
- {
- throw new RuntimeException(e);
- }
- for (ObjectName objectName : Iterables.concat(request, internal))
- {
- String poolName = objectName.getKeyProperty("type");
- IExecutorMBean threadPoolProxy = JMX.newMBeanProxy(server, objectName, IExecutorMBean.class);
- logger.info(String.format("%-25s%10s%10s",
- poolName, threadPoolProxy.getActiveCount(), threadPoolProxy.getPendingTasks()));
- }
- // one offs
- logger.info(String.format("%-25s%10s%10s",
- "CompactionManager", "n/a", CompactionManager.instance.getPendingTasks()));
- int pendingCommands = 0;
- for (int n : MessagingService.instance().getCommandPendingTasks().values())
- {
- pendingCommands += n;
- }
- int pendingResponses = 0;
- for (int n : MessagingService.instance().getResponsePendingTasks().values())
- {
- pendingResponses += n;
- }
- logger.info(String.format("%-25s%10s%10s",
- "MessagingService", "n/a", pendingCommands + "," + pendingResponses));
-
- // per-CF stats
- logger.info(String.format("%-25s%20s%20s%20s", "ColumnFamily", "Memtable ops,data", "Row cache size/cap", "Key cache size/cap"));
- for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
- {
- logger.info(String.format("%-25s%20s%20s%20s",
- cfs.table.name + "." + cfs.columnFamily,
- cfs.getMemtableColumnsCount() + "," + cfs.getMemtableDataSize(),
- cfs.getRowCacheSize() + "/" + cfs.getRowCacheCapacity(),
- cfs.getKeyCacheSize() + "/" + cfs.getKeyCacheCapacity()));
- }
- }
-
-
// wrapper for sun class. this enables other jdks to compile this class.
private static final class SunGcWrapper
{
@@ -267,5 +209,4 @@ public class GCInspector
return usageBeforeGc == null;
}
}
-
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java Tue Feb 8 18:24:48 2011
@@ -33,7 +33,7 @@ public class IndexScanVerbHandler implem
{
private static final Logger logger = LoggerFactory.getLogger(IndexScanVerbHandler.class);
- public void doVerb(Message message)
+ public void doVerb(Message message, String id)
{
try
{
@@ -43,8 +43,8 @@ public class IndexScanVerbHandler implem
RangeSliceReply reply = new RangeSliceReply(rows);
Message response = reply.getReply(message);
if (logger.isDebugEnabled())
- logger.debug("Sending " + reply+ " to " + message.getMessageId() + "@" + message.getFrom());
- MessagingService.instance().sendOneWay(response, message.getFrom());
+ logger.debug("Sending " + reply+ " to " + id + "@" + message.getFrom());
+ MessagingService.instance().sendReply(response, id, message.getFrom());
}
catch (Exception ex)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java Tue Feb 8 18:24:48 2011
@@ -35,7 +35,7 @@ public class RangeSliceVerbHandler imple
private static final Logger logger = LoggerFactory.getLogger(RangeSliceVerbHandler.class);
- public void doVerb(Message message)
+ public void doVerb(Message message, String id)
{
try
{
@@ -52,8 +52,8 @@ public class RangeSliceVerbHandler imple
QueryFilter.getFilter(command.predicate, cfs.getComparator())));
Message response = reply.getReply(message);
if (logger.isDebugEnabled())
- logger.debug("Sending " + reply+ " to " + message.getMessageId() + "@" + message.getFrom());
- MessagingService.instance().sendOneWay(response, message.getFrom());
+ logger.debug("Sending " + reply+ " to " + id + "@" + message.getFrom());
+ MessagingService.instance().sendReply(response, id, message.getFrom());
}
catch (Exception ex)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java Tue Feb 8 18:24:48 2011
@@ -144,21 +144,6 @@ public class StorageLoadBalancer impleme
*/
}
- class MoveMessageVerbHandler implements IVerbHandler
- {
- public void doVerb(Message message)
- {
- Message reply = message.getInternalReply(new byte[] {(byte)(isMoveable_.get() ? 1 : 0)}, message.getVersion());
- MessagingService.instance().sendOneWay(reply, message.getFrom());
- if ( isMoveable_.get() )
- {
- // MoveMessage moveMessage = (MoveMessage)message.getMessageBody()[0];
- /* Start the leave operation and join the ring at the position specified */
- isMoveable_.set(false);
- }
- }
- }
-
private static final int BROADCAST_INTERVAL = 60 * 1000;
public static final StorageLoadBalancer instance = new StorageLoadBalancer();
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Tue Feb 8 18:24:48 2011
@@ -219,15 +219,15 @@ public class StorageProxy implements Sto
// unhinted writes
if (destination.equals(FBUtilities.getLocalAddress()))
{
- if (insertLocalMessages)
- insertLocal(rm, responseHandler);
+ insertLocal(rm, responseHandler);
}
else
{
// belongs on a different server
+ // TODO re-use Message objects
Message unhintedMessage = rm.getMessage(Gossiper.instance.getVersion(destination));
if (logger.isDebugEnabled())
- logger.debug("insert writing key " + ByteBufferUtil.bytesToHex(rm.key()) + " to " + unhintedMessage.getMessageId() + "@" + destination);
+ logger.debug("insert writing key " + ByteBufferUtil.bytesToHex(rm.key()) + " to " + destination);
Multimap<Message, InetAddress> messages = dcMessages.get(dc);
if (messages == null)
@@ -278,6 +278,10 @@ public class StorageProxy implements Sto
for (Map.Entry<Message, Collection<InetAddress>> messages: entry.getValue().asMap().entrySet())
{
Message message = messages.getKey();
+ // a single message object is used for unhinted writes, so clean out any forwards
+ // from previous loop iterations
+ // TODO this is currently a no-op until re-use Message object TODOs are fixed
+ message.removeHeader(RowMutation.FORWARD_HEADER);
if (dataCenter.equals(localDataCenter))
{
@@ -390,7 +394,7 @@ public class StorageProxy implements Sto
Message message = cm.makeMutationMessage(Gossiper.instance.getVersion(endpoint));
if (logger.isDebugEnabled())
- logger.debug("forwarding counter update of key " + ByteBufferUtil.bytesToHex(cm.key()) + " to " + message.getMessageId() + "@" + endpoint);
+ logger.debug("forwarding counter update of key " + ByteBufferUtil.bytesToHex(cm.key()) + " to " + endpoint);
MessagingService.instance().sendRR(message, endpoint, responseHandler);
}
}
@@ -548,7 +552,7 @@ public class StorageProxy implements Sto
{
Message message = command.getMessage(Gossiper.instance.getVersion(dataPoint));
if (logger.isDebugEnabled())
- logger.debug("reading data for " + command + " from " + message.getMessageId() + "@" + dataPoint);
+ logger.debug("reading data for " + command + " from " + dataPoint);
MessagingService.instance().sendRR(message, dataPoint, handler);
}
@@ -564,9 +568,10 @@ public class StorageProxy implements Sto
}
else
{
+ // TODO re-use Message objects
Message digestMessage = digestCommand.getMessage(Gossiper.instance.getVersion(digestPoint));
if (logger.isDebugEnabled())
- logger.debug("reading digest for " + command + " from " + digestMessage.getMessageId() + "@" + digestPoint);
+ logger.debug("reading digest for " + command + " from " + digestPoint);
MessagingService.instance().sendRR(digestMessage, digestPoint, handler);
}
}
@@ -666,10 +671,9 @@ public class StorageProxy implements Sto
{
ReadResponseResolver resolver = new ReadResponseResolver(command.table, command.key);
RepairCallback<Row> handler = new RepairCallback<Row>(resolver, endpoints);
+ // TODO should re-use Message objects
for (InetAddress endpoint : endpoints)
- {
MessagingService.instance().sendRR(command, endpoint, handler);
- }
return handler;
}
@@ -725,10 +729,11 @@ public class StorageProxy implements Sto
// TODO bail early if live endpoints can't satisfy requested consistency level
for (InetAddress endpoint : liveEndpoints)
{
+ // TODO re-use Message objects
Message message = c2.getMessage(Gossiper.instance.getVersion(endpoint));
MessagingService.instance().sendRR(message, endpoint, handler);
if (logger.isDebugEnabled())
- logger.debug("reading " + c2 + " from " + message.getMessageId() + "@" + endpoint);
+ logger.debug("reading " + c2 + " from " + endpoint);
}
// TODO read repair on remaining replicas?
@@ -1010,10 +1015,11 @@ public class StorageProxy implements Sto
IndexScanCommand command = new IndexScanCommand(keyspace, column_family, index_clause, column_predicate, range);
for (InetAddress endpoint : liveEndpoints)
{
+ // TODO re-use Message objects
Message message = command.getMessage(Gossiper.instance.getVersion(endpoint));
MessagingService.instance().sendRR(message, endpoint, handler);
if (logger.isDebugEnabled())
- logger.debug("reading " + command + " from " + message.getMessageId() + "@" + endpoint);
+ logger.debug("reading " + command + " from " + endpoint);
}
List<Row> theseRows;
@@ -1098,6 +1104,7 @@ public class StorageProxy implements Sto
Truncation truncation = new Truncation(keyspace, cfname);
for (InetAddress endpoint : allEndpoints)
{
+ // TODO re-use Message objects
Message message = truncation.getMessage(Gossiper.instance.getVersion(endpoint));
MessagingService.instance().sendRR(message, endpoint, responseHandler);
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java Tue Feb 8 18:24:48 2011
@@ -33,12 +33,12 @@ public class ReplicationFinishedVerbHand
{
private static Logger logger = LoggerFactory.getLogger(ReplicationFinishedVerbHandler.class);
- public void doVerb(Message msg)
+ public void doVerb(Message msg, String id)
{
StorageService.instance.confirmReplication(msg.getFrom());
Message response = msg.getInternalReply(ArrayUtils.EMPTY_BYTE_ARRAY, msg.getVersion());
if (logger.isDebugEnabled())
- logger.debug("Replying to " + msg.getMessageId() + "@" + msg.getFrom());
- MessagingService.instance().sendOneWay(response, msg.getFrom());
+ logger.debug("Replying to " + id + "@" + msg.getFrom());
+ MessagingService.instance().sendReply(response, id, msg.getFrom());
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java Tue Feb 8 18:24:48 2011
@@ -36,7 +36,7 @@ public class StreamReplyVerbHandler impl
{
private static Logger logger = LoggerFactory.getLogger(StreamReplyVerbHandler.class);
- public void doVerb(Message message)
+ public void doVerb(Message message, String id)
{
byte[] body = message.getMessageBody();
ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java Tue Feb 8 18:24:48 2011
@@ -37,7 +37,7 @@ public class StreamRequestVerbHandler im
{
private static Logger logger = LoggerFactory.getLogger(StreamRequestVerbHandler.class);
- public void doVerb(Message message)
+ public void doVerb(Message message, String id)
{
if (logger.isDebugEnabled())
logger.debug("Received a StreamRequestMessage from {}", message.getFrom());
Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java Tue Feb 8 18:24:48 2011
@@ -106,15 +106,15 @@ public class ByteBufferUtil
return string(buffer, Charset.defaultCharset());
}
- public static String string(ByteBuffer buffer, int offset, int length) throws CharacterCodingException
+ public static String string(ByteBuffer buffer, int position, int length) throws CharacterCodingException
{
- return string(buffer, offset, length, Charset.defaultCharset());
+ return string(buffer, position, length, Charset.defaultCharset());
}
- public static String string(ByteBuffer buffer, int offset, int length, Charset charset) throws CharacterCodingException
+ public static String string(ByteBuffer buffer, int position, int length, Charset charset) throws CharacterCodingException
{
ByteBuffer copy = buffer.duplicate();
- copy.position(buffer.position() + offset);
+ copy.position(position);
copy.limit(copy.position() + length);
return string(buffer, charset);
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java Tue Feb 8 18:24:48 2011
@@ -39,6 +39,7 @@ public class ExpiringMap<K, V>
CacheableObject(T o)
{
+ assert o != null;
value = o;
age = System.currentTimeMillis();
}
@@ -66,18 +67,19 @@ public class ExpiringMap<K, V>
@Override
public void run()
{
- for (Map.Entry<K, CacheableObject> entry : cache.entrySet())
+ for (Map.Entry<K, CacheableObject<V>> entry : cache.entrySet())
{
if (entry.getValue().isReadyToDie(expiration))
{
cache.remove(entry.getKey());
- postExpireHook.apply(new Pair(entry.getKey(), entry.getValue().getValue()));
+ if (postExpireHook != null)
+ postExpireHook.apply(new Pair<K, V>(entry.getKey(), entry.getValue().getValue()));
}
}
}
}
- private final NonBlockingHashMap<K, CacheableObject> cache = new NonBlockingHashMap<K, CacheableObject>();
+ private final NonBlockingHashMap<K, CacheableObject<V>> cache = new NonBlockingHashMap<K, CacheableObject<V>>();
private final Timer timer;
private static int counter = 0;
Modified: cassandra/trunk/test/data/serialization/0.7/db.RangeSliceCommand.bin
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/data/serialization/0.7/db.RangeSliceCommand.bin?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
Binary files - no diff available.
Modified: cassandra/trunk/test/data/serialization/0.7/db.Row.bin
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/data/serialization/0.7/db.Row.bin?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
Binary files - no diff available.
Modified: cassandra/trunk/test/data/serialization/0.7/db.RowMutation.bin
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/data/serialization/0.7/db.RowMutation.bin?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
Binary files - no diff available.
Modified: cassandra/trunk/test/data/serialization/0.7/db.SliceByNamesReadCommand.bin
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/data/serialization/0.7/db.SliceByNamesReadCommand.bin?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
Binary files - no diff available.
Modified: cassandra/trunk/test/data/serialization/0.7/db.SliceFromReadCommand.bin
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/data/serialization/0.7/db.SliceFromReadCommand.bin?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
Binary files - no diff available.
Modified: cassandra/trunk/test/data/serialization/0.7/db.Truncation.bin
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/data/serialization/0.7/db.Truncation.bin?rev=1068504&r1=1068503&r2=1068504&view=diff
==============================================================================
Binary files - no diff available.