You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by gd...@apache.org on 2010/06/10 18:18:21 UTC
svn commit: r953373 - in /cassandra/branches/cassandra-0.6: ./
src/java/org/apache/cassandra/gms/ src/java/org/apache/cassandra/net/
src/java/org/apache/cassandra/service/
Author: gdusbabek
Date: Thu Jun 10 16:18:20 2010
New Revision: 953373
URL: http://svn.apache.org/viewvc?rev=953373&view=rev
Log:
Restructure the startup ordering of Gossiper and MessagingService to avoid timing anomalies. Patch by Matthew Dennis, reviewed by Gary Dusbabek. CASSANDRA-1160
Added:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossiperJoinVerbHandler.java
Modified:
cassandra/branches/cassandra-0.6/CHANGES.txt
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/Gossiper.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java
Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=953373&r1=953372&r2=953373&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Thu Jun 10 16:18:20 2010
@@ -16,6 +16,8 @@
(CASSANDRA-1146)
* use generation time to resolve node token reassignment disagreements
(CASSANDRA-1118)
+ * restructure the startup ordering of Gossiper and MessageService to avoid
+ timing anomalies (CASSANDRA-1160)
0.6.2
Added: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java?rev=953373&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java (added)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java Thu Jun 10 16:18:20 2010
@@ -0,0 +1,39 @@
+package org.apache.cassandra.gms;
+
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.log4j.Logger;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Map;
+
+public class GossipDigestAck2VerbHandler implements IVerbHandler // verb handler for messages whose body is a serialized GossipDigestAck2Message
+{
+ private static Logger logger_ = Logger.getLogger(GossipDigestAck2VerbHandler.class);
+
+ public void doVerb(Message message) // deserializes the body and hands the contained endpoint states to Gossiper.instance
+ {
+ InetAddress from = message.getFrom(); // sender address; used only for the trace log below
+ if (logger_.isTraceEnabled())
+ logger_.trace("Received a GossipDigestAck2Message from " + from);
+
+ byte[] bytes = message.getMessageBody();
+ DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
+ GossipDigestAck2Message gDigestAck2Message;
+ try
+ {
+ gDigestAck2Message = GossipDigestAck2Message.serializer().deserialize(dis);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e); // a body that cannot be deserialized is rethrown unchecked with the cause attached
+ }
+ Map<InetAddress, EndPointState> remoteEpStateMap = gDigestAck2Message.getEndPointStateMap();
+ /* Notify the failure detector first, then apply the received endpoint states to the local Gossiper. */
+ Gossiper.instance.notifyFailureDetector(remoteEpStateMap);
+ Gossiper.instance.applyStateLocally(remoteEpStateMap);
+ }
+}
Added: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java?rev=953373&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java (added)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java Thu Jun 10 16:18:20 2010
@@ -0,0 +1,63 @@
+package org.apache.cassandra.gms;
+
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.log4j.Logger;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class GossipDigestAckVerbHandler implements IVerbHandler // verb handler for GossipDigestAckMessage; answers the sender with a GossipDigestAck2Message
+{
+ private static Logger logger_ = Logger.getLogger(GossipDigestAckVerbHandler.class);
+
+ public void doVerb(Message message) // applies the endpoint states carried by the ack, then sends back the states the sender listed digests for
+ {
+ InetAddress from = message.getFrom(); // sender address; also the destination of the reply
+ if (logger_.isTraceEnabled())
+ logger_.trace("Received a GossipDigestAckMessage from " + from);
+
+ byte[] bytes = message.getMessageBody();
+ DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
+
+ try
+ {
+ GossipDigestAckMessage gDigestAckMessage = GossipDigestAckMessage.serializer().deserialize(dis);
+ List<GossipDigest> gDigestList = gDigestAckMessage.getGossipDigestList();
+ Map<InetAddress, EndPointState> epStateMap = gDigestAckMessage.getEndPointStateMap();
+
+ if ( epStateMap.size() > 0 ) // nothing to apply when the ack carried no endpoint state
+ {
+ /* Notify the failure detector first, then apply the received states locally. */
+ Gossiper.instance.notifyFailureDetector(epStateMap);
+ Gossiper.instance.applyStateLocally(epStateMap);
+ }
+
+ /* Get the state required to send to this gossipee - construct GossipDigestAck2Message */
+ Map<InetAddress, EndPointState> deltaEpStateMap = new HashMap<InetAddress, EndPointState>();
+ for( GossipDigest gDigest : gDigestList )
+ {
+ InetAddress addr = gDigest.getEndPoint();
+ EndPointState localEpStatePtr = Gossiper.instance.getStateForVersionBiggerThan(addr, gDigest.getMaxVersion());
+ if ( localEpStatePtr != null ) // a null result contributes nothing to the reply for this endpoint
+ deltaEpStateMap.put(addr, localEpStatePtr);
+ }
+
+ GossipDigestAck2Message gDigestAck2 = new GossipDigestAck2Message(deltaEpStateMap);
+ Message gDigestAck2Message = Gossiper.instance.makeGossipDigestAck2Message(gDigestAck2);
+ if (logger_.isTraceEnabled())
+ logger_.trace("Sending a GossipDigestAck2Message to " + from);
+ MessagingService.instance.sendOneWay(gDigestAck2Message, from); // the reply is sent even when deltaEpStateMap is empty
+ }
+ catch ( IOException e )
+ {
+ throw new RuntimeException(e); // any IOException raised inside the try block is rethrown unchecked with the cause attached
+ }
+ }
+}
Added: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java?rev=953373&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java (added)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java Thu Jun 10 16:18:20 2010
@@ -0,0 +1,102 @@
+package org.apache.cassandra.gms;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.log4j.Logger;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.*;
+
+public class GossipDigestSynVerbHandler implements IVerbHandler // verb handler for GossipDigestSynMessage; answers the sender with a GossipDigestAckMessage
+{
+ private static Logger logger_ = Logger.getLogger( GossipDigestSynVerbHandler.class);
+
+ public void doVerb(Message message) // checks the cluster name, reorders the received digests and replies with the deltas computed by the Gossiper
+ {
+ InetAddress from = message.getFrom(); // sender address; also the destination of the reply
+ if (logger_.isTraceEnabled())
+ logger_.trace("Received a GossipDigestSynMessage from " + from);
+
+ byte[] bytes = message.getMessageBody();
+ DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
+
+ try
+ {
+ GossipDigestSynMessage gDigestMessage = GossipDigestSynMessage.serializer().deserialize(dis);
+ /* If the message is from a different cluster, log a warning and throw it away without replying. */
+ if ( !gDigestMessage.clusterId_.equals(DatabaseDescriptor.getClusterName()) )
+ {
+ logger_.warn("ClusterName mismatch from " + from + " " + gDigestMessage.clusterId_ + "!=" + DatabaseDescriptor.getClusterName());
+ return;
+ }
+
+ List<GossipDigest> gDigestList = gDigestMessage.getGossipDigests();
+ /* Notify the Failure Detector */
+ Gossiper.instance.notifyFailureDetector(gDigestList);
+
+ doSort(gDigestList); // reorders gDigestList in place before it is examined
+
+ List<GossipDigest> deltaGossipDigestList = new ArrayList<GossipDigest>(); // output argument filled by examineGossiper
+ Map<InetAddress, EndPointState> deltaEpStateMap = new HashMap<InetAddress, EndPointState>(); // output argument filled by examineGossiper
+ Gossiper.instance.examineGossiper(gDigestList, deltaGossipDigestList, deltaEpStateMap);
+
+ GossipDigestAckMessage gDigestAck = new GossipDigestAckMessage(deltaGossipDigestList, deltaEpStateMap);
+ Message gDigestAckMessage = Gossiper.instance.makeGossipDigestAckMessage(gDigestAck);
+ if (logger_.isTraceEnabled())
+ logger_.trace("Sending a GossipDigestAckMessage to " + from);
+ MessagingService.instance.sendOneWay(gDigestAckMessage, from);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e); // any IOException raised inside the try block is rethrown unchecked with the cause attached
+ }
+ }
+
+ /*
+ * First construct a map whose key is the endpoint in the GossipDigest and the value is the
+ * GossipDigest itself. Then build a list of version differences, i.e. the difference between the
+ * version in the GossipDigest and the version in the local state for a given InetAddress.
+ * Sort this list. Now loop through the sorted list and retrieve the GossipDigest corresponding
+ * to the endpoint from the map that was initially constructed.
+ */
+ private void doSort(List<GossipDigest> gDigestList) // rewrites the caller's list in place; returns nothing
+ {
+ /* Construct a map of endpoint to GossipDigest. */
+ Map<InetAddress, GossipDigest> epToDigestMap = new HashMap<InetAddress, GossipDigest>();
+ for ( GossipDigest gDigest : gDigestList )
+ {
+ epToDigestMap.put(gDigest.getEndPoint(), gDigest); // a later digest for the same endpoint replaces an earlier one
+ }
+
+ /*
+ * These digests have their maxVersion set to the difference of the version
+ * of the local EndPointState and the version found in the GossipDigest.
+ */
+ List<GossipDigest> diffDigests = new ArrayList<GossipDigest>();
+ for ( GossipDigest gDigest : gDigestList )
+ {
+ InetAddress ep = gDigest.getEndPoint();
+ EndPointState epState = Gossiper.instance.getEndPointStateForEndPoint(ep);
+ int version = (epState != null) ? Gossiper.instance.getMaxEndPointStateVersion( epState ) : 0; // an endpoint with no local state counts as version 0
+ int diffVersion = Math.abs(version - gDigest.getMaxVersion() ); // magnitude only: ahead and behind are treated alike
+ diffDigests.add( new GossipDigest(ep, gDigest.getGeneration(), diffVersion) );
+ }
+
+ gDigestList.clear(); // emptied here and refilled below in the new order
+ Collections.sort(diffDigests); // NOTE(review): order comes from GossipDigest's natural ordering, defined outside this file - presumably by maxVersion; confirm
+ int size = diffDigests.size();
+ /*
+ * Report the digests in descending order. This takes care of the endpoints
+ * that are far behind w.r.t this local endpoint
+ */
+ for ( int i = size - 1; i >= 0; --i ) // walk the sorted list backwards
+ {
+ gDigestList.add( epToDigestMap.get(diffDigests.get(i).getEndPoint()) ); // put back the original digest, not the diff placeholder
+ }
+ }
+}
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/Gossiper.java?rev=953373&r1=953372&r2=953373&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/Gossiper.java Thu Jun 10 16:18:20 2010
@@ -25,7 +25,6 @@ import java.net.InetAddress;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
@@ -51,6 +50,9 @@ public class Gossiper implements IFailur
{
try
{
+ //wait on messaging service to start listening
+ MessagingService.instance.waitUntilListening();
+
synchronized( Gossiper.instance )
{
/* Update the local heartbeat counter. */
@@ -483,7 +485,7 @@ public class Gossiper implements IFailur
}
/*
- * This method is called only from the JoinVerbHandler. This happens
+ * This method is called only from the GossiperJoinVerbHandler. This happens
* when a new node coming up multicasts the JoinMessage. Here we need
* to add the endPoint to the list of live endpoints.
*/
@@ -873,203 +875,4 @@ public class Gossiper implements IFailur
gossipTimer_.cancel();
gossipTimer_ = new Timer(false); // makes the Gossiper reentrant.
}
-
- public static class JoinVerbHandler implements IVerbHandler
- {
- private static Logger logger_ = Logger.getLogger( JoinVerbHandler.class);
-
- public void doVerb(Message message)
- {
- InetAddress from = message.getFrom();
- if (logger_.isDebugEnabled())
- logger_.debug("Received a JoinMessage from " + from);
-
- byte[] bytes = message.getMessageBody();
- DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
-
- JoinMessage joinMessage;
- try
- {
- joinMessage = JoinMessage.serializer().deserialize(dis);
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- if ( joinMessage.clusterId_.equals( DatabaseDescriptor.getClusterName() ) )
- {
- Gossiper.instance.join(from);
- }
- else
- {
- logger_.warn("ClusterName mismatch from " + from + " " + joinMessage.clusterId_ + "!=" + DatabaseDescriptor.getClusterName());
- }
- }
- }
-
- public static class GossipDigestSynVerbHandler implements IVerbHandler
- {
- private static Logger logger_ = Logger.getLogger( GossipDigestSynVerbHandler.class);
-
- public void doVerb(Message message)
- {
- InetAddress from = message.getFrom();
- if (logger_.isTraceEnabled())
- logger_.trace("Received a GossipDigestSynMessage from " + from);
-
- byte[] bytes = message.getMessageBody();
- DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
-
- try
- {
- GossipDigestSynMessage gDigestMessage = GossipDigestSynMessage.serializer().deserialize(dis);
- /* If the message is from a different cluster throw it away. */
- if ( !gDigestMessage.clusterId_.equals(DatabaseDescriptor.getClusterName()) )
- {
- logger_.warn("ClusterName mismatch from " + from + " " + gDigestMessage.clusterId_ + "!=" + DatabaseDescriptor.getClusterName());
- return;
- }
-
- List<GossipDigest> gDigestList = gDigestMessage.getGossipDigests();
- /* Notify the Failure Detector */
- Gossiper.instance.notifyFailureDetector(gDigestList);
-
- doSort(gDigestList);
-
- List<GossipDigest> deltaGossipDigestList = new ArrayList<GossipDigest>();
- Map<InetAddress, EndPointState> deltaEpStateMap = new HashMap<InetAddress, EndPointState>();
- Gossiper.instance.examineGossiper(gDigestList, deltaGossipDigestList, deltaEpStateMap);
-
- GossipDigestAckMessage gDigestAck = new GossipDigestAckMessage(deltaGossipDigestList, deltaEpStateMap);
- Message gDigestAckMessage = Gossiper.instance.makeGossipDigestAckMessage(gDigestAck);
- if (logger_.isTraceEnabled())
- logger_.trace("Sending a GossipDigestAckMessage to " + from);
- MessagingService.instance.sendOneWay(gDigestAckMessage, from);
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- /*
- * First construct a map whose key is the endpoint in the GossipDigest and the value is the
- * GossipDigest itself. Then build a list of version differences i.e difference between the
- * version in the GossipDigest and the version in the local state for a given InetAddress.
- * Sort this list. Now loop through the sorted list and retrieve the GossipDigest corresponding
- * to the endpoint from the map that was initially constructed.
- */
- private void doSort(List<GossipDigest> gDigestList)
- {
- /* Construct a map of endpoint to GossipDigest. */
- Map<InetAddress, GossipDigest> epToDigestMap = new HashMap<InetAddress, GossipDigest>();
- for ( GossipDigest gDigest : gDigestList )
- {
- epToDigestMap.put(gDigest.getEndPoint(), gDigest);
- }
-
- /*
- * These digests have their maxVersion set to the difference of the version
- * of the local EndPointState and the version found in the GossipDigest.
- */
- List<GossipDigest> diffDigests = new ArrayList<GossipDigest>();
- for ( GossipDigest gDigest : gDigestList )
- {
- InetAddress ep = gDigest.getEndPoint();
- EndPointState epState = Gossiper.instance.getEndPointStateForEndPoint(ep);
- int version = (epState != null) ? Gossiper.instance.getMaxEndPointStateVersion( epState ) : 0;
- int diffVersion = Math.abs(version - gDigest.getMaxVersion() );
- diffDigests.add( new GossipDigest(ep, gDigest.getGeneration(), diffVersion) );
- }
-
- gDigestList.clear();
- Collections.sort(diffDigests);
- int size = diffDigests.size();
- /*
- * Report the digests in descending order. This takes care of the endpoints
- * that are far behind w.r.t this local endpoint
- */
- for ( int i = size - 1; i >= 0; --i )
- {
- gDigestList.add( epToDigestMap.get(diffDigests.get(i).getEndPoint()) );
- }
- }
- }
-
- public static class GossipDigestAckVerbHandler implements IVerbHandler
- {
- private static Logger logger_ = Logger.getLogger(GossipDigestAckVerbHandler.class);
-
- public void doVerb(Message message)
- {
- InetAddress from = message.getFrom();
- if (logger_.isTraceEnabled())
- logger_.trace("Received a GossipDigestAckMessage from " + from);
-
- byte[] bytes = message.getMessageBody();
- DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
-
- try
- {
- GossipDigestAckMessage gDigestAckMessage = GossipDigestAckMessage.serializer().deserialize(dis);
- List<GossipDigest> gDigestList = gDigestAckMessage.getGossipDigestList();
- Map<InetAddress, EndPointState> epStateMap = gDigestAckMessage.getEndPointStateMap();
-
- if ( epStateMap.size() > 0 )
- {
- /* Notify the Failure Detector */
- Gossiper.instance.notifyFailureDetector(epStateMap);
- Gossiper.instance.applyStateLocally(epStateMap);
- }
-
- /* Get the state required to send to this gossipee - construct GossipDigestAck2Message */
- Map<InetAddress, EndPointState> deltaEpStateMap = new HashMap<InetAddress, EndPointState>();
- for( GossipDigest gDigest : gDigestList )
- {
- InetAddress addr = gDigest.getEndPoint();
- EndPointState localEpStatePtr = Gossiper.instance.getStateForVersionBiggerThan(addr, gDigest.getMaxVersion());
- if ( localEpStatePtr != null )
- deltaEpStateMap.put(addr, localEpStatePtr);
- }
-
- GossipDigestAck2Message gDigestAck2 = new GossipDigestAck2Message(deltaEpStateMap);
- Message gDigestAck2Message = Gossiper.instance.makeGossipDigestAck2Message(gDigestAck2);
- if (logger_.isTraceEnabled())
- logger_.trace("Sending a GossipDigestAck2Message to " + from);
- MessagingService.instance.sendOneWay(gDigestAck2Message, from);
- }
- catch ( IOException e )
- {
- throw new RuntimeException(e);
- }
- }
- }
-
- public static class GossipDigestAck2VerbHandler implements IVerbHandler
- {
- private static Logger logger_ = Logger.getLogger(GossipDigestAck2VerbHandler.class);
-
- public void doVerb(Message message)
- {
- InetAddress from = message.getFrom();
- if (logger_.isTraceEnabled())
- logger_.trace("Received a GossipDigestAck2Message from " + from);
-
- byte[] bytes = message.getMessageBody();
- DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
- GossipDigestAck2Message gDigestAck2Message;
- try
- {
- gDigestAck2Message = GossipDigestAck2Message.serializer().deserialize(dis);
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- Map<InetAddress, EndPointState> remoteEpStateMap = gDigestAck2Message.getEndPointStateMap();
- /* Notify the Failure Detector */
- Gossiper.instance.notifyFailureDetector(remoteEpStateMap);
- Gossiper.instance.applyStateLocally(remoteEpStateMap);
- }
- }
}
\ No newline at end of file
Added: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossiperJoinVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossiperJoinVerbHandler.java?rev=953373&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossiperJoinVerbHandler.java (added)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossiperJoinVerbHandler.java Thu Jun 10 16:18:20 2010
@@ -0,0 +1,44 @@
+package org.apache.cassandra.gms;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.log4j.Logger;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+
+public class GossiperJoinVerbHandler implements IVerbHandler // verb handler for messages whose body is a serialized JoinMessage
+{
+ private static Logger logger_ = Logger.getLogger( GossiperJoinVerbHandler.class);
+
+ public void doVerb(Message message) // joins the sender to the Gossiper when its cluster name matches the local one
+ {
+ InetAddress from = message.getFrom(); // sender address; this is the endpoint passed to Gossiper.join
+ if (logger_.isDebugEnabled())
+ logger_.debug("Received a JoinMessage from " + from);
+
+ byte[] bytes = message.getMessageBody();
+ DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
+
+ JoinMessage joinMessage;
+ try
+ {
+ joinMessage = JoinMessage.serializer().deserialize(dis);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e); // a body that cannot be deserialized is rethrown unchecked with the cause attached
+ }
+ if ( joinMessage.clusterId_.equals( DatabaseDescriptor.getClusterName() ) ) // only senders reporting the local cluster name are joined
+ {
+ Gossiper.instance.join(from);
+ }
+ else
+ {
+ logger_.warn("ClusterName mismatch from " + from + " " + joinMessage.clusterId_ + "!=" + DatabaseDescriptor.getClusterName()); // mismatch is logged and the message otherwise ignored
+ }
+ }
+}
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java?rev=953373&r1=953372&r2=953373&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java Thu Jun 10 16:18:20 2010
@@ -18,29 +18,34 @@
package org.apache.cassandra.net;
-import org.apache.cassandra.concurrent.*;
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.gms.IFailureDetectionEventListener;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.net.io.SerializerType;
import org.apache.cassandra.net.sink.SinkManager;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.*;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
-
+import org.apache.cassandra.utils.ExpiringMap;
+import org.apache.cassandra.utils.GuidGenerator;
+import org.apache.cassandra.utils.SimpleCondition;
import org.apache.log4j.Logger;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
import java.io.IOError;
import java.io.IOException;
-import java.net.ServerSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ServerSocketChannel;
import java.security.MessageDigest;
-import java.util.*;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -72,8 +77,9 @@ public class MessagingService implements
private static Logger logger_ = Logger.getLogger(MessagingService.class);
public static final MessagingService instance = new MessagingService();
-
+
private SocketThread socketThread;
+ private SimpleCondition listenGate;
public Object clone() throws CloneNotSupportedException
{
@@ -82,7 +88,8 @@ public class MessagingService implements
}
protected MessagingService()
- {
+ {
+ listenGate = new SimpleCondition();
verbHandlers_ = new HashMap<StorageService.Verb, IVerbHandler>();
/*
* Leave callbacks in the cachetable long enough that any related messages will arrive
@@ -103,7 +110,7 @@ public class MessagingService implements
streamExecutor_ = new JMXEnabledThreadPoolExecutor("MESSAGE-STREAMING-POOL");
}
-
+
public byte[] hash(String type, byte data[])
{
byte result[];
@@ -138,6 +145,19 @@ public class MessagingService implements
ss.bind(new InetSocketAddress(localEp, DatabaseDescriptor.getStoragePort()));
socketThread = new SocketThread(ss, "ACCEPT-" + localEp);
socketThread.start();
+ listenGate.signalAll();
+ }
+
+ public void waitUntilListening() // blocks the calling thread on listenGate, which is signalled right after the accept thread is started
+ {
+ try
+ {
+ listenGate.await();
+ }
+ catch (InterruptedException ie) { // brace kept on this line so the hunk's added-line count is unchanged
+ logger_.debug("await interrupted");
+ Thread.currentThread().interrupt(); // restore the interrupt status instead of swallowing it, so the caller can see the wait was cut short
+ }
+ }
public static OutboundTcpConnectionPool getConnectionPool(InetAddress to)
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java?rev=953373&r1=953372&r2=953373&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java Thu Jun 10 16:18:20 2010
@@ -222,10 +222,10 @@ public class StorageService implements I
MessagingService.instance.registerVerbHandlers(Verb.TREE_REQUEST, new TreeRequestVerbHandler());
MessagingService.instance.registerVerbHandlers(Verb.TREE_RESPONSE, new AntiEntropyService.TreeResponseVerbHandler());
- MessagingService.instance.registerVerbHandlers(Verb.JOIN, new Gossiper.JoinVerbHandler());
- MessagingService.instance.registerVerbHandlers(Verb.GOSSIP_DIGEST_SYN, new Gossiper.GossipDigestSynVerbHandler());
- MessagingService.instance.registerVerbHandlers(Verb.GOSSIP_DIGEST_ACK, new Gossiper.GossipDigestAckVerbHandler());
- MessagingService.instance.registerVerbHandlers(Verb.GOSSIP_DIGEST_ACK2, new Gossiper.GossipDigestAck2VerbHandler());
+ MessagingService.instance.registerVerbHandlers(Verb.JOIN, new GossiperJoinVerbHandler());
+ MessagingService.instance.registerVerbHandlers(Verb.GOSSIP_DIGEST_SYN, new GossipDigestSynVerbHandler());
+ MessagingService.instance.registerVerbHandlers(Verb.GOSSIP_DIGEST_ACK, new GossipDigestAckVerbHandler());
+ MessagingService.instance.registerVerbHandlers(Verb.GOSSIP_DIGEST_ACK2, new GossipDigestAck2VerbHandler());
replicationStrategies = new HashMap<String, AbstractReplicationStrategy>();
for (String table : DatabaseDescriptor.getNonSystemTables())
@@ -287,10 +287,10 @@ public class StorageService implements I
initialized = true;
isClientMode = true;
logger_.info("Starting up client gossip");
- MessagingService.instance.listen(FBUtilities.getLocalAddress());
+ setMode("Client", false);
Gossiper.instance.register(this);
Gossiper.instance.start(FBUtilities.getLocalAddress(), (int)(System.currentTimeMillis() / 1000)); // needed for node-ring gathering.
- setMode("Client", false);
+ MessagingService.instance.listen(FBUtilities.getLocalAddress());
}
public synchronized void initServer() throws IOException
@@ -326,16 +326,16 @@ public class StorageService implements I
logger_.info("Starting up server gossip");
- MessagingService.instance.listen(FBUtilities.getLocalAddress());
-
- StorageLoadBalancer.instance.startBroadcasting();
-
// have to start the gossip service before we can see any info on other nodes. this is necessary
// for bootstrap to get the load info it needs.
// (we won't be part of the storage ring though until we add a nodeId to our state, below.)
Gossiper.instance.register(this);
Gossiper.instance.start(FBUtilities.getLocalAddress(), storageMetadata_.getGeneration()); // needed for node-ring gathering.
+ MessagingService.instance.listen(FBUtilities.getLocalAddress());
+
+ StorageLoadBalancer.instance.startBroadcasting();
+
if (DatabaseDescriptor.isAutoBootstrap()
&& DatabaseDescriptor.getSeeds().contains(FBUtilities.getLocalAddress())
&& !SystemTable.isBootstrapped())