You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2010/07/22 22:32:28 UTC

svn commit: r966846 - in /cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms: EndPointState.java GossipDigestAckMessage.java GossipDigestSynMessage.java Gossiper.java

Author: brandonwilliams
Date: Thu Jul 22 20:32:28 2010
New Revision: 966846

URL: http://svn.apache.org/viewvc?rev=966846&view=rev
Log:
remove obsolete gossip size limit.  patch by Anthony Molinaro and jbellis for CASSANDRA-1138

Modified:
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/EndPointState.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/Gossiper.java

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/EndPointState.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/EndPointState.java?rev=966846&r1=966845&r2=966846&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/EndPointState.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/EndPointState.java Thu Jul 22 20:32:28 2010
@@ -160,9 +160,6 @@ class EndPointStateSerializer implements
     
     public void serialize(EndPointState epState, DataOutputStream dos) throws IOException
     {
-        /* These are for estimating whether we overshoot the MTU limit */
-        int estimate = 0;
-
         /* serialize the HeartBeatState */
         HeartBeatState hbState = epState.getHeartBeatState();
         HeartBeatState.serializer().serialize(hbState, dos);
@@ -170,26 +167,13 @@ class EndPointStateSerializer implements
         /* serialize the map of ApplicationState objects */
         int size = epState.applicationState_.size();
         dos.writeInt(size);
-        if ( size > 0 )
-        {   
-            Set<String> keys = epState.applicationState_.keySet();
-            for( String key : keys )
+        for (String key : epState.applicationState_.keySet())
+        {
+            ApplicationState appState = epState.applicationState_.get(key);
+            if (appState != null)
             {
-                if ( Gossiper.MAX_GOSSIP_PACKET_SIZE - dos.size() < estimate )
-                {
-                    logger_.info("@@@@ Breaking out to respect the MTU size in EndPointState serializer. Estimate is " + estimate + " @@@@");
-                    break;
-                }
-            
-                ApplicationState appState = epState.applicationState_.get(key);
-                if ( appState != null )
-                {
-                    int pre = dos.size();
-                    dos.writeUTF(key);
-                    ApplicationState.serializer().serialize(appState, dos);                    
-                    int post = dos.size();
-                    estimate = post - pre;
-                }                
+                dos.writeUTF(key);
+                ApplicationState.serializer().serialize(appState, dos);
             }
         }
     }

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java?rev=966846&r1=966845&r2=966846&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java Thu Jul 22 20:32:28 2010
@@ -75,26 +75,16 @@ class GossipDigestAckMessageSerializer i
 {
     public void serialize(GossipDigestAckMessage gDigestAckMessage, DataOutputStream dos) throws IOException
     {
-        /* Use the helper to serialize the GossipDigestList */
-        boolean bContinue = GossipDigestSerializationHelper.serialize(gDigestAckMessage.gDigestList_, dos);
-        dos.writeBoolean(bContinue);
-        /* Use the EndPointState */
-        if ( bContinue )
-        {
-            EndPointStatesSerializationHelper.serialize(gDigestAckMessage.epStateMap_, dos);            
-        }
+        GossipDigestSerializationHelper.serialize(gDigestAckMessage.gDigestList_, dos);
+        dos.writeBoolean(true); // 0.6 compatibility
+        EndPointStatesSerializationHelper.serialize(gDigestAckMessage.epStateMap_, dos);
     }
 
     public GossipDigestAckMessage deserialize(DataInputStream dis) throws IOException
     {
-        Map<InetAddress, EndPointState> epStateMap = new HashMap<InetAddress, EndPointState>();
-        List<GossipDigest> gDigestList = GossipDigestSerializationHelper.deserialize(dis);                
-        boolean bContinue = dis.readBoolean();
-
-        if ( bContinue )
-        {
-            epStateMap = EndPointStatesSerializationHelper.deserialize(dis);                                    
-        }
+        List<GossipDigest> gDigestList = GossipDigestSerializationHelper.deserialize(dis);
+        dis.readBoolean(); // 0.6 compatibility
+        Map<InetAddress, EndPointState> epStateMap = EndPointStatesSerializationHelper.deserialize(dis);
         return new GossipDigestAckMessage(gDigestList, epStateMap);
     }
 }

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java?rev=966846&r1=966845&r2=966846&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java Thu Jul 22 20:32:28 2010
@@ -67,44 +67,24 @@ class GossipDigestSerializationHelper
 {
     private static Logger logger_ = Logger.getLogger(GossipDigestSerializationHelper.class);
     
-    static boolean serialize(List<GossipDigest> gDigestList, DataOutputStream dos) throws IOException
+    static void serialize(List<GossipDigest> gDigestList, DataOutputStream dos) throws IOException
     {
-        boolean bVal = true;
-        int size = gDigestList.size();                        
-        dos.writeInt(size);
-        
-        int estimate = 0;            
+        dos.writeInt(gDigestList.size());
         for ( GossipDigest gDigest : gDigestList )
         {
-            if ( Gossiper.MAX_GOSSIP_PACKET_SIZE - dos.size() < estimate )
-            {
-                logger_.info("@@@@ Breaking out to respect the MTU size in GD @@@@");
-                bVal = false;
-                break;
-            }
-            int pre = dos.size();               
             GossipDigest.serializer().serialize( gDigest, dos );
-            int post = dos.size();
-            estimate = post - pre;
         }
-        return bVal;
     }
 
     static List<GossipDigest> deserialize(DataInputStream dis) throws IOException
     {
         int size = dis.readInt();            
-        List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
+        List<GossipDigest> gDigests = new ArrayList<GossipDigest>(size);
         
         for ( int i = 0; i < size; ++i )
         {
-            if ( dis.available() == 0 )
-            {
-                logger_.info("Remaining bytes zero. Stopping deserialization of GossipDigests.");
-                break;
-            }
-                            
-            GossipDigest gDigest = GossipDigest.serializer().deserialize(dis);                
-            gDigests.add( gDigest );                
+            assert dis.available() > 0;
+            gDigests.add(GossipDigest.serializer().deserialize(dis));                
         }        
         return gDigests;
     }
@@ -114,45 +94,25 @@ class EndPointStatesSerializationHelper
 {
     private static final Logger logger_ = Logger.getLogger(EndPointStatesSerializationHelper.class);
 
-    static boolean serialize(Map<InetAddress, EndPointState> epStateMap, DataOutputStream dos) throws IOException
+    static void serialize(Map<InetAddress, EndPointState> epStateMap, DataOutputStream dos) throws IOException
     {
-        boolean bVal = true;
-        int estimate = 0;                
-        int size = epStateMap.size();
-        dos.writeInt(size);
-
+        dos.writeInt(epStateMap.size());
         for (Entry<InetAddress, EndPointState> entry : epStateMap.entrySet())
         {
             InetAddress ep = entry.getKey();
-            if ( Gossiper.MAX_GOSSIP_PACKET_SIZE - dos.size() < estimate )
-            {
-                logger_.info("@@@@ Breaking out to respect the MTU size in EPS. Estimate is " + estimate + " @@@@");
-                bVal = false;
-                break;
-            }
-    
-            int pre = dos.size();
             CompactEndPointSerializationHelper.serialize(ep, dos);
             EndPointState.serializer().serialize(entry.getValue(), dos);
-            int post = dos.size();
-            estimate = post - pre;
         }
-        return bVal;
     }
 
     static Map<InetAddress, EndPointState> deserialize(DataInputStream dis) throws IOException
     {
         int size = dis.readInt();            
-        Map<InetAddress, EndPointState> epStateMap = new HashMap<InetAddress, EndPointState>();
+        Map<InetAddress, EndPointState> epStateMap = new HashMap<InetAddress, EndPointState>(size);
         
         for ( int i = 0; i < size; ++i )
         {
-            if ( dis.available() == 0 )
-            {
-                logger_.info("Remaining bytes zero. Stopping deserialization in EndPointState.");
-                break;
-            }
-            // int length = dis.readInt();            
+            assert dis.available() > 0;
             InetAddress ep = CompactEndPointSerializationHelper.deserialize(dis);
             EndPointState epState = EndPointState.serializer().deserialize(dis);            
             epStateMap.put(ep, epState);

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/Gossiper.java?rev=966846&r1=966845&r2=966846&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/Gossiper.java Thu Jul 22 20:32:28 2010
@@ -101,7 +101,6 @@ public class Gossiper implements IFailur
         }
     }
 
-    final static int MAX_GOSSIP_PACKET_SIZE = 1428;
     public final static int intervalInMillis_ = 1000;
     private static Logger logger_ = Logger.getLogger(Gossiper.class);
     public static final Gossiper instance = new Gossiper();
@@ -280,7 +279,7 @@ public class Gossiper implements IFailur
     Message makeGossipDigestSynMessage(List<GossipDigest> gDigests) throws IOException
     {
         GossipDigestSynMessage gDigestMessage = new GossipDigestSynMessage(DatabaseDescriptor.getClusterName(), gDigests);
-        ByteArrayOutputStream bos = new ByteArrayOutputStream(Gossiper.MAX_GOSSIP_PACKET_SIZE);
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream( bos );
         GossipDigestSynMessage.serializer().serialize(gDigestMessage, dos);
         return new Message(localEndPoint_, StageManager.GOSSIP_STAGE, StorageService.Verb.GOSSIP_DIGEST_SYN, bos.toByteArray());
@@ -288,7 +287,7 @@ public class Gossiper implements IFailur
 
     Message makeGossipDigestAckMessage(GossipDigestAckMessage gDigestAckMessage) throws IOException
     {
-        ByteArrayOutputStream bos = new ByteArrayOutputStream(Gossiper.MAX_GOSSIP_PACKET_SIZE);
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
         GossipDigestAckMessage.serializer().serialize(gDigestAckMessage, dos);
         if (logger_.isTraceEnabled())
@@ -298,7 +297,7 @@ public class Gossiper implements IFailur
 
     Message makeGossipDigestAck2Message(GossipDigestAck2Message gDigestAck2Message) throws IOException
     {
-        ByteArrayOutputStream bos = new ByteArrayOutputStream(Gossiper.MAX_GOSSIP_PACKET_SIZE);
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
         GossipDigestAck2Message.serializer().serialize(gDigestAck2Message, dos);
         return new Message(localEndPoint_, StageManager.GOSSIP_STAGE, StorageService.Verb.GOSSIP_DIGEST_ACK2, bos.toByteArray());