You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/11/26 21:13:16 UTC

[3/6] git commit: Fix thundering herd on endpoint cache invalidation; patch by rbranson and jbellis for CASSANDRA-6345

Fix thundering herd on endpoint cache invalidation
patch by rbranson and jbellis for CASSANDRA-6345


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8145c835
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8145c835
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8145c835

Branch: refs/heads/trunk
Commit: 8145c83566450feb68a12352ac88efe9983ec266
Parents: fce1735
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Nov 26 14:09:56 2013 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Nov 26 14:09:56 2013 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../locator/AbstractReplicationStrategy.java    | 58 +++++++++++++-------
 .../apache/cassandra/locator/TokenMetadata.java | 47 +++++++---------
 3 files changed, 59 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8145c835/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 57c1896..8d443f9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.2.13
+ * Fix thundering herd on endpoint cache invalidation (CASSANDRA-6345)
  * Optimize FD phi calculation (CASSANDRA-6386)
  * Improve initial FD phi estimate when starting up (CASSANDRA-6385)
  * Don't list CQL3 table in CLI describe even if named explicitely (CASSANDRA-5750)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8145c835/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index e17b0b4..51c4119 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@ -20,10 +20,12 @@ package org.apache.cassandra.locator;
 import java.lang.reflect.Constructor;
 import java.net.InetAddress;
 import java.util.*;
+import java.util.concurrent.locks.Lock;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.Striped;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,6 +56,15 @@ public abstract class AbstractReplicationStrategy
     public final Map<String, String> configOptions;
     private final TokenMetadata tokenMetadata;
 
+    // We want to make updating our replicas asynchronous vs the "master" TokenMetadata instance,
+    // so that our ownership calculations never block Gossip from processing an ownership change.
+    // But, we also can't afford to re-clone TM for each range after cache invalidation (CASSANDRA-6345),
+    // so we keep our own copy here.
+    //
+    // Writes to tokenMetadataClone should be synchronized.
+    private volatile TokenMetadata tokenMetadataClone = null;
+    private volatile long clonedTokenMetadataVersion = 0;
+
     public IEndpointSnitch snitch;
 
     AbstractReplicationStrategy(String tableName, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions)
@@ -63,7 +74,6 @@ public abstract class AbstractReplicationStrategy
         assert tokenMetadata != null;
         this.tokenMetadata = tokenMetadata;
         this.snitch = snitch;
-        this.tokenMetadata.register(this);
         this.configOptions = configOptions == null ? Collections.<String, String>emptyMap() : configOptions;
         this.tableName = tableName;
         // lazy-initialize table itself since we don't create them until after the replication strategies
@@ -73,18 +83,23 @@ public abstract class AbstractReplicationStrategy
 
     public ArrayList<InetAddress> getCachedEndpoints(Token t)
     {
-        return cachedEndpoints.get(t);
-    }
+        long lastVersion = tokenMetadata.getRingVersion();
 
-    public void cacheEndpoint(Token t, ArrayList<InetAddress> addr)
-    {
-        cachedEndpoints.put(t, addr);
-    }
+        if (lastVersion > clonedTokenMetadataVersion)
+        {
+            synchronized (this)
+            {
+                if (lastVersion > clonedTokenMetadataVersion)
+                {
+                    logger.debug("clearing cached endpoints");
+                    tokenMetadataClone = null;
+                    cachedEndpoints.clear();
+                    clonedTokenMetadataVersion = lastVersion;
+                }
+            }
+        }
 
-    public void clearEndpointCache()
-    {
-        logger.debug("clearing cached endpoints");
-        cachedEndpoints.clear();
+        return cachedEndpoints.get(t);
     }
 
     /**
@@ -101,10 +116,20 @@ public abstract class AbstractReplicationStrategy
         ArrayList<InetAddress> endpoints = getCachedEndpoints(keyToken);
         if (endpoints == null)
         {
-            TokenMetadata tokenMetadataClone = tokenMetadata.cloneOnlyTokenMap();
-            keyToken = TokenMetadata.firstToken(tokenMetadataClone.sortedTokens(), searchToken);
+            if (tokenMetadataClone == null)
+            {
+                // synchronize to prevent thundering herd post-invalidation
+                synchronized (this)
+                {
+                    if (tokenMetadataClone == null)
+                        tokenMetadataClone = tokenMetadata.cloneOnlyTokenMap();
+                }
+                // if our clone got invalidated, it's possible there is a new token to account for too
+                keyToken = TokenMetadata.firstToken(tokenMetadataClone.sortedTokens(), searchToken);
+            }
+
             endpoints = new ArrayList<InetAddress>(calculateNaturalEndpoints(searchToken, tokenMetadataClone));
-            cacheEndpoint(keyToken, endpoints);
+            cachedEndpoints.put(keyToken, endpoints);
         }
 
         return new ArrayList<InetAddress>(endpoints);
@@ -204,11 +229,6 @@ public abstract class AbstractReplicationStrategy
         return getAddressRanges(temp).get(pendingAddress);
     }
 
-    public void invalidateCachedTokenEndpointValues()
-    {
-        clearEndpointCache();
-    }
-
     public abstract void validateOptions() throws ConfigurationException;
 
     /*

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8145c835/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index 5ab1b3f..818ca8f 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -22,15 +22,10 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import com.google.common.collect.*;
-
-import org.apache.cassandra.utils.BiMultiValMap;
-import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.SortedBiMultiValMap;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,6 +35,9 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.BiMultiValMap;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.SortedBiMultiValMap;
 
 public class TokenMetadata
 {
@@ -96,8 +94,6 @@ public class TokenMetadata
     private volatile ArrayList<Token> sortedTokens;
 
     private final Topology topology;
-    /* list of subscribers that are notified when the tokenToEndpointMap changed */
-    private final CopyOnWriteArrayList<AbstractReplicationStrategy> subscribers = new CopyOnWriteArrayList<AbstractReplicationStrategy>();
 
     private static final Comparator<InetAddress> inetaddressCmp = new Comparator<InetAddress>()
     {
@@ -107,6 +103,9 @@ public class TokenMetadata
         }
     };
 
+    // signals replication strategies that nodes have joined or left the ring and they need to recompute ownership
+    private volatile long ringVersion = 0;
+
     public TokenMetadata()
     {
         this(SortedBiMultiValMap.<Token, InetAddress>create(null, inetaddressCmp),
@@ -428,7 +427,7 @@ public class TokenMetadata
             leavingEndpoints.remove(endpoint);
             endpointToHostIdMap.remove(endpoint);
             sortedTokens = sortTokens();
-            invalidateCaches();
+            invalidateCachedRings();
         }
         finally
         {
@@ -456,7 +455,7 @@ public class TokenMetadata
                 }
             }
 
-            invalidateCaches();
+            invalidateCachedRings();
         }
         finally
         {
@@ -885,7 +884,7 @@ public class TokenMetadata
             leavingEndpoints.clear();
             pendingRanges.clear();
             endpointToHostIdMap.clear();
-            invalidateCaches();
+            invalidateCachedRings();
         }
         finally
         {
@@ -977,24 +976,6 @@ public class TokenMetadata
         return sb.toString();
     }
 
-    public void invalidateCaches()
-    {
-        for (AbstractReplicationStrategy subscriber : subscribers)
-        {
-            subscriber.invalidateCachedTokenEndpointValues();
-        }
-    }
-
-    public void register(AbstractReplicationStrategy subscriber)
-    {
-        subscribers.add(subscriber);
-    }
-
-    public void unregister(AbstractReplicationStrategy subscriber)
-    {
-        subscribers.remove(subscriber);
-    }
-
     public Collection<InetAddress> pendingEndpointsFor(Token token, String table)
     {
         Map<Range<Token>, Collection<InetAddress>> ranges = getPendingRanges(table);
@@ -1068,6 +1049,16 @@ public class TokenMetadata
         return topology;
     }
 
+    public long getRingVersion()
+    {
+        return ringVersion;
+    }
+
+    private void invalidateCachedRings()
+    {
+        ringVersion++;
+    }
+
     /**
      * Tracks the assignment of racks and endpoints in each datacenter for all the "normal" endpoints
      * in this TokenMetadata. This allows faster calculation of endpoints in NetworkTopologyStrategy.