You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/05/08 19:56:15 UTC
[12/15] git commit: Automated refactor: rename Message to MessageIn
Automated refactor: rename Message to MessageIn
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5d46a014
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5d46a014
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5d46a014
Branch: refs/heads/trunk
Commit: 5d46a014be3b4ee3ff010c2c180ec3aa9ccac48b
Parents: 5a6f0b8
Author: Jonathan Ellis <jb...@apache.org>
Authored: Mon Mar 26 17:49:35 2012 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue May 8 00:13:30 2012 -0500
----------------------------------------------------------------------
.../cassandra/db/CounterMutationVerbHandler.java | 4 +-
.../cassandra/db/DefinitionsUpdateVerbHandler.java | 4 +-
.../org/apache/cassandra/db/IndexScanCommand.java | 4 +-
.../cassandra/db/MigrationRequestVerbHandler.java | 6 +-
.../org/apache/cassandra/db/RangeSliceCommand.java | 4 +-
.../org/apache/cassandra/db/RangeSliceReply.java | 3 -
.../apache/cassandra/db/ReadRepairVerbHandler.java | 4 +-
.../org/apache/cassandra/db/ReadVerbHandler.java | 4 +-
.../cassandra/db/RowMutationVerbHandler.java | 2 +-
.../cassandra/db/SchemaCheckVerbHandler.java | 4 +-
.../org/apache/cassandra/db/SnapshotCommand.java | 4 +-
.../apache/cassandra/db/TruncateVerbHandler.java | 6 +-
.../org/apache/cassandra/db/WriteResponse.java | 1 -
.../org/apache/cassandra/dht/BootStrapper.java | 4 +-
.../cassandra/gms/GossipDigestAck2VerbHandler.java | 4 +-
.../cassandra/gms/GossipDigestAckVerbHandler.java | 4 +-
.../cassandra/gms/GossipDigestSynVerbHandler.java | 4 +-
.../cassandra/gms/GossipShutdownVerbHandler.java | 5 +-
src/java/org/apache/cassandra/net/AsyncResult.java | 2 +-
.../org/apache/cassandra/net/IAsyncCallback.java | 2 +-
.../org/apache/cassandra/net/IAsyncResult.java | 2 +-
.../org/apache/cassandra/net/IVerbHandler.java | 2 +-
.../cassandra/net/IncomingTcpConnection.java | 2 +-
src/java/org/apache/cassandra/net/Message.java | 89 ---------------
.../apache/cassandra/net/MessageDeliveryTask.java | 4 +-
src/java/org/apache/cassandra/net/MessageIn.java | 88 ++++++++++++++
.../org/apache/cassandra/net/MessagingService.java | 4 +-
.../apache/cassandra/net/ResponseVerbHandler.java | 2 +-
.../apache/cassandra/net/sink/IMessageSink.java | 4 +-
.../org/apache/cassandra/net/sink/SinkManager.java | 4 +-
.../cassandra/service/AbstractRowResolver.java | 10 +-
.../service/AbstractWriteResponseHandler.java | 4 +-
.../cassandra/service/AntiEntropyService.java | 6 +-
.../cassandra/service/AsyncRepairCallback.java | 4 +-
.../cassandra/service/DatacenterReadCallback.java | 4 +-
.../DatacenterSyncWriteResponseHandler.java | 4 +-
.../service/DatacenterWriteResponseHandler.java | 4 +-
.../cassandra/service/IResponseResolver.java | 6 +-
.../cassandra/service/IndexScanVerbHandler.java | 4 +-
.../apache/cassandra/service/MigrationManager.java | 5 -
.../service/RangeSliceResponseResolver.java | 12 +-
.../cassandra/service/RangeSliceVerbHandler.java | 4 +-
.../org/apache/cassandra/service/ReadCallback.java | 8 +-
.../apache/cassandra/service/RepairCallback.java | 4 +-
.../cassandra/service/RowDigestResolver.java | 6 +-
.../cassandra/service/RowRepairResolver.java | 6 +-
.../cassandra/service/SnapshotVerbHandler.java | 6 +-
.../org/apache/cassandra/service/StorageProxy.java | 2 +-
.../cassandra/service/TruncateResponseHandler.java | 4 +-
.../cassandra/service/WriteResponseHandler.java | 4 +-
.../apache/cassandra/streaming/FileStreamTask.java | 4 +-
.../streaming/ReplicationFinishedVerbHandler.java | 4 +-
.../streaming/StreamReplyVerbHandler.java | 4 +-
.../streaming/StreamRequestVerbHandler.java | 6 +-
.../cassandra/streaming/StreamingRepairTask.java | 4 +-
.../apache/cassandra/db/SerializationsTest.java | 5 +-
.../apache/cassandra/net/MessageSerializer.java | 10 +-
.../org/apache/cassandra/service/RemoveTest.java | 4 +-
.../org/apache/cassandra/streaming/StreamUtil.java | 4 +-
59 files changed, 207 insertions(+), 221 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
index ac27148..d370a85 100644
--- a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
@@ -27,7 +27,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.thrift.UnavailableException;
@@ -37,7 +37,7 @@ public class CounterMutationVerbHandler implements IVerbHandler
{
private static final Logger logger = LoggerFactory.getLogger(CounterMutationVerbHandler.class);
- public void doVerb(Message message, String id)
+ public void doVerb(MessageIn message, String id)
{
DataInputStream in = new DataInputStream(new FastByteArrayInputStream(message.getMessageBody()));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java b/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
index b1f2f24..f017f97 100644
--- a/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
@@ -23,7 +23,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.utils.WrappedRunnable;
/**
@@ -36,7 +36,7 @@ public class DefinitionsUpdateVerbHandler implements IVerbHandler
{
private static final Logger logger = LoggerFactory.getLogger(DefinitionsUpdateVerbHandler.class);
- public void doVerb(final Message message, String id)
+ public void doVerb(final MessageIn message, String id)
{
logger.debug("Received schema mutation push from " + message.getFrom());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/db/IndexScanCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/IndexScanCommand.java b/src/java/org/apache/cassandra/db/IndexScanCommand.java
index c7c96dd..2522f3a 100644
--- a/src/java/org/apache/cassandra/db/IndexScanCommand.java
+++ b/src/java/org/apache/cassandra/db/IndexScanCommand.java
@@ -25,7 +25,7 @@ import java.io.IOException;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.FastByteArrayInputStream;
-import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.IndexClause;
@@ -60,7 +60,7 @@ public class IndexScanCommand
return new MessageOut<IndexScanCommand>(StorageService.Verb.INDEX_SCAN, this, serializer);
}
- public static IndexScanCommand read(Message message) throws IOException
+ public static IndexScanCommand read(MessageIn message) throws IOException
{
byte[] bytes = message.getMessageBody();
FastByteArrayInputStream bis = new FastByteArrayInputStream(bytes);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java b/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java
index 4aa9808..60e660e 100644
--- a/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java
@@ -17,15 +17,13 @@
*/
package org.apache.cassandra.db;
-import java.io.IOError;
-import java.io.IOException;
import java.util.Collection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.MigrationManager;
@@ -39,7 +37,7 @@ public class MigrationRequestVerbHandler implements IVerbHandler
{
private static final Logger logger = LoggerFactory.getLogger(MigrationRequestVerbHandler.class);
- public void doVerb(Message message, String id)
+ public void doVerb(MessageIn message, String id)
{
logger.debug("Received migration request from {}.", message.getFrom());
MessageOut<Collection<RowMutation>> response = new MessageOut<Collection<RowMutation>>(StorageService.Verb.INTERNAL_RESPONSE,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/db/RangeSliceCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeSliceCommand.java b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
index 602602d..749c735 100644
--- a/src/java/org/apache/cassandra/db/RangeSliceCommand.java
+++ b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
@@ -46,7 +46,7 @@ import java.util.List;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.FastByteArrayInputStream;
-import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.IReadCommand;
@@ -135,7 +135,7 @@ public class RangeSliceCommand implements IReadCommand
'}';
}
- public static RangeSliceCommand read(Message message) throws IOException
+ public static RangeSliceCommand read(MessageIn message) throws IOException
{
byte[] bytes = message.getMessageBody();
FastByteArrayInputStream bis = new FastByteArrayInputStream(bytes);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/db/RangeSliceReply.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeSliceReply.java b/src/java/org/apache/cassandra/db/RangeSliceReply.java
index a13bee5..ec82216 100644
--- a/src/java/org/apache/cassandra/db/RangeSliceReply.java
+++ b/src/java/org/apache/cassandra/db/RangeSliceReply.java
@@ -27,12 +27,9 @@ import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.util.FastByteArrayInputStream;
-import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.FBUtilities;
public class RangeSliceReply
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java b/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
index 6a8711a..5864961 100644
--- a/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
@@ -23,12 +23,12 @@ import java.io.IOException;
import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
public class ReadRepairVerbHandler implements IVerbHandler
{
- public void doVerb(Message message, String id)
+ public void doVerb(MessageIn message, String id)
{
DataInputStream in = new DataInputStream(new FastByteArrayInputStream(message.getMessageBody()));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/db/ReadVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadVerbHandler.java b/src/java/org/apache/cassandra/db/ReadVerbHandler.java
index 007e1af..e8f5c10 100644
--- a/src/java/org/apache/cassandra/db/ReadVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadVerbHandler.java
@@ -25,7 +25,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
@@ -35,7 +35,7 @@ public class ReadVerbHandler implements IVerbHandler
{
private static final Logger logger = LoggerFactory.getLogger( ReadVerbHandler.class );
- public void doVerb(Message message, String id)
+ public void doVerb(MessageIn message, String id)
{
if (StorageService.instance.isBootstrapMode())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java b/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
index e835368..c9052c5 100644
--- a/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
@@ -32,7 +32,7 @@ public class RowMutationVerbHandler implements IVerbHandler
{
private static final Logger logger = LoggerFactory.getLogger(RowMutationVerbHandler.class);
- public void doVerb(Message message, String id)
+ public void doVerb(MessageIn message, String id)
{
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java b/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
index fadc752..64e4080 100644
--- a/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
@@ -21,7 +21,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
@@ -31,7 +31,7 @@ public class SchemaCheckVerbHandler implements IVerbHandler
{
private final Logger logger = LoggerFactory.getLogger(SchemaCheckVerbHandler.class);
- public void doVerb(Message message, String id)
+ public void doVerb(MessageIn message, String id)
{
logger.debug("Received schema check request.");
MessageOut response = new MessageOut(StorageService.Verb.INTERNAL_RESPONSE);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/db/SnapshotCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SnapshotCommand.java b/src/java/org/apache/cassandra/db/SnapshotCommand.java
index 8704759..b92ee5e 100644
--- a/src/java/org/apache/cassandra/db/SnapshotCommand.java
+++ b/src/java/org/apache/cassandra/db/SnapshotCommand.java
@@ -24,7 +24,7 @@ import java.io.IOException;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.FastByteArrayInputStream;
-import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.service.StorageService;
@@ -50,7 +50,7 @@ public class SnapshotCommand
return new MessageOut<SnapshotCommand>(StorageService.Verb.SNAPSHOT, this, serializer);
}
- public static SnapshotCommand read(Message message) throws IOException
+ public static SnapshotCommand read(MessageIn message) throws IOException
{
byte[] bytes = message.getMessageBody();
FastByteArrayInputStream bis = new FastByteArrayInputStream(bytes);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/TruncateVerbHandler.java b/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
index 520f827..82bf14f 100644
--- a/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
@@ -26,14 +26,14 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
public class TruncateVerbHandler implements IVerbHandler
{
private static final Logger logger = LoggerFactory.getLogger(TruncateVerbHandler.class);
- public void doVerb(Message message, String id)
+ public void doVerb(MessageIn message, String id)
{
DataInputStream in = new DataInputStream(new FastByteArrayInputStream(message.getMessageBody()));
@@ -64,7 +64,7 @@ public class TruncateVerbHandler implements IVerbHandler
}
}
- private static void respondError(Truncation t, Message truncateRequestMessage) throws IOException
+ private static void respondError(Truncation t, MessageIn truncateRequestMessage) throws IOException
{
TruncateResponse response = new TruncateResponse(t.keyspace, t.columnFamily, false);
MessagingService.instance().sendOneWay(response.createMessage(), truncateRequestMessage.getFrom());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/db/WriteResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/WriteResponse.java b/src/java/org/apache/cassandra/db/WriteResponse.java
index f2c1340..ef03f6a 100644
--- a/src/java/org/apache/cassandra/db/WriteResponse.java
+++ b/src/java/org/apache/cassandra/db/WriteResponse.java
@@ -23,7 +23,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/dht/BootStrapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java
index fcadbf9..bfe80c8 100644
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@ -173,7 +173,7 @@ public class BootStrapper
public static class BootstrapTokenVerbHandler implements IVerbHandler
{
- public void doVerb(Message message, String id)
+ public void doVerb(MessageIn message, String id)
{
StorageService ss = StorageService.instance;
String tokenString = StorageService.getPartitioner().getTokenFactory().toString(ss.getBootstrapToken());
@@ -202,7 +202,7 @@ public class BootStrapper
return success ? token : null;
}
- public void response(Message msg)
+ public void response(MessageIn msg)
{
token = StorageService.getPartitioner().getTokenFactory().fromString(new String(msg.getMessageBody(), Charsets.UTF_8));
condition.signalAll();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java b/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
index a4f1f09..141429e 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
@@ -27,13 +27,13 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageIn;
public class GossipDigestAck2VerbHandler implements IVerbHandler
{
private static final Logger logger = LoggerFactory.getLogger(GossipDigestAck2VerbHandler.class);
- public void doVerb(Message message, String id)
+ public void doVerb(MessageIn message, String id)
{
if (logger.isTraceEnabled())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
index 2bc04c6..f747bdd 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
@@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
@@ -38,7 +38,7 @@ public class GossipDigestAckVerbHandler implements IVerbHandler
{
private static final Logger logger = LoggerFactory.getLogger(GossipDigestAckVerbHandler.class);
- public void doVerb(Message message, String id)
+ public void doVerb(MessageIn message, String id)
{
InetAddress from = message.getFrom();
if (logger.isTraceEnabled())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java b/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
index 0d673dc..2f45095 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
@@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
@@ -37,7 +37,7 @@ public class GossipDigestSynVerbHandler implements IVerbHandler
{
private static final Logger logger = LoggerFactory.getLogger( GossipDigestSynVerbHandler.class);
- public void doVerb(Message message, String id)
+ public void doVerb(MessageIn message, String id)
{
InetAddress from = message.getFrom();
if (logger.isTraceEnabled())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/gms/GossipShutdownVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipShutdownVerbHandler.java b/src/java/org/apache/cassandra/gms/GossipShutdownVerbHandler.java
index 352e229..b29013d 100644
--- a/src/java/org/apache/cassandra/gms/GossipShutdownVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipShutdownVerbHandler.java
@@ -18,7 +18,8 @@
package org.apache.cassandra.gms;
import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageIn;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,7 +29,7 @@ public class GossipShutdownVerbHandler implements IVerbHandler
{
private static final Logger logger = LoggerFactory.getLogger(GossipShutdownVerbHandler.class);
- public void doVerb(Message message, String id)
+ public void doVerb(MessageIn message, String id)
{
InetAddress from = message.getFrom();
if (!Gossiper.instance.isEnabled())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/net/AsyncResult.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/AsyncResult.java b/src/java/org/apache/cassandra/net/AsyncResult.java
index a30cb9f..9a3985b 100644
--- a/src/java/org/apache/cassandra/net/AsyncResult.java
+++ b/src/java/org/apache/cassandra/net/AsyncResult.java
@@ -77,7 +77,7 @@ class AsyncResult implements IAsyncResult
return result;
}
- public void result(Message response)
+ public void result(MessageIn response)
{
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/net/IAsyncCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IAsyncCallback.java b/src/java/org/apache/cassandra/net/IAsyncCallback.java
index 6a5fa87..797779b 100644
--- a/src/java/org/apache/cassandra/net/IAsyncCallback.java
+++ b/src/java/org/apache/cassandra/net/IAsyncCallback.java
@@ -28,5 +28,5 @@ public interface IAsyncCallback extends IMessageCallback
/**
* @param msg response received.
*/
- public void response(Message msg);
+ public void response(MessageIn msg);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/net/IAsyncResult.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IAsyncResult.java b/src/java/org/apache/cassandra/net/IAsyncResult.java
index 0c0f5cb..545cfb3 100644
--- a/src/java/org/apache/cassandra/net/IAsyncResult.java
+++ b/src/java/org/apache/cassandra/net/IAsyncResult.java
@@ -36,7 +36,7 @@ public interface IAsyncResult extends IMessageCallback
* Store the result obtained for the submitted task.
* @param result the response message
*/
- public void result(Message result);
+ public void result(MessageIn result);
public InetAddress getFrom();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/net/IVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IVerbHandler.java b/src/java/org/apache/cassandra/net/IVerbHandler.java
index d9f4887..96867ec 100644
--- a/src/java/org/apache/cassandra/net/IVerbHandler.java
+++ b/src/java/org/apache/cassandra/net/IVerbHandler.java
@@ -34,5 +34,5 @@ public interface IVerbHandler
* @param message - incoming message that needs handling.
* @param id
*/
- public void doVerb(Message message, String id);
+ public void doVerb(MessageIn message, String id);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
index fb5355e..9647e8f 100644
--- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
@@ -147,7 +147,7 @@ public class IncomingTcpConnection extends Thread
// starts sending correct-version messages (which it can do without reconnecting -- version is per-Message)
if (version <= MessagingService.current_version)
{
- Message message = new Message(header, body, version);
+ MessageIn message = new MessageIn(header, body, version);
MessagingService.instance().receive(message, id);
}
else
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/net/Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/Message.java b/src/java/org/apache/cassandra/net/Message.java
deleted file mode 100644
index ab3c7f1..0000000
--- a/src/java/org/apache/cassandra/net/Message.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.net;
-
-import java.net.InetAddress;
-
-import org.apache.cassandra.concurrent.Stage;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.FBUtilities;
-
-public class Message
-{
- final Header header;
- private final byte[] body;
- private final transient int version;
-
- public Message(Header header, byte[] body, int version)
- {
- assert header != null;
- assert body != null;
-
- this.header = header;
- this.body = body;
- this.version = version;
- }
-
- public Message(InetAddress from, StorageService.Verb verb, byte[] body, int version)
- {
- this(new Header(from, verb), body, version);
- }
-
- public byte[] getHeader(String key)
- {
- return header.getDetail(key);
- }
-
- public byte[] getMessageBody()
- {
- return body;
- }
-
- public int getVersion()
- {
- return version;
- }
-
- public InetAddress getFrom()
- {
- return header.getFrom();
- }
-
- public Stage getMessageType()
- {
- return StorageService.verbStages.get(getVerb());
- }
-
- public StorageService.Verb getVerb()
- {
- return header.getVerb();
- }
-
- public String toString()
- {
- StringBuilder sbuf = new StringBuilder("");
- String separator = System.getProperty("line.separator");
- sbuf.append("FROM:" + getFrom())
- .append(separator)
- .append("TYPE:" + getMessageType())
- .append(separator)
- .append("VERB:" + getVerb())
- .append(separator);
- return sbuf.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
index 7a90e00..2a947e8 100644
--- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
+++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
@@ -27,11 +27,11 @@ public class MessageDeliveryTask implements Runnable
{
private static final Logger logger = LoggerFactory.getLogger(MessageDeliveryTask.class);
- private final Message message;
+ private final MessageIn message;
private final long constructionTime = System.currentTimeMillis();
private final String id;
- public MessageDeliveryTask(Message message, String id)
+ public MessageDeliveryTask(MessageIn message, String id)
{
assert message != null;
this.message = message;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/net/MessageIn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageIn.java b/src/java/org/apache/cassandra/net/MessageIn.java
new file mode 100644
index 0000000..885b7ef
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/MessageIn.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.net;
+
+import java.net.InetAddress;
+
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.service.StorageService;
+
+public class MessageIn
+{
+ final Header header;
+ private final byte[] body;
+ private final transient int version;
+
+ public MessageIn(Header header, byte[] body, int version)
+ {
+ assert header != null;
+ assert body != null;
+
+ this.header = header;
+ this.body = body;
+ this.version = version;
+ }
+
+ public MessageIn(InetAddress from, StorageService.Verb verb, byte[] body, int version)
+ {
+ this(new Header(from, verb), body, version);
+ }
+
+ public byte[] getHeader(String key)
+ {
+ return header.getDetail(key);
+ }
+
+ public byte[] getMessageBody()
+ {
+ return body;
+ }
+
+ public int getVersion()
+ {
+ return version;
+ }
+
+ public InetAddress getFrom()
+ {
+ return header.getFrom();
+ }
+
+ public Stage getMessageType()
+ {
+ return StorageService.verbStages.get(getVerb());
+ }
+
+ public StorageService.Verb getVerb()
+ {
+ return header.getVerb();
+ }
+
+ public String toString()
+ {
+ StringBuilder sbuf = new StringBuilder("");
+ String separator = System.getProperty("line.separator");
+ sbuf.append("FROM:" + getFrom())
+ .append(separator)
+ .append("TYPE:" + getMessageType())
+ .append(separator)
+ .append("VERB:" + getVerb())
+ .append(separator);
+ return sbuf.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 45a9de4..05dc07f 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -27,7 +27,6 @@ import java.nio.channels.ServerSocketChannel;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -40,7 +39,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.ConfigurationException;
@@ -518,7 +516,7 @@ public final class MessagingService implements MessagingServiceMBean
}
}
- public void receive(Message message, String id)
+ public void receive(MessageIn message, String id)
{
if (logger.isTraceEnabled())
logger.trace(FBUtilities.getBroadcastAddress() + " received " + message.getVerb()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
index 3bdd4df..3016daf 100644
--- a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
+++ b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
@@ -24,7 +24,7 @@ public class ResponseVerbHandler implements IVerbHandler
{
private static final Logger logger = LoggerFactory.getLogger( ResponseVerbHandler.class );
- public void doVerb(Message message, String id)
+ public void doVerb(MessageIn message, String id)
{
double age = System.currentTimeMillis() - MessagingService.instance().getRegisteredCallbackAge(id);
CallbackInfo callbackInfo = MessagingService.instance().removeRegisteredCallback(id);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/net/sink/IMessageSink.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/sink/IMessageSink.java b/src/java/org/apache/cassandra/net/sink/IMessageSink.java
index 8d0ea2e..74d7c70 100644
--- a/src/java/org/apache/cassandra/net/sink/IMessageSink.java
+++ b/src/java/org/apache/cassandra/net/sink/IMessageSink.java
@@ -19,12 +19,12 @@ package org.apache.cassandra.net.sink;
import java.net.InetAddress;
-import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
public interface IMessageSink
{
public MessageOut handleMessage(MessageOut message, String id, InetAddress to);
- public Message handleMessage(Message message, String id, InetAddress to);
+ public MessageIn handleMessage(MessageIn message, String id, InetAddress to);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/net/sink/SinkManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/sink/SinkManager.java b/src/java/org/apache/cassandra/net/sink/SinkManager.java
index e0d0cd5..dc3085c 100644
--- a/src/java/org/apache/cassandra/net/sink/SinkManager.java
+++ b/src/java/org/apache/cassandra/net/sink/SinkManager.java
@@ -21,7 +21,7 @@ import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
-import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
public class SinkManager
@@ -52,7 +52,7 @@ public class SinkManager
return message;
}
- public static Message processInboundMessage(Message message, String id)
+ public static MessageIn processInboundMessage(MessageIn message, String id)
{
if (sinks.isEmpty())
return message;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/service/AbstractRowResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractRowResolver.java b/src/java/org/apache/cassandra/service/AbstractRowResolver.java
index 002ba89..7dc7589 100644
--- a/src/java/org/apache/cassandra/service/AbstractRowResolver.java
+++ b/src/java/org/apache/cassandra/service/AbstractRowResolver.java
@@ -31,7 +31,7 @@ import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.db.Row;
import org.apache.cassandra.io.util.FastByteArrayInputStream;
-import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.utils.FBUtilities;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
@@ -39,10 +39,10 @@ public abstract class AbstractRowResolver implements IResponseResolver<Row>
{
protected static final Logger logger = LoggerFactory.getLogger(AbstractRowResolver.class);
- private static final Message FAKE_MESSAGE = new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.INTERNAL_RESPONSE, ArrayUtils.EMPTY_BYTE_ARRAY, -1);
+ private static final MessageIn FAKE_MESSAGE = new MessageIn(FBUtilities.getBroadcastAddress(), StorageService.Verb.INTERNAL_RESPONSE, ArrayUtils.EMPTY_BYTE_ARRAY, -1);
protected final String table;
- protected final ConcurrentMap<Message, ReadResponse> replies = new NonBlockingHashMap<Message, ReadResponse>();
+ protected final ConcurrentMap<MessageIn, ReadResponse> replies = new NonBlockingHashMap<MessageIn, ReadResponse>();
protected final DecoratedKey key;
public AbstractRowResolver(ByteBuffer key, String table)
@@ -51,7 +51,7 @@ public abstract class AbstractRowResolver implements IResponseResolver<Row>
this.table = table;
}
- public void preprocess(Message message)
+ public void preprocess(MessageIn message)
{
byte[] body = message.getMessageBody();
FastByteArrayInputStream bufIn = new FastByteArrayInputStream(body);
@@ -75,7 +75,7 @@ public abstract class AbstractRowResolver implements IResponseResolver<Row>
replies.put(FAKE_MESSAGE, result);
}
- public Iterable<Message> getMessages()
+ public Iterable<MessageIn> getMessages()
{
return replies.keySet();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index 3f6742e..a6c38ae 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.UnavailableException;
import org.apache.cassandra.utils.SimpleCondition;
@@ -62,7 +62,7 @@ public abstract class AbstractWriteResponseHandler implements IWriteResponseHand
}
/** null message means "response from local write" */
- public abstract void response(Message msg);
+ public abstract void response(MessageIn msg);
public abstract void assureSufficientLiveNodes() throws UnavailableException;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/service/AntiEntropyService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AntiEntropyService.java b/src/java/org/apache/cassandra/service/AntiEntropyService.java
index e339363..0cce133 100644
--- a/src/java/org/apache/cassandra/service/AntiEntropyService.java
+++ b/src/java/org/apache/cassandra/service/AntiEntropyService.java
@@ -459,7 +459,7 @@ public class AntiEntropyService
/**
* Trigger a validation compaction which will return the tree upon completion.
*/
- public void doVerb(Message message, String id)
+ public void doVerb(MessageIn message, String id)
{
byte[] bytes = message.getMessageBody();
@@ -488,7 +488,7 @@ public class AntiEntropyService
*/
public static class TreeResponseVerbHandler implements IVerbHandler
{
- public void doVerb(Message message, String id)
+ public void doVerb(MessageIn message, String id)
{
byte[] bytes = message.getMessageBody();
DataInputStream buffer = new DataInputStream(new FastByteArrayInputStream(bytes));
@@ -888,7 +888,7 @@ public class AntiEntropyService
return false;
}
- public void response(Message msg)
+ public void response(MessageIn msg)
{
RepairJob.this.snapshotLatch.countDown();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AsyncRepairCallback.java b/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
index 08874ea..675f61c 100644
--- a/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
+++ b/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.net.IAsyncCallback;
-import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.utils.WrappedRunnable;
public class AsyncRepairCallback implements IAsyncCallback
@@ -38,7 +38,7 @@ public class AsyncRepairCallback implements IAsyncCallback
this.blockfor = blockfor;
}
- public void response(Message message)
+ public void response(MessageIn message)
{
repairResolver.preprocess(message);
if (received.incrementAndGet() == blockfor)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DatacenterReadCallback.java b/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
index a08e757..36ed9b3 100644
--- a/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
+++ b/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
@@ -25,7 +25,7 @@ import java.util.List;
import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.locator.NetworkTopologyStrategy;
-import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.UnavailableException;
@@ -60,7 +60,7 @@ public class DatacenterReadCallback<T> extends ReadCallback<T>
}
@Override
- protected boolean waitingFor(Message message)
+ protected boolean waitingFor(MessageIn message)
{
return localdc.equals(snitch.getDatacenter(message.getFrom()));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
index a0b3a8b..149d05b 100644
--- a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
@@ -28,7 +28,7 @@ import org.apache.cassandra.db.Table;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.NetworkTopologyStrategy;
-import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.UnavailableException;
import org.apache.cassandra.utils.FBUtilities;
@@ -69,7 +69,7 @@ public class DatacenterSyncWriteResponseHandler extends AbstractWriteResponseHan
return new DatacenterSyncWriteResponseHandler(writeEndpoints, consistencyLevel, table);
}
- public void response(Message message)
+ public void response(MessageIn message)
{
String dataCenter = message == null
? localdc
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
index c7b6345..a61a203 100644
--- a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
@@ -25,7 +25,7 @@ import org.apache.cassandra.db.Table;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.NetworkTopologyStrategy;
-import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.UnavailableException;
import org.apache.cassandra.utils.FBUtilities;
@@ -63,7 +63,7 @@ public class DatacenterWriteResponseHandler extends WriteResponseHandler
@Override
- public void response(Message message)
+ public void response(MessageIn message)
{
if (message == null || localdc.equals(snitch.getDatacenter(message.getFrom())))
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/service/IResponseResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/IResponseResolver.java b/src/java/org/apache/cassandra/service/IResponseResolver.java
index ec69484..fe0593b 100644
--- a/src/java/org/apache/cassandra/service/IResponseResolver.java
+++ b/src/java/org/apache/cassandra/service/IResponseResolver.java
@@ -19,7 +19,7 @@ package org.apache.cassandra.service;
import java.io.IOException;
-import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageIn;
public interface IResponseResolver<T> {
@@ -40,8 +40,8 @@ public interface IResponseResolver<T> {
*/
public T getData() throws IOException;
- public void preprocess(Message message);
- public Iterable<Message> getMessages();
+ public void preprocess(MessageIn message);
+ public Iterable<MessageIn> getMessages();
public int getMaxLiveColumns();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java b/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
index 27970aa..dccb294 100644
--- a/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
@@ -25,7 +25,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
@Deprecated // 1.1 implements index scan with RangeSliceVerb instead
@@ -33,7 +33,7 @@ public class IndexScanVerbHandler implements IVerbHandler
{
private static final Logger logger = LoggerFactory.getLogger(IndexScanVerbHandler.class);
- public void doVerb(Message message, String id)
+ public void doVerb(MessageIn message, String id)
{
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index 68d0009..4477f25 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -40,14 +40,9 @@ import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
-import org.apache.cassandra.db.Column;
import org.apache.cassandra.db.DBConstants;
-import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.gms.*;
import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.FastByteArrayInputStream;
-import org.apache.cassandra.io.util.FastByteArrayOutputStream;
-import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.ByteBufferUtil;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java b/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
index 763a721..d260f37 100644
--- a/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
+++ b/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
@@ -30,7 +30,7 @@ import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RangeSliceReply;
import org.apache.cassandra.db.Row;
import org.apache.cassandra.net.IAsyncResult;
-import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.CloseableIterator;
import org.apache.cassandra.utils.MergeIterator;
@@ -53,7 +53,7 @@ public class RangeSliceResponseResolver implements IResponseResolver<Iterable<Ro
private final String table;
private List<InetAddress> sources;
- protected final Collection<Message> responses = new LinkedBlockingQueue<Message>();;
+ protected final Collection<MessageIn> responses = new LinkedBlockingQueue<MessageIn>();;
public final List<IAsyncResult> repairResults = new ArrayList<IAsyncResult>();
public RangeSliceResponseResolver(String table)
@@ -68,7 +68,7 @@ public class RangeSliceResponseResolver implements IResponseResolver<Iterable<Ro
public List<Row> getData() throws IOException
{
- Message response = responses.iterator().next();
+ MessageIn response = responses.iterator().next();
RangeSliceReply reply = RangeSliceReply.read(response.getMessageBody(), response.getVersion());
return reply.rows;
}
@@ -79,7 +79,7 @@ public class RangeSliceResponseResolver implements IResponseResolver<Iterable<Ro
{
ArrayList<RowIterator> iters = new ArrayList<RowIterator>(responses.size());
int n = 0;
- for (Message response : responses)
+ for (MessageIn response : responses)
{
RangeSliceReply reply = RangeSliceReply.read(response.getMessageBody(), response.getVersion());
n = Math.max(n, reply.rows.size());
@@ -96,7 +96,7 @@ public class RangeSliceResponseResolver implements IResponseResolver<Iterable<Ro
return resolvedRows;
}
- public void preprocess(Message message)
+ public void preprocess(MessageIn message)
{
responses.add(message);
}
@@ -125,7 +125,7 @@ public class RangeSliceResponseResolver implements IResponseResolver<Iterable<Ro
public void close() {}
}
- public Iterable<Message> getMessages()
+ public Iterable<MessageIn> getMessages()
{
return responses;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
index 49d9ef1..a1d60bb 100644
--- a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
@@ -31,7 +31,7 @@ import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.IFilter;
import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
public class RangeSliceVerbHandler implements IVerbHandler
@@ -49,7 +49,7 @@ public class RangeSliceVerbHandler implements IVerbHandler
return cfs.getRangeSlice(command.super_column, command.range, command.maxResults, columnFilter, command.row_filter, command.maxIsColumns, command.isPaging);
}
- public void doVerb(Message message, String id)
+ public void doVerb(MessageIn message, String id)
{
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index 8ac9981..66d973b 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -38,7 +38,7 @@ import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.net.IAsyncCallback;
-import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.thrift.ConsistencyLevel;
@@ -142,7 +142,7 @@ public class ReadCallback<T> implements IAsyncCallback
if (!success)
{
StringBuilder sb = new StringBuilder("");
- for (Message message : resolver.getMessages())
+ for (MessageIn message : resolver.getMessages())
sb.append(message.getFrom()).append(", ");
throw new TimeoutException("Operation timed out - received only " + received.get() + " responses from " + sb.toString() + " .");
}
@@ -150,7 +150,7 @@ public class ReadCallback<T> implements IAsyncCallback
return blockfor == 1 ? resolver.getData() : resolver.resolve();
}
- public void response(Message message)
+ public void response(MessageIn message)
{
resolver.preprocess(message);
int n = waitingFor(message)
@@ -167,7 +167,7 @@ public class ReadCallback<T> implements IAsyncCallback
* @return true if the message counts towards the blockfor threshold
* TODO turn the Message into a response so we don't need two versions of this method
*/
- protected boolean waitingFor(Message message)
+ protected boolean waitingFor(MessageIn message)
{
return true;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/service/RepairCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RepairCallback.java b/src/java/org/apache/cassandra/service/RepairCallback.java
index b9b9701..97f3079 100644
--- a/src/java/org/apache/cassandra/service/RepairCallback.java
+++ b/src/java/org/apache/cassandra/service/RepairCallback.java
@@ -27,7 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Row;
import org.apache.cassandra.net.IAsyncCallback;
-import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.utils.SimpleCondition;
public class RepairCallback implements IAsyncCallback
@@ -68,7 +68,7 @@ public class RepairCallback implements IAsyncCallback
return received.get() > 1 ? resolver.resolve() : null;
}
- public void response(Message message)
+ public void response(MessageIn message)
{
resolver.preprocess(message);
if (received.incrementAndGet() == endpoints.size())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/service/RowDigestResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RowDigestResolver.java b/src/java/org/apache/cassandra/service/RowDigestResolver.java
index bf307af..3f42d85 100644
--- a/src/java/org/apache/cassandra/service/RowDigestResolver.java
+++ b/src/java/org/apache/cassandra/service/RowDigestResolver.java
@@ -24,7 +24,7 @@ import java.util.Map;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.db.Row;
-import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageIn;
public class RowDigestResolver extends AbstractRowResolver
{
@@ -38,7 +38,7 @@ public class RowDigestResolver extends AbstractRowResolver
*/
public Row getData() throws IOException
{
- for (Map.Entry<Message, ReadResponse> entry : replies.entrySet())
+ for (Map.Entry<MessageIn, ReadResponse> entry : replies.entrySet())
{
ReadResponse result = entry.getValue();
if (!result.isDigestQuery())
@@ -69,7 +69,7 @@ public class RowDigestResolver extends AbstractRowResolver
// also extract the data reply, if any.
ColumnFamily data = null;
ByteBuffer digest = null;
- for (Map.Entry<Message, ReadResponse> entry : replies.entrySet())
+ for (Map.Entry<MessageIn, ReadResponse> entry : replies.entrySet())
{
ReadResponse response = entry.getValue();
if (response.isDigestQuery())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/service/RowRepairResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RowRepairResolver.java b/src/java/org/apache/cassandra/service/RowRepairResolver.java
index 00fc4ad..547eb25 100644
--- a/src/java/org/apache/cassandra/service/RowRepairResolver.java
+++ b/src/java/org/apache/cassandra/service/RowRepairResolver.java
@@ -32,7 +32,7 @@ import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.net.IAsyncResult;
-import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.CloseableIterator;
@@ -67,9 +67,9 @@ public class RowRepairResolver extends AbstractRowResolver
List<ColumnFamily> versions = new ArrayList<ColumnFamily>(replies.size());
List<InetAddress> endpoints = new ArrayList<InetAddress>(replies.size());
- for (Map.Entry<Message, ReadResponse> entry : replies.entrySet())
+ for (Map.Entry<MessageIn, ReadResponse> entry : replies.entrySet())
{
- Message message = entry.getKey();
+ MessageIn message = entry.getKey();
ReadResponse response = entry.getValue();
ColumnFamily cf = response.row().cf;
assert !response.isDigestQuery() : "Received digest response to repair read from " + entry.getKey().getFrom();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java b/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
index b00901f..c00cc53 100644
--- a/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
@@ -20,17 +20,17 @@ package org.apache.cassandra.service;
import org.apache.cassandra.db.SnapshotCommand;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.FBUtilities;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SnapshotVerbHandler implements IVerbHandler
{
private static final Logger logger = LoggerFactory.getLogger(SnapshotVerbHandler.class);
- public void doVerb(Message message, String id)
+ public void doVerb(MessageIn message, String id)
{
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index e393590..f4c1654 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -949,7 +949,7 @@ public class StorageProxy implements StorageProxyMBean
IAsyncCallback cb = new IAsyncCallback()
{
- public void response(Message message)
+ public void response(MessageIn message)
{
// record the response from the remote node.
logger.debug("Received schema check response from {}", message.getFrom().getHostAddress());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/TruncateResponseHandler.java b/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
index 6e9bf1e..e5dc93d 100644
--- a/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
@@ -26,7 +26,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.net.IAsyncCallback;
-import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.utils.SimpleCondition;
public class TruncateResponseHandler implements IAsyncCallback
@@ -66,7 +66,7 @@ public class TruncateResponseHandler implements IAsyncCallback
}
}
- public void response(Message message)
+ public void response(MessageIn message)
{
responses.incrementAndGet();
if (responses.get() >= responseCount)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/service/WriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/WriteResponseHandler.java b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
index 081cc6b..41a6ac3 100644
--- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
@@ -27,7 +27,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.UnavailableException;
import org.apache.cassandra.utils.FBUtilities;
@@ -63,7 +63,7 @@ public class WriteResponseHandler extends AbstractWriteResponseHandler
return new WriteResponseHandler(endpoint);
}
- public void response(Message m)
+ public void response(MessageIn m)
{
if (responses.decrementAndGet() == 0)
condition.signal();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/streaming/FileStreamTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/FileStreamTask.java b/src/java/org/apache/cassandra/streaming/FileStreamTask.java
index 583ea6c..cc09a36 100644
--- a/src/java/org/apache/cassandra/streaming/FileStreamTask.java
+++ b/src/java/org/apache/cassandra/streaming/FileStreamTask.java
@@ -28,7 +28,7 @@ import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.net.Header;
-import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -202,7 +202,7 @@ public class FileStreamTask extends WrappedRunnable
int bodySize = input.readInt();
byte[] body = new byte[bodySize];
input.readFully(body);
- Message message = new Message(header, body, version);
+ MessageIn message = new MessageIn(header, body, version);
assert message.getVerb() == StorageService.Verb.STREAM_REPLY : "Non-reply message received on stream socket";
handler.doVerb(message, id);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java b/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java
index 67e3b66..f5e27b6 100644
--- a/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java
@@ -21,7 +21,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
@@ -30,7 +30,7 @@ public class ReplicationFinishedVerbHandler implements IVerbHandler
{
private static final Logger logger = LoggerFactory.getLogger(ReplicationFinishedVerbHandler.class);
- public void doVerb(Message msg, String id)
+ public void doVerb(MessageIn msg, String id)
{
StorageService.instance.confirmReplication(msg.getFrom());
MessageOut response = new MessageOut(StorageService.Verb.INTERNAL_RESPONSE);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java b/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
index 0cd945e..1155518 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
@@ -27,13 +27,13 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageIn;
public class StreamReplyVerbHandler implements IVerbHandler
{
private static final Logger logger = LoggerFactory.getLogger(StreamReplyVerbHandler.class);
- public void doVerb(Message message, String id)
+ public void doVerb(MessageIn message, String id)
{
byte[] body = message.getMessageBody();
FastByteArrayInputStream bufIn = new FastByteArrayInputStream(body);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java b/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
index f231f2d..6e20f19 100644
--- a/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
+++ b/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
@@ -26,9 +26,9 @@
import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageIn;
- /**
+/**
* This verb handler handles the StreamRequestMessage that is sent by
* the node requesting range transfer.
*/
@@ -36,7 +36,7 @@ public class StreamRequestVerbHandler implements IVerbHandler
{
private static final Logger logger = LoggerFactory.getLogger(StreamRequestVerbHandler.class);
- public void doVerb(Message message, String id)
+ public void doVerb(MessageIn message, String id)
{
if (logger.isDebugEnabled())
logger.debug("Received a StreamRequestMessage from {}", message.getFrom());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java b/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
index 977a8a8..ad2c77c 100644
--- a/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
@@ -189,7 +189,7 @@ public class StreamingRepairTask implements Runnable
public static class StreamingRepairRequest implements IVerbHandler
{
- public void doVerb(Message message, String id)
+ public void doVerb(MessageIn message, String id)
{
byte[] bytes = message.getMessageBody();
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes));
@@ -216,7 +216,7 @@ public class StreamingRepairTask implements Runnable
public static class StreamingRepairResponse implements IVerbHandler
{
- public void doVerb(Message message, String id)
+ public void doVerb(MessageIn message, String id)
{
byte[] bytes = message.getMessageBody();
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/test/unit/org/apache/cassandra/db/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/SerializationsTest.java b/test/unit/org/apache/cassandra/db/SerializationsTest.java
index a340a94..0ea99ef 100644
--- a/test/unit/org/apache/cassandra/db/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/db/SerializationsTest.java
@@ -29,10 +29,9 @@ import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessageSerializer;
-import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
@@ -95,7 +94,7 @@ public class SerializationsTest extends AbstractSerializationsTester
DataInputStream in = getInput("db.RangeSliceCommand.bin");
for (int i = 0; i < 6; i++)
{
- Message msg = messageSerializer.deserialize(in, getVersion());
+ MessageIn msg = messageSerializer.deserialize(in, getVersion());
RangeSliceCommand cmd = RangeSliceCommand.read(msg);
}
in.close();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/test/unit/org/apache/cassandra/net/MessageSerializer.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/MessageSerializer.java b/test/unit/org/apache/cassandra/net/MessageSerializer.java
index be9eda5..ec0ab1d 100644
--- a/test/unit/org/apache/cassandra/net/MessageSerializer.java
+++ b/test/unit/org/apache/cassandra/net/MessageSerializer.java
@@ -25,10 +25,10 @@ import java.io.*;
import org.apache.cassandra.io.IVersionedSerializer;
-public class MessageSerializer implements IVersionedSerializer<Message>
+public class MessageSerializer implements IVersionedSerializer<MessageIn>
{
// TODO imitate backwards-compatibility code from OutboundTcpConnection here
- public void serialize(Message t, DataOutput dos, int version) throws IOException
+ public void serialize(MessageIn t, DataOutput dos, int version) throws IOException
{
Header.serializer().serialize(t.header, dos, version);
byte[] bytes = t.getMessageBody();
@@ -36,16 +36,16 @@ public class MessageSerializer implements IVersionedSerializer<Message>
dos.write(bytes);
}
- public Message deserialize(DataInput dis, int version) throws IOException
+ public MessageIn deserialize(DataInput dis, int version) throws IOException
{
Header header = Header.serializer().deserialize(dis, version);
int size = dis.readInt();
byte[] bytes = new byte[size];
dis.readFully(bytes);
- return new Message(header, bytes, version);
+ return new MessageIn(header, bytes, version);
}
- public long serializedSize(Message message, int version)
+ public long serializedSize(MessageIn message, int version)
{
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/test/unit/org/apache/cassandra/service/RemoveTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/RemoveTest.java b/test/unit/org/apache/cassandra/service/RemoveTest.java
index d3cd62d..6dfb301 100644
--- a/test/unit/org/apache/cassandra/service/RemoveTest.java
+++ b/test/unit/org/apache/cassandra/service/RemoveTest.java
@@ -40,7 +40,7 @@ import org.apache.cassandra.dht.RandomPartitioner;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.locator.TokenMetadata;
-import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.sink.IMessageSink;
@@ -169,7 +169,7 @@ public class RemoveTest
*/
class ReplicationSink implements IMessageSink
{
- public Message handleMessage(Message msg, String id, InetAddress to)
+ public MessageIn handleMessage(MessageIn msg, String id, InetAddress to)
{
if (!msg.getVerb().equals(StorageService.Verb.STREAM_REQUEST))
return msg;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d46a014/test/unit/org/apache/cassandra/streaming/StreamUtil.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamUtil.java b/test/unit/org/apache/cassandra/streaming/StreamUtil.java
index 66264ca..c0bed04 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamUtil.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamUtil.java
@@ -23,7 +23,7 @@ import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.net.InetAddress;
-import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
public class StreamUtil
@@ -33,7 +33,7 @@ public class StreamUtil
* Takes an stream request message and creates an empty status response. Exists here because StreamRequestMessage
* is package protected.
*/
- static public void finishStreamRequest(Message msg, InetAddress to)
+ static public void finishStreamRequest(MessageIn msg, InetAddress to)
{
byte[] body = msg.getMessageBody();
ByteArrayInputStream bufIn = new ByteArrayInputStream(body);