You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/11/26 21:13:14 UTC

[1/6] git commit: Fix thundering herd on endpoint cache invalidation; patch by rbranson and jbellis for CASSANDRA-6345

Updated Branches:
  refs/heads/cassandra-1.2 fce173532 -> 8145c8356
  refs/heads/cassandra-2.0 6c68b30fe -> 504f66dc1
  refs/heads/trunk 9f3a7f8a6 -> 1bfd062fd


Fix thundering herd on endpoint cache invalidation
patch by rbranson and jbellis for CASSANDRA-6345


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8145c835
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8145c835
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8145c835

Branch: refs/heads/cassandra-1.2
Commit: 8145c83566450feb68a12352ac88efe9983ec266
Parents: fce1735
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Nov 26 14:09:56 2013 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Nov 26 14:09:56 2013 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../locator/AbstractReplicationStrategy.java    | 58 +++++++++++++-------
 .../apache/cassandra/locator/TokenMetadata.java | 47 +++++++---------
 3 files changed, 59 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8145c835/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 57c1896..8d443f9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.2.13
+ * Fix thundering herd on endpoint cache invalidation (CASSANDRA-6345)
  * Optimize FD phi calculation (CASSANDRA-6386)
  * Improve initial FD phi estimate when starting up (CASSANDRA-6385)
  * Don't list CQL3 table in CLI describe even if named explicitely (CASSANDRA-5750)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8145c835/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index e17b0b4..51c4119 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@ -20,10 +20,12 @@ package org.apache.cassandra.locator;
 import java.lang.reflect.Constructor;
 import java.net.InetAddress;
 import java.util.*;
+import java.util.concurrent.locks.Lock;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.Striped;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,6 +56,15 @@ public abstract class AbstractReplicationStrategy
     public final Map<String, String> configOptions;
     private final TokenMetadata tokenMetadata;
 
+    // We want to make updating our replicas asynchronous vs the "master" TokenMetadata instance,
+    // so that our ownership calculations never block Gossip from processing an ownership change.
+    // But, we also can't afford to re-clone TM for each range after cache invalidation (CASSANDRA-6345),
+    // so we keep our own copy here.
+    //
+    // Writes to tokenMetadataClone should be synchronized.
+    private volatile TokenMetadata tokenMetadataClone = null;
+    private volatile long clonedTokenMetadataVersion = 0;
+
     public IEndpointSnitch snitch;
 
     AbstractReplicationStrategy(String tableName, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions)
@@ -63,7 +74,6 @@ public abstract class AbstractReplicationStrategy
         assert tokenMetadata != null;
         this.tokenMetadata = tokenMetadata;
         this.snitch = snitch;
-        this.tokenMetadata.register(this);
         this.configOptions = configOptions == null ? Collections.<String, String>emptyMap() : configOptions;
         this.tableName = tableName;
         // lazy-initialize table itself since we don't create them until after the replication strategies
@@ -73,18 +83,23 @@ public abstract class AbstractReplicationStrategy
 
     public ArrayList<InetAddress> getCachedEndpoints(Token t)
     {
-        return cachedEndpoints.get(t);
-    }
+        long lastVersion = tokenMetadata.getRingVersion();
 
-    public void cacheEndpoint(Token t, ArrayList<InetAddress> addr)
-    {
-        cachedEndpoints.put(t, addr);
-    }
+        if (lastVersion > clonedTokenMetadataVersion)
+        {
+            synchronized (this)
+            {
+                if (lastVersion > clonedTokenMetadataVersion)
+                {
+                    logger.debug("clearing cached endpoints");
+                    tokenMetadataClone = null;
+                    cachedEndpoints.clear();
+                    clonedTokenMetadataVersion = lastVersion;
+                }
+            }
+        }
 
-    public void clearEndpointCache()
-    {
-        logger.debug("clearing cached endpoints");
-        cachedEndpoints.clear();
+        return cachedEndpoints.get(t);
     }
 
     /**
@@ -101,10 +116,20 @@ public abstract class AbstractReplicationStrategy
         ArrayList<InetAddress> endpoints = getCachedEndpoints(keyToken);
         if (endpoints == null)
         {
-            TokenMetadata tokenMetadataClone = tokenMetadata.cloneOnlyTokenMap();
-            keyToken = TokenMetadata.firstToken(tokenMetadataClone.sortedTokens(), searchToken);
+            if (tokenMetadataClone == null)
+            {
+                // synchronize to prevent thundering herd post-invalidation
+                synchronized (this)
+                {
+                    if (tokenMetadataClone == null)
+                        tokenMetadataClone = tokenMetadata.cloneOnlyTokenMap();
+                }
+                // if our clone got invalidated, it's possible there is a new token to account for too
+                keyToken = TokenMetadata.firstToken(tokenMetadataClone.sortedTokens(), searchToken);
+            }
+
             endpoints = new ArrayList<InetAddress>(calculateNaturalEndpoints(searchToken, tokenMetadataClone));
-            cacheEndpoint(keyToken, endpoints);
+            cachedEndpoints.put(keyToken, endpoints);
         }
 
         return new ArrayList<InetAddress>(endpoints);
@@ -204,11 +229,6 @@ public abstract class AbstractReplicationStrategy
         return getAddressRanges(temp).get(pendingAddress);
     }
 
-    public void invalidateCachedTokenEndpointValues()
-    {
-        clearEndpointCache();
-    }
-
     public abstract void validateOptions() throws ConfigurationException;
 
     /*

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8145c835/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index 5ab1b3f..818ca8f 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -22,15 +22,10 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import com.google.common.collect.*;
-
-import org.apache.cassandra.utils.BiMultiValMap;
-import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.SortedBiMultiValMap;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,6 +35,9 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.BiMultiValMap;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.SortedBiMultiValMap;
 
 public class TokenMetadata
 {
@@ -96,8 +94,6 @@ public class TokenMetadata
     private volatile ArrayList<Token> sortedTokens;
 
     private final Topology topology;
-    /* list of subscribers that are notified when the tokenToEndpointMap changed */
-    private final CopyOnWriteArrayList<AbstractReplicationStrategy> subscribers = new CopyOnWriteArrayList<AbstractReplicationStrategy>();
 
     private static final Comparator<InetAddress> inetaddressCmp = new Comparator<InetAddress>()
     {
@@ -107,6 +103,9 @@ public class TokenMetadata
         }
     };
 
+    // signals replication strategies that nodes have joined or left the ring and they need to recompute ownership
+    private volatile long ringVersion = 0;
+
     public TokenMetadata()
     {
         this(SortedBiMultiValMap.<Token, InetAddress>create(null, inetaddressCmp),
@@ -428,7 +427,7 @@ public class TokenMetadata
             leavingEndpoints.remove(endpoint);
             endpointToHostIdMap.remove(endpoint);
             sortedTokens = sortTokens();
-            invalidateCaches();
+            invalidateCachedRings();
         }
         finally
         {
@@ -456,7 +455,7 @@ public class TokenMetadata
                 }
             }
 
-            invalidateCaches();
+            invalidateCachedRings();
         }
         finally
         {
@@ -885,7 +884,7 @@ public class TokenMetadata
             leavingEndpoints.clear();
             pendingRanges.clear();
             endpointToHostIdMap.clear();
-            invalidateCaches();
+            invalidateCachedRings();
         }
         finally
         {
@@ -977,24 +976,6 @@ public class TokenMetadata
         return sb.toString();
     }
 
-    public void invalidateCaches()
-    {
-        for (AbstractReplicationStrategy subscriber : subscribers)
-        {
-            subscriber.invalidateCachedTokenEndpointValues();
-        }
-    }
-
-    public void register(AbstractReplicationStrategy subscriber)
-    {
-        subscribers.add(subscriber);
-    }
-
-    public void unregister(AbstractReplicationStrategy subscriber)
-    {
-        subscribers.remove(subscriber);
-    }
-
     public Collection<InetAddress> pendingEndpointsFor(Token token, String table)
     {
         Map<Range<Token>, Collection<InetAddress>> ranges = getPendingRanges(table);
@@ -1068,6 +1049,16 @@ public class TokenMetadata
         return topology;
     }
 
+    public long getRingVersion()
+    {
+        return ringVersion;
+    }
+
+    private void invalidateCachedRings()
+    {
+        ringVersion++;
+    }
+
     /**
      * Tracks the assignment of racks and endpoints in each datacenter for all the "normal" endpoints
      * in this TokenMetadata. This allows faster calculation of endpoints in NetworkTopologyStrategy.


[5/6] git commit: merge from 1.2

Posted by jb...@apache.org.
merge from 1.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/504f66dc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/504f66dc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/504f66dc

Branch: refs/heads/cassandra-2.0
Commit: 504f66dc148ab4277756f7d7ca34d760d6f4a179
Parents: 6c68b30 8145c83
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Nov 26 14:13:02 2013 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Nov 26 14:13:02 2013 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../locator/AbstractReplicationStrategy.java    | 58 +++++++++++++-------
 .../apache/cassandra/locator/TokenMetadata.java | 49 +++++++----------
 3 files changed, 60 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/504f66dc/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 34dc7a5,8d443f9..d52c508
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,8 -1,8 +1,9 @@@
 -1.2.13
 +2.0.4
 + * Fix divide-by-zero in PCI (CASSANDRA-6403)
 + * Fix setting last compacted key in the wrong level for LCS (CASSANDRA-6284)
 + * Add sub-ms precision formats to the timestamp parser (CASSANDRA-6395)
 +Merged from 1.2:
+  * Fix thundering herd on endpoint cache invalidation (CASSANDRA-6345)
 - * Optimize FD phi calculation (CASSANDRA-6386)
 - * Improve initial FD phi estimate when starting up (CASSANDRA-6385)
 - * Don't list CQL3 table in CLI describe even if named explicitely (CASSANDRA-5750)
   * cqlsh: quote single quotes in strings inside collections (CASSANDRA-6172)
  
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/504f66dc/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index f83c889,51c4119..69c133b
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@@ -54,19 -56,27 +56,27 @@@ public abstract class AbstractReplicati
      public final Map<String, String> configOptions;
      private final TokenMetadata tokenMetadata;
  
+     // We want to make updating our replicas asynchronous vs the "master" TokenMetadata instance,
+     // so that our ownership calculations never block Gossip from processing an ownership change.
+     // But, we also can't afford to re-clone TM for each range after cache invalidation (CASSANDRA-6345),
+     // so we keep our own copy here.
+     //
+     // Writes to tokenMetadataClone should be synchronized.
+     private volatile TokenMetadata tokenMetadataClone = null;
+     private volatile long clonedTokenMetadataVersion = 0;
+ 
      public IEndpointSnitch snitch;
  
 -    AbstractReplicationStrategy(String tableName, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions)
 +    AbstractReplicationStrategy(String keyspaceName, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions)
      {
 -        assert tableName != null;
 +        assert keyspaceName != null;
          assert snitch != null;
          assert tokenMetadata != null;
          this.tokenMetadata = tokenMetadata;
          this.snitch = snitch;
-         this.tokenMetadata.register(this);
          this.configOptions = configOptions == null ? Collections.<String, String>emptyMap() : configOptions;
 -        this.tableName = tableName;
 -        // lazy-initialize table itself since we don't create them until after the replication strategies
 +        this.keyspaceName = keyspaceName;
 +        // lazy-initialize keyspace itself since we don't create them until after the replication strategies
      }
  
      private final Map<Token, ArrayList<InetAddress>> cachedEndpoints = new NonBlockingHashMap<Token, ArrayList<InetAddress>>();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/504f66dc/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/locator/TokenMetadata.java
index 7f794ea,818ca8f..b20be18
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@@ -27,11 -26,7 +26,7 @@@ import java.util.concurrent.locks.ReadW
  import java.util.concurrent.locks.ReentrantReadWriteLock;
  
  import com.google.common.collect.*;
- 
- import org.apache.cassandra.utils.BiMultiValMap;
- import org.apache.cassandra.utils.Pair;
- import org.apache.cassandra.utils.SortedBiMultiValMap;
 -import org.apache.commons.lang.StringUtils;
 +import org.apache.commons.lang3.StringUtils;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -977,27 -976,9 +976,9 @@@ public class TokenMetadat
          return sb.toString();
      }
  
-     public void invalidateCaches()
-     {
-         for (AbstractReplicationStrategy subscriber : subscribers)
-         {
-             subscriber.invalidateCachedTokenEndpointValues();
-         }
-     }
- 
-     public void register(AbstractReplicationStrategy subscriber)
-     {
-         subscribers.add(subscriber);
-     }
- 
-     public void unregister(AbstractReplicationStrategy subscriber)
-     {
-         subscribers.remove(subscriber);
-     }
- 
-     public Collection<InetAddress> pendingEndpointsFor(Token token, String keyspaceName)
+     public Collection<InetAddress> pendingEndpointsFor(Token token, String table)
      {
 -        Map<Range<Token>, Collection<InetAddress>> ranges = getPendingRanges(table);
 +        Map<Range<Token>, Collection<InetAddress>> ranges = getPendingRanges(keyspaceName);
          if (ranges.isEmpty())
              return Collections.emptyList();
  


[2/6] git commit: Fix thundering herd on endpoint cache invalidation; patch by rbranson and jbellis for CASSANDRA-6345

Posted by jb...@apache.org.
Fix thundering herd on endpoint cache invalidation
patch by rbranson and jbellis for CASSANDRA-6345


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8145c835
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8145c835
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8145c835

Branch: refs/heads/cassandra-2.0
Commit: 8145c83566450feb68a12352ac88efe9983ec266
Parents: fce1735
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Nov 26 14:09:56 2013 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Nov 26 14:09:56 2013 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../locator/AbstractReplicationStrategy.java    | 58 +++++++++++++-------
 .../apache/cassandra/locator/TokenMetadata.java | 47 +++++++---------
 3 files changed, 59 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8145c835/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 57c1896..8d443f9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.2.13
+ * Fix thundering herd on endpoint cache invalidation (CASSANDRA-6345)
  * Optimize FD phi calculation (CASSANDRA-6386)
  * Improve initial FD phi estimate when starting up (CASSANDRA-6385)
  * Don't list CQL3 table in CLI describe even if named explicitely (CASSANDRA-5750)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8145c835/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index e17b0b4..51c4119 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@ -20,10 +20,12 @@ package org.apache.cassandra.locator;
 import java.lang.reflect.Constructor;
 import java.net.InetAddress;
 import java.util.*;
+import java.util.concurrent.locks.Lock;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.Striped;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,6 +56,15 @@ public abstract class AbstractReplicationStrategy
     public final Map<String, String> configOptions;
     private final TokenMetadata tokenMetadata;
 
+    // We want to make updating our replicas asynchronous vs the "master" TokenMetadata instance,
+    // so that our ownership calculations never block Gossip from processing an ownership change.
+    // But, we also can't afford to re-clone TM for each range after cache invalidation (CASSANDRA-6345),
+    // so we keep our own copy here.
+    //
+    // Writes to tokenMetadataClone should be synchronized.
+    private volatile TokenMetadata tokenMetadataClone = null;
+    private volatile long clonedTokenMetadataVersion = 0;
+
     public IEndpointSnitch snitch;
 
     AbstractReplicationStrategy(String tableName, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions)
@@ -63,7 +74,6 @@ public abstract class AbstractReplicationStrategy
         assert tokenMetadata != null;
         this.tokenMetadata = tokenMetadata;
         this.snitch = snitch;
-        this.tokenMetadata.register(this);
         this.configOptions = configOptions == null ? Collections.<String, String>emptyMap() : configOptions;
         this.tableName = tableName;
         // lazy-initialize table itself since we don't create them until after the replication strategies
@@ -73,18 +83,23 @@ public abstract class AbstractReplicationStrategy
 
     public ArrayList<InetAddress> getCachedEndpoints(Token t)
     {
-        return cachedEndpoints.get(t);
-    }
+        long lastVersion = tokenMetadata.getRingVersion();
 
-    public void cacheEndpoint(Token t, ArrayList<InetAddress> addr)
-    {
-        cachedEndpoints.put(t, addr);
-    }
+        if (lastVersion > clonedTokenMetadataVersion)
+        {
+            synchronized (this)
+            {
+                if (lastVersion > clonedTokenMetadataVersion)
+                {
+                    logger.debug("clearing cached endpoints");
+                    tokenMetadataClone = null;
+                    cachedEndpoints.clear();
+                    clonedTokenMetadataVersion = lastVersion;
+                }
+            }
+        }
 
-    public void clearEndpointCache()
-    {
-        logger.debug("clearing cached endpoints");
-        cachedEndpoints.clear();
+        return cachedEndpoints.get(t);
     }
 
     /**
@@ -101,10 +116,20 @@ public abstract class AbstractReplicationStrategy
         ArrayList<InetAddress> endpoints = getCachedEndpoints(keyToken);
         if (endpoints == null)
         {
-            TokenMetadata tokenMetadataClone = tokenMetadata.cloneOnlyTokenMap();
-            keyToken = TokenMetadata.firstToken(tokenMetadataClone.sortedTokens(), searchToken);
+            if (tokenMetadataClone == null)
+            {
+                // synchronize to prevent thundering herd post-invalidation
+                synchronized (this)
+                {
+                    if (tokenMetadataClone == null)
+                        tokenMetadataClone = tokenMetadata.cloneOnlyTokenMap();
+                }
+                // if our clone got invalidated, it's possible there is a new token to account for too
+                keyToken = TokenMetadata.firstToken(tokenMetadataClone.sortedTokens(), searchToken);
+            }
+
             endpoints = new ArrayList<InetAddress>(calculateNaturalEndpoints(searchToken, tokenMetadataClone));
-            cacheEndpoint(keyToken, endpoints);
+            cachedEndpoints.put(keyToken, endpoints);
         }
 
         return new ArrayList<InetAddress>(endpoints);
@@ -204,11 +229,6 @@ public abstract class AbstractReplicationStrategy
         return getAddressRanges(temp).get(pendingAddress);
     }
 
-    public void invalidateCachedTokenEndpointValues()
-    {
-        clearEndpointCache();
-    }
-
     public abstract void validateOptions() throws ConfigurationException;
 
     /*

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8145c835/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index 5ab1b3f..818ca8f 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -22,15 +22,10 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import com.google.common.collect.*;
-
-import org.apache.cassandra.utils.BiMultiValMap;
-import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.SortedBiMultiValMap;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,6 +35,9 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.BiMultiValMap;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.SortedBiMultiValMap;
 
 public class TokenMetadata
 {
@@ -96,8 +94,6 @@ public class TokenMetadata
     private volatile ArrayList<Token> sortedTokens;
 
     private final Topology topology;
-    /* list of subscribers that are notified when the tokenToEndpointMap changed */
-    private final CopyOnWriteArrayList<AbstractReplicationStrategy> subscribers = new CopyOnWriteArrayList<AbstractReplicationStrategy>();
 
     private static final Comparator<InetAddress> inetaddressCmp = new Comparator<InetAddress>()
     {
@@ -107,6 +103,9 @@ public class TokenMetadata
         }
     };
 
+    // signals replication strategies that nodes have joined or left the ring and they need to recompute ownership
+    private volatile long ringVersion = 0;
+
     public TokenMetadata()
     {
         this(SortedBiMultiValMap.<Token, InetAddress>create(null, inetaddressCmp),
@@ -428,7 +427,7 @@ public class TokenMetadata
             leavingEndpoints.remove(endpoint);
             endpointToHostIdMap.remove(endpoint);
             sortedTokens = sortTokens();
-            invalidateCaches();
+            invalidateCachedRings();
         }
         finally
         {
@@ -456,7 +455,7 @@ public class TokenMetadata
                 }
             }
 
-            invalidateCaches();
+            invalidateCachedRings();
         }
         finally
         {
@@ -885,7 +884,7 @@ public class TokenMetadata
             leavingEndpoints.clear();
             pendingRanges.clear();
             endpointToHostIdMap.clear();
-            invalidateCaches();
+            invalidateCachedRings();
         }
         finally
         {
@@ -977,24 +976,6 @@ public class TokenMetadata
         return sb.toString();
     }
 
-    public void invalidateCaches()
-    {
-        for (AbstractReplicationStrategy subscriber : subscribers)
-        {
-            subscriber.invalidateCachedTokenEndpointValues();
-        }
-    }
-
-    public void register(AbstractReplicationStrategy subscriber)
-    {
-        subscribers.add(subscriber);
-    }
-
-    public void unregister(AbstractReplicationStrategy subscriber)
-    {
-        subscribers.remove(subscriber);
-    }
-
     public Collection<InetAddress> pendingEndpointsFor(Token token, String table)
     {
         Map<Range<Token>, Collection<InetAddress>> ranges = getPendingRanges(table);
@@ -1068,6 +1049,16 @@ public class TokenMetadata
         return topology;
     }
 
+    public long getRingVersion()
+    {
+        return ringVersion;
+    }
+
+    private void invalidateCachedRings()
+    {
+        ringVersion++;
+    }
+
     /**
      * Tracks the assignment of racks and endpoints in each datacenter for all the "normal" endpoints
      * in this TokenMetadata. This allows faster calculation of endpoints in NetworkTopologyStrategy.


[6/6] git commit: Merge branch 'cassandra-2.0' into trunk

Posted by jb...@apache.org.
Merge branch 'cassandra-2.0' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1bfd062f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1bfd062f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1bfd062f

Branch: refs/heads/trunk
Commit: 1bfd062fdc9daa35fbabcebb3ac31e726504f1ff
Parents: 9f3a7f8 504f66d
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Nov 26 14:13:06 2013 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Nov 26 14:13:06 2013 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../locator/AbstractReplicationStrategy.java    | 58 +++++++++++++-------
 .../apache/cassandra/locator/TokenMetadata.java | 49 +++++++----------
 3 files changed, 60 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1bfd062f/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1bfd062f/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------


[3/6] git commit: Fix thundering herd on endpoint cache invalidation; patch by rbranson and jbellis for CASSANDRA-6345

Posted by jb...@apache.org.
Fix thundering herd on endpoint cache invalidation
patch by rbranson and jbellis for CASSANDRA-6345


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8145c835
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8145c835
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8145c835

Branch: refs/heads/trunk
Commit: 8145c83566450feb68a12352ac88efe9983ec266
Parents: fce1735
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Nov 26 14:09:56 2013 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Nov 26 14:09:56 2013 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../locator/AbstractReplicationStrategy.java    | 58 +++++++++++++-------
 .../apache/cassandra/locator/TokenMetadata.java | 47 +++++++---------
 3 files changed, 59 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8145c835/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 57c1896..8d443f9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.2.13
+ * Fix thundering herd on endpoint cache invalidation (CASSANDRA-6345)
  * Optimize FD phi calculation (CASSANDRA-6386)
  * Improve initial FD phi estimate when starting up (CASSANDRA-6385)
  * Don't list CQL3 table in CLI describe even if named explicitely (CASSANDRA-5750)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8145c835/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index e17b0b4..51c4119 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@ -20,10 +20,12 @@ package org.apache.cassandra.locator;
 import java.lang.reflect.Constructor;
 import java.net.InetAddress;
 import java.util.*;
+import java.util.concurrent.locks.Lock;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.Striped;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,6 +56,15 @@ public abstract class AbstractReplicationStrategy
     public final Map<String, String> configOptions;
     private final TokenMetadata tokenMetadata;
 
+    // We want to make updating our replicas asynchronous vs the "master" TokenMetadata instance,
+    // so that our ownership calculations never block Gossip from processing an ownership change.
+    // But, we also can't afford to re-clone TM for each range after cache invalidation (CASSANDRA-6345),
+    // so we keep our own copy here.
+    //
+    // Writes to tokenMetadataClone should be synchronized.
+    private volatile TokenMetadata tokenMetadataClone = null;
+    private volatile long clonedTokenMetadataVersion = 0;
+
     public IEndpointSnitch snitch;
 
     AbstractReplicationStrategy(String tableName, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions)
@@ -63,7 +74,6 @@ public abstract class AbstractReplicationStrategy
         assert tokenMetadata != null;
         this.tokenMetadata = tokenMetadata;
         this.snitch = snitch;
-        this.tokenMetadata.register(this);
         this.configOptions = configOptions == null ? Collections.<String, String>emptyMap() : configOptions;
         this.tableName = tableName;
         // lazy-initialize table itself since we don't create them until after the replication strategies
@@ -73,18 +83,23 @@ public abstract class AbstractReplicationStrategy
 
     public ArrayList<InetAddress> getCachedEndpoints(Token t)
     {
-        return cachedEndpoints.get(t);
-    }
+        long lastVersion = tokenMetadata.getRingVersion();
 
-    public void cacheEndpoint(Token t, ArrayList<InetAddress> addr)
-    {
-        cachedEndpoints.put(t, addr);
-    }
+        if (lastVersion > clonedTokenMetadataVersion)
+        {
+            synchronized (this)
+            {
+                if (lastVersion > clonedTokenMetadataVersion)
+                {
+                    logger.debug("clearing cached endpoints");
+                    tokenMetadataClone = null;
+                    cachedEndpoints.clear();
+                    clonedTokenMetadataVersion = lastVersion;
+                }
+            }
+        }
 
-    public void clearEndpointCache()
-    {
-        logger.debug("clearing cached endpoints");
-        cachedEndpoints.clear();
+        return cachedEndpoints.get(t);
     }
 
     /**
@@ -101,10 +116,20 @@ public abstract class AbstractReplicationStrategy
         ArrayList<InetAddress> endpoints = getCachedEndpoints(keyToken);
         if (endpoints == null)
         {
-            TokenMetadata tokenMetadataClone = tokenMetadata.cloneOnlyTokenMap();
-            keyToken = TokenMetadata.firstToken(tokenMetadataClone.sortedTokens(), searchToken);
+            if (tokenMetadataClone == null)
+            {
+                // synchronize to prevent thundering herd post-invalidation
+                synchronized (this)
+                {
+                    if (tokenMetadataClone == null)
+                        tokenMetadataClone = tokenMetadata.cloneOnlyTokenMap();
+                }
+                // if our clone got invalidated, it's possible there is a new token to account for too
+                keyToken = TokenMetadata.firstToken(tokenMetadataClone.sortedTokens(), searchToken);
+            }
+
             endpoints = new ArrayList<InetAddress>(calculateNaturalEndpoints(searchToken, tokenMetadataClone));
-            cacheEndpoint(keyToken, endpoints);
+            cachedEndpoints.put(keyToken, endpoints);
         }
 
         return new ArrayList<InetAddress>(endpoints);
@@ -204,11 +229,6 @@ public abstract class AbstractReplicationStrategy
         return getAddressRanges(temp).get(pendingAddress);
     }
 
-    public void invalidateCachedTokenEndpointValues()
-    {
-        clearEndpointCache();
-    }
-
     public abstract void validateOptions() throws ConfigurationException;
 
     /*

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8145c835/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index 5ab1b3f..818ca8f 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -22,15 +22,10 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import com.google.common.collect.*;
-
-import org.apache.cassandra.utils.BiMultiValMap;
-import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.SortedBiMultiValMap;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,6 +35,9 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.BiMultiValMap;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.SortedBiMultiValMap;
 
 public class TokenMetadata
 {
@@ -96,8 +94,6 @@ public class TokenMetadata
     private volatile ArrayList<Token> sortedTokens;
 
     private final Topology topology;
-    /* list of subscribers that are notified when the tokenToEndpointMap changed */
-    private final CopyOnWriteArrayList<AbstractReplicationStrategy> subscribers = new CopyOnWriteArrayList<AbstractReplicationStrategy>();
 
     private static final Comparator<InetAddress> inetaddressCmp = new Comparator<InetAddress>()
     {
@@ -107,6 +103,9 @@ public class TokenMetadata
         }
     };
 
+    // signals replication strategies that nodes have joined or left the ring and they need to recompute ownership
+    private volatile long ringVersion = 0;
+
     public TokenMetadata()
     {
         this(SortedBiMultiValMap.<Token, InetAddress>create(null, inetaddressCmp),
@@ -428,7 +427,7 @@ public class TokenMetadata
             leavingEndpoints.remove(endpoint);
             endpointToHostIdMap.remove(endpoint);
             sortedTokens = sortTokens();
-            invalidateCaches();
+            invalidateCachedRings();
         }
         finally
         {
@@ -456,7 +455,7 @@ public class TokenMetadata
                 }
             }
 
-            invalidateCaches();
+            invalidateCachedRings();
         }
         finally
         {
@@ -885,7 +884,7 @@ public class TokenMetadata
             leavingEndpoints.clear();
             pendingRanges.clear();
             endpointToHostIdMap.clear();
-            invalidateCaches();
+            invalidateCachedRings();
         }
         finally
         {
@@ -977,24 +976,6 @@ public class TokenMetadata
         return sb.toString();
     }
 
-    public void invalidateCaches()
-    {
-        for (AbstractReplicationStrategy subscriber : subscribers)
-        {
-            subscriber.invalidateCachedTokenEndpointValues();
-        }
-    }
-
-    public void register(AbstractReplicationStrategy subscriber)
-    {
-        subscribers.add(subscriber);
-    }
-
-    public void unregister(AbstractReplicationStrategy subscriber)
-    {
-        subscribers.remove(subscriber);
-    }
-
     public Collection<InetAddress> pendingEndpointsFor(Token token, String table)
     {
         Map<Range<Token>, Collection<InetAddress>> ranges = getPendingRanges(table);
@@ -1068,6 +1049,16 @@ public class TokenMetadata
         return topology;
     }
 
+    public long getRingVersion()
+    {
+        return ringVersion;
+    }
+
+    private void invalidateCachedRings()
+    {
+        ringVersion++;
+    }
+
     /**
      * Tracks the assignment of racks and endpoints in each datacenter for all the "normal" endpoints
      * in this TokenMetadata. This allows faster calculation of endpoints in NetworkTopologyStrategy.


[4/6] git commit: merge from 1.2

Posted by jb...@apache.org.
merge from 1.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/504f66dc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/504f66dc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/504f66dc

Branch: refs/heads/trunk
Commit: 504f66dc148ab4277756f7d7ca34d760d6f4a179
Parents: 6c68b30 8145c83
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Nov 26 14:13:02 2013 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Nov 26 14:13:02 2013 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../locator/AbstractReplicationStrategy.java    | 58 +++++++++++++-------
 .../apache/cassandra/locator/TokenMetadata.java | 49 +++++++----------
 3 files changed, 60 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/504f66dc/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 34dc7a5,8d443f9..d52c508
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,8 -1,8 +1,9 @@@
 -1.2.13
 +2.0.4
 + * Fix divide-by-zero in PCI (CASSANDRA-6403)
 + * Fix setting last compacted key in the wrong level for LCS (CASSANDRA-6284)
 + * Add sub-ms precision formats to the timestamp parser (CASSANDRA-6395)
 +Merged from 1.2:
+  * Fix thundering herd on endpoint cache invalidation (CASSANDRA-6345)
 - * Optimize FD phi calculation (CASSANDRA-6386)
 - * Improve initial FD phi estimate when starting up (CASSANDRA-6385)
 - * Don't list CQL3 table in CLI describe even if named explicitely (CASSANDRA-5750)
   * cqlsh: quote single quotes in strings inside collections (CASSANDRA-6172)
  
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/504f66dc/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index f83c889,51c4119..69c133b
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@@ -54,19 -56,27 +56,27 @@@ public abstract class AbstractReplicati
      public final Map<String, String> configOptions;
      private final TokenMetadata tokenMetadata;
  
+     // We want to make updating our replicas asynchronous vs the "master" TokenMetadata instance,
+     // so that our ownership calculations never block Gossip from processing an ownership change.
+     // But, we also can't afford to re-clone TM for each range after cache invalidation (CASSANDRA-6345),
+     // so we keep our own copy here.
+     //
+     // Writes to tokenMetadataClone should be synchronized.
+     private volatile TokenMetadata tokenMetadataClone = null;
+     private volatile long clonedTokenMetadataVersion = 0;
+ 
      public IEndpointSnitch snitch;
  
 -    AbstractReplicationStrategy(String tableName, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions)
 +    AbstractReplicationStrategy(String keyspaceName, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions)
      {
 -        assert tableName != null;
 +        assert keyspaceName != null;
          assert snitch != null;
          assert tokenMetadata != null;
          this.tokenMetadata = tokenMetadata;
          this.snitch = snitch;
-         this.tokenMetadata.register(this);
          this.configOptions = configOptions == null ? Collections.<String, String>emptyMap() : configOptions;
 -        this.tableName = tableName;
 -        // lazy-initialize table itself since we don't create them until after the replication strategies
 +        this.keyspaceName = keyspaceName;
 +        // lazy-initialize keyspace itself since we don't create them until after the replication strategies
      }
  
      private final Map<Token, ArrayList<InetAddress>> cachedEndpoints = new NonBlockingHashMap<Token, ArrayList<InetAddress>>();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/504f66dc/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/locator/TokenMetadata.java
index 7f794ea,818ca8f..b20be18
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@@ -27,11 -26,7 +26,7 @@@ import java.util.concurrent.locks.ReadW
  import java.util.concurrent.locks.ReentrantReadWriteLock;
  
  import com.google.common.collect.*;
- 
- import org.apache.cassandra.utils.BiMultiValMap;
- import org.apache.cassandra.utils.Pair;
- import org.apache.cassandra.utils.SortedBiMultiValMap;
 -import org.apache.commons.lang.StringUtils;
 +import org.apache.commons.lang3.StringUtils;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -977,27 -976,9 +976,9 @@@ public class TokenMetadat
          return sb.toString();
      }
  
-     public void invalidateCaches()
-     {
-         for (AbstractReplicationStrategy subscriber : subscribers)
-         {
-             subscriber.invalidateCachedTokenEndpointValues();
-         }
-     }
- 
-     public void register(AbstractReplicationStrategy subscriber)
-     {
-         subscribers.add(subscriber);
-     }
- 
-     public void unregister(AbstractReplicationStrategy subscriber)
-     {
-         subscribers.remove(subscriber);
-     }
- 
-     public Collection<InetAddress> pendingEndpointsFor(Token token, String keyspaceName)
+     public Collection<InetAddress> pendingEndpointsFor(Token token, String table)
      {
 -        Map<Range<Token>, Collection<InetAddress>> ranges = getPendingRanges(table);
 +        Map<Range<Token>, Collection<InetAddress>> ranges = getPendingRanges(keyspaceName);
          if (ranges.isEmpty())
              return Collections.emptyList();