You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2010/07/22 22:32:28 UTC
svn commit: r966846 - in /cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms: EndPointState.java GossipDigestAckMessage.java GossipDigestSynMessage.java Gossiper.java
Author: brandonwilliams
Date: Thu Jul 22 20:32:28 2010
New Revision: 966846
URL: http://svn.apache.org/viewvc?rev=966846&view=rev
Log:
remove obsolete gossip size limit. patch by Anthony Molinaro and jbellis for CASSANDRA-1138
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/EndPointState.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/Gossiper.java
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/EndPointState.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/EndPointState.java?rev=966846&r1=966845&r2=966846&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/EndPointState.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/EndPointState.java Thu Jul 22 20:32:28 2010
@@ -160,9 +160,6 @@ class EndPointStateSerializer implements
public void serialize(EndPointState epState, DataOutputStream dos) throws IOException
{
- /* These are for estimating whether we overshoot the MTU limit */
- int estimate = 0;
-
/* serialize the HeartBeatState */
HeartBeatState hbState = epState.getHeartBeatState();
HeartBeatState.serializer().serialize(hbState, dos);
@@ -170,26 +167,13 @@ class EndPointStateSerializer implements
/* serialize the map of ApplicationState objects */
int size = epState.applicationState_.size();
dos.writeInt(size);
- if ( size > 0 )
- {
- Set<String> keys = epState.applicationState_.keySet();
- for( String key : keys )
+ for (String key : epState.applicationState_.keySet())
+ {
+ ApplicationState appState = epState.applicationState_.get(key);
+ if (appState != null)
{
- if ( Gossiper.MAX_GOSSIP_PACKET_SIZE - dos.size() < estimate )
- {
- logger_.info("@@@@ Breaking out to respect the MTU size in EndPointState serializer. Estimate is " + estimate + " @@@@");
- break;
- }
-
- ApplicationState appState = epState.applicationState_.get(key);
- if ( appState != null )
- {
- int pre = dos.size();
- dos.writeUTF(key);
- ApplicationState.serializer().serialize(appState, dos);
- int post = dos.size();
- estimate = post - pre;
- }
+ dos.writeUTF(key);
+ ApplicationState.serializer().serialize(appState, dos);
}
}
}
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java?rev=966846&r1=966845&r2=966846&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java Thu Jul 22 20:32:28 2010
@@ -75,26 +75,16 @@ class GossipDigestAckMessageSerializer i
{
public void serialize(GossipDigestAckMessage gDigestAckMessage, DataOutputStream dos) throws IOException
{
- /* Use the helper to serialize the GossipDigestList */
- boolean bContinue = GossipDigestSerializationHelper.serialize(gDigestAckMessage.gDigestList_, dos);
- dos.writeBoolean(bContinue);
- /* Use the EndPointState */
- if ( bContinue )
- {
- EndPointStatesSerializationHelper.serialize(gDigestAckMessage.epStateMap_, dos);
- }
+ GossipDigestSerializationHelper.serialize(gDigestAckMessage.gDigestList_, dos);
+ dos.writeBoolean(true); // 0.6 compatibility
+ EndPointStatesSerializationHelper.serialize(gDigestAckMessage.epStateMap_, dos);
}
public GossipDigestAckMessage deserialize(DataInputStream dis) throws IOException
{
- Map<InetAddress, EndPointState> epStateMap = new HashMap<InetAddress, EndPointState>();
- List<GossipDigest> gDigestList = GossipDigestSerializationHelper.deserialize(dis);
- boolean bContinue = dis.readBoolean();
-
- if ( bContinue )
- {
- epStateMap = EndPointStatesSerializationHelper.deserialize(dis);
- }
+ List<GossipDigest> gDigestList = GossipDigestSerializationHelper.deserialize(dis);
+ dis.readBoolean(); // 0.6 compatibility
+ Map<InetAddress, EndPointState> epStateMap = EndPointStatesSerializationHelper.deserialize(dis);
return new GossipDigestAckMessage(gDigestList, epStateMap);
}
}
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java?rev=966846&r1=966845&r2=966846&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java Thu Jul 22 20:32:28 2010
@@ -67,44 +67,24 @@ class GossipDigestSerializationHelper
{
private static Logger logger_ = Logger.getLogger(GossipDigestSerializationHelper.class);
- static boolean serialize(List<GossipDigest> gDigestList, DataOutputStream dos) throws IOException
+ static void serialize(List<GossipDigest> gDigestList, DataOutputStream dos) throws IOException
{
- boolean bVal = true;
- int size = gDigestList.size();
- dos.writeInt(size);
-
- int estimate = 0;
+ dos.writeInt(gDigestList.size());
for ( GossipDigest gDigest : gDigestList )
{
- if ( Gossiper.MAX_GOSSIP_PACKET_SIZE - dos.size() < estimate )
- {
- logger_.info("@@@@ Breaking out to respect the MTU size in GD @@@@");
- bVal = false;
- break;
- }
- int pre = dos.size();
GossipDigest.serializer().serialize( gDigest, dos );
- int post = dos.size();
- estimate = post - pre;
}
- return bVal;
}
static List<GossipDigest> deserialize(DataInputStream dis) throws IOException
{
int size = dis.readInt();
- List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
+ List<GossipDigest> gDigests = new ArrayList<GossipDigest>(size);
for ( int i = 0; i < size; ++i )
{
- if ( dis.available() == 0 )
- {
- logger_.info("Remaining bytes zero. Stopping deserialization of GossipDigests.");
- break;
- }
-
- GossipDigest gDigest = GossipDigest.serializer().deserialize(dis);
- gDigests.add( gDigest );
+ assert dis.available() > 0;
+ gDigests.add(GossipDigest.serializer().deserialize(dis));
}
return gDigests;
}
@@ -114,45 +94,25 @@ class EndPointStatesSerializationHelper
{
private static final Logger logger_ = Logger.getLogger(EndPointStatesSerializationHelper.class);
- static boolean serialize(Map<InetAddress, EndPointState> epStateMap, DataOutputStream dos) throws IOException
+ static void serialize(Map<InetAddress, EndPointState> epStateMap, DataOutputStream dos) throws IOException
{
- boolean bVal = true;
- int estimate = 0;
- int size = epStateMap.size();
- dos.writeInt(size);
-
+ dos.writeInt(epStateMap.size());
for (Entry<InetAddress, EndPointState> entry : epStateMap.entrySet())
{
InetAddress ep = entry.getKey();
- if ( Gossiper.MAX_GOSSIP_PACKET_SIZE - dos.size() < estimate )
- {
- logger_.info("@@@@ Breaking out to respect the MTU size in EPS. Estimate is " + estimate + " @@@@");
- bVal = false;
- break;
- }
-
- int pre = dos.size();
CompactEndPointSerializationHelper.serialize(ep, dos);
EndPointState.serializer().serialize(entry.getValue(), dos);
- int post = dos.size();
- estimate = post - pre;
}
- return bVal;
}
static Map<InetAddress, EndPointState> deserialize(DataInputStream dis) throws IOException
{
int size = dis.readInt();
- Map<InetAddress, EndPointState> epStateMap = new HashMap<InetAddress, EndPointState>();
+ Map<InetAddress, EndPointState> epStateMap = new HashMap<InetAddress, EndPointState>(size);
for ( int i = 0; i < size; ++i )
{
- if ( dis.available() == 0 )
- {
- logger_.info("Remaining bytes zero. Stopping deserialization in EndPointState.");
- break;
- }
- // int length = dis.readInt();
+ assert dis.available() > 0;
InetAddress ep = CompactEndPointSerializationHelper.deserialize(dis);
EndPointState epState = EndPointState.serializer().deserialize(dis);
epStateMap.put(ep, epState);
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/Gossiper.java?rev=966846&r1=966845&r2=966846&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/Gossiper.java Thu Jul 22 20:32:28 2010
@@ -101,7 +101,6 @@ public class Gossiper implements IFailur
}
}
- final static int MAX_GOSSIP_PACKET_SIZE = 1428;
public final static int intervalInMillis_ = 1000;
private static Logger logger_ = Logger.getLogger(Gossiper.class);
public static final Gossiper instance = new Gossiper();
@@ -280,7 +279,7 @@ public class Gossiper implements IFailur
Message makeGossipDigestSynMessage(List<GossipDigest> gDigests) throws IOException
{
GossipDigestSynMessage gDigestMessage = new GossipDigestSynMessage(DatabaseDescriptor.getClusterName(), gDigests);
- ByteArrayOutputStream bos = new ByteArrayOutputStream(Gossiper.MAX_GOSSIP_PACKET_SIZE);
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream( bos );
GossipDigestSynMessage.serializer().serialize(gDigestMessage, dos);
return new Message(localEndPoint_, StageManager.GOSSIP_STAGE, StorageService.Verb.GOSSIP_DIGEST_SYN, bos.toByteArray());
@@ -288,7 +287,7 @@ public class Gossiper implements IFailur
Message makeGossipDigestAckMessage(GossipDigestAckMessage gDigestAckMessage) throws IOException
{
- ByteArrayOutputStream bos = new ByteArrayOutputStream(Gossiper.MAX_GOSSIP_PACKET_SIZE);
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
GossipDigestAckMessage.serializer().serialize(gDigestAckMessage, dos);
if (logger_.isTraceEnabled())
@@ -298,7 +297,7 @@ public class Gossiper implements IFailur
Message makeGossipDigestAck2Message(GossipDigestAck2Message gDigestAck2Message) throws IOException
{
- ByteArrayOutputStream bos = new ByteArrayOutputStream(Gossiper.MAX_GOSSIP_PACKET_SIZE);
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
GossipDigestAck2Message.serializer().serialize(gDigestAck2Message, dos);
return new Message(localEndPoint_, StageManager.GOSSIP_STAGE, StorageService.Verb.GOSSIP_DIGEST_ACK2, bos.toByteArray());