You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2010/08/12 18:37:19 UTC

svn commit: r984853 - in /cassandra/branches/cassandra-0.6: ./ contrib/property_snitch/src/java/org/apache/cassandra/locator/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/locator/ src/java/org/apache/cassandra/net/ src/java/org/a...

Author: brandonwilliams
Date: Thu Aug 12 16:37:19 2010
New Revision: 984853

URL: http://svn.apache.org/viewvc?rev=984853&view=rev
Log:
Dynamic snitch.  Patch by brandonwilliams; reviewed by jbellis for CASSANDRA-981

Added:
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/ILatencyPublisher.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/ILatencySubscriber.java
    cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java
Modified:
    cassandra/branches/cassandra-0.6/CHANGES.txt
    cassandra/branches/cassandra-0.6/contrib/property_snitch/src/java/org/apache/cassandra/locator/PropertyFileEndPointSnitch.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/IEndPointSnitch.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/ExpiringMap.java

Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=984853&r1=984852&r2=984853&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Thu Aug 12 16:37:19 2010
@@ -6,6 +6,7 @@
  * fix compilation on non-sun JKDs (CASSANDRA-1061)
  * remove String.trim() call on row keys in batch mutations (CASSANDRA-1235)
  * Log summary of dropped messages instead of spamming log (CASSANDRA-1284)
+ * Dynamic snitch (CASSANDRA-981)
 
 
 0.6.4

Modified: cassandra/branches/cassandra-0.6/contrib/property_snitch/src/java/org/apache/cassandra/locator/PropertyFileEndPointSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/contrib/property_snitch/src/java/org/apache/cassandra/locator/PropertyFileEndPointSnitch.java?rev=984853&r1=984852&r2=984853&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/contrib/property_snitch/src/java/org/apache/cassandra/locator/PropertyFileEndPointSnitch.java (original)
+++ cassandra/branches/cassandra-0.6/contrib/property_snitch/src/java/org/apache/cassandra/locator/PropertyFileEndPointSnitch.java Thu Aug 12 16:37:19 2010
@@ -110,6 +110,9 @@ public class PropertyFileEndPointSnitch 
         return getEndPointInfo(endPoint)[0];
     }
 
+    public String getLocation(InetAddress endPoint) {
+        return getEndPointInfo(endPoint)[0];
+    }
     /**
      * Return the rack for which an endpoint resides in
      *  

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=984853&r1=984852&r2=984853&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Thu Aug 12 16:37:19 2010
@@ -27,6 +27,7 @@ import org.apache.cassandra.db.marshal.B
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.locator.IEndPointSnitch;
+import org.apache.cassandra.locator.DynamicEndpointSnitch;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.FBUtilities;
@@ -626,7 +627,12 @@ public class DatabaseDescriptor
                 try
                 {
                     Class cls = Class.forName(endPointSnitchClassName);
-                    epSnitch = (IEndPointSnitch)cls.getConstructor().newInstance();
+                    IEndPointSnitch snitch = (IEndPointSnitch)cls.getConstructor().newInstance();
+                    String dynamic = System.getProperty("cassandra.dynamic_snitch");
+                    if (dynamic == null || Boolean.getBoolean(dynamic) == false)
+                        epSnitch = snitch;
+                    else
+                        epSnitch = new DynamicEndpointSnitch(snitch);
                 }
                 catch (ClassNotFoundException e)
                 {
@@ -648,7 +654,6 @@ public class DatabaseDescriptor
                 {
                     throw new ConfigurationException("Invalid endpointsnitch class " + endPointSnitchClassName + " " + e.getMessage());
                 }
-
                 String xqlTable = "/Storage/Keyspaces/Keyspace[@Name='" + ksName + "']/";
                 NodeList columnFamilies = xmlUtils.getRequestedNodeList(xqlTable + "ColumnFamily");
 

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java?rev=984853&r1=984852&r2=984853&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java Thu Aug 12 16:37:19 2010
@@ -70,28 +70,33 @@ public abstract class AbstractEndpointSn
         {
             public int compare(InetAddress a1, InetAddress a2)
             {
-                try
-                {
-                    if (address.equals(a1) && !address.equals(a2))
-                        return -1;
-                    if (address.equals(a2) && !address.equals(a1))
-                        return 1;
-                    if (isOnSameRack(address, a1) && !isOnSameRack(address, a2))
-                        return -1;
-                    if (isOnSameRack(address, a2) && !isOnSameRack(address, a1))
-                        return 1;
-                    if (isInSameDataCenter(address, a1) && !isInSameDataCenter(address, a2))
-                        return -1;
-                    if (isInSameDataCenter(address, a2) && !isInSameDataCenter(address, a1))
-                        return 1;
-                    return 0;
-                }
-                catch (UnknownHostException e)
-                {
-                    throw new RuntimeException(e);
-                }
+                return compareEndpoints(address, a1, a2);
             }
         });
         return addresses;
     }
+
+    public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2)
+    {
+        try
+        {
+            if (target.equals(a1) && !target.equals(a2))
+                return -1;
+            if (target.equals(a2) && !target.equals(a1))
+                return 1;
+            if (isOnSameRack(target, a1) && !isOnSameRack(target, a2))
+                return -1;
+            if (isOnSameRack(target, a2) && !isOnSameRack(target, a1))
+                return 1;
+            if (isInSameDataCenter(target, a1) && !isInSameDataCenter(target, a2))
+                return -1;
+            if (isInSameDataCenter(target, a2) && !isInSameDataCenter(target, a1))
+                return 1;
+            return 0;
+        }
+        catch (UnknownHostException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
 }

Added: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java?rev=984853&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java (added)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java Thu Aug 12 16:37:19 2010
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.*;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import java.lang.management.ManagementFactory;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.ResponseVerbHandler;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.AbstractStatsDeque;
+import org.apache.cassandra.locator.IEndPointSnitch;
+import org.apache.cassandra.locator.ILatencyPublisher;
+import org.apache.cassandra.locator.ILatencySubscriber;
+import org.apache.cassandra.locator.AbstractEndpointSnitch;
+import org.apache.cassandra.locator.DynamicEndpointSnitchMBean;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * A dynamic snitch that sorts endpoints by latency with an adapted phi failure detector
+ */
+public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean
+{
+    private static int UPDATES_PER_INTERVAL = 10000;
+    private static int UPDATE_INTERVAL_IN_MS = 100;
+    private static int RESET_INTERVAL_IN_MS = 60000 * 10;
+    private static int WINDOW_SIZE = 100;
+    private boolean registered = false;
+
+    private ConcurrentHashMap<InetAddress, Double> scores = new ConcurrentHashMap();
+    private ConcurrentHashMap<InetAddress, AdaptiveLatencyTracker> windows = new ConcurrentHashMap();
+    private AtomicInteger intervalupdates = new AtomicInteger(0);
+    public IEndPointSnitch subsnitch;
+
+    public DynamicEndpointSnitch(IEndPointSnitch snitch)
+    {
+        subsnitch = snitch; // wrapped snitch: answers rack/datacenter/location queries and is the fallback ordering
+        TimerTask update = new TimerTask()
+        {
+            public void run()
+            {
+                updateScores();
+            }
+        };
+        TimerTask reset = new TimerTask()
+        {
+            public void run()
+            {
+                // we do this so that a host considered bad has a chance to recover; otherwise we would never try
+                // to read from it, which would cause its score to never change
+                reset();
+            }
+        };
+        Timer timer = new Timer("DynamicEndpointSnitch"); // a single timer thread runs both tasks
+        timer.schedule(update, UPDATE_INTERVAL_IN_MS, UPDATE_INTERVAL_IN_MS); // first run after one interval, then periodically
+        timer.schedule(reset, RESET_INTERVAL_IN_MS, RESET_INTERVAL_IN_MS);
+
+        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        try
+        {
+            mbs.registerMBean(this, new ObjectName("org.apache.cassandra.locator:type=DynamicEndpointSnitch")); // fixed object name
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e); // any registration failure aborts construction
+        }
+    }
+
+    public boolean isOnSameRack(InetAddress a1, InetAddress a2) throws UnknownHostException
+    {
+        return subsnitch.isOnSameRack(a1, a2);
+    }
+
+    public boolean isInSameDataCenter(InetAddress a1, InetAddress a2) throws UnknownHostException
+    {
+        return subsnitch.isInSameDataCenter(a1, a2);
+    }
+
+    public String getLocation(InetAddress endpoint) throws UnknownHostException
+    {
+        return subsnitch.getLocation(endpoint);
+    }
+
+    public List<InetAddress> getSortedListByProximity(final InetAddress address, Collection<InetAddress> addresses)
+    {
+        List<InetAddress> list = new ArrayList<InetAddress>(addresses);
+        sortByProximity(address, list);
+        return list;
+    }
+
+    public List<InetAddress> sortByProximity(final InetAddress address, List<InetAddress> addresses) // sorts in place, returns the same list
+    {
+        assert address.equals(FBUtilities.getLocalAddress()); // we only know about ourself; equals(), not ==, so an equal address in another instance passes
+        Collections.sort(addresses, new Comparator<InetAddress>()
+        {
+            public int compare(InetAddress a1, InetAddress a2)
+            {
+                return compareEndpoints(address, a1, a2); // dynamic score first, wrapped snitch when a score is missing
+            }
+        });
+        return addresses;
+    }
+
+    public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2)
+    {
+        Double scored1 = scores.get(a1);
+        Double scored2 = scores.get(a2);
+
+        if (scored1 == null || scored2 == null)
+            return subsnitch.compareEndpoints(target, a1, a2);
+        if (scored1.equals(scored2))
+            return 0;
+        if (scored1 < scored2)
+            return 1;
+        else
+            return -1;
+    }
+
+    public void receiveTiming(InetAddress host, Double latency) // this is cheap
+    {
+        if (intervalupdates.intValue() >= UPDATES_PER_INTERVAL) // cap reached: drop samples until updateScores() zeroes the counter
+            return;
+        AdaptiveLatencyTracker tracker = windows.get(host);
+        if (tracker == null)
+        {
+            AdaptiveLatencyTracker alt = new AdaptiveLatencyTracker(WINDOW_SIZE);
+            tracker = windows.putIfAbsent(host, alt); // returns the existing tracker if another thread stored one first
+            if (tracker == null)
+                tracker = alt; // ours was stored
+        }
+        tracker.add(latency); // unboxes latency: add() takes a primitive double
+        intervalupdates.getAndIncrement();
+    }
+
+    private void updateScores() // this is expensive
+    {
+        if (!registered)
+        {
+       	    ILatencyPublisher handler = (ILatencyPublisher)MessagingService.instance.getVerbHandler(StorageService.Verb.READ_RESPONSE);
+            if (handler != null)
+            {
+                handler.register(this);
+                registered = true;
+            }
+
+        }
+        for (Map.Entry<InetAddress, AdaptiveLatencyTracker> entry: windows.entrySet())
+        {
+            scores.put(entry.getKey(), entry.getValue().score());
+        }
+        intervalupdates.set(0);
+    }
+
+    private void reset()
+    {
+        for (AdaptiveLatencyTracker tracker : windows.values())
+        {
+            tracker.clear();
+        }
+    }
+
+    public Map<InetAddress, Double> getScores()
+    {
+        return scores;
+    }
+}
+
+/** a threadsafe version of BoundedStatsDeque+ArrivalWindow with modification for arbitrary times **/
+class AdaptiveLatencyTracker extends AbstractStatsDeque
+{
+    private LinkedBlockingDeque latencies;
+    private final int size;                                   
+    private static double SENTINEL_COMPARE = 0.0001; // arbitrary; as long as it is the same across hosts it doesn't matter
+
+    AdaptiveLatencyTracker(int size)
+    {
+        this.size = size;
+        latencies = new LinkedBlockingDeque(size);
+    }
+
+    public void add(double i) // appends a sample, evicting the oldest one when the bounded deque is full
+    {
+        if (!latencies.offer(i))
+        {
+            latencies.poll(); // unlike remove(), poll() does not throw if a concurrent clear() already emptied the deque
+            latencies.offer(i);
+        }
+    }
+
+    public void clear()
+    {
+        latencies.clear();
+    }
+
+    public Iterator<Double> iterator()
+    {
+        return latencies.iterator();
+    }
+
+    public int size()
+    {
+        return latencies.size();
+    }
+
+    double p(double t)
+    {
+        double mean = mean();
+        double exponent = (-1) * (t) / mean;
+        return 1 - Math.pow( Math.E, exponent);
+    }
+
+    double score()
+    {
+        double log = 0d;
+        if ( latencies.size() > 0 )
+        {
+            double probability = p(SENTINEL_COMPARE);
+            log = (-1) * Math.log10( probability );
+        }
+        return log;
+    }
+
+}

Added: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java?rev=984853&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java (added)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java Thu Aug 12 16:37:19 2010
@@ -0,0 +1,27 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.cassandra.locator;
+
+import java.net.InetAddress;
+import java.util.Map;
+
+public interface DynamicEndpointSnitchMBean {
+    public Map<InetAddress, Double> getScores();
+}

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/IEndPointSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/IEndPointSnitch.java?rev=984853&r1=984852&r2=984853&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/IEndPointSnitch.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/IEndPointSnitch.java Thu Aug 12 16:37:19 2010
@@ -45,5 +45,22 @@ public interface IEndPointSnitch
      * This method will sort the List<InetAddress> according to the proximity of the given address.
      */
     public List<InetAddress> sortByProximity(InetAddress address, List<InetAddress> addresses);
+
+    /**
+     * compares two endpoints in relation to the target endpoint, returning as Comparator.compare would
+     */
+    public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2);
+
+    /**
+     * indicates whether two endpoints are on the same rack
+     */
+    public boolean isOnSameRack(InetAddress a1, InetAddress a2) throws UnknownHostException;
+
+    /**
+     * indicates whether two endpoints are in the same datacenter
+     */
+    public boolean isInSameDataCenter(InetAddress a1, InetAddress a2) throws UnknownHostException;
+
+    public String getLocation(InetAddress endpoint) throws UnknownHostException;
 }
 

Added: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/ILatencyPublisher.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/ILatencyPublisher.java?rev=984853&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/ILatencyPublisher.java (added)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/ILatencyPublisher.java Thu Aug 12 16:37:19 2010
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import org.apache.cassandra.locator.ILatencySubscriber;
+
+public interface ILatencyPublisher
+{
+    public void register(ILatencySubscriber subcriber);
+}

Added: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/ILatencySubscriber.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/ILatencySubscriber.java?rev=984853&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/ILatencySubscriber.java (added)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/ILatencySubscriber.java Thu Aug 12 16:37:19 2010
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.net.InetAddress;
+
+public interface ILatencySubscriber
+{
+    public void receiveTiming(InetAddress address, Double latency);
+}

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java?rev=984853&r1=984852&r2=984853&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java Thu Aug 12 16:37:19 2010
@@ -415,6 +415,16 @@ public class MessagingService
         return taskCompletionMap_.remove(key);
     }
 
+    public static long getRegisteredCallbackAge(String key)
+    {
+        return callbackMap_.getAge(key);
+    }
+
+    public static long getAsyncResultAge(String key)
+    {
+        return taskCompletionMap_.getAge(key);
+    }
+
     public static ExecutorService getDeserializationExecutor()
     {
         return messageDeserializerExecutor_;

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java?rev=984853&r1=984852&r2=984853&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java Thu Aug 12 16:37:19 2010
@@ -18,20 +18,29 @@
 
 package org.apache.cassandra.net;
 
+import java.util.*;
+import java.net.InetAddress;
+
+import org.apache.cassandra.locator.ILatencyPublisher;
+import org.apache.cassandra.locator.ILatencySubscriber;
+
 import org.apache.log4j.Logger;
 
-public class ResponseVerbHandler implements IVerbHandler
+public class ResponseVerbHandler implements IVerbHandler, ILatencyPublisher
 {
     private static final Logger logger_ = Logger.getLogger( ResponseVerbHandler.class );
+    private List<ILatencySubscriber>  subscribers = new ArrayList<ILatencySubscriber>();
     
     public void doVerb(Message message)
     {     
         String messageId = message.getMessageId();        
         IAsyncCallback cb = MessagingService.getRegisteredCallback(messageId);
+        double age = 0;
         if (cb != null)
         {
             if (logger_.isDebugEnabled())
                 logger_.debug("Processing response on a callback from " + message.getMessageId() + "@" + message.getFrom());
+            age = System.currentTimeMillis() - MessagingService.getRegisteredCallbackAge(messageId);
             cb.response(message);
         }
         else
@@ -41,8 +50,23 @@ public class ResponseVerbHandler impleme
             {
                 if (logger_.isDebugEnabled())
                     logger_.debug("Processing response on an async result from " + message.getMessageId() + "@" + message.getFrom());
+                age = System.currentTimeMillis() - MessagingService.getAsyncResultAge(messageId);
                 ar.result(message);
             }
         }
+        notifySubscribers(message.getFrom(), age);
+    }
+
+    private void notifySubscribers(InetAddress host, double latency)
+    {
+        for (ILatencySubscriber subscriber : subscribers)
+        {
+            subscriber.receiveTiming(host, latency);
+        }
+    }
+
+    public void register(ILatencySubscriber subscriber)
+    {
+        subscribers.add(subscriber);
     }
 }

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/ExpiringMap.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/ExpiringMap.java?rev=984853&r1=984852&r2=984853&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/ExpiringMap.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/ExpiringMap.java Thu Aug 12 16:37:19 2010
@@ -30,12 +30,12 @@ public class ExpiringMap<K, V>
     private class CacheableObject
     {
         private V value_;
-        private long age_;
+        public long age;
 
         CacheableObject(V o)
         {
             value_ = o;
-            age_ = System.currentTimeMillis();
+            age = System.currentTimeMillis();
         }
 
         @Override
@@ -57,7 +57,7 @@ public class ExpiringMap<K, V>
 
         boolean isReadyToDie(long expiration)
         {
-            return ((System.currentTimeMillis() - age_) > expiration);
+            return ((System.currentTimeMillis() - age) > expiration);
         }
     }
 
@@ -175,6 +175,17 @@ public class ExpiringMap<K, V>
         return result;
     }
 
+    public long getAge(K key) // NOTE: despite the name, this returns the entry's creation timestamp, not an elapsed time
+    {
+        long age = 0; // returned unchanged when the key is not in the cache
+        CacheableObject co = cache_.get(key);
+        if (co != null)
+        {
+            age = co.age; // System.currentTimeMillis() captured in the CacheableObject constructor
+        }
+        return age;
+    }
+
     public int size()
     {
         return cache_.size();

Added: cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java?rev=984853&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java (added)
+++ cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java Thu Aug 12 16:37:19 2010
@@ -0,0 +1,109 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.cassandra.locator;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import org.apache.cassandra.locator.DynamicEndpointSnitch;
+import org.apache.cassandra.locator.SimpleSnitch;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class DynamicEndpointSnitchTest
+{
+    @Test
+    public void testSnitch() throws UnknownHostException, InterruptedException
+    {
+        DynamicEndpointSnitch dsnitch = new DynamicEndpointSnitch(new SimpleSnitch());
+        InetAddress self = FBUtilities.getLocalAddress();
+        ArrayList<InetAddress> order = new ArrayList<InetAddress>();
+        InetAddress host1 = InetAddress.getByName("127.0.0.1");
+        InetAddress host2 = InetAddress.getByName("127.0.0.2");
+        InetAddress host3 = InetAddress.getByName("127.0.0.3");
+
+        // first, make all hosts equal
+        for (int i = 0; i < 5; i++)
+        {
+            dsnitch.receiveTiming(host1, 1.0);
+            dsnitch.receiveTiming(host2, 1.0);
+            dsnitch.receiveTiming(host3, 1.0);
+        }
+
+        Thread.sleep(1500); // give the snitch's periodic score update time to pick up the new samples
+
+        order.add(host1);
+        order.add(host2);
+        order.add(host3);
+
+        assert dsnitch.getSortedListByProximity(self, order).equals(order);
+
+        // make host1 a little worse
+        dsnitch.receiveTiming(host1, 2.0);
+        Thread.sleep(1500);
+        order.clear();
+
+        order.add(host2);
+        order.add(host3);
+        order.add(host1);
+
+        assert dsnitch.getSortedListByProximity(self, order).equals(order);
+
+        // make host2 a little worse
+        dsnitch.receiveTiming(host2, 2.0);
+        Thread.sleep(1500);
+        order.clear();
+
+        order.add(host3);
+        order.add(host2);
+        order.add(host1);
+
+        assert dsnitch.getSortedListByProximity(self, order).equals(order);
+
+        // make host3 the worst
+        for (int i = 0; i < 2; i++)
+        {
+            dsnitch.receiveTiming(host3, 2.0);
+        }
+        Thread.sleep(1500);
+        order.clear();
+
+        order.add(host2);
+        order.add(host1);
+        order.add(host3);
+        assert dsnitch.getSortedListByProximity(self, order).equals(order); // this stage previously built its expectation but never checked it
+        // make host3 equal to the others
+        for (int i = 0; i < 2; i++)
+        {
+            dsnitch.receiveTiming(host3, 1.0);
+        }
+        Thread.sleep(1500);
+        order.clear();
+
+        order.add(host1);
+        order.add(host2);
+        order.add(host3);
+
+        assert dsnitch.getSortedListByProximity(self, order).equals(order);
+    }
+}