You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2010/07/10 01:07:02 UTC
svn commit: r962720 - in /cassandra/trunk: conf/
src/java/org/apache/cassandra/config/
src/java/org/apache/cassandra/locator/ src/java/org/apache/cassandra/net/
src/java/org/apache/cassandra/utils/ test/unit/org/apache/cassandra/locator/
Author: brandonwilliams
Date: Fri Jul 9 23:07:00 2010
New Revision: 962720
URL: http://svn.apache.org/viewvc?rev=962720&view=rev
Log:
Dynamic snitch to adaptively avoid reading from slow nodes. Patch by brandonwilliams; reviewed by jbellis for CASSANDRA-981
Added:
cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java
cassandra/trunk/src/java/org/apache/cassandra/locator/ILatencyPublisher.java
- copied, changed from r962683, cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java
cassandra/trunk/src/java/org/apache/cassandra/locator/ILatencySubscriber.java
- copied, changed from r962683, cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java
cassandra/trunk/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java
Modified:
cassandra/trunk/conf/cassandra.yaml
cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractRackAwareSnitch.java
cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java
cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java
Modified: cassandra/trunk/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra.yaml?rev=962720&r1=962719&r2=962720&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra.yaml (original)
+++ cassandra/trunk/conf/cassandra.yaml Fri Jul 9 23:07:00 2010
@@ -170,6 +170,12 @@ request_scheduler: org.apache.cassandra.
# the request scheduling. The current supported option is "keyspace"
request_scheduler_id: keyspace
+# dynamic_snitch -- This boolean controls whether the above snitch is
+# wrapped with a dynamic snitch, which will monitor read latencies
+# and avoid reading from hosts that have slowed (due to compaction,
+# for instance)
+dynamic_snitch: false
+
# A ColumnFamily is the Cassandra concept closest to a relational table.
#
# Keyspaces are separate groups of ColumnFamilies. Except in very
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/Config.java?rev=962720&r1=962719&r2=962720&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/Config.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/Config.java Fri Jul 9 23:07:00 2010
@@ -71,6 +71,7 @@ public class Config {
public Integer commitlog_sync_period_in_ms;
public String endpoint_snitch;
+ public Boolean dynamic_snitch = false;
public String request_scheduler;
public RequestSchedulerId request_scheduler_id;
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=962720&r1=962719&r2=962720&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Fri Jul 9 23:07:00 2010
@@ -27,6 +27,7 @@ import java.net.URL;
import java.net.UnknownHostException;
import java.util.*;
+import org.apache.cassandra.locator.DynamicEndpointSnitch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -415,10 +416,10 @@ public class DatabaseDescriptor
throw (ConfigurationException)e.getCause();
throw new ConfigurationException("Error instantiating " + endpointSnitchClassName + " " + e.getMessage());
}
- return snitch;
+ return conf.dynamic_snitch ? new DynamicEndpointSnitch(snitch) : snitch;
}
- public static void loadSchemas() throws IOException
+ public static void loadSchemas() throws IOException
{
// we can load tables from local storage if a version is set in the system table and that acutally maps to
// real data in the definitions table. If we do end up loading from xml, store the defintions so that we
Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java?rev=962720&r1=962719&r2=962720&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java Fri Jul 9 23:07:00 2010
@@ -21,6 +21,7 @@ package org.apache.cassandra.locator;
import java.net.InetAddress;
import java.util.Collection;
+import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -42,4 +43,9 @@ public abstract class AbstractEndpointSn
public abstract List<InetAddress> getSortedListByProximity(InetAddress address, Collection<InetAddress> unsortedAddress);
public abstract List<InetAddress> sortByProximity(InetAddress address, List<InetAddress> addresses);
+
+ public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2)
+ {
+ return a1.getHostAddress().compareTo(a2.getHostAddress());
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractRackAwareSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractRackAwareSnitch.java?rev=962720&r1=962719&r2=962720&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractRackAwareSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractRackAwareSnitch.java Fri Jul 9 23:07:00 2010
@@ -72,30 +72,35 @@ public abstract class AbstractRackAwareS
{
public int compare(InetAddress a1, InetAddress a2)
{
- if (address.equals(a1) && !address.equals(a2))
- return -1;
- if (address.equals(a2) && !address.equals(a1))
- return 1;
+ return compareEndpoints(address, a1, a2);
+ };
+ });
+ return addresses;
+ }
- String addressRack = getRack(address);
- String a1Rack = getRack(a1);
- String a2Rack = getRack(a2);
- if (addressRack.equals(a1Rack) && !addressRack.equals(a2Rack))
- return -1;
- if (addressRack.equals(a2Rack) && !addressRack.equals(a1Rack))
- return 1;
+ public int compareEndpoints(InetAddress address, InetAddress a1, InetAddress a2)
+ {
+ if (address.equals(a1) && !address.equals(a2))
+ return -1;
+ if (address.equals(a2) && !address.equals(a1))
+ return 1;
- String addressDatacenter = getDatacenter(address);
- String a1Datacenter = getDatacenter(a1);
- String a2Datacenter = getDatacenter(a2);
- if (addressDatacenter.equals(a1Datacenter) && !addressDatacenter.equals(a2Datacenter))
- return -1;
- if (addressDatacenter.equals(a2Datacenter) && !addressDatacenter.equals(a1Datacenter))
- return 1;
+ String addressRack = getRack(address);
+ String a1Rack = getRack(a1);
+ String a2Rack = getRack(a2);
+ if (addressRack.equals(a1Rack) && !addressRack.equals(a2Rack))
+ return -1;
+ if (addressRack.equals(a2Rack) && !addressRack.equals(a1Rack))
+ return 1;
- return 0;
- }
- });
- return addresses;
+ String addressDatacenter = getDatacenter(address);
+ String a1Datacenter = getDatacenter(a1);
+ String a2Datacenter = getDatacenter(a2);
+ if (addressDatacenter.equals(a1Datacenter) && !addressDatacenter.equals(a2Datacenter))
+ return -1;
+ if (addressDatacenter.equals(a2Datacenter) && !addressDatacenter.equals(a1Datacenter))
+ return 1;
+
+ return 0;
}
}
Added: cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java?rev=962720&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java Fri Jul 9 23:07:00 2010
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.net.InetAddress;
+import java.util.*;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import java.lang.management.ManagementFactory;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.ResponseVerbHandler;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.AbstractStatsDeque;
+import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.ILatencyPublisher;
+import org.apache.cassandra.locator.ILatencySubscriber;
+import org.apache.cassandra.locator.AbstractEndpointSnitch;
+import org.apache.cassandra.locator.DynamicEndpointSnitchMBean;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * A dynamic snitch that sorts endpoints by latency with an adapted phi failure detector
+ */
+public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean
+{
+ private static int UPDATES_PER_INTERVAL = 100;
+ private static int UPDATE_INTERVAL_IN_MS = 1000;
+ private static int RESET_INTERVAL_IN_MS = 60000;
+ private static int WINDOW_SIZE = 100;
+ private boolean registered = false;
+
+ private ConcurrentHashMap<InetAddress, Double> scores = new ConcurrentHashMap();
+ private ConcurrentHashMap<InetAddress, AdaptiveLatencyTracker> windows = new ConcurrentHashMap();
+ private AtomicInteger intervalupdates = new AtomicInteger(0);
+ public IEndpointSnitch subsnitch;
+
+ public DynamicEndpointSnitch(IEndpointSnitch snitch)
+ {
+ subsnitch = snitch;
+ TimerTask update = new TimerTask()
+ {
+ public void run()
+ {
+ updateScores();
+ }
+ };
+ TimerTask reset = new TimerTask()
+ {
+ public void run()
+ {
+ // we do this so that a host considered bad has a chance to recover, otherwise would we never try
+ // to read from it, which would cause its score to never change
+ reset();
+ }
+ };
+ Timer timer = new Timer("DynamicEndpointSnitch");
+ timer.schedule(update, UPDATE_INTERVAL_IN_MS, UPDATE_INTERVAL_IN_MS);
+ timer.schedule(reset, RESET_INTERVAL_IN_MS, RESET_INTERVAL_IN_MS);
+
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ try
+ {
+ mbs.registerMBean(this, new ObjectName("org.apache.cassandra.locator:type=DynamicEndpointSnitch"));
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public String getRack(InetAddress endpoint)
+ {
+ return subsnitch.getRack(endpoint);
+ }
+
+ public String getDatacenter(InetAddress endpoint)
+ {
+ return subsnitch.getDatacenter(endpoint);
+ }
+
+ public List<InetAddress> getSortedListByProximity(final InetAddress address, Collection<InetAddress> addresses)
+ {
+ List<InetAddress> list = new ArrayList<InetAddress>(addresses);
+ sortByProximity(address, list);
+ return list;
+ }
+
+ public List<InetAddress> sortByProximity(final InetAddress address, List<InetAddress> addresses)
+ {
+ assert address == FBUtilities.getLocalAddress(); // we only know about ourself
+ Collections.sort(addresses, new Comparator<InetAddress>()
+ {
+ public int compare(InetAddress a1, InetAddress a2)
+ {
+ return compareEndpoints(address, a1, a2);
+ }
+ });
+ return addresses;
+ }
+
+ public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2)
+ {
+ Double scored1 = scores.get(a1);
+ Double scored2 = scores.get(a2);
+
+ if (scored1 == null || scored2 == null)
+ return subsnitch.compareEndpoints(target, a1, a2);
+ if (scored1.equals(scored2))
+ return 0;
+ if (scored1 < scored2)
+ return 1;
+ else
+ return -1;
+ }
+
+ public void receiveTiming(InetAddress host, Double latency) // this is cheap
+ {
+ if (intervalupdates.intValue() >= UPDATES_PER_INTERVAL)
+ return;
+ AdaptiveLatencyTracker tracker = windows.get(host);
+ if (tracker == null)
+ {
+ AdaptiveLatencyTracker alt = new AdaptiveLatencyTracker(WINDOW_SIZE);
+ tracker = windows.putIfAbsent(host, alt);
+ if (tracker == null)
+ tracker = alt;
+ }
+ tracker.add(latency);
+ intervalupdates.getAndIncrement();
+ }
+
+ private void updateScores() // this is expensive
+ {
+ if (!registered)
+ {
+ ILatencyPublisher handler = (ILatencyPublisher)MessagingService.instance.getVerbHandler(StorageService.Verb.READ_RESPONSE);
+ if (handler != null)
+ {
+ handler.register(this);
+ registered = true;
+ }
+
+ }
+ for (Map.Entry<InetAddress, AdaptiveLatencyTracker> entry: windows.entrySet())
+ {
+ scores.put(entry.getKey(), entry.getValue().score());
+ }
+ intervalupdates.set(0);
+ }
+
+ private void reset()
+ {
+ for (AdaptiveLatencyTracker tracker : windows.values())
+ {
+ tracker.clear();
+ }
+ }
+
+ public Map<InetAddress, Double> getScores()
+ {
+ return scores;
+ }
+}
+
+/** a threadsafe version of BoundedStatsDeque+ArrivalWindow with modification for arbitrary times **/
+class AdaptiveLatencyTracker extends AbstractStatsDeque
+{
+ private LinkedBlockingDeque latencies;
+ private final int size;
+ private static double SENTINEL_COMPARE = 0.0001; // arbitrary; as long as it is the same across hosts it doesn't matter
+
+ AdaptiveLatencyTracker(int size)
+ {
+ this.size = size;
+ latencies = new LinkedBlockingDeque(size);
+ }
+
+ public void add(double i)
+ {
+ latencies.offer(i);
+ }
+
+ public void clear()
+ {
+ latencies.clear();
+ }
+
+ public Iterator<Double> iterator()
+ {
+ return latencies.iterator();
+ }
+
+ public int size()
+ {
+ return latencies.size();
+ }
+
+ double p(double t)
+ {
+ double mean = mean();
+ double exponent = (-1) * (t) / mean;
+ return 1 - ( 1 - Math.pow( Math.E, exponent) );
+ }
+
+ double score()
+ {
+ double log = 0d;
+ if ( latencies.size() > 0 )
+ {
+ double probability = p(SENTINEL_COMPARE);
+ log = (-1) * Math.log10( probability );
+ }
+ return log;
+ }
+
+}
Added: cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java?rev=962720&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java Fri Jul 9 23:07:00 2010
@@ -0,0 +1,27 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.cassandra.locator;
+
+import java.net.InetAddress;
+import java.util.Map;
+
+public interface DynamicEndpointSnitchMBean {
+ public Map<InetAddress, Double> getScores();
+}
Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java?rev=962720&r1=962719&r2=962720&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java Fri Jul 9 23:07:00 2010
@@ -31,6 +31,16 @@ import java.util.List;
public interface IEndpointSnitch
{
/**
+ * returns a String repesenting the rack this endpoint belongs to
+ */
+ public String getRack(InetAddress endpoint);
+
+ /**
+ * returns a String representing the datacenter this endpoint belongs to
+ */
+ public String getDatacenter(InetAddress endpoint);
+
+ /**
* returns a new <tt>List</tt> sorted by proximity to the given endpoint
*/
public List<InetAddress> getSortedListByProximity(InetAddress address, Collection<InetAddress> unsortedAddress);
@@ -45,4 +55,9 @@ public interface IEndpointSnitch
* @param subscriber the subscriber to notify
*/
public void register(AbstractReplicationStrategy subscriber);
+
+ /**
+ * compares two endpoints in relation to the target endpoint, returning as Comparator.compare would
+ */
+ public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2);
}
Copied: cassandra/trunk/src/java/org/apache/cassandra/locator/ILatencyPublisher.java (from r962683, cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java)
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/ILatencyPublisher.java?p2=cassandra/trunk/src/java/org/apache/cassandra/locator/ILatencyPublisher.java&p1=cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java&r1=962683&r2=962720&rev=962720&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/ILatencyPublisher.java Fri Jul 9 23:07:00 2010
@@ -18,24 +18,9 @@
package org.apache.cassandra.locator;
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
+import org.apache.cassandra.locator.ILatencySubscriber;
-/**
- * A simple endpoint snitch implementation does not sort addresses by
- * proximity.
- */
-public class SimpleSnitch extends AbstractEndpointSnitch
+public interface ILatencyPublisher
{
- public List<InetAddress> getSortedListByProximity(final InetAddress address, Collection<InetAddress> addresses)
- {
- return new ArrayList<InetAddress>(addresses);
- }
-
- public List<InetAddress> sortByProximity(final InetAddress address, List<InetAddress> addresses)
- {
- return addresses;
- }
+ public void register(ILatencySubscriber subcriber);
}
Copied: cassandra/trunk/src/java/org/apache/cassandra/locator/ILatencySubscriber.java (from r962683, cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java)
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/ILatencySubscriber.java?p2=cassandra/trunk/src/java/org/apache/cassandra/locator/ILatencySubscriber.java&p1=cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java&r1=962683&r2=962720&rev=962720&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/ILatencySubscriber.java Fri Jul 9 23:07:00 2010
@@ -19,23 +19,8 @@
package org.apache.cassandra.locator;
import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-/**
- * A simple endpoint snitch implementation does not sort addresses by
- * proximity.
- */
-public class SimpleSnitch extends AbstractEndpointSnitch
+public interface ILatencySubscriber
{
- public List<InetAddress> getSortedListByProximity(final InetAddress address, Collection<InetAddress> addresses)
- {
- return new ArrayList<InetAddress>(addresses);
- }
-
- public List<InetAddress> sortByProximity(final InetAddress address, List<InetAddress> addresses)
- {
- return addresses;
- }
+ public void receiveTiming(InetAddress address, Double latency);
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java?rev=962720&r1=962719&r2=962720&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/SimpleSnitch.java Fri Jul 9 23:07:00 2010
@@ -23,12 +23,24 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import org.apache.commons.lang.NotImplementedException;
+
/**
* A simple endpoint snitch implementation does not sort addresses by
* proximity.
*/
public class SimpleSnitch extends AbstractEndpointSnitch
{
+ public String getRack(InetAddress endpoint)
+ {
+ throw new NotImplementedException();
+ }
+
+ public String getDatacenter(InetAddress endpoint)
+ {
+ throw new NotImplementedException();
+ }
+
public List<InetAddress> getSortedListByProximity(final InetAddress address, Collection<InetAddress> addresses)
{
return new ArrayList<InetAddress>(addresses);
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=962720&r1=962719&r2=962720&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Fri Jul 9 23:07:00 2010
@@ -400,6 +400,16 @@ public class MessagingService implements
return taskCompletionMap_.remove(key);
}
+ public static long getRegisteredCallbackAge(String key)
+ {
+ return callbackMap_.getAge(key);
+ }
+
+ public static long getAsyncResultAge(String key)
+ {
+ return taskCompletionMap_.getAge(key);
+ }
+
public static ExecutorService getDeserializationExecutor()
{
return messageDeserializerExecutor_;
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java?rev=962720&r1=962719&r2=962720&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java Fri Jul 9 23:07:00 2010
@@ -18,21 +18,30 @@
package org.apache.cassandra.net;
+
+import java.util.*;
+import java.net.InetAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.locator.ILatencyPublisher;
+import org.apache.cassandra.locator.ILatencySubscriber;
-public class ResponseVerbHandler implements IVerbHandler
+public class ResponseVerbHandler implements IVerbHandler, ILatencyPublisher
{
private static final Logger logger_ = LoggerFactory.getLogger( ResponseVerbHandler.class );
-
+ private List<ILatencySubscriber> subscribers = new ArrayList<ILatencySubscriber>();
+
+
public void doVerb(Message message)
{
String messageId = message.getMessageId();
IAsyncCallback cb = MessagingService.getRegisteredCallback(messageId);
+ double age = 0;
if (cb != null)
{
if (logger_.isDebugEnabled())
logger_.debug("Processing response on a callback from " + message.getMessageId() + "@" + message.getFrom());
+ age = System.currentTimeMillis() - MessagingService.getRegisteredCallbackAge(messageId);
cb.response(message);
}
else
@@ -42,8 +51,23 @@ public class ResponseVerbHandler impleme
{
if (logger_.isDebugEnabled())
logger_.debug("Processing response on an async result from " + message.getMessageId() + "@" + message.getFrom());
+ age = System.currentTimeMillis() - MessagingService.getAsyncResultAge(messageId);
ar.result(message);
}
}
+ notifySubscribers(message.getFrom(), age);
+ }
+
+ private void notifySubscribers(InetAddress host, double latency)
+ {
+ for (ILatencySubscriber subscriber : subscribers)
+ {
+ subscriber.receiveTiming(host, latency);
+ }
+ }
+
+ public void register(ILatencySubscriber subscriber)
+ {
+ subscribers.add(subscriber);
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java?rev=962720&r1=962719&r2=962720&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java Fri Jul 9 23:07:00 2010
@@ -129,6 +129,17 @@ public class ExpiringMap<K, V>
return result;
}
+ public long getAge(K key)
+ {
+ long age = 0;
+ CacheableObject<V> co = cache.get(key);
+ if (co != null)
+ {
+ age = co.age;
+ }
+ return age;
+ }
+
public int size()
{
return cache.size();
Added: cassandra/trunk/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java?rev=962720&view=auto
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java (added)
+++ cassandra/trunk/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java Fri Jul 9 23:07:00 2010
@@ -0,0 +1,109 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.cassandra.locator;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import org.apache.cassandra.locator.DynamicEndpointSnitch;
+import org.apache.cassandra.locator.SimpleSnitch;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class DynamicEndpointSnitchTest
+{
+ @Test
+ public void testSnitch() throws UnknownHostException, InterruptedException
+ {
+ DynamicEndpointSnitch dsnitch = new DynamicEndpointSnitch(new SimpleSnitch());
+ InetAddress self = FBUtilities.getLocalAddress();
+ ArrayList<InetAddress> order = new ArrayList<InetAddress>();
+ InetAddress host1 = InetAddress.getByName("127.0.0.1");
+ InetAddress host2 = InetAddress.getByName("127.0.0.2");
+ InetAddress host3 = InetAddress.getByName("127.0.0.3");
+
+ // first, make all hosts equal
+ for (int i = 0; i < 5; i++)
+ {
+ dsnitch.receiveTiming(host1, 1.0);
+ dsnitch.receiveTiming(host2, 1.0);
+ dsnitch.receiveTiming(host3, 1.0);
+ }
+
+ Thread.sleep(1500);
+
+ order.add(host1);
+ order.add(host2);
+ order.add(host3);
+
+ assert dsnitch.getSortedListByProximity(self, order).equals(order);
+
+ // make host1 a little worse
+ dsnitch.receiveTiming(host1, 2.0);
+ Thread.sleep(1500);
+ order.clear();
+
+ order.add(host2);
+ order.add(host3);
+ order.add(host1);
+
+ assert dsnitch.getSortedListByProximity(self, order).equals(order);
+
+ // make host2 a little worse
+ dsnitch.receiveTiming(host2, 2.0);
+ Thread.sleep(1500);
+ order.clear();
+
+ order.add(host3);
+ order.add(host2);
+ order.add(host1);
+
+ assert dsnitch.getSortedListByProximity(self, order).equals(order);
+
+ // make host3 the worst
+ for (int i = 0; i < 2; i++)
+ {
+ dsnitch.receiveTiming(host3, 2.0);
+ }
+ Thread.sleep(1500);
+ order.clear();
+
+ order.add(host2);
+ order.add(host1);
+ order.add(host3);
+
+ // make host3 equal to the others
+ for (int i = 0; i < 2; i++)
+ {
+ dsnitch.receiveTiming(host3, 1.0);
+ }
+ Thread.sleep(1500);
+ order.clear();
+
+ order.add(host1);
+ order.add(host2);
+ order.add(host3);
+
+ assert dsnitch.getSortedListByProximity(self, order).equals(order);
+ }
+}