You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2013/12/15 11:30:50 UTC
git commit: Improve batchlog write performance with vnodes
Updated Branches:
refs/heads/cassandra-1.2 a3d91dc9d -> 4be9e6720
Improve batchlog write performance with vnodes
patch by Jonathan Ellis and Rick Branson; reviewed by Aleksey Yeschenko
for CASSANDRA-6488
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4be9e672
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4be9e672
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4be9e672
Branch: refs/heads/cassandra-1.2
Commit: 4be9e6720d9f94a83aa42153c3e71ae1e557d2d9
Parents: a3d91dc
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Sun Dec 15 13:29:56 2013 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Sun Dec 15 13:29:56 2013 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../locator/AbstractReplicationStrategy.java | 35 ++++--------------
.../apache/cassandra/locator/TokenMetadata.java | 35 +++++++++++++-----
.../apache/cassandra/service/StorageProxy.java | 39 +++++++++-----------
4 files changed, 53 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4be9e672/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e586592..b55393b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -17,6 +17,7 @@
(CASSANDRA-6413)
* (Hadoop) add describe_local_ring (CASSANDRA-6268)
* Fix handling of concurrent directory creation failure (CASSANDRA-6459)
+ * Improve batchlog write performance with vnodes (CASSANDRA-6488)
1.2.12
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4be9e672/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index c36fde4..85e229c 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@ -20,12 +20,10 @@ package org.apache.cassandra.locator;
import java.lang.reflect.Constructor;
import java.net.InetAddress;
import java.util.*;
-import java.util.concurrent.locks.Lock;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
-import com.google.common.util.concurrent.Striped;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,14 +54,8 @@ public abstract class AbstractReplicationStrategy
public final Map<String, String> configOptions;
private final TokenMetadata tokenMetadata;
- // We want to make updating our replicas asynchronous vs the "master" TokenMetadata instance,
- // so that our ownership calculations never block Gossip from processing an ownership change.
- // But, we also can't afford to re-clone TM for each range after cache invalidation (CASSANDRA-6345),
- // so we keep our own copy here.
- //
- // Writes to tokenMetadataClone should be synchronized.
- private volatile TokenMetadata tokenMetadataClone = null;
- private volatile long clonedTokenMetadataVersion = 0;
+ // track when the token range changes, signaling we need to invalidate our endpoint cache
+ private volatile long lastInvalidatedVersion = 0;
public IEndpointSnitch snitch;
@@ -85,16 +77,15 @@ public abstract class AbstractReplicationStrategy
{
long lastVersion = tokenMetadata.getRingVersion();
- if (lastVersion > clonedTokenMetadataVersion)
+ if (lastVersion > lastInvalidatedVersion)
{
synchronized (this)
{
- if (lastVersion > clonedTokenMetadataVersion)
+ if (lastVersion > lastInvalidatedVersion)
{
logger.debug("clearing cached endpoints");
- tokenMetadataClone = null;
cachedEndpoints.clear();
- clonedTokenMetadataVersion = lastVersion;
+ lastInvalidatedVersion = lastVersion;
}
}
}
@@ -116,19 +107,9 @@ public abstract class AbstractReplicationStrategy
ArrayList<InetAddress> endpoints = getCachedEndpoints(keyToken);
if (endpoints == null)
{
- TokenMetadata tm; // local reference in case another thread nulls tMC out from under us
- if ((tm = tokenMetadataClone) == null)
- {
- // synchronize to prevent thundering herd post-invalidation
- synchronized (this)
- {
- if ((tm = tokenMetadataClone) == null)
- tm = tokenMetadataClone = tokenMetadata.cloneOnlyTokenMap();
- }
- // if our clone got invalidated, it's possible there is a new token to account for too
- keyToken = TokenMetadata.firstToken(tm.sortedTokens(), searchToken);
- }
-
+ TokenMetadata tm = tokenMetadata.cloneOnlyTokenMap();
+ // if our cache got invalidated, it's possible there is a new token to account for too
+ keyToken = TokenMetadata.firstToken(tm.sortedTokens(), searchToken);
endpoints = new ArrayList<InetAddress>(calculateNaturalEndpoints(searchToken, tm));
cachedEndpoints.put(keyToken, endpoints);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4be9e672/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index b724894..be0f7c7 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -585,22 +586,37 @@ public class TokenMetadata
}
}
+ private final AtomicReference<TokenMetadata> cachedTokenMap = new AtomicReference<TokenMetadata>();
+
/**
* Create a copy of TokenMetadata with only tokenToEndpointMap. That is, pending ranges,
* bootstrap tokens and leaving endpoints are not included in the copy.
+ *
+ * This uses a cached copy that is invalidated when the ring changes, so in the common case
+ * no extra locking is required.
*/
public TokenMetadata cloneOnlyTokenMap()
{
- lock.readLock().lock();
- try
- {
- return new TokenMetadata(SortedBiMultiValMap.<Token, InetAddress>create(tokenToEndpointMap, null, inetaddressCmp),
- HashBiMap.create(endpointToHostIdMap),
- new Topology(topology));
- }
- finally
+ TokenMetadata tm = cachedTokenMap.get();
+ if (tm != null)
+ return tm;
+
+ // synchronize is to prevent thundering herd (CASSANDRA-6345); lock.readLock is for correctness vs updates to our internals
+ synchronized (this)
{
- lock.readLock().unlock();
+ lock.readLock().lock();
+ try
+ {
+ tm = new TokenMetadata(SortedBiMultiValMap.<Token, InetAddress>create(tokenToEndpointMap, null, inetaddressCmp),
+ HashBiMap.create(endpointToHostIdMap),
+ new Topology(topology));
+ cachedTokenMap.set(tm);
+ return tm;
+ }
+ finally
+ {
+ lock.readLock().unlock();
+ }
}
}
@@ -1057,6 +1073,7 @@ public class TokenMetadata
public void invalidateCachedRings()
{
ringVersion++;
+ cachedTokenMap.set(null);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4be9e672/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 3e9f2cb..376edb6 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -23,6 +23,7 @@ import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.*;
+import java.util.Random;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.MBeanServer;
@@ -414,30 +415,32 @@ public class StorageProxy implements StorageProxyMBean
* - replicas should be alive according to the failure detector
* - replicas should be in the local datacenter
* - choose min(2, number of qualifying candiates above)
- * - allow the local node to be the only replica only if it's a single-node cluster
+ * - allow the local node to be the only replica only if it's a single-node DC
*/
- private static Collection<InetAddress> getBatchlogEndpoints(String localDataCenter, ConsistencyLevel consistencyLevel) throws UnavailableException
+ private static Collection<InetAddress> getBatchlogEndpoints(String localDataCenter, ConsistencyLevel consistencyLevel)
+ throws UnavailableException
{
- // will include every known node in the DC, including localhost.
TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cloneOnlyTokenMap().getTopology();
- Collection<InetAddress> localMembers = topology.getDatacenterEndpoints().get(localDataCenter);
+ List<InetAddress> localEndpoints = new ArrayList<InetAddress>(topology.getDatacenterEndpoints().get(localDataCenter));
// special case for single-node datacenters
- if (localMembers.size() == 1)
- return localMembers;
+ if (localEndpoints.size() == 1)
+ return localEndpoints;
- // not a single-node cluster - don't count the local node.
- localMembers.remove(FBUtilities.getBroadcastAddress());
+ List<InetAddress> chosenEndpoints = new ArrayList<InetAddress>(2);
+ int startOffset = new Random().nextInt(localEndpoints.size());
- // include only alive nodes
- List<InetAddress> candidates = new ArrayList<InetAddress>(localMembers.size());
- for (InetAddress member : localMembers)
+ // start at a random point in the list, advance to the end, then wrap around to the beginning
+ // and continue until we are back at the starting point or two endpoints have been chosen.
+ for (int i = 0; i < localEndpoints.size() && chosenEndpoints.size() < 2; i++)
{
- if (FailureDetector.instance.isAlive(member))
- candidates.add(member);
+ InetAddress endpoint = localEndpoints.get((i + startOffset) % localEndpoints.size());
+ // skip localhost and non-alive nodes
+ if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && FailureDetector.instance.isAlive(endpoint))
+ chosenEndpoints.add(endpoint);
}
- if (candidates.isEmpty())
+ if (chosenEndpoints.isEmpty())
{
if (consistencyLevel == ConsistencyLevel.ANY)
return Collections.singleton(FBUtilities.getBroadcastAddress());
@@ -445,13 +448,7 @@ public class StorageProxy implements StorageProxyMBean
throw new UnavailableException(ConsistencyLevel.ONE, 1, 0);
}
- if (candidates.size() > 2)
- {
- Collections.shuffle(candidates);
- candidates = candidates.subList(0, 2);
- }
-
- return candidates;
+ return chosenEndpoints;
}
/**