You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2010/07/10 01:07:02 UTC

svn commit: r962720 - in /cassandra/trunk: conf/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/locator/ src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/utils/ test/unit/org/apache/cassandra/locator/

Author: brandonwilliams
Date: Fri Jul  9 23:07:00 2010
New Revision: 962720

URL: http://svn.apache.org/viewvc?rev=962720&view=rev
Log:
Dynamic snitch to adaptively avoid reading from slow nodes.  Patch by brandonwilliams; reviewed by jbellis for CASSANDRA-981

Added:
    cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/ILatencyPublisher.java
      - copied, changed from r962683, cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/ILatencySubscriber.java
      - copied, changed from r962683, cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java
    cassandra/trunk/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java
Modified:
    cassandra/trunk/conf/cassandra.yaml
    cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
    cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractRackAwareSnitch.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java
    cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java

Modified: cassandra/trunk/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra.yaml?rev=962720&r1=962719&r2=962720&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra.yaml (original)
+++ cassandra/trunk/conf/cassandra.yaml Fri Jul  9 23:07:00 2010
@@ -170,6 +170,12 @@ request_scheduler: org.apache.cassandra.
 # the request scheduling. The current supported option is "keyspace"
 request_scheduler_id: keyspace
 
+# dynamic_snitch -- This boolean controls whether the above snitch is
+# wrapped with a dynamic snitch, which will monitor read latencies
+# and avoid reading from hosts that have slowed (due to compaction,
+# for instance)
+dynamic_snitch: false
+
 # A ColumnFamily is the Cassandra concept closest to a relational table. 
 #
 # Keyspaces are separate groups of ColumnFamilies.  Except in very

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/Config.java?rev=962720&r1=962719&r2=962720&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/Config.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/Config.java Fri Jul  9 23:07:00 2010
@@ -71,6 +71,7 @@ public class Config {
     public Integer commitlog_sync_period_in_ms;
     
     public String endpoint_snitch;
+    public Boolean dynamic_snitch = false;
     
     public String request_scheduler;
     public RequestSchedulerId request_scheduler_id;

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=962720&r1=962719&r2=962720&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Fri Jul  9 23:07:00 2010
@@ -27,6 +27,7 @@ import java.net.URL;
 import java.net.UnknownHostException;
 import java.util.*;
 
+import org.apache.cassandra.locator.DynamicEndpointSnitch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -415,10 +416,10 @@ public class DatabaseDescriptor
                 throw (ConfigurationException)e.getCause();
             throw new ConfigurationException("Error instantiating " + endpointSnitchClassName + " " + e.getMessage());
         }
-        return snitch;
+        return conf.dynamic_snitch ? new DynamicEndpointSnitch(snitch) : snitch;
     }
     
-    public static void loadSchemas() throws IOException
+    public static void loadSchemas() throws IOException                         
     {
         // we can load tables from local storage if a version is set in the system table and that acutally maps to
         // real data in the definitions table.  If we do end up loading from xml, store the defintions so that we

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java?rev=962720&r1=962719&r2=962720&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java Fri Jul  9 23:07:00 2010
@@ -21,6 +21,7 @@ package org.apache.cassandra.locator;
 
 import java.net.InetAddress;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 
@@ -42,4 +43,9 @@ public abstract class AbstractEndpointSn
 
     public abstract List<InetAddress> getSortedListByProximity(InetAddress address, Collection<InetAddress> unsortedAddress);
     public abstract List<InetAddress> sortByProximity(InetAddress address, List<InetAddress> addresses);
+
+    public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2)
+    {
+        return a1.getHostAddress().compareTo(a2.getHostAddress());
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractRackAwareSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractRackAwareSnitch.java?rev=962720&r1=962719&r2=962720&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractRackAwareSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractRackAwareSnitch.java Fri Jul  9 23:07:00 2010
@@ -72,30 +72,35 @@ public abstract class AbstractRackAwareS
         {
             public int compare(InetAddress a1, InetAddress a2)
             {
-                if (address.equals(a1) && !address.equals(a2))
-                    return -1;
-                if (address.equals(a2) && !address.equals(a1))
-                    return 1;
+                   return compareEndpoints(address, a1, a2);
+            };
+        });
+        return addresses;
+    }
 
-                String addressRack = getRack(address);
-                String a1Rack = getRack(a1);
-                String a2Rack = getRack(a2);
-                if (addressRack.equals(a1Rack) && !addressRack.equals(a2Rack))
-                    return -1;
-                if (addressRack.equals(a2Rack) && !addressRack.equals(a1Rack))
-                    return 1;
+    public int compareEndpoints(InetAddress address, InetAddress a1, InetAddress a2)
+    {
+        if (address.equals(a1) && !address.equals(a2))
+            return -1;
+        if (address.equals(a2) && !address.equals(a1))
+            return 1;
 
-                String addressDatacenter = getDatacenter(address);
-                String a1Datacenter = getDatacenter(a1);
-                String a2Datacenter = getDatacenter(a2);
-                if (addressDatacenter.equals(a1Datacenter) && !addressDatacenter.equals(a2Datacenter))
-                    return -1;
-                if (addressDatacenter.equals(a2Datacenter) && !addressDatacenter.equals(a1Datacenter))
-                    return 1;
+        String addressRack = getRack(address);
+        String a1Rack = getRack(a1);
+        String a2Rack = getRack(a2);
+        if (addressRack.equals(a1Rack) && !addressRack.equals(a2Rack))
+            return -1;
+        if (addressRack.equals(a2Rack) && !addressRack.equals(a1Rack))
+            return 1;
 
-                return 0;
-            }
-        });
-        return addresses;
+        String addressDatacenter = getDatacenter(address);
+        String a1Datacenter = getDatacenter(a1);
+        String a2Datacenter = getDatacenter(a2);
+        if (addressDatacenter.equals(a1Datacenter) && !addressDatacenter.equals(a2Datacenter))
+            return -1;
+        if (addressDatacenter.equals(a2Datacenter) && !addressDatacenter.equals(a1Datacenter))
+            return 1;
+
+        return 0;
     }
 }

Added: cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java?rev=962720&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java Fri Jul  9 23:07:00 2010
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.net.InetAddress;
+import java.util.*;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import java.lang.management.ManagementFactory;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.ResponseVerbHandler;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.AbstractStatsDeque;
+import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.ILatencyPublisher;
+import org.apache.cassandra.locator.ILatencySubscriber;
+import org.apache.cassandra.locator.AbstractEndpointSnitch;
+import org.apache.cassandra.locator.DynamicEndpointSnitchMBean;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * A dynamic snitch that sorts endpoints by latency with an adapted phi failure detector
+ */
+public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean
+{
+    private static int UPDATES_PER_INTERVAL = 100;
+    private static int UPDATE_INTERVAL_IN_MS = 1000;
+    private static int RESET_INTERVAL_IN_MS = 60000;
+    private static int WINDOW_SIZE = 100;
+    private boolean registered = false;
+
+    private ConcurrentHashMap<InetAddress, Double> scores = new ConcurrentHashMap();
+    private ConcurrentHashMap<InetAddress, AdaptiveLatencyTracker> windows = new ConcurrentHashMap();
+    private AtomicInteger intervalupdates = new AtomicInteger(0);
+    public IEndpointSnitch subsnitch;
+
+    public DynamicEndpointSnitch(IEndpointSnitch snitch)
+    {
+        subsnitch = snitch;
+        TimerTask update = new TimerTask()
+        {
+            public void run()
+            {
+                updateScores();
+            }
+        };
+        TimerTask reset = new TimerTask()
+        {
+            public void run()
+            {
+                // we do this so that a host considered bad has a chance to recover, otherwise would we never try
+                // to read from it, which would cause its score to never change
+                reset();
+            }
+        };
+        Timer timer = new Timer("DynamicEndpointSnitch");
+        timer.schedule(update, UPDATE_INTERVAL_IN_MS, UPDATE_INTERVAL_IN_MS);
+        timer.schedule(reset, RESET_INTERVAL_IN_MS, RESET_INTERVAL_IN_MS);
+
+        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        try
+        {
+            mbs.registerMBean(this, new ObjectName("org.apache.cassandra.locator:type=DynamicEndpointSnitch"));
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public String getRack(InetAddress endpoint)
+    {
+        return subsnitch.getRack(endpoint);
+    }
+
+    public String getDatacenter(InetAddress endpoint)
+    {
+        return subsnitch.getDatacenter(endpoint);
+    }
+
+    public List<InetAddress> getSortedListByProximity(final InetAddress address, Collection<InetAddress> addresses)
+    {
+        List<InetAddress> list = new ArrayList<InetAddress>(addresses);
+        sortByProximity(address, list);
+        return list;
+    }
+
+    public List<InetAddress> sortByProximity(final InetAddress address, List<InetAddress> addresses)
+    {
+        assert address == FBUtilities.getLocalAddress(); // we only know about ourself
+        Collections.sort(addresses, new Comparator<InetAddress>()
+        {
+            public int compare(InetAddress a1, InetAddress a2)
+            {
+                return compareEndpoints(address, a1, a2);
+            }
+        });
+        return addresses;
+    }
+
+    public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2)
+    {
+        Double scored1 = scores.get(a1);
+        Double scored2 = scores.get(a2);
+
+        if (scored1 == null || scored2 == null)
+            return subsnitch.compareEndpoints(target, a1, a2);
+        if (scored1.equals(scored2))
+            return 0;
+        if (scored1 < scored2)
+            return 1;
+        else
+            return -1;
+    }
+
+    public void receiveTiming(InetAddress host, Double latency) // this is cheap
+    {
+        if (intervalupdates.intValue() >= UPDATES_PER_INTERVAL)
+            return;
+        AdaptiveLatencyTracker tracker = windows.get(host);
+        if (tracker == null)
+        {
+            AdaptiveLatencyTracker alt = new AdaptiveLatencyTracker(WINDOW_SIZE);
+            tracker = windows.putIfAbsent(host, alt);
+            if (tracker == null)
+                tracker = alt;
+        }
+        tracker.add(latency);
+        intervalupdates.getAndIncrement();
+    }
+
+    private void updateScores() // this is expensive
+    {
+        if (!registered)
+        {
+       	    ILatencyPublisher handler = (ILatencyPublisher)MessagingService.instance.getVerbHandler(StorageService.Verb.READ_RESPONSE);
+            if (handler != null)
+            {
+                handler.register(this);
+                registered = true;
+            }
+
+        }
+        for (Map.Entry<InetAddress, AdaptiveLatencyTracker> entry: windows.entrySet())
+        {
+            scores.put(entry.getKey(), entry.getValue().score());
+        }
+        intervalupdates.set(0);
+    }
+
+    private void reset()
+    {
+        for (AdaptiveLatencyTracker tracker : windows.values())
+        {
+            tracker.clear();
+        }
+    }
+
+    public Map<InetAddress, Double> getScores()
+    {
+        return scores;
+    }
+}
+
+/** a threadsafe version of BoundedStatsDeque+ArrivalWindow with modification for arbitrary times **/
+class AdaptiveLatencyTracker extends AbstractStatsDeque
+{
+    private LinkedBlockingDeque latencies;
+    private final int size;                                   
+    private static double SENTINEL_COMPARE = 0.0001; // arbitrary; as long as it is the same across hosts it doesn't matter
+
+    AdaptiveLatencyTracker(int size)
+    {
+        this.size = size;
+        latencies = new LinkedBlockingDeque(size);
+    }
+
+    public void add(double i)
+    {
+        latencies.offer(i);
+    }
+
+    public void clear()
+    {
+        latencies.clear();
+    }
+
+    public Iterator<Double> iterator()
+    {
+        return latencies.iterator();
+    }
+
+    public int size()
+    {
+        return latencies.size();
+    }
+
+    double p(double t)
+    {
+        double mean = mean();
+        double exponent = (-1) * (t) / mean;
+        return 1 - ( 1 - Math.pow( Math.E, exponent) );
+    }
+
+    double score()
+    {
+        double log = 0d;
+        if ( latencies.size() > 0 )
+        {
+            double probability = p(SENTINEL_COMPARE);
+            log = (-1) * Math.log10( probability );
+        }
+        return log;
+    }
+
+}

Added: cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java?rev=962720&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java Fri Jul  9 23:07:00 2010
@@ -0,0 +1,27 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.cassandra.locator;
+
+import java.net.InetAddress;
+import java.util.Map;
+
+public interface DynamicEndpointSnitchMBean {
+    public Map<InetAddress, Double> getScores();
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java?rev=962720&r1=962719&r2=962720&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java Fri Jul  9 23:07:00 2010
@@ -31,6 +31,16 @@ import java.util.List;
 public interface IEndpointSnitch
 {
     /**
+     * returns a String repesenting the rack this endpoint belongs to
+     */
+    public String getRack(InetAddress endpoint);
+
+    /**
+     * returns a String representing the datacenter this endpoint belongs to
+     */
+    public String getDatacenter(InetAddress endpoint);
+
+    /**
      * returns a new <tt>List</tt> sorted by proximity to the given endpoint
      */
     public List<InetAddress> getSortedListByProximity(InetAddress address, Collection<InetAddress> unsortedAddress);
@@ -45,4 +55,9 @@ public interface IEndpointSnitch
      * @param subscriber the subscriber to notify
      */
     public void register(AbstractReplicationStrategy subscriber);
+
+    /**
+     * compares two endpoints in relation to the target endpoint, returning as Comparator.compare would
+     */
+    public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2);
 }

Copied: cassandra/trunk/src/java/org/apache/cassandra/locator/ILatencyPublisher.java (from r962683, cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java)
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/ILatencyPublisher.java?p2=cassandra/trunk/src/java/org/apache/cassandra/locator/ILatencyPublisher.java&p1=cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java&r1=962683&r2=962720&rev=962720&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/ILatencyPublisher.java Fri Jul  9 23:07:00 2010
@@ -18,24 +18,9 @@
 
 package org.apache.cassandra.locator;
 
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
+import org.apache.cassandra.locator.ILatencySubscriber;
 
-/**
- * A simple endpoint snitch implementation does not sort addresses by
- * proximity.
- */
-public class SimpleSnitch extends AbstractEndpointSnitch
+public interface ILatencyPublisher
 {
-    public List<InetAddress> getSortedListByProximity(final InetAddress address, Collection<InetAddress> addresses)
-    {
-        return new ArrayList<InetAddress>(addresses);
-    }
-
-    public List<InetAddress> sortByProximity(final InetAddress address, List<InetAddress> addresses)
-    {
-        return addresses;
-    }
+    public void register(ILatencySubscriber subcriber);
 }

Copied: cassandra/trunk/src/java/org/apache/cassandra/locator/ILatencySubscriber.java (from r962683, cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java)
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/ILatencySubscriber.java?p2=cassandra/trunk/src/java/org/apache/cassandra/locator/ILatencySubscriber.java&p1=cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java&r1=962683&r2=962720&rev=962720&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/ILatencySubscriber.java Fri Jul  9 23:07:00 2010
@@ -19,23 +19,8 @@
 package org.apache.cassandra.locator;
 
 import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
 
-/**
- * A simple endpoint snitch implementation does not sort addresses by
- * proximity.
- */
-public class SimpleSnitch extends AbstractEndpointSnitch
+public interface ILatencySubscriber
 {
-    public List<InetAddress> getSortedListByProximity(final InetAddress address, Collection<InetAddress> addresses)
-    {
-        return new ArrayList<InetAddress>(addresses);
-    }
-
-    public List<InetAddress> sortByProximity(final InetAddress address, List<InetAddress> addresses)
-    {
-        return addresses;
-    }
+    public void receiveTiming(InetAddress address, Double latency);
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java?rev=962720&r1=962719&r2=962720&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java Fri Jul  9 23:07:00 2010
@@ -23,12 +23,24 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
+import org.apache.commons.lang.NotImplementedException;
+
 /**
  * A simple endpoint snitch implementation does not sort addresses by
  * proximity.
  */
 public class SimpleSnitch extends AbstractEndpointSnitch
 {
+    public String getRack(InetAddress endpoint)
+    {
+        throw new NotImplementedException();
+    }
+
+    public String getDatacenter(InetAddress endpoint)
+    {
+        throw new NotImplementedException();
+    }
+    
     public List<InetAddress> getSortedListByProximity(final InetAddress address, Collection<InetAddress> addresses)
     {
         return new ArrayList<InetAddress>(addresses);

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=962720&r1=962719&r2=962720&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Fri Jul  9 23:07:00 2010
@@ -400,6 +400,16 @@ public class MessagingService implements
         return taskCompletionMap_.remove(key);
     }
 
+    public static long getRegisteredCallbackAge(String key)
+    {
+        return callbackMap_.getAge(key);
+    }
+
+    public static long getAsyncResultAge(String key)
+    {
+        return taskCompletionMap_.getAge(key);
+    }
+
     public static ExecutorService getDeserializationExecutor()
     {
         return messageDeserializerExecutor_;

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java?rev=962720&r1=962719&r2=962720&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java Fri Jul  9 23:07:00 2010
@@ -18,21 +18,30 @@
 
 package org.apache.cassandra.net;
 
+
+import java.util.*;
+import java.net.InetAddress;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.cassandra.locator.ILatencyPublisher;
+import org.apache.cassandra.locator.ILatencySubscriber;
 
-public class ResponseVerbHandler implements IVerbHandler
+public class ResponseVerbHandler implements IVerbHandler, ILatencyPublisher
 {
     private static final Logger logger_ = LoggerFactory.getLogger( ResponseVerbHandler.class );
-    
+    private List<ILatencySubscriber>  subscribers = new ArrayList<ILatencySubscriber>();
+
+
     public void doVerb(Message message)
     {     
         String messageId = message.getMessageId();        
         IAsyncCallback cb = MessagingService.getRegisteredCallback(messageId);
+        double age = 0;
         if (cb != null)
         {
             if (logger_.isDebugEnabled())
                 logger_.debug("Processing response on a callback from " + message.getMessageId() + "@" + message.getFrom());
+            age = System.currentTimeMillis() - MessagingService.getRegisteredCallbackAge(messageId);
             cb.response(message);
         }
         else
@@ -42,8 +51,23 @@ public class ResponseVerbHandler impleme
             {
                 if (logger_.isDebugEnabled())
                     logger_.debug("Processing response on an async result from " + message.getMessageId() + "@" + message.getFrom());
+                age = System.currentTimeMillis() - MessagingService.getAsyncResultAge(messageId);
                 ar.result(message);
             }
         }
+        notifySubscribers(message.getFrom(), age);
+    }
+
+    private void notifySubscribers(InetAddress host, double latency)
+    {
+        for (ILatencySubscriber subscriber : subscribers)
+        {
+            subscriber.receiveTiming(host, latency);
+        }
+    }
+
+    public void register(ILatencySubscriber subscriber)
+    {
+        subscribers.add(subscriber);
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java?rev=962720&r1=962719&r2=962720&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java Fri Jul  9 23:07:00 2010
@@ -129,6 +129,17 @@ public class ExpiringMap<K, V>
         return result;
     }
 
+    public long getAge(K key)
+    {
+        long age = 0;
+        CacheableObject<V> co = cache.get(key);
+        if (co != null)
+        {
+            age = co.age;
+        }
+        return age;
+    }
+
     public int size()
     {
         return cache.size();

Added: cassandra/trunk/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java?rev=962720&view=auto
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java (added)
+++ cassandra/trunk/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java Fri Jul  9 23:07:00 2010
@@ -0,0 +1,109 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.cassandra.locator;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import org.apache.cassandra.locator.DynamicEndpointSnitch;
+import org.apache.cassandra.locator.SimpleSnitch;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class DynamicEndpointSnitchTest
+{
+    @Test
+    public void testSnitch() throws UnknownHostException, InterruptedException
+    {
+        DynamicEndpointSnitch dsnitch = new DynamicEndpointSnitch(new SimpleSnitch());
+        InetAddress self = FBUtilities.getLocalAddress();
+        ArrayList<InetAddress> order = new ArrayList<InetAddress>();
+        InetAddress host1 = InetAddress.getByName("127.0.0.1");
+        InetAddress host2 = InetAddress.getByName("127.0.0.2");
+        InetAddress host3 = InetAddress.getByName("127.0.0.3");
+
+        // first, make all hosts equal
+        for (int i = 0; i < 5; i++)
+        {
+            dsnitch.receiveTiming(host1, 1.0);
+            dsnitch.receiveTiming(host2, 1.0);
+            dsnitch.receiveTiming(host3, 1.0);
+        }
+
+        Thread.sleep(1500);
+
+        order.add(host1);
+        order.add(host2);
+        order.add(host3);
+
+        assert dsnitch.getSortedListByProximity(self, order).equals(order);
+
+        // make host1 a little worse
+        dsnitch.receiveTiming(host1, 2.0);
+        Thread.sleep(1500);
+        order.clear();
+
+        order.add(host2);
+        order.add(host3);
+        order.add(host1);
+
+        assert dsnitch.getSortedListByProximity(self, order).equals(order);
+
+        // make host2 a little worse
+        dsnitch.receiveTiming(host2, 2.0);
+        Thread.sleep(1500);
+        order.clear();
+
+        order.add(host3);
+        order.add(host2);
+        order.add(host1);
+
+        assert dsnitch.getSortedListByProximity(self, order).equals(order);
+
+        // make host3 the worst
+        for (int i = 0; i < 2; i++)
+        {
+            dsnitch.receiveTiming(host3, 2.0);
+        }
+        Thread.sleep(1500);
+        order.clear();
+
+        order.add(host2);
+        order.add(host1);
+        order.add(host3);
+
+        // make host3 equal to the others
+        for (int i = 0; i < 2; i++)
+        {
+            dsnitch.receiveTiming(host3, 1.0);
+        }
+        Thread.sleep(1500);
+        order.clear();
+
+        order.add(host1);
+        order.add(host2);
+        order.add(host3);
+
+        assert dsnitch.getSortedListByProximity(self, order).equals(order);
+    }
+}