You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/01/19 15:43:38 UTC
svn commit: r1060828 - in /cassandra/branches/cassandra-0.6: ./
src/java/org/apache/cassandra/locator/ src/java/org/apache/cassandra/net/
src/java/org/apache/cassandra/service/
Author: jbellis
Date: Wed Jan 19 14:43:37 2011
New Revision: 1060828
URL: http://svn.apache.org/viewvc?rev=1060828&view=rev
Log:
add latency information fromlocal reads to DynamicSnitch
patch by brandonwilliams and jbellis for CASSANDRA-2004
Removed:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/ILatencyPublisher.java
Modified:
cassandra/branches/cassandra-0.6/CHANGES.txt
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=1060828&r1=1060827&r2=1060828&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Wed Jan 19 14:43:37 2011
@@ -2,6 +2,7 @@
* buffer network stack to avoid inefficient small TCP messages while avoiding
the nagle/delayed ack problem (CASSANDRA-1896)
* fix race condition in MessagingService.targets (CASSANDRA-1959)
+ * add latency information from local reads to DynamicSnitch (CASSANDRA-2004)
0.6.9
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java?rev=1060828&r1=1060827&r2=1060828&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java Wed Jan 19 14:43:37 2011
@@ -38,10 +38,11 @@ import org.apache.cassandra.utils.FBUtil
*/
public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean
{
- private static int UPDATES_PER_INTERVAL = 10000;
- private static int UPDATE_INTERVAL_IN_MS = 100;
- private static int RESET_INTERVAL_IN_MS = 60000 * 10;
- private static int WINDOW_SIZE = 100;
+ private static final int UPDATES_PER_INTERVAL = 10000;
+ private static final int UPDATE_INTERVAL_IN_MS = 100;
+ private static final int RESET_INTERVAL_IN_MS = 60000 * 10;
+ private static final int WINDOW_SIZE = 100;
+
private boolean registered = false;
private ConcurrentHashMap<InetAddress, Double> scores = new ConcurrentHashMap();
@@ -151,13 +152,8 @@ public class DynamicEndpointSnitch exten
{
if (!registered)
{
- ILatencyPublisher handler = (ILatencyPublisher)MessagingService.instance.getVerbHandler(StorageService.Verb.READ_RESPONSE);
- if (handler != null)
- {
- handler.register(this);
- registered = true;
- }
-
+ MessagingService.instance.register(this);
+ registered = true;
}
for (Map.Entry<InetAddress, AdaptiveLatencyTracker> entry: windows.entrySet())
{
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java?rev=1060828&r1=1060827&r2=1060828&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java Wed Jan 19 14:43:37 2011
@@ -35,16 +35,12 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.base.Function;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
import org.apache.log4j.Logger;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.locator.ILatencyPublisher;
import org.apache.cassandra.locator.ILatencySubscriber;
import org.apache.cassandra.net.io.SerializerType;
import org.apache.cassandra.net.sink.SinkManager;
@@ -56,7 +52,7 @@ import org.apache.cassandra.utils.Simple
import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.cliffc.high_scale_lib.NonBlockingHashSet;
-public class MessagingService implements ILatencyPublisher
+public class MessagingService
{
private static int version_ = 1;
//TODO: make this parameter dynamic somehow. Not sure if config is appropriate.
@@ -116,10 +112,7 @@ public class MessagingService implements
return null;
for (InetAddress address : addresses)
- {
- for (ILatencySubscriber subscriber : subscribers)
- subscriber.receiveTiming(address, (double) DatabaseDescriptor.getRpcTimeout());
- }
+ addLatency(address, (double) DatabaseDescriptor.getRpcTimeout());
return null;
}
@@ -140,6 +133,12 @@ public class MessagingService implements
timer.schedule(logDropped, LOG_DROPPED_INTERVAL_IN_MS, LOG_DROPPED_INTERVAL_IN_MS);
}
+ public void addLatency(InetAddress address, double latency)
+ {
+ for (ILatencySubscriber subscriber : subscribers)
+ subscriber.receiveTiming(address, latency);
+ }
+
public byte[] hash(String type, byte data[])
{
byte result[];
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java?rev=1060828&r1=1060827&r2=1060828&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java Wed Jan 19 14:43:37 2011
@@ -18,19 +18,12 @@
package org.apache.cassandra.net;
-import java.util.ArrayList;
-import java.util.List;
-
import org.apache.log4j.Logger;
-import org.apache.cassandra.locator.ILatencyPublisher;
-import org.apache.cassandra.locator.ILatencySubscriber;
-
-public class ResponseVerbHandler implements IVerbHandler, ILatencyPublisher
+public class ResponseVerbHandler implements IVerbHandler
{
private static final Logger logger_ = Logger.getLogger( ResponseVerbHandler.class );
- private List<ILatencySubscriber> subscribers = new ArrayList<ILatencySubscriber>();
-
+
public void doVerb(Message message)
{
String messageId = message.getMessageId();
@@ -41,8 +34,7 @@ public class ResponseVerbHandler impleme
return;
// if cb is not null, then age will be valid
- for (ILatencySubscriber subscriber : subscribers)
- subscriber.receiveTiming(message.getFrom(), age);
+ MessagingService.instance.addLatency(message.getFrom(), age);
if (cb instanceof IAsyncCallback)
{
@@ -58,8 +50,4 @@ public class ResponseVerbHandler impleme
}
}
- public void register(ILatencySubscriber subscriber)
- {
- subscribers.add(subscriber);
- }
}
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1060828&r1=1060827&r2=1060828&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java Wed Jan 19 14:43:37 2011
@@ -702,7 +702,8 @@ public class StorageProxy implements Sto
static class weakReadLocalCallable implements Callable<Object>
{
- private ReadCommand command;
+ private final ReadCommand command;
+ private final long start = System.currentTimeMillis();
weakReadLocalCallable(ReadCommand command)
{
@@ -718,6 +719,7 @@ public class StorageProxy implements Sto
Row row = command.getRow(table);
StorageService.instance.doConsistencyCheck(row, command, FBUtilities.getLocalAddress());
+ MessagingService.instance.addLatency(FBUtilities.getLocalAddress(), System.currentTimeMillis() - start);
return row;
}
}