You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/21 22:22:26 UTC
svn commit: r901858 - in
/incubator/cassandra/trunk/src/java/org/apache/cassandra:
db/ReadResponse.java gms/Gossiper.java net/Message.java
net/MessagingService.java net/ResponseVerbHandler.java
service/AntiEntropyService.java service/StorageService.java
Author: jbellis
Date: Thu Jan 21 21:22:25 2010
New Revision: 901858
URL: http://svn.apache.org/viewvc?rev=901858&view=rev
Log:
mv tree and gossip verb registration into StorageService
patch by jbellis; reviewed by Stu Hood for CASSANDRA-717
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java?rev=901858&r1=901857&r2=901858&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java Thu Jan 21 21:22:25 2010
@@ -51,15 +51,6 @@
return serializer_;
}
- public static Message makeReadResponseMessage(ReadResponse readResponse) throws IOException
- {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream( bos );
- ReadResponse.serializer().serialize(readResponse, dos);
- Message message = new Message(FBUtilities.getLocalAddress(), StageManager.RESPONSE_STAGE, MessagingService.responseVerbHandler_, bos.toByteArray());
- return message;
- }
-
private Row row_;
private byte[] digest_ = ArrayUtils.EMPTY_BYTE_ARRAY;
private boolean isDigestQuery_ = false;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=901858&r1=901857&r2=901858&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Thu Jan 21 21:22:25 2010
@@ -27,6 +27,7 @@
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
import org.apache.log4j.Logger;
@@ -98,14 +99,6 @@
}
final static int MAX_GOSSIP_PACKET_SIZE = 1428;
- /* GSV - abbreviation for GOSSIP-DIGEST-SYN-VERB */
- final static String JOIN_VERB_HANDLER = "JVH";
- /* GSV - abbreviation for GOSSIP-DIGEST-SYN-VERB */
- final static String GOSSIP_DIGEST_SYN_VERB = "GSV";
- /* GAV - abbreviation for GOSSIP-DIGEST-ACK-VERB */
- final static String GOSSIP_DIGEST_ACK_VERB = "GAV";
- /* GA2V - abbreviation for GOSSIP-DIGEST-ACK2-VERB */
- final static String GOSSIP_DIGEST_ACK2_VERB = "GA2V";
public final static int intervalInMillis_ = 1000;
private static Logger logger_ = Logger.getLogger(Gossiper.class);
public static final Gossiper instance = new Gossiper();
@@ -136,11 +129,6 @@
aVeryLongTime_ = 259200 * 1000;
/* register with the Failure Detector for receiving Failure detector events */
FailureDetector.instance.registerFailureDetectionEventListener(this);
- /* register the verbs */
- MessagingService.instance.registerVerbHandlers(JOIN_VERB_HANDLER, new JoinVerbHandler());
- MessagingService.instance.registerVerbHandlers(GOSSIP_DIGEST_SYN_VERB, new GossipDigestSynVerbHandler());
- MessagingService.instance.registerVerbHandlers(GOSSIP_DIGEST_ACK_VERB, new GossipDigestAckVerbHandler());
- MessagingService.instance.registerVerbHandlers(GOSSIP_DIGEST_ACK2_VERB, new GossipDigestAck2VerbHandler());
}
/** Register with the Gossiper for EndPointState notifications */
@@ -271,7 +259,7 @@
ByteArrayOutputStream bos = new ByteArrayOutputStream(Gossiper.MAX_GOSSIP_PACKET_SIZE);
DataOutputStream dos = new DataOutputStream( bos );
GossipDigestSynMessage.serializer().serialize(gDigestMessage, dos);
- return new Message(localEndPoint_, StageManager.GOSSIP_STAGE, GOSSIP_DIGEST_SYN_VERB, bos.toByteArray());
+ return new Message(localEndPoint_, StageManager.GOSSIP_STAGE, StorageService.GOSSIP_DIGEST_SYN_VERB, bos.toByteArray());
}
Message makeGossipDigestAckMessage(GossipDigestAckMessage gDigestAckMessage) throws IOException
@@ -281,7 +269,7 @@
GossipDigestAckMessage.serializer().serialize(gDigestAckMessage, dos);
if (logger_.isTraceEnabled())
logger_.trace("@@@@ Size of GossipDigestAckMessage is " + bos.toByteArray().length);
- return new Message(localEndPoint_, StageManager.GOSSIP_STAGE, GOSSIP_DIGEST_ACK_VERB, bos.toByteArray());
+ return new Message(localEndPoint_, StageManager.GOSSIP_STAGE, StorageService.GOSSIP_DIGEST_ACK_VERB, bos.toByteArray());
}
Message makeGossipDigestAck2Message(GossipDigestAck2Message gDigestAck2Message) throws IOException
@@ -289,7 +277,7 @@
ByteArrayOutputStream bos = new ByteArrayOutputStream(Gossiper.MAX_GOSSIP_PACKET_SIZE);
DataOutputStream dos = new DataOutputStream(bos);
GossipDigestAck2Message.serializer().serialize(gDigestAck2Message, dos);
- return new Message(localEndPoint_, StageManager.GOSSIP_STAGE, GOSSIP_DIGEST_ACK2_VERB, bos.toByteArray());
+ return new Message(localEndPoint_, StageManager.GOSSIP_STAGE, StorageService.GOSSIP_DIGEST_ACK2_VERB, bos.toByteArray());
}
/**
@@ -822,197 +810,196 @@
gossipTimer_.cancel();
gossipTimer_ = new Timer(false); // makes the Gossiper reentrant.
}
-}
-class JoinVerbHandler implements IVerbHandler
-{
- private static Logger logger_ = Logger.getLogger( JoinVerbHandler.class);
-
- public void doVerb(Message message)
+ public static class JoinVerbHandler implements IVerbHandler
{
- InetAddress from = message.getFrom();
- if (logger_.isDebugEnabled())
- logger_.debug("Received a JoinMessage from " + from);
-
- byte[] bytes = message.getMessageBody();
- DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
+ private static Logger logger_ = Logger.getLogger( JoinVerbHandler.class);
- JoinMessage joinMessage;
- try
- {
- joinMessage = JoinMessage.serializer().deserialize(dis);
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- if ( joinMessage.clusterId_.equals( DatabaseDescriptor.getClusterName() ) )
+ public void doVerb(Message message)
{
- Gossiper.instance.join(from);
+ InetAddress from = message.getFrom();
+ if (logger_.isDebugEnabled())
+ logger_.debug("Received a JoinMessage from " + from);
+
+ byte[] bytes = message.getMessageBody();
+ DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
+
+ JoinMessage joinMessage;
+ try
+ {
+ joinMessage = JoinMessage.serializer().deserialize(dis);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ if ( joinMessage.clusterId_.equals( DatabaseDescriptor.getClusterName() ) )
+ {
+ Gossiper.instance.join(from);
+ }
}
}
-}
-
-class GossipDigestSynVerbHandler implements IVerbHandler
-{
- private static Logger logger_ = Logger.getLogger( GossipDigestSynVerbHandler.class);
- public void doVerb(Message message)
+ public static class GossipDigestSynVerbHandler implements IVerbHandler
{
- InetAddress from = message.getFrom();
- if (logger_.isTraceEnabled())
- logger_.trace("Received a GossipDigestSynMessage from " + from);
-
- byte[] bytes = message.getMessageBody();
- DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
+ private static Logger logger_ = Logger.getLogger( GossipDigestSynVerbHandler.class);
- try
+ public void doVerb(Message message)
{
- GossipDigestSynMessage gDigestMessage = GossipDigestSynMessage.serializer().deserialize(dis);
- /* If the message is from a different cluster throw it away. */
- if ( !gDigestMessage.clusterId_.equals(DatabaseDescriptor.getClusterName()) )
- return;
+ InetAddress from = message.getFrom();
+ if (logger_.isTraceEnabled())
+ logger_.trace("Received a GossipDigestSynMessage from " + from);
- List<GossipDigest> gDigestList = gDigestMessage.getGossipDigests();
- /* Notify the Failure Detector */
- Gossiper.instance.notifyFailureDetector(gDigestList);
+ byte[] bytes = message.getMessageBody();
+ DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
- doSort(gDigestList);
+ try
+ {
+ GossipDigestSynMessage gDigestMessage = GossipDigestSynMessage.serializer().deserialize(dis);
+ /* If the message is from a different cluster throw it away. */
+ if ( !gDigestMessage.clusterId_.equals(DatabaseDescriptor.getClusterName()) )
+ return;
- List<GossipDigest> deltaGossipDigestList = new ArrayList<GossipDigest>();
- Map<InetAddress, EndPointState> deltaEpStateMap = new HashMap<InetAddress, EndPointState>();
- Gossiper.instance.examineGossiper(gDigestList, deltaGossipDigestList, deltaEpStateMap);
+ List<GossipDigest> gDigestList = gDigestMessage.getGossipDigests();
+ /* Notify the Failure Detector */
+ Gossiper.instance.notifyFailureDetector(gDigestList);
- GossipDigestAckMessage gDigestAck = new GossipDigestAckMessage(deltaGossipDigestList, deltaEpStateMap);
- Message gDigestAckMessage = Gossiper.instance.makeGossipDigestAckMessage(gDigestAck);
- if (logger_.isTraceEnabled())
- logger_.trace("Sending a GossipDigestAckMessage to " + from);
- MessagingService.instance.sendOneWay(gDigestAckMessage, from);
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- }
+ doSort(gDigestList);
- /*
- * First construct a map whose key is the endpoint in the GossipDigest and the value is the
- * GossipDigest itself. Then build a list of version differences i.e difference between the
- * version in the GossipDigest and the version in the local state for a given InetAddress.
- * Sort this list. Now loop through the sorted list and retrieve the GossipDigest corresponding
- * to the endpoint from the map that was initially constructed.
- */
- private void doSort(List<GossipDigest> gDigestList)
- {
- /* Construct a map of endpoint to GossipDigest. */
- Map<InetAddress, GossipDigest> epToDigestMap = new HashMap<InetAddress, GossipDigest>();
- for ( GossipDigest gDigest : gDigestList )
- {
- epToDigestMap.put(gDigest.getEndPoint(), gDigest);
- }
+ List<GossipDigest> deltaGossipDigestList = new ArrayList<GossipDigest>();
+ Map<InetAddress, EndPointState> deltaEpStateMap = new HashMap<InetAddress, EndPointState>();
+ Gossiper.instance.examineGossiper(gDigestList, deltaGossipDigestList, deltaEpStateMap);
- /*
- * These digests have their maxVersion set to the difference of the version
- * of the local EndPointState and the version found in the GossipDigest.
- */
- List<GossipDigest> diffDigests = new ArrayList<GossipDigest>();
- for ( GossipDigest gDigest : gDigestList )
- {
- InetAddress ep = gDigest.getEndPoint();
- EndPointState epState = Gossiper.instance.getEndPointStateForEndPoint(ep);
- int version = (epState != null) ? Gossiper.instance.getMaxEndPointStateVersion( epState ) : 0;
- int diffVersion = Math.abs(version - gDigest.getMaxVersion() );
- diffDigests.add( new GossipDigest(ep, gDigest.getGeneration(), diffVersion) );
+ GossipDigestAckMessage gDigestAck = new GossipDigestAckMessage(deltaGossipDigestList, deltaEpStateMap);
+ Message gDigestAckMessage = Gossiper.instance.makeGossipDigestAckMessage(gDigestAck);
+ if (logger_.isTraceEnabled())
+ logger_.trace("Sending a GossipDigestAckMessage to " + from);
+ MessagingService.instance.sendOneWay(gDigestAckMessage, from);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
}
- gDigestList.clear();
- Collections.sort(diffDigests);
- int size = diffDigests.size();
/*
- * Report the digests in descending order. This takes care of the endpoints
- * that are far behind w.r.t this local endpoint
+ * First construct a map whose key is the endpoint in the GossipDigest and the value is the
+ * GossipDigest itself. Then build a list of version differences i.e difference between the
+ * version in the GossipDigest and the version in the local state for a given InetAddress.
+ * Sort this list. Now loop through the sorted list and retrieve the GossipDigest corresponding
+ * to the endpoint from the map that was initially constructed.
*/
- for ( int i = size - 1; i >= 0; --i )
+ private void doSort(List<GossipDigest> gDigestList)
{
- gDigestList.add( epToDigestMap.get(diffDigests.get(i).getEndPoint()) );
+ /* Construct a map of endpoint to GossipDigest. */
+ Map<InetAddress, GossipDigest> epToDigestMap = new HashMap<InetAddress, GossipDigest>();
+ for ( GossipDigest gDigest : gDigestList )
+ {
+ epToDigestMap.put(gDigest.getEndPoint(), gDigest);
+ }
+
+ /*
+ * These digests have their maxVersion set to the difference of the version
+ * of the local EndPointState and the version found in the GossipDigest.
+ */
+ List<GossipDigest> diffDigests = new ArrayList<GossipDigest>();
+ for ( GossipDigest gDigest : gDigestList )
+ {
+ InetAddress ep = gDigest.getEndPoint();
+ EndPointState epState = Gossiper.instance.getEndPointStateForEndPoint(ep);
+ int version = (epState != null) ? Gossiper.instance.getMaxEndPointStateVersion( epState ) : 0;
+ int diffVersion = Math.abs(version - gDigest.getMaxVersion() );
+ diffDigests.add( new GossipDigest(ep, gDigest.getGeneration(), diffVersion) );
+ }
+
+ gDigestList.clear();
+ Collections.sort(diffDigests);
+ int size = diffDigests.size();
+ /*
+ * Report the digests in descending order. This takes care of the endpoints
+ * that are far behind w.r.t this local endpoint
+ */
+ for ( int i = size - 1; i >= 0; --i )
+ {
+ gDigestList.add( epToDigestMap.get(diffDigests.get(i).getEndPoint()) );
+ }
}
}
-}
-
-class GossipDigestAckVerbHandler implements IVerbHandler
-{
- private static Logger logger_ = Logger.getLogger(GossipDigestAckVerbHandler.class);
- public void doVerb(Message message)
+ public static class GossipDigestAckVerbHandler implements IVerbHandler
{
- InetAddress from = message.getFrom();
- if (logger_.isTraceEnabled())
- logger_.trace("Received a GossipDigestAckMessage from " + from);
-
- byte[] bytes = message.getMessageBody();
- DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
+ private static Logger logger_ = Logger.getLogger(GossipDigestAckVerbHandler.class);
- try
+ public void doVerb(Message message)
{
- GossipDigestAckMessage gDigestAckMessage = GossipDigestAckMessage.serializer().deserialize(dis);
- List<GossipDigest> gDigestList = gDigestAckMessage.getGossipDigestList();
- Map<InetAddress, EndPointState> epStateMap = gDigestAckMessage.getEndPointStateMap();
+ InetAddress from = message.getFrom();
+ if (logger_.isTraceEnabled())
+ logger_.trace("Received a GossipDigestAckMessage from " + from);
+
+ byte[] bytes = message.getMessageBody();
+ DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
- if ( epStateMap.size() > 0 )
+ try
{
- /* Notify the Failure Detector */
- Gossiper.instance.notifyFailureDetector(epStateMap);
- Gossiper.instance.applyStateLocally(epStateMap);
- }
+ GossipDigestAckMessage gDigestAckMessage = GossipDigestAckMessage.serializer().deserialize(dis);
+ List<GossipDigest> gDigestList = gDigestAckMessage.getGossipDigestList();
+ Map<InetAddress, EndPointState> epStateMap = gDigestAckMessage.getEndPointStateMap();
+
+ if ( epStateMap.size() > 0 )
+ {
+ /* Notify the Failure Detector */
+ Gossiper.instance.notifyFailureDetector(epStateMap);
+ Gossiper.instance.applyStateLocally(epStateMap);
+ }
+
+ /* Get the state required to send to this gossipee - construct GossipDigestAck2Message */
+ Map<InetAddress, EndPointState> deltaEpStateMap = new HashMap<InetAddress, EndPointState>();
+ for( GossipDigest gDigest : gDigestList )
+ {
+ InetAddress addr = gDigest.getEndPoint();
+ EndPointState localEpStatePtr = Gossiper.instance.getStateForVersionBiggerThan(addr, gDigest.getMaxVersion());
+ if ( localEpStatePtr != null )
+ deltaEpStateMap.put(addr, localEpStatePtr);
+ }
- /* Get the state required to send to this gossipee - construct GossipDigestAck2Message */
- Map<InetAddress, EndPointState> deltaEpStateMap = new HashMap<InetAddress, EndPointState>();
- for( GossipDigest gDigest : gDigestList )
+ GossipDigestAck2Message gDigestAck2 = new GossipDigestAck2Message(deltaEpStateMap);
+ Message gDigestAck2Message = Gossiper.instance.makeGossipDigestAck2Message(gDigestAck2);
+ if (logger_.isTraceEnabled())
+ logger_.trace("Sending a GossipDigestAck2Message to " + from);
+ MessagingService.instance.sendOneWay(gDigestAck2Message, from);
+ }
+ catch ( IOException e )
{
- InetAddress addr = gDigest.getEndPoint();
- EndPointState localEpStatePtr = Gossiper.instance.getStateForVersionBiggerThan(addr, gDigest.getMaxVersion());
- if ( localEpStatePtr != null )
- deltaEpStateMap.put(addr, localEpStatePtr);
+ throw new RuntimeException(e);
}
-
- GossipDigestAck2Message gDigestAck2 = new GossipDigestAck2Message(deltaEpStateMap);
- Message gDigestAck2Message = Gossiper.instance.makeGossipDigestAck2Message(gDigestAck2);
- if (logger_.isTraceEnabled())
- logger_.trace("Sending a GossipDigestAck2Message to " + from);
- MessagingService.instance.sendOneWay(gDigestAck2Message, from);
- }
- catch ( IOException e )
- {
- throw new RuntimeException(e);
}
}
-}
-class GossipDigestAck2VerbHandler implements IVerbHandler
-{
- private static Logger logger_ = Logger.getLogger(GossipDigestAck2VerbHandler.class);
-
- public void doVerb(Message message)
+ public static class GossipDigestAck2VerbHandler implements IVerbHandler
{
- InetAddress from = message.getFrom();
- if (logger_.isTraceEnabled())
- logger_.trace("Received a GossipDigestAck2Message from " + from);
+ private static Logger logger_ = Logger.getLogger(GossipDigestAck2VerbHandler.class);
- byte[] bytes = message.getMessageBody();
- DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
- GossipDigestAck2Message gDigestAck2Message;
- try
+ public void doVerb(Message message)
{
- gDigestAck2Message = GossipDigestAck2Message.serializer().deserialize(dis);
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
+ InetAddress from = message.getFrom();
+ if (logger_.isTraceEnabled())
+ logger_.trace("Received a GossipDigestAck2Message from " + from);
+
+ byte[] bytes = message.getMessageBody();
+ DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
+ GossipDigestAck2Message gDigestAck2Message;
+ try
+ {
+ gDigestAck2Message = GossipDigestAck2Message.serializer().deserialize(dis);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ Map<InetAddress, EndPointState> remoteEpStateMap = gDigestAck2Message.getEndPointStateMap();
+ /* Notify the Failure Detector */
+ Gossiper.instance.notifyFailureDetector(remoteEpStateMap);
+ Gossiper.instance.applyStateLocally(remoteEpStateMap);
}
- Map<InetAddress, EndPointState> remoteEpStateMap = gDigestAck2Message.getEndPointStateMap();
- /* Notify the Failure Detector */
- Gossiper.instance.notifyFailureDetector(remoteEpStateMap);
- Gossiper.instance.applyStateLocally(remoteEpStateMap);
}
-}
-
+}
\ No newline at end of file
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java?rev=901858&r1=901857&r2=901858&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java Thu Jan 21 21:22:25 2010
@@ -26,6 +26,7 @@
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.service.StorageService;
public class Message
{
@@ -121,7 +122,7 @@
// TODO should take byte[] + length so we don't have to copy to a byte[] of exactly the right len
public Message getReply(InetAddress from, byte[] args)
{
- Header header = new Header(getMessageId(), from, StageManager.RESPONSE_STAGE, MessagingService.responseVerbHandler_);
+ Header header = new Header(getMessageId(), from, StageManager.RESPONSE_STAGE, StorageService.responseVerbHandler_);
return new Message(header, args);
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=901858&r1=901857&r2=901858&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Thu Jan 21 21:22:25 2010
@@ -105,11 +105,6 @@
new NamedThreadFactory("MESSAGE-DESERIALIZER-POOL"));
streamExecutor_ = new JMXEnabledThreadPoolExecutor("MESSAGE-STREAMING-POOL");
-
- /* register the response verb handler */
- registerVerbHandlers(MessagingService.responseVerbHandler_, new ResponseVerbHandler());
-
- FailureDetector.instance.registerFailureDetectionEventListener(this);
}
public byte[] hash(String type, byte data[])
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java?rev=901858&r1=901857&r2=901858&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java Thu Jan 21 21:22:25 2010
@@ -20,7 +20,7 @@
import org.apache.log4j.Logger;
-class ResponseVerbHandler implements IVerbHandler
+public class ResponseVerbHandler implements IVerbHandler
{
private static final Logger logger_ = Logger.getLogger( ResponseVerbHandler.class );
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=901858&r1=901857&r2=901858&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Thu Jan 21 21:22:25 2010
@@ -89,9 +89,6 @@
{
private static final Logger logger = Logger.getLogger(AntiEntropyService.class);
- public final static String TREE_REQUEST_VERB = "TREE-REQUEST-VERB";
- public final static String TREE_RESPONSE_VERB = "TREE-RESPONSE-VERB";
-
// millisecond lifetime to store trees before they become stale
public final static long TREE_STORE_TIMEOUT = 600000;
// max millisecond frequency that natural (automatic) repairs should run at
@@ -120,8 +117,6 @@
*/
protected AntiEntropyService()
{
- MessagingService.instance.registerVerbHandlers(TREE_REQUEST_VERB, new TreeRequestVerbHandler());
- MessagingService.instance.registerVerbHandlers(TREE_RESPONSE_VERB, new TreeResponseVerbHandler());
naturalRepairs = new ConcurrentHashMap<CFPair, Long>();
trees = new HashMap<CFPair, ExpiringMap<InetAddress, TreePair>>();
}
@@ -662,7 +657,7 @@
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
SERIALIZER.serialize(new CFPair(table, cf), dos);
- return new Message(FBUtilities.getLocalAddress(), StageManager.AE_SERVICE_STAGE, TREE_REQUEST_VERB, bos.toByteArray());
+ return new Message(FBUtilities.getLocalAddress(), StageManager.AE_SERVICE_STAGE, StorageService.TREE_REQUEST_VERB, bos.toByteArray());
}
catch(IOException e)
{
@@ -720,7 +715,7 @@
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
SERIALIZER.serialize(validator, dos);
- return new Message(local, StageManager.AE_SERVICE_STAGE, TREE_RESPONSE_VERB, bos.toByteArray());
+ return new Message(local, StageManager.AE_SERVICE_STAGE, StorageService.TREE_RESPONSE_VERB, bos.toByteArray());
}
catch(IOException e)
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=901858&r1=901857&r2=901858&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Thu Jan 21 21:22:25 2010
@@ -85,6 +85,17 @@
public final static String streamRequestVerbHandler_ = "BS-METADATA-VERB-HANDLER";
public final static String rangeSliceVerbHandler_ = "RANGE-SLICE-VERB-HANDLER";
public final static String bootstrapTokenVerbHandler_ = "SPLITS-VERB-HANDLER";
+ public final static String TREE_REQUEST_VERB = "TREE-REQUEST-VERB";
+ public final static String TREE_RESPONSE_VERB = "TREE-RESPONSE-VERB";
+ public static final String responseVerbHandler_ = "RESPONSE";
+ /* GSV - abbreviation for GOSSIP-DIGEST-SYN-VERB */
+ public final static String JOIN_VERB = "JVH";
+ /* GSV - abbreviation for GOSSIP-DIGEST-SYN-VERB */
+ public final static String GOSSIP_DIGEST_SYN_VERB = "GSV";
+ /* GAV - abbreviation for GOSSIP-DIGEST-ACK-VERB */
+ public final static String GOSSIP_DIGEST_ACK_VERB = "GAV";
+ /* GA2V - abbreviation for GOSSIP-DIGEST-ACK2-VERB */
+ public final static String GOSSIP_DIGEST_ACK2_VERB = "GA2V";
private static IPartitioner partitioner_ = DatabaseDescriptor.getPartitioner();
@@ -199,6 +210,14 @@
MessagingService.instance.registerVerbHandlers(streamInitiateVerbHandler_, new Streaming.StreamInitiateVerbHandler());
MessagingService.instance.registerVerbHandlers(streamInitiateDoneVerbHandler_, new Streaming.StreamInitiateDoneVerbHandler());
MessagingService.instance.registerVerbHandlers(streamFinishedVerbHandler_, new Streaming.StreamFinishedVerbHandler());
+ MessagingService.instance.registerVerbHandlers(responseVerbHandler_, new ResponseVerbHandler());
+ MessagingService.instance.registerVerbHandlers(TREE_REQUEST_VERB, new TreeRequestVerbHandler());
+ MessagingService.instance.registerVerbHandlers(TREE_RESPONSE_VERB, new AntiEntropyService.TreeResponseVerbHandler());
+
+ MessagingService.instance.registerVerbHandlers(JOIN_VERB, new Gossiper.JoinVerbHandler());
+ MessagingService.instance.registerVerbHandlers(GOSSIP_DIGEST_SYN_VERB, new Gossiper.GossipDigestSynVerbHandler());
+ MessagingService.instance.registerVerbHandlers(GOSSIP_DIGEST_ACK_VERB, new Gossiper.GossipDigestAckVerbHandler());
+ MessagingService.instance.registerVerbHandlers(GOSSIP_DIGEST_ACK2_VERB, new Gossiper.GossipDigestAck2VerbHandler());
replicationStrategy_ = getReplicationStrategy(tokenMetadata_);
}