You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/05/04 21:02:24 UTC
svn commit: r771402 - in
/incubator/cassandra/trunk/src/java/org/apache/cassandra: db/ dht/ gms/
net/ net/io/ service/ test/ tools/
Author: jbellis
Date: Mon May 4 19:02:23 2009
New Revision: 771402
URL: http://svn.apache.org/viewvc?rev=771402&view=rev
Log:
Change Message body to byte[]. This reveals where there are problems: every caller of the send-message methods already needs to be passing a serialized byte[] body, or it is broken. The callers that were already correct I just unwrap; the broken ones I fixed, except for MoveMessage, which doesn't even have a byte[] serializer yet, so I left that one broken. WriteResponseResolver is the specific case that caused the bug report.
patch by jbellis; reviewed by nk11 for CASSANDRA-120
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CalloutDeployMessage.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CalloutDeployVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DataFileVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/LoadVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/TouchMessage.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/TouchVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapInitiateMessage.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IAsyncResult.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MultiAsyncResult.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadRepairManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/TokenUpdateVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseResolver.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/test/StressTest.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/AdminTool.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/MembershipCleaner.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/MembershipCleanerVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdateVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdater.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java Mon May 4 19:02:23 2009
@@ -39,7 +39,7 @@
public void doVerb(Message message)
{
- byte[] bytes = (byte[])message.getMessageBody()[0];
+ byte[] bytes = message.getMessageBody();
/* Obtain a Row Mutation Context from TLS */
RowMutationContext rowMutationCtx = tls_.get();
if ( rowMutationCtx == null )
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CalloutDeployMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CalloutDeployMessage.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CalloutDeployMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CalloutDeployMessage.java Mon May 4 19:02:23 2009
@@ -46,7 +46,7 @@
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
serializer_.serialize(cdMessage, dos);
- Message message = new Message(StorageService.getLocalStorageEndPoint(), "", StorageService.calloutDeployVerbHandler_, new Object[]{bos.toByteArray()});
+ Message message = new Message(StorageService.getLocalStorageEndPoint(), "", StorageService.calloutDeployVerbHandler_, bos.toByteArray());
return message;
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CalloutDeployVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CalloutDeployVerbHandler.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CalloutDeployVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CalloutDeployVerbHandler.java Mon May 4 19:02:23 2009
@@ -32,8 +32,7 @@
public void doVerb(Message message)
{
- Object[] body = message.getMessageBody();
- byte[] bytes = (byte[])body[0];
+ byte[] bytes = message.getMessageBody();
DataInputBuffer bufIn = new DataInputBuffer();
bufIn.reset(bytes, bytes.length);
try
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DataFileVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DataFileVerbHandler.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DataFileVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DataFileVerbHandler.java Mon May 4 19:02:23 2009
@@ -20,8 +20,7 @@
public void doVerb(Message message)
{
- Object[] body = message.getMessageBody();
- byte[] bytes = (byte[])body[0];
+ byte[] bytes = message.getMessageBody();
String table = new String(bytes);
logger_.info("**** Received a request from " + message.getFrom());
@@ -35,7 +34,7 @@
{
dos.writeUTF(file);
}
- Message response = message.getReply( StorageService.getLocalStorageEndPoint(), new Object[]{bos.toByteArray()});
+ Message response = message.getReply( StorageService.getLocalStorageEndPoint(), bos.toByteArray());
MessagingService.getMessagingInstance().sendOneWay(response, message.getFrom());
}
catch ( IOException ex )
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/LoadVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/LoadVerbHandler.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/LoadVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/LoadVerbHandler.java Mon May 4 19:02:23 2009
@@ -24,6 +24,7 @@
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.LogUtil;
+import org.apache.cassandra.io.DataInputBuffer;
import org.apache.log4j.Logger;
/**
@@ -38,16 +39,17 @@
{
try
{
- Object[] body = message.getMessageBody();
- RowMutationMessage rmMsg = (RowMutationMessage)body[0];
- RowMutation rm = rmMsg.getRowMutation();
-
- EndPoint[] endpoints = StorageService.instance().getNStorageEndPoint(rm.key());
-
+ byte[] body = message.getMessageBody();
+ DataInputBuffer buffer = new DataInputBuffer();
+ buffer.reset(body, body.length);
+ RowMutationMessage rmMsg = RowMutationMessage.serializer().deserialize(buffer);
+
+ EndPoint[] endpoints = StorageService.instance().getNStorageEndPoint(rmMsg.getRowMutation().key());
+
Message messageInternal = new Message(StorageService.getLocalStorageEndPoint(),
StorageService.mutationStage_,
StorageService.mutationVerbHandler_,
- new Object[]{ rmMsg }
+ body
);
StringBuilder sb = new StringBuilder();
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java Mon May 4 19:02:23 2009
@@ -40,7 +40,7 @@
public static RangeCommand read(Message message) throws IOException
{
- byte[] bytes = (byte[]) message.getMessageBody()[0];
+ byte[] bytes = message.getMessageBody();
DataInputBuffer dib = new DataInputBuffer();
dib.reset(bytes, bytes.length);
return serializer.deserialize(new DataInputStream(dib));
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java Mon May 4 19:02:23 2009
@@ -40,7 +40,7 @@
public void doVerb(Message message)
{
- byte[] body = (byte[])message.getMessageBody()[0];
+ byte[] body = message.getMessageBody();
DataInputBuffer buffer = new DataInputBuffer();
buffer.reset(body, body.length);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java Mon May 4 19:02:23 2009
@@ -54,7 +54,7 @@
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream( bos );
ReadResponse.serializer().serialize(readResponse, dos);
- Message message = new Message(StorageService.getLocalStorageEndPoint(), MessagingService.responseStage_, MessagingService.responseVerbHandler_, new Object[]{bos.toByteArray()});
+ Message message = new Message(StorageService.getLocalStorageEndPoint(), MessagingService.responseStage_, MessagingService.responseVerbHandler_, bos.toByteArray());
return message;
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java Mon May 4 19:02:23 2009
@@ -60,7 +60,7 @@
public void doVerb(Message message)
{
- byte[] body = (byte[])message.getMessageBody()[0];
+ byte[] body = message.getMessageBody();
/* Obtain a Read Context from TLS */
ReadContext readCtx = tls_.get();
if ( readCtx == null )
@@ -101,7 +101,7 @@
System.arraycopy(readCtx.bufOut_.getData(), 0, bytes, 0, bytes.length);
logger_.info("copy TIME: " + (System.currentTimeMillis() - start) + " ms.");
- Message response = message.getReply( StorageService.getLocalStorageEndPoint(), new Object[]{bytes} );
+ Message response = message.getReply( StorageService.getLocalStorageEndPoint(), bytes );
MessagingService.getMessagingInstance().sendOneWay(response, message.getFrom());
logger_.info("ReadVerbHandler TIME 2: " + (System.currentTimeMillis() - start) + " ms.");
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java Mon May 4 19:02:23 2009
@@ -41,32 +41,26 @@
public class RowMutationMessage implements Serializable
{
public static final String hint_ = "HINT";
- private static ICompactSerializer<RowMutationMessage> serializer_;
+ private static RowMutationMessageSerializer serializer_ = new RowMutationMessageSerializer();
- static
- {
- serializer_ = new RowMutationMessageSerializer();
- }
-
- static ICompactSerializer<RowMutationMessage> serializer()
+ static RowMutationMessageSerializer serializer()
{
return serializer_;
}
- public static Message makeRowMutationMessage(RowMutationMessage rowMutationMessage) throws IOException
+ public Message makeRowMutationMessage() throws IOException
{
- return makeRowMutationMessage(rowMutationMessage, StorageService.mutationVerbHandler_);
+ return makeRowMutationMessage(StorageService.mutationVerbHandler_);
}
- public static Message makeRowMutationMessage(RowMutationMessage rowMutationMessage, String verbHandlerName) throws IOException
+ public Message makeRowMutationMessage(String verbHandlerName) throws IOException
{
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream( bos );
- RowMutationMessage.serializer().serialize(rowMutationMessage, dos);
+ RowMutationMessage.serializer().serialize(this, dos);
EndPoint local = StorageService.getLocalStorageEndPoint();
EndPoint from = ( local != null ) ? local : new EndPoint(FBUtilities.getHostAddress(), 7000);
- Message message = new Message(from, StorageService.mutationStage_, verbHandlerName, new Object[]{bos.toByteArray()});
- return message;
+ return new Message(from, StorageService.mutationStage_, verbHandlerName, bos.toByteArray());
}
@XmlElement(name="RowMutation")
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Mon May 4 19:02:23 2009
@@ -53,7 +53,7 @@
public void doVerb(Message message)
{
- byte[] bytes = (byte[]) message.getMessageBody()[0];
+ byte[] bytes = message.getMessageBody();
/* Obtain a Row Mutation Context from TLS */
RowMutationContext rowMutationCtx = tls_.get();
if ( rowMutationCtx == null )
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Mon May 4 19:02:23 2009
@@ -299,7 +299,7 @@
*/
public void doVerb(Message message)
{
- byte[] body = (byte[])message.getMessageBody()[0];
+ byte[] body = message.getMessageBody();
DataInputBuffer bufIn = new DataInputBuffer();
bufIn.reset(body, body.length);
@@ -330,7 +330,7 @@
StreamContextManager.registerStreamCompletionHandler(message.getFrom().getHost(), new Table.BootstrapCompletionHandler());
/* Send a bootstrap initiation done message to execute on default stage. */
logger_.debug("Sending a bootstrap initiate done message ...");
- Message doneMessage = new Message( StorageService.getLocalStorageEndPoint(), "", StorageService.bootStrapInitiateDoneVerbHandler_, new Object[]{new byte[0]} );
+ Message doneMessage = new Message( StorageService.getLocalStorageEndPoint(), "", StorageService.bootStrapInitiateDoneVerbHandler_, new byte[0] );
MessagingService.getMessagingInstance().sendOneWay(doneMessage, message.getFrom());
}
catch ( IOException ex )
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/TouchMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/TouchMessage.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/TouchMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/TouchMessage.java Mon May 4 19:02:23 2009
@@ -32,7 +32,7 @@
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream( bos );
TouchMessage.serializer().serialize(touchMessage, dos);
- Message message = new Message(StorageService.getLocalStorageEndPoint(), StorageService.readStage_, StorageService.touchVerbHandler_, new Object[]{bos.toByteArray()});
+ Message message = new Message(StorageService.getLocalStorageEndPoint(), StorageService.readStage_, StorageService.touchVerbHandler_, bos.toByteArray());
return message;
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/TouchVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/TouchVerbHandler.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/TouchVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/TouchVerbHandler.java Mon May 4 19:02:23 2009
@@ -23,7 +23,7 @@
public void doVerb(Message message)
{
- byte[] body = (byte[])message.getMessageBody()[0];
+ byte[] body = message.getMessageBody();
/* Obtain a Read Context from TLS */
ReadContext readCtx = tls_.get();
if ( readCtx == null )
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapInitiateMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapInitiateMessage.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapInitiateMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapInitiateMessage.java Mon May 4 19:02:23 2009
@@ -54,7 +54,7 @@
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream( bos );
BootstrapInitiateMessage.serializer().serialize(biMessage, dos);
- return new Message( StorageService.getLocalStorageEndPoint(), "", StorageService.bootStrapInitiateVerbHandler_, new Object[]{bos.toByteArray()} );
+ return new Message( StorageService.getLocalStorageEndPoint(), "", StorageService.bootStrapInitiateVerbHandler_, bos.toByteArray() );
}
protected StreamContextManager.StreamContext[] streamContexts_ = new StreamContextManager.StreamContext[0];
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java Mon May 4 19:02:23 2009
@@ -53,7 +53,7 @@
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream( bos );
BootstrapMetadataMessage.serializer().serialize(bsMetadataMessage, dos);
- return new Message( StorageService.getLocalStorageEndPoint(), "", StorageService.bsMetadataVerbHandler_, new Object[]{bos.toByteArray()} );
+ return new Message( StorageService.getLocalStorageEndPoint(), "", StorageService.bsMetadataVerbHandler_, bos.toByteArray() );
}
protected BootstrapMetadata[] bsMetadata_ = new BootstrapMetadata[0];
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java Mon May 4 19:02:23 2009
@@ -47,7 +47,7 @@
public void doVerb(Message message)
{
logger_.debug("Received a BootstrapMetadataMessage from " + message.getFrom());
- byte[] body = (byte[])message.getMessageBody()[0];
+ byte[] body = message.getMessageBody();
DataInputBuffer bufIn = new DataInputBuffer();
bufIn.reset(body, body.length);
try
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Mon May 4 19:02:23 2009
@@ -356,7 +356,7 @@
ByteArrayOutputStream bos = new ByteArrayOutputStream(Gossiper.MAX_GOSSIP_PACKET_SIZE);
DataOutputStream dos = new DataOutputStream( bos );
GossipDigestSynMessage.serializer().serialize(gDigestMessage, dos);
- Message message = new Message(localEndPoint_, Gossiper.GOSSIP_STAGE, GOSSIP_DIGEST_SYN_VERB, new Object[]{bos.toByteArray()});
+ Message message = new Message(localEndPoint_, Gossiper.GOSSIP_STAGE, GOSSIP_DIGEST_SYN_VERB, bos.toByteArray());
return message;
}
@@ -366,7 +366,7 @@
DataOutputStream dos = new DataOutputStream(bos);
GossipDigestAckMessage.serializer().serialize(gDigestAckMessage, dos);
logger_.trace("@@@@ Size of GossipDigestAckMessage is " + bos.toByteArray().length);
- Message message = new Message(localEndPoint_, Gossiper.GOSSIP_STAGE, GOSSIP_DIGEST_ACK_VERB, new Object[]{bos.toByteArray()});
+ Message message = new Message(localEndPoint_, Gossiper.GOSSIP_STAGE, GOSSIP_DIGEST_ACK_VERB, bos.toByteArray());
return message;
}
@@ -375,7 +375,7 @@
ByteArrayOutputStream bos = new ByteArrayOutputStream(Gossiper.MAX_GOSSIP_PACKET_SIZE);
DataOutputStream dos = new DataOutputStream(bos);
GossipDigestAck2Message.serializer().serialize(gDigestAck2Message, dos);
- Message message = new Message(localEndPoint_, Gossiper.GOSSIP_STAGE, GOSSIP_DIGEST_ACK2_VERB, new Object[]{bos.toByteArray()});
+ Message message = new Message(localEndPoint_, Gossiper.GOSSIP_STAGE, GOSSIP_DIGEST_ACK2_VERB, bos.toByteArray());
return message;
}
@@ -949,7 +949,7 @@
EndPoint from = message.getFrom();
logger_.debug("Received a JoinMessage from " + from);
- byte[] bytes = (byte[])message.getMessageBody()[0];
+ byte[] bytes = message.getMessageBody();
DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
try
@@ -976,7 +976,7 @@
EndPoint from = message.getFrom();
logger_.trace("Received a GossipDigestSynMessage from " + from);
- byte[] bytes = (byte[])message.getMessageBody()[0];
+ byte[] bytes = message.getMessageBody();
DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
try
@@ -1060,7 +1060,7 @@
EndPoint from = message.getFrom();
logger_.trace("Received a GossipDigestAckMessage from " + from);
- byte[] bytes = (byte[])message.getMessageBody()[0];
+ byte[] bytes = message.getMessageBody();
DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
try
@@ -1107,7 +1107,7 @@
EndPoint from = message.getFrom();
logger_.trace("Received a GossipDigestAck2Message from " + from);
- byte[] bytes = (byte[])message.getMessageBody()[0];
+ byte[] bytes = message.getMessageBody();
DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
try
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java Mon May 4 19:02:23 2009
@@ -19,7 +19,6 @@
package org.apache.cassandra.net;
import java.util.List;
-import java.util.Hashtable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -27,8 +26,6 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.service.QuorumResponseHandler;
import org.apache.cassandra.utils.LogUtil;
import org.apache.log4j.Logger;
@@ -39,7 +36,7 @@
class AsyncResult implements IAsyncResult
{
private static Logger logger_ = Logger.getLogger( AsyncResult.class );
- private Object[] result_ = new Object[0];
+ private byte[] result_;
private AtomicBoolean done_ = new AtomicBoolean(false);
private Lock lock_ = new ReentrantLock();
private Condition condition_;
@@ -49,7 +46,7 @@
condition_ = lock_.newCondition();
}
- public Object[] get()
+ public byte[] get()
{
lock_.lock();
try
@@ -75,7 +72,7 @@
return done_.get();
}
- public Object[] get(long timeout, TimeUnit tu) throws TimeoutException
+ public byte[] get(long timeout, TimeUnit tu) throws TimeoutException
{
lock_.lock();
try
@@ -105,12 +102,12 @@
return result_;
}
- public List<Object[]> multiget()
+ public List<byte[]> multiget()
{
throw new UnsupportedOperationException("This operation is not supported in the AsyncResult abstraction.");
}
- public List<Object[]> multiget(long timeout, TimeUnit tu) throws TimeoutException
+ public List<byte[]> multiget(long timeout, TimeUnit tu) throws TimeoutException
{
throw new UnsupportedOperationException("This operation is not supported in the AsyncResult abstraction.");
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IAsyncResult.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IAsyncResult.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IAsyncResult.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IAsyncResult.java Mon May 4 19:02:23 2009
@@ -39,7 +39,7 @@
* Returns the result for the task that was submitted.
* @return the result wrapped in an Object[]
*/
- public Object[] get();
+ public byte[] get();
/**
* Same operation as the above get() but allows the calling
@@ -48,13 +48,13 @@
* @param tu the time unit of the timeout argument
* @return the result wrapped in an Object[]
*/
- public Object[] get(long timeout, TimeUnit tu) throws TimeoutException;
+ public byte[] get(long timeout, TimeUnit tu) throws TimeoutException;
/**
* Returns the result for all tasks that was submitted.
* @return the list of results wrapped in an Object[]
*/
- public List<Object[]> multiget();
+ public List<byte[]> multiget();
/**
* Same operation as the above get() but allows the calling
@@ -63,7 +63,7 @@
* @param tu the time unit of the timeout argument
* @return the result wrapped in an Object[]
*/
- public List<Object[]> multiget(long timeout, TimeUnit tu) throws TimeoutException;
+ public List<byte[]> multiget(long timeout, TimeUnit tu) throws TimeoutException;
/**
* Store the result obtained for the submitted task.
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java Mon May 4 19:02:23 2009
@@ -51,20 +51,20 @@
}
Header header_;
- private Object[] body_ = new Object[0];
+ private byte[] body_;
- protected Message(String id, EndPoint from, String messageType, String verb, Object... body)
+ protected Message(String id, EndPoint from, String messageType, String verb, byte[] body)
{
this(new Header(id, from, messageType, verb), body);
}
- protected Message(Header header, Object... body)
+ protected Message(Header header, byte[] body)
{
header_ = header;
body_ = body;
}
- public Message(EndPoint from, String messageType, String verb, Object... body)
+ public Message(EndPoint from, String messageType, String verb, byte[] body)
{
this(new Header(from, messageType, verb), body);
}
@@ -99,12 +99,12 @@
return header_.getDetails();
}
- public Object[] getMessageBody()
+ public byte[] getMessageBody()
{
return body_;
}
- public void setMessageBody(Object[] body)
+ public void setMessageBody(byte[] body)
{
body_ = body;
}
@@ -128,36 +128,13 @@
{
return header_.getMessageId();
}
-
- public Class[] getTypes()
- {
- List<Class> types = new ArrayList<Class>();
-
- for ( int i = 0; i < body_.length; ++i )
- {
- if ( body_[i].getClass().isArray() )
- {
- int size = Array.getLength(body_[i]);
- if ( size > 0 )
- {
- types.add( Array.get( body_[i], 0).getClass() );
- }
- }
- else
- {
- types.add(body_[i].getClass());
- }
- }
-
- return types.toArray( new Class[0] );
- }
void setMessageId(String id)
{
header_.setMessageId(id);
}
- public Message getReply(EndPoint from, Object... args)
+ public Message getReply(EndPoint from, byte[] args)
{
Message response = new Message(getMessageId(),
from,
@@ -179,22 +156,8 @@
sbuf.append(separator);
sbuf.append("VERB:" + getVerb());
sbuf.append(separator);
- sbuf.append("BODY TYPE:" + getBodyTypes());
- sbuf.append(separator);
return sbuf.toString();
}
-
- private String getBodyTypes()
- {
- StringBuffer sbuf = new StringBuffer("");
- Class[] types = getTypes();
- for ( int i = 0; i < types.length; ++i )
- {
- sbuf.append(types[i].getName());
- sbuf.append(" ");
- }
- return sbuf.toString();
- }
}
class MessageSerializer implements ICompactSerializer<Message>
@@ -202,7 +165,7 @@
public void serialize(Message t, DataOutputStream dos) throws IOException
{
Header.serializer().serialize( t.header_, dos);
- byte[] bytes = (byte[])t.getMessageBody()[0];
+ byte[] bytes = t.getMessageBody();
dos.writeInt(bytes.length);
dos.write(bytes);
}
@@ -214,6 +177,6 @@
byte[] bytes = new byte[size];
dis.readFully(bytes);
// return new Message(header.getMessageId(), header.getFrom(), header.getMessageType(), header.getVerb(), new Object[]{bytes});
- return new Message(header, new Object[]{bytes});
+ return new Message(header, bytes);
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MultiAsyncResult.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MultiAsyncResult.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MultiAsyncResult.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MultiAsyncResult.java Mon May 4 19:02:23 2009
@@ -33,7 +33,7 @@
{
private static Logger logger_ = Logger.getLogger( AsyncResult.class );
private int expectedResults_;
- private List<Object[]> result_ = new ArrayList<Object[]>();
+ private List<byte[]> result_ = new ArrayList<byte[]>();
private AtomicBoolean done_ = new AtomicBoolean(false);
private Lock lock_ = new ReentrantLock();
private Condition condition_;
@@ -44,17 +44,17 @@
condition_ = lock_.newCondition();
}
- public Object[] get()
+ public byte[] get()
{
throw new UnsupportedOperationException("This operation is not supported in the AsyncResult abstraction.");
}
- public Object[] get(long timeout, TimeUnit tu) throws TimeoutException
+ public byte[] get(long timeout, TimeUnit tu) throws TimeoutException
{
throw new UnsupportedOperationException("This operation is not supported in the AsyncResult abstraction.");
}
- public List<Object[]> multiget()
+ public List<byte[]> multiget()
{
lock_.lock();
try
@@ -80,7 +80,7 @@
return done_.get();
}
- public List<Object[]> multiget(long timeout, TimeUnit tu) throws TimeoutException
+ public List<byte[]> multiget(long timeout, TimeUnit tu) throws TimeoutException
{
lock_.lock();
try
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java Mon May 4 19:02:23 2009
@@ -219,7 +219,7 @@
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream( bos );
StreamStatusMessage.serializer().serialize(streamStatusMessage, dos);
- return new Message(StorageService.getLocalStorageEndPoint(), "", StorageService.bootStrapTerminateVerbHandler_, new Object[]{bos.toByteArray()});
+ return new Message(StorageService.getLocalStorageEndPoint(), "", StorageService.bootStrapTerminateVerbHandler_, bos.toByteArray());
}
protected StreamContextManager.StreamStatus streamStatus_;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java Mon May 4 19:02:23 2009
@@ -573,7 +573,7 @@
// read response
// TODO send more requests if we need to span multiple nodes
// double the usual timeout since range requests are expensive
- byte[] responseBody = (byte[])iar.get(2 * DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS)[0];
+ byte[] responseBody = iar.get(2 * DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
return RangeReply.read(responseBody).keys;
}
catch (Exception e)
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java Mon May 4 19:02:23 2009
@@ -66,7 +66,7 @@
logger_.debug("Handle Digest reponses");
for( Message response : responses_ )
{
- byte[] body = (byte[])response.getMessageBody()[0];
+ byte[] body = response.getMessageBody();
bufIn.reset(body, body.length);
try
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadRepairManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadRepairManager.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadRepairManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadRepairManager.java Mon May 4 19:02:23 2009
@@ -113,7 +113,7 @@
{
try
{
- Message message = RowMutationMessage.makeRowMutationMessage(rowMutationMessage, StorageService.readRepairVerbHandler_);
+ Message message = rowMutationMessage.makeRowMutationMessage(StorageService.readRepairVerbHandler_);
String key = target + ":" + message.getMessageId();
readRepairTable_.put(key, message);
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java Mon May 4 19:02:23 2009
@@ -75,7 +75,7 @@
DataInputBuffer bufIn = new DataInputBuffer();
for (Message response : responses)
{
- byte[] body = (byte[])response.getMessageBody()[0];
+ byte[] body = response.getMessageBody();
bufIn.reset(body, body.length);
try
{
@@ -154,7 +154,7 @@
boolean isDataPresent = false;
for (Message response : responses)
{
- byte[] body = (byte[])response.getMessageBody()[0];
+ byte[] body = response.getMessageBody();
DataInputBuffer bufIn = new DataInputBuffer();
bufIn.reset(body, body.length);
try
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java Mon May 4 19:02:23 2009
@@ -156,11 +156,11 @@
{
public void doVerb(Message message)
{
- Message reply = message.getReply(StorageService.getLocalStorageEndPoint(), new Object[]{isMoveable_.get()});
+ Message reply = message.getReply(StorageService.getLocalStorageEndPoint(), new byte[] {(byte)(isMoveable_.get() ? 1 : 0)});
MessagingService.getMessagingInstance().sendOneWay(reply, message.getFrom());
if ( isMoveable_.get() )
{
- MoveMessage moveMessage = (MoveMessage)message.getMessageBody()[0];
+ // MoveMessage moveMessage = (MoveMessage)message.getMessageBody()[0];
/* Start the leave operation and join the ring at the position specified */
isMoveable_.set(false);
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Mon May 4 19:02:23 2009
@@ -189,11 +189,10 @@
Map<String, Message> messages = constructMessages(readMessages);
/* Dispatch the messages to the respective endpoints */
IAsyncResult iar = dispatchMessages(endPoints, messages);
- List<Object[]> results = iar.multiget(2*DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
+ List<byte[]> results = iar.multiget(2*DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
- for ( Object[] result : results )
+ for ( byte[] body : results )
{
- byte[] body = (byte[])result[0];
DataInputBuffer bufIn = new DataInputBuffer();
bufIn.reset(body, body.length);
ReadResponse response = ReadResponse.serializer().deserialize(bufIn);
@@ -221,8 +220,7 @@
byte[] body;
try
{
- Object[] result = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
- body = (byte[])result[0];
+ body = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
}
catch (TimeoutException e)
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java Mon May 4 19:02:23 2009
@@ -48,7 +48,7 @@
public void doVerb(Message message)
{
- byte[] body = (byte[])message.getMessageBody()[0];
+ byte[] body = message.getMessageBody();
DataInputBuffer bufIn = new DataInputBuffer();
bufIn.reset(body, body.length);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/TokenUpdateVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/TokenUpdateVerbHandler.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/TokenUpdateVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/TokenUpdateVerbHandler.java Mon May 4 19:02:23 2009
@@ -37,7 +37,7 @@
public void doVerb(Message message)
{
- byte[] body = (byte[])message.getMessageBody()[0];
+ byte[] body = message.getMessageBody();
Token token = StorageService.getPartitioner().getTokenFactory().fromByteArray(body);
try
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseResolver.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseResolver.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseResolver.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseResolver.java Mon May 4 19:02:23 2009
@@ -19,6 +19,9 @@
package org.apache.cassandra.service;
import java.util.List;
+import java.io.DataInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
import org.apache.cassandra.db.WriteResponse;
import org.apache.cassandra.net.Message;
@@ -46,12 +49,19 @@
// if a write fails for a key log that the key could not be replicated
boolean returnValue = false;
for (Message response : responses) {
- Object[] body = response.getMessageBody();
- WriteResponse writeResponse = (WriteResponse) body[0];
- boolean result = writeResponse.isSuccess();
- if (!result) {
+ WriteResponse writeResponseMessage = null;
+ try
+ {
+ writeResponseMessage = WriteResponse.serializer().deserialize(new DataInputStream(new ByteArrayInputStream(response.getMessageBody())));
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ boolean result = writeResponseMessage.isSuccess();
+ if (!result) {
logger_.debug("Write at " + response.getFrom()
- + " may have failed for the key " + writeResponse.key());
+ + " may have failed for the key " + writeResponseMessage.key());
}
returnValue |= result;
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/test/StressTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/test/StressTest.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/test/StressTest.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/test/StressTest.java Mon May 4 19:02:23 2009
@@ -60,7 +60,7 @@
public class StressTest
{
- private static Logger logger_ = Logger.getLogger(DataImporter.class);
+ private static Logger logger_ = Logger.getLogger(StressTest.class);
private static final String tablename_ = new String("Test");
@@ -103,10 +103,15 @@
{
if( rmsg_ != null )
{
- Message message = new Message(from_ , StorageService.mutationStage_,
- StorageService.loadVerbHandler_, new Object[] { rmsg_ });
- MessagingService.getMessagingInstance().sendOneWay(message, to_);
- }
+ try
+ {
+ MessagingService.getMessagingInstance().sendOneWay(rmsg_.makeRowMutationMessage(), to_);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
}
}
@@ -125,12 +130,7 @@
{
long t = System.currentTimeMillis();
RowMutationMessage rmMsg = new RowMutationMessage(rm);
- Message message = new Message(from_,
- StorageService.mutationStage_,
- StorageService.mutationVerbHandler_,
- new Object[]{ rmMsg }
- );
- MessagingService.getMessagingInstance().sendOneWay(message, to_);
+ MessagingService.getMessagingInstance().sendOneWay(rmMsg.makeRowMutationMessage(), to_);
Thread.sleep(1, 1000000000/requestsPerSecond_);
}
@@ -143,33 +143,16 @@
public void readLoad(ReadCommand readCommand)
{
- IResponseResolver<Row> readResponseResolver = new ReadResponseResolver();
- QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(
- 1,
- readResponseResolver);
- Message message = new Message(from_, StorageService.readStage_,
- StorageService.readVerbHandler_,
- new Object[] {readCommand});
- MessagingService.getMessagingInstance().sendOneWay(message, to_);
- /*IAsyncResult iar = MessagingService.getMessagingInstance().sendRR(message, to_);
- try
- {
- long t = System.currentTimeMillis();
- iar.get(2000, TimeUnit.MILLISECONDS );
- logger_.debug("Time taken for read..."
- + (System.currentTimeMillis() - t));
-
- }
- catch (Exception ex)
- {
- ex.printStackTrace();
- }*/
+ try
+ {
+ MessagingService.getMessagingInstance().sendOneWay(readCommand.makeReadMessage(), to_);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
}
-
-
-
-
public void randomReadColumn (int keys, int columns, int size, int tps)
{
Random random = new Random();
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/AdminTool.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/AdminTool.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/AdminTool.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/AdminTool.java Mon May 4 19:02:23 2009
@@ -159,7 +159,7 @@
RowMutationMessage rmMsg = new RowMutationMessage(rm);
if( server_ != null)
{
- Message message = RowMutationMessage.makeRowMutationMessage(rmMsg, StorageService.binaryVerbHandler_);
+ Message message = rmMsg.makeRowMutationMessage(StorageService.binaryVerbHandler_);
EndPoint to = new EndPoint(server_, 7000);
MessagingService.getMessagingInstance().sendOneWay(message, to);
}
@@ -167,7 +167,7 @@
{
for( String server : servers_ )
{
- Message message = RowMutationMessage.makeRowMutationMessage(rmMsg, StorageService.binaryVerbHandler_);
+ Message message = rmMsg.makeRowMutationMessage(StorageService.binaryVerbHandler_);
EndPoint to = new EndPoint(server, 7000);
MessagingService.getMessagingInstance().sendOneWay(message, to);
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/MembershipCleaner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/MembershipCleaner.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/MembershipCleaner.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/MembershipCleaner.java Mon May 4 19:02:23 2009
@@ -63,7 +63,7 @@
DataOutputStream dos = new DataOutputStream(bos);
MembershipCleanerMessage.serializer().serialize(mcMessage, dos);
/* Construct the token update message to be sent */
- Message mbrshipCleanerMessage = new Message( new EndPoint(FBUtilities.getHostAddress(), port_), "", StorageService.mbrshipCleanerVerbHandler_, new Object[]{bos.toByteArray()} );
+ Message mbrshipCleanerMessage = new Message( new EndPoint(FBUtilities.getHostAddress(), port_), "", StorageService.mbrshipCleanerVerbHandler_, bos.toByteArray() );
BufferedReader bufReader = new BufferedReader( new InputStreamReader( new FileInputStream(file) ) );
String line = null;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/MembershipCleanerVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/MembershipCleanerVerbHandler.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/MembershipCleanerVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/MembershipCleanerVerbHandler.java Mon May 4 19:02:23 2009
@@ -43,7 +43,7 @@
public void doVerb(Message message)
{
- byte[] body = (byte[])message.getMessageBody()[0];
+ byte[] body = message.getMessageBody();
try
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdateVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdateVerbHandler.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdateVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdateVerbHandler.java Mon May 4 19:02:23 2009
@@ -47,7 +47,7 @@
public void doVerb(Message message)
{
- byte[] body = (byte[])message.getMessageBody()[0];
+ byte[] body = message.getMessageBody();
try
{
@@ -78,7 +78,7 @@
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
Token.serializer().serialize(token, dos);
- message.setMessageBody(new Object[]{bos.toByteArray()});
+ message.setMessageBody(bos.toByteArray());
logger_.debug("Sending a token update message to " + target + " to update it to " + token);
MessagingService.getMessagingInstance().sendOneWay(message, target);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdater.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdater.java?rev=771402&r1=771401&r2=771402&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdater.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdater.java Mon May 4 19:02:23 2009
@@ -58,7 +58,7 @@
Token.serializer().serialize(token, dos);
/* Construct the token update message to be sent */
- Message tokenUpdateMessage = new Message( new EndPoint(FBUtilities.getHostAddress(), port_), "", StorageService.tokenVerbHandler_, new Object[]{bos.toByteArray()} );
+ Message tokenUpdateMessage = new Message( new EndPoint(FBUtilities.getHostAddress(), port_), "", StorageService.tokenVerbHandler_, bos.toByteArray() );
BufferedReader bufReader = new BufferedReader( new InputStreamReader( new FileInputStream(file) ) );
String line = null;