You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/21 22:22:26 UTC

svn commit: r901858 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: db/ReadResponse.java gms/Gossiper.java net/Message.java net/MessagingService.java net/ResponseVerbHandler.java service/AntiEntropyService.java service/StorageService.java

Author: jbellis
Date: Thu Jan 21 21:22:25 2010
New Revision: 901858

URL: http://svn.apache.org/viewvc?rev=901858&view=rev
Log:
Move tree, gossip, and response verb registration into StorageService
patch by jbellis; reviewed by Stu Hood for CASSANDRA-717

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java?rev=901858&r1=901857&r2=901858&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java Thu Jan 21 21:22:25 2010
@@ -51,15 +51,6 @@
         return serializer_;
     }
     
-	public static Message makeReadResponseMessage(ReadResponse readResponse) throws IOException
-    {
-    	ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        DataOutputStream dos = new DataOutputStream( bos );
-        ReadResponse.serializer().serialize(readResponse, dos);
-        Message message = new Message(FBUtilities.getLocalAddress(), StageManager.RESPONSE_STAGE, MessagingService.responseVerbHandler_, bos.toByteArray());
-        return message;
-    }
-	
 	private Row row_;
 	private byte[] digest_ = ArrayUtils.EMPTY_BYTE_ARRAY;
     private boolean isDigestQuery_ = false;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=901858&r1=901857&r2=901858&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Thu Jan 21 21:22:25 2010
@@ -27,6 +27,7 @@
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
 
 import org.apache.log4j.Logger;
 
@@ -98,14 +99,6 @@
     }
 
     final static int MAX_GOSSIP_PACKET_SIZE = 1428;
-    /* GSV - abbreviation for GOSSIP-DIGEST-SYN-VERB */
-    final static String JOIN_VERB_HANDLER = "JVH";
-    /* GSV - abbreviation for GOSSIP-DIGEST-SYN-VERB */
-    final static String GOSSIP_DIGEST_SYN_VERB = "GSV";
-    /* GAV - abbreviation for GOSSIP-DIGEST-ACK-VERB */
-    final static String GOSSIP_DIGEST_ACK_VERB = "GAV";
-    /* GA2V - abbreviation for GOSSIP-DIGEST-ACK2-VERB */
-    final static String GOSSIP_DIGEST_ACK2_VERB = "GA2V";
     public final static int intervalInMillis_ = 1000;
     private static Logger logger_ = Logger.getLogger(Gossiper.class);
     public static final Gossiper instance = new Gossiper();
@@ -136,11 +129,6 @@
         aVeryLongTime_ = 259200 * 1000;
         /* register with the Failure Detector for receiving Failure detector events */
         FailureDetector.instance.registerFailureDetectionEventListener(this);
-        /* register the verbs */
-        MessagingService.instance.registerVerbHandlers(JOIN_VERB_HANDLER, new JoinVerbHandler());
-        MessagingService.instance.registerVerbHandlers(GOSSIP_DIGEST_SYN_VERB, new GossipDigestSynVerbHandler());
-        MessagingService.instance.registerVerbHandlers(GOSSIP_DIGEST_ACK_VERB, new GossipDigestAckVerbHandler());
-        MessagingService.instance.registerVerbHandlers(GOSSIP_DIGEST_ACK2_VERB, new GossipDigestAck2VerbHandler());
     }
 
     /** Register with the Gossiper for EndPointState notifications */
@@ -271,7 +259,7 @@
         ByteArrayOutputStream bos = new ByteArrayOutputStream(Gossiper.MAX_GOSSIP_PACKET_SIZE);
         DataOutputStream dos = new DataOutputStream( bos );
         GossipDigestSynMessage.serializer().serialize(gDigestMessage, dos);
-        return new Message(localEndPoint_, StageManager.GOSSIP_STAGE, GOSSIP_DIGEST_SYN_VERB, bos.toByteArray());
+        return new Message(localEndPoint_, StageManager.GOSSIP_STAGE, StorageService.GOSSIP_DIGEST_SYN_VERB, bos.toByteArray());
     }
 
     Message makeGossipDigestAckMessage(GossipDigestAckMessage gDigestAckMessage) throws IOException
@@ -281,7 +269,7 @@
         GossipDigestAckMessage.serializer().serialize(gDigestAckMessage, dos);
         if (logger_.isTraceEnabled())
             logger_.trace("@@@@ Size of GossipDigestAckMessage is " + bos.toByteArray().length);
-        return new Message(localEndPoint_, StageManager.GOSSIP_STAGE, GOSSIP_DIGEST_ACK_VERB, bos.toByteArray());
+        return new Message(localEndPoint_, StageManager.GOSSIP_STAGE, StorageService.GOSSIP_DIGEST_ACK_VERB, bos.toByteArray());
     }
 
     Message makeGossipDigestAck2Message(GossipDigestAck2Message gDigestAck2Message) throws IOException
@@ -289,7 +277,7 @@
         ByteArrayOutputStream bos = new ByteArrayOutputStream(Gossiper.MAX_GOSSIP_PACKET_SIZE);
         DataOutputStream dos = new DataOutputStream(bos);
         GossipDigestAck2Message.serializer().serialize(gDigestAck2Message, dos);
-        return new Message(localEndPoint_, StageManager.GOSSIP_STAGE, GOSSIP_DIGEST_ACK2_VERB, bos.toByteArray());
+        return new Message(localEndPoint_, StageManager.GOSSIP_STAGE, StorageService.GOSSIP_DIGEST_ACK2_VERB, bos.toByteArray());
     }
 
     /**
@@ -822,197 +810,196 @@
         gossipTimer_.cancel();
         gossipTimer_ = new Timer(false); // makes the Gossiper reentrant.
     }
-}
 
-class JoinVerbHandler implements IVerbHandler
-{
-    private static Logger logger_ = Logger.getLogger( JoinVerbHandler.class);
-
-    public void doVerb(Message message)
+    public static class JoinVerbHandler implements IVerbHandler
     {
-        InetAddress from = message.getFrom();
-        if (logger_.isDebugEnabled())
-          logger_.debug("Received a JoinMessage from " + from);
-
-        byte[] bytes = message.getMessageBody();
-        DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
+        private static Logger logger_ = Logger.getLogger( JoinVerbHandler.class);
 
-        JoinMessage joinMessage;
-        try
-        {
-            joinMessage = JoinMessage.serializer().deserialize(dis);
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-        if ( joinMessage.clusterId_.equals( DatabaseDescriptor.getClusterName() ) )
+        public void doVerb(Message message)
         {
-            Gossiper.instance.join(from);
+            InetAddress from = message.getFrom();
+            if (logger_.isDebugEnabled())
+              logger_.debug("Received a JoinMessage from " + from);
+
+            byte[] bytes = message.getMessageBody();
+            DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
+
+            JoinMessage joinMessage;
+            try
+            {
+                joinMessage = JoinMessage.serializer().deserialize(dis);
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+            if ( joinMessage.clusterId_.equals( DatabaseDescriptor.getClusterName() ) )
+            {
+                Gossiper.instance.join(from);
+            }
         }
     }
-}
-
-class GossipDigestSynVerbHandler implements IVerbHandler
-{
-    private static Logger logger_ = Logger.getLogger( GossipDigestSynVerbHandler.class);
 
-    public void doVerb(Message message)
+    public static class GossipDigestSynVerbHandler implements IVerbHandler
     {
-        InetAddress from = message.getFrom();
-        if (logger_.isTraceEnabled())
-            logger_.trace("Received a GossipDigestSynMessage from " + from);
-
-        byte[] bytes = message.getMessageBody();
-        DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
+        private static Logger logger_ = Logger.getLogger( GossipDigestSynVerbHandler.class);
 
-        try
+        public void doVerb(Message message)
         {
-            GossipDigestSynMessage gDigestMessage = GossipDigestSynMessage.serializer().deserialize(dis);
-            /* If the message is from a different cluster throw it away. */
-            if ( !gDigestMessage.clusterId_.equals(DatabaseDescriptor.getClusterName()) )
-                return;
+            InetAddress from = message.getFrom();
+            if (logger_.isTraceEnabled())
+                logger_.trace("Received a GossipDigestSynMessage from " + from);
 
-            List<GossipDigest> gDigestList = gDigestMessage.getGossipDigests();
-            /* Notify the Failure Detector */
-            Gossiper.instance.notifyFailureDetector(gDigestList);
+            byte[] bytes = message.getMessageBody();
+            DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
 
-            doSort(gDigestList);
+            try
+            {
+                GossipDigestSynMessage gDigestMessage = GossipDigestSynMessage.serializer().deserialize(dis);
+                /* If the message is from a different cluster throw it away. */
+                if ( !gDigestMessage.clusterId_.equals(DatabaseDescriptor.getClusterName()) )
+                    return;
 
-            List<GossipDigest> deltaGossipDigestList = new ArrayList<GossipDigest>();
-            Map<InetAddress, EndPointState> deltaEpStateMap = new HashMap<InetAddress, EndPointState>();
-            Gossiper.instance.examineGossiper(gDigestList, deltaGossipDigestList, deltaEpStateMap);
+                List<GossipDigest> gDigestList = gDigestMessage.getGossipDigests();
+                /* Notify the Failure Detector */
+                Gossiper.instance.notifyFailureDetector(gDigestList);
 
-            GossipDigestAckMessage gDigestAck = new GossipDigestAckMessage(deltaGossipDigestList, deltaEpStateMap);
-            Message gDigestAckMessage = Gossiper.instance.makeGossipDigestAckMessage(gDigestAck);
-            if (logger_.isTraceEnabled())
-                logger_.trace("Sending a GossipDigestAckMessage to " + from);
-            MessagingService.instance.sendOneWay(gDigestAckMessage, from);
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
+                doSort(gDigestList);
 
-    /*
-     * First construct a map whose key is the endpoint in the GossipDigest and the value is the
-     * GossipDigest itself. Then build a list of version differences i.e difference between the
-     * version in the GossipDigest and the version in the local state for a given InetAddress.
-     * Sort this list. Now loop through the sorted list and retrieve the GossipDigest corresponding
-     * to the endpoint from the map that was initially constructed.
-    */
-    private void doSort(List<GossipDigest> gDigestList)
-    {
-        /* Construct a map of endpoint to GossipDigest. */
-        Map<InetAddress, GossipDigest> epToDigestMap = new HashMap<InetAddress, GossipDigest>();
-        for ( GossipDigest gDigest : gDigestList )
-        {
-            epToDigestMap.put(gDigest.getEndPoint(), gDigest);
-        }
+                List<GossipDigest> deltaGossipDigestList = new ArrayList<GossipDigest>();
+                Map<InetAddress, EndPointState> deltaEpStateMap = new HashMap<InetAddress, EndPointState>();
+                Gossiper.instance.examineGossiper(gDigestList, deltaGossipDigestList, deltaEpStateMap);
 
-        /*
-         * These digests have their maxVersion set to the difference of the version
-         * of the local EndPointState and the version found in the GossipDigest.
-        */
-        List<GossipDigest> diffDigests = new ArrayList<GossipDigest>();
-        for ( GossipDigest gDigest : gDigestList )
-        {
-            InetAddress ep = gDigest.getEndPoint();
-            EndPointState epState = Gossiper.instance.getEndPointStateForEndPoint(ep);
-            int version = (epState != null) ? Gossiper.instance.getMaxEndPointStateVersion( epState ) : 0;
-            int diffVersion = Math.abs(version - gDigest.getMaxVersion() );
-            diffDigests.add( new GossipDigest(ep, gDigest.getGeneration(), diffVersion) );
+                GossipDigestAckMessage gDigestAck = new GossipDigestAckMessage(deltaGossipDigestList, deltaEpStateMap);
+                Message gDigestAckMessage = Gossiper.instance.makeGossipDigestAckMessage(gDigestAck);
+                if (logger_.isTraceEnabled())
+                    logger_.trace("Sending a GossipDigestAckMessage to " + from);
+                MessagingService.instance.sendOneWay(gDigestAckMessage, from);
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
         }
 
-        gDigestList.clear();
-        Collections.sort(diffDigests);
-        int size = diffDigests.size();
         /*
-         * Report the digests in descending order. This takes care of the endpoints
-         * that are far behind w.r.t this local endpoint
+         * First construct a map whose key is the endpoint in the GossipDigest and the value is the
+         * GossipDigest itself. Then build a list of version differences i.e difference between the
+         * version in the GossipDigest and the version in the local state for a given InetAddress.
+         * Sort this list. Now loop through the sorted list and retrieve the GossipDigest corresponding
+         * to the endpoint from the map that was initially constructed.
         */
-        for ( int i = size - 1; i >= 0; --i )
+        private void doSort(List<GossipDigest> gDigestList)
         {
-            gDigestList.add( epToDigestMap.get(diffDigests.get(i).getEndPoint()) );
+            /* Construct a map of endpoint to GossipDigest. */
+            Map<InetAddress, GossipDigest> epToDigestMap = new HashMap<InetAddress, GossipDigest>();
+            for ( GossipDigest gDigest : gDigestList )
+            {
+                epToDigestMap.put(gDigest.getEndPoint(), gDigest);
+            }
+
+            /*
+             * These digests have their maxVersion set to the difference of the version
+             * of the local EndPointState and the version found in the GossipDigest.
+            */
+            List<GossipDigest> diffDigests = new ArrayList<GossipDigest>();
+            for ( GossipDigest gDigest : gDigestList )
+            {
+                InetAddress ep = gDigest.getEndPoint();
+                EndPointState epState = Gossiper.instance.getEndPointStateForEndPoint(ep);
+                int version = (epState != null) ? Gossiper.instance.getMaxEndPointStateVersion( epState ) : 0;
+                int diffVersion = Math.abs(version - gDigest.getMaxVersion() );
+                diffDigests.add( new GossipDigest(ep, gDigest.getGeneration(), diffVersion) );
+            }
+
+            gDigestList.clear();
+            Collections.sort(diffDigests);
+            int size = diffDigests.size();
+            /*
+             * Report the digests in descending order. This takes care of the endpoints
+             * that are far behind w.r.t this local endpoint
+            */
+            for ( int i = size - 1; i >= 0; --i )
+            {
+                gDigestList.add( epToDigestMap.get(diffDigests.get(i).getEndPoint()) );
+            }
         }
     }
-}
-
-class GossipDigestAckVerbHandler implements IVerbHandler
-{
-    private static Logger logger_ = Logger.getLogger(GossipDigestAckVerbHandler.class);
 
-    public void doVerb(Message message)
+    public static class GossipDigestAckVerbHandler implements IVerbHandler
     {
-        InetAddress from = message.getFrom();
-        if (logger_.isTraceEnabled())
-            logger_.trace("Received a GossipDigestAckMessage from " + from);
-
-        byte[] bytes = message.getMessageBody();
-        DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
+        private static Logger logger_ = Logger.getLogger(GossipDigestAckVerbHandler.class);
 
-        try
+        public void doVerb(Message message)
         {
-            GossipDigestAckMessage gDigestAckMessage = GossipDigestAckMessage.serializer().deserialize(dis);
-            List<GossipDigest> gDigestList = gDigestAckMessage.getGossipDigestList();
-            Map<InetAddress, EndPointState> epStateMap = gDigestAckMessage.getEndPointStateMap();
+            InetAddress from = message.getFrom();
+            if (logger_.isTraceEnabled())
+                logger_.trace("Received a GossipDigestAckMessage from " + from);
+
+            byte[] bytes = message.getMessageBody();
+            DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
 
-            if ( epStateMap.size() > 0 )
+            try
             {
-                /* Notify the Failure Detector */
-                Gossiper.instance.notifyFailureDetector(epStateMap);
-                Gossiper.instance.applyStateLocally(epStateMap);
-            }
+                GossipDigestAckMessage gDigestAckMessage = GossipDigestAckMessage.serializer().deserialize(dis);
+                List<GossipDigest> gDigestList = gDigestAckMessage.getGossipDigestList();
+                Map<InetAddress, EndPointState> epStateMap = gDigestAckMessage.getEndPointStateMap();
+
+                if ( epStateMap.size() > 0 )
+                {
+                    /* Notify the Failure Detector */
+                    Gossiper.instance.notifyFailureDetector(epStateMap);
+                    Gossiper.instance.applyStateLocally(epStateMap);
+                }
+
+                /* Get the state required to send to this gossipee - construct GossipDigestAck2Message */
+                Map<InetAddress, EndPointState> deltaEpStateMap = new HashMap<InetAddress, EndPointState>();
+                for( GossipDigest gDigest : gDigestList )
+                {
+                    InetAddress addr = gDigest.getEndPoint();
+                    EndPointState localEpStatePtr = Gossiper.instance.getStateForVersionBiggerThan(addr, gDigest.getMaxVersion());
+                    if ( localEpStatePtr != null )
+                        deltaEpStateMap.put(addr, localEpStatePtr);
+                }
 
-            /* Get the state required to send to this gossipee - construct GossipDigestAck2Message */
-            Map<InetAddress, EndPointState> deltaEpStateMap = new HashMap<InetAddress, EndPointState>();
-            for( GossipDigest gDigest : gDigestList )
+                GossipDigestAck2Message gDigestAck2 = new GossipDigestAck2Message(deltaEpStateMap);
+                Message gDigestAck2Message = Gossiper.instance.makeGossipDigestAck2Message(gDigestAck2);
+                if (logger_.isTraceEnabled())
+                    logger_.trace("Sending a GossipDigestAck2Message to " + from);
+                MessagingService.instance.sendOneWay(gDigestAck2Message, from);
+            }
+            catch ( IOException e )
             {
-                InetAddress addr = gDigest.getEndPoint();
-                EndPointState localEpStatePtr = Gossiper.instance.getStateForVersionBiggerThan(addr, gDigest.getMaxVersion());
-                if ( localEpStatePtr != null )
-                    deltaEpStateMap.put(addr, localEpStatePtr);
+                throw new RuntimeException(e);
             }
-
-            GossipDigestAck2Message gDigestAck2 = new GossipDigestAck2Message(deltaEpStateMap);
-            Message gDigestAck2Message = Gossiper.instance.makeGossipDigestAck2Message(gDigestAck2);
-            if (logger_.isTraceEnabled())
-                logger_.trace("Sending a GossipDigestAck2Message to " + from);
-            MessagingService.instance.sendOneWay(gDigestAck2Message, from);
-        }
-        catch ( IOException e )
-        {
-            throw new RuntimeException(e);
         }
     }
-}
 
-class GossipDigestAck2VerbHandler implements IVerbHandler
-{
-    private static Logger logger_ = Logger.getLogger(GossipDigestAck2VerbHandler.class);
-
-    public void doVerb(Message message)
+    public static class GossipDigestAck2VerbHandler implements IVerbHandler
     {
-        InetAddress from = message.getFrom();
-        if (logger_.isTraceEnabled())
-            logger_.trace("Received a GossipDigestAck2Message from " + from);
+        private static Logger logger_ = Logger.getLogger(GossipDigestAck2VerbHandler.class);
 
-        byte[] bytes = message.getMessageBody();
-        DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
-        GossipDigestAck2Message gDigestAck2Message;
-        try
+        public void doVerb(Message message)
         {
-            gDigestAck2Message = GossipDigestAck2Message.serializer().deserialize(dis);
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
+            InetAddress from = message.getFrom();
+            if (logger_.isTraceEnabled())
+                logger_.trace("Received a GossipDigestAck2Message from " + from);
+
+            byte[] bytes = message.getMessageBody();
+            DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
+            GossipDigestAck2Message gDigestAck2Message;
+            try
+            {
+                gDigestAck2Message = GossipDigestAck2Message.serializer().deserialize(dis);
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+            Map<InetAddress, EndPointState> remoteEpStateMap = gDigestAck2Message.getEndPointStateMap();
+            /* Notify the Failure Detector */
+            Gossiper.instance.notifyFailureDetector(remoteEpStateMap);
+            Gossiper.instance.applyStateLocally(remoteEpStateMap);
         }
-        Map<InetAddress, EndPointState> remoteEpStateMap = gDigestAck2Message.getEndPointStateMap();
-        /* Notify the Failure Detector */
-        Gossiper.instance.notifyFailureDetector(remoteEpStateMap);
-        Gossiper.instance.applyStateLocally(remoteEpStateMap);
     }
-}
-
+}
\ No newline at end of file

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java?rev=901858&r1=901857&r2=901858&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java Thu Jan 21 21:22:25 2010
@@ -26,6 +26,7 @@
 
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.service.StorageService;
 
 public class Message
 {
@@ -121,7 +122,7 @@
     // TODO should take byte[] + length so we don't have to copy to a byte[] of exactly the right len
     public Message getReply(InetAddress from, byte[] args)
     {
-        Header header = new Header(getMessageId(), from, StageManager.RESPONSE_STAGE, MessagingService.responseVerbHandler_);
+        Header header = new Header(getMessageId(), from, StageManager.RESPONSE_STAGE, StorageService.responseVerbHandler_);
         return new Message(header, args);
     }
     

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=901858&r1=901857&r2=901858&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Thu Jan 21 21:22:25 2010
@@ -105,11 +105,6 @@
                                                                         new NamedThreadFactory("MESSAGE-DESERIALIZER-POOL"));
 
         streamExecutor_ = new JMXEnabledThreadPoolExecutor("MESSAGE-STREAMING-POOL");
-                
-        /* register the response verb handler */
-        registerVerbHandlers(MessagingService.responseVerbHandler_, new ResponseVerbHandler());
-
-        FailureDetector.instance.registerFailureDetectionEventListener(this);
     }
     
     public byte[] hash(String type, byte data[])

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java?rev=901858&r1=901857&r2=901858&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java Thu Jan 21 21:22:25 2010
@@ -20,7 +20,7 @@
 
 import org.apache.log4j.Logger;
 
-class ResponseVerbHandler implements IVerbHandler
+public class ResponseVerbHandler implements IVerbHandler
 {
     private static final Logger logger_ = Logger.getLogger( ResponseVerbHandler.class );
     

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=901858&r1=901857&r2=901858&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Thu Jan 21 21:22:25 2010
@@ -89,9 +89,6 @@
 {
     private static final Logger logger = Logger.getLogger(AntiEntropyService.class);
 
-    public final static String TREE_REQUEST_VERB = "TREE-REQUEST-VERB";
-    public final static String TREE_RESPONSE_VERB = "TREE-RESPONSE-VERB";
-
     // millisecond lifetime to store trees before they become stale
     public final static long TREE_STORE_TIMEOUT = 600000;
     // max millisecond frequency that natural (automatic) repairs should run at
@@ -120,8 +117,6 @@
      */
     protected AntiEntropyService()
     {
-        MessagingService.instance.registerVerbHandlers(TREE_REQUEST_VERB, new TreeRequestVerbHandler());
-        MessagingService.instance.registerVerbHandlers(TREE_RESPONSE_VERB, new TreeResponseVerbHandler());
         naturalRepairs = new ConcurrentHashMap<CFPair, Long>();
         trees = new HashMap<CFPair, ExpiringMap<InetAddress, TreePair>>();
     }
@@ -662,7 +657,7 @@
                 ByteArrayOutputStream bos = new ByteArrayOutputStream();
                 DataOutputStream dos = new DataOutputStream(bos);
                 SERIALIZER.serialize(new CFPair(table, cf), dos);
-                return new Message(FBUtilities.getLocalAddress(), StageManager.AE_SERVICE_STAGE, TREE_REQUEST_VERB, bos.toByteArray());
+                return new Message(FBUtilities.getLocalAddress(), StageManager.AE_SERVICE_STAGE, StorageService.TREE_REQUEST_VERB, bos.toByteArray());
             }
             catch(IOException e)
             {
@@ -720,7 +715,7 @@
                 ByteArrayOutputStream bos = new ByteArrayOutputStream();
                 DataOutputStream dos = new DataOutputStream(bos);
                 SERIALIZER.serialize(validator, dos);
-                return new Message(local, StageManager.AE_SERVICE_STAGE, TREE_RESPONSE_VERB, bos.toByteArray());
+                return new Message(local, StageManager.AE_SERVICE_STAGE, StorageService.TREE_RESPONSE_VERB, bos.toByteArray());
             }
             catch(IOException e)
             {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=901858&r1=901857&r2=901858&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Thu Jan 21 21:22:25 2010
@@ -85,6 +85,17 @@
     public final static String streamRequestVerbHandler_ = "BS-METADATA-VERB-HANDLER";
     public final static String rangeSliceVerbHandler_ = "RANGE-SLICE-VERB-HANDLER";
     public final static String bootstrapTokenVerbHandler_ = "SPLITS-VERB-HANDLER";
+    public final static String TREE_REQUEST_VERB = "TREE-REQUEST-VERB";
+    public final static String TREE_RESPONSE_VERB = "TREE-RESPONSE-VERB";
+    public static final String responseVerbHandler_ = "RESPONSE";
+    /* JVH - abbreviation for JOIN-VERB-HANDLER */
+    public final static String JOIN_VERB = "JVH";
+    /* GSV - abbreviation for GOSSIP-DIGEST-SYN-VERB */
+    public final static String GOSSIP_DIGEST_SYN_VERB = "GSV";
+    /* GAV - abbreviation for GOSSIP-DIGEST-ACK-VERB */
+    public final static String GOSSIP_DIGEST_ACK_VERB = "GAV";
+    /* GA2V - abbreviation for GOSSIP-DIGEST-ACK2-VERB */
+    public final static String GOSSIP_DIGEST_ACK2_VERB = "GA2V";
 
     private static IPartitioner partitioner_ = DatabaseDescriptor.getPartitioner();
 
@@ -199,6 +210,14 @@
         MessagingService.instance.registerVerbHandlers(streamInitiateVerbHandler_, new Streaming.StreamInitiateVerbHandler());
         MessagingService.instance.registerVerbHandlers(streamInitiateDoneVerbHandler_, new Streaming.StreamInitiateDoneVerbHandler());
         MessagingService.instance.registerVerbHandlers(streamFinishedVerbHandler_, new Streaming.StreamFinishedVerbHandler());
+        MessagingService.instance.registerVerbHandlers(responseVerbHandler_, new ResponseVerbHandler());
+        MessagingService.instance.registerVerbHandlers(TREE_REQUEST_VERB, new TreeRequestVerbHandler());
+        MessagingService.instance.registerVerbHandlers(TREE_RESPONSE_VERB, new AntiEntropyService.TreeResponseVerbHandler());
+
+        MessagingService.instance.registerVerbHandlers(JOIN_VERB, new Gossiper.JoinVerbHandler());
+        MessagingService.instance.registerVerbHandlers(GOSSIP_DIGEST_SYN_VERB, new Gossiper.GossipDigestSynVerbHandler());
+        MessagingService.instance.registerVerbHandlers(GOSSIP_DIGEST_ACK_VERB, new Gossiper.GossipDigestAckVerbHandler());
+        MessagingService.instance.registerVerbHandlers(GOSSIP_DIGEST_ACK2_VERB, new Gossiper.GossipDigestAck2VerbHandler());
 
         replicationStrategy_ = getReplicationStrategy(tokenMetadata_);
     }