You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/11/26 21:13:16 UTC
[3/6] git commit: Fix thundering herd on endpoint cache invalidation
patch by rbranson and jbellis for CASSANDRA-6345
Fix thundering herd on endpoint cache invalidation
patch by rbranson and jbellis for CASSANDRA-6345
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8145c835
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8145c835
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8145c835
Branch: refs/heads/trunk
Commit: 8145c83566450feb68a12352ac88efe9983ec266
Parents: fce1735
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Nov 26 14:09:56 2013 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Nov 26 14:09:56 2013 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../locator/AbstractReplicationStrategy.java | 58 +++++++++++++-------
.../apache/cassandra/locator/TokenMetadata.java | 47 +++++++---------
3 files changed, 59 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8145c835/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 57c1896..8d443f9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
1.2.13
+ * Fix thundering herd on endpoint cache invalidation (CASSANDRA-6345)
* Optimize FD phi calculation (CASSANDRA-6386)
* Improve initial FD phi estimate when starting up (CASSANDRA-6385)
* Don't list CQL3 table in CLI describe even if named explicitely (CASSANDRA-5750)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8145c835/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index e17b0b4..51c4119 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@ -20,10 +20,12 @@ package org.apache.cassandra.locator;
import java.lang.reflect.Constructor;
import java.net.InetAddress;
import java.util.*;
+import java.util.concurrent.locks.Lock;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.Striped;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,6 +56,15 @@ public abstract class AbstractReplicationStrategy
public final Map<String, String> configOptions;
private final TokenMetadata tokenMetadata;
+ // We want updates to our replicas to be asynchronous with respect to the "master" TokenMetadata instance,
+ // so that our ownership calculations never block Gossip from processing an ownership change.
+ // But we also can't afford to re-clone TokenMetadata for each range after cache invalidation (CASSANDRA-6345),
+ // so we keep our own copy here.
+ //
+ // Writes to tokenMetadataClone should be synchronized.
+ private volatile TokenMetadata tokenMetadataClone = null;
+ private volatile long clonedTokenMetadataVersion = 0;
+
public IEndpointSnitch snitch;
AbstractReplicationStrategy(String tableName, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions)
@@ -63,7 +74,6 @@ public abstract class AbstractReplicationStrategy
assert tokenMetadata != null;
this.tokenMetadata = tokenMetadata;
this.snitch = snitch;
- this.tokenMetadata.register(this);
this.configOptions = configOptions == null ? Collections.<String, String>emptyMap() : configOptions;
this.tableName = tableName;
// lazy-initialize table itself since we don't create them until after the replication strategies
@@ -73,18 +83,23 @@ public abstract class AbstractReplicationStrategy
public ArrayList<InetAddress> getCachedEndpoints(Token t)
{
- return cachedEndpoints.get(t);
- }
+ long lastVersion = tokenMetadata.getRingVersion();
- public void cacheEndpoint(Token t, ArrayList<InetAddress> addr)
- {
- cachedEndpoints.put(t, addr);
- }
+ if (lastVersion > clonedTokenMetadataVersion)
+ {
+ synchronized (this)
+ {
+ if (lastVersion > clonedTokenMetadataVersion)
+ {
+ logger.debug("clearing cached endpoints");
+ tokenMetadataClone = null;
+ cachedEndpoints.clear();
+ clonedTokenMetadataVersion = lastVersion;
+ }
+ }
+ }
- public void clearEndpointCache()
- {
- logger.debug("clearing cached endpoints");
- cachedEndpoints.clear();
+ return cachedEndpoints.get(t);
}
/**
@@ -101,10 +116,20 @@ public abstract class AbstractReplicationStrategy
ArrayList<InetAddress> endpoints = getCachedEndpoints(keyToken);
if (endpoints == null)
{
- TokenMetadata tokenMetadataClone = tokenMetadata.cloneOnlyTokenMap();
- keyToken = TokenMetadata.firstToken(tokenMetadataClone.sortedTokens(), searchToken);
+ if (tokenMetadataClone == null)
+ {
+ // synchronize to prevent thundering herd post-invalidation
+ synchronized (this)
+ {
+ if (tokenMetadataClone == null)
+ tokenMetadataClone = tokenMetadata.cloneOnlyTokenMap();
+ }
+ // if our clone got invalidated, it's possible there is a new token to account for too
+ keyToken = TokenMetadata.firstToken(tokenMetadataClone.sortedTokens(), searchToken);
+ }
+
endpoints = new ArrayList<InetAddress>(calculateNaturalEndpoints(searchToken, tokenMetadataClone));
- cacheEndpoint(keyToken, endpoints);
+ cachedEndpoints.put(keyToken, endpoints);
}
return new ArrayList<InetAddress>(endpoints);
@@ -204,11 +229,6 @@ public abstract class AbstractReplicationStrategy
return getAddressRanges(temp).get(pendingAddress);
}
- public void invalidateCachedTokenEndpointValues()
- {
- clearEndpointCache();
- }
-
public abstract void validateOptions() throws ConfigurationException;
/*
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8145c835/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index 5ab1b3f..818ca8f 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -22,15 +22,10 @@ import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.collect.*;
-
-import org.apache.cassandra.utils.BiMultiValMap;
-import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.SortedBiMultiValMap;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,6 +35,9 @@ import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.BiMultiValMap;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.SortedBiMultiValMap;
public class TokenMetadata
{
@@ -96,8 +94,6 @@ public class TokenMetadata
private volatile ArrayList<Token> sortedTokens;
private final Topology topology;
- /* list of subscribers that are notified when the tokenToEndpointMap changed */
- private final CopyOnWriteArrayList<AbstractReplicationStrategy> subscribers = new CopyOnWriteArrayList<AbstractReplicationStrategy>();
private static final Comparator<InetAddress> inetaddressCmp = new Comparator<InetAddress>()
{
@@ -107,6 +103,9 @@ public class TokenMetadata
}
};
+ // signals replication strategies that nodes have joined or left the ring and they need to recompute ownership
+ private volatile long ringVersion = 0;
+
public TokenMetadata()
{
this(SortedBiMultiValMap.<Token, InetAddress>create(null, inetaddressCmp),
@@ -428,7 +427,7 @@ public class TokenMetadata
leavingEndpoints.remove(endpoint);
endpointToHostIdMap.remove(endpoint);
sortedTokens = sortTokens();
- invalidateCaches();
+ invalidateCachedRings();
}
finally
{
@@ -456,7 +455,7 @@ public class TokenMetadata
}
}
- invalidateCaches();
+ invalidateCachedRings();
}
finally
{
@@ -885,7 +884,7 @@ public class TokenMetadata
leavingEndpoints.clear();
pendingRanges.clear();
endpointToHostIdMap.clear();
- invalidateCaches();
+ invalidateCachedRings();
}
finally
{
@@ -977,24 +976,6 @@ public class TokenMetadata
return sb.toString();
}
- public void invalidateCaches()
- {
- for (AbstractReplicationStrategy subscriber : subscribers)
- {
- subscriber.invalidateCachedTokenEndpointValues();
- }
- }
-
- public void register(AbstractReplicationStrategy subscriber)
- {
- subscribers.add(subscriber);
- }
-
- public void unregister(AbstractReplicationStrategy subscriber)
- {
- subscribers.remove(subscriber);
- }
-
public Collection<InetAddress> pendingEndpointsFor(Token token, String table)
{
Map<Range<Token>, Collection<InetAddress>> ranges = getPendingRanges(table);
@@ -1068,6 +1049,16 @@ public class TokenMetadata
return topology;
}
+ public long getRingVersion()
+ {
+ return ringVersion;
+ }
+
+ private void invalidateCachedRings()
+ {
+ ringVersion++;
+ }
+
/**
* Tracks the assignment of racks and endpoints in each datacenter for all the "normal" endpoints
* in this TokenMetadata. This allows faster calculation of endpoints in NetworkTopologyStrategy.