You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/07/30 17:30:27 UTC
svn commit: r799331 [13/29] - in /incubator/cassandra/trunk: ./
src/java/org/apache/cassandra/concurrent/
src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/gms/ src/...
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java Thu Jul 30 15:30:21 2009
@@ -1,319 +1,319 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.gms;
-
-import java.io.FileOutputStream;
-import java.lang.management.ManagementFactory;
-import java.net.UnknownHostException;
-import java.util.*;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
-import org.apache.commons.lang.StringUtils;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.LogUtil;
-import org.apache.cassandra.utils.BoundedStatsDeque;
-import org.apache.log4j.Logger;
-
-/**
- * This FailureDetector is an implementation of the paper titled
- * "The Phi Accrual Failure Detector" by Hayashibara.
- * Check the paper and the <i>IFailureDetector</i> interface for details.
- *
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-public class FailureDetector implements IFailureDetector, FailureDetectorMBean
-{
- private static Logger logger_ = Logger.getLogger(FailureDetector.class);
- private static final int sampleSize_ = 1000;
- private static final int phiSuspectThreshold_ = 5;
- private static final int phiConvictThreshold_ = 8;
- /* The Failure Detector has to have been up for at least 1 min. */
- private static final long uptimeThreshold_ = 60000;
- private static IFailureDetector failureDetector_;
- /* Used to lock the factory for creation of FailureDetector instance */
- private static Lock createLock_ = new ReentrantLock();
- /* The time when the module was instantiated. */
- private static long creationTime_;
-
- public static IFailureDetector instance()
- {
- if ( failureDetector_ == null )
- {
- FailureDetector.createLock_.lock();
- try
- {
- if ( failureDetector_ == null )
- {
- failureDetector_ = new FailureDetector();
- }
- }
- finally
- {
- createLock_.unlock();
- }
- }
- return failureDetector_;
- }
-
- private Map<EndPoint, ArrivalWindow> arrivalSamples_ = new Hashtable<EndPoint, ArrivalWindow>();
- private List<IFailureDetectionEventListener> fdEvntListeners_ = new ArrayList<IFailureDetectionEventListener>();
-
- public FailureDetector()
- {
- creationTime_ = System.currentTimeMillis();
- // Register this instance with JMX
- try
- {
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- mbs.registerMBean(this, new ObjectName("org.apache.cassandra.gms:type=FailureDetector"));
- }
- catch (Exception e)
- {
- logger_.error(LogUtil.throwableToString(e));
- }
- }
-
- /**
- * Dump the inter arrival times for examination if necessary.
- */
- public void dumpInterArrivalTimes()
- {
- try
- {
- FileOutputStream fos = new FileOutputStream("/var/tmp/output-" + System.currentTimeMillis() + ".dat", true);
- fos.write(toString().getBytes());
- fos.close();
- }
- catch(Throwable th)
- {
- logger_.warn(LogUtil.throwableToString(th));
- }
- }
-
- /**
- * We dump the arrival window for any endpoint only if the
- * local Failure Detector module has been up for more than a
- * minute.
- *
- * @param ep for which the arrival window needs to be dumped.
- */
- private void dumpInterArrivalTimes(EndPoint ep)
- {
- long now = System.currentTimeMillis();
- if ( (now - FailureDetector.creationTime_) <= FailureDetector.uptimeThreshold_ )
- return;
- try
- {
- FileOutputStream fos = new FileOutputStream("/var/tmp/output-" + System.currentTimeMillis() + "-" + ep + ".dat", true);
- ArrivalWindow hWnd = arrivalSamples_.get(ep);
- fos.write(hWnd.toString().getBytes());
- fos.close();
- }
- catch(Throwable th)
- {
- logger_.warn(LogUtil.throwableToString(th));
- }
- }
-
- public boolean isAlive(EndPoint ep)
- {
- try
- {
- /* If the endpoint in question is the local endpoint return true. */
- String localHost = FBUtilities.getHostAddress();
- if ( localHost.equals( ep.getHost() ) )
- return true;
- }
- catch( UnknownHostException ex )
- {
- logger_.info( LogUtil.throwableToString(ex) );
- }
- /* Incoming port is assumed to be the Storage port. We need to change it to the control port */
- EndPoint ep2 = new EndPoint(ep.getHost(), DatabaseDescriptor.getControlPort());
- EndPointState epState = Gossiper.instance().getEndPointStateForEndPoint(ep2);
- return epState.isAlive();
- }
-
- public void report(EndPoint ep)
- {
- if (logger_.isTraceEnabled())
- logger_.trace("reporting " + ep);
- long now = System.currentTimeMillis();
- ArrivalWindow heartbeatWindow = arrivalSamples_.get(ep);
- if ( heartbeatWindow == null )
- {
- heartbeatWindow = new ArrivalWindow(sampleSize_);
- arrivalSamples_.put(ep, heartbeatWindow);
- }
- heartbeatWindow.add(now);
- }
-
- public void interpret(EndPoint ep)
- {
- ArrivalWindow hbWnd = arrivalSamples_.get(ep);
- if ( hbWnd == null )
- {
- return;
- }
- long now = System.currentTimeMillis();
- /* We need this so that we do not suspect a convict. */
- boolean isConvicted = false;
- double phi = hbWnd.phi(now);
- if (logger_.isTraceEnabled())
- logger_.trace("PHI for " + ep + " : " + phi);
-
- /*
- if ( phi > phiConvictThreshold_ )
- {
- isConvicted = true;
- for ( IFailureDetectionEventListener listener : fdEvntListeners_ )
- {
- listener.convict(ep);
- }
- }
- */
- if ( !isConvicted && phi > phiSuspectThreshold_ )
- {
- for ( IFailureDetectionEventListener listener : fdEvntListeners_ )
- {
- listener.suspect(ep);
- }
- }
- }
-
- public void registerFailureDetectionEventListener(IFailureDetectionEventListener listener)
- {
- fdEvntListeners_.add(listener);
- }
-
- public void unregisterFailureDetectionEventListener(IFailureDetectionEventListener listener)
- {
- fdEvntListeners_.remove(listener);
- }
-
- public String toString()
- {
- StringBuilder sb = new StringBuilder();
- Set<EndPoint> eps = arrivalSamples_.keySet();
-
- sb.append("-----------------------------------------------------------------------");
- for ( EndPoint ep : eps )
- {
- ArrivalWindow hWnd = arrivalSamples_.get(ep);
- sb.append(ep + " : ");
- sb.append(hWnd.toString());
- sb.append( System.getProperty("line.separator") );
- }
- sb.append("-----------------------------------------------------------------------");
- return sb.toString();
- }
-
- public static void main(String[] args) throws Throwable
- {
- }
-}
-
-class ArrivalWindow
-{
- private static Logger logger_ = Logger.getLogger(ArrivalWindow.class);
- private double tLast_ = 0L;
- private BoundedStatsDeque arrivalIntervals_;
-
- ArrivalWindow(int size)
- {
- arrivalIntervals_ = new BoundedStatsDeque(size);
- }
-
- synchronized void add(double value)
- {
- double interArrivalTime;
- if ( tLast_ > 0L )
- {
- interArrivalTime = (value - tLast_);
- }
- else
- {
- interArrivalTime = Gossiper.intervalInMillis_ / 2;
- }
- tLast_ = value;
- arrivalIntervals_.add(interArrivalTime);
- }
-
- synchronized double sum()
- {
- return arrivalIntervals_.sum();
- }
-
- synchronized double sumOfDeviations()
- {
- return arrivalIntervals_.sumOfDeviations();
- }
-
- synchronized double mean()
- {
- return arrivalIntervals_.mean();
- }
-
- synchronized double variance()
- {
- return arrivalIntervals_.variance();
- }
-
- double stdev()
- {
- return arrivalIntervals_.stdev();
- }
-
- void clear()
- {
- arrivalIntervals_.clear();
- }
-
- double p(double t)
- {
- double mean = mean();
- double exponent = (-1)*(t)/mean;
- return 1 - ( 1 - Math.pow(Math.E, exponent) );
- }
-
- double phi(long tnow)
- {
- int size = arrivalIntervals_.size();
- double log = 0d;
- if ( size > 0 )
- {
- double t = tnow - tLast_;
- double probability = p(t);
- log = (-1) * Math.log10( probability );
- }
- return log;
- }
-
- public String toString()
- {
- return StringUtils.join(arrivalIntervals_.iterator(), " ");
- }
-}
-
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.io.FileOutputStream;
+import java.lang.management.ManagementFactory;
+import java.net.UnknownHostException;
+import java.util.*;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.commons.lang.StringUtils;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.cassandra.utils.BoundedStatsDeque;
+import org.apache.log4j.Logger;
+
+/**
+ * This FailureDetector is an implementation of the paper titled
+ * "The Phi Accrual Failure Detector" by Hayashibara.
+ * Check the paper and the <i>IFailureDetector</i> interface for details.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class FailureDetector implements IFailureDetector, FailureDetectorMBean
+{
+ private static Logger logger_ = Logger.getLogger(FailureDetector.class);
+ private static final int sampleSize_ = 1000;
+ private static final int phiSuspectThreshold_ = 5;
+ private static final int phiConvictThreshold_ = 8;
+ /* The Failure Detector has to have been up for at least 1 min. */
+ private static final long uptimeThreshold_ = 60000;
+ private static IFailureDetector failureDetector_;
+ /* Used to lock the factory for creation of FailureDetector instance */
+ private static Lock createLock_ = new ReentrantLock();
+ /* The time when the module was instantiated. */
+ private static long creationTime_;
+
+ public static IFailureDetector instance()
+ {
+ if ( failureDetector_ == null )
+ {
+ FailureDetector.createLock_.lock();
+ try
+ {
+ if ( failureDetector_ == null )
+ {
+ failureDetector_ = new FailureDetector();
+ }
+ }
+ finally
+ {
+ createLock_.unlock();
+ }
+ }
+ return failureDetector_;
+ }
+
+ private Map<EndPoint, ArrivalWindow> arrivalSamples_ = new Hashtable<EndPoint, ArrivalWindow>();
+ private List<IFailureDetectionEventListener> fdEvntListeners_ = new ArrayList<IFailureDetectionEventListener>();
+
+ public FailureDetector()
+ {
+ creationTime_ = System.currentTimeMillis();
+ // Register this instance with JMX
+ try
+ {
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ mbs.registerMBean(this, new ObjectName("org.apache.cassandra.gms:type=FailureDetector"));
+ }
+ catch (Exception e)
+ {
+ logger_.error(LogUtil.throwableToString(e));
+ }
+ }
+
+ /**
+ * Dump the inter arrival times for examination if necessary.
+ */
+ public void dumpInterArrivalTimes()
+ {
+ try
+ {
+ FileOutputStream fos = new FileOutputStream("/var/tmp/output-" + System.currentTimeMillis() + ".dat", true);
+ fos.write(toString().getBytes());
+ fos.close();
+ }
+ catch(Throwable th)
+ {
+ logger_.warn(LogUtil.throwableToString(th));
+ }
+ }
+
+ /**
+ * We dump the arrival window for any endpoint only if the
+ * local Failure Detector module has been up for more than a
+ * minute.
+ *
+ * @param ep for which the arrival window needs to be dumped.
+ */
+ private void dumpInterArrivalTimes(EndPoint ep)
+ {
+ long now = System.currentTimeMillis();
+ if ( (now - FailureDetector.creationTime_) <= FailureDetector.uptimeThreshold_ )
+ return;
+ try
+ {
+ FileOutputStream fos = new FileOutputStream("/var/tmp/output-" + System.currentTimeMillis() + "-" + ep + ".dat", true);
+ ArrivalWindow hWnd = arrivalSamples_.get(ep);
+ fos.write(hWnd.toString().getBytes());
+ fos.close();
+ }
+ catch(Throwable th)
+ {
+ logger_.warn(LogUtil.throwableToString(th));
+ }
+ }
+
+ public boolean isAlive(EndPoint ep)
+ {
+ try
+ {
+ /* If the endpoint in question is the local endpoint return true. */
+ String localHost = FBUtilities.getHostAddress();
+ if ( localHost.equals( ep.getHost() ) )
+ return true;
+ }
+ catch( UnknownHostException ex )
+ {
+ logger_.info( LogUtil.throwableToString(ex) );
+ }
+ /* Incoming port is assumed to be the Storage port. We need to change it to the control port */
+ EndPoint ep2 = new EndPoint(ep.getHost(), DatabaseDescriptor.getControlPort());
+ EndPointState epState = Gossiper.instance().getEndPointStateForEndPoint(ep2);
+ return epState.isAlive();
+ }
+
+ public void report(EndPoint ep)
+ {
+ if (logger_.isTraceEnabled())
+ logger_.trace("reporting " + ep);
+ long now = System.currentTimeMillis();
+ ArrivalWindow heartbeatWindow = arrivalSamples_.get(ep);
+ if ( heartbeatWindow == null )
+ {
+ heartbeatWindow = new ArrivalWindow(sampleSize_);
+ arrivalSamples_.put(ep, heartbeatWindow);
+ }
+ heartbeatWindow.add(now);
+ }
+
+ public void interpret(EndPoint ep)
+ {
+ ArrivalWindow hbWnd = arrivalSamples_.get(ep);
+ if ( hbWnd == null )
+ {
+ return;
+ }
+ long now = System.currentTimeMillis();
+ /* We need this so that we do not suspect a convict. */
+ boolean isConvicted = false;
+ double phi = hbWnd.phi(now);
+ if (logger_.isTraceEnabled())
+ logger_.trace("PHI for " + ep + " : " + phi);
+
+ /*
+ if ( phi > phiConvictThreshold_ )
+ {
+ isConvicted = true;
+ for ( IFailureDetectionEventListener listener : fdEvntListeners_ )
+ {
+ listener.convict(ep);
+ }
+ }
+ */
+ if ( !isConvicted && phi > phiSuspectThreshold_ )
+ {
+ for ( IFailureDetectionEventListener listener : fdEvntListeners_ )
+ {
+ listener.suspect(ep);
+ }
+ }
+ }
+
+ public void registerFailureDetectionEventListener(IFailureDetectionEventListener listener)
+ {
+ fdEvntListeners_.add(listener);
+ }
+
+ public void unregisterFailureDetectionEventListener(IFailureDetectionEventListener listener)
+ {
+ fdEvntListeners_.remove(listener);
+ }
+
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+ Set<EndPoint> eps = arrivalSamples_.keySet();
+
+ sb.append("-----------------------------------------------------------------------");
+ for ( EndPoint ep : eps )
+ {
+ ArrivalWindow hWnd = arrivalSamples_.get(ep);
+ sb.append(ep + " : ");
+ sb.append(hWnd.toString());
+ sb.append( System.getProperty("line.separator") );
+ }
+ sb.append("-----------------------------------------------------------------------");
+ return sb.toString();
+ }
+
+ public static void main(String[] args) throws Throwable
+ {
+ }
+}
+
+class ArrivalWindow
+{
+ private static Logger logger_ = Logger.getLogger(ArrivalWindow.class);
+ private double tLast_ = 0L;
+ private BoundedStatsDeque arrivalIntervals_;
+
+ ArrivalWindow(int size)
+ {
+ arrivalIntervals_ = new BoundedStatsDeque(size);
+ }
+
+ synchronized void add(double value)
+ {
+ double interArrivalTime;
+ if ( tLast_ > 0L )
+ {
+ interArrivalTime = (value - tLast_);
+ }
+ else
+ {
+ interArrivalTime = Gossiper.intervalInMillis_ / 2;
+ }
+ tLast_ = value;
+ arrivalIntervals_.add(interArrivalTime);
+ }
+
+ synchronized double sum()
+ {
+ return arrivalIntervals_.sum();
+ }
+
+ synchronized double sumOfDeviations()
+ {
+ return arrivalIntervals_.sumOfDeviations();
+ }
+
+ synchronized double mean()
+ {
+ return arrivalIntervals_.mean();
+ }
+
+ synchronized double variance()
+ {
+ return arrivalIntervals_.variance();
+ }
+
+ double stdev()
+ {
+ return arrivalIntervals_.stdev();
+ }
+
+ void clear()
+ {
+ arrivalIntervals_.clear();
+ }
+
+ double p(double t)
+ {
+ double mean = mean();
+ double exponent = (-1)*(t)/mean;
+ return 1 - ( 1 - Math.pow(Math.E, exponent) );
+ }
+
+ double phi(long tnow)
+ {
+ int size = arrivalIntervals_.size();
+ double log = 0d;
+ if ( size > 0 )
+ {
+ double t = tnow - tLast_;
+ double probability = p(t);
+ log = (-1) * Math.log10( probability );
+ }
+ return log;
+ }
+
+ public String toString()
+ {
+ return StringUtils.join(arrivalIntervals_.iterator(), " ");
+ }
+}
+
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetectorMBean.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetectorMBean.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetectorMBean.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetectorMBean.java Thu Jul 30 15:30:21 2009
@@ -1,24 +1,24 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.gms;
-
-public interface FailureDetectorMBean
-{
- public void dumpInterArrivalTimes();
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+public interface FailureDetectorMBean
+{
+ public void dumpInterArrivalTimes();
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigest.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigest.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigest.java Thu Jul 30 15:30:21 2009
@@ -1,110 +1,110 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.gms;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.net.CompactEndPointSerializationHelper;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.*;
-
-/**
- * Contains information about a specified list of EndPoints and the largest version
- * of the state they have generated as known by the local endpoint.
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class GossipDigest implements Comparable<GossipDigest>
-{
- private static ICompactSerializer<GossipDigest> serializer_;
- static
- {
- serializer_ = new GossipDigestSerializer();
- }
-
- EndPoint endPoint_;
- int generation_;
- int maxVersion_;
-
- public static ICompactSerializer<GossipDigest> serializer()
- {
- return serializer_;
- }
-
- GossipDigest(EndPoint endPoint, int generation, int maxVersion)
- {
- endPoint_ = endPoint;
- generation_ = generation;
- maxVersion_ = maxVersion;
- }
-
- EndPoint getEndPoint()
- {
- return endPoint_;
- }
-
- int getGeneration()
- {
- return generation_;
- }
-
- int getMaxVersion()
- {
- return maxVersion_;
- }
-
- public int compareTo(GossipDigest gDigest)
- {
- if ( generation_ != gDigest.generation_ )
- return ( generation_ - gDigest.generation_ );
- return (maxVersion_ - gDigest.maxVersion_);
- }
-
- public String toString()
- {
- StringBuilder sb = new StringBuilder();
- sb.append(endPoint_);
- sb.append(":");
- sb.append(generation_);
- sb.append(":");
- sb.append(maxVersion_);
- return sb.toString();
- }
-}
-
-class GossipDigestSerializer implements ICompactSerializer<GossipDigest>
-{
- public void serialize(GossipDigest gDigest, DataOutputStream dos) throws IOException
- {
- CompactEndPointSerializationHelper.serialize(gDigest.endPoint_, dos);
- dos.writeInt(gDigest.generation_);
- dos.writeInt(gDigest.maxVersion_);
- }
-
- public GossipDigest deserialize(DataInputStream dis) throws IOException
- {
- EndPoint endPoint = CompactEndPointSerializationHelper.deserialize(dis);
- int generation = dis.readInt();
- int version = dis.readInt();
- return new GossipDigest(endPoint, generation, version);
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.CompactEndPointSerializationHelper;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.*;
+
+/**
+ * Contains information about a specified list of EndPoints and the largest version
+ * of the state they have generated as known by the local endpoint.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class GossipDigest implements Comparable<GossipDigest>
+{
+ private static ICompactSerializer<GossipDigest> serializer_;
+ static
+ {
+ serializer_ = new GossipDigestSerializer();
+ }
+
+ EndPoint endPoint_;
+ int generation_;
+ int maxVersion_;
+
+ public static ICompactSerializer<GossipDigest> serializer()
+ {
+ return serializer_;
+ }
+
+ GossipDigest(EndPoint endPoint, int generation, int maxVersion)
+ {
+ endPoint_ = endPoint;
+ generation_ = generation;
+ maxVersion_ = maxVersion;
+ }
+
+ EndPoint getEndPoint()
+ {
+ return endPoint_;
+ }
+
+ int getGeneration()
+ {
+ return generation_;
+ }
+
+ int getMaxVersion()
+ {
+ return maxVersion_;
+ }
+
+ public int compareTo(GossipDigest gDigest)
+ {
+ if ( generation_ != gDigest.generation_ )
+ return ( generation_ - gDigest.generation_ );
+ return (maxVersion_ - gDigest.maxVersion_);
+ }
+
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append(endPoint_);
+ sb.append(":");
+ sb.append(generation_);
+ sb.append(":");
+ sb.append(maxVersion_);
+ return sb.toString();
+ }
+}
+
+class GossipDigestSerializer implements ICompactSerializer<GossipDigest>
+{
+ public void serialize(GossipDigest gDigest, DataOutputStream dos) throws IOException
+ {
+ CompactEndPointSerializationHelper.serialize(gDigest.endPoint_, dos);
+ dos.writeInt(gDigest.generation_);
+ dos.writeInt(gDigest.maxVersion_);
+ }
+
+ public GossipDigest deserialize(DataInputStream dis) throws IOException
+ {
+ EndPoint endPoint = CompactEndPointSerializationHelper.deserialize(dis);
+ int generation = dis.readInt();
+ int version = dis.readInt();
+ return new GossipDigest(endPoint, generation, version);
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2Message.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2Message.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2Message.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAck2Message.java Thu Jul 30 15:30:21 2009
@@ -1,77 +1,77 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.gms;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.*;
-import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.*;
-
-
-/**
- * This message gets sent out as a result of the receipt of a GossipDigestAckMessage. This the
- * last stage of the 3 way messaging of the Gossip protocol.
- *
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-class GossipDigestAck2Message
-{
- private static ICompactSerializer<GossipDigestAck2Message> serializer_;
- static
- {
- serializer_ = new GossipDigestAck2MessageSerializer();
- }
-
- Map<EndPoint, EndPointState> epStateMap_ = new HashMap<EndPoint, EndPointState>();
-
- public static ICompactSerializer<GossipDigestAck2Message> serializer()
- {
- return serializer_;
- }
-
- GossipDigestAck2Message(Map<EndPoint, EndPointState> epStateMap)
- {
- epStateMap_ = epStateMap;
- }
-
- Map<EndPoint, EndPointState> getEndPointStateMap()
- {
- return epStateMap_;
- }
-}
-
-class GossipDigestAck2MessageSerializer implements ICompactSerializer<GossipDigestAck2Message>
-{
- public void serialize(GossipDigestAck2Message gDigestAck2Message, DataOutputStream dos) throws IOException
- {
- /* Use the EndPointState */
- EndPointStatesSerializationHelper.serialize(gDigestAck2Message.epStateMap_, dos);
- }
-
- public GossipDigestAck2Message deserialize(DataInputStream dis) throws IOException
- {
- Map<EndPoint, EndPointState> epStateMap = EndPointStatesSerializationHelper.deserialize(dis);
- return new GossipDigestAck2Message(epStateMap);
- }
-}
-
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.*;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.*;
+
+
+/**
+ * This message gets sent out as a result of the receipt of a GossipDigestAckMessage. This the
+ * last stage of the 3 way messaging of the Gossip protocol.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class GossipDigestAck2Message
+{
+ private static ICompactSerializer<GossipDigestAck2Message> serializer_;
+ static
+ {
+ serializer_ = new GossipDigestAck2MessageSerializer();
+ }
+
+ Map<EndPoint, EndPointState> epStateMap_ = new HashMap<EndPoint, EndPointState>();
+
+ public static ICompactSerializer<GossipDigestAck2Message> serializer()
+ {
+ return serializer_;
+ }
+
+ GossipDigestAck2Message(Map<EndPoint, EndPointState> epStateMap)
+ {
+ epStateMap_ = epStateMap;
+ }
+
+ Map<EndPoint, EndPointState> getEndPointStateMap()
+ {
+ return epStateMap_;
+ }
+}
+
+class GossipDigestAck2MessageSerializer implements ICompactSerializer<GossipDigestAck2Message>
+{
+ public void serialize(GossipDigestAck2Message gDigestAck2Message, DataOutputStream dos) throws IOException
+ {
+ /* Use the EndPointState */
+ EndPointStatesSerializationHelper.serialize(gDigestAck2Message.epStateMap_, dos);
+ }
+
+ public GossipDigestAck2Message deserialize(DataInputStream dis) throws IOException
+ {
+ Map<EndPoint, EndPointState> epStateMap = EndPointStatesSerializationHelper.deserialize(dis);
+ return new GossipDigestAck2Message(epStateMap);
+ }
+}
+
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestAckMessage.java Thu Jul 30 15:30:21 2009
@@ -1,102 +1,102 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.gms;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.*;
-
-import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.net.EndPoint;
-
-
-
-/**
- * This message gets sent out as a result of the receipt of a GossipDigestSynMessage by an
- * endpoint. This is the 2 stage of the 3 way messaging in the Gossip protocol.
- *
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-class GossipDigestAckMessage
-{
- private static ICompactSerializer<GossipDigestAckMessage> serializer_;
- static
- {
- serializer_ = new GossipDigestAckMessageSerializer();
- }
-
- List<GossipDigest> gDigestList_ = new ArrayList<GossipDigest>();
- Map<EndPoint, EndPointState> epStateMap_ = new HashMap<EndPoint, EndPointState>();
-
- static ICompactSerializer<GossipDigestAckMessage> serializer()
- {
- return serializer_;
- }
-
- GossipDigestAckMessage(List<GossipDigest> gDigestList, Map<EndPoint, EndPointState> epStateMap)
- {
- gDigestList_ = gDigestList;
- epStateMap_ = epStateMap;
- }
-
- void addGossipDigest(EndPoint ep, int generation, int version)
- {
- gDigestList_.add( new GossipDigest(ep, generation, version) );
- }
-
- List<GossipDigest> getGossipDigestList()
- {
- return gDigestList_;
- }
-
- Map<EndPoint, EndPointState> getEndPointStateMap()
- {
- return epStateMap_;
- }
-}
-
-class GossipDigestAckMessageSerializer implements ICompactSerializer<GossipDigestAckMessage>
-{
- public void serialize(GossipDigestAckMessage gDigestAckMessage, DataOutputStream dos) throws IOException
- {
- /* Use the helper to serialize the GossipDigestList */
- boolean bContinue = GossipDigestSerializationHelper.serialize(gDigestAckMessage.gDigestList_, dos);
- dos.writeBoolean(bContinue);
- /* Use the EndPointState */
- if ( bContinue )
- {
- EndPointStatesSerializationHelper.serialize(gDigestAckMessage.epStateMap_, dos);
- }
- }
-
- public GossipDigestAckMessage deserialize(DataInputStream dis) throws IOException
- {
- Map<EndPoint, EndPointState> epStateMap = new HashMap<EndPoint, EndPointState>();
- List<GossipDigest> gDigestList = GossipDigestSerializationHelper.deserialize(dis);
- boolean bContinue = dis.readBoolean();
-
- if ( bContinue )
- {
- epStateMap = EndPointStatesSerializationHelper.deserialize(dis);
- }
- return new GossipDigestAckMessage(gDigestList, epStateMap);
- }
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.EndPoint;
+
+
+
+/**
+ * This message gets sent out as a result of the receipt of a GossipDigestSynMessage by an
+ * endpoint. This is the 2 stage of the 3 way messaging in the Gossip protocol.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class GossipDigestAckMessage
+{
+ private static ICompactSerializer<GossipDigestAckMessage> serializer_;
+ static
+ {
+ serializer_ = new GossipDigestAckMessageSerializer();
+ }
+
+ List<GossipDigest> gDigestList_ = new ArrayList<GossipDigest>();
+ Map<EndPoint, EndPointState> epStateMap_ = new HashMap<EndPoint, EndPointState>();
+
+ static ICompactSerializer<GossipDigestAckMessage> serializer()
+ {
+ return serializer_;
+ }
+
+ GossipDigestAckMessage(List<GossipDigest> gDigestList, Map<EndPoint, EndPointState> epStateMap)
+ {
+ gDigestList_ = gDigestList;
+ epStateMap_ = epStateMap;
+ }
+
+ void addGossipDigest(EndPoint ep, int generation, int version)
+ {
+ gDigestList_.add( new GossipDigest(ep, generation, version) );
+ }
+
+ List<GossipDigest> getGossipDigestList()
+ {
+ return gDigestList_;
+ }
+
+ Map<EndPoint, EndPointState> getEndPointStateMap()
+ {
+ return epStateMap_;
+ }
+}
+
+class GossipDigestAckMessageSerializer implements ICompactSerializer<GossipDigestAckMessage>
+{
+ public void serialize(GossipDigestAckMessage gDigestAckMessage, DataOutputStream dos) throws IOException
+ {
+ /* Use the helper to serialize the GossipDigestList */
+ boolean bContinue = GossipDigestSerializationHelper.serialize(gDigestAckMessage.gDigestList_, dos);
+ dos.writeBoolean(bContinue);
+ /* Use the EndPointState */
+ if ( bContinue )
+ {
+ EndPointStatesSerializationHelper.serialize(gDigestAckMessage.epStateMap_, dos);
+ }
+ }
+
+ public GossipDigestAckMessage deserialize(DataInputStream dis) throws IOException
+ {
+ Map<EndPoint, EndPointState> epStateMap = new HashMap<EndPoint, EndPointState>();
+ List<GossipDigest> gDigestList = GossipDigestSerializationHelper.deserialize(dis);
+ boolean bContinue = dis.readBoolean();
+
+ if ( bContinue )
+ {
+ epStateMap = EndPointStatesSerializationHelper.deserialize(dis);
+ }
+ return new GossipDigestAckMessage(gDigestList, epStateMap);
+ }
}
\ No newline at end of file
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/GossipDigestSynMessage.java Thu Jul 30 15:30:21 2009
@@ -1,184 +1,184 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.gms;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.*;
-
-import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.net.CompactEndPointSerializationHelper;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.utils.Log4jLogger;
-import org.apache.log4j.Logger;
-import org.apache.cassandra.utils.*;
-
-
-/**
- * This is the first message that gets sent out as a start of the Gossip protocol in a
- * round.
- *
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-class GossipDigestSynMessage
-{
- private static ICompactSerializer<GossipDigestSynMessage> serializer_;
- static
- {
- serializer_ = new GossipDigestSynMessageSerializer();
- }
-
- String clusterId_;
- List<GossipDigest> gDigests_ = new ArrayList<GossipDigest>();
-
- public static ICompactSerializer<GossipDigestSynMessage> serializer()
- {
- return serializer_;
- }
-
- public GossipDigestSynMessage(String clusterId, List<GossipDigest> gDigests)
- {
- clusterId_ = clusterId;
- gDigests_ = gDigests;
- }
-
- List<GossipDigest> getGossipDigests()
- {
- return gDigests_;
- }
-}
-
-class GossipDigestSerializationHelper
-{
- private static Logger logger_ = Logger.getLogger(GossipDigestSerializationHelper.class);
-
- static boolean serialize(List<GossipDigest> gDigestList, DataOutputStream dos) throws IOException
- {
- boolean bVal = true;
- int size = gDigestList.size();
- dos.writeInt(size);
-
- int estimate = 0;
- for ( GossipDigest gDigest : gDigestList )
- {
- if ( Gossiper.MAX_GOSSIP_PACKET_SIZE - dos.size() < estimate )
- {
- logger_.info("@@@@ Breaking out to respect the MTU size in GD @@@@");
- bVal = false;
- break;
- }
- int pre = dos.size();
- GossipDigest.serializer().serialize( gDigest, dos );
- int post = dos.size();
- estimate = post - pre;
- }
- return bVal;
- }
-
- static List<GossipDigest> deserialize(DataInputStream dis) throws IOException
- {
- int size = dis.readInt();
- List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
-
- for ( int i = 0; i < size; ++i )
- {
- if ( dis.available() == 0 )
- {
- logger_.info("Remaining bytes zero. Stopping deserialization of GossipDigests.");
- break;
- }
-
- GossipDigest gDigest = GossipDigest.serializer().deserialize(dis);
- gDigests.add( gDigest );
- }
- return gDigests;
- }
-}
-
-class EndPointStatesSerializationHelper
-{
- private static Log4jLogger logger_ = new Log4jLogger(EndPointStatesSerializationHelper.class.getName());
-
- static boolean serialize(Map<EndPoint, EndPointState> epStateMap, DataOutputStream dos) throws IOException
- {
- boolean bVal = true;
- int estimate = 0;
- int size = epStateMap.size();
- dos.writeInt(size);
-
- Set<EndPoint> eps = epStateMap.keySet();
- for( EndPoint ep : eps )
- {
- if ( Gossiper.MAX_GOSSIP_PACKET_SIZE - dos.size() < estimate )
- {
- logger_.info("@@@@ Breaking out to respect the MTU size in EPS. Estimate is " + estimate + " @@@@");
- bVal = false;
- break;
- }
-
- int pre = dos.size();
- CompactEndPointSerializationHelper.serialize(ep, dos);
- EndPointState epState = epStateMap.get(ep);
- EndPointState.serializer().serialize(epState, dos);
- int post = dos.size();
- estimate = post - pre;
- }
- return bVal;
- }
-
- static Map<EndPoint, EndPointState> deserialize(DataInputStream dis) throws IOException
- {
- int size = dis.readInt();
- Map<EndPoint, EndPointState> epStateMap = new HashMap<EndPoint, EndPointState>();
-
- for ( int i = 0; i < size; ++i )
- {
- if ( dis.available() == 0 )
- {
- logger_.info("Remaining bytes zero. Stopping deserialization in EndPointState.");
- break;
- }
- // int length = dis.readInt();
- EndPoint ep = CompactEndPointSerializationHelper.deserialize(dis);
- EndPointState epState = EndPointState.serializer().deserialize(dis);
- epStateMap.put(ep, epState);
- }
- return epStateMap;
- }
-}
-
-class GossipDigestSynMessageSerializer implements ICompactSerializer<GossipDigestSynMessage>
-{
- public void serialize(GossipDigestSynMessage gDigestSynMessage, DataOutputStream dos) throws IOException
- {
- dos.writeUTF(gDigestSynMessage.clusterId_);
- GossipDigestSerializationHelper.serialize(gDigestSynMessage.gDigests_, dos);
- }
-
- public GossipDigestSynMessage deserialize(DataInputStream dis) throws IOException
- {
- String clusterId = dis.readUTF();
- List<GossipDigest> gDigests = GossipDigestSerializationHelper.deserialize(dis);
- return new GossipDigestSynMessage(clusterId, gDigests);
- }
-
-}
-
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.CompactEndPointSerializationHelper;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.utils.Log4jLogger;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.utils.*;
+
+
+/**
+ * This is the first message that gets sent out as a start of the Gossip protocol in a
+ * round.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class GossipDigestSynMessage
+{
+ private static ICompactSerializer<GossipDigestSynMessage> serializer_;
+ static
+ {
+ serializer_ = new GossipDigestSynMessageSerializer();
+ }
+
+ String clusterId_;
+ List<GossipDigest> gDigests_ = new ArrayList<GossipDigest>();
+
+ public static ICompactSerializer<GossipDigestSynMessage> serializer()
+ {
+ return serializer_;
+ }
+
+ public GossipDigestSynMessage(String clusterId, List<GossipDigest> gDigests)
+ {
+ clusterId_ = clusterId;
+ gDigests_ = gDigests;
+ }
+
+ List<GossipDigest> getGossipDigests()
+ {
+ return gDigests_;
+ }
+}
+
+class GossipDigestSerializationHelper
+{
+ private static Logger logger_ = Logger.getLogger(GossipDigestSerializationHelper.class);
+
+ static boolean serialize(List<GossipDigest> gDigestList, DataOutputStream dos) throws IOException
+ {
+ boolean bVal = true;
+ int size = gDigestList.size();
+ dos.writeInt(size);
+
+ int estimate = 0;
+ for ( GossipDigest gDigest : gDigestList )
+ {
+ if ( Gossiper.MAX_GOSSIP_PACKET_SIZE - dos.size() < estimate )
+ {
+ logger_.info("@@@@ Breaking out to respect the MTU size in GD @@@@");
+ bVal = false;
+ break;
+ }
+ int pre = dos.size();
+ GossipDigest.serializer().serialize( gDigest, dos );
+ int post = dos.size();
+ estimate = post - pre;
+ }
+ return bVal;
+ }
+
+ static List<GossipDigest> deserialize(DataInputStream dis) throws IOException
+ {
+ int size = dis.readInt();
+ List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
+
+ for ( int i = 0; i < size; ++i )
+ {
+ if ( dis.available() == 0 )
+ {
+ logger_.info("Remaining bytes zero. Stopping deserialization of GossipDigests.");
+ break;
+ }
+
+ GossipDigest gDigest = GossipDigest.serializer().deserialize(dis);
+ gDigests.add( gDigest );
+ }
+ return gDigests;
+ }
+}
+
+class EndPointStatesSerializationHelper
+{
+ private static Log4jLogger logger_ = new Log4jLogger(EndPointStatesSerializationHelper.class.getName());
+
+ static boolean serialize(Map<EndPoint, EndPointState> epStateMap, DataOutputStream dos) throws IOException
+ {
+ boolean bVal = true;
+ int estimate = 0;
+ int size = epStateMap.size();
+ dos.writeInt(size);
+
+ Set<EndPoint> eps = epStateMap.keySet();
+ for( EndPoint ep : eps )
+ {
+ if ( Gossiper.MAX_GOSSIP_PACKET_SIZE - dos.size() < estimate )
+ {
+ logger_.info("@@@@ Breaking out to respect the MTU size in EPS. Estimate is " + estimate + " @@@@");
+ bVal = false;
+ break;
+ }
+
+ int pre = dos.size();
+ CompactEndPointSerializationHelper.serialize(ep, dos);
+ EndPointState epState = epStateMap.get(ep);
+ EndPointState.serializer().serialize(epState, dos);
+ int post = dos.size();
+ estimate = post - pre;
+ }
+ return bVal;
+ }
+
+ static Map<EndPoint, EndPointState> deserialize(DataInputStream dis) throws IOException
+ {
+ int size = dis.readInt();
+ Map<EndPoint, EndPointState> epStateMap = new HashMap<EndPoint, EndPointState>();
+
+ for ( int i = 0; i < size; ++i )
+ {
+ if ( dis.available() == 0 )
+ {
+ logger_.info("Remaining bytes zero. Stopping deserialization in EndPointState.");
+ break;
+ }
+ // int length = dis.readInt();
+ EndPoint ep = CompactEndPointSerializationHelper.deserialize(dis);
+ EndPointState epState = EndPointState.serializer().deserialize(dis);
+ epStateMap.put(ep, epState);
+ }
+ return epStateMap;
+ }
+}
+
+class GossipDigestSynMessageSerializer implements ICompactSerializer<GossipDigestSynMessage>
+{
+ public void serialize(GossipDigestSynMessage gDigestSynMessage, DataOutputStream dos) throws IOException
+ {
+ dos.writeUTF(gDigestSynMessage.clusterId_);
+ GossipDigestSerializationHelper.serialize(gDigestSynMessage.gDigests_, dos);
+ }
+
+ public GossipDigestSynMessage deserialize(DataInputStream dis) throws IOException
+ {
+ String clusterId = dis.readUTF();
+ List<GossipDigest> gDigests = GossipDigestSerializationHelper.deserialize(dis);
+ return new GossipDigestSynMessage(clusterId, gDigests);
+ }
+
+}
+