You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pm...@apache.org on 2009/03/02 07:12:49 UTC

svn commit: r749205 [11/16] - in /incubator/cassandra/src/org/apache/cassandra: analytics/ cli/ concurrent/ config/ continuations/ cql/ cql/common/ cql/compiler/ cql/compiler/common/ cql/compiler/parse/ cql/compiler/sem/ cql/driver/ cql/execution/ dht/...

Added: incubator/cassandra/src/org/apache/cassandra/dht/Range.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/dht/Range.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/dht/Range.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/dht/Range.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.cassandra.gms.GossipDigest;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.IFileReader;
+import org.apache.cassandra.io.IFileWriter;
+import org.apache.cassandra.net.CompactEndPointSerializationHelper;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.service.StorageService;
+
+
+/**
+ * A representation of the range that a node is responsible for on the DHT ring.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class Range implements Comparable<Range>
+{
+    private static ICompactSerializer<Range> serializer_;
+    static
+    {
+        serializer_ = new RangeSerializer();
+    }
+    
+    public static ICompactSerializer<Range> serializer()
+    {
+        return serializer_;
+    }
+    
+    public static boolean isKeyInRanges(List<Range> ranges, String key)
+    {
+        if(ranges == null ) 
+            return false;
+        
+        for ( Range range : ranges)
+        {
+            if(range.contains(StorageService.hash(key)))
+            {
+                return true ;
+            }
+        }
+        return false;
+    }
+        
+    
+    private BigInteger left_;
+    private BigInteger right_;
+    
+    public Range(BigInteger left, BigInteger right)
+    {
+        left_ = left;
+        right_ = right;
+    }
+    
+    /**
+     * Returns the left endpoint of a range.
+     * @return left endpoint
+     */
+    public BigInteger left()
+    {
+        return left_;
+    }
+    
+    /**
+     * Returns the right endpoint of a range.
+     * @return right endpoint
+     */
+    public BigInteger right()
+    {
+        return right_;
+    }
+    
+    boolean isSplitRequired()
+    {
+        return ( left_.subtract(right_).signum() >= 0 );
+    }
+    
+    public boolean isSplitBy(BigInteger bi)
+    {
+        if ( left_.subtract(right_).signum() > 0 )
+        {
+            /* 
+             * left is greater than right we are wrapping around.
+             * So if the interval is [a,b) where a > b then we have
+             * 3 cases one of which holds for any given token k.
+             * (1) k > a -- return true
+             * (2) k < b -- return true
+             * (3) b < k < a -- return false
+            */
+            if ( bi.subtract(left_).signum() > 0 )
+                return true;
+            else if (right_.subtract(bi).signum() > 0 )
+                return true;
+            else
+                return false;
+        }
+        else if ( left_.subtract(right_).signum() < 0 )
+        {
+            /*
+             * This is the range [a, b) where a < b. 
+            */
+            return ( bi.subtract(left_).signum() > 0 && right_.subtract(bi).signum() > 0 );
+        }        
+        else
+        {
+            // should never be here.
+            return true;
+        }       
+    }
+    
+    /**
+     * Helps determine if a given point on the DHT ring is contained
+     * in the range in question.
+     * @param bi point in question
+     * @return true if the point contains within the range else false.
+     */
+    public boolean contains(BigInteger bi)
+    {
+        if ( left_.subtract(right_).signum() > 0 )
+        {
+            /* 
+             * left is greater than right we are wrapping around.
+             * So if the interval is [a,b) where a > b then we have
+             * 3 cases one of which holds for any given token k.
+             * (1) k > a -- return true
+             * (2) k < b -- return true
+             * (3) b < k < a -- return false
+            */
+            if ( bi.subtract(left_).signum() >= 0 )
+                return true;
+            else if (right_.subtract(bi).signum() > 0 )
+                return true;
+            else
+                return false;
+        }
+        else if ( left_.subtract(right_).signum() < 0 )
+        {
+            /*
+             * This is the range [a, b) where a < b. 
+            */
+            return ( bi.subtract(left_).signum() >= 0 && right_.subtract(bi).signum() >=0 );
+        }        
+        else
+    	{
+    		return true;
+    	}    	
+    }
+    
+    /**
+     * Helps determine if a given range on the DHT ring is contained
+     * within the range associated with the <i>this</i> pointer.
+     * @param rhs rhs in question
+     * @return true if the point contains within the range else false.
+     */
+    public boolean contains(Range rhs)
+    {
+        /* 
+         * If (a, b] and (c, d} are not wrap arounds
+         * then return true if a <= c <= d <= b.
+         */
+        if ( !isWrapAround(this) && !isWrapAround(rhs) )
+        {
+            if ( rhs.left_.subtract(left_).signum() >= 0 && right_.subtract(rhs.right_).signum() >= 0 )
+                return true;
+            else
+                return false;
+        }
+        
+        /*
+         * If lhs is a wrap around and rhs is not then
+         * rhs.left >= lhs.left and rhs.right >= lhs.left.
+         */
+        if ( isWrapAround(this) && !isWrapAround(rhs) )
+        {
+            if ( rhs.left_.subtract(left_).signum() >= 0 && rhs.right_.subtract(right_).signum() >= 0 )
+                return true;
+            else
+                return false;
+        }
+        
+        /* 
+         * If lhs is not a wrap around and rhs is a wrap 
+         * around then we just return false.
+         */
+        if ( !isWrapAround(this) && isWrapAround(rhs) )
+            return false;        
+        
+        if( isWrapAround(this) && isWrapAround(rhs) )
+        {
+            if ( rhs.left_.subtract(left_).signum() >= 0 && right_.subtract(right_).signum() >= 0 )
+                return true;
+            else
+                return false;
+        }
+        
+        /* should never be here */
+        return false;
+    }
+    
+    /**
+     * Tells if the given range is a wrap around.
+     * @param range
+     * @return
+     */
+    private boolean isWrapAround(Range range)
+    {
+        boolean bVal = ( range.left_.subtract(range.right_).signum() > 0 ) ? true : false;
+        return bVal;
+    }
+    
+    public int compareTo(Range rhs)
+    {
+        /* 
+         * If the range represented by the "this" pointer
+         * is a wrap around then it is the smaller one.
+        */
+        if ( isWrapAround(this) )
+            return -1;
+        
+        if ( isWrapAround(rhs) )
+            return 1;
+        
+        return right_.compareTo(rhs.right_);
+    }
+    
+    public boolean equals(Object o)
+    {
+        if ( !(o instanceof Range) )
+            return false;
+        Range rhs = (Range)o;
+        if ( left_.equals(rhs.left_) && right_.equals(rhs.right_) )
+            return true;
+        else
+            return false;
+    }
+    
+    public int hashCode()
+    {
+        return toString().hashCode();
+    }
+    
+    public String toString()
+    {
+        return "(" + left_ + "," + right_ + "]";
+    } 
+}
+
+class RangeSerializer implements ICompactSerializer<Range>
+{
+    public void serialize(Range range, DataOutputStream dos) throws IOException
+    {        
+        dos.writeUTF(range.left().toString());
+        dos.writeUTF(range.right().toString());
+    }
+
+    public Range deserialize(DataInputStream dis) throws IOException
+    {
+        BigInteger left = new BigInteger(dis.readUTF());
+        BigInteger right = new BigInteger(dis.readUTF());        
+        return new Range(left, right);
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/gms/ApplicationState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/gms/ApplicationState.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/gms/ApplicationState.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/gms/ApplicationState.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.IFileReader;
+import org.apache.cassandra.io.IFileWriter;
+
+
+/**
+ * This abstraction represents the state associated with a particular node which an
+ * application wants to make available to the rest of the nodes in the cluster. 
+ * Whenever a peice of state needs to be disseminated to the rest of cluster wrap
+ * the state in an instance of <i>ApplicationState</i> and add it to the Gossiper.
+ *  
+ * For eg. if we want to disseminate load information for node A do the following:
+ * 
+ *      ApplicationState loadState = new ApplicationState(<string reprensentation of load>);
+ *      Gossiper.instance().addApplicationState("LOAD STATE", loadState);
+ *  
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class ApplicationState
+{
+    private static ICompactSerializer<ApplicationState> serializer_;
+    static
+    {
+        serializer_ = new ApplicationStateSerializer();
+    }
+    
+    int version_;
+    String state_;
+
+        
+    ApplicationState(String state, int version)
+    {
+        state_ = state;
+        version_ = version;
+    }
+
+    public static ICompactSerializer<ApplicationState> serializer()
+    {
+        return serializer_;
+    }
+    
+    /**
+     * Wraps the specified state into a ApplicationState instance.
+     * @param state string representation of arbitrary state.
+     */
+    public ApplicationState(String state)
+    {
+        state_ = state;
+        version_ = VersionGenerator.getNextVersion();
+    }
+        
+    public String getState()
+    {
+        return state_;
+    }
+    
+    int getStateVersion()
+    {
+        return version_;
+    }
+}
+
+class ApplicationStateSerializer implements ICompactSerializer<ApplicationState>
+{
+    public void serialize(ApplicationState appState, DataOutputStream dos) throws IOException
+    {
+        dos.writeUTF(appState.state_);
+        dos.writeInt(appState.version_);
+    }
+
+    public ApplicationState deserialize(DataInputStream dis) throws IOException
+    {
+        String state = dis.readUTF();
+        int version = dis.readInt();
+        return new ApplicationState(state, version);
+    }
+}
+

Added: incubator/cassandra/src/org/apache/cassandra/gms/EndPointState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/gms/EndPointState.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/gms/EndPointState.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/gms/EndPointState.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.*;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.IFileReader;
+import org.apache.cassandra.io.IFileWriter;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.utils.*;
+
+/**
+ * This abstraction represents both the HeartBeatState and the ApplicationState in an EndPointState
+ * instance. Any state for a given endpoint can be retrieved from this instance.
+ * 
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class EndPointState
+{
+    private static ICompactSerializer<EndPointState> serializer_;
+    static
+    {
+        serializer_ = new EndPointStateSerializer();
+    }
+    
+    HeartBeatState hbState_;
+    Map<String, ApplicationState> applicationState_ = new Hashtable<String, ApplicationState>();
+    
+    /* fields below do not get serialized */
+    long updateTimestamp_;
+    boolean isAlive_;
+    boolean isAGossiper_;
+
+    public static ICompactSerializer<EndPointState> serializer()
+    {
+        return serializer_;
+    }
+    
+    EndPointState(HeartBeatState hbState) 
+    { 
+        hbState_ = hbState; 
+        updateTimestamp_ = System.currentTimeMillis(); 
+        isAlive_ = true; 
+        isAGossiper_ = false;
+    }
+        
+    HeartBeatState getHeartBeatState()
+    {
+        return hbState_;
+    }
+    
+    synchronized void setHeartBeatState(HeartBeatState hbState)
+    {
+        updateTimestamp();
+        hbState_ = hbState;
+    }
+    
+    public ApplicationState getApplicationState(String key)
+    {
+        return applicationState_.get(key);
+    }
+    
+    public Map<String, ApplicationState> getApplicationState()
+    {
+        return applicationState_;
+    }
+    
+    void addApplicationState(String key, ApplicationState appState)
+    {        
+        applicationState_.put(key, appState);        
+    }
+
+    /* getters and setters */
+    long getUpdateTimestamp()
+    {
+        return updateTimestamp_;
+    }
+    
+    synchronized void updateTimestamp()
+    {
+        updateTimestamp_ = System.currentTimeMillis();
+    }
+    
+    public boolean isAlive()
+    {        
+        return isAlive_;
+    }
+
+    synchronized void isAlive(boolean value)
+    {        
+        isAlive_ = value;        
+    }
+
+    
+    boolean isAGossiper()
+    {        
+        return isAGossiper_;
+    }
+
+    synchronized void isAGossiper(boolean value)
+    {                
+        //isAlive_ = false;
+        isAGossiper_ = value;        
+    }
+}
+
+class EndPointStateSerializer implements ICompactSerializer<EndPointState>
+{
+    private static Logger logger_ = Logger.getLogger(EndPointStateSerializer.class);
+    
+    public void serialize(EndPointState epState, DataOutputStream dos) throws IOException
+    {
+        /* These are for estimating whether we overshoot the MTU limit */
+        int estimate = 0;
+
+        /* serialize the HeartBeatState */
+        HeartBeatState hbState = epState.getHeartBeatState();
+        HeartBeatState.serializer().serialize(hbState, dos);
+
+        /* serialize the map of ApplicationState objects */
+        int size = epState.applicationState_.size();
+        dos.writeInt(size);
+        if ( size > 0 )
+        {   
+            Set<String> keys = epState.applicationState_.keySet();
+            for( String key : keys )
+            {
+                if ( Gossiper.MAX_GOSSIP_PACKET_SIZE - dos.size() < estimate )
+                {
+                    logger_.info("@@@@ Breaking out to respect the MTU size in EndPointState serializer. Estimate is " + estimate + " @@@@");
+                    break;
+                }
+            
+                ApplicationState appState = epState.applicationState_.get(key);
+                if ( appState != null )
+                {
+                    int pre = dos.size();
+                    dos.writeUTF(key);
+                    ApplicationState.serializer().serialize(appState, dos);                    
+                    int post = dos.size();
+                    estimate = post - pre;
+                }                
+            }
+        }
+    }
+
+    public EndPointState deserialize(DataInputStream dis) throws IOException
+    {
+        HeartBeatState hbState = HeartBeatState.serializer().deserialize(dis);
+        EndPointState epState = new EndPointState(hbState);               
+
+        int appStateSize = dis.readInt();
+        for ( int i = 0; i < appStateSize; ++i )
+        {
+            if ( dis.available() == 0 )
+            {
+                break;
+            }
+            
+            String key = dis.readUTF();    
+            ApplicationState appState = ApplicationState.serializer().deserialize(dis);            
+            epState.addApplicationState(key, appState);            
+        }
+        return epState;
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/gms/FailureDetector.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/gms/FailureDetector.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/gms/FailureDetector.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/gms/FailureDetector.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,343 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.io.FileOutputStream;
+import java.lang.management.ManagementFactory;
+import java.net.UnknownHostException;
+import java.util.*;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.utils.*;
+
+/**
+ * This FailureDetector is an implementation of the paper titled
+ * "The Phi Accrual Failure Detector" by Hayashibara. 
+ * Check the paper and the <i>IFailureDetector</i> interface for details.
+ * 
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class FailureDetector implements IFailureDetector, FailureDetectorMBean
+{
+    private static Logger logger_ = Logger.getLogger(FailureDetector.class);
+    private static final int sampleSize_ = 1000;
+    private static final int phiSuspectThreshold_ = 5;
+    private static final int phiConvictThreshold_ = 8;
+    /* The Failure Detector has to have been up for atleast 1 min. */
+    private static final long uptimeThreshold_ = 60000;
+    private static IFailureDetector failureDetector_;
+    /* Used to lock the factory for creation of FailureDetector instance */
+    private static Lock createLock_ = new ReentrantLock();
+    /* The time when the module was instantiated. */
+    private static long creationTime_;
+    
+    public static IFailureDetector instance()
+    {        
+        if ( failureDetector_ == null )
+        {
+            FailureDetector.createLock_.lock();
+            try
+            {
+                if ( failureDetector_ == null )
+                {
+                    failureDetector_ = new FailureDetector();
+                }
+            }
+            finally
+            {
+                createLock_.unlock();
+            }
+        }        
+        return failureDetector_;
+    }
+    
+    private Map<EndPoint, ArrivalWindow> arrivalSamples_ = new Hashtable<EndPoint, ArrivalWindow>();
+    private List<IFailureDetectionEventListener> fdEvntListeners_ = new ArrayList<IFailureDetectionEventListener>();
+    
+    public FailureDetector()
+    {
+        creationTime_ = System.currentTimeMillis();
+        // Register this instance with JMX
+        try
+        {
+            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+            mbs.registerMBean(this, new ObjectName("com.facebook.infrastructure.gms:type=FailureDetector"));
+        }
+        catch (Exception e)
+        {
+            logger_.error(LogUtil.throwableToString(e));
+        }
+    }
+    
+    /**
+     * Dump the inter arrival times for examination if necessary.
+     */
+    public void dumpInterArrivalTimes()
+    {
+        try
+        {
+            FileOutputStream fos = new FileOutputStream("/var/tmp/output-" + System.currentTimeMillis() + ".dat", true);
+            fos.write(toString().getBytes());
+            fos.close();
+        }
+        catch(Throwable th)
+        {
+            logger_.warn(LogUtil.throwableToString(th));
+        }
+    }
+    
+    /**
+     * We dump the arrival window for any endpoint only if the 
+     * local Failure Detector module has been up for more than a 
+     * minute.
+     * 
+     * @param ep for which the arrival window needs to be dumped.
+     */
+    private void dumpInterArrivalTimes(EndPoint ep)
+    {
+        long now = System.currentTimeMillis();
+        if ( (now - FailureDetector.creationTime_) <= FailureDetector.uptimeThreshold_ )
+            return;
+        try
+        {
+            FileOutputStream fos = new FileOutputStream("/var/tmp/output-" + System.currentTimeMillis() + "-" + ep + ".dat", true);
+            ArrivalWindow hWnd = arrivalSamples_.get(ep);
+            fos.write(hWnd.toString().getBytes());
+            fos.close();
+        }
+        catch(Throwable th)
+        {
+            logger_.warn(LogUtil.throwableToString(th));
+        }
+    }
+    
+    public boolean isAlive(EndPoint ep)
+    {
+        try
+        {
+            /* If the endpoint in question is the local endpoint return true. */
+            String localHost = FBUtilities.getLocalHostName();
+            if ( localHost.equals( ep.getHost() ) )
+                    return true;
+        }
+        catch( UnknownHostException ex )
+        {
+            logger_.info( LogUtil.throwableToString(ex) );
+        }
+    	/* Incoming port is assumed to be the Storage port. We need to change it to the control port */
+    	EndPoint ep2 = new EndPoint(ep.getHost(), DatabaseDescriptor.getControlPort());        
+        EndPointState epState = Gossiper.instance().getEndPointStateForEndPoint(ep2);
+        return epState.isAlive();
+    }
+    
+    public void report(EndPoint ep)
+    {
+        long now = System.currentTimeMillis();
+        ArrivalWindow hbWnd = arrivalSamples_.get(ep);
+        if ( hbWnd == null )
+        {
+            hbWnd = new ArrivalWindow(sampleSize_);
+            arrivalSamples_.put(ep, hbWnd);
+        }
+        hbWnd.add(now);  
+    }
+    
+    public void intepret(EndPoint ep)
+    {
+        ArrivalWindow hbWnd = arrivalSamples_.get(ep);
+        if ( hbWnd == null )
+        {            
+            return;
+        }
+        long now = System.currentTimeMillis();
+        /* We need this so that we do not suspect a convict. */
+        boolean isConvicted = false;
+        double phi = hbWnd.phi(now);
+        logger_.info("PHI for " + ep + " : " + phi);
+        
+        /*
+        if ( phi > phiConvictThreshold_ )
+        {            
+            isConvicted = true;     
+            for ( IFailureDetectionEventListener listener : fdEvntListeners_ )
+            {
+                listener.convict(ep);                
+            }
+        }
+        */
+        if ( !isConvicted && phi > phiSuspectThreshold_ )
+        {     
+            for ( IFailureDetectionEventListener listener : fdEvntListeners_ )
+            {
+                listener.suspect(ep);
+            }
+        }        
+    }
+    
+    public void registerFailureDetectionEventListener(IFailureDetectionEventListener listener)
+    {
+        fdEvntListeners_.add(listener);
+    }
+    
+    public void unregisterFailureDetectionEventListener(IFailureDetectionEventListener listener)
+    {
+        fdEvntListeners_.remove(listener);
+    }
+    
+    public String toString()
+    {
+        StringBuilder sb = new StringBuilder();
+        Set<EndPoint> eps = arrivalSamples_.keySet();
+        
+        sb.append("-----------------------------------------------------------------------");
+        for ( EndPoint ep : eps )
+        {
+            ArrivalWindow hWnd = arrivalSamples_.get(ep);
+            sb.append(ep + " : ");
+            sb.append(hWnd.toString());
+            sb.append( System.getProperty("line.separator") );
+        }
+        sb.append("-----------------------------------------------------------------------");
+        return sb.toString();
+    }
+    
+    public static void main(String[] args) throws Throwable
+    {           
+    }
+}
+
+class ArrivalWindow
+{
+    private static Logger logger_ = Logger.getLogger(ArrivalWindow.class);
+    private double tLast_ = 0L;
+    private List<Double> arrivalIntervals_;
+    private int size_;
+    
+    ArrivalWindow(int size)
+    {
+        size_ = size;
+        arrivalIntervals_ = new ArrayList<Double>(size);        
+    }
+    
+    synchronized void add(double value)
+    {
+        if ( arrivalIntervals_.size() == size_ )
+        {                          
+            arrivalIntervals_.remove(0);            
+        }
+        
+        double interArrivalTime = 0;
+        if ( tLast_ > 0L )
+        {                        
+            interArrivalTime = (value - tLast_);            
+        }   
+        tLast_ = value;            
+        arrivalIntervals_.add(interArrivalTime);        
+    }
+    
+    synchronized double sum()
+    {
+        double sum = 0d;
+        int size = arrivalIntervals_.size();
+        for( int i = 0; i < size; ++i )
+        {
+            sum += arrivalIntervals_.get(i);
+        }
+        return sum;
+    }
+    
+    synchronized double sumOfDeviations()
+    {
+        double sumOfDeviations = 0d;
+        double mean = mean();
+        int size = arrivalIntervals_.size();
+        
+        for( int i = 0; i < size; ++i )
+        {
+            sumOfDeviations += (arrivalIntervals_.get(i) - mean)*(arrivalIntervals_.get(i) - mean);
+        }
+        return sumOfDeviations;
+    }
+    
+    synchronized double mean()
+    {
+        return sum()/arrivalIntervals_.size();
+    }
+    
+    synchronized double variance()
+    {                
+        return sumOfDeviations() / (arrivalIntervals_.size());        
+    }
+    
+    double deviation()
+    {        
+        return Math.sqrt(variance());
+    }
+    
+    void clear()
+    {
+        arrivalIntervals_.clear();
+    }
+    
+    double p(double t)
+    {
+        // Stat stat = new Stat();
+        double mean = mean();        
+        double deviation = deviation();   
+        /* Exponential CDF = 1 -e^-lambda*x */
+        double exponent = (-1)*(t)/mean;
+        return 1 - ( 1 - Math.pow(Math.E, exponent) );
+        // return stat.gaussianCDF(mean, deviation, t, Double.POSITIVE_INFINITY);             
+    }
+    
+    double phi(long tnow)
+    {            
+        int size = arrivalIntervals_.size();
+        double log = 0d;
+        if ( size > 0 )
+        {
+            double t = tnow - tLast_;                
+            double probability = p(t);       
+            log = (-1) * Math.log10( probability );                                 
+        }
+        return log;           
+    } 
+    
+    public String toString()
+    {
+        StringBuilder sb = new StringBuilder();    
+        List<Double> arrivalIntervals = new ArrayList<Double>(arrivalIntervals_);
+        int size = arrivalIntervals.size();
+        for ( int i = 0; i < size; ++i )
+        {
+            sb.append(arrivalIntervals.get(i));
+            sb.append(" ");    
+        }
+        return sb.toString();
+    }
+}
+

Added: incubator/cassandra/src/org/apache/cassandra/gms/FailureDetectorMBean.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/gms/FailureDetectorMBean.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/gms/FailureDetectorMBean.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/gms/FailureDetectorMBean.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+public interface FailureDetectorMBean
+{
+    public void dumpInterArrivalTimes();
+}

Added: incubator/cassandra/src/org/apache/cassandra/gms/GossipDigest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/gms/GossipDigest.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/gms/GossipDigest.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/gms/GossipDigest.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.CompactEndPointSerializationHelper;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.*;
+
+/**
+ * Contains information about a specified list of EndPoints and the largest version 
+ * of the state they have generated as known by the local endpoint.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class GossipDigest implements Comparable<GossipDigest>
+{
+    private static ICompactSerializer<GossipDigest> serializer_;
+    static
+    {
+        serializer_ = new GossipDigestSerializer();
+    }
+    
+    EndPoint endPoint_;
+    int generation_;
+    int maxVersion_;
+
+    public static ICompactSerializer<GossipDigest> serializer()
+    {
+        return serializer_;
+    }
+    
+    GossipDigest(EndPoint endPoint, int generation, int maxVersion)
+    {
+        endPoint_ = endPoint;
+        generation_ = generation; 
+        maxVersion_ = maxVersion;
+    }
+    
+    EndPoint getEndPoint()
+    {
+        return endPoint_;
+    }
+    
+    int getGeneration()
+    {
+        return generation_;
+    }
+    
+    int getMaxVersion()
+    {
+        return maxVersion_;
+    }
+    
+    public int compareTo(GossipDigest gDigest)
+    {
+        if ( generation_ != gDigest.generation_ )
+            return ( generation_ - gDigest.generation_ );
+        return (maxVersion_ - gDigest.maxVersion_);
+    }
+    
+    public String toString()
+    {
+        StringBuilder sb = new StringBuilder();
+        sb.append(endPoint_);
+        sb.append(":");
+        sb.append(generation_);
+        sb.append(":");
+        sb.append(maxVersion_);
+        return sb.toString();
+    }
+}
+
+class GossipDigestSerializer implements ICompactSerializer<GossipDigest>
+{       
+    public void serialize(GossipDigest gDigest, DataOutputStream dos) throws IOException
+    {        
+        CompactEndPointSerializationHelper.serialize(gDigest.endPoint_, dos);
+        dos.writeInt(gDigest.generation_);
+        dos.writeInt(gDigest.maxVersion_);
+    }
+
+    public GossipDigest deserialize(DataInputStream dis) throws IOException
+    {
+        EndPoint endPoint = CompactEndPointSerializationHelper.deserialize(dis);
+        int generation = dis.readInt();
+        int version = dis.readInt();
+        return new GossipDigest(endPoint, generation, version);
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/gms/GossipDigestAck2Message.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/gms/GossipDigestAck2Message.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/gms/GossipDigestAck2Message.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/gms/GossipDigestAck2Message.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.*;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.*;
+
+
+/**
+ * This message gets sent out as a result of the receipt of a GossipDigestAckMessage. This the 
+ * last stage of the 3 way messaging of the Gossip protocol.
+ *  
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class GossipDigestAck2Message
+{
+    private static  ICompactSerializer<GossipDigestAck2Message> serializer_;
+    static
+    {
+        serializer_ = new GossipDigestAck2MessageSerializer();
+    }
+    
+    Map<EndPoint, EndPointState> epStateMap_ = new HashMap<EndPoint, EndPointState>();
+
+    public static ICompactSerializer<GossipDigestAck2Message> serializer()
+    {
+        return serializer_;
+    }
+    
+    GossipDigestAck2Message(Map<EndPoint, EndPointState> epStateMap)
+    {
+        epStateMap_ = epStateMap;
+    }
+        
+    Map<EndPoint, EndPointState> getEndPointStateMap()
+    {
+         return epStateMap_;
+    }
+}
+
+class GossipDigestAck2MessageSerializer implements ICompactSerializer<GossipDigestAck2Message>
+{
+    public void serialize(GossipDigestAck2Message gDigestAck2Message, DataOutputStream dos) throws IOException
+    {
+        /* Use the EndPointState */
+        EndPointStatesSerializationHelper.serialize(gDigestAck2Message.epStateMap_, dos);
+    }
+
+    public GossipDigestAck2Message deserialize(DataInputStream dis) throws IOException
+    {
+        Map<EndPoint, EndPointState> epStateMap = EndPointStatesSerializationHelper.deserialize(dis);
+        return new GossipDigestAck2Message(epStateMap);        
+    }
+}
+

Added: incubator/cassandra/src/org/apache/cassandra/gms/GossipDigestAckMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/gms/GossipDigestAckMessage.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/gms/GossipDigestAckMessage.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/gms/GossipDigestAckMessage.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.EndPoint;
+
+
+
+/**
+ * This message gets sent out as a result of the receipt of a GossipDigestSynMessage by an
+ * endpoint. This is the 2 stage of the 3 way messaging in the Gossip protocol.
+ * 
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class GossipDigestAckMessage
+{
+    private static ICompactSerializer<GossipDigestAckMessage> serializer_;
+    static
+    {
+        serializer_ = new GossipDigestAckMessageSerializer();
+    }
+    
+    List<GossipDigest> gDigestList_ = new ArrayList<GossipDigest>();
+    Map<EndPoint, EndPointState> epStateMap_ = new HashMap<EndPoint, EndPointState>();
+    
+    static ICompactSerializer<GossipDigestAckMessage> serializer()
+    {
+        return serializer_;
+    }
+    
+    GossipDigestAckMessage(List<GossipDigest> gDigestList, Map<EndPoint, EndPointState> epStateMap)
+    {
+        gDigestList_ = gDigestList;
+        epStateMap_ = epStateMap;
+    }
+    
+    void addGossipDigest(EndPoint ep, int generation, int version)
+    {
+        gDigestList_.add( new GossipDigest(ep, generation, version) );
+    }
+    
+    List<GossipDigest> getGossipDigestList()
+    {
+        return gDigestList_;
+    }
+    
+    Map<EndPoint, EndPointState> getEndPointStateMap()
+    {
+        return epStateMap_;
+    }
+}
+
+class GossipDigestAckMessageSerializer implements ICompactSerializer<GossipDigestAckMessage>
+{
+    public void serialize(GossipDigestAckMessage gDigestAckMessage, DataOutputStream dos) throws IOException
+    {
+        /* Use the helper to serialize the GossipDigestList */
+        boolean bContinue = GossipDigestSerializationHelper.serialize(gDigestAckMessage.gDigestList_, dos);
+        dos.writeBoolean(bContinue);
+        /* Use the EndPointState */
+        if ( bContinue )
+        {
+            EndPointStatesSerializationHelper.serialize(gDigestAckMessage.epStateMap_, dos);            
+        }
+    }
+
+    public GossipDigestAckMessage deserialize(DataInputStream dis) throws IOException
+    {
+        Map<EndPoint, EndPointState> epStateMap = new HashMap<EndPoint, EndPointState>();
+        List<GossipDigest> gDigestList = GossipDigestSerializationHelper.deserialize(dis);                
+        boolean bContinue = dis.readBoolean();
+
+        if ( bContinue )
+        {
+            epStateMap = EndPointStatesSerializationHelper.deserialize(dis);                                    
+        }
+        return new GossipDigestAckMessage(gDigestList, epStateMap);
+    }
+}
\ No newline at end of file

Added: incubator/cassandra/src/org/apache/cassandra/gms/GossipDigestSynMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/gms/GossipDigestSynMessage.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/gms/GossipDigestSynMessage.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/gms/GossipDigestSynMessage.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.CompactEndPointSerializationHelper;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.utils.Log4jLogger;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.utils.*;
+
+
+/**
+ * This is the first message that gets sent out as a start of the Gossip protocol in a 
+ * round. 
+ * 
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class GossipDigestSynMessage
+{
+    private static ICompactSerializer<GossipDigestSynMessage> serializer_;
+    static
+    {
+        serializer_ = new GossipDigestSynMessageSerializer();
+    }
+    
+    String clusterId_;
+    List<GossipDigest> gDigests_ = new ArrayList<GossipDigest>();
+
+    public static ICompactSerializer<GossipDigestSynMessage> serializer()
+    {
+        return serializer_;
+    }
+ 
+    public GossipDigestSynMessage(String clusterId, List<GossipDigest> gDigests)
+    {      
+        clusterId_ = clusterId;
+        gDigests_ = gDigests;
+    }
+    
+    List<GossipDigest> getGossipDigests()
+    {
+        return gDigests_;
+    }
+}
+
+class GossipDigestSerializationHelper
+{
+    private static Logger logger_ = Logger.getLogger(GossipDigestSerializationHelper.class);
+    
+    static boolean serialize(List<GossipDigest> gDigestList, DataOutputStream dos) throws IOException
+    {
+        boolean bVal = true;
+        int size = gDigestList.size();                        
+        dos.writeInt(size);
+        
+        int estimate = 0;            
+        for ( GossipDigest gDigest : gDigestList )
+        {
+            if ( Gossiper.MAX_GOSSIP_PACKET_SIZE - dos.size() < estimate )
+            {
+                logger_.info("@@@@ Breaking out to respect the MTU size in GD @@@@");
+                bVal = false;
+                break;
+            }
+            int pre = dos.size();               
+            GossipDigest.serializer().serialize( gDigest, dos );
+            int post = dos.size();
+            estimate = post - pre;
+        }
+        return bVal;
+    }
+
+    static List<GossipDigest> deserialize(DataInputStream dis) throws IOException
+    {
+        int size = dis.readInt();            
+        List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
+        
+        for ( int i = 0; i < size; ++i )
+        {
+            if ( dis.available() == 0 )
+            {
+                logger_.info("Remaining bytes zero. Stopping deserialization of GossipDigests.");
+                break;
+            }
+                            
+            GossipDigest gDigest = GossipDigest.serializer().deserialize(dis);                
+            gDigests.add( gDigest );                
+        }        
+        return gDigests;
+    }
+}
+
+class EndPointStatesSerializationHelper
+{
+    private static Log4jLogger logger_ = new Log4jLogger(EndPointStatesSerializationHelper.class.getName());
+    
+    static boolean serialize(Map<EndPoint, EndPointState> epStateMap, DataOutputStream dos) throws IOException
+    {
+        boolean bVal = true;
+        int estimate = 0;                
+        int size = epStateMap.size();
+        dos.writeInt(size);
+    
+        Set<EndPoint> eps = epStateMap.keySet();
+        for( EndPoint ep : eps )
+        {
+            if ( Gossiper.MAX_GOSSIP_PACKET_SIZE - dos.size() < estimate )
+            {
+                logger_.info("@@@@ Breaking out to respect the MTU size in EPS. Estimate is " + estimate + " @@@@");
+                bVal = false;
+                break;
+            }
+    
+            int pre = dos.size();
+            CompactEndPointSerializationHelper.serialize(ep, dos);
+            EndPointState epState = epStateMap.get(ep);            
+            EndPointState.serializer().serialize(epState, dos);
+            int post = dos.size();
+            estimate = post - pre;
+        }
+        return bVal;
+    }
+
+    static Map<EndPoint, EndPointState> deserialize(DataInputStream dis) throws IOException
+    {
+        int size = dis.readInt();            
+        Map<EndPoint, EndPointState> epStateMap = new HashMap<EndPoint, EndPointState>();
+        
+        for ( int i = 0; i < size; ++i )
+        {
+            if ( dis.available() == 0 )
+            {
+                logger_.info("Remaining bytes zero. Stopping deserialization in EndPointState.");
+                break;
+            }
+            // int length = dis.readInt();            
+            EndPoint ep = CompactEndPointSerializationHelper.deserialize(dis);
+            EndPointState epState = EndPointState.serializer().deserialize(dis);            
+            epStateMap.put(ep, epState);
+        }        
+        return epStateMap;
+    }
+}
+
+class GossipDigestSynMessageSerializer implements ICompactSerializer<GossipDigestSynMessage>
+{   
+    public void serialize(GossipDigestSynMessage gDigestSynMessage, DataOutputStream dos) throws IOException
+    {    
+        dos.writeUTF(gDigestSynMessage.clusterId_);
+        GossipDigestSerializationHelper.serialize(gDigestSynMessage.gDigests_, dos);
+    }
+
+    public GossipDigestSynMessage deserialize(DataInputStream dis) throws IOException
+    {
+        String clusterId = dis.readUTF();
+        List<GossipDigest> gDigests = GossipDigestSerializationHelper.deserialize(dis);
+        return new GossipDigestSynMessage(clusterId, gDigests);
+    }
+
+}
+