You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/01/19 15:43:38 UTC

svn commit: r1060828 - in /cassandra/branches/cassandra-0.6: ./ src/java/org/apache/cassandra/locator/ src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/service/

Author: jbellis
Date: Wed Jan 19 14:43:37 2011
New Revision: 1060828

URL: http://svn.apache.org/viewvc?rev=1060828&view=rev
Log:
add latency information from local reads to DynamicSnitch
patch by brandonwilliams and jbellis for CASSANDRA-2004

Removed:
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/ILatencyPublisher.java
Modified:
    cassandra/branches/cassandra-0.6/CHANGES.txt
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java

Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=1060828&r1=1060827&r2=1060828&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Wed Jan 19 14:43:37 2011
@@ -2,6 +2,7 @@
  * buffer network stack to avoid inefficient small TCP messages while avoiding
    the nagle/delayed ack problem (CASSANDRA-1896)
  * fix race condition in MessagingService.targets (CASSANDRA-1959)
+ * add latency information from local reads to DynamicSnitch (CASSANDRA-2004)
 
 
 0.6.9

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java?rev=1060828&r1=1060827&r2=1060828&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java Wed Jan 19 14:43:37 2011
@@ -38,10 +38,11 @@ import org.apache.cassandra.utils.FBUtil
  */
 public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean
 {
-    private static int UPDATES_PER_INTERVAL = 10000;
-    private static int UPDATE_INTERVAL_IN_MS = 100;
-    private static int RESET_INTERVAL_IN_MS = 60000 * 10;
-    private static int WINDOW_SIZE = 100;
+    private static final int UPDATES_PER_INTERVAL = 10000;
+    private static final int UPDATE_INTERVAL_IN_MS = 100;
+    private static final int RESET_INTERVAL_IN_MS = 60000 * 10;
+    private static final int WINDOW_SIZE = 100;
+
     private boolean registered = false;
 
     private ConcurrentHashMap<InetAddress, Double> scores = new ConcurrentHashMap();
@@ -151,13 +152,8 @@ public class DynamicEndpointSnitch exten
     {
         if (!registered)
         {
-       	    ILatencyPublisher handler = (ILatencyPublisher)MessagingService.instance.getVerbHandler(StorageService.Verb.READ_RESPONSE);
-            if (handler != null)
-            {
-                handler.register(this);
-                registered = true;
-            }
-
+            MessagingService.instance.register(this);
+            registered = true;
         }
         for (Map.Entry<InetAddress, AdaptiveLatencyTracker> entry: windows.entrySet())
         {

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java?rev=1060828&r1=1060827&r2=1060828&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java Wed Jan 19 14:43:37 2011
@@ -35,16 +35,12 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.base.Function;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
 import org.apache.log4j.Logger;
 
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.locator.ILatencyPublisher;
 import org.apache.cassandra.locator.ILatencySubscriber;
 import org.apache.cassandra.net.io.SerializerType;
 import org.apache.cassandra.net.sink.SinkManager;
@@ -56,7 +52,7 @@ import org.apache.cassandra.utils.Simple
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.cliffc.high_scale_lib.NonBlockingHashSet;
 
-public class MessagingService implements ILatencyPublisher
+public class MessagingService
 {
     private static int version_ = 1;
     //TODO: make this parameter dynamic somehow.  Not sure if config is appropriate.
@@ -116,10 +112,7 @@ public class MessagingService implements
                     return null;
 
                 for (InetAddress address : addresses)
-                {
-                    for (ILatencySubscriber subscriber : subscribers)
-                        subscriber.receiveTiming(address, (double) DatabaseDescriptor.getRpcTimeout());
-                }
+                    addLatency(address, (double) DatabaseDescriptor.getRpcTimeout());
 
                 return null;
             }
@@ -140,6 +133,12 @@ public class MessagingService implements
         timer.schedule(logDropped, LOG_DROPPED_INTERVAL_IN_MS, LOG_DROPPED_INTERVAL_IN_MS);
     }
 
+    public void addLatency(InetAddress address, double latency)
+    {
+        for (ILatencySubscriber subscriber : subscribers)
+            subscriber.receiveTiming(address, latency);
+    }
+
     public byte[] hash(String type, byte data[])
     {
         byte result[];

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java?rev=1060828&r1=1060827&r2=1060828&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java Wed Jan 19 14:43:37 2011
@@ -18,19 +18,12 @@
 
 package org.apache.cassandra.net;
 
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.log4j.Logger;
 
-import org.apache.cassandra.locator.ILatencyPublisher;
-import org.apache.cassandra.locator.ILatencySubscriber;
-
-public class ResponseVerbHandler implements IVerbHandler, ILatencyPublisher
+public class ResponseVerbHandler implements IVerbHandler
 {
     private static final Logger logger_ = Logger.getLogger( ResponseVerbHandler.class );
-    private List<ILatencySubscriber>  subscribers = new ArrayList<ILatencySubscriber>();
-    
+
     public void doVerb(Message message)
     {     
         String messageId = message.getMessageId();
@@ -41,8 +34,7 @@ public class ResponseVerbHandler impleme
             return;
 
         // if cb is not null, then age will be valid
-        for (ILatencySubscriber subscriber : subscribers)
-            subscriber.receiveTiming(message.getFrom(), age);
+        MessagingService.instance.addLatency(message.getFrom(), age);
 
         if (cb instanceof IAsyncCallback)
         {
@@ -58,8 +50,4 @@ public class ResponseVerbHandler impleme
         }
     }
 
-    public void register(ILatencySubscriber subscriber)
-    {
-        subscribers.add(subscriber);
-    }
 }

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1060828&r1=1060827&r2=1060828&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java Wed Jan 19 14:43:37 2011
@@ -702,7 +702,8 @@ public class StorageProxy implements Sto
 
     static class weakReadLocalCallable implements Callable<Object>
     {
-        private ReadCommand command;
+        private final ReadCommand command;
+        private final long start = System.currentTimeMillis();
 
         weakReadLocalCallable(ReadCommand command)
         {
@@ -718,6 +719,7 @@ public class StorageProxy implements Sto
             Row row = command.getRow(table);
             StorageService.instance.doConsistencyCheck(row, command, FBUtilities.getLocalAddress());
 
+            MessagingService.instance.addLatency(FBUtilities.getLocalAddress(), System.currentTimeMillis() - start);
             return row;
         }
     }