You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/05/08 19:56:15 UTC
[9/15] git commit: Rewrite IncomingTcpConnection to deserialize w/o
extra copies to byte[]. MessageIn now has a payload field,
and uses the Verb to look up the correct deserializer. REQUEST_RESPONSE
deserializer is not uniquely determined by Verb, so we
Rewrite IncomingTcpConnection to deserialize w/o extra copies to byte[]. MessageIn now has a payload field, and uses the Verb to look up the correct deserializer. REQUEST_RESPONSE deserializer is not uniquely determined by Verb, so we look those up by their callback instead.
patch by jbellis and yukim for CASSANDRA-3617
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a06be23f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a06be23f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a06be23f
Branch: refs/heads/trunk
Commit: a06be23fbe7859063039767ce0dff64922445f39
Parents: 021ec71
Author: Jonathan Ellis <jb...@apache.org>
Authored: Mon Mar 26 17:52:16 2012 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue May 8 12:40:47 2012 -0500
----------------------------------------------------------------------
.../cassandra/db/CounterMutationVerbHandler.java | 10 +-
.../cassandra/db/DefinitionsUpdateVerbHandler.java | 16 ++-
src/java/org/apache/cassandra/db/DefsTable.java | 12 --
.../org/apache/cassandra/db/IndexScanCommand.java | 9 +-
.../cassandra/db/MigrationRequestVerbHandler.java | 4 +-
.../org/apache/cassandra/db/RangeSliceCommand.java | 9 +-
.../apache/cassandra/db/ReadRepairVerbHandler.java | 16 +-
.../org/apache/cassandra/db/ReadVerbHandler.java | 13 +-
.../cassandra/db/RowMutationVerbHandler.java | 20 +-
.../cassandra/db/SchemaCheckVerbHandler.java | 2 +-
.../org/apache/cassandra/db/SnapshotCommand.java | 7 -
.../apache/cassandra/db/TruncateVerbHandler.java | 14 +-
.../org/apache/cassandra/dht/BootStrapper.java | 10 +-
.../cassandra/gms/GossipDigestAck2Message.java | 2 +-
.../cassandra/gms/GossipDigestAck2VerbHandler.java | 20 +--
.../cassandra/gms/GossipDigestAckMessage.java | 4 +-
.../cassandra/gms/GossipDigestAckVerbHandler.java | 60 +++----
.../cassandra/gms/GossipDigestSynMessage.java | 2 +-
.../cassandra/gms/GossipDigestSynVerbHandler.java | 70 +++----
.../cassandra/gms/GossipShutdownVerbHandler.java | 5 +-
src/java/org/apache/cassandra/net/AsyncResult.java | 12 +-
.../org/apache/cassandra/net/CallbackInfo.java | 23 ++-
src/java/org/apache/cassandra/net/Header.java | 154 ---------------
.../org/apache/cassandra/net/IAsyncCallback.java | 4 +-
.../org/apache/cassandra/net/IAsyncResult.java | 6 +-
.../org/apache/cassandra/net/IVerbHandler.java | 4 +-
.../cassandra/net/IncomingTcpConnection.java | 41 ++---
.../apache/cassandra/net/MessageDeliveryTask.java | 2 +-
src/java/org/apache/cassandra/net/MessageIn.java | 102 ++++++----
.../org/apache/cassandra/net/MessagingService.java | 121 ++++++++++--
.../cassandra/net/OutboundTcpConnection.java | 5 -
.../apache/cassandra/net/ResponseVerbHandler.java | 6 +-
.../cassandra/service/AbstractRowResolver.java | 44 ++---
.../cassandra/service/AntiEntropyService.java | 83 +++------
.../cassandra/service/DatacenterReadCallback.java | 5 +-
.../DatacenterSyncWriteResponseHandler.java | 2 +-
.../service/DatacenterWriteResponseHandler.java | 2 +-
.../cassandra/service/IResponseResolver.java | 11 +-
.../cassandra/service/IndexScanVerbHandler.java | 10 +-
.../apache/cassandra/service/MigrationTask.java | 9 +-
.../service/RangeSliceResponseResolver.java | 22 +--
.../cassandra/service/RangeSliceVerbHandler.java | 11 +-
.../org/apache/cassandra/service/ReadCallback.java | 12 +-
.../cassandra/service/RowDigestResolver.java | 13 +-
.../cassandra/service/RowRepairResolver.java | 11 +-
.../cassandra/service/SnapshotVerbHandler.java | 10 +-
.../org/apache/cassandra/service/StorageProxy.java | 23 +--
.../apache/cassandra/streaming/FileStreamTask.java | 26 +--
.../streaming/ReplicationFinishedVerbHandler.java | 6 +-
.../apache/cassandra/streaming/StreamReply.java | 2 +-
.../streaming/StreamReplyVerbHandler.java | 17 +-
.../cassandra/streaming/StreamRequestMessage.java | 4 +-
.../streaming/StreamRequestVerbHandler.java | 29 +--
.../cassandra/streaming/StreamingRepairTask.java | 44 +----
.../apache/cassandra/db/SerializationsTest.java | 41 ++--
.../apache/cassandra/net/MessageSerializer.java | 52 -----
.../org/apache/cassandra/service/RemoveTest.java | 2 +-
.../cassandra/service/SerializationsTest.java | 12 +-
.../cassandra/streaming/SerializationsTest.java | 12 +-
.../org/apache/cassandra/streaming/StreamUtil.java | 16 +-
60 files changed, 527 insertions(+), 789 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
index d370a85..470de64 100644
--- a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
@@ -33,24 +33,22 @@ import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.thrift.UnavailableException;
import org.apache.cassandra.utils.FBUtilities;
-public class CounterMutationVerbHandler implements IVerbHandler
+public class CounterMutationVerbHandler implements IVerbHandler<CounterMutation>
{
private static final Logger logger = LoggerFactory.getLogger(CounterMutationVerbHandler.class);
- public void doVerb(MessageIn message, String id)
+ public void doVerb(MessageIn<CounterMutation> message, String id)
{
- DataInputStream in = new DataInputStream(new FastByteArrayInputStream(message.getMessageBody()));
-
try
{
- CounterMutation cm = CounterMutation.serializer().deserialize(in, message.getVersion());
+ CounterMutation cm = message.payload;
if (logger.isDebugEnabled())
logger.debug("Applying forwarded " + cm);
String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
StorageProxy.applyCounterMutationOnLeader(cm, localDataCenter).get();
WriteResponse response = new WriteResponse(cm.getTable(), cm.key(), true);
- MessagingService.instance().sendReply(response.createMessage(), id, message.getFrom());
+ MessagingService.instance().sendReply(response.createMessage(), id, message.from);
}
catch (UnavailableException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java b/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
index f017f97..5c85530 100644
--- a/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
@@ -17,6 +17,8 @@
*/
package org.apache.cassandra.db;
+import java.util.Collection;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -24,6 +26,7 @@ import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.WrappedRunnable;
/**
@@ -32,19 +35,24 @@ import org.apache.cassandra.utils.WrappedRunnable;
* (which is going to act as coordinator) and that node sends (pushes) it's updated schema state
* (in form of row mutations) to all the alive nodes in the cluster.
*/
-public class DefinitionsUpdateVerbHandler implements IVerbHandler
+public class DefinitionsUpdateVerbHandler implements IVerbHandler<Collection<RowMutation>>
{
private static final Logger logger = LoggerFactory.getLogger(DefinitionsUpdateVerbHandler.class);
- public void doVerb(final MessageIn message, String id)
+ public void doVerb(final MessageIn<Collection<RowMutation>> message, String id)
{
- logger.debug("Received schema mutation push from " + message.getFrom());
+ logger.debug("Received schema mutation push from " + message.from);
StageManager.getStage(Stage.MIGRATION).submit(new WrappedRunnable()
{
public void runMayThrow() throws Exception
{
- DefsTable.mergeRemoteSchema(message.getMessageBody(), message.getVersion());
+ if (message.version < MessagingService.VERSION_11)
+ {
+ logger.error("Can't accept schema migrations from Cassandra versions previous to 1.1, please upgrade first");
+ return;
+ }
+ DefsTable.mergeSchema(message.payload);
}
});
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/db/DefsTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefsTable.java b/src/java/org/apache/cassandra/db/DefsTable.java
index 9d2c257..58ab4a6 100644
--- a/src/java/org/apache/cassandra/db/DefsTable.java
+++ b/src/java/org/apache/cassandra/db/DefsTable.java
@@ -238,18 +238,6 @@ public class DefsTable
* @throws ConfigurationException If one of metadata attributes has invalid value
* @throws IOException If data was corrupted during transportation or failed to apply fs operations
*/
- public static void mergeRemoteSchema(byte[] data, int version) throws ConfigurationException, IOException
- {
- if (version < MessagingService.VERSION_11)
- {
- logger.error("Can't accept schema migrations from Cassandra versions previous to 1.1, please update first.");
- return;
- }
-
- DataInputStream in = new DataInputStream(new FastByteArrayInputStream(data));
- mergeSchema(MigrationManager.MigrationsSerializer.instance.deserialize(in, version));
- }
-
public static synchronized void mergeSchema(Collection<RowMutation> mutations) throws ConfigurationException, IOException
{
// current state of the schema
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/db/IndexScanCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/IndexScanCommand.java b/src/java/org/apache/cassandra/db/IndexScanCommand.java
index 2f14849..ad11464 100644
--- a/src/java/org/apache/cassandra/db/IndexScanCommand.java
+++ b/src/java/org/apache/cassandra/db/IndexScanCommand.java
@@ -37,7 +37,7 @@ import org.apache.thrift.TSerializer;
public class IndexScanCommand
{
- private static final IndexScanCommandSerializer serializer = new IndexScanCommandSerializer();
+ public static final IndexScanCommandSerializer serializer = new IndexScanCommandSerializer();
public final String keyspace;
public final String column_family;
@@ -60,13 +60,6 @@ public class IndexScanCommand
return new MessageOut<IndexScanCommand>(MessagingService.Verb.INDEX_SCAN, this, serializer);
}
- public static IndexScanCommand read(MessageIn message) throws IOException
- {
- byte[] bytes = message.getMessageBody();
- FastByteArrayInputStream bis = new FastByteArrayInputStream(bytes);
- return serializer.deserialize(new DataInputStream(bis), message.getVersion());
- }
-
private static class IndexScanCommandSerializer implements IVersionedSerializer<IndexScanCommand>
{
public void serialize(IndexScanCommand o, DataOutput out, int version) throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java b/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java
index 41650fa..97fd641 100644
--- a/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java
@@ -38,10 +38,10 @@ public class MigrationRequestVerbHandler implements IVerbHandler
public void doVerb(MessageIn message, String id)
{
- logger.debug("Received migration request from {}.", message.getFrom());
+ logger.debug("Received migration request from {}.", message.from);
MessageOut<Collection<RowMutation>> response = new MessageOut<Collection<RowMutation>>(MessagingService.Verb.INTERNAL_RESPONSE,
SystemTable.serializeSchema(),
MigrationManager.MigrationsSerializer.instance);
- MessagingService.instance().sendReply(response, id, message.getFrom());
+ MessagingService.instance().sendReply(response, id, message.from);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/db/RangeSliceCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeSliceCommand.java b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
index 528de5c..f029ae7 100644
--- a/src/java/org/apache/cassandra/db/RangeSliceCommand.java
+++ b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
@@ -61,7 +61,7 @@ import org.apache.thrift.TSerializer;
public class RangeSliceCommand implements IReadCommand
{
- private static final RangeSliceCommandSerializer serializer = new RangeSliceCommandSerializer();
+ public static final RangeSliceCommandSerializer serializer = new RangeSliceCommandSerializer();
public final String keyspace;
@@ -134,13 +134,6 @@ public class RangeSliceCommand implements IReadCommand
'}';
}
- public static RangeSliceCommand read(MessageIn message) throws IOException
- {
- byte[] bytes = message.getMessageBody();
- FastByteArrayInputStream bis = new FastByteArrayInputStream(bytes);
- return serializer.deserialize(new DataInputStream(bis), message.getVersion());
- }
-
public String getKeyspace()
{
return keyspace;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java b/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
index 5864961..bc5e820 100644
--- a/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
@@ -17,33 +17,27 @@
*/
package org.apache.cassandra.db;
-import java.io.DataInputStream;
import java.io.IOError;
import java.io.IOException;
-import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
-public class ReadRepairVerbHandler implements IVerbHandler
+public class ReadRepairVerbHandler implements IVerbHandler<RowMutation>
{
- public void doVerb(MessageIn message, String id)
+ public void doVerb(MessageIn<RowMutation> message, String id)
{
- DataInputStream in = new DataInputStream(new FastByteArrayInputStream(message.getMessageBody()));
-
- RowMutation rm;
try
{
- rm = RowMutation.serializer().deserialize(in, message.getVersion());
+ RowMutation rm = message.payload;
rm.apply();
+ WriteResponse response = new WriteResponse(rm.getTable(), rm.key(), true);
+ MessagingService.instance().sendReply(response.createMessage(), id, message.from);
}
catch (IOException e)
{
throw new IOError(e);
}
-
- WriteResponse response = new WriteResponse(rm.getTable(), rm.key(), true);
- MessagingService.instance().sendReply(response.createMessage(), id, message.getFrom());
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/db/ReadVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadVerbHandler.java b/src/java/org/apache/cassandra/db/ReadVerbHandler.java
index 7335d51..d94c0f1 100644
--- a/src/java/org/apache/cassandra/db/ReadVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadVerbHandler.java
@@ -31,11 +31,11 @@ import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
-public class ReadVerbHandler implements IVerbHandler
+public class ReadVerbHandler implements IVerbHandler<ReadCommand>
{
private static final Logger logger = LoggerFactory.getLogger( ReadVerbHandler.class );
- public void doVerb(MessageIn message, String id)
+ public void doVerb(MessageIn<ReadCommand> message, String id)
{
if (StorageService.instance.isBootstrapMode())
{
@@ -44,8 +44,7 @@ public class ReadVerbHandler implements IVerbHandler
try
{
- FastByteArrayInputStream in = new FastByteArrayInputStream(message.getMessageBody());
- ReadCommand command = ReadCommand.serializer().deserialize(new DataInputStream(in), message.getVersion());
+ ReadCommand command = message.payload;
Table table = Table.open(command.table);
Row row = command.getRow(table);
@@ -53,9 +52,9 @@ public class ReadVerbHandler implements IVerbHandler
getResponse(command, row),
ReadResponse.serializer());
if (logger.isDebugEnabled())
- logger.debug(String.format("Read key %s; sending response to %s@%s",
- ByteBufferUtil.bytesToHex(command.key), id, message.getFrom()));
- MessagingService.instance().sendReply(reply, id, message.getFrom());
+ logger.debug(String.format("Read key %s; sending response to %s@%s",
+ ByteBufferUtil.bytesToHex(command.key), id, message.from));
+ MessagingService.instance().sendReply(reply, id, message.from);
}
catch (IOException ex)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java b/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
index 39e5ed3..fbf849a 100644
--- a/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
@@ -27,30 +27,30 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.net.*;
-public class RowMutationVerbHandler implements IVerbHandler
+public class RowMutationVerbHandler implements IVerbHandler<RowMutation>
{
private static final Logger logger = LoggerFactory.getLogger(RowMutationVerbHandler.class);
- public void doVerb(MessageIn message, String id)
+ public void doVerb(MessageIn<RowMutation> message, String id)
{
try
{
- RowMutation rm = RowMutation.fromBytes(message.getMessageBody(), message.getVersion());
+ RowMutation rm = message.payload;
if (logger.isDebugEnabled())
logger.debug("Applying " + rm);
// Check if there were any forwarding headers in this message
- InetAddress replyTo = message.getFrom();
- byte[] from = message.getHeader(RowMutation.FORWARD_FROM);
- if (from != null)
+ InetAddress replyTo = message.from;
+ byte[] from = message.parameters.get(RowMutation.FORWARD_FROM);
+ if (from == null)
{
- replyTo = InetAddress.getByAddress(from);
+ byte[] forwardBytes = message.parameters.get(RowMutation.FORWARD_TO);
+ if (forwardBytes != null && message.version >= MessagingService.VERSION_11)
+ forwardToLocalNodes(rm, message.verb, forwardBytes, message.from);
}
else
{
- byte[] forwardBytes = message.getHeader(RowMutation.FORWARD_TO);
- if (forwardBytes != null && message.getVersion() >= MessagingService.VERSION_11)
- forwardToLocalNodes(rm, message.getVerb(), forwardBytes, message.getFrom());
+ replyTo = InetAddress.getByAddress(from);
}
rm.apply();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java b/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
index 0003d82..d33419e 100644
--- a/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
@@ -34,6 +34,6 @@ public class SchemaCheckVerbHandler implements IVerbHandler
{
logger.debug("Received schema check request.");
MessageOut response = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE);
- MessagingService.instance().sendReply(response, id, message.getFrom());
+ MessagingService.instance().sendReply(response, id, message.from);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/db/SnapshotCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SnapshotCommand.java b/src/java/org/apache/cassandra/db/SnapshotCommand.java
index 5463298..623d5c2 100644
--- a/src/java/org/apache/cassandra/db/SnapshotCommand.java
+++ b/src/java/org/apache/cassandra/db/SnapshotCommand.java
@@ -50,13 +50,6 @@ public class SnapshotCommand
return new MessageOut<SnapshotCommand>(MessagingService.Verb.SNAPSHOT, this, serializer);
}
- public static SnapshotCommand read(MessageIn message) throws IOException
- {
- byte[] bytes = message.getMessageBody();
- FastByteArrayInputStream bis = new FastByteArrayInputStream(bytes);
- return serializer.deserialize(new DataInputStream(bis), message.getVersion());
- }
-
@Override
public String toString()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/TruncateVerbHandler.java b/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
index 82bf14f..f7a7546 100644
--- a/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
@@ -29,17 +29,15 @@ import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
-public class TruncateVerbHandler implements IVerbHandler
+public class TruncateVerbHandler implements IVerbHandler<Truncation>
{
private static final Logger logger = LoggerFactory.getLogger(TruncateVerbHandler.class);
- public void doVerb(MessageIn message, String id)
+ public void doVerb(MessageIn<Truncation> message, String id)
{
- DataInputStream in = new DataInputStream(new FastByteArrayInputStream(message.getMessageBody()));
-
try
{
- Truncation t = Truncation.serializer().deserialize(in, message.getVersion());
+ Truncation t = message.payload;
logger.debug("Applying {}", t);
try
@@ -55,8 +53,8 @@ public class TruncateVerbHandler implements IVerbHandler
logger.debug("Truncate operation succeeded at this host");
TruncateResponse response = new TruncateResponse(t.keyspace, t.columnFamily, true);
- logger.debug("{} applied. Sending response to {}@{} ", new Object[]{ t, id, message.getFrom()});
- MessagingService.instance().sendReply(response.createMessage(), id, message.getFrom());
+ logger.debug("{} applied. Sending response to {}@{} ", new Object[]{ t, id, message.from });
+ MessagingService.instance().sendReply(response.createMessage(), id, message.from);
}
catch (IOException e)
{
@@ -67,6 +65,6 @@ public class TruncateVerbHandler implements IVerbHandler
private static void respondError(Truncation t, MessageIn truncateRequestMessage) throws IOException
{
TruncateResponse response = new TruncateResponse(t.keyspace, t.columnFamily, false);
- MessagingService.instance().sendOneWay(response.createMessage(), truncateRequestMessage.getFrom());
+ MessagingService.instance().sendOneWay(response.createMessage(), truncateRequestMessage.from);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/dht/BootStrapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java
index e31f908..d6fdf68 100644
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@ -178,11 +178,11 @@ public class BootStrapper
StorageService ss = StorageService.instance;
String tokenString = StorageService.getPartitioner().getTokenFactory().toString(ss.getBootstrapToken());
MessageOut<String> response = new MessageOut<String>(MessagingService.Verb.INTERNAL_RESPONSE, tokenString, StringSerializer.instance);
- MessagingService.instance().sendReply(response, id, message.getFrom());
+ MessagingService.instance().sendReply(response, id, message.from);
}
}
- private static class BootstrapTokenCallback implements IAsyncCallback
+ private static class BootstrapTokenCallback implements IAsyncCallback<String>
{
private volatile Token<?> token;
private final Condition condition = new SimpleCondition();
@@ -202,9 +202,9 @@ public class BootStrapper
return success ? token : null;
}
- public void response(MessageIn msg)
+ public void response(MessageIn<String> msg)
{
- token = StorageService.getPartitioner().getTokenFactory().fromString(new String(msg.getMessageBody(), Charsets.UTF_8));
+ token = StorageService.getPartitioner().getTokenFactory().fromString(msg.payload);
condition.signalAll();
}
@@ -214,7 +214,7 @@ public class BootStrapper
}
}
- private static class StringSerializer implements IVersionedSerializer<String>
+ public static class StringSerializer implements IVersionedSerializer<String>
{
public static final StringSerializer instance = new StringSerializer();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/gms/GossipDigestAck2Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAck2Message.java b/src/java/org/apache/cassandra/gms/GossipDigestAck2Message.java
index c1d880e..609331f 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestAck2Message.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestAck2Message.java
@@ -29,7 +29,7 @@ import org.apache.cassandra.io.IVersionedSerializer;
* last stage of the 3 way messaging of the Gossip protocol.
*/
-class GossipDigestAck2Message
+public class GossipDigestAck2Message
{
private static final IVersionedSerializer<GossipDigestAck2Message> serializer;
static
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java b/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
index 141429e..42080bd 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
@@ -29,30 +29,18 @@ import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.MessageIn;
-public class GossipDigestAck2VerbHandler implements IVerbHandler
+public class GossipDigestAck2VerbHandler implements IVerbHandler<GossipDigestAck2Message>
{
private static final Logger logger = LoggerFactory.getLogger(GossipDigestAck2VerbHandler.class);
- public void doVerb(MessageIn message, String id)
+ public void doVerb(MessageIn<GossipDigestAck2Message> message, String id)
{
if (logger.isTraceEnabled())
{
- InetAddress from = message.getFrom();
+ InetAddress from = message.from;
logger.trace("Received a GossipDigestAck2Message from {}", from);
}
-
- byte[] bytes = message.getMessageBody();
- DataInputStream dis = new DataInputStream( new FastByteArrayInputStream(bytes) );
- GossipDigestAck2Message gDigestAck2Message;
- try
- {
- gDigestAck2Message = GossipDigestAck2Message.serializer().deserialize(dis, message.getVersion());
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- Map<InetAddress, EndpointState> remoteEpStateMap = gDigestAck2Message.getEndpointStateMap();
+ Map<InetAddress, EndpointState> remoteEpStateMap = message.payload.getEndpointStateMap();
/* Notify the Failure Detector */
Gossiper.instance.notifyFailureDetector(remoteEpStateMap);
Gossiper.instance.applyStateLocally(remoteEpStateMap);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java b/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java
index 4cc25e5..60706cb 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java
@@ -31,7 +31,7 @@ import org.apache.cassandra.io.IVersionedSerializer;
* endpoint. This is the 2 stage of the 3 way messaging in the Gossip protocol.
*/
-class GossipDigestAckMessage // TODO rename
+public class GossipDigestAckMessage // TODO rename
{
private static final IVersionedSerializer<GossipDigestAckMessage> serializer;
static
@@ -42,7 +42,7 @@ class GossipDigestAckMessage // TODO rename
final List<GossipDigest> gDigestList;
final Map<InetAddress, EndpointState> epStateMap;
- static IVersionedSerializer<GossipDigestAckMessage> serializer()
+ public static IVersionedSerializer<GossipDigestAckMessage> serializer()
{
return serializer;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
index 5c3e06b..69fa5cf 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
@@ -33,13 +33,13 @@ import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
-public class GossipDigestAckVerbHandler implements IVerbHandler
+public class GossipDigestAckVerbHandler implements IVerbHandler<GossipDigestAckMessage>
{
private static final Logger logger = LoggerFactory.getLogger(GossipDigestAckVerbHandler.class);
- public void doVerb(MessageIn message, String id)
+ public void doVerb(MessageIn<GossipDigestAckMessage> message, String id)
{
- InetAddress from = message.getFrom();
+ InetAddress from = message.from;
if (logger.isTraceEnabled())
logger.trace("Received a GossipDigestAckMessage from {}", from);
if (!Gossiper.instance.isEnabled())
@@ -49,42 +49,32 @@ public class GossipDigestAckVerbHandler implements IVerbHandler
return;
}
- byte[] bytes = message.getMessageBody();
- DataInputStream dis = new DataInputStream( new FastByteArrayInputStream(bytes) );
+ GossipDigestAckMessage gDigestAckMessage = message.payload;
+ List<GossipDigest> gDigestList = gDigestAckMessage.getGossipDigestList();
+ Map<InetAddress, EndpointState> epStateMap = gDigestAckMessage.getEndpointStateMap();
- try
+ if ( epStateMap.size() > 0 )
{
- GossipDigestAckMessage gDigestAckMessage = GossipDigestAckMessage.serializer().deserialize(dis, message.getVersion());
- List<GossipDigest> gDigestList = gDigestAckMessage.getGossipDigestList();
- Map<InetAddress, EndpointState> epStateMap = gDigestAckMessage.getEndpointStateMap();
-
- if ( epStateMap.size() > 0 )
- {
- /* Notify the Failure Detector */
- Gossiper.instance.notifyFailureDetector(epStateMap);
- Gossiper.instance.applyStateLocally(epStateMap);
- }
-
- /* Get the state required to send to this gossipee - construct GossipDigestAck2Message */
- Map<InetAddress, EndpointState> deltaEpStateMap = new HashMap<InetAddress, EndpointState>();
- for( GossipDigest gDigest : gDigestList )
- {
- InetAddress addr = gDigest.getEndpoint();
- EndpointState localEpStatePtr = Gossiper.instance.getStateForVersionBiggerThan(addr, gDigest.getMaxVersion());
- if ( localEpStatePtr != null )
- deltaEpStateMap.put(addr, localEpStatePtr);
- }
-
- MessageOut<GossipDigestAck2Message> gDigestAck2Message = new MessageOut<GossipDigestAck2Message>(MessagingService.Verb.GOSSIP_DIGEST_ACK2,
- new GossipDigestAck2Message(deltaEpStateMap),
- GossipDigestAck2Message.serializer());
- if (logger.isTraceEnabled())
- logger.trace("Sending a GossipDigestAck2Message to {}", from);
- MessagingService.instance().sendOneWay(gDigestAck2Message, from);
+ /* Notify the Failure Detector */
+ Gossiper.instance.notifyFailureDetector(epStateMap);
+ Gossiper.instance.applyStateLocally(epStateMap);
}
- catch ( IOException e )
+
+ /* Get the state required to send to this gossipee - construct GossipDigestAck2Message */
+ Map<InetAddress, EndpointState> deltaEpStateMap = new HashMap<InetAddress, EndpointState>();
+ for( GossipDigest gDigest : gDigestList )
{
- throw new RuntimeException(e);
+ InetAddress addr = gDigest.getEndpoint();
+ EndpointState localEpStatePtr = Gossiper.instance.getStateForVersionBiggerThan(addr, gDigest.getMaxVersion());
+ if ( localEpStatePtr != null )
+ deltaEpStateMap.put(addr, localEpStatePtr);
}
+
+ MessageOut<GossipDigestAck2Message> gDigestAck2Message = new MessageOut<GossipDigestAck2Message>(MessagingService.Verb.GOSSIP_DIGEST_ACK2,
+ new GossipDigestAck2Message(deltaEpStateMap),
+ GossipDigestAck2Message.serializer());
+ if (logger.isTraceEnabled())
+ logger.trace("Sending a GossipDigestAck2Message to {}", from);
+ MessagingService.instance().sendOneWay(gDigestAck2Message, from);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java b/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java
index e798edb..05e210f 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java
@@ -34,7 +34,7 @@ import org.apache.cassandra.net.CompactEndpointSerializationHelper;
* round.
*/
-class GossipDigestSynMessage
+public class GossipDigestSynMessage
{
private static final IVersionedSerializer<GossipDigestSynMessage> serializer;
static
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java b/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
index 14bb324..70df9d3 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
@@ -32,13 +32,13 @@ import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
-public class GossipDigestSynVerbHandler implements IVerbHandler
+public class GossipDigestSynVerbHandler implements IVerbHandler<GossipDigestSynMessage>
{
private static final Logger logger = LoggerFactory.getLogger( GossipDigestSynVerbHandler.class);
- public void doVerb(MessageIn message, String id)
+ public void doVerb(MessageIn<GossipDigestSynMessage> message, String id)
{
- InetAddress from = message.getFrom();
+ InetAddress from = message.from;
if (logger.isTraceEnabled())
logger.trace("Received a GossipDigestSynMessage from {}", from);
if (!Gossiper.instance.isEnabled())
@@ -48,50 +48,40 @@ public class GossipDigestSynVerbHandler implements IVerbHandler
return;
}
- byte[] bytes = message.getMessageBody();
- DataInputStream dis = new DataInputStream( new FastByteArrayInputStream(bytes) );
-
- try
+ GossipDigestSynMessage gDigestMessage = message.payload;
+ /* If the message is from a different cluster throw it away. */
+ if (!gDigestMessage.clusterId.equals(DatabaseDescriptor.getClusterName()))
{
- GossipDigestSynMessage gDigestMessage = GossipDigestSynMessage.serializer().deserialize(dis, message.getVersion());
- /* If the message is from a different cluster throw it away. */
- if ( !gDigestMessage.clusterId.equals(DatabaseDescriptor.getClusterName()) )
- {
- logger.warn("ClusterName mismatch from " + from + " " + gDigestMessage.clusterId + "!=" + DatabaseDescriptor.getClusterName());
- return;
- }
+ logger.warn("ClusterName mismatch from " + from + " " + gDigestMessage.clusterId + "!=" + DatabaseDescriptor.getClusterName());
+ return;
+ }
- List<GossipDigest> gDigestList = gDigestMessage.getGossipDigests();
- if (logger.isTraceEnabled())
+ List<GossipDigest> gDigestList = gDigestMessage.getGossipDigests();
+ if (logger.isTraceEnabled())
+ {
+ StringBuilder sb = new StringBuilder();
+ for ( GossipDigest gDigest : gDigestList )
{
- StringBuilder sb = new StringBuilder();
- for ( GossipDigest gDigest : gDigestList )
- {
- sb.append(gDigest);
- sb.append(" ");
- }
- logger.trace("Gossip syn digests are : " + sb.toString());
+ sb.append(gDigest);
+ sb.append(" ");
}
- /* Notify the Failure Detector */
- Gossiper.instance.notifyFailureDetector(gDigestList);
+ logger.trace("Gossip syn digests are : " + sb.toString());
+ }
+ /* Notify the Failure Detector */
+ Gossiper.instance.notifyFailureDetector(gDigestList);
- doSort(gDigestList);
+ doSort(gDigestList);
- List<GossipDigest> deltaGossipDigestList = new ArrayList<GossipDigest>();
- Map<InetAddress, EndpointState> deltaEpStateMap = new HashMap<InetAddress, EndpointState>();
- Gossiper.instance.examineGossiper(gDigestList, deltaGossipDigestList, deltaEpStateMap);
+ List<GossipDigest> deltaGossipDigestList = new ArrayList<GossipDigest>();
+ Map<InetAddress, EndpointState> deltaEpStateMap = new HashMap<InetAddress, EndpointState>();
+ Gossiper.instance.examineGossiper(gDigestList, deltaGossipDigestList, deltaEpStateMap);
- MessageOut<GossipDigestAckMessage> gDigestAckMessage = new MessageOut<GossipDigestAckMessage>(MessagingService.Verb.GOSSIP_DIGEST_ACK,
- new GossipDigestAckMessage(deltaGossipDigestList, deltaEpStateMap),
- GossipDigestAckMessage.serializer());
- if (logger.isTraceEnabled())
- logger.trace("Sending a GossipDigestAckMessage to {}", from);
- MessagingService.instance().sendOneWay(gDigestAckMessage, from);
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
+ MessageOut<GossipDigestAckMessage> gDigestAckMessage = new MessageOut<GossipDigestAckMessage>(MessagingService.Verb.GOSSIP_DIGEST_ACK,
+ new GossipDigestAckMessage(deltaGossipDigestList, deltaEpStateMap),
+ GossipDigestAckMessage.serializer());
+ if (logger.isTraceEnabled())
+ logger.trace("Sending a GossipDigestAckMessage to {}", from);
+ MessagingService.instance().sendOneWay(gDigestAckMessage, from);
}
/*
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/gms/GossipShutdownVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipShutdownVerbHandler.java b/src/java/org/apache/cassandra/gms/GossipShutdownVerbHandler.java
index b29013d..8990739 100644
--- a/src/java/org/apache/cassandra/gms/GossipShutdownVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipShutdownVerbHandler.java
@@ -31,13 +31,12 @@ public class GossipShutdownVerbHandler implements IVerbHandler
public void doVerb(MessageIn message, String id)
{
- InetAddress from = message.getFrom();
if (!Gossiper.instance.isEnabled())
{
- logger.debug("Ignoring shutdown message from {} because gossip is disabled", from);
+ logger.debug("Ignoring shutdown message from {} because gossip is disabled", message.from);
return;
}
- FailureDetector.instance.forceConviction(from);
+ FailureDetector.instance.forceConviction(message.from);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/net/AsyncResult.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/AsyncResult.java b/src/java/org/apache/cassandra/net/AsyncResult.java
index 9a3985b..83023e1 100644
--- a/src/java/org/apache/cassandra/net/AsyncResult.java
+++ b/src/java/org/apache/cassandra/net/AsyncResult.java
@@ -28,11 +28,11 @@ import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-class AsyncResult implements IAsyncResult
+class AsyncResult<T> implements IAsyncResult<T>
{
private static final Logger logger = LoggerFactory.getLogger(AsyncResult.class);
- private byte[] result;
+ private T result;
private final AtomicBoolean done = new AtomicBoolean(false);
private final Lock lock = new ReentrantLock();
private final Condition condition;
@@ -45,7 +45,7 @@ class AsyncResult implements IAsyncResult
startTime = System.currentTimeMillis();
}
- public byte[] get(long timeout, TimeUnit tu) throws TimeoutException
+ public T get(long timeout, TimeUnit tu) throws TimeoutException
{
lock.lock();
try
@@ -77,15 +77,15 @@ class AsyncResult implements IAsyncResult
return result;
}
- public void result(MessageIn response)
+ public void result(MessageIn<T> response)
{
try
{
lock.lock();
if (!done.get())
{
- from = response.getFrom();
- result = response.getMessageBody();
+ from = response.from;
+ result = response.payload;
done.set(true);
condition.signal();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/net/CallbackInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/CallbackInfo.java b/src/java/org/apache/cassandra/net/CallbackInfo.java
index d872397..1def33a 100644
--- a/src/java/org/apache/cassandra/net/CallbackInfo.java
+++ b/src/java/org/apache/cassandra/net/CallbackInfo.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,35 +15,40 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.cassandra.net;
import java.net.InetAddress;
+import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.service.StorageProxy;
/**
* Encapsulates the callback information.
- * The ability to set the message is useful in cases for when a hint needs
+ * The ability to set the message is useful in cases for when a hint needs
* to be written due to a timeout in the response from a replica.
*/
-class CallbackInfo
+public class CallbackInfo
{
protected final InetAddress target;
protected final IMessageCallback callback;
- protected final MessageOut<?> message;
+ protected final MessageOut<?> sentMessage;
+ protected final IVersionedSerializer<?> serializer;
- public CallbackInfo(InetAddress target, IMessageCallback callback)
+ public CallbackInfo(InetAddress target, IMessageCallback callback, IVersionedSerializer<?> serializer)
{
this.target = target;
this.callback = callback;
- this.message = null;
+ this.serializer = serializer;
+ this.sentMessage = null;
}
- public CallbackInfo(InetAddress target, IMessageCallback callback, MessageOut<?> message)
+ public CallbackInfo(InetAddress target, IMessageCallback callback, MessageOut<?> sentMessage, IVersionedSerializer<?> serializer)
{
this.target = target;
this.callback = callback;
- this.message = message;
+ this.sentMessage = sentMessage;
+ this.serializer = serializer;
}
/**
@@ -54,6 +59,6 @@ class CallbackInfo
*/
public boolean shouldHint()
{
- return message != null && StorageProxy.shouldHint(target);
+ return sentMessage != null && StorageProxy.shouldHint(target);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/net/Header.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/Header.java b/src/java/org/apache/cassandra/net/Header.java
deleted file mode 100644
index 2b4c2b2..0000000
--- a/src/java/org/apache/cassandra/net/Header.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.net;
-
-import java.io.*;
-import java.net.InetAddress;
-import java.util.Collections;
-import java.util.Hashtable;
-import java.util.Map;
-
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.utils.FBUtilities;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-
-public class Header
-{
- private static final IVersionedSerializer<Header> serializer;
-
- static
- {
- serializer = new HeaderSerializer();
- }
-
- public static IVersionedSerializer<Header> serializer()
- {
- return serializer;
- }
-
- // "from" is the ultimate origin of this request (the coordinator), which in a multi-DC setup
- // is not necessarily the same as the node that forwards us the request (see StorageProxy.sendMessages
- // and RowMutationVerbHandler.forwardToLocalNodes)
- private final InetAddress from;
- private final MessagingService.Verb verb;
- protected final Map<String, byte[]> details;
-
- Header(InetAddress from, MessagingService.Verb verb)
- {
- this(from, verb, Collections.<String, byte[]>emptyMap());
- }
-
- Header(InetAddress from, MessagingService.Verb verb, Map<String, byte[]> details)
- {
- assert from != null;
- assert verb != null;
-
- this.from = from;
- this.verb = verb;
- this.details = ImmutableMap.copyOf(details);
- }
-
- InetAddress getFrom()
- {
- return from;
- }
-
- MessagingService.Verb getVerb()
- {
- return verb;
- }
-
- byte[] getDetail(String key)
- {
- return details.get(key);
- }
-
- Header withDetailsAdded(String key, byte[] value)
- {
- Map<String, byte[]> detailsCopy = Maps.newHashMap(details);
- detailsCopy.put(key, value);
- return new Header(from, verb, detailsCopy);
- }
-
- Header withDetailsRemoved(String key)
- {
- if (!details.containsKey(key))
- return this;
- Map<String, byte[]> detailsCopy = Maps.newHashMap(details);
- detailsCopy.remove(key);
- return new Header(from, verb, detailsCopy);
- }
-
- public int serializedSize()
- {
- int size = 0;
- size += CompactEndpointSerializationHelper.serializedSize(getFrom());
- size += 4;
- size += 4;
- for (String key : details.keySet())
- {
- size += 2 + FBUtilities.encodedUTF8Length(key);
- byte[] value = details.get(key);
- size += 4 + value.length;
- }
- return size;
- }
-}
-
-class HeaderSerializer implements IVersionedSerializer<Header>
-{
- public void serialize(Header t, DataOutput dos, int version) throws IOException
- {
- CompactEndpointSerializationHelper.serialize(t.getFrom(), dos);
- dos.writeInt(t.getVerb().ordinal());
- dos.writeInt(t.details.size());
- for (String key : t.details.keySet())
- {
- dos.writeUTF(key);
- byte[] value = t.details.get(key);
- dos.writeInt(value.length);
- dos.write(value);
- }
- }
-
- public Header deserialize(DataInput dis, int version) throws IOException
- {
- InetAddress from = CompactEndpointSerializationHelper.deserialize(dis);
- int verbOrdinal = dis.readInt();
- int size = dis.readInt();
- Map<String, byte[]> details = new Hashtable<String, byte[]>(size);
- for ( int i = 0; i < size; ++i )
- {
- String key = dis.readUTF();
- int length = dis.readInt();
- byte[] bytes = new byte[length];
- dis.readFully(bytes);
- details.put(key, bytes);
- }
- return new Header(from, MessagingService.VERBS[verbOrdinal], details);
- }
-
- public long serializedSize(Header header, int version)
- {
- throw new UnsupportedOperationException();
- }
-}
-
-
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/net/IAsyncCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IAsyncCallback.java b/src/java/org/apache/cassandra/net/IAsyncCallback.java
index 797779b..868d368 100644
--- a/src/java/org/apache/cassandra/net/IAsyncCallback.java
+++ b/src/java/org/apache/cassandra/net/IAsyncCallback.java
@@ -23,10 +23,10 @@ package org.apache.cassandra.net;
* service. In particular, if any shared state is referenced, making
* response alone synchronized will not suffice.
*/
-public interface IAsyncCallback extends IMessageCallback
+public interface IAsyncCallback<T> extends IMessageCallback
{
/**
* @param msg response received.
*/
- public void response(MessageIn msg);
+ public void response(MessageIn<T> msg);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/net/IAsyncResult.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IAsyncResult.java b/src/java/org/apache/cassandra/net/IAsyncResult.java
index 545cfb3..87a4c73 100644
--- a/src/java/org/apache/cassandra/net/IAsyncResult.java
+++ b/src/java/org/apache/cassandra/net/IAsyncResult.java
@@ -21,7 +21,7 @@ import java.net.InetAddress;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-public interface IAsyncResult extends IMessageCallback
+public interface IAsyncResult<T> extends IMessageCallback
{
/**
* Same operation as the above get() but allows the calling
@@ -30,13 +30,13 @@ public interface IAsyncResult extends IMessageCallback
* @param tu the time unit of the timeout argument
* @return the result wrapped in an Object[]
*/
- public byte[] get(long timeout, TimeUnit tu) throws TimeoutException;
+ public T get(long timeout, TimeUnit tu) throws TimeoutException;
/**
* Store the result obtained for the submitted task.
* @param result the response message
*/
- public void result(MessageIn result);
+ public void result(MessageIn<T> result);
public InetAddress getFrom();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/net/IVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IVerbHandler.java b/src/java/org/apache/cassandra/net/IVerbHandler.java
index 96867ec..8ae63e2 100644
--- a/src/java/org/apache/cassandra/net/IVerbHandler.java
+++ b/src/java/org/apache/cassandra/net/IVerbHandler.java
@@ -23,7 +23,7 @@ package org.apache.cassandra.net;
* for a given verb.
*/
-public interface IVerbHandler
+public interface IVerbHandler<T>
{
/**
* This method delivers a message to the implementing class (if the implementing
@@ -34,5 +34,5 @@ public interface IVerbHandler
* @param message - incoming message that needs handling.
* @param id
*/
- public void doVerb(MessageIn message, String id);
+ public void doVerb(MessageIn<T> message, String id);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
index 9647e8f..45d8255 100644
--- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
@@ -17,15 +17,18 @@
*/
package org.apache.cassandra.net;
-import java.io.*;
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
-import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.streaming.IncomingStreamReader;
import org.apache.cassandra.streaming.StreamHeader;
@@ -33,8 +36,6 @@ public class IncomingTcpConnection extends Thread
{
private static final Logger logger = LoggerFactory.getLogger(IncomingTcpConnection.class);
- private static final int CHUNK_SIZE = 1024 * 1024;
-
private final Socket socket;
public InetAddress from;
@@ -126,35 +127,25 @@ public class IncomingTcpConnection extends Thread
private InetAddress receiveMessage(DataInputStream input, int version) throws IOException
{
- int totalSize = input.readInt();
- String id = input.readUTF();
- Header header = Header.serializer().deserialize(input, version);
-
- int bodySize = input.readInt();
- byte[] body = new byte[bodySize];
- // readFully allocates a direct buffer the size of the chunk it is asked to read,
- // so we cap that at CHUNK_SIZE. See https://issues.apache.org/jira/browse/CASSANDRA-2654
- int remainder = bodySize % CHUNK_SIZE;
- for (int offset = 0; offset < bodySize - remainder; offset += CHUNK_SIZE)
- input.readFully(body, offset, CHUNK_SIZE);
- input.readFully(body, bodySize - remainder, remainder);
- // earlier versions would send unnecessary bytes left over at the end of a buffer, too
- long remaining = totalSize - OutboundTcpConnection.messageLength(header, id, body);
- while (remaining > 0)
- remaining -= input.skip(remaining);
+ if (version <= MessagingService.VERSION_11)
+ input.readInt(); // size of entire message. in 1.0+ this is just a placeholder
- // for non-streaming connections, continue to read the messages (and ignore them) until sender
- // starts sending correct-version messages (which it can do without reconnecting -- version is per-Message)
+ String id = input.readUTF();
+ MessageIn message = MessageIn.read(input, version, id);
+ if (message == null)
+ {
+ // callback expired; nothing to do
+ return null;
+ }
if (version <= MessagingService.current_version)
{
- MessageIn message = new MessageIn(header, body, version);
MessagingService.instance().receive(message, id);
}
else
{
logger.debug("Received connection from newer protocol version {}. Ignoring message", version);
}
- return header.getFrom();
+ return message.from;
}
private void close()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
index b0603ff..a5fd614 100644
--- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
+++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
@@ -39,7 +39,7 @@ public class MessageDeliveryTask implements Runnable
public void run()
{
- MessagingService.Verb verb = message.getVerb();
+ MessagingService.Verb verb = message.verb;
if (MessagingService.DROPPABLE_VERBS.contains(verb)
&& System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getRpcTimeout())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/net/MessageIn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageIn.java b/src/java/org/apache/cassandra/net/MessageIn.java
index f326802d..fe053a4 100644
--- a/src/java/org/apache/cassandra/net/MessageIn.java
+++ b/src/java/org/apache/cassandra/net/MessageIn.java
@@ -17,71 +17,95 @@
*/
package org.apache.cassandra.net;
+import java.io.DataInput;
+import java.io.IOException;
import java.net.InetAddress;
+import java.util.Collections;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.io.IVersionedSerializer;
-public class MessageIn
+public class MessageIn<T>
{
- final Header header;
- private final byte[] body;
- private final transient int version;
+ public final InetAddress from;
+ public final T payload;
+ public final Map<String, byte[]> parameters;
+ public final MessagingService.Verb verb;
+ public final int version;
- public MessageIn(Header header, byte[] body, int version)
+ private MessageIn(InetAddress from, T payload, Map<String, byte[]> parameters, MessagingService.Verb verb, int version)
{
- assert header != null;
- assert body != null;
-
- this.header = header;
- this.body = body;
+ this.from = from;
+ this.payload = payload;
+ this.parameters = parameters;
+ this.verb = verb;
this.version = version;
}
- public MessageIn(InetAddress from, MessagingService.Verb verb, byte[] body, int version)
- {
- this(new Header(from, verb), body, version);
- }
-
- public byte[] getHeader(String key)
+ public static <T> MessageIn<T> create(InetAddress from, T payload, Map<String, byte[]> parameters, MessagingService.Verb verb, int version)
{
- return header.getDetail(key);
+ return new MessageIn<T>(from, payload, parameters, verb, version);
}
- public byte[] getMessageBody()
+ public static <T2> MessageIn<T2> read(DataInput in, int version, String id) throws IOException
{
- return body;
- }
+ InetAddress from = CompactEndpointSerializationHelper.deserialize(in);
- public int getVersion()
- {
- return version;
- }
+ MessagingService.Verb verb = MessagingService.Verb.values()[in.readInt()];
+ int parameterCount = in.readInt();
+ Map<String, byte[]> parameters;
+ if (parameterCount == 0)
+ {
+ parameters = Collections.emptyMap();
+ }
+ else
+ {
+ ImmutableMap.Builder<String, byte[]> builder = ImmutableMap.builder();
+ for (int i = 0; i < parameterCount; i++)
+ {
+ String key = in.readUTF();
+ byte[] value = new byte[in.readInt()];
+ in.readFully(value);
+ builder.put(key, value);
+ }
+ parameters = builder.build();
+ }
- public InetAddress getFrom()
- {
- return header.getFrom();
+ int payloadSize = in.readInt();
+ if (payloadSize == 0)
+ return create(from, null, parameters, verb, version);
+ IVersionedSerializer<T2> serializer = (IVersionedSerializer<T2>) MessagingService.verbSerializers.get(verb);
+ if (serializer instanceof MessagingService.CallbackDeterminedSerializer)
+ {
+ CallbackInfo callback = MessagingService.instance().getRegisteredCallback(id);
+ if (callback == null)
+ {
+ // reply for expired callback. we'll have to skip it.
+ in.skipBytes(payloadSize);
+ return null;
+ }
+ serializer = (IVersionedSerializer<T2>) callback.serializer;
+ }
+ T2 payload = serializer.deserialize(in, version);
+ return MessageIn.create(from, payload, parameters, verb, version);
}
public Stage getMessageType()
{
- return MessagingService.verbStages.get(getVerb());
- }
-
- public MessagingService.Verb getVerb()
- {
- return header.getVerb();
+ return MessagingService.verbStages.get(verb);
}
public String toString()
{
StringBuilder sbuf = new StringBuilder("");
String separator = System.getProperty("line.separator");
- sbuf.append("FROM:" + getFrom())
- .append(separator)
- .append("TYPE:" + getMessageType())
- .append(separator)
- .append("VERB:" + getVerb())
- .append(separator);
+ sbuf.append("FROM:").append(from)
+ .append(separator).append("TYPE:").append(getMessageType())
+ .append(separator).append("VERB:").append(verb)
+ .append(separator);
return sbuf.toString();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 649f81d..ef85a82 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -17,6 +17,8 @@
*/
package org.apache.cassandra.net;
+import java.io.DataInput;
+import java.io.DataOutput;
import java.io.IOError;
import java.io.IOException;
import java.lang.management.ManagementFactory;
@@ -44,16 +46,22 @@ import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.EncryptionOptions;
-import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.dht.BootStrapper;
+import org.apache.cassandra.gms.GossipDigestAck2Message;
+import org.apache.cassandra.gms.GossipDigestAckMessage;
+import org.apache.cassandra.gms.GossipDigestSynMessage;
+import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.locator.ILatencySubscriber;
import org.apache.cassandra.net.io.SerializerType;
import org.apache.cassandra.net.sink.SinkManager;
import org.apache.cassandra.security.SSLFactory;
+import org.apache.cassandra.service.AntiEntropyService;
+import org.apache.cassandra.service.MigrationManager;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.streaming.FileStreamTask;
-import org.apache.cassandra.streaming.StreamHeader;
+import org.apache.cassandra.streaming.*;
import org.apache.cassandra.utils.*;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
@@ -150,9 +158,88 @@ public final class MessagingService implements MessagingServiceMBean
put(Verb.UNUSED_3, Stage.INTERNAL_RESPONSE);
}};
+ /**
+ * Messages we receive in IncomingTcpConnection have a Verb that tells us what kind of message it is.
+ * Most of the time, this is enough to determine how to deserialize the message payload.
+ * The exception is the REQUEST_RESPONSE verb, which just means "a reply to something you told me to do."
+ * Traditionally, this was fine since each VerbHandler knew what type of payload it expected, and
+ * handled the deserialization itself. Now that we do that in ITC, to avoid the extra copy to an
+ * intermediary byte[] (See CASSANDRA-3716), we need to wire that up to the CallbackInfo object
+ * (see below).
+ */
+ public static final EnumMap<Verb, IVersionedSerializer<?>> verbSerializers = new EnumMap<Verb, IVersionedSerializer<?>>(Verb.class)
+ {{
+ put(Verb.REQUEST_RESPONSE, CallbackDeterminedSerializer.instance);
+ put(Verb.INTERNAL_RESPONSE, CallbackDeterminedSerializer.instance);
+
+ put(Verb.MUTATION, RowMutation.serializer());
+ put(Verb.READ_REPAIR, RowMutation.serializer());
+ put(Verb.READ, ReadCommand.serializer());
+ put(Verb.STREAM_REPLY, StreamReply.serializer);
+ put(Verb.STREAM_REQUEST, StreamRequestMessage.serializer());
+ put(Verb.RANGE_SLICE, RangeSliceCommand.serializer);
+ put(Verb.BOOTSTRAP_TOKEN, BootStrapper.StringSerializer.instance);
+ put(Verb.TREE_REQUEST, AntiEntropyService.TreeRequest.serializer);
+ put(Verb.TREE_RESPONSE, AntiEntropyService.Validator.serializer);
+ put(Verb.STREAMING_REPAIR_REQUEST, StreamingRepairTask.serializer);
+ put(Verb.STREAMING_REPAIR_RESPONSE, UUIDGen.serializer);
+ put(Verb.GOSSIP_DIGEST_ACK, GossipDigestAckMessage.serializer());
+ put(Verb.GOSSIP_DIGEST_ACK2, GossipDigestAck2Message.serializer());
+ put(Verb.GOSSIP_DIGEST_SYN, GossipDigestSynMessage.serializer());
+ put(Verb.DEFINITIONS_UPDATE, MigrationManager.MigrationsSerializer.instance);
+ put(Verb.TRUNCATE, Truncation.serializer());
+ put(Verb.SCHEMA_CHECK, null);
+ put(Verb.INDEX_SCAN, IndexScanCommand.serializer);
+ put(Verb.REPLICATION_FINISHED, null);
+ put(Verb.COUNTER_MUTATION, CounterMutation.serializer());
+ }};
+
+ /**
+ * A Map of what kind of serializer to wire up to a REQUEST_RESPONSE callback, based on outbound Verb.
+ */
+ public static final EnumMap<Verb, IVersionedSerializer<?>> callbackDeserializers = new EnumMap<Verb, IVersionedSerializer<?>>(Verb.class)
+ {{
+ put(Verb.MUTATION, WriteResponse.serializer());
+ put(Verb.READ_REPAIR, WriteResponse.serializer());
+ put(Verb.COUNTER_MUTATION, WriteResponse.serializer());
+ put(Verb.RANGE_SLICE, RangeSliceReply.serializer);
+ put(Verb.READ, ReadResponse.serializer());
+ put(Verb.TRUNCATE, TruncateResponse.serializer());
+ put(Verb.SNAPSHOT, null);
+
+ put(Verb.MIGRATION_REQUEST, MigrationManager.MigrationsSerializer.instance);
+ put(Verb.SCHEMA_CHECK, UUIDGen.serializer);
+ put(Verb.BOOTSTRAP_TOKEN, BootStrapper.StringSerializer.instance);
+ put(Verb.REPLICATION_FINISHED, null);
+ }};
+
/* This records all the results mapped by message Id */
private final ExpiringMap<String, CallbackInfo> callbacks;
+ /**
+ * a placeholder class that means "deserialize using the callback." We can't implement this without
+ * special-case code in InboundTcpConnection because there is no way to pass the message id to IVersionedSerializer.
+ */
+ static class CallbackDeterminedSerializer implements IVersionedSerializer<Object>
+ {
+ public static final CallbackDeterminedSerializer instance = new CallbackDeterminedSerializer();
+
+ public Object deserialize(DataInput in, int version) throws IOException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void serialize(Object o, DataOutput out, int version) throws IOException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public long serializedSize(Object o, int version)
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
/* Lookup table for registering message handlers based on the verb. */
private final Map<Verb, IVerbHandler> verbHandlers;
@@ -255,8 +342,8 @@ public final class MessagingService implements MessagingServiceMBean
if (expiredCallbackInfo.shouldHint())
{
- assert expiredCallbackInfo.message != null;
- RowMutation rm = (RowMutation) expiredCallbackInfo.message.payload;
+ assert expiredCallbackInfo.sentMessage != null;
+ RowMutation rm = (RowMutation) expiredCallbackInfo.sentMessage.payload;
return StorageProxy.scheduleLocalHint(rm, expiredCallbackInfo.target, null, null);
}
@@ -414,9 +501,9 @@ public final class MessagingService implements MessagingServiceMBean
// If HH is enabled and this is a mutation message => store the message to track for potential hints.
if (DatabaseDescriptor.hintedHandoffEnabled() && message.verb == Verb.MUTATION)
- previous = callbacks.put(messageId, new CallbackInfo(to, cb, message), timeout);
+ previous = callbacks.put(messageId, new CallbackInfo(to, cb, message, callbackDeserializers.get(message.verb)), timeout);
else
- previous = callbacks.put(messageId, new CallbackInfo(to, cb), timeout);
+ previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout);
assert previous == null;
return messageId;
@@ -495,9 +582,9 @@ public final class MessagingService implements MessagingServiceMBean
connection.enqueue(processedMessage, id);
}
- public IAsyncResult sendRR(MessageOut message, InetAddress to)
+ public <T> IAsyncResult<T> sendRR(MessageOut message, InetAddress to)
{
- IAsyncResult iar = new AsyncResult();
+ IAsyncResult<T> iar = new AsyncResult();
sendRR(message, to, iar);
return iar;
}
@@ -594,8 +681,8 @@ public final class MessagingService implements MessagingServiceMBean
public void receive(MessageIn message, String id)
{
if (logger.isTraceEnabled())
- logger.trace(FBUtilities.getBroadcastAddress() + " received " + message.getVerb()
- + " from " + id + "@" + message.getFrom());
+ logger.trace(FBUtilities.getBroadcastAddress() + " received " + message.verb
+ + " from " + id + "@" + message.from);
message = SinkManager.processInboundMessage(message, id);
if (message == null)
@@ -603,10 +690,20 @@ public final class MessagingService implements MessagingServiceMBean
Runnable runnable = new MessageDeliveryTask(message, id);
ExecutorService stage = StageManager.getStage(message.getMessageType());
- assert stage != null : "No stage for message type " + message.getVerb();
+ assert stage != null : "No stage for message type " + message.verb;
stage.execute(runnable);
}
+ public void setCallbackForTests(String messageId, CallbackInfo callback)
+ {
+ callbacks.put(messageId, callback);
+ }
+
+ public CallbackInfo getRegisteredCallback(String messageId)
+ {
+ return callbacks.get(messageId);
+ }
+
public CallbackInfo removeRegisteredCallback(String messageId)
{
return callbacks.remove(messageId);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 72164d7..a5cf79d 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -196,11 +196,6 @@ public class OutboundTcpConnection extends Thread
message.serialize(out, version);
}
- public static int messageLength(Header header, String id, byte[] bytes)
- {
- return 2 + FBUtilities.encodedUTF8Length(id) + header.serializedSize() + 4 + bytes.length;
- }
-
private void disconnect()
{
if (socket != null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
index 3016daf..106f76d 100644
--- a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
+++ b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
@@ -35,18 +35,18 @@ public class ResponseVerbHandler implements IVerbHandler
}
IMessageCallback cb = callbackInfo.callback;
- MessagingService.instance().maybeAddLatency(cb, message.getFrom(), age);
+ MessagingService.instance().maybeAddLatency(cb, message.from, age);
if (cb instanceof IAsyncCallback)
{
if (logger.isDebugEnabled())
- logger.debug("Processing response on a callback from " + id + "@" + message.getFrom());
+ logger.debug("Processing response on a callback from " + id + "@" + message.from);
((IAsyncCallback) cb).response(message);
}
else
{
if (logger.isDebugEnabled())
- logger.debug("Processing response on an async result from " + id + "@" + message.getFrom());
+ logger.debug("Processing response on an async result from " + id + "@" + message.from);
((IAsyncResult) cb).result(message);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/service/AbstractRowResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractRowResolver.java b/src/java/org/apache/cassandra/service/AbstractRowResolver.java
index d4a3b65..b1647a2 100644
--- a/src/java/org/apache/cassandra/service/AbstractRowResolver.java
+++ b/src/java/org/apache/cassandra/service/AbstractRowResolver.java
@@ -17,33 +17,27 @@
*/
package org.apache.cassandra.service;
-import java.io.DataInputStream;
-import java.io.IOError;
-import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.concurrent.ConcurrentMap;
+import java.util.Collections;
+import java.util.Set;
-import org.apache.commons.lang.ArrayUtils;
+import org.cliffc.high_scale_lib.NonBlockingHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.db.Row;
-import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.FBUtilities;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
-public abstract class AbstractRowResolver implements IResponseResolver<Row>
+public abstract class AbstractRowResolver implements IResponseResolver<ReadResponse, Row>
{
protected static final Logger logger = LoggerFactory.getLogger(AbstractRowResolver.class);
- private static final MessageIn FAKE_MESSAGE = new MessageIn(FBUtilities.getBroadcastAddress(), MessagingService.Verb.INTERNAL_RESPONSE, ArrayUtils.EMPTY_BYTE_ARRAY, -1);
-
protected final String table;
- protected final ConcurrentMap<MessageIn, ReadResponse> replies = new NonBlockingHashMap<MessageIn, ReadResponse>();
+ protected final Set<MessageIn<ReadResponse>> replies = new NonBlockingHashSet<MessageIn<ReadResponse>>();
protected final DecoratedKey key;
public AbstractRowResolver(ByteBuffer key, String table)
@@ -52,33 +46,25 @@ public abstract class AbstractRowResolver implements IResponseResolver<Row>
this.table = table;
}
- public void preprocess(MessageIn message)
+ public void preprocess(MessageIn<ReadResponse> message)
{
- byte[] body = message.getMessageBody();
- FastByteArrayInputStream bufIn = new FastByteArrayInputStream(body);
- try
- {
- ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn), message.getVersion());
- if (logger.isDebugEnabled())
- logger.debug("Preprocessed {} response", result.isDigestQuery() ? "digest" : "data");
- replies.put(message, result);
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
+ replies.add(message);
}
/** hack so local reads don't force de/serialization of an extra real Message */
public void injectPreProcessed(ReadResponse result)
{
- assert replies.get(FAKE_MESSAGE) == null; // should only be one local reply
- replies.put(FAKE_MESSAGE, result);
+ MessageIn<ReadResponse> message = MessageIn.create(FBUtilities.getBroadcastAddress(),
+ result,
+ Collections.<String, byte[]>emptyMap(),
+ MessagingService.Verb.INTERNAL_RESPONSE,
+ MessagingService.current_version);
+ replies.add(message);
}
- public Iterable<MessageIn> getMessages()
+ public Iterable<MessageIn<ReadResponse>> getMessages()
{
- return replies.keySet();
+ return replies;
}
public int getMaxLiveColumns()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a06be23f/src/java/org/apache/cassandra/service/AntiEntropyService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AntiEntropyService.java b/src/java/org/apache/cassandra/service/AntiEntropyService.java
index f9728ad..a13270a 100644
--- a/src/java/org/apache/cassandra/service/AntiEntropyService.java
+++ b/src/java/org/apache/cassandra/service/AntiEntropyService.java
@@ -221,19 +221,6 @@ public class AntiEntropyService
}
/**
-<<<<<<< HEAD
-=======
- * Requests a tree from the given node, and returns the request that was sent.
- */
- TreeRequest request(String sessionid, InetAddress remote, Range<Token> range, String ksname, String cfname)
- {
- TreeRequest request = new TreeRequest(sessionid, remote, range, new CFPair(ksname, cfname));
- MessagingService.instance().sendOneWay(request.createMessage(), remote);
- return request;
- }
-
- /**
->>>>>>> ddbe4e6... MessageOut
* Responds to the node that requested the given valid tree.
* @param validator A locally generated validator
* @param local localhost (parameterized for testing)
@@ -454,31 +441,21 @@ public class AntiEntropyService
* Handler for requests from remote nodes to generate a valid tree.
* The payload is a CFPair representing the columnfamily to validate.
*/
- public static class TreeRequestVerbHandler implements IVerbHandler
+ public static class TreeRequestVerbHandler implements IVerbHandler<TreeRequest>
{
/**
* Trigger a validation compaction which will return the tree upon completion.
*/
- public void doVerb(MessageIn message, String id)
+ public void doVerb(MessageIn<TreeRequest> message, String id)
{
- byte[] bytes = message.getMessageBody();
-
- DataInputStream buffer = new DataInputStream(new FastByteArrayInputStream(bytes));
- try
- {
- TreeRequest remotereq = TreeRequest.serializer.deserialize(buffer, message.getVersion());
- TreeRequest request = new TreeRequest(remotereq.sessionid, message.getFrom(), remotereq.range, remotereq.cf);
-
- // trigger readonly-compaction
- ColumnFamilyStore store = Table.open(request.cf.left).getColumnFamilyStore(request.cf.right);
- Validator validator = new Validator(request);
- logger.debug("Queueing validation compaction for " + request);
- CompactionManager.instance.submitValidation(store, validator);
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
+ TreeRequest remotereq = message.payload;
+ TreeRequest request = new TreeRequest(remotereq.sessionid, message.from, remotereq.range, remotereq.cf);
+
+ // trigger read-only compaction
+ ColumnFamilyStore store = Table.open(request.cf.left).getColumnFamilyStore(request.cf.right);
+ Validator validator = new Validator(request);
+ logger.debug("Queueing validation compaction for " + request);
+ CompactionManager.instance.submitValidation(store, validator);
}
}
@@ -486,24 +463,14 @@ public class AntiEntropyService
* Handler for responses from remote nodes which contain a valid tree.
* The payload is a completed Validator object from the remote endpoint.
*/
- public static class TreeResponseVerbHandler implements IVerbHandler
+ public static class TreeResponseVerbHandler implements IVerbHandler<Validator>
{
- public void doVerb(MessageIn message, String id)
+ public void doVerb(MessageIn<Validator> message, String id)
{
- byte[] bytes = message.getMessageBody();
- DataInputStream buffer = new DataInputStream(new FastByteArrayInputStream(bytes));
-
- try
- {
- // deserialize the remote tree, and register it
- Validator response = Validator.serializer.deserialize(buffer, message.getVersion());
- TreeRequest request = new TreeRequest(response.request.sessionid, message.getFrom(), response.request.range, response.request.cf);
- AntiEntropyService.instance.rendezvous(request, response.tree);
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
+ // deserialize the remote tree, and register it
+ Validator response = message.payload;
+ TreeRequest request = new TreeRequest(response.request.sessionid, message.from, response.request.range, response.request.cf);
+ AntiEntropyService.instance.rendezvous(request, response.tree);
}
}
@@ -883,15 +850,15 @@ public class AntiEntropyService
snapshotLatch = new CountDownLatch(endpoints.size());
IAsyncCallback callback = new IAsyncCallback()
{
- public boolean isLatencyForSnitch()
- {
- return false;
- }
-
- public void response(MessageIn msg)
- {
- RepairJob.this.snapshotLatch.countDown();
- }
+ public boolean isLatencyForSnitch()
+ {
+ return false;
+ }
+
+ public void response(MessageIn msg)
+ {
+ RepairJob.this.snapshotLatch.countDown();
+ }
};
for (InetAddress endpoint : endpoints)
MessagingService.instance().sendRR(new SnapshotCommand(tablename, cfname, sessionName, false).createMessage(), endpoint, callback);