You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pm...@apache.org on 2009/03/02 08:57:31 UTC
svn commit: r749218 [17/34] - in /incubator/cassandra: branches/ dist/
nightly/ site/ tags/ trunk/ trunk/lib/ trunk/src/ trunk/src/org/
trunk/src/org/apache/ trunk/src/org/apache/cassandra/
trunk/src/org/apache/cassandra/analytics/ trunk/src/org/apache...
Added: incubator/cassandra/trunk/src/org/apache/cassandra/gms/EndPointState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/gms/EndPointState.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/gms/EndPointState.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/gms/EndPointState.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.*;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.IFileReader;
+import org.apache.cassandra.io.IFileWriter;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.utils.*;
+
+/**
+ * This abstraction represents both the HeartBeatState and the ApplicationState in an EndPointState
+ * instance. Any state for a given endpoint can be retrieved from this instance.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class EndPointState
+{
+    private static ICompactSerializer<EndPointState> serializer_;
+    static
+    {
+        serializer_ = new EndPointStateSerializer();
+    }
+
+    /*
+     * volatile: replaced inside the synchronized setHeartBeatState() but read by
+     * the unsynchronized getHeartBeatState(); without it other threads are not
+     * guaranteed to observe the new reference.
+     */
+    volatile HeartBeatState hbState_;
+    /* A Hashtable, so each individual get/put is internally synchronized; it rejects null keys and values. */
+    Map<String, ApplicationState> applicationState_ = new Hashtable<String, ApplicationState>();
+
+    /* fields below do not get serialized */
+    /*
+     * Each of these is written inside a synchronized setter but read by an
+     * unsynchronized getter, so they are volatile to guarantee that readers see
+     * the latest write. The setters perform single assignments only, so no
+     * additional atomicity is required.
+     */
+    volatile long updateTimestamp_;
+    volatile boolean isAlive_;
+    volatile boolean isAGossiper_;
+
+    /**
+     * @return the shared serializer used to write/read EndPointState instances
+     */
+    public static ICompactSerializer<EndPointState> serializer()
+    {
+        return serializer_;
+    }
+
+    /**
+     * Creates the state for an endpoint. The new state starts out alive, not a
+     * gossiper, and stamped with the current wall-clock time.
+     *
+     * @param hbState the initial heart beat state of the endpoint
+     */
+    EndPointState(HeartBeatState hbState)
+    {
+        hbState_ = hbState;
+        updateTimestamp_ = System.currentTimeMillis();
+        isAlive_ = true;
+        isAGossiper_ = false;
+    }
+
+    HeartBeatState getHeartBeatState()
+    {
+        return hbState_;
+    }
+
+    /**
+     * Replaces the heart beat state and refreshes the update timestamp.
+     *
+     * @param hbState the new heart beat state
+     */
+    synchronized void setHeartBeatState(HeartBeatState hbState)
+    {
+        updateTimestamp();
+        hbState_ = hbState;
+    }
+
+    /**
+     * @param key name of the application state
+     * @return the application state registered under key, or null if there is none
+     */
+    public ApplicationState getApplicationState(String key)
+    {
+        return applicationState_.get(key);
+    }
+
+    /**
+     * @return the live backing map (not a copy); modifications through it are
+     *         visible to this EndPointState
+     */
+    public Map<String, ApplicationState> getApplicationState()
+    {
+        return applicationState_;
+    }
+
+    /**
+     * Registers (or replaces) the application state stored under key.
+     * The backing Hashtable throws NullPointerException for a null key or value.
+     */
+    void addApplicationState(String key, ApplicationState appState)
+    {
+        applicationState_.put(key, appState);
+    }
+
+    /* getters and setters */
+    long getUpdateTimestamp()
+    {
+        return updateTimestamp_;
+    }
+
+    /** Stamps this state with the current wall-clock time. */
+    synchronized void updateTimestamp()
+    {
+        updateTimestamp_ = System.currentTimeMillis();
+    }
+
+    public boolean isAlive()
+    {
+        return isAlive_;
+    }
+
+    synchronized void isAlive(boolean value)
+    {
+        isAlive_ = value;
+    }
+
+    boolean isAGossiper()
+    {
+        return isAGossiper_;
+    }
+
+    synchronized void isAGossiper(boolean value)
+    {
+        isAGossiper_ = value;
+    }
+}
+
+class EndPointStateSerializer implements ICompactSerializer<EndPointState>
+{
+    private static Logger logger_ = Logger.getLogger(EndPointStateSerializer.class);
+
+    /**
+     * Writes the heart beat state and the number of application states. It
+     * then writes as many (key, ApplicationState) pairs as appear to fit within
+     * Gossiper.MAX_GOSSIP_PACKET_SIZE. "Fits" means: the room left in dos is at
+     * least the size of the previously written pair.
+     *
+     * NOTE(review): the count written is the full map size even when the loop
+     * stops early, so the reader only notices the truncation by running out of
+     * bytes. If more data follows this state in the same message, the reader may
+     * misparse it. Confirm how callers lay out their messages.
+     */
+    public void serialize(EndPointState epState, DataOutputStream dos) throws IOException
+    {
+        HeartBeatState.serializer().serialize(epState.getHeartBeatState(), dos);
+
+        Map<String, ApplicationState> appStates = epState.applicationState_;
+        int count = appStates.size();
+        dos.writeInt(count);
+        if ( count == 0 )
+        {
+            return;
+        }
+
+        /* Byte size of the last pair written; the guess for how big the next one will be. */
+        int lastEntrySize = 0;
+        for ( String key : appStates.keySet() )
+        {
+            int remaining = Gossiper.MAX_GOSSIP_PACKET_SIZE - dos.size();
+            if ( remaining < lastEntrySize )
+            {
+                logger_.info("@@@@ Breaking out to respect the MTU size in EndPointState serializer. Estimate is " + lastEntrySize + " @@@@");
+                break;
+            }
+
+            ApplicationState appState = appStates.get(key);
+            if ( appState == null )
+            {
+                continue;
+            }
+            int start = dos.size();
+            dos.writeUTF(key);
+            ApplicationState.serializer().serialize(appState, dos);
+            lastEntrySize = dos.size() - start;
+        }
+    }
+
+    /**
+     * Reads a heart beat state and a count. It then reads up to that many
+     * (key, ApplicationState) pairs, stopping early once dis reports no
+     * available bytes. This is how a sender-side truncation is tolerated.
+     */
+    public EndPointState deserialize(DataInputStream dis) throws IOException
+    {
+        EndPointState epState = new EndPointState(HeartBeatState.serializer().deserialize(dis));
+
+        int remainingEntries = dis.readInt();
+        while ( remainingEntries-- > 0 && dis.available() != 0 )
+        {
+            String key = dis.readUTF();
+            ApplicationState appState = ApplicationState.serializer().deserialize(dis);
+            epState.addApplicationState(key, appState);
+        }
+        return epState;
+    }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/gms/FailureDetector.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/gms/FailureDetector.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/gms/FailureDetector.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/gms/FailureDetector.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,343 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.io.FileOutputStream;
+import java.lang.management.ManagementFactory;
+import java.net.UnknownHostException;
+import java.util.*;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.utils.*;
+
+/**
+ * This FailureDetector is an implementation of the paper titled
+ * "The Phi Accrual Failure Detector" by Hayashibara.
+ * Check the paper and the <i>IFailureDetector</i> interface for details.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class FailureDetector implements IFailureDetector, FailureDetectorMBean
+{
+    private static Logger logger_ = Logger.getLogger(FailureDetector.class);
+    private static final int sampleSize_ = 1000;
+    private static final int phiSuspectThreshold_ = 5;
+    /* Only referenced by the conviction logic in intepret(), which is currently commented out. */
+    private static final int phiConvictThreshold_ = 8;
+    /* The Failure Detector has to have been up for at least 1 min. */
+    private static final long uptimeThreshold_ = 60000;
+    /*
+     * volatile is required for the double-checked locking in instance(). It
+     * guarantees that a thread which sees a non-null reference also sees the
+     * fully constructed FailureDetector behind it.
+     */
+    private static volatile IFailureDetector failureDetector_;
+    /* Used to lock the factory for creation of FailureDetector instance */
+    private static Lock createLock_ = new ReentrantLock();
+    /* The time when the module was instantiated. */
+    private static long creationTime_;
+
+    /**
+     * Returns the singleton FailureDetector, creating it (and registering it with
+     * JMX) on first use.
+     */
+    public static IFailureDetector instance()
+    {
+        IFailureDetector fd = failureDetector_;
+        if ( fd == null )
+        {
+            createLock_.lock();
+            try
+            {
+                fd = failureDetector_;
+                if ( fd == null )
+                {
+                    fd = new FailureDetector();
+                    failureDetector_ = fd;
+                }
+            }
+            finally
+            {
+                createLock_.unlock();
+            }
+        }
+        return fd;
+    }
+
+    /* Heartbeat arrival history, one window per endpoint passed to report(). */
+    private Map<EndPoint, ArrivalWindow> arrivalSamples_ = new Hashtable<EndPoint, ArrivalWindow>();
+    /*
+     * NOTE(review): plain ArrayList. Registering or unregistering concurrently
+     * with intepret() iterating it could throw ConcurrentModificationException.
+     */
+    private List<IFailureDetectionEventListener> fdEvntListeners_ = new ArrayList<IFailureDetectionEventListener>();
+
+    public FailureDetector()
+    {
+        creationTime_ = System.currentTimeMillis();
+        // Register this instance with JMX.
+        // NOTE(review): the ObjectName still uses the pre-Apache
+        // "com.facebook.infrastructure" domain. It is left unchanged because JMX
+        // clients may look the bean up by this exact name.
+        try
+        {
+            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+            mbs.registerMBean(this, new ObjectName("com.facebook.infrastructure.gms:type=FailureDetector"));
+        }
+        catch (Exception e)
+        {
+            logger_.error(LogUtil.throwableToString(e));
+        }
+    }
+
+    /**
+     * Dump the inter arrival times of every endpoint to a timestamped file
+     * under /var/tmp, for examination if necessary.
+     */
+    public void dumpInterArrivalTimes()
+    {
+        appendToFile("/var/tmp/output-" + System.currentTimeMillis() + ".dat", this);
+    }
+
+    /**
+     * We dump the arrival window for any endpoint only if the
+     * local Failure Detector module has been up for more than a
+     * minute. Endpoints without an arrival window are skipped.
+     *
+     * @param ep for which the arrival window needs to be dumped.
+     */
+    private void dumpInterArrivalTimes(EndPoint ep)
+    {
+        long now = System.currentTimeMillis();
+        if ( (now - FailureDetector.creationTime_) <= FailureDetector.uptimeThreshold_ )
+            return;
+        ArrivalWindow hWnd = arrivalSamples_.get(ep);
+        if ( hWnd == null )
+            return;
+        appendToFile("/var/tmp/output-" + System.currentTimeMillis() + "-" + ep + ".dat", hWnd);
+    }
+
+    /**
+     * Appends source.toString(), encoded with the platform default charset, to
+     * fileName. Any failure, including one thrown by toString(), is logged
+     * rather than propagated, and the stream is always closed.
+     */
+    private static void appendToFile(String fileName, Object source)
+    {
+        FileOutputStream fos = null;
+        try
+        {
+            fos = new FileOutputStream(fileName, true);
+            fos.write(source.toString().getBytes());
+        }
+        catch ( Throwable th )
+        {
+            logger_.warn(LogUtil.throwableToString(th));
+        }
+        finally
+        {
+            if ( fos != null )
+            {
+                try
+                {
+                    fos.close();
+                }
+                catch ( java.io.IOException ex )
+                {
+                    logger_.warn(LogUtil.throwableToString(ex));
+                }
+            }
+        }
+    }
+
+    /**
+     * The local host is always alive. For any other host, the answer comes from
+     * the gossiper's state for that host on the control port. A host the
+     * gossiper has no state for is reported as dead.
+     */
+    public boolean isAlive(EndPoint ep)
+    {
+        try
+        {
+            /* If the endpoint in question is the local endpoint return true. */
+            String localHost = FBUtilities.getLocalHostName();
+            if ( localHost.equals( ep.getHost() ) )
+                return true;
+        }
+        catch( UnknownHostException ex )
+        {
+            logger_.info( LogUtil.throwableToString(ex) );
+        }
+        /* Incoming port is assumed to be the Storage port. We need to change it to the control port */
+        EndPoint ep2 = new EndPoint(ep.getHost(), DatabaseDescriptor.getControlPort());
+        EndPointState epState = Gossiper.instance().getEndPointStateForEndPoint(ep2);
+        if ( epState == null )
+        {
+            /* Presumably null means the gossiper has never heard of ep2; answer "dead" instead of throwing NPE. */
+            logger_.warn("Unknown endpoint " + ep2 + " queried for liveness; treating it as dead.");
+            return false;
+        }
+        return epState.isAlive();
+    }
+
+    /**
+     * Records a heartbeat arrival from ep at the current time.
+     * NOTE(review): the get-then-put is not atomic. If report() can run
+     * concurrently for the same new endpoint, one sample may be lost.
+     */
+    public void report(EndPoint ep)
+    {
+        long now = System.currentTimeMillis();
+        ArrivalWindow hbWnd = arrivalSamples_.get(ep);
+        if ( hbWnd == null )
+        {
+            hbWnd = new ArrivalWindow(sampleSize_);
+            arrivalSamples_.put(ep, hbWnd);
+        }
+        hbWnd.add(now);
+    }
+
+    /**
+     * Computes phi for ep. When phi exceeds phiSuspectThreshold_, every
+     * registered listener is notified via suspect(ep). Endpoints that have never
+     * been report()ed are ignored.
+     */
+    public void intepret(EndPoint ep)
+    {
+        ArrivalWindow hbWnd = arrivalSamples_.get(ep);
+        if ( hbWnd == null )
+        {
+            return;
+        }
+        long now = System.currentTimeMillis();
+        /* We need this so that we do not suspect a convict. */
+        boolean isConvicted = false;
+        double phi = hbWnd.phi(now);
+        logger_.info("PHI for " + ep + " : " + phi);
+
+        /*
+        if ( phi > phiConvictThreshold_ )
+        {
+            isConvicted = true;
+            for ( IFailureDetectionEventListener listener : fdEvntListeners_ )
+            {
+                listener.convict(ep);
+            }
+        }
+        */
+        if ( !isConvicted && phi > phiSuspectThreshold_ )
+        {
+            for ( IFailureDetectionEventListener listener : fdEvntListeners_ )
+            {
+                listener.suspect(ep);
+            }
+        }
+    }
+
+    public void registerFailureDetectionEventListener(IFailureDetectionEventListener listener)
+    {
+        fdEvntListeners_.add(listener);
+    }
+
+    public void unregisterFailureDetectionEventListener(IFailureDetectionEventListener listener)
+    {
+        fdEvntListeners_.remove(listener);
+    }
+
+    /** Renders every endpoint's arrival window, one per line, between dashed rules. */
+    public String toString()
+    {
+        StringBuilder sb = new StringBuilder();
+        Set<EndPoint> eps = arrivalSamples_.keySet();
+
+        sb.append("-----------------------------------------------------------------------");
+        for ( EndPoint ep : eps )
+        {
+            ArrivalWindow hWnd = arrivalSamples_.get(ep);
+            sb.append(ep + " : ");
+            sb.append(hWnd.toString());
+            sb.append( System.getProperty("line.separator") );
+        }
+        sb.append("-----------------------------------------------------------------------");
+        return sb.toString();
+    }
+
+    public static void main(String[] args) throws Throwable
+    {
+    }
+}
+
+/**
+ * Sliding window of heartbeat inter-arrival times for one endpoint, used to
+ * compute the phi suspicion level. Inter-arrival times are modelled as
+ * exponentially distributed with the window's mean; the original Gaussian model
+ * from the accrual paper is not used. All access to the window is synchronized
+ * on this object.
+ */
+class ArrivalWindow
+{
+    private static Logger logger_ = Logger.getLogger(ArrivalWindow.class);
+    /* Arrival time of the most recent heartbeat; 0 until the first one is seen. */
+    private double tLast_ = 0L;
+    /* Most recent inter-arrival times, oldest first, at most size_ of them. */
+    private List<Double> arrivalIntervals_;
+    private int size_;
+
+    ArrivalWindow(int size)
+    {
+        size_ = size;
+        arrivalIntervals_ = new ArrayList<Double>(size);
+    }
+
+    /**
+     * Records a heartbeat arriving at time value.
+     *
+     * The first heartbeat only establishes tLast_. Previously it also recorded
+     * an interval of 0, so after a single heartbeat the mean interval was 0 and
+     * phi evaluated to +Infinity. The endpoint was then suspected at once. That
+     * spurious 0 also biased the mean downwards for the life of the window.
+     */
+    synchronized void add(double value)
+    {
+        if ( tLast_ > 0L )
+        {
+            /* Evict the oldest interval once the window is full. */
+            if ( !arrivalIntervals_.isEmpty() && arrivalIntervals_.size() >= size_ )
+            {
+                arrivalIntervals_.remove(0);
+            }
+            arrivalIntervals_.add(value - tLast_);
+        }
+        tLast_ = value;
+    }
+
+    synchronized double sum()
+    {
+        double sum = 0d;
+        int size = arrivalIntervals_.size();
+        for( int i = 0; i < size; ++i )
+        {
+            sum += arrivalIntervals_.get(i);
+        }
+        return sum;
+    }
+
+    synchronized double sumOfDeviations()
+    {
+        double sumOfDeviations = 0d;
+        double mean = mean();
+        int size = arrivalIntervals_.size();
+
+        for( int i = 0; i < size; ++i )
+        {
+            sumOfDeviations += (arrivalIntervals_.get(i) - mean)*(arrivalIntervals_.get(i) - mean);
+        }
+        return sumOfDeviations;
+    }
+
+    /** Mean inter-arrival time; NaN when the window is empty. */
+    synchronized double mean()
+    {
+        return sum()/arrivalIntervals_.size();
+    }
+
+    /** Population variance of the inter-arrival times. */
+    synchronized double variance()
+    {
+        return sumOfDeviations() / (arrivalIntervals_.size());
+    }
+
+    double deviation()
+    {
+        return Math.sqrt(variance());
+    }
+
+    synchronized void clear()
+    {
+        arrivalIntervals_.clear();
+    }
+
+    /**
+     * Probability that the next heartbeat arrives more than t ms after the last
+     * one. For the exponential model, that is 1 - CDF(t) = e^(-t / mean).
+     */
+    synchronized double p(double t)
+    {
+        double mean = mean();
+        return Math.exp(-t / mean);
+    }
+
+    /**
+     * Suspicion level at time tnow: -log10 of the probability that a heartbeat
+     * still arrives after this long. Returns 0 when no inter-arrival interval
+     * has been observed yet.
+     */
+    synchronized double phi(long tnow)
+    {
+        if ( arrivalIntervals_.isEmpty() )
+        {
+            return 0d;
+        }
+        double t = tnow - tLast_;
+        return -Math.log10( p(t) );
+    }
+
+    /** Space-separated inter-arrival times, oldest first. */
+    public synchronized String toString()
+    {
+        StringBuilder sb = new StringBuilder();
+        for ( Double interval : arrivalIntervals_ )
+        {
+            sb.append(interval);
+            sb.append(" ");
+        }
+        return sb.toString();
+    }
+}
+
Added: incubator/cassandra/trunk/src/org/apache/cassandra/gms/FailureDetectorMBean.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/gms/FailureDetectorMBean.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/gms/FailureDetectorMBean.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/gms/FailureDetectorMBean.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+/**
+ * JMX management interface of the failure detector.
+ */
+public interface FailureDetectorMBean
+{
+    /**
+     * Dumps the recorded heartbeat inter-arrival times. The FailureDetector
+     * implementation appends them to a timestamped file under /var/tmp.
+     */
+    void dumpInterArrivalTimes();
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/gms/GossipDigest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/gms/GossipDigest.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/gms/GossipDigest.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/gms/GossipDigest.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.CompactEndPointSerializationHelper;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.*;
+
+/**
+ * Contains information about a specified EndPoint: its generation and the largest
+ * version of the state it has generated, as known by the local endpoint.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class GossipDigest implements Comparable<GossipDigest>
+{
+    private static ICompactSerializer<GossipDigest> serializer_;
+    static
+    {
+        serializer_ = new GossipDigestSerializer();
+    }
+
+    EndPoint endPoint_;
+    int generation_;
+    int maxVersion_;
+
+    /** @return the shared serializer for GossipDigest instances */
+    public static ICompactSerializer<GossipDigest> serializer()
+    {
+        return serializer_;
+    }
+
+    GossipDigest(EndPoint endPoint, int generation, int maxVersion)
+    {
+        endPoint_ = endPoint;
+        generation_ = generation;
+        maxVersion_ = maxVersion;
+    }
+
+    EndPoint getEndPoint()
+    {
+        return endPoint_;
+    }
+
+    int getGeneration()
+    {
+        return generation_;
+    }
+
+    int getMaxVersion()
+    {
+        return maxVersion_;
+    }
+
+    /**
+     * Orders digests by generation, then by max version. The endpoint is not
+     * considered, so this ordering is not consistent with equals().
+     *
+     * The comparison is explicit rather than a subtraction because the
+     * difference of two ints can overflow and report the wrong sign. Only the
+     * sign of the result is meaningful.
+     */
+    public int compareTo(GossipDigest gDigest)
+    {
+        if ( generation_ != gDigest.generation_ )
+            return ( generation_ < gDigest.generation_ ) ? -1 : 1;
+        if ( maxVersion_ != gDigest.maxVersion_ )
+            return ( maxVersion_ < gDigest.maxVersion_ ) ? -1 : 1;
+        return 0;
+    }
+
+    /** Renders the digest as endpoint:generation:maxVersion. */
+    public String toString()
+    {
+        StringBuilder sb = new StringBuilder();
+        sb.append(endPoint_);
+        sb.append(":");
+        sb.append(generation_);
+        sb.append(":");
+        sb.append(maxVersion_);
+        return sb.toString();
+    }
+}
+
+/**
+ * Wire format of a GossipDigest: the endpoint (compact form), then the
+ * generation as an int, then the max version as an int.
+ */
+class GossipDigestSerializer implements ICompactSerializer<GossipDigest>
+{
+    public void serialize(GossipDigest gDigest, DataOutputStream dos) throws IOException
+    {
+        EndPoint endPoint = gDigest.getEndPoint();
+        CompactEndPointSerializationHelper.serialize(endPoint, dos);
+        dos.writeInt(gDigest.getGeneration());
+        dos.writeInt(gDigest.getMaxVersion());
+    }
+
+    public GossipDigest deserialize(DataInputStream dis) throws IOException
+    {
+        /* Java evaluates constructor arguments left to right, matching the wire order. */
+        return new GossipDigest(CompactEndPointSerializationHelper.deserialize(dis),
+                                dis.readInt(),
+                                dis.readInt());
+    }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/gms/GossipDigestAck2Message.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/gms/GossipDigestAck2Message.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/gms/GossipDigestAck2Message.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/gms/GossipDigestAck2Message.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.*;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.*;
+
+
+/**
+ * This message gets sent out as a result of the receipt of a GossipDigestAckMessage. This is the
+ * last stage of the 3-way messaging of the Gossip protocol.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class GossipDigestAck2Message
+{
+    private static final ICompactSerializer<GossipDigestAck2Message> serializer_ = new GossipDigestAck2MessageSerializer();
+
+    /* Endpoint states carried by this message, keyed by endpoint; the only payload of the message. */
+    Map<EndPoint, EndPointState> epStateMap_;
+
+    /** @return the shared serializer for this message type */
+    public static ICompactSerializer<GossipDigestAck2Message> serializer()
+    {
+        return serializer_;
+    }
+
+    /**
+     * @param epStateMap endpoint states to send; stored by reference, not copied
+     */
+    GossipDigestAck2Message(Map<EndPoint, EndPointState> epStateMap)
+    {
+        this.epStateMap_ = epStateMap;
+    }
+
+    Map<EndPoint, EndPointState> getEndPointStateMap()
+    {
+        return epStateMap_;
+    }
+}
+
+class GossipDigestAck2MessageSerializer implements ICompactSerializer<GossipDigestAck2Message>
+{
+    /**
+     * Writes the endpoint state map. The helper's "everything fit" flag is not
+     * transmitted. The map is the only content of the message, so a truncated
+     * map is detected by the reader running out of bytes.
+     */
+    public void serialize(GossipDigestAck2Message gDigestAck2Message, DataOutputStream dos) throws IOException
+    {
+        Map<EndPoint, EndPointState> epStates = gDigestAck2Message.getEndPointStateMap();
+        EndPointStatesSerializationHelper.serialize(epStates, dos);
+    }
+
+    public GossipDigestAck2Message deserialize(DataInputStream dis) throws IOException
+    {
+        return new GossipDigestAck2Message(EndPointStatesSerializationHelper.deserialize(dis));
+    }
+}
+
Added: incubator/cassandra/trunk/src/org/apache/cassandra/gms/GossipDigestAckMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/gms/GossipDigestAckMessage.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/gms/GossipDigestAckMessage.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/gms/GossipDigestAckMessage.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.EndPoint;
+
+
+
+/**
+ * This message gets sent out as a result of the receipt of a GossipDigestSynMessage by an
+ * endpoint. This is the second stage of the 3-way messaging in the Gossip protocol.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class GossipDigestAckMessage
+{
+    private static final ICompactSerializer<GossipDigestAckMessage> serializer_ = new GossipDigestAckMessageSerializer();
+
+    /* Digests, written first on the wire. */
+    List<GossipDigest> gDigestList_;
+    /* Endpoint states, written after the digests only if all digests fit. */
+    Map<EndPoint, EndPointState> epStateMap_;
+
+    /** @return the shared serializer for this message type */
+    static ICompactSerializer<GossipDigestAckMessage> serializer()
+    {
+        return serializer_;
+    }
+
+    /**
+     * Both collections are stored by reference, not copied.
+     */
+    GossipDigestAckMessage(List<GossipDigest> gDigestList, Map<EndPoint, EndPointState> epStateMap)
+    {
+        this.gDigestList_ = gDigestList;
+        this.epStateMap_ = epStateMap;
+    }
+
+    /** Appends a digest to the list this message was constructed with. */
+    void addGossipDigest(EndPoint ep, int generation, int version)
+    {
+        GossipDigest digest = new GossipDigest(ep, generation, version);
+        gDigestList_.add( digest );
+    }
+
+    List<GossipDigest> getGossipDigestList()
+    {
+        return gDigestList_;
+    }
+
+    Map<EndPoint, EndPointState> getEndPointStateMap()
+    {
+        return epStateMap_;
+    }
+}
+
+/**
+ * Wire format: the digest list, a boolean that is true iff every digest fit
+ * in the packet, and then, only when that flag is true, the endpoint state map.
+ */
+class GossipDigestAckMessageSerializer implements ICompactSerializer<GossipDigestAckMessage>
+{
+    public void serialize(GossipDigestAckMessage gDigestAckMessage, DataOutputStream dos) throws IOException
+    {
+        boolean allDigestsWritten = GossipDigestSerializationHelper.serialize(gDigestAckMessage.gDigestList_, dos);
+        dos.writeBoolean(allDigestsWritten);
+        if ( !allDigestsWritten )
+        {
+            /* Out of room: the endpoint states are omitted entirely. */
+            return;
+        }
+        EndPointStatesSerializationHelper.serialize(gDigestAckMessage.epStateMap_, dos);
+    }
+
+    public GossipDigestAckMessage deserialize(DataInputStream dis) throws IOException
+    {
+        List<GossipDigest> gDigestList = GossipDigestSerializationHelper.deserialize(dis);
+        Map<EndPoint, EndPointState> epStateMap = dis.readBoolean()
+            ? EndPointStatesSerializationHelper.deserialize(dis)
+            : new HashMap<EndPoint, EndPointState>();
+        return new GossipDigestAckMessage(gDigestList, epStateMap);
+    }
+}
\ No newline at end of file
Added: incubator/cassandra/trunk/src/org/apache/cassandra/gms/GossipDigestSynMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/gms/GossipDigestSynMessage.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/gms/GossipDigestSynMessage.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/gms/GossipDigestSynMessage.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.CompactEndPointSerializationHelper;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.utils.Log4jLogger;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.utils.*;
+
+
+/**
+ * This is the first message sent out at the start of a round of the Gossip
+ * protocol.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class GossipDigestSynMessage
+{
+    private static final ICompactSerializer<GossipDigestSynMessage> serializer_ = new GossipDigestSynMessageSerializer();
+
+    /* Written first on the wire, as a UTF string. */
+    String clusterId_;
+    /* Written after the cluster id; stored by reference, not copied. */
+    List<GossipDigest> gDigests_;
+
+    /** @return the shared serializer for this message type */
+    public static ICompactSerializer<GossipDigestSynMessage> serializer()
+    {
+        return serializer_;
+    }
+
+    public GossipDigestSynMessage(String clusterId, List<GossipDigest> gDigests)
+    {
+        this.clusterId_ = clusterId;
+        this.gDigests_ = gDigests;
+    }
+
+    List<GossipDigest> getGossipDigests()
+    {
+        return gDigests_;
+    }
+}
+
+class GossipDigestSerializationHelper
+{
+    private static Logger logger_ = Logger.getLogger(GossipDigestSerializationHelper.class);
+
+    /**
+     * Writes a count followed by as many digests from gDigestList as appear to
+     * fit within Gossiper.MAX_GOSSIP_PACKET_SIZE. The next digest is written
+     * only if the remaining room is at least the size of the previous one.
+     *
+     * Digests are staged in a scratch buffer so that the count written ahead of
+     * them is the number actually sent. Previously the full list size was
+     * written even when the loop stopped early. A reader expecting that many
+     * digests would then consume whatever followed the list as another digest,
+     * e.g. the continuation flag of a GossipDigestAckMessage. The truncation
+     * decisions themselves are unchanged. The wire format is unchanged too: an
+     * int count followed by the digests.
+     *
+     * @return true if every digest was written, false if the list was truncated
+     */
+    static boolean serialize(List<GossipDigest> gDigestList, DataOutputStream dos) throws IOException
+    {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutputStream buffer = new DataOutputStream(bos);
+        /* Offset in dos where the first digest will land once the 4-byte count precedes it. */
+        int base = dos.size() + 4;
+
+        boolean bVal = true;
+        int count = 0;
+        int estimate = 0;
+        for ( GossipDigest gDigest : gDigestList )
+        {
+            /* base + buffer.size() equals what dos.size() would be had we written directly. */
+            if ( Gossiper.MAX_GOSSIP_PACKET_SIZE - (base + buffer.size()) < estimate )
+            {
+                logger_.info("@@@@ Breaking out to respect the MTU size in GD @@@@");
+                bVal = false;
+                break;
+            }
+            int pre = buffer.size();
+            GossipDigest.serializer().serialize( gDigest, buffer );
+            estimate = buffer.size() - pre;
+            ++count;
+        }
+        buffer.flush();
+
+        dos.writeInt(count);
+        bos.writeTo(dos);
+        return bVal;
+    }
+
+    /**
+     * Reads a count and then up to that many digests. Reading stops early if
+     * dis reports no available bytes.
+     * NOTE(review): available() is only a reliable end-of-message signal when
+     * dis wraps an in-memory buffer. That is presumably the case for gossip
+     * messages; confirm.
+     */
+    static List<GossipDigest> deserialize(DataInputStream dis) throws IOException
+    {
+        int size = dis.readInt();
+        List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
+
+        for ( int i = 0; i < size; ++i )
+        {
+            if ( dis.available() == 0 )
+            {
+                logger_.info("Remaining bytes zero. Stopping deserialization of GossipDigests.");
+                break;
+            }
+
+            GossipDigest gDigest = GossipDigest.serializer().deserialize(dis);
+            gDigests.add( gDigest );
+        }
+        return gDigests;
+    }
+}
+
+class EndPointStatesSerializationHelper
+{
+    private static Log4jLogger logger_ = new Log4jLogger(EndPointStatesSerializationHelper.class.getName());
+
+    /**
+     * Writes the map size. It then writes as many (endpoint, EndPointState)
+     * pairs as appear to fit within Gossiper.MAX_GOSSIP_PACKET_SIZE. A pair is
+     * written only if the remaining room is at least the size of the previous
+     * pair.
+     *
+     * NOTE(review): as with the digest list, the count written is the full map
+     * size even when the loop stops early. Fixing this by staging into a
+     * separate buffer would change the dos.size() values seen by the nested
+     * EndPointState serializer's own MTU check, so it is left as-is.
+     *
+     * @return true if every pair was written, false if the output was truncated
+     */
+    static boolean serialize(Map<EndPoint, EndPointState> epStateMap, DataOutputStream dos) throws IOException
+    {
+        dos.writeInt(epStateMap.size());
+
+        /* Byte size of the last pair written; the guess for how big the next one will be. */
+        int lastEntrySize = 0;
+        for ( EndPoint ep : epStateMap.keySet() )
+        {
+            if ( Gossiper.MAX_GOSSIP_PACKET_SIZE - dos.size() < lastEntrySize )
+            {
+                logger_.info("@@@@ Breaking out to respect the MTU size in EPS. Estimate is " + lastEntrySize + " @@@@");
+                return false;
+            }
+            int start = dos.size();
+            CompactEndPointSerializationHelper.serialize(ep, dos);
+            EndPointState.serializer().serialize(epStateMap.get(ep), dos);
+            lastEntrySize = dos.size() - start;
+        }
+        return true;
+    }
+
+    /**
+     * Reads a count and then up to that many (endpoint, EndPointState) pairs.
+     * Reading stops early once dis reports no available bytes.
+     */
+    static Map<EndPoint, EndPointState> deserialize(DataInputStream dis) throws IOException
+    {
+        int expected = dis.readInt();
+        Map<EndPoint, EndPointState> epStateMap = new HashMap<EndPoint, EndPointState>();
+
+        for ( int read = 0; read < expected; ++read )
+        {
+            if ( dis.available() == 0 )
+            {
+                logger_.info("Remaining bytes zero. Stopping deserialization in EndPointState.");
+                break;
+            }
+            EndPoint ep = CompactEndPointSerializationHelper.deserialize(dis);
+            epStateMap.put(ep, EndPointState.serializer().deserialize(dis));
+        }
+        return epStateMap;
+    }
+}
+
+/**
+ * Wire format: the cluster id as a UTF string, followed by the digest list.
+ */
+class GossipDigestSynMessageSerializer implements ICompactSerializer<GossipDigestSynMessage>
+{
+    public void serialize(GossipDigestSynMessage gDigestSynMessage, DataOutputStream dos) throws IOException
+    {
+        String clusterId = gDigestSynMessage.clusterId_;
+        List<GossipDigest> digests = gDigestSynMessage.getGossipDigests();
+        dos.writeUTF(clusterId);
+        /* The "all digests fit" result is not transmitted; the list ends the message. */
+        GossipDigestSerializationHelper.serialize(digests, dos);
+    }
+
+    public GossipDigestSynMessage deserialize(DataInputStream dis) throws IOException
+    {
+        String clusterId = dis.readUTF();
+        return new GossipDigestSynMessage(clusterId, GossipDigestSerializationHelper.deserialize(dis));
+    }
+}
+
Added: incubator/cassandra/trunk/src/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/gms/Gossiper.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/gms/Gossiper.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/gms/Gossiper.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,1129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.io.*;
+import java.util.*;
+
+import org.apache.cassandra.concurrent.SingleThreadedStage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.IComponentShutdown;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.utils.*;
+import org.apache.cassandra.net.*;
+
/**
 * This module is responsible for gossiping information on behalf of the local endpoint. It
 * maintains the sets of live and dead endpoints. Periodically (every second) this module
 * chooses a random node and initiates a round of gossip with it. A round of gossip involves
 * three messages. For instance, if node A wants to initiate a round of gossip with node B,
 * it starts by sending node B a GossipDigestSynMessage. On receipt of this message, node B
 * sends node A a GossipDigestAckMessage. On receipt of that message, node A sends node B a
 * GossipDigestAck2Message, which completes the round. Whenever this module receives any of
 * these three messages, it updates the Failure Detector with liveness information.
 *
 * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
 */
+
+public class Gossiper implements IFailureDetectionEventListener, IEndPointStateChangePublisher, IComponentShutdown
+{
+ private class GossipTimerTask extends TimerTask
+ {
+ public void run()
+ {
+ try
+ {
+ synchronized( Gossiper.instance() )
+ {
+ /* Update the local heartbeat counter. */
+ endPointStateMap_.get(localEndPoint_).getHeartBeatState().updateHeartBeat();
+ List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
+ Gossiper.instance().makeRandomGossipDigest(gDigests);
+
+ if ( gDigests.size() > 0 )
+ {
+ Message message = makeGossipDigestSynMessage(gDigests);
+ /* Gossip to some random live member */
+ boolean bVal = doGossipToLiveMember(message);
+
+ /* Gossip to some unreachable member with some probability to check if he is back up */
+ doGossipToUnreachableMember(message);
+
+ /* Gossip to the seed. */
+ if ( !bVal )
+ doGossipToSeed(message);
+
+ logger_.debug("Performing status check ...");
+ doStatusCheck();
+ }
+ }
+ }
+ catch ( Throwable th )
+ {
+ logger_.info( LogUtil.throwableToString(th) );
+ }
+ }
+ }
+
+ final static int MAX_GOSSIP_PACKET_SIZE = 1428;
+ /* GS - abbreviation for GOSSIPER_STAGE */
+ final static String GOSSIP_STAGE = "GS";
+ /* GSV - abbreviation for GOSSIP-DIGEST-SYN-VERB */
+ final static String JOIN_VERB_HANDLER = "JVH";
+ /* GSV - abbreviation for GOSSIP-DIGEST-SYN-VERB */
+ final static String GOSSIP_DIGEST_SYN_VERB = "GSV";
+ /* GAV - abbreviation for GOSSIP-DIGEST-ACK-VERB */
+ final static String GOSSIP_DIGEST_ACK_VERB = "GAV";
+ /* GA2V - abbreviation for GOSSIP-DIGEST-ACK2-VERB */
+ final static String GOSSIP_DIGEST_ACK2_VERB = "GA2V";
+ private final static int intervalInMillis_ = 1000;
+ private static Logger logger_ = Logger.getLogger(Gossiper.class);
+ static Gossiper gossiper_;
+
+ public synchronized static Gossiper instance()
+ {
+ if ( gossiper_ == null )
+ {
+ gossiper_ = new Gossiper();
+ }
+ return gossiper_;
+ }
+
+ private Timer gossipTimer_ = new Timer(false);
+ private EndPoint localEndPoint_;
+ private long aVeryLongTime_;
+ private Random random_ = new Random();
+ /* index used previously */
+ private int prevIndex_ = 0;
+ /* round robin index through live endpoint set */
+ private int rrIndex_ = 0;
+
+ /* subscribers for interest in EndPointState change */
+ private List<IEndPointStateChangeSubscriber> subscribers_ = new ArrayList<IEndPointStateChangeSubscriber>();
+
+ /* live member set */
+ private Set<EndPoint> liveEndpoints_ = new HashSet<EndPoint>();
+
+ /* unreachable member set */
+ private Set<EndPoint> unreachableEndpoints_ = new HashSet<EndPoint>();
+
+ /* initial seeds for joining the cluster */
+ private Set<EndPoint> seeds_ = new HashSet<EndPoint>();
+
+ /* map where key is the endpoint and value is the state associated with the endpoint */
+ Map<EndPoint, EndPointState> endPointStateMap_ = new Hashtable<EndPoint, EndPointState>();
+
/*
 * Package-private constructor; obtain the instance through instance().
 * Wires this gossiper into the failure detector, the messaging layer (join
 * and the three gossip verbs), the stage manager and shutdown handling.
 */
Gossiper()
{
    /* 259200 seconds == 3 days, expressed in milliseconds */
    aVeryLongTime_ = 259200L * 1000;

    /* receive convict/suspect callbacks */
    FailureDetector.instance().registerFailureDetectionEventListener(this);

    /* verb handlers for joining and for the three gossip messages */
    MessagingService.getMessagingInstance().registerVerbHandlers(JOIN_VERB_HANDLER, new JoinVerbHandler());
    MessagingService.getMessagingInstance().registerVerbHandlers(GOSSIP_DIGEST_SYN_VERB, new GossipDigestSynVerbHandler());
    MessagingService.getMessagingInstance().registerVerbHandlers(GOSSIP_DIGEST_ACK_VERB, new GossipDigestAckVerbHandler());
    MessagingService.getMessagingInstance().registerVerbHandlers(GOSSIP_DIGEST_ACK2_VERB, new GossipDigestAck2VerbHandler());

    /* gossip messages are processed on their own single-threaded stage */
    StageManager.registerStage(GOSSIP_STAGE, new SingleThreadedStage("GMFD"));

    StorageService.instance().registerComponentForShutdown(this);
}
+
+ public void register(IEndPointStateChangeSubscriber subscriber)
+ {
+ subscribers_.add(subscriber);
+ }
+
+ public void unregister(IEndPointStateChangeSubscriber subscriber)
+ {
+ subscribers_.remove(subscriber);
+ }
+
+ public Set<EndPoint> getAllMembers()
+ {
+ Set<EndPoint> allMbrs = new HashSet<EndPoint>();
+ allMbrs.addAll(getLiveMembers());
+ allMbrs.addAll(getUnreachableMembers());
+ return allMbrs;
+ }
+
+ public Set<EndPoint> getLiveMembers()
+ {
+ Set<EndPoint> liveMbrs = new HashSet<EndPoint>(liveEndpoints_);
+ liveMbrs.add( new EndPoint( localEndPoint_.getHost(), localEndPoint_.getPort() ) );
+ return liveMbrs;
+ }
+
+ public Set<EndPoint> getUnreachableMembers()
+ {
+ return new HashSet<EndPoint>(unreachableEndpoints_);
+ }
+
+ /**
+ * This method is used to forcibly remove a node from the membership
+ * set. He is forgotten locally immediately.
+ *
+ * param@ ep the endpoint to be removed from membership.
+ */
+ public synchronized void removeFromMembership(EndPoint ep)
+ {
+ endPointStateMap_.remove(ep);
+ liveEndpoints_.remove(ep);
+ unreachableEndpoints_ .remove(ep);
+ }
+
+ /**
+ * This method is part of IFailureDetectionEventListener interface. This is invoked
+ * by the Failure Detector when it convicts an end point.
+ *
+ * param @ endpoint end point that is convicted.
+ */
+
+ public void convict(EndPoint endpoint)
+ {
+ EndPointState epState = endPointStateMap_.get(endpoint);
+ if ( epState != null )
+ {
+ if ( !epState.isAlive() && epState.isAGossiper() )
+ {
+ /*
+ * just to be sure - is invoked just to make sure that
+ * it was called atleast once.
+ */
+ if ( liveEndpoints_.contains(endpoint) )
+ {
+ logger_.info("EndPoint " + endpoint + " is now dead.");
+ isAlive(endpoint, epState, false);
+
+ /* Notify an endpoint is dead to interested parties. */
+ EndPointState deltaState = new EndPointState(epState.getHeartBeatState());
+ doNotifications(endpoint, deltaState);
+ }
+ epState.isAGossiper(false);
+ }
+ }
+ }
+
+ /**
+ * This method is part of IFailureDetectionEventListener interface. This is invoked
+ * by the Failure Detector when it suspects an end point.
+ *
+ * param @ endpoint end point that is suspected.
+ */
+ public void suspect(EndPoint endpoint)
+ {
+ EndPointState epState = endPointStateMap_.get(endpoint);
+ if ( epState.isAlive() )
+ {
+ logger_.info("EndPoint " + endpoint + " is now dead.");
+ isAlive(endpoint, epState, false);
+
+ /* Notify an endpoint is dead to interested parties. */
+ EndPointState deltaState = new EndPointState(epState.getHeartBeatState());
+ doNotifications(endpoint, deltaState);
+ }
+ }
+
+ int getMaxEndPointStateVersion(EndPointState epState)
+ {
+ List<Integer> versions = new ArrayList<Integer>();
+ versions.add( epState.getHeartBeatState().getHeartBeatVersion() );
+ Map<String, ApplicationState> appStateMap = epState.getApplicationState();
+
+ Set<String> keys = appStateMap.keySet();
+ for ( String key : keys )
+ {
+ int stateVersion = appStateMap.get(key).getStateVersion();
+ versions.add( stateVersion );
+ }
+
+ /* sort to get the max version to build GossipDigest for this endpoint */
+ Collections.sort(versions);
+ int maxVersion = versions.get(versions.size() - 1);
+ versions.clear();
+ return maxVersion;
+ }
+
+ /**
+ * Removes the endpoint from unreachable endpoint set
+ *
+ * @param endpoint endpoint to be removed from the current membership.
+ */
+ void evictFromMembership(EndPoint endpoint)
+ {
+ unreachableEndpoints_.remove(endpoint);
+ }
+
+ /* No locking required since it is called from a method that already has acquired a lock */
+ @Deprecated
+ void makeGossipDigest(List<GossipDigest> gDigests)
+ {
+ /* Add the local endpoint state */
+ EndPointState epState = endPointStateMap_.get(localEndPoint_);
+ int generation = epState.getHeartBeatState().getGeneration();
+ int maxVersion = getMaxEndPointStateVersion(epState);
+ gDigests.add( new GossipDigest(localEndPoint_, generation, maxVersion) );
+
+ for ( EndPoint liveEndPoint : liveEndpoints_ )
+ {
+ epState = endPointStateMap_.get(liveEndPoint);
+ if ( epState != null )
+ {
+ generation = epState.getHeartBeatState().getGeneration();
+ maxVersion = getMaxEndPointStateVersion(epState);
+ gDigests.add( new GossipDigest(liveEndPoint, generation, maxVersion) );
+ }
+ else
+ {
+ gDigests.add( new GossipDigest(liveEndPoint, 0, 0) );
+ }
+ }
+ }
+
+ /**
+ * No locking required since it is called from a method that already
+ * has acquired a lock. The gossip digest is built based on randomization
+ * rather than just looping through the collection of live endpoints.
+ *
+ * @param gDigests list of Gossip Digests.
+ */
+ void makeRandomGossipDigest(List<GossipDigest> gDigests)
+ {
+ /* Add the local endpoint state */
+ EndPointState epState = endPointStateMap_.get(localEndPoint_);
+ int generation = epState.getHeartBeatState().getGeneration();
+ int maxVersion = getMaxEndPointStateVersion(epState);
+ gDigests.add( new GossipDigest(localEndPoint_, generation, maxVersion) );
+
+ int size = liveEndpoints_.size();
+ List<EndPoint> endpoints = new ArrayList<EndPoint>( liveEndpoints_ );
+ Collections.shuffle(endpoints, random_);
+ for ( EndPoint liveEndPoint : endpoints )
+ {
+ epState = endPointStateMap_.get(liveEndPoint);
+ if ( epState != null )
+ {
+ generation = epState.getHeartBeatState().getGeneration();
+ maxVersion = getMaxEndPointStateVersion(epState);
+ gDigests.add( new GossipDigest(liveEndPoint, generation, maxVersion) );
+ }
+ else
+ {
+ gDigests.add( new GossipDigest(liveEndPoint, 0, 0) );
+ }
+ }
+
+ /* FOR DEBUG ONLY - remove later */
+ StringBuilder sb = new StringBuilder();
+ for ( GossipDigest gDigest : gDigests )
+ {
+ sb.append(gDigest);
+ sb.append(" ");
+ }
+ logger_.debug("Gossip Digests are : " + sb.toString());
+ }
+
+ public int getCurrentGenerationNumber(EndPoint endpoint)
+ {
+ return endPointStateMap_.get(endpoint).getHeartBeatState().getGeneration();
+ }
+
+ Message makeGossipDigestSynMessage(List<GossipDigest> gDigests) throws IOException
+ {
+ GossipDigestSynMessage gDigestMessage = new GossipDigestSynMessage(DatabaseDescriptor.getClusterName(), gDigests);
+ ByteArrayOutputStream bos = new ByteArrayOutputStream(Gossiper.MAX_GOSSIP_PACKET_SIZE);
+ DataOutputStream dos = new DataOutputStream( bos );
+ GossipDigestSynMessage.serializer().serialize(gDigestMessage, dos);
+ Message message = new Message(localEndPoint_, Gossiper.GOSSIP_STAGE, GOSSIP_DIGEST_SYN_VERB, new Object[]{bos.toByteArray()});
+ return message;
+ }
+
+ Message makeGossipDigestAckMessage(GossipDigestAckMessage gDigestAckMessage) throws IOException
+ {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream(Gossiper.MAX_GOSSIP_PACKET_SIZE);
+ DataOutputStream dos = new DataOutputStream(bos);
+ GossipDigestAckMessage.serializer().serialize(gDigestAckMessage, dos);
+ logger_.debug("@@@@ Size of GossipDigestAckMessage is " + bos.toByteArray().length);
+ Message message = new Message(localEndPoint_, Gossiper.GOSSIP_STAGE, GOSSIP_DIGEST_ACK_VERB, new Object[]{bos.toByteArray()});
+ return message;
+ }
+
+ Message makeGossipDigestAck2Message(GossipDigestAck2Message gDigestAck2Message) throws IOException
+ {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream(Gossiper.MAX_GOSSIP_PACKET_SIZE);
+ DataOutputStream dos = new DataOutputStream(bos);
+ GossipDigestAck2Message.serializer().serialize(gDigestAck2Message, dos);
+ Message message = new Message(localEndPoint_, Gossiper.GOSSIP_STAGE, GOSSIP_DIGEST_ACK2_VERB, new Object[]{bos.toByteArray()});
+ return message;
+ }
+
+ boolean sendGossipToLiveNode(Message message)
+ {
+ int size = liveEndpoints_.size();
+ List<EndPoint> eps = new ArrayList<EndPoint>(liveEndpoints_);
+
+ if ( rrIndex_ >= size )
+ {
+ rrIndex_ = -1;
+ }
+
+ EndPoint to = eps.get(++rrIndex_);
+ logger_.info("Sending a GossipDigestSynMessage to " + to + " ...");
+ MessagingService.getMessagingInstance().sendUdpOneWay(message, to);
+ return seeds_.contains(to);
+ }
+
+ /**
+ * Returns true if the chosen target was also a seed. False otherwise
+ *
+ * @param message message to sent
+ * @param epSet a set of endpoint from which a random endpoint is chosen.
+ * @return true if the chosen endpoint is also a seed.
+ */
+ boolean sendGossip(Message message, Set<EndPoint> epSet)
+ {
+ int size = epSet.size();
+ /* Generate a random number from 0 -> size */
+ List<EndPoint> liveEndPoints = new ArrayList<EndPoint>(epSet);
+ int index = (size == 1) ? 0 : random_.nextInt(size);
+ EndPoint to = liveEndPoints.get(index);
+ logger_.info("Sending a GossipDigestSynMessage to " + to + " ...");
+ MessagingService.getMessagingInstance().sendUdpOneWay(message, to);
+ return seeds_.contains(to);
+ }
+
+ /* Sends a Gossip message to a live member and returns a reference to the member */
+ boolean doGossipToLiveMember(Message message)
+ {
+ int size = liveEndpoints_.size();
+ if ( size == 0 )
+ return false;
+ // return sendGossipToLiveNode(message);
+ /* Use this for a cluster size >= 30 */
+ return sendGossip(message, liveEndpoints_);
+ }
+
+ /* Sends a Gossip message to an unreachable member */
+ void doGossipToUnreachableMember(Message message)
+ {
+ double liveEndPoints = liveEndpoints_.size();
+ double unreachableEndPoints = unreachableEndpoints_.size();
+ if ( unreachableEndPoints > 0 )
+ {
+ /* based on some probability */
+ double prob = unreachableEndPoints / (liveEndPoints + 1);
+ double randDbl = random_.nextDouble();
+ if ( randDbl < prob )
+ sendGossip(message, unreachableEndpoints_);
+ }
+ }
+
+ /* Gossip to a seed for facilitating partition healing */
+ void doGossipToSeed(Message message)
+ {
+ int size = seeds_.size();
+ if ( size > 0 )
+ {
+ if ( size == 1 && seeds_.contains(localEndPoint_) )
+ {
+ return;
+ }
+
+ if ( liveEndpoints_.size() == 0 )
+ {
+ sendGossip(message, seeds_);
+ }
+ else
+ {
+ /* Gossip with the seed with some probability. */
+ double probability = seeds_.size() / ( liveEndpoints_.size() + unreachableEndpoints_.size() );
+ double randDbl = random_.nextDouble();
+ if ( randDbl <= probability )
+ sendGossip(message, seeds_);
+ }
+ }
+ }
+
+ void doStatusCheck()
+ {
+ long now = System.currentTimeMillis();
+ Set<EndPoint> eps = endPointStateMap_.keySet();
+
+ for ( EndPoint endpoint : eps )
+ {
+ if ( endpoint.equals(localEndPoint_) )
+ continue;
+
+ FailureDetector.instance().intepret(endpoint);
+ EndPointState epState = endPointStateMap_.get(endpoint);
+ if ( epState != null )
+ {
+ long l = now - epState.getUpdateTimestamp();
+ long duration = now - l;
+ if ( !epState.isAlive() && (duration > aVeryLongTime_) )
+ {
+ evictFromMembership(endpoint);
+ }
+ }
+ }
+ }
+
+ EndPointState getEndPointStateForEndPoint(EndPoint ep)
+ {
+ return endPointStateMap_.get(ep);
+ }
+
+ synchronized EndPointState getStateForVersionBiggerThan(EndPoint forEndpoint, int version)
+ {
+ EndPointState epState = endPointStateMap_.get(forEndpoint);
+ EndPointState reqdEndPointState = null;
+
+ if ( epState != null )
+ {
+ /*
+ * Here we try to include the Heart Beat state only if it is
+ * greater than the version passed in. It might happen that
+ * the heart beat version maybe lesser than the version passed
+ * in and some application state has a version that is greater
+ * than the version passed in. In this case we also send the old
+ * heart beat and throw it away on the receiver if it is redundant.
+ */
+ int localHbVersion = epState.getHeartBeatState().getHeartBeatVersion();
+ if ( localHbVersion > version )
+ {
+ reqdEndPointState = new EndPointState(epState.getHeartBeatState());
+ }
+ Map<String, ApplicationState> appStateMap = epState.getApplicationState();
+ /* Accumulate all application states whose versions are greater than "version" variable */
+ Set<String> keys = appStateMap.keySet();
+ for ( String key : keys )
+ {
+ ApplicationState appState = appStateMap.get(key);
+ if ( appState.getStateVersion() > version )
+ {
+ if ( reqdEndPointState == null )
+ {
+ reqdEndPointState = new EndPointState(epState.getHeartBeatState());
+ }
+ reqdEndPointState.addApplicationState(key, appState);
+ }
+ }
+ }
+ return reqdEndPointState;
+ }
+
+ /*
+ * This method is called only from the JoinVerbHandler. This happens
+ * when a new node coming up multicasts the JoinMessage. Here we need
+ * to add the endPoint to the list of live endpoints.
+ */
+ synchronized void join(EndPoint from)
+ {
+ if ( !from.equals( localEndPoint_ ) )
+ {
+ /* Mark this endpoint as "live" */
+ liveEndpoints_.add(from);
+ unreachableEndpoints_.remove(from);
+ }
+ }
+
+ void notifyFailureDetector(List<GossipDigest> gDigests)
+ {
+ IFailureDetector fd = FailureDetector.instance();
+ for ( GossipDigest gDigest : gDigests )
+ {
+ EndPointState localEndPointState = endPointStateMap_.get(gDigest.endPoint_);
+ /*
+ * If the local endpoint state exists then report to the FD only
+ * if the versions workout.
+ */
+ if ( localEndPointState != null )
+ {
+ int localGeneration = endPointStateMap_.get(gDigest.endPoint_).getHeartBeatState().generation_;
+ int remoteGeneration = gDigest.generation_;
+ if ( remoteGeneration > localGeneration )
+ {
+ logger_.debug("Reporting " + gDigest.endPoint_ + " to the FD.");
+ fd.report(gDigest.endPoint_);
+ continue;
+ }
+
+ if ( remoteGeneration == localGeneration )
+ {
+ int localVersion = getMaxEndPointStateVersion(localEndPointState);
+ //int localVersion = endPointStateMap_.get(gDigest.endPoint_).getHeartBeatState().getHeartBeatVersion();
+ int remoteVersion = gDigest.maxVersion_;
+ if ( remoteVersion > localVersion )
+ {
+ logger_.debug("Reporting " + gDigest.endPoint_ + " to the FD.");
+ fd.report(gDigest.endPoint_);
+ }
+ }
+ }
+ }
+ }
+
+ void notifyFailureDetector(Map<EndPoint, EndPointState> remoteEpStateMap)
+ {
+ IFailureDetector fd = FailureDetector.instance();
+ Set<EndPoint> endpoints = remoteEpStateMap.keySet();
+ for ( EndPoint endpoint : endpoints )
+ {
+ EndPointState remoteEndPointState = remoteEpStateMap.get(endpoint);
+ EndPointState localEndPointState = endPointStateMap_.get(endpoint);
+ /*
+ * If the local endpoint state exists then report to the FD only
+ * if the versions workout.
+ */
+ if ( localEndPointState != null )
+ {
+ int localGeneration = localEndPointState.getHeartBeatState().generation_;
+ int remoteGeneration = remoteEndPointState.getHeartBeatState().generation_;
+ if ( remoteGeneration > localGeneration )
+ {
+ logger_.debug("Reporting " + endpoint + " to the FD.");
+ fd.report(endpoint);
+ continue;
+ }
+
+ if ( remoteGeneration == localGeneration )
+ {
+ int localVersion = getMaxEndPointStateVersion(localEndPointState);
+ //int localVersion = localEndPointState.getHeartBeatState().getHeartBeatVersion();
+ int remoteVersion = remoteEndPointState.getHeartBeatState().getHeartBeatVersion();
+ if ( remoteVersion > localVersion )
+ {
+ logger_.debug("Reporting " + endpoint + " to the FD.");
+ fd.report(endpoint);
+ }
+ }
+ }
+ }
+ }
+
+ void resusitate(EndPoint addr, EndPointState localState)
+ {
+ logger_.debug("Attempting to resusitate " + addr);
+ if ( !localState.isAlive() )
+ {
+ isAlive(addr, localState, true);
+ logger_.info("EndPoint " + addr + " is now UP");
+ }
+ }
+
+ private void handleNewJoin(EndPoint ep, EndPointState epState)
+ {
+ logger_.info("Node " + ep + " has now joined.");
+ /* Mark this endpoint as "live" */
+ endPointStateMap_.put(ep, epState);
+ isAlive(ep, epState, true);
+ /* Notify interested parties about endpoint state change */
+ doNotifications(ep, epState);
+ }
+
+ synchronized void applyStateLocally(Map<EndPoint, EndPointState> epStateMap)
+ {
+ Set<EndPoint> eps = epStateMap.keySet();
+ for( EndPoint ep : eps )
+ {
+ if ( ep.equals( localEndPoint_ ) )
+ continue;
+
+ EndPointState localEpStatePtr = endPointStateMap_.get(ep);
+ EndPointState remoteState = epStateMap.get(ep);
+ /*
+ If state does not exist just add it. If it does then add it only if the version
+ of the remote copy is greater than the local copy.
+ */
+ if ( localEpStatePtr != null )
+ {
+ int localGeneration = localEpStatePtr.getHeartBeatState().getGeneration();
+ int remoteGeneration = remoteState.getHeartBeatState().getGeneration();
+
+ if (remoteGeneration > localGeneration)
+ {
+ handleNewJoin(ep, remoteState);
+ }
+ else if ( remoteGeneration == localGeneration )
+ {
+ /* manage the membership state */
+ int localMaxVersion = getMaxEndPointStateVersion(localEpStatePtr);
+ int remoteMaxVersion = getMaxEndPointStateVersion(remoteState);
+ if ( remoteMaxVersion > localMaxVersion )
+ {
+ resusitate(ep, localEpStatePtr);
+ applyHeartBeatStateLocally(ep, localEpStatePtr, remoteState);
+ /* apply ApplicationState */
+ applyApplicationStateLocally(ep, localEpStatePtr, remoteState);
+ }
+ }
+ }
+ else
+ {
+ handleNewJoin(ep, remoteState);
+ }
+ }
+ }
+
+ void applyHeartBeatStateLocally(EndPoint addr, EndPointState localState, EndPointState remoteState)
+ {
+ HeartBeatState localHbState = localState.getHeartBeatState();
+ HeartBeatState remoteHbState = remoteState.getHeartBeatState();
+
+ if ( remoteHbState.getGeneration() > localHbState.getGeneration() )
+ {
+ resusitate(addr, localState);
+ localState.setHeartBeatState(remoteHbState);
+ }
+ if ( localHbState.getGeneration() == remoteHbState.getGeneration() )
+ {
+ if ( remoteHbState.getHeartBeatVersion() > localHbState.getHeartBeatVersion() )
+ {
+ int oldVersion = localHbState.getHeartBeatVersion();
+ localState.setHeartBeatState(remoteHbState);
+ logger_.debug("Updating heartbeat state version to " + localState.getHeartBeatState().getHeartBeatVersion() + " from " + oldVersion + " for " + addr + " ...");
+ }
+ }
+ }
+
+ void applyApplicationStateLocally(EndPoint addr, EndPointState localStatePtr, EndPointState remoteStatePtr)
+ {
+ Map<String, ApplicationState> localAppStateMap = localStatePtr.getApplicationState();
+ Map<String, ApplicationState> remoteAppStateMap = remoteStatePtr.getApplicationState();
+
+ Set<String> remoteKeys = remoteAppStateMap.keySet();
+ for ( String remoteKey : remoteKeys )
+ {
+ ApplicationState remoteAppState = remoteAppStateMap.get(remoteKey);
+ ApplicationState localAppState = localAppStateMap.get(remoteKey);
+
+ /* If state doesn't exist locally for this key then just apply it */
+ if ( localAppState == null )
+ {
+ localStatePtr.addApplicationState(remoteKey, remoteAppState);
+ /* notify interested parties of endpoint state change */
+ EndPointState deltaState = new EndPointState(localStatePtr.getHeartBeatState());
+ deltaState.addApplicationState(remoteKey, remoteAppState);
+ doNotifications(addr, deltaState);
+ continue;
+ }
+
+ int remoteGeneration = remoteStatePtr.getHeartBeatState().getGeneration();
+ int localGeneration = localStatePtr.getHeartBeatState().getGeneration();
+
+ /* If the remoteGeneration is greater than localGeneration then apply state blindly */
+ if ( remoteGeneration > localGeneration )
+ {
+ localStatePtr.addApplicationState(remoteKey, remoteAppState);
+ /* notify interested parties of endpoint state change */
+ EndPointState deltaState = new EndPointState(localStatePtr.getHeartBeatState());
+ deltaState.addApplicationState(remoteKey, remoteAppState);
+ doNotifications(addr, deltaState);
+ continue;
+ }
+
+ /* If the generations are the same then apply state if the remote version is greater than local version. */
+ if ( remoteGeneration == localGeneration )
+ {
+ int remoteVersion = remoteAppState.getStateVersion();
+ int localVersion = localAppState.getStateVersion();
+
+ if ( remoteVersion > localVersion )
+ {
+ localStatePtr.addApplicationState(remoteKey, remoteAppState);
+ /* notify interested parties of endpoint state change */
+ EndPointState deltaState = new EndPointState(localStatePtr.getHeartBeatState());
+ deltaState.addApplicationState(remoteKey, remoteAppState);
+ doNotifications(addr, deltaState);
+ }
+ }
+ }
+ }
+
+ void doNotifications(EndPoint addr, EndPointState epState)
+ {
+ for ( IEndPointStateChangeSubscriber subscriber : subscribers_ )
+ {
+ subscriber.onChange(addr, epState);
+ }
+ }
+
+ synchronized void isAlive(EndPoint addr, EndPointState epState, boolean value)
+ {
+ epState.isAlive(value);
+ if ( value )
+ {
+ liveEndpoints_.add(addr);
+ unreachableEndpoints_.remove(addr);
+ }
+ else
+ {
+ liveEndpoints_.remove(addr);
+ unreachableEndpoints_.add(addr);
+ }
+ if ( epState.isAGossiper() )
+ return;
+ epState.isAGossiper(true);
+ }
+
+ /* These are helper methods used from GossipDigestSynVerbHandler */
+ Map<EndPoint, GossipDigest> getEndPointGossipDigestMap(List<GossipDigest> gDigestList)
+ {
+ Map<EndPoint, GossipDigest> epMap = new HashMap<EndPoint, GossipDigest>();
+ for( GossipDigest gDigest : gDigestList )
+ {
+ epMap.put( gDigest.getEndPoint(), gDigest );
+ }
+ return epMap;
+ }
+
+ /* This is a helper method to get all EndPoints from a list of GossipDigests */
+ EndPoint[] getEndPointsFromGossipDigest(List<GossipDigest> gDigestList)
+ {
+ Set<EndPoint> set = new HashSet<EndPoint>();
+ for ( GossipDigest gDigest : gDigestList )
+ {
+ set.add( gDigest.getEndPoint() );
+ }
+ return set.toArray( new EndPoint[0] );
+ }
+
+ /* Request all the state for the endpoint in the gDigest */
+ void requestAll(GossipDigest gDigest, List<GossipDigest> deltaGossipDigestList, int remoteGeneration)
+ {
+ /* We are here since we have no data for this endpoint locally so request everthing. */
+ deltaGossipDigestList.add( new GossipDigest(gDigest.getEndPoint(), remoteGeneration, 0) );
+ }
+
+ /* Send all the data with version greater than maxRemoteVersion */
+ void sendAll(GossipDigest gDigest, Map<EndPoint, EndPointState> deltaEpStateMap, int maxRemoteVersion)
+ {
+ EndPointState localEpStatePtr = getStateForVersionBiggerThan(gDigest.getEndPoint(), maxRemoteVersion) ;
+ if ( localEpStatePtr != null )
+ deltaEpStateMap.put(gDigest.getEndPoint(), localEpStatePtr);
+ }
+
+ /*
+ This method is used to figure the state that the Gossiper has but Gossipee doesn't. The delta digests
+ and the delta state are built up.
+ */
+ synchronized void examineGossiper(List<GossipDigest> gDigestList, List<GossipDigest> deltaGossipDigestList, Map<EndPoint, EndPointState> deltaEpStateMap)
+ {
+ for ( GossipDigest gDigest : gDigestList )
+ {
+ int remoteGeneration = gDigest.getGeneration();
+ int maxRemoteVersion = gDigest.getMaxVersion();
+ /* Get state associated with the end point in digest */
+ EndPointState epStatePtr = endPointStateMap_.get(gDigest.getEndPoint());
+ /*
+ Here we need to fire a GossipDigestAckMessage. If we have some data associated with this endpoint locally
+ then we follow the "if" path of the logic. If we have absolutely nothing for this endpoint we need to
+ request all the data for this endpoint.
+ */
+ if ( epStatePtr != null )
+ {
+ int localGeneration = epStatePtr.getHeartBeatState().getGeneration();
+ /* get the max version of all keys in the state associated with this endpoint */
+ int maxLocalVersion = getMaxEndPointStateVersion(epStatePtr);
+ if ( remoteGeneration == localGeneration && maxRemoteVersion == maxLocalVersion )
+ continue;
+
+ if ( remoteGeneration > localGeneration )
+ {
+ /* we request everything from the gossiper */
+ requestAll(gDigest, deltaGossipDigestList, remoteGeneration);
+ }
+ if ( remoteGeneration < localGeneration )
+ {
+ /* send all data with generation = localgeneration and version > 0 */
+ sendAll(gDigest, deltaEpStateMap, 0);
+ }
+ if ( remoteGeneration == localGeneration )
+ {
+ /*
+ If the max remote version is greater then we request the remote endpoint send us all the data
+ for this endpoint with version greater than the max version number we have locally for this
+ endpoint.
+ If the max remote version is lesser, then we send all the data we have locally for this endpoint
+ with version greater than the max remote version.
+ */
+ if ( maxRemoteVersion > maxLocalVersion )
+ {
+ deltaGossipDigestList.add( new GossipDigest(gDigest.getEndPoint(), remoteGeneration, maxLocalVersion) );
+ }
+ if ( maxRemoteVersion < maxLocalVersion )
+ {
+ /* send all data with generation = localgeneration and version > maxRemoteVersion */
+ sendAll(gDigest, deltaEpStateMap, maxRemoteVersion);
+ }
+ }
+ }
+ else
+ {
+ /* We are here since we have no data for this endpoint locally so request everthing. */
+ requestAll(gDigest, deltaGossipDigestList, remoteGeneration);
+ }
+ }
+ }
+
+ public void start(EndPoint localEndPoint, int generationNbr) throws IOException
+ {
+ localEndPoint_ = localEndPoint;
+ /* Get the seeds from the config and initialize them. */
+ Set<String> seedHosts = DatabaseDescriptor.getSeeds();
+ for( String seedHost : seedHosts )
+ {
+ EndPoint seed = new EndPoint(seedHost, DatabaseDescriptor.getControlPort());
+ if ( seed.equals(localEndPoint) )
+ continue;
+ seeds_.add(seed);
+ }
+
+ /* initialize the heartbeat state for this localEndPoint */
+ EndPointState localState = endPointStateMap_.get(localEndPoint_);
+ if ( localState == null )
+ {
+ HeartBeatState hbState = new HeartBeatState(generationNbr, 0);
+ localState = new EndPointState(hbState);
+ localState.isAlive(true);
+ localState.isAGossiper(true);
+ endPointStateMap_.put(localEndPoint_, localState);
+ }
+
+ /* starts a timer thread */
+ gossipTimer_.schedule( new GossipTimerTask(), Gossiper.intervalInMillis_, Gossiper.intervalInMillis_);
+ }
+
+ public void shutdown()
+ {
+ /* This prevents this guy from responding to Gossip messages */
+ MessagingService.getMessagingInstance().deregisterVerbHandlers(GOSSIP_DIGEST_SYN_VERB);
+ MessagingService.getMessagingInstance().deregisterVerbHandlers(GOSSIP_DIGEST_ACK_VERB);
+ MessagingService.getMessagingInstance().deregisterVerbHandlers(GOSSIP_DIGEST_ACK2_VERB);
+ /* This prevents this guy from Gossiping */
+ gossipTimer_.cancel();
+ }
+
+ public synchronized void addApplicationState(String key, ApplicationState appState)
+ {
+ EndPointState epState = endPointStateMap_.get(localEndPoint_);
+ if ( epState != null )
+ {
+ epState.addApplicationState(key, appState);
+ }
+ }
+
+ public void stop()
+ {
+ gossipTimer_.cancel();
+ }
+}
+
+class JoinVerbHandler implements IVerbHandler
+{
+ private static Logger logger_ = Logger.getLogger( JoinVerbHandler.class);
+
+ public void doVerb(Message message)
+ {
+ EndPoint from = message.getFrom();
+ logger_.debug("Received a JoinMessage from " + from);
+
+ byte[] bytes = (byte[])message.getMessageBody()[0];
+ DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
+
+ try
+ {
+ JoinMessage joinMessage = JoinMessage.serializer().deserialize(dis);
+ if ( joinMessage.clusterId_.equals( DatabaseDescriptor.getClusterName() ) )
+ {
+ Gossiper.instance().join(from);
+ }
+ }
+ catch ( IOException ex )
+ {
+ logger_.info( LogUtil.throwableToString(ex) );
+ }
+ }
+}
+
+class GossipDigestSynVerbHandler implements IVerbHandler
+{
+ private static Logger logger_ = Logger.getLogger( GossipDigestSynVerbHandler.class);
+
+ public void doVerb(Message message)
+ {
+ EndPoint from = message.getFrom();
+ logger_.info("Received a GossipDigestSynMessage from " + from);
+
+ byte[] bytes = (byte[])message.getMessageBody()[0];
+ DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
+
+ try
+ {
+ GossipDigestSynMessage gDigestMessage = GossipDigestSynMessage.serializer().deserialize(dis);
+ /* If the message is from a different cluster throw it away. */
+ if ( !gDigestMessage.clusterId_.equals(DatabaseDescriptor.getClusterName()) )
+ return;
+
+ List<GossipDigest> gDigestList = gDigestMessage.getGossipDigests();
+ /* Notify the Failure Detector */
+ Gossiper.instance().notifyFailureDetector(gDigestList);
+
+ doSort(gDigestList);
+
+ List<GossipDigest> deltaGossipDigestList = new ArrayList<GossipDigest>();
+ Map<EndPoint, EndPointState> deltaEpStateMap = new HashMap<EndPoint, EndPointState>();
+ Gossiper.instance().examineGossiper(gDigestList, deltaGossipDigestList, deltaEpStateMap);
+
+ GossipDigestAckMessage gDigestAck = new GossipDigestAckMessage(deltaGossipDigestList, deltaEpStateMap);
+ Message gDigestAckMessage = Gossiper.instance().makeGossipDigestAckMessage(gDigestAck);
+ logger_.info("Sending a GossipDigestAckMessage to " + from);
+ MessagingService.getMessagingInstance().sendUdpOneWay(gDigestAckMessage, from);
+ }
+ catch (IOException e)
+ {
+ logger_.info( LogUtil.throwableToString(e) );
+ }
+ }
+
+ /*
+ * First construct a map whose key is the endpoint in the GossipDigest and the value is the
+ * GossipDigest itself. Then build a list of version differences i.e difference between the
+ * version in the GossipDigest and the version in the local state for a given EndPoint.
+ * Sort this list. Now loop through the sorted list and retrieve the GossipDigest corresponding
+ * to the endpoint from the map that was initially constructed.
+ */
+ private void doSort(List<GossipDigest> gDigestList)
+ {
+ /* Construct a map of endpoint to GossipDigest. */
+ Map<EndPoint, GossipDigest> epToDigestMap = new HashMap<EndPoint, GossipDigest>();
+ for ( GossipDigest gDigest : gDigestList )
+ {
+ epToDigestMap.put(gDigest.getEndPoint(), gDigest);
+ }
+
+ /*
+ * These digests have their maxVersion set to the difference of the version
+ * of the local EndPointState and the version found in the GossipDigest.
+ */
+ List<GossipDigest> diffDigests = new ArrayList<GossipDigest>();
+ for ( GossipDigest gDigest : gDigestList )
+ {
+ EndPoint ep = gDigest.getEndPoint();
+ EndPointState epState = Gossiper.instance().getEndPointStateForEndPoint(ep);
+ int version = (epState != null) ? Gossiper.instance().getMaxEndPointStateVersion( epState ) : 0;
+ int diffVersion = Math.abs(version - gDigest.getMaxVersion() );
+ diffDigests.add( new GossipDigest(ep, gDigest.getGeneration(), diffVersion) );
+ }
+
+ gDigestList.clear();
+ Collections.sort(diffDigests);
+ int size = diffDigests.size();
+ /*
+ * Report the digests in descending order. This takes care of the endpoints
+ * that are far behind w.r.t this local endpoint
+ */
+ for ( int i = size - 1; i >= 0; --i )
+ {
+ gDigestList.add( epToDigestMap.get(diffDigests.get(i).getEndPoint()) );
+ }
+ }
+}
+
+class GossipDigestAckVerbHandler implements IVerbHandler
+{
+ private static Logger logger_ = Logger.getLogger(GossipDigestAckVerbHandler.class);
+
+ public void doVerb(Message message)
+ {
+ EndPoint from = message.getFrom();
+ logger_.info("Received a GossipDigestAckMessage from " + from);
+
+ byte[] bytes = (byte[])message.getMessageBody()[0];
+ DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
+
+ try
+ {
+ GossipDigestAckMessage gDigestAckMessage = GossipDigestAckMessage.serializer().deserialize(dis);
+ List<GossipDigest> gDigestList = gDigestAckMessage.getGossipDigestList();
+ Map<EndPoint, EndPointState> epStateMap = gDigestAckMessage.getEndPointStateMap();
+
+ if ( epStateMap.size() > 0 )
+ {
+ /* Notify the Failure Detector */
+ Gossiper.instance().notifyFailureDetector(epStateMap);
+ Gossiper.instance().applyStateLocally(epStateMap);
+ }
+
+ /* Get the state required to send to this gossipee - construct GossipDigestAck2Message */
+ Map<EndPoint, EndPointState> deltaEpStateMap = new HashMap<EndPoint, EndPointState>();
+ for( GossipDigest gDigest : gDigestList )
+ {
+ EndPoint addr = gDigest.getEndPoint();
+ EndPointState localEpStatePtr = Gossiper.instance().getStateForVersionBiggerThan(addr, gDigest.getMaxVersion());
+ if ( localEpStatePtr != null )
+ deltaEpStateMap.put(addr, localEpStatePtr);
+ }
+
+ GossipDigestAck2Message gDigestAck2 = new GossipDigestAck2Message(deltaEpStateMap);
+ Message gDigestAck2Message = Gossiper.instance().makeGossipDigestAck2Message(gDigestAck2);
+ logger_.info("Sending a GossipDigestAck2Message to " + from);
+ MessagingService.getMessagingInstance().sendUdpOneWay(gDigestAck2Message, from);
+ }
+ catch ( IOException e )
+ {
+ logger_.info( LogUtil.throwableToString(e) );
+ }
+ }
+}
+
+class GossipDigestAck2VerbHandler implements IVerbHandler
+{
+ private static Logger logger_ = Logger.getLogger(GossipDigestAck2VerbHandler.class);
+
+ public void doVerb(Message message)
+ {
+ EndPoint from = message.getFrom();
+ logger_.info("Received a GossipDigestAck2Message from " + from);
+
+ byte[] bytes = (byte[])message.getMessageBody()[0];
+ DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
+ try
+ {
+ GossipDigestAck2Message gDigestAck2Message = GossipDigestAck2Message.serializer().deserialize(dis);
+ Map<EndPoint, EndPointState> remoteEpStateMap = gDigestAck2Message.getEndPointStateMap();
+ /* Notify the Failure Detector */
+ Gossiper.instance().notifyFailureDetector(remoteEpStateMap);
+ Gossiper.instance().applyStateLocally(remoteEpStateMap);
+ }
+ catch ( IOException e )
+ {
+ logger_.info( LogUtil.throwableToString(e) );
+ }
+ }
+}
+