You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pm...@apache.org on 2009/03/02 07:12:49 UTC
svn commit: r749205 [11/16] - in
/incubator/cassandra/src/org/apache/cassandra: analytics/ cli/ concurrent/
config/ continuations/ cql/ cql/common/ cql/compiler/ cql/compiler/common/
cql/compiler/parse/ cql/compiler/sem/ cql/driver/ cql/execution/ dht/...
Added: incubator/cassandra/src/org/apache/cassandra/dht/Range.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/dht/Range.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/dht/Range.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/dht/Range.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.cassandra.gms.GossipDigest;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.IFileReader;
+import org.apache.cassandra.io.IFileWriter;
+import org.apache.cassandra.net.CompactEndPointSerializationHelper;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.service.StorageService;
+
+
+/**
+ * A representation of the range that a node is responsible for on the DHT ring.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class Range implements Comparable<Range>
+{
+ private static ICompactSerializer<Range> serializer_;
+ static
+ {
+ serializer_ = new RangeSerializer();
+ }
+
+ public static ICompactSerializer<Range> serializer()
+ {
+ return serializer_;
+ }
+
+ public static boolean isKeyInRanges(List<Range> ranges, String key)
+ {
+ if(ranges == null )
+ return false;
+
+ for ( Range range : ranges)
+ {
+ if(range.contains(StorageService.hash(key)))
+ {
+ return true ;
+ }
+ }
+ return false;
+ }
+
+
+ private BigInteger left_;
+ private BigInteger right_;
+
+ public Range(BigInteger left, BigInteger right)
+ {
+ left_ = left;
+ right_ = right;
+ }
+
+ /**
+ * Returns the left endpoint of a range.
+ * @return left endpoint
+ */
+ public BigInteger left()
+ {
+ return left_;
+ }
+
+ /**
+ * Returns the right endpoint of a range.
+ * @return right endpoint
+ */
+ public BigInteger right()
+ {
+ return right_;
+ }
+
+ boolean isSplitRequired()
+ {
+ return ( left_.subtract(right_).signum() >= 0 );
+ }
+
+ public boolean isSplitBy(BigInteger bi)
+ {
+ if ( left_.subtract(right_).signum() > 0 )
+ {
+ /*
+ * left is greater than right we are wrapping around.
+ * So if the interval is [a,b) where a > b then we have
+ * 3 cases one of which holds for any given token k.
+ * (1) k > a -- return true
+ * (2) k < b -- return true
+ * (3) b < k < a -- return false
+ */
+ if ( bi.subtract(left_).signum() > 0 )
+ return true;
+ else if (right_.subtract(bi).signum() > 0 )
+ return true;
+ else
+ return false;
+ }
+ else if ( left_.subtract(right_).signum() < 0 )
+ {
+ /*
+ * This is the range [a, b) where a < b.
+ */
+ return ( bi.subtract(left_).signum() > 0 && right_.subtract(bi).signum() > 0 );
+ }
+ else
+ {
+ // should never be here.
+ return true;
+ }
+ }
+
+ /**
+ * Helps determine if a given point on the DHT ring is contained
+ * in the range in question.
+ * @param bi point in question
+ * @return true if the point contains within the range else false.
+ */
+ public boolean contains(BigInteger bi)
+ {
+ if ( left_.subtract(right_).signum() > 0 )
+ {
+ /*
+ * left is greater than right we are wrapping around.
+ * So if the interval is [a,b) where a > b then we have
+ * 3 cases one of which holds for any given token k.
+ * (1) k > a -- return true
+ * (2) k < b -- return true
+ * (3) b < k < a -- return false
+ */
+ if ( bi.subtract(left_).signum() >= 0 )
+ return true;
+ else if (right_.subtract(bi).signum() > 0 )
+ return true;
+ else
+ return false;
+ }
+ else if ( left_.subtract(right_).signum() < 0 )
+ {
+ /*
+ * This is the range [a, b) where a < b.
+ */
+ return ( bi.subtract(left_).signum() >= 0 && right_.subtract(bi).signum() >=0 );
+ }
+ else
+ {
+ return true;
+ }
+ }
+
+ /**
+ * Helps determine if a given range on the DHT ring is contained
+ * within the range associated with the <i>this</i> pointer.
+ * @param rhs rhs in question
+ * @return true if the point contains within the range else false.
+ */
+ public boolean contains(Range rhs)
+ {
+ /*
+ * If (a, b] and (c, d} are not wrap arounds
+ * then return true if a <= c <= d <= b.
+ */
+ if ( !isWrapAround(this) && !isWrapAround(rhs) )
+ {
+ if ( rhs.left_.subtract(left_).signum() >= 0 && right_.subtract(rhs.right_).signum() >= 0 )
+ return true;
+ else
+ return false;
+ }
+
+ /*
+ * If lhs is a wrap around and rhs is not then
+ * rhs.left >= lhs.left and rhs.right >= lhs.left.
+ */
+ if ( isWrapAround(this) && !isWrapAround(rhs) )
+ {
+ if ( rhs.left_.subtract(left_).signum() >= 0 && rhs.right_.subtract(right_).signum() >= 0 )
+ return true;
+ else
+ return false;
+ }
+
+ /*
+ * If lhs is not a wrap around and rhs is a wrap
+ * around then we just return false.
+ */
+ if ( !isWrapAround(this) && isWrapAround(rhs) )
+ return false;
+
+ if( isWrapAround(this) && isWrapAround(rhs) )
+ {
+ if ( rhs.left_.subtract(left_).signum() >= 0 && right_.subtract(right_).signum() >= 0 )
+ return true;
+ else
+ return false;
+ }
+
+ /* should never be here */
+ return false;
+ }
+
+ /**
+ * Tells if the given range is a wrap around.
+ * @param range
+ * @return
+ */
+ private boolean isWrapAround(Range range)
+ {
+ boolean bVal = ( range.left_.subtract(range.right_).signum() > 0 ) ? true : false;
+ return bVal;
+ }
+
+ public int compareTo(Range rhs)
+ {
+ /*
+ * If the range represented by the "this" pointer
+ * is a wrap around then it is the smaller one.
+ */
+ if ( isWrapAround(this) )
+ return -1;
+
+ if ( isWrapAround(rhs) )
+ return 1;
+
+ return right_.compareTo(rhs.right_);
+ }
+
+ public boolean equals(Object o)
+ {
+ if ( !(o instanceof Range) )
+ return false;
+ Range rhs = (Range)o;
+ if ( left_.equals(rhs.left_) && right_.equals(rhs.right_) )
+ return true;
+ else
+ return false;
+ }
+
+ public int hashCode()
+ {
+ return toString().hashCode();
+ }
+
+ public String toString()
+ {
+ return "(" + left_ + "," + right_ + "]";
+ }
+}
+
+class RangeSerializer implements ICompactSerializer<Range>
+{
+ public void serialize(Range range, DataOutputStream dos) throws IOException
+ {
+ dos.writeUTF(range.left().toString());
+ dos.writeUTF(range.right().toString());
+ }
+
+ public Range deserialize(DataInputStream dis) throws IOException
+ {
+ BigInteger left = new BigInteger(dis.readUTF());
+ BigInteger right = new BigInteger(dis.readUTF());
+ return new Range(left, right);
+ }
+}
Added: incubator/cassandra/src/org/apache/cassandra/gms/ApplicationState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/gms/ApplicationState.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/gms/ApplicationState.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/gms/ApplicationState.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.IFileReader;
+import org.apache.cassandra.io.IFileWriter;
+
+
+/**
+ * This abstraction represents the state associated with a particular node which an
+ * application wants to make available to the rest of the nodes in the cluster.
+ * Whenever a peice of state needs to be disseminated to the rest of cluster wrap
+ * the state in an instance of <i>ApplicationState</i> and add it to the Gossiper.
+ *
+ * For eg. if we want to disseminate load information for node A do the following:
+ *
+ * ApplicationState loadState = new ApplicationState(<string reprensentation of load>);
+ * Gossiper.instance().addApplicationState("LOAD STATE", loadState);
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class ApplicationState
+{
+ private static ICompactSerializer<ApplicationState> serializer_;
+ static
+ {
+ serializer_ = new ApplicationStateSerializer();
+ }
+
+ int version_;
+ String state_;
+
+
+ ApplicationState(String state, int version)
+ {
+ state_ = state;
+ version_ = version;
+ }
+
+ public static ICompactSerializer<ApplicationState> serializer()
+ {
+ return serializer_;
+ }
+
+ /**
+ * Wraps the specified state into a ApplicationState instance.
+ * @param state string representation of arbitrary state.
+ */
+ public ApplicationState(String state)
+ {
+ state_ = state;
+ version_ = VersionGenerator.getNextVersion();
+ }
+
+ public String getState()
+ {
+ return state_;
+ }
+
+ int getStateVersion()
+ {
+ return version_;
+ }
+}
+
+class ApplicationStateSerializer implements ICompactSerializer<ApplicationState>
+{
+ public void serialize(ApplicationState appState, DataOutputStream dos) throws IOException
+ {
+ dos.writeUTF(appState.state_);
+ dos.writeInt(appState.version_);
+ }
+
+ public ApplicationState deserialize(DataInputStream dis) throws IOException
+ {
+ String state = dis.readUTF();
+ int version = dis.readInt();
+ return new ApplicationState(state, version);
+ }
+}
+
Added: incubator/cassandra/src/org/apache/cassandra/gms/EndPointState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/gms/EndPointState.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/gms/EndPointState.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/gms/EndPointState.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.*;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.io.IFileReader;
+import org.apache.cassandra.io.IFileWriter;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.utils.*;
+
+/**
+ * This abstraction represents both the HeartBeatState and the ApplicationState in an EndPointState
+ * instance. Any state for a given endpoint can be retrieved from this instance.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class EndPointState
+{
+ private static ICompactSerializer<EndPointState> serializer_;
+ static
+ {
+ serializer_ = new EndPointStateSerializer();
+ }
+
+ HeartBeatState hbState_;
+ Map<String, ApplicationState> applicationState_ = new Hashtable<String, ApplicationState>();
+
+ /* fields below do not get serialized */
+ long updateTimestamp_;
+ boolean isAlive_;
+ boolean isAGossiper_;
+
+ public static ICompactSerializer<EndPointState> serializer()
+ {
+ return serializer_;
+ }
+
+ EndPointState(HeartBeatState hbState)
+ {
+ hbState_ = hbState;
+ updateTimestamp_ = System.currentTimeMillis();
+ isAlive_ = true;
+ isAGossiper_ = false;
+ }
+
+ HeartBeatState getHeartBeatState()
+ {
+ return hbState_;
+ }
+
+ synchronized void setHeartBeatState(HeartBeatState hbState)
+ {
+ updateTimestamp();
+ hbState_ = hbState;
+ }
+
+ public ApplicationState getApplicationState(String key)
+ {
+ return applicationState_.get(key);
+ }
+
+ public Map<String, ApplicationState> getApplicationState()
+ {
+ return applicationState_;
+ }
+
+ void addApplicationState(String key, ApplicationState appState)
+ {
+ applicationState_.put(key, appState);
+ }
+
+ /* getters and setters */
+ long getUpdateTimestamp()
+ {
+ return updateTimestamp_;
+ }
+
+ synchronized void updateTimestamp()
+ {
+ updateTimestamp_ = System.currentTimeMillis();
+ }
+
+ public boolean isAlive()
+ {
+ return isAlive_;
+ }
+
+ synchronized void isAlive(boolean value)
+ {
+ isAlive_ = value;
+ }
+
+
+ boolean isAGossiper()
+ {
+ return isAGossiper_;
+ }
+
+ synchronized void isAGossiper(boolean value)
+ {
+ //isAlive_ = false;
+ isAGossiper_ = value;
+ }
+}
+
+class EndPointStateSerializer implements ICompactSerializer<EndPointState>
+{
+ private static Logger logger_ = Logger.getLogger(EndPointStateSerializer.class);
+
+ public void serialize(EndPointState epState, DataOutputStream dos) throws IOException
+ {
+ /* These are for estimating whether we overshoot the MTU limit */
+ int estimate = 0;
+
+ /* serialize the HeartBeatState */
+ HeartBeatState hbState = epState.getHeartBeatState();
+ HeartBeatState.serializer().serialize(hbState, dos);
+
+ /* serialize the map of ApplicationState objects */
+ int size = epState.applicationState_.size();
+ dos.writeInt(size);
+ if ( size > 0 )
+ {
+ Set<String> keys = epState.applicationState_.keySet();
+ for( String key : keys )
+ {
+ if ( Gossiper.MAX_GOSSIP_PACKET_SIZE - dos.size() < estimate )
+ {
+ logger_.info("@@@@ Breaking out to respect the MTU size in EndPointState serializer. Estimate is " + estimate + " @@@@");
+ break;
+ }
+
+ ApplicationState appState = epState.applicationState_.get(key);
+ if ( appState != null )
+ {
+ int pre = dos.size();
+ dos.writeUTF(key);
+ ApplicationState.serializer().serialize(appState, dos);
+ int post = dos.size();
+ estimate = post - pre;
+ }
+ }
+ }
+ }
+
+ public EndPointState deserialize(DataInputStream dis) throws IOException
+ {
+ HeartBeatState hbState = HeartBeatState.serializer().deserialize(dis);
+ EndPointState epState = new EndPointState(hbState);
+
+ int appStateSize = dis.readInt();
+ for ( int i = 0; i < appStateSize; ++i )
+ {
+ if ( dis.available() == 0 )
+ {
+ break;
+ }
+
+ String key = dis.readUTF();
+ ApplicationState appState = ApplicationState.serializer().deserialize(dis);
+ epState.addApplicationState(key, appState);
+ }
+ return epState;
+ }
+}
Added: incubator/cassandra/src/org/apache/cassandra/gms/FailureDetector.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/gms/FailureDetector.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/gms/FailureDetector.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/gms/FailureDetector.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,343 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.io.FileOutputStream;
+import java.lang.management.ManagementFactory;
+import java.net.UnknownHostException;
+import java.util.*;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.utils.*;
+
+/**
+ * This FailureDetector is an implementation of the paper titled
+ * "The Phi Accrual Failure Detector" by Hayashibara.
+ * Check the paper and the <i>IFailureDetector</i> interface for details.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class FailureDetector implements IFailureDetector, FailureDetectorMBean
+{
+ private static Logger logger_ = Logger.getLogger(FailureDetector.class);
+ private static final int sampleSize_ = 1000;
+ private static final int phiSuspectThreshold_ = 5;
+ private static final int phiConvictThreshold_ = 8;
+ /* The Failure Detector has to have been up for atleast 1 min. */
+ private static final long uptimeThreshold_ = 60000;
+ private static IFailureDetector failureDetector_;
+ /* Used to lock the factory for creation of FailureDetector instance */
+ private static Lock createLock_ = new ReentrantLock();
+ /* The time when the module was instantiated. */
+ private static long creationTime_;
+
+ public static IFailureDetector instance()
+ {
+ if ( failureDetector_ == null )
+ {
+ FailureDetector.createLock_.lock();
+ try
+ {
+ if ( failureDetector_ == null )
+ {
+ failureDetector_ = new FailureDetector();
+ }
+ }
+ finally
+ {
+ createLock_.unlock();
+ }
+ }
+ return failureDetector_;
+ }
+
+ private Map<EndPoint, ArrivalWindow> arrivalSamples_ = new Hashtable<EndPoint, ArrivalWindow>();
+ private List<IFailureDetectionEventListener> fdEvntListeners_ = new ArrayList<IFailureDetectionEventListener>();
+
+ public FailureDetector()
+ {
+ creationTime_ = System.currentTimeMillis();
+ // Register this instance with JMX
+ try
+ {
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ mbs.registerMBean(this, new ObjectName("com.facebook.infrastructure.gms:type=FailureDetector"));
+ }
+ catch (Exception e)
+ {
+ logger_.error(LogUtil.throwableToString(e));
+ }
+ }
+
+ /**
+ * Dump the inter arrival times for examination if necessary.
+ */
+ public void dumpInterArrivalTimes()
+ {
+ try
+ {
+ FileOutputStream fos = new FileOutputStream("/var/tmp/output-" + System.currentTimeMillis() + ".dat", true);
+ fos.write(toString().getBytes());
+ fos.close();
+ }
+ catch(Throwable th)
+ {
+ logger_.warn(LogUtil.throwableToString(th));
+ }
+ }
+
+ /**
+ * We dump the arrival window for any endpoint only if the
+ * local Failure Detector module has been up for more than a
+ * minute.
+ *
+ * @param ep for which the arrival window needs to be dumped.
+ */
+ private void dumpInterArrivalTimes(EndPoint ep)
+ {
+ long now = System.currentTimeMillis();
+ if ( (now - FailureDetector.creationTime_) <= FailureDetector.uptimeThreshold_ )
+ return;
+ try
+ {
+ FileOutputStream fos = new FileOutputStream("/var/tmp/output-" + System.currentTimeMillis() + "-" + ep + ".dat", true);
+ ArrivalWindow hWnd = arrivalSamples_.get(ep);
+ fos.write(hWnd.toString().getBytes());
+ fos.close();
+ }
+ catch(Throwable th)
+ {
+ logger_.warn(LogUtil.throwableToString(th));
+ }
+ }
+
+ public boolean isAlive(EndPoint ep)
+ {
+ try
+ {
+ /* If the endpoint in question is the local endpoint return true. */
+ String localHost = FBUtilities.getLocalHostName();
+ if ( localHost.equals( ep.getHost() ) )
+ return true;
+ }
+ catch( UnknownHostException ex )
+ {
+ logger_.info( LogUtil.throwableToString(ex) );
+ }
+ /* Incoming port is assumed to be the Storage port. We need to change it to the control port */
+ EndPoint ep2 = new EndPoint(ep.getHost(), DatabaseDescriptor.getControlPort());
+ EndPointState epState = Gossiper.instance().getEndPointStateForEndPoint(ep2);
+ return epState.isAlive();
+ }
+
+ public void report(EndPoint ep)
+ {
+ long now = System.currentTimeMillis();
+ ArrivalWindow hbWnd = arrivalSamples_.get(ep);
+ if ( hbWnd == null )
+ {
+ hbWnd = new ArrivalWindow(sampleSize_);
+ arrivalSamples_.put(ep, hbWnd);
+ }
+ hbWnd.add(now);
+ }
+
+ public void intepret(EndPoint ep)
+ {
+ ArrivalWindow hbWnd = arrivalSamples_.get(ep);
+ if ( hbWnd == null )
+ {
+ return;
+ }
+ long now = System.currentTimeMillis();
+ /* We need this so that we do not suspect a convict. */
+ boolean isConvicted = false;
+ double phi = hbWnd.phi(now);
+ logger_.info("PHI for " + ep + " : " + phi);
+
+ /*
+ if ( phi > phiConvictThreshold_ )
+ {
+ isConvicted = true;
+ for ( IFailureDetectionEventListener listener : fdEvntListeners_ )
+ {
+ listener.convict(ep);
+ }
+ }
+ */
+ if ( !isConvicted && phi > phiSuspectThreshold_ )
+ {
+ for ( IFailureDetectionEventListener listener : fdEvntListeners_ )
+ {
+ listener.suspect(ep);
+ }
+ }
+ }
+
+ public void registerFailureDetectionEventListener(IFailureDetectionEventListener listener)
+ {
+ fdEvntListeners_.add(listener);
+ }
+
+ public void unregisterFailureDetectionEventListener(IFailureDetectionEventListener listener)
+ {
+ fdEvntListeners_.remove(listener);
+ }
+
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+ Set<EndPoint> eps = arrivalSamples_.keySet();
+
+ sb.append("-----------------------------------------------------------------------");
+ for ( EndPoint ep : eps )
+ {
+ ArrivalWindow hWnd = arrivalSamples_.get(ep);
+ sb.append(ep + " : ");
+ sb.append(hWnd.toString());
+ sb.append( System.getProperty("line.separator") );
+ }
+ sb.append("-----------------------------------------------------------------------");
+ return sb.toString();
+ }
+
+ public static void main(String[] args) throws Throwable
+ {
+ }
+}
+
/**
 * Sliding window of the most recent inter-arrival times for one endpoint,
 * used to compute the phi suspicion level. All access to the window is
 * synchronized on this object.
 */
class ArrivalWindow
{
    /* Time of the most recent arrival; 0 until the first arrival is seen. */
    private double tLast_ = 0L;
    /* Inter-arrival times, oldest first; holds at most size_ entries. */
    private List<Double> arrivalIntervals_;
    private int size_;

    ArrivalWindow(int size)
    {
        size_ = size;
        arrivalIntervals_ = new ArrayList<Double>(size);
    }

    /**
     * Records an arrival at time {@code value}. The first arrival only sets the
     * reference time. Recording a 0 interval for it, as the original code did,
     * makes the mean 0 right after the first heartbeat, so phi becomes infinite
     * (or NaN), and it biases the mean afterwards.
     */
    synchronized void add(double value)
    {
        if ( tLast_ > 0L )
        {
            if ( arrivalIntervals_.size() == size_ )
            {
                arrivalIntervals_.remove(0);
            }
            arrivalIntervals_.add(value - tLast_);
        }
        tLast_ = value;
    }

    synchronized double sum()
    {
        double sum = 0d;
        for ( Double interval : arrivalIntervals_ )
        {
            sum += interval;
        }
        return sum;
    }

    synchronized double sumOfDeviations()
    {
        double sumOfDeviations = 0d;
        double mean = mean();
        for ( Double interval : arrivalIntervals_ )
        {
            double delta = interval - mean;
            sumOfDeviations += delta * delta;
        }
        return sumOfDeviations;
    }

    /** Mean inter-arrival time; NaN when the window is empty. */
    synchronized double mean()
    {
        return sum() / arrivalIntervals_.size();
    }

    /** Population variance of the inter-arrival times. */
    synchronized double variance()
    {
        return sumOfDeviations() / arrivalIntervals_.size();
    }

    double deviation()
    {
        return Math.sqrt(variance());
    }

    synchronized void clear()
    {
        arrivalIntervals_.clear();
    }

    /**
     * Probability that the next arrival comes later than {@code t} after the
     * previous one, under an exponential model with the window's mean:
     * P(X > t) = e^(-t / mean).
     */
    double p(double t)
    {
        return Math.exp(-t / mean());
    }

    /**
     * phi = -log10(p(tnow - tLast)). Returns 0 while no interval has been recorded.
     */
    synchronized double phi(long tnow)
    {
        if ( arrivalIntervals_.isEmpty() )
        {
            return 0d;
        }
        double t = tnow - tLast_;
        return -Math.log10(p(t));
    }

    /** Space-separated list of the recorded intervals, oldest first (with a trailing space). */
    public synchronized String toString()
    {
        StringBuilder sb = new StringBuilder();
        for ( Double interval : arrivalIntervals_ )
        {
            sb.append(interval);
            sb.append(" ");
        }
        return sb.toString();
    }
}
+
Added: incubator/cassandra/src/org/apache/cassandra/gms/FailureDetectorMBean.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/gms/FailureDetectorMBean.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/gms/FailureDetectorMBean.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/gms/FailureDetectorMBean.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
/**
 * JMX management interface exposed by the FailureDetector.
 */
public interface FailureDetectorMBean
{
    /**
     * Dumps the recorded inter-arrival times for examination.
     */
    void dumpInterArrivalTimes();
}
Added: incubator/cassandra/src/org/apache/cassandra/gms/GossipDigest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/gms/GossipDigest.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/gms/GossipDigest.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/gms/GossipDigest.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.CompactEndPointSerializationHelper;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.*;
+
+/**
+ * Contains information about a specified list of EndPoints and the largest version
+ * of the state they have generated as known by the local endpoint.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class GossipDigest implements Comparable<GossipDigest>
+{
+ private static ICompactSerializer<GossipDigest> serializer_;
+ static
+ {
+ serializer_ = new GossipDigestSerializer();
+ }
+
+ EndPoint endPoint_;
+ int generation_;
+ int maxVersion_;
+
+ public static ICompactSerializer<GossipDigest> serializer()
+ {
+ return serializer_;
+ }
+
+ GossipDigest(EndPoint endPoint, int generation, int maxVersion)
+ {
+ endPoint_ = endPoint;
+ generation_ = generation;
+ maxVersion_ = maxVersion;
+ }
+
+ EndPoint getEndPoint()
+ {
+ return endPoint_;
+ }
+
+ int getGeneration()
+ {
+ return generation_;
+ }
+
+ int getMaxVersion()
+ {
+ return maxVersion_;
+ }
+
+ public int compareTo(GossipDigest gDigest)
+ {
+ if ( generation_ != gDigest.generation_ )
+ return ( generation_ - gDigest.generation_ );
+ return (maxVersion_ - gDigest.maxVersion_);
+ }
+
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append(endPoint_);
+ sb.append(":");
+ sb.append(generation_);
+ sb.append(":");
+ sb.append(maxVersion_);
+ return sb.toString();
+ }
+}
+
+class GossipDigestSerializer implements ICompactSerializer<GossipDigest>
+{
+ public void serialize(GossipDigest gDigest, DataOutputStream dos) throws IOException
+ {
+ CompactEndPointSerializationHelper.serialize(gDigest.endPoint_, dos);
+ dos.writeInt(gDigest.generation_);
+ dos.writeInt(gDigest.maxVersion_);
+ }
+
+ public GossipDigest deserialize(DataInputStream dis) throws IOException
+ {
+ EndPoint endPoint = CompactEndPointSerializationHelper.deserialize(dis);
+ int generation = dis.readInt();
+ int version = dis.readInt();
+ return new GossipDigest(endPoint, generation, version);
+ }
+}
Added: incubator/cassandra/src/org/apache/cassandra/gms/GossipDigestAck2Message.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/gms/GossipDigestAck2Message.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/gms/GossipDigestAck2Message.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/gms/GossipDigestAck2Message.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.*;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.*;
+
+
+/**
+ * This message gets sent out as a result of the receipt of a GossipDigestAckMessage. This is the
+ * last stage of the three-way messaging of the Gossip protocol.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class GossipDigestAck2Message
+{
+ private static ICompactSerializer<GossipDigestAck2Message> serializer_ = new GossipDigestAck2MessageSerializer();
+
+ /* Endpoint states carried by this message; the constructor stores the caller's map without copying. */
+ Map<EndPoint, EndPointState> epStateMap_;
+
+ /** Returns the serializer shared by all GossipDigestAck2Message instances. */
+ public static ICompactSerializer<GossipDigestAck2Message> serializer()
+ {
+ return GossipDigestAck2Message.serializer_;
+ }
+
+ /** @param epStateMap endpoint states to carry; kept by reference */
+ GossipDigestAck2Message(Map<EndPoint, EndPointState> epStateMap)
+ {
+ this.epStateMap_ = epStateMap;
+ }
+
+ /** Returns the endpoint-state map held by this message (the same instance, not a copy). */
+ Map<EndPoint, EndPointState> getEndPointStateMap()
+ {
+ return this.epStateMap_;
+ }
+}
+
+class GossipDigestAck2MessageSerializer implements ICompactSerializer<GossipDigestAck2Message>
+{
+ /** Serializes the message as its endpoint-state map alone. */
+ public void serialize(GossipDigestAck2Message ack2, DataOutputStream dos) throws IOException
+ {
+ Map<EndPoint, EndPointState> states = ack2.epStateMap_;
+ EndPointStatesSerializationHelper.serialize(states, dos);
+ }
+
+ /** Reads back a message written by serialize. */
+ public GossipDigestAck2Message deserialize(DataInputStream dis) throws IOException
+ {
+ return new GossipDigestAck2Message(EndPointStatesSerializationHelper.deserialize(dis));
+ }
+}
+
Added: incubator/cassandra/src/org/apache/cassandra/gms/GossipDigestAckMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/gms/GossipDigestAckMessage.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/gms/GossipDigestAckMessage.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/gms/GossipDigestAckMessage.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.EndPoint;
+
+
+
+/**
+ * This message gets sent out as a result of the receipt of a GossipDigestSynMessage by an
+ * endpoint. This is the second stage of the three-way messaging in the Gossip protocol.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class GossipDigestAckMessage
+{
+ private static ICompactSerializer<GossipDigestAckMessage> serializer_ = new GossipDigestAckMessageSerializer();
+
+ /* Both collections come from the constructor and are stored by reference, not copied. */
+ List<GossipDigest> gDigestList_;
+ Map<EndPoint, EndPointState> epStateMap_;
+
+ /** Returns the serializer shared by all GossipDigestAckMessage instances. */
+ static ICompactSerializer<GossipDigestAckMessage> serializer()
+ {
+ return GossipDigestAckMessage.serializer_;
+ }
+
+ /**
+ * @param gDigestList digests to carry; kept by reference and appended to by addGossipDigest
+ * @param epStateMap endpoint states to carry; kept by reference
+ */
+ GossipDigestAckMessage(List<GossipDigest> gDigestList, Map<EndPoint, EndPointState> epStateMap)
+ {
+ this.gDigestList_ = gDigestList;
+ this.epStateMap_ = epStateMap;
+ }
+
+ /** Appends a digest built from the given endpoint, generation and version. */
+ void addGossipDigest(EndPoint ep, int generation, int version)
+ {
+ GossipDigest digest = new GossipDigest(ep, generation, version);
+ this.gDigestList_.add(digest);
+ }
+
+ /** Returns the digest list held by this message (the same instance, not a copy). */
+ List<GossipDigest> getGossipDigestList()
+ {
+ return this.gDigestList_;
+ }
+
+ /** Returns the endpoint-state map held by this message (the same instance, not a copy). */
+ Map<EndPoint, EndPointState> getEndPointStateMap()
+ {
+ return this.epStateMap_;
+ }
+}
+
+class GossipDigestAckMessageSerializer implements ICompactSerializer<GossipDigestAckMessage>
+{
+ /**
+ * Writes the digest list, then a boolean flag. The endpoint-state map follows
+ * only when the flag is true, i.e. when the digest helper wrote the whole list.
+ */
+ public void serialize(GossipDigestAckMessage ackMessage, DataOutputStream dos) throws IOException
+ {
+ boolean allDigestsWritten = GossipDigestSerializationHelper.serialize(ackMessage.gDigestList_, dos);
+ dos.writeBoolean(allDigestsWritten);
+ if ( !allDigestsWritten )
+ return;
+ EndPointStatesSerializationHelper.serialize(ackMessage.epStateMap_, dos);
+ }
+
+ /** Reads back a message written by serialize; the map is empty when the flag is false. */
+ public GossipDigestAckMessage deserialize(DataInputStream dis) throws IOException
+ {
+ List<GossipDigest> digests = GossipDigestSerializationHelper.deserialize(dis);
+ boolean hasStates = dis.readBoolean();
+ Map<EndPoint, EndPointState> states = hasStates
+ ? EndPointStatesSerializationHelper.deserialize(dis)
+ : new HashMap<EndPoint, EndPointState>();
+ return new GossipDigestAckMessage(digests, states);
+ }
+}
\ No newline at end of file
Added: incubator/cassandra/src/org/apache/cassandra/gms/GossipDigestSynMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/gms/GossipDigestSynMessage.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/gms/GossipDigestSynMessage.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/gms/GossipDigestSynMessage.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.CompactEndPointSerializationHelper;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.Log4jLogger;
+import org.apache.log4j.Logger;
+
+
+/**
+ * This is the first message that gets sent out as a start of the Gossip protocol in a
+ * round.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class GossipDigestSynMessage
+{
+ private static ICompactSerializer<GossipDigestSynMessage> serializer_ = new GossipDigestSynMessageSerializer();
+
+ /* Cluster identifier; GossipDigestSynMessageSerializer writes it ahead of the digests. */
+ String clusterId_;
+ /* Digests carried by this message; the constructor stores the caller's list without copying. */
+ List<GossipDigest> gDigests_;
+
+ /** Returns the serializer shared by all GossipDigestSynMessage instances. */
+ public static ICompactSerializer<GossipDigestSynMessage> serializer()
+ {
+ return GossipDigestSynMessage.serializer_;
+ }
+
+ /**
+ * @param clusterId cluster identifier to send
+ * @param gDigests digests to send; kept by reference
+ */
+ public GossipDigestSynMessage(String clusterId, List<GossipDigest> gDigests)
+ {
+ this.gDigests_ = gDigests;
+ this.clusterId_ = clusterId;
+ }
+
+ /** Returns the digest list held by this message (the same instance, not a copy). */
+ List<GossipDigest> getGossipDigests()
+ {
+ return this.gDigests_;
+ }
+}
+
+class GossipDigestSerializationHelper
+{
+ private static Logger logger_ = Logger.getLogger(GossipDigestSerializationHelper.class);
+
+ /**
+ * Writes as many digests from gDigestList as fit within Gossiper.MAX_GOSSIP_PACKET_SIZE,
+ * preceded by the number of digests actually written.
+ *
+ * Entries are staged in a local buffer so the count always matches the entries that follow.
+ * Writing gDigestList.size() up front, as before, over-reported the count whenever the loop
+ * stopped early. A reader would then consume whatever the caller wrote next as digests,
+ * for example the continuation flag in GossipDigestAckMessageSerializer.
+ *
+ * @param gDigestList digests to serialize, in iteration order
+ * @param dos stream to append to; dos.size() is the starting point of the packet budget
+ * @return true if every digest was written, false if the list was truncated
+ * @throws IOException if writing to dos fails
+ */
+ static boolean serialize(List<GossipDigest> gDigestList, DataOutputStream dos) throws IOException
+ {
+ boolean bVal = true;
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream buffer = new DataOutputStream(bos);
+ /* bytes already in dos plus the 4-byte count that will precede the entries */
+ int base = dos.size() + 4;
+ int count = 0;
+ int estimate = 0;
+ for ( GossipDigest gDigest : gDigestList )
+ {
+ /* stop once the remaining budget is smaller than the size of the last entry written */
+ if ( Gossiper.MAX_GOSSIP_PACKET_SIZE - (base + buffer.size()) < estimate )
+ {
+ logger_.info("@@@@ Breaking out to respect the MTU size in GD @@@@");
+ bVal = false;
+ break;
+ }
+ int pre = buffer.size();
+ GossipDigest.serializer().serialize( gDigest, buffer );
+ estimate = buffer.size() - pre;
+ ++count;
+ }
+ buffer.flush();
+ dos.writeInt(count);
+ bos.writeTo(dos);
+ return bVal;
+ }
+
+ /**
+ * Reads a digest list written by serialize.
+ *
+ * Stops early when the stream reports no remaining bytes. This keeps the reader tolerant
+ * of senders that wrote a count larger than the number of entries actually present.
+ *
+ * @param dis stream positioned at the digest count
+ * @return the digests read, in wire order
+ * @throws IOException if reading from dis fails
+ */
+ static List<GossipDigest> deserialize(DataInputStream dis) throws IOException
+ {
+ int size = dis.readInt();
+ List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
+
+ for ( int i = 0; i < size; ++i )
+ {
+ if ( dis.available() == 0 )
+ {
+ logger_.info("Remaining bytes zero. Stopping deserialization of GossipDigests.");
+ break;
+ }
+
+ GossipDigest gDigest = GossipDigest.serializer().deserialize(dis);
+ gDigests.add( gDigest );
+ }
+ return gDigests;
+ }
+}
+
+class EndPointStatesSerializationHelper
+{
+ private static Log4jLogger logger_ = new Log4jLogger(EndPointStatesSerializationHelper.class.getName());
+
+ /**
+ * Writes the map size, then as many (endpoint, state) pairs as fit within
+ * Gossiper.MAX_GOSSIP_PACKET_SIZE.
+ *
+ * NOTE(review): the count written is always epStateMap.size(), even when the loop stops
+ * early, so a truncated map is followed by fewer entries than the count claims. deserialize
+ * copes only because it stops when the stream has no bytes left. That works while the map is
+ * the last field of the message, as in GossipDigestAckMessageSerializer and
+ * GossipDigestAck2MessageSerializer. It presumably also assumes the input stream is bounded to a
+ * single message; confirm before reusing this helper mid-message.
+ *
+ * @return true if every entry was written, false if the map was truncated
+ */
+ static boolean serialize(Map<EndPoint, EndPointState> epStateMap, DataOutputStream dos) throws IOException
+ {
+ dos.writeInt(epStateMap.size());
+ int lastEntrySize = 0;
+ for ( Map.Entry<EndPoint, EndPointState> entry : epStateMap.entrySet() )
+ {
+ int remaining = Gossiper.MAX_GOSSIP_PACKET_SIZE - dos.size();
+ if ( remaining < lastEntrySize )
+ {
+ logger_.info("@@@@ Breaking out to respect the MTU size in EPS. Estimate is " + lastEntrySize + " @@@@");
+ return false;
+ }
+ int before = dos.size();
+ CompactEndPointSerializationHelper.serialize(entry.getKey(), dos);
+ EndPointState.serializer().serialize(entry.getValue(), dos);
+ lastEntrySize = dos.size() - before;
+ }
+ return true;
+ }
+
+ /**
+ * Reads up to the recorded number of (endpoint, state) pairs into a new HashMap,
+ * stopping early if the stream reports no remaining bytes.
+ */
+ static Map<EndPoint, EndPointState> deserialize(DataInputStream dis) throws IOException
+ {
+ int expected = dis.readInt();
+ Map<EndPoint, EndPointState> states = new HashMap<EndPoint, EndPointState>();
+ int read = 0;
+ while ( read < expected )
+ {
+ if ( dis.available() == 0 )
+ {
+ logger_.info("Remaining bytes zero. Stopping deserialization in EndPointState.");
+ break;
+ }
+ EndPoint ep = CompactEndPointSerializationHelper.deserialize(dis);
+ EndPointState epState = EndPointState.serializer().deserialize(dis);
+ states.put(ep, epState);
+ ++read;
+ }
+ return states;
+ }
+}
+
+class GossipDigestSynMessageSerializer implements ICompactSerializer<GossipDigestSynMessage>
+{
+ /** Writes the cluster id with writeUTF, followed by the digest list. */
+ public void serialize(GossipDigestSynMessage synMessage, DataOutputStream dos) throws IOException
+ {
+ String clusterId = synMessage.clusterId_;
+ dos.writeUTF(clusterId);
+ List<GossipDigest> digests = synMessage.gDigests_;
+ GossipDigestSerializationHelper.serialize(digests, dos);
+ }
+
+ /** Reads back a message written by serialize. */
+ public GossipDigestSynMessage deserialize(DataInputStream dis) throws IOException
+ {
+ /* Java evaluates constructor arguments left to right: cluster id first, then the digests. */
+ return new GossipDigestSynMessage(dis.readUTF(), GossipDigestSerializationHelper.deserialize(dis));
+ }
+}
+