You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2011/09/13 18:01:10 UTC
svn commit: r1170219 - in /cassandra/branches/cassandra-0.8: ./
src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/service/
src/java/org/apache/cassandra/utils/
Author: slebresne
Date: Tue Sep 13 16:01:10 2011
New Revision: 1170219
URL: http://svn.apache.org/viewvc?rev=1170219&view=rev
Log:
Randomize choice of first replica for counter increments
patch by slebresne; reviewed by jbellis for CASSANDRA-2890
Modified:
cassandra/branches/cassandra-0.8/CHANGES.txt
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CounterMutation.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/HintedHandOffManager.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/ReadCallback.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/utils/FBUtilities.java
Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1170219&r1=1170218&r2=1170219&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Tue Sep 13 16:01:10 2011
@@ -11,6 +11,7 @@
datacenters (CASSANDRA-3152)
* Improve caching of same-version Messages on digest and repair paths
(CASSANDRA-3158)
+ * Randomize choice of first replica for counter increment (CASSANDRA-2890)
0.8.5
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CounterMutation.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CounterMutation.java?rev=1170219&r1=1170218&r2=1170219&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CounterMutation.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CounterMutation.java Tue Sep 13 16:01:10 2011
@@ -29,7 +29,6 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.LinkedList;
-import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,15 +51,6 @@ public class CounterMutation implements
private final RowMutation rowMutation;
private final ConsistencyLevel consistency;
- private static final ThreadLocal<Random> random = new ThreadLocal<Random>()
- {
- @Override
- protected Random initialValue()
- {
- return new Random();
- }
- };
-
public CounterMutation(RowMutation rowMutation, ConsistencyLevel consistency)
{
this.rowMutation = rowMutation;
@@ -135,7 +125,7 @@ public class CounterMutation implements
{
ColumnFamily cf = row.cf;
// random check for merging to allow lessening the performance impact
- if (cf.metadata().getMergeShardsChance() > random.get().nextDouble())
+ if (cf.metadata().getMergeShardsChance() > FBUtilities.threadLocalRandom().nextDouble())
{
ColumnFamily merger = computeShardMerger(cf);
if (merger != null)
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1170219&r1=1170218&r2=1170219&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/HintedHandOffManager.java Tue Sep 13 16:01:10 2011
@@ -305,7 +305,7 @@ public class HintedHandOffManager implem
// sleep a random amount to stagger handoff delivery from different replicas.
// (if we had to wait, then gossiper randomness took care of that for us already.)
if (waited == 0) {
- int sleep = new Random().nextInt(60000);
+ int sleep = FBUtilities.threadLocalRandom().nextInt(60000);
logger_.debug("Sleeping {}ms to stagger hint delivery", sleep);
Thread.sleep(sleep);
}
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/ReadCallback.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/ReadCallback.java?rev=1170219&r1=1170218&r2=1170219&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/ReadCallback.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/ReadCallback.java Tue Sep 13 16:01:10 2011
@@ -42,6 +42,7 @@ import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.UnavailableException;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.SimpleCondition;
import org.apache.cassandra.utils.WrappedRunnable;
@@ -49,15 +50,6 @@ public class ReadCallback<T> implements
{
protected static final Logger logger = LoggerFactory.getLogger( ReadCallback.class );
- private static final ThreadLocal<Random> random = new ThreadLocal<Random>()
- {
- @Override
- protected Random initialValue()
- {
- return new Random();
- }
- };
-
public final IResponseResolver<T> resolver;
protected final SimpleCondition condition = new SimpleCondition();
private final long startTime;
@@ -98,7 +90,7 @@ public class ReadCallback<T> implements
String table = ((RowDigestResolver) resolver).table;
String columnFamily = ((ReadCommand) command).getColumnFamilyName();
CFMetaData cfmd = DatabaseDescriptor.getTableMetaData(table).get(columnFamily);
- return cfmd.getReadRepairChance() > random.get().nextDouble();
+ return cfmd.getReadRepairChance() > FBUtilities.threadLocalRandom().nextDouble();
}
// we don't read repair on range scans
return false;
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1170219&r1=1170218&r2=1170219&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java Tue Sep 13 16:01:10 2011
@@ -48,6 +48,7 @@ import org.apache.cassandra.dht.IPartiti
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.net.Message;
@@ -383,7 +384,7 @@ public class StorageProxy implements Sto
*/
public static IWriteResponseHandler mutateCounter(CounterMutation cm, String localDataCenter) throws UnavailableException, TimeoutException, IOException
{
- InetAddress endpoint = findSuitableEndpoint(cm.getTable(), cm.key());
+ InetAddress endpoint = findSuitableEndpoint(cm.getTable(), cm.key(), localDataCenter);
if (endpoint.equals(FBUtilities.getLocalAddress()))
{
@@ -409,15 +410,43 @@ public class StorageProxy implements Sto
}
}
- private static InetAddress findSuitableEndpoint(String table, ByteBuffer key) throws UnavailableException
+ /**
+ * Find a suitable replica as leader for counter update.
+ * For now, we pick a random replica in the local DC (or ask the snitch if
+ * there is no replica alive in the local DC).
+ * TODO: if we tracked the latency of counter writes (which makes sense,
+ * unlike for standard writes, since a read is involved), we could
+ * trust the dynamic snitch entirely, which may be a better solution. It
+ * is unclear whether we want to mix those latencies with read latencies, so this
+ * may be a bit involved.
+ */
+ private static InetAddress findSuitableEndpoint(String table, ByteBuffer key, String localDataCenter) throws UnavailableException
{
+ IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(table, key);
- DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getLocalAddress(), endpoints);
if (endpoints.isEmpty())
throw new UnavailableException();
- return endpoints.get(0);
+
+ List<InetAddress> localEndpoints = new ArrayList<InetAddress>();
+ for (InetAddress endpoint : endpoints)
+ {
+ if (snitch.getDatacenter(endpoint).equals(localDataCenter))
+ localEndpoints.add(endpoint);
+ }
+ if (localEndpoints.isEmpty())
+ {
+ // No endpoint in local DC, pick the closest endpoint according to the snitch
+ snitch.sortByProximity(FBUtilities.getLocalAddress(), endpoints);
+ return endpoints.get(0);
+ }
+ else
+ {
+ return localEndpoints.get(FBUtilities.threadLocalRandom().nextInt(localEndpoints.size()));
+ }
}
+
+
// Must be called on a replica of the mutation. This replica becomes the
// leader of this mutation.
public static IWriteResponseHandler applyCounterMutationOnLeader(CounterMutation cm, String localDataCenter) throws UnavailableException, TimeoutException, IOException
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/utils/FBUtilities.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=1170219&r1=1170218&r2=1170219&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/utils/FBUtilities.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/utils/FBUtilities.java Tue Sep 13 16:01:10 2011
@@ -133,6 +133,15 @@ public class FBUtilities
}
};
+ private static final ThreadLocal<Random> localRandom = new ThreadLocal<Random>()
+ {
+ @Override
+ protected Random initialValue()
+ {
+ return new Random();
+ }
+ };
+
public static final int MAX_UNSIGNED_SHORT = 0xFFFF;
public static MessageDigest threadLocalMD5Digest()
@@ -152,6 +161,11 @@ public class FBUtilities
}
}
+ public static Random threadLocalRandom()
+ {
+ return localRandom.get();
+ }
+
/**
* Parses a string representing either a fraction, absolute value or percentage.
*/