You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2011/09/13 18:01:10 UTC

svn commit: r1170219 - in /cassandra/branches/cassandra-0.8: ./ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/service/ src/java/org/apache/cassandra/utils/

Author: slebresne
Date: Tue Sep 13 16:01:10 2011
New Revision: 1170219

URL: http://svn.apache.org/viewvc?rev=1170219&view=rev
Log:
Randomize choice of first replica for counter increments
patch by slebresne; reviewed by jbellis for CASSANDRA-2890

Modified:
    cassandra/branches/cassandra-0.8/CHANGES.txt
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CounterMutation.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/HintedHandOffManager.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/ReadCallback.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/utils/FBUtilities.java

Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1170219&r1=1170218&r2=1170219&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Tue Sep 13 16:01:10 2011
@@ -11,6 +11,7 @@
    datacenters (CASSANDRA-3152)
  * Improve caching of same-version Messages on digest and repair paths
    (CASSANDRA-3158)
+ * Randomize choice of first replica for counter increment (CASSANDRA-2890)
 
 
 0.8.5

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CounterMutation.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CounterMutation.java?rev=1170219&r1=1170218&r2=1170219&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CounterMutation.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CounterMutation.java Tue Sep 13 16:01:10 2011
@@ -29,7 +29,6 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.LinkedList;
-import java.util.Random;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,15 +51,6 @@ public class CounterMutation implements 
     private final RowMutation rowMutation;
     private final ConsistencyLevel consistency;
 
-    private static final ThreadLocal<Random> random = new ThreadLocal<Random>()
-    {
-        @Override
-        protected Random initialValue()
-        {
-            return new Random();
-        }
-    };
-
     public CounterMutation(RowMutation rowMutation, ConsistencyLevel consistency)
     {
         this.rowMutation = rowMutation;
@@ -135,7 +125,7 @@ public class CounterMutation implements 
     {
         ColumnFamily cf = row.cf;
         // random check for merging to allow lessening the performance impact
-        if (cf.metadata().getMergeShardsChance() > random.get().nextDouble())
+        if (cf.metadata().getMergeShardsChance() > FBUtilities.threadLocalRandom().nextDouble())
         {
             ColumnFamily merger = computeShardMerger(cf);
             if (merger != null)

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1170219&r1=1170218&r2=1170219&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/HintedHandOffManager.java Tue Sep 13 16:01:10 2011
@@ -305,7 +305,7 @@ public class HintedHandOffManager implem
             // sleep a random amount to stagger handoff delivery from different replicas.
             // (if we had to wait, then gossiper randomness took care of that for us already.)
             if (waited == 0) {
-                int sleep = new Random().nextInt(60000);
+                int sleep = FBUtilities.threadLocalRandom().nextInt(60000);
                 logger_.debug("Sleeping {}ms to stagger hint delivery", sleep);
                 Thread.sleep(sleep);
             }

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/ReadCallback.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/ReadCallback.java?rev=1170219&r1=1170218&r2=1170219&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/ReadCallback.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/ReadCallback.java Tue Sep 13 16:01:10 2011
@@ -42,6 +42,7 @@ import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.thrift.ConsistencyLevel;
 import org.apache.cassandra.thrift.UnavailableException;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.SimpleCondition;
 import org.apache.cassandra.utils.WrappedRunnable;
 
@@ -49,15 +50,6 @@ public class ReadCallback<T> implements 
 {
     protected static final Logger logger = LoggerFactory.getLogger( ReadCallback.class );
 
-    private static final ThreadLocal<Random> random = new ThreadLocal<Random>()
-    {
-        @Override
-        protected Random initialValue()
-        {
-            return new Random();
-        }
-    };
-
     public final IResponseResolver<T> resolver;
     protected final SimpleCondition condition = new SimpleCondition();
     private final long startTime;
@@ -98,7 +90,7 @@ public class ReadCallback<T> implements 
             String table = ((RowDigestResolver) resolver).table;
             String columnFamily = ((ReadCommand) command).getColumnFamilyName();
             CFMetaData cfmd = DatabaseDescriptor.getTableMetaData(table).get(columnFamily);
-            return cfmd.getReadRepairChance() > random.get().nextDouble();
+            return cfmd.getReadRepairChance() > FBUtilities.threadLocalRandom().nextDouble();
         }
         // we don't read repair on range scans
         return false;

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1170219&r1=1170218&r2=1170219&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java Tue Sep 13 16:01:10 2011
@@ -48,6 +48,7 @@ import org.apache.cassandra.dht.IPartiti
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.Message;
@@ -383,7 +384,7 @@ public class StorageProxy implements Sto
      */
     public static IWriteResponseHandler mutateCounter(CounterMutation cm, String localDataCenter) throws UnavailableException, TimeoutException, IOException
     {
-        InetAddress endpoint = findSuitableEndpoint(cm.getTable(), cm.key());
+        InetAddress endpoint = findSuitableEndpoint(cm.getTable(), cm.key(), localDataCenter);
 
         if (endpoint.equals(FBUtilities.getLocalAddress()))
         {
@@ -409,15 +410,43 @@ public class StorageProxy implements Sto
         }
     }
 
-    private static InetAddress findSuitableEndpoint(String table, ByteBuffer key) throws UnavailableException
+    /**
+     * Find a suitable replica as leader for counter update.
+     * For now, we pick a random replica in the local DC (or ask the snitch if
+     * there is no replica alive in the local DC).
+     * TODO: if we tracked the latency of counter writes (which makes sense,
+     * unlike for standard writes, since a read is involved), we could
+     * trust the dynamic snitch entirely, which may be a better solution. It
+     * is unclear whether we want to mix those latencies with read latencies,
+     * so this may be a bit involved.
+     */
+    private static InetAddress findSuitableEndpoint(String table, ByteBuffer key, String localDataCenter) throws UnavailableException
     {
+        IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
         List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(table, key);
-        DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getLocalAddress(), endpoints);
         if (endpoints.isEmpty())
             throw new UnavailableException();
-        return endpoints.get(0);
+
+        List<InetAddress> localEndpoints = new ArrayList<InetAddress>();
+        for (InetAddress endpoint : endpoints)
+        {
+            if (snitch.getDatacenter(endpoint).equals(localDataCenter))
+                localEndpoints.add(endpoint);
+        }
+        if (localEndpoints.isEmpty())
+        {
+            // No endpoint in local DC, pick the closest endpoint according to the snitch
+            snitch.sortByProximity(FBUtilities.getLocalAddress(), endpoints);
+            return endpoints.get(0);
+        }
+        else
+        {
+            return localEndpoints.get(FBUtilities.threadLocalRandom().nextInt(localEndpoints.size()));
+        }
     }
 
+
+
     // Must be called on a replica of the mutation. This replica becomes the
     // leader of this mutation.
     public static IWriteResponseHandler applyCounterMutationOnLeader(CounterMutation cm, String localDataCenter) throws UnavailableException, TimeoutException, IOException

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/utils/FBUtilities.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=1170219&r1=1170218&r2=1170219&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/utils/FBUtilities.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/utils/FBUtilities.java Tue Sep 13 16:01:10 2011
@@ -133,6 +133,15 @@ public class FBUtilities
         }
     };
 
+    private static final ThreadLocal<Random> localRandom = new ThreadLocal<Random>()
+    {
+        @Override
+        protected Random initialValue()
+        {
+            return new Random();
+        }
+    };
+
     public static final int MAX_UNSIGNED_SHORT = 0xFFFF;
 
     public static MessageDigest threadLocalMD5Digest()
@@ -152,6 +161,11 @@ public class FBUtilities
         }
     }
 
+    public static Random threadLocalRandom()
+    {
+        return localRandom.get();
+    }
+
     /**
      * Parses a string representing either a fraction, absolute value or percentage.
      */