You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2010/08/12 18:37:19 UTC
svn commit: r984853 - in /cassandra/branches/cassandra-0.6: ./
contrib/property_snitch/src/java/org/apache/cassandra/locator/
src/java/org/apache/cassandra/config/
src/java/org/apache/cassandra/locator/ src/java/org/apache/cassandra/net/
src/java/org/a...
Author: brandonwilliams
Date: Thu Aug 12 16:37:19 2010
New Revision: 984853
URL: http://svn.apache.org/viewvc?rev=984853&view=rev
Log:
Dynamic snitch. Patch by brandonwilliams; reviewed by jbellis for CASSANDRA-981
Added:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/ILatencyPublisher.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/ILatencySubscriber.java
cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java
Modified:
cassandra/branches/cassandra-0.6/CHANGES.txt
cassandra/branches/cassandra-0.6/contrib/property_snitch/src/java/org/apache/cassandra/locator/PropertyFileEndPointSnitch.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/IEndPointSnitch.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/ExpiringMap.java
Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=984853&r1=984852&r2=984853&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Thu Aug 12 16:37:19 2010
@@ -6,6 +6,7 @@
* fix compilation on non-sun JKDs (CASSANDRA-1061)
* remove String.trim() call on row keys in batch mutations (CASSANDRA-1235)
* Log summary of dropped messages instead of spamming log (CASSANDRA-1284)
+ * Dynamic snitch (CASSANDRA-981)
0.6.4
Modified: cassandra/branches/cassandra-0.6/contrib/property_snitch/src/java/org/apache/cassandra/locator/PropertyFileEndPointSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/contrib/property_snitch/src/java/org/apache/cassandra/locator/PropertyFileEndPointSnitch.java?rev=984853&r1=984852&r2=984853&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/contrib/property_snitch/src/java/org/apache/cassandra/locator/PropertyFileEndPointSnitch.java (original)
+++ cassandra/branches/cassandra-0.6/contrib/property_snitch/src/java/org/apache/cassandra/locator/PropertyFileEndPointSnitch.java Thu Aug 12 16:37:19 2010
@@ -110,6 +110,9 @@ public class PropertyFileEndPointSnitch
return getEndPointInfo(endPoint)[0];
}
+    /**
+     * Looks up the location of an endpoint.
+     *
+     * @param endPoint the endpoint to look up
+     * @return the first element of the info array that getEndPointInfo yields for the endpoint
+     */
+    public String getLocation(InetAddress endPoint) {
+        final String[] info = getEndPointInfo(endPoint);
+        return info[0];
+    }
/**
* Return the rack for which an endpoint resides in
*
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=984853&r1=984852&r2=984853&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Thu Aug 12 16:37:19 2010
@@ -27,6 +27,7 @@ import org.apache.cassandra.db.marshal.B
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.locator.IEndPointSnitch;
+import org.apache.cassandra.locator.DynamicEndpointSnitch;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.FBUtilities;
@@ -626,7 +627,12 @@ public class DatabaseDescriptor
try
{
Class cls = Class.forName(endPointSnitchClassName);
- epSnitch = (IEndPointSnitch)cls.getConstructor().newInstance();
+                IEndPointSnitch snitch = (IEndPointSnitch)cls.getConstructor().newInstance();
+                // Boolean.getBoolean(name) reads the system property called `name` and parses its value,
+                // so it must be given the property name itself rather than the property's value.
+                // An unset (or non-"true") property yields false, keeping the plain snitch as the default.
+                if (Boolean.getBoolean("cassandra.dynamic_snitch"))
+                    epSnitch = new DynamicEndpointSnitch(snitch);
+                else
+                    epSnitch = snitch;
}
catch (ClassNotFoundException e)
{
@@ -648,7 +654,6 @@ public class DatabaseDescriptor
{
throw new ConfigurationException("Invalid endpointsnitch class " + endPointSnitchClassName + " " + e.getMessage());
}
-
String xqlTable = "/Storage/Keyspaces/Keyspace[@Name='" + ksName + "']/";
NodeList columnFamilies = xmlUtils.getRequestedNodeList(xqlTable + "ColumnFamily");
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java?rev=984853&r1=984852&r2=984853&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java Thu Aug 12 16:37:19 2010
@@ -70,28 +70,33 @@ public abstract class AbstractEndpointSn
{
public int compare(InetAddress a1, InetAddress a2)
{
- try
- {
- if (address.equals(a1) && !address.equals(a2))
- return -1;
- if (address.equals(a2) && !address.equals(a1))
- return 1;
- if (isOnSameRack(address, a1) && !isOnSameRack(address, a2))
- return -1;
- if (isOnSameRack(address, a2) && !isOnSameRack(address, a1))
- return 1;
- if (isInSameDataCenter(address, a1) && !isInSameDataCenter(address, a2))
- return -1;
- if (isInSameDataCenter(address, a2) && !isInSameDataCenter(address, a1))
- return 1;
- return 0;
- }
- catch (UnknownHostException e)
- {
- throw new RuntimeException(e);
- }
+ return compareEndpoints(address, a1, a2);
}
});
return addresses;
}
+
+    /**
+     * Ranks two endpoints by their closeness to the target. Three tiers are tried in order and the
+     * first one that separates the endpoints decides: being the target itself, sharing the target's
+     * rack, sharing the target's datacenter.
+     *
+     * @param target the endpoint that a1 and a2 are measured against
+     * @param a1 first endpoint to rank
+     * @param a2 second endpoint to rank
+     * @return -1 when a1 is closer, 1 when a2 is closer, 0 when no tier separates them
+     * @throws RuntimeException wrapping an UnknownHostException raised by the rack or datacenter lookup
+     */
+    public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2)
+    {
+        // tier 1: an endpoint that is the target outranks one that is not
+        boolean firstIsTarget = target.equals(a1);
+        boolean secondIsTarget = target.equals(a2);
+        if (firstIsTarget != secondIsTarget)
+            return firstIsTarget ? -1 : 1;
+
+        try
+        {
+            // tier 2: same rack as the target
+            boolean firstSharesRack = isOnSameRack(target, a1);
+            boolean secondSharesRack = isOnSameRack(target, a2);
+            if (firstSharesRack != secondSharesRack)
+                return firstSharesRack ? -1 : 1;
+
+            // tier 3: same datacenter as the target
+            boolean firstSharesDc = isInSameDataCenter(target, a1);
+            boolean secondSharesDc = isInSameDataCenter(target, a2);
+            if (firstSharesDc != secondSharesDc)
+                return firstSharesDc ? -1 : 1;
+
+            return 0;
+        }
+        catch (UnknownHostException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
}
Added: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java?rev=984853&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java (added)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java Thu Aug 12 16:37:19 2010
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.*;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import java.lang.management.ManagementFactory;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.ResponseVerbHandler;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.AbstractStatsDeque;
+import org.apache.cassandra.locator.IEndPointSnitch;
+import org.apache.cassandra.locator.ILatencyPublisher;
+import org.apache.cassandra.locator.ILatencySubscriber;
+import org.apache.cassandra.locator.AbstractEndpointSnitch;
+import org.apache.cassandra.locator.DynamicEndpointSnitchMBean;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * A dynamic snitch that sorts endpoints by latency with an adapted phi failure detector.
+ *
+ * Latency samples arrive through receiveTiming() and are kept in one bounded window per host. A timer
+ * task converts every window into a score each UPDATE_INTERVAL_IN_MS, and a second task empties the
+ * windows each RESET_INTERVAL_IN_MS. Endpoints are ordered by ascending score (a window with a higher
+ * mean latency produces a higher score); when either endpoint of a pair has no score yet, the wrapped
+ * snitch decides. Rack, datacenter and location queries are always delegated to the wrapped snitch.
+ */
+public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean
+{
+    // most samples accepted between two score updates; samples beyond that are dropped
+    private static final int UPDATES_PER_INTERVAL = 10000;
+    private static final int UPDATE_INTERVAL_IN_MS = 100;
+    private static final int RESET_INTERVAL_IN_MS = 60000 * 10;
+    // capacity of each per-host sample window
+    private static final int WINDOW_SIZE = 100;
+    // read and written only by updateScores()
+    private boolean registered = false;
+
+    private final ConcurrentHashMap<InetAddress, Double> scores = new ConcurrentHashMap<InetAddress, Double>();
+    private final ConcurrentHashMap<InetAddress, AdaptiveLatencyTracker> windows = new ConcurrentHashMap<InetAddress, AdaptiveLatencyTracker>();
+    private final AtomicInteger intervalupdates = new AtomicInteger(0);
+    public IEndPointSnitch subsnitch;
+
+    /**
+     * Wraps the given snitch, schedules the periodic score update and window reset tasks, and
+     * registers this instance with the platform MBean server.
+     *
+     * @param snitch the snitch consulted for topology questions and for unscored endpoints
+     * @throws RuntimeException if the MBean registration fails
+     */
+    public DynamicEndpointSnitch(IEndPointSnitch snitch)
+    {
+        subsnitch = snitch;
+        TimerTask update = new TimerTask()
+        {
+            public void run()
+            {
+                updateScores();
+            }
+        };
+        TimerTask reset = new TimerTask()
+        {
+            public void run()
+            {
+                // we do this so that a host considered bad has a chance to recover, otherwise we would never try
+                // to read from it, which would cause its score to never change
+                reset();
+            }
+        };
+        Timer timer = new Timer("DynamicEndpointSnitch");
+        timer.schedule(update, UPDATE_INTERVAL_IN_MS, UPDATE_INTERVAL_IN_MS);
+        timer.schedule(reset, RESET_INTERVAL_IN_MS, RESET_INTERVAL_IN_MS);
+
+        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        try
+        {
+            mbs.registerMBean(this, new ObjectName("org.apache.cassandra.locator:type=DynamicEndpointSnitch"));
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Delegates to the wrapped snitch. */
+    public boolean isOnSameRack(InetAddress a1, InetAddress a2) throws UnknownHostException
+    {
+        return subsnitch.isOnSameRack(a1, a2);
+    }
+
+    /** Delegates to the wrapped snitch. */
+    public boolean isInSameDataCenter(InetAddress a1, InetAddress a2) throws UnknownHostException
+    {
+        return subsnitch.isInSameDataCenter(a1, a2);
+    }
+
+    /** Delegates to the wrapped snitch. */
+    public String getLocation(InetAddress endpoint) throws UnknownHostException
+    {
+        return subsnitch.getLocation(endpoint);
+    }
+
+    /**
+     * Returns a sorted copy of the given addresses; the input collection is not modified.
+     *
+     * @param address the endpoint the ordering is relative to; must be the local address
+     * @param addresses the endpoints to order
+     * @return a new list holding the addresses in sorted order
+     */
+    public List<InetAddress> getSortedListByProximity(final InetAddress address, Collection<InetAddress> addresses)
+    {
+        List<InetAddress> list = new ArrayList<InetAddress>(addresses);
+        sortByProximity(address, list);
+        return list;
+    }
+
+    /**
+     * Sorts the given list in place using compareEndpoints.
+     *
+     * @param address the endpoint the ordering is relative to; must be the local address
+     * @param addresses the list to sort
+     * @return the same list instance, now sorted
+     */
+    public List<InetAddress> sortByProximity(final InetAddress address, List<InetAddress> addresses)
+    {
+        // we only know about ourself; compare by value so an equal but distinct instance is accepted
+        assert FBUtilities.getLocalAddress().equals(address);
+        Collections.sort(addresses, new Comparator<InetAddress>()
+        {
+            public int compare(InetAddress a1, InetAddress a2)
+            {
+                return compareEndpoints(address, a1, a2);
+            }
+        });
+        return addresses;
+    }
+
+    /**
+     * Compares two endpoints by score, falling back to the wrapped snitch when either is unscored.
+     *
+     * @param target the endpoint the comparison is relative to (only used by the fallback)
+     * @param a1 first endpoint
+     * @param a2 second endpoint
+     * @return negative when a1 should sort first, positive when a2 should, zero on a tie
+     */
+    public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2)
+    {
+        Double scored1 = scores.get(a1);
+        Double scored2 = scores.get(a2);
+
+        if (scored1 == null || scored2 == null)
+            return subsnitch.compareEndpoints(target, a1, a2);
+        // AdaptiveLatencyTracker.score() grows with the mean latency of the window, so the lower
+        // score must sort first. Double.compareTo is also a consistent total order for 0.0/-0.0
+        // and NaN, which a hand-written "<" comparison is not.
+        return scored1.compareTo(scored2);
+    }
+
+    /**
+     * Records one latency sample for a host, creating the host's window on first use. Samples are
+     * ignored once UPDATES_PER_INTERVAL have been accepted since the last score update.
+     *
+     * @param host the endpoint the sample belongs to
+     * @param latency the measured latency
+     */
+    public void receiveTiming(InetAddress host, Double latency) // this is cheap
+    {
+        if (intervalupdates.intValue() >= UPDATES_PER_INTERVAL)
+            return;
+        AdaptiveLatencyTracker tracker = windows.get(host);
+        if (tracker == null)
+        {
+            AdaptiveLatencyTracker alt = new AdaptiveLatencyTracker(WINDOW_SIZE);
+            // putIfAbsent returns the existing tracker when another thread won the race
+            tracker = windows.putIfAbsent(host, alt);
+            if (tracker == null)
+                tracker = alt;
+        }
+        tracker.add(latency);
+        intervalupdates.getAndIncrement();
+    }
+
+    /**
+     * Subscribes to the READ_RESPONSE verb handler if that has not happened yet, recomputes the score
+     * of every tracked host and reopens the per-interval sample budget.
+     */
+    private void updateScores() // this is expensive
+    {
+        if (!registered)
+        {
+            // the handler may not be installed yet, in which case we retry on the next run
+            ILatencyPublisher handler = (ILatencyPublisher)MessagingService.instance.getVerbHandler(StorageService.Verb.READ_RESPONSE);
+            if (handler != null)
+            {
+                handler.register(this);
+                registered = true;
+            }
+
+        }
+        for (Map.Entry<InetAddress, AdaptiveLatencyTracker> entry: windows.entrySet())
+        {
+            scores.put(entry.getKey(), entry.getValue().score());
+        }
+        intervalupdates.set(0);
+    }
+
+    /** Empties every latency window; the windows themselves are kept. */
+    private void reset()
+    {
+        for (AdaptiveLatencyTracker tracker : windows.values())
+        {
+            tracker.clear();
+        }
+    }
+
+    /**
+     * @return the live score map (not a copy), keyed by endpoint
+     */
+    public Map<InetAddress, Double> getScores()
+    {
+        return scores;
+    }
+}
+
+/**
+ * a threadsafe version of BoundedStatsDeque+ArrivalWindow with modification for arbitrary times
+ *
+ * Holds at most a fixed number of latency samples, discarding the oldest once full, and condenses
+ * them into a single score that grows with the mean of the window.
+ **/
+class AdaptiveLatencyTracker extends AbstractStatsDeque
+{
+    private final LinkedBlockingDeque<Double> latencies;
+    private final int size;
+    private static final double SENTINEL_COMPARE = 0.0001; // arbitrary; as long as it is the same across hosts it doesn't matter
+
+    /**
+     * @param size the maximum number of samples retained
+     */
+    AdaptiveLatencyTracker(int size)
+    {
+        this.size = size;
+        latencies = new LinkedBlockingDeque<Double>(size);
+    }
+
+    /**
+     * Appends a sample, evicting from the head for as long as the window is full.
+     *
+     * @param i the latency sample to record
+     */
+    public void add(double i)
+    {
+        // poll() rather than remove(): a concurrent clear() may empty the deque between the failed
+        // offer and the eviction, and remove() would then throw NoSuchElementException. Looping
+        // also retries when another writer refills the freed slot first, so the sample is not lost.
+        while (!latencies.offer(i))
+        {
+            latencies.poll();
+        }
+    }
+
+    /** Discards every retained sample. */
+    public void clear()
+    {
+        latencies.clear();
+    }
+
+    /** @return an iterator over the retained samples, oldest first */
+    public Iterator<Double> iterator()
+    {
+        return latencies.iterator();
+    }
+
+    /** @return the number of samples currently retained */
+    public int size()
+    {
+        return latencies.size();
+    }
+
+    /**
+     * Evaluates 1 - e^(-t / mean) over the current window.
+     *
+     * @param t the value to evaluate at
+     * @return the resulting probability
+     */
+    double p(double t)
+    {
+        double mean = mean();
+        double exponent = (-1) * (t) / mean;
+        return 1 - Math.pow( Math.E, exponent);
+    }
+
+    /**
+     * @return -log10(p(SENTINEL_COMPARE)), which is larger for a window with a larger mean, or 0
+     *         when the window holds no samples
+     */
+    double score()
+    {
+        double log = 0d;
+        if ( latencies.size() > 0 )
+        {
+            double probability = p(SENTINEL_COMPARE);
+            log = (-1) * Math.log10( probability );
+        }
+        return log;
+    }
+
+}
Added: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java?rev=984853&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java (added)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/DynamicEndpointSnitchMBean.java Thu Aug 12 16:37:19 2010
@@ -0,0 +1,27 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.cassandra.locator;
+
+import java.net.InetAddress;
+import java.util.Map;
+
+public interface DynamicEndpointSnitchMBean {
+ public Map<InetAddress, Double> getScores();
+}
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/IEndPointSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/IEndPointSnitch.java?rev=984853&r1=984852&r2=984853&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/IEndPointSnitch.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/IEndPointSnitch.java Thu Aug 12 16:37:19 2010
@@ -45,5 +45,22 @@ public interface IEndPointSnitch
* This method will sort the List<InetAddress> according to the proximity of the given address.
*/
public List<InetAddress> sortByProximity(InetAddress address, List<InetAddress> addresses);
+
+ /**
+ * compares two endpoints in relation to the target endpoint, returning as Comparator.compare would
+ *
+ * @param target the endpoint the other two are measured against
+ * @param a1 first endpoint to rank
+ * @param a2 second endpoint to rank
+ * @return negative, zero or positive, following the Comparator.compare convention
+ */
+ public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2);
+
+ /**
+ * indicates whether two endpoints are on the same rack
+ *
+ * @throws UnknownHostException declared for implementations whose lookup can fail
+ */
+ public boolean isOnSameRack(InetAddress a1, InetAddress a2) throws UnknownHostException;
+
+ /**
+ * indicates whether two endpoints are in the same datacenter
+ *
+ * @throws UnknownHostException declared for implementations whose lookup can fail
+ */
+ public boolean isInSameDataCenter(InetAddress a1, InetAddress a2) throws UnknownHostException;
+
+ /**
+ * returns a location string for the given endpoint
+ * NOTE(review): what the string denotes (datacenter, rack, ...) is not specified here and appears
+ * to be left to each implementation -- confirm and document
+ *
+ * @throws UnknownHostException declared for implementations whose lookup can fail
+ */
+ public String getLocation(InetAddress endpoint) throws UnknownHostException;
}
Added: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/ILatencyPublisher.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/ILatencyPublisher.java?rev=984853&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/ILatencyPublisher.java (added)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/ILatencyPublisher.java Thu Aug 12 16:37:19 2010
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import org.apache.cassandra.locator.ILatencySubscriber;
+
+/**
+ * Implemented by components that measure latencies and hand them to registered subscribers.
+ */
+public interface ILatencyPublisher
+{
+    /**
+     * Adds a subscriber that will be handed latency measurements.
+     *
+     * @param subscriber the subscriber to add
+     */
+    public void register(ILatencySubscriber subscriber);
+}
Added: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/ILatencySubscriber.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/ILatencySubscriber.java?rev=984853&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/ILatencySubscriber.java (added)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/ILatencySubscriber.java Thu Aug 12 16:37:19 2010
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.net.InetAddress;
+
+/**
+ * Callback through which an ILatencyPublisher delivers latency measurements.
+ */
+public interface ILatencySubscriber
+{
+    /**
+     * Receives one latency measurement.
+     *
+     * @param address the endpoint the measurement belongs to
+     * @param latency the measured latency
+     */
+    void receiveTiming(InetAddress address, Double latency);
+}
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java?rev=984853&r1=984852&r2=984853&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java Thu Aug 12 16:37:19 2010
@@ -415,6 +415,16 @@ public class MessagingService
return taskCompletionMap_.remove(key);
}
+    /**
+     * Reports what callbackMap_.getAge(key) holds for a registered callback.
+     * NOTE(review): if callbackMap_ is an ExpiringMap, this is the entry's creation time in epoch
+     * milliseconds (0 when the key is absent) rather than an elapsed duration -- confirm.
+     *
+     * @param key the id the callback was registered under
+     * @return the value reported by the callback map for that key
+     */
+    public static long getRegisteredCallbackAge(String key)
+    {
+        final long registeredAt = callbackMap_.getAge(key);
+        return registeredAt;
+    }
+
+    /**
+     * Reports what taskCompletionMap_.getAge(key) holds for a pending async result.
+     * NOTE(review): if taskCompletionMap_ is an ExpiringMap, this is the entry's creation time in
+     * epoch milliseconds (0 when the key is absent) rather than an elapsed duration -- confirm.
+     *
+     * @param key the id the async result was stored under
+     * @return the value reported by the task completion map for that key
+     */
+    public static long getAsyncResultAge(String key)
+    {
+        final long createdAt = taskCompletionMap_.getAge(key);
+        return createdAt;
+    }
+
public static ExecutorService getDeserializationExecutor()
{
return messageDeserializerExecutor_;
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java?rev=984853&r1=984852&r2=984853&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java Thu Aug 12 16:37:19 2010
@@ -18,20 +18,29 @@
package org.apache.cassandra.net;
+import java.util.*;
+import java.net.InetAddress;
+
+import org.apache.cassandra.locator.ILatencyPublisher;
+import org.apache.cassandra.locator.ILatencySubscriber;
+
import org.apache.log4j.Logger;
-public class ResponseVerbHandler implements IVerbHandler
+public class ResponseVerbHandler implements IVerbHandler, ILatencyPublisher
{
private static final Logger logger_ = Logger.getLogger( ResponseVerbHandler.class );
+ private List<ILatencySubscriber> subscribers = new ArrayList<ILatencySubscriber>();
public void doVerb(Message message)
{
String messageId = message.getMessageId();
IAsyncCallback cb = MessagingService.getRegisteredCallback(messageId);
+ double age = 0;
if (cb != null)
{
if (logger_.isDebugEnabled())
logger_.debug("Processing response on a callback from " + message.getMessageId() + "@" + message.getFrom());
+ age = System.currentTimeMillis() - MessagingService.getRegisteredCallbackAge(messageId);
cb.response(message);
}
else
@@ -41,8 +50,23 @@ public class ResponseVerbHandler impleme
{
if (logger_.isDebugEnabled())
logger_.debug("Processing response on an async result from " + message.getMessageId() + "@" + message.getFrom());
+ age = System.currentTimeMillis() - MessagingService.getAsyncResultAge(messageId);
ar.result(message);
}
}
+ notifySubscribers(message.getFrom(), age);
+ }
+
+    /**
+     * Passes one latency measurement to every registered subscriber, in registration order.
+     *
+     * @param host the endpoint the measurement belongs to
+     * @param latency the measured latency
+     */
+    private void notifySubscribers(InetAddress host, double latency)
+    {
+        Iterator<ILatencySubscriber> each = subscribers.iterator();
+        while (each.hasNext())
+        {
+            ILatencySubscriber listener = each.next();
+            listener.receiveTiming(host, latency);
+        }
+    }
+
+    /**
+     * Appends a subscriber to the list notified by notifySubscribers.
+     *
+     * @param subscriber the subscriber to add
+     */
+    public void register(ILatencySubscriber subscriber)
+    {
+        this.subscribers.add(subscriber);
     }
}
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/ExpiringMap.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/ExpiringMap.java?rev=984853&r1=984852&r2=984853&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/ExpiringMap.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/ExpiringMap.java Thu Aug 12 16:37:19 2010
@@ -30,12 +30,12 @@ public class ExpiringMap<K, V>
private class CacheableObject
{
private V value_;
- private long age_;
+ public long age;
CacheableObject(V o)
{
value_ = o;
- age_ = System.currentTimeMillis();
+ age = System.currentTimeMillis();
}
@Override
@@ -57,7 +57,7 @@ public class ExpiringMap<K, V>
boolean isReadyToDie(long expiration)
{
- return ((System.currentTimeMillis() - age_) > expiration);
+ return ((System.currentTimeMillis() - age) > expiration);
}
}
@@ -175,6 +175,17 @@ public class ExpiringMap<K, V>
return result;
}
+    /**
+     * Returns the creation timestamp of the entry stored under a key. Despite the name this is not
+     * an elapsed time: it is the System.currentTimeMillis() value captured when the entry was built.
+     *
+     * @param key the key to look up
+     * @return the entry's creation time in milliseconds, or 0 when no entry exists for the key
+     */
+    public long getAge(K key)
+    {
+        CacheableObject entry = cache_.get(key);
+        return entry == null ? 0 : entry.age;
+    }
+
public int size()
{
return cache_.size();
Added: cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java?rev=984853&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java (added)
+++ cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java Thu Aug 12 16:37:19 2010
@@ -0,0 +1,109 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.cassandra.locator;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import org.apache.cassandra.locator.DynamicEndpointSnitch;
+import org.apache.cassandra.locator.SimpleSnitch;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Feeds latency samples for three hosts into a DynamicEndpointSnitch and checks the order it
+ * produces after each change. Hosts with equal scores are expected to keep their input order.
+ */
+public class DynamicEndpointSnitchTest
+{
+    @Test
+    public void testSnitch() throws UnknownHostException, InterruptedException
+    {
+        DynamicEndpointSnitch dsnitch = new DynamicEndpointSnitch(new SimpleSnitch());
+        InetAddress self = FBUtilities.getLocalAddress();
+        ArrayList<InetAddress> order = new ArrayList<InetAddress>();
+        InetAddress host1 = InetAddress.getByName("127.0.0.1");
+        InetAddress host2 = InetAddress.getByName("127.0.0.2");
+        InetAddress host3 = InetAddress.getByName("127.0.0.3");
+
+        // first, make all hosts equal
+        for (int i = 0; i < 5; i++)
+        {
+            dsnitch.receiveTiming(host1, 1.0);
+            dsnitch.receiveTiming(host2, 1.0);
+            dsnitch.receiveTiming(host3, 1.0);
+        }
+
+        // scores are recomputed by a timer task, so give it time to run
+        Thread.sleep(1500);
+
+        order.add(host1);
+        order.add(host2);
+        order.add(host3);
+
+        // JUnit assertions are used throughout so the checks still run when -ea is not set
+        assertEquals(order, dsnitch.getSortedListByProximity(self, order));
+
+        // make host1 a little worse
+        dsnitch.receiveTiming(host1, 2.0);
+        Thread.sleep(1500);
+        order.clear();
+
+        order.add(host2);
+        order.add(host3);
+        order.add(host1);
+
+        assertEquals(order, dsnitch.getSortedListByProximity(self, order));
+
+        // make host2 a little worse
+        dsnitch.receiveTiming(host2, 2.0);
+        Thread.sleep(1500);
+        order.clear();
+
+        order.add(host3);
+        order.add(host2);
+        order.add(host1);
+
+        assertEquals(order, dsnitch.getSortedListByProximity(self, order));
+
+        // make host3 the worst
+        for (int i = 0; i < 2; i++)
+        {
+            dsnitch.receiveTiming(host3, 2.0);
+        }
+        Thread.sleep(1500);
+        order.clear();
+
+        order.add(host2);
+        order.add(host1);
+        order.add(host3);
+
+        // this expectation was built but never checked
+        assertEquals(order, dsnitch.getSortedListByProximity(self, order));
+
+        // pull host3 back towards the others; its mean is still the highest, and host1 and host2 stay tied
+        for (int i = 0; i < 2; i++)
+        {
+            dsnitch.receiveTiming(host3, 1.0);
+        }
+        Thread.sleep(1500);
+        order.clear();
+
+        order.add(host1);
+        order.add(host2);
+        order.add(host3);
+
+        assertEquals(order, dsnitch.getSortedListByProximity(self, order));
+    }
+}