Posted to commits@cassandra.apache.org by al...@apache.org on 2013/12/15 11:56:55 UTC

[1/3] git commit: Improve batchlog write performance with vnodes

Updated Branches:
  refs/heads/trunk 5d167cf3d -> 2ee6b8fd9


Improve batchlog write performance with vnodes

patch by Jonathan Ellis and Rick Branson; reviewed by Aleksey Yeschenko
for CASSANDRA-6488


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4be9e672
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4be9e672
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4be9e672

Branch: refs/heads/trunk
Commit: 4be9e6720d9f94a83aa42153c3e71ae1e557d2d9
Parents: a3d91dc
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Sun Dec 15 13:29:56 2013 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Sun Dec 15 13:29:56 2013 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../locator/AbstractReplicationStrategy.java    | 35 ++++--------------
 .../apache/cassandra/locator/TokenMetadata.java | 35 +++++++++++++-----
 .../apache/cassandra/service/StorageProxy.java  | 39 +++++++++-----------
 4 files changed, 53 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4be9e672/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e586592..b55393b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -17,6 +17,7 @@
    (CASSANDRA-6413)
  * (Hadoop) add describe_local_ring (CASSANDRA-6268)
  * Fix handling of concurrent directory creation failure (CASSANDRA-6459)
+ * Improve batchlog write performance with vnodes (CASSANDRA-6488)
 
 
 1.2.12

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4be9e672/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index c36fde4..85e229c 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@ -20,12 +20,10 @@ package org.apache.cassandra.locator;
 import java.lang.reflect.Constructor;
 import java.net.InetAddress;
 import java.util.*;
-import java.util.concurrent.locks.Lock;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
-import com.google.common.util.concurrent.Striped;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,14 +54,8 @@ public abstract class AbstractReplicationStrategy
     public final Map<String, String> configOptions;
     private final TokenMetadata tokenMetadata;
 
-    // We want to make updating our replicas asynchronous vs the "master" TokenMetadata instance,
-    // so that our ownership calculations never block Gossip from processing an ownership change.
-    // But, we also can't afford to re-clone TM for each range after cache invalidation (CASSANDRA-6345),
-    // so we keep our own copy here.
-    //
-    // Writes to tokenMetadataClone should be synchronized.
-    private volatile TokenMetadata tokenMetadataClone = null;
-    private volatile long clonedTokenMetadataVersion = 0;
+    // track when the token range changes, signaling we need to invalidate our endpoint cache
+    private volatile long lastInvalidatedVersion = 0;
 
     public IEndpointSnitch snitch;
 
@@ -85,16 +77,15 @@ public abstract class AbstractReplicationStrategy
     {
         long lastVersion = tokenMetadata.getRingVersion();
 
-        if (lastVersion > clonedTokenMetadataVersion)
+        if (lastVersion > lastInvalidatedVersion)
         {
             synchronized (this)
             {
-                if (lastVersion > clonedTokenMetadataVersion)
+                if (lastVersion > lastInvalidatedVersion)
                 {
                     logger.debug("clearing cached endpoints");
-                    tokenMetadataClone = null;
                     cachedEndpoints.clear();
-                    clonedTokenMetadataVersion = lastVersion;
+                    lastInvalidatedVersion = lastVersion;
                 }
             }
         }
@@ -116,19 +107,9 @@ public abstract class AbstractReplicationStrategy
         ArrayList<InetAddress> endpoints = getCachedEndpoints(keyToken);
         if (endpoints == null)
         {
-            TokenMetadata tm; // local reference in case another thread nulls tMC out from under us
-            if ((tm = tokenMetadataClone) == null)
-            {
-                // synchronize to prevent thundering herd post-invalidation
-                synchronized (this)
-                {
-                    if ((tm = tokenMetadataClone) == null)
-                        tm = tokenMetadataClone = tokenMetadata.cloneOnlyTokenMap();
-                }
-                // if our clone got invalidated, it's possible there is a new token to account for too
-                keyToken = TokenMetadata.firstToken(tm.sortedTokens(), searchToken);
-            }
-
+            TokenMetadata tm = tokenMetadata.cloneOnlyTokenMap();
+            // if our cache got invalidated, it's possible there is a new token to account for too
+            keyToken = TokenMetadata.firstToken(tm.sortedTokens(), searchToken);
             endpoints = new ArrayList<InetAddress>(calculateNaturalEndpoints(searchToken, tm));
             cachedEndpoints.put(keyToken, endpoints);
         }

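The endpoint cache in AbstractReplicationStrategy now relies only on a version check: the volatile lastInvalidatedVersion is compared against the ring version outside the lock, and only threads that observe a stale version synchronize and clear the cache. The following is a minimal standalone sketch of that pattern; VersionedEndpointCache and RingVersionSource are illustrative names, not Cassandra API (the real cache is a NonBlockingHashMap, a ConcurrentHashMap stands in here).

// Sketch of the version-checked endpoint cache kept by AbstractReplicationStrategy.
// VersionedEndpointCache and RingVersionSource are hypothetical names for illustration.
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class VersionedEndpointCache<K>
{
    interface RingVersionSource { long getRingVersion(); }

    private final Map<K, ArrayList<InetAddress>> cachedEndpoints = new ConcurrentHashMap<K, ArrayList<InetAddress>>();
    private final RingVersionSource ring;
    private volatile long lastInvalidatedVersion = 0;

    VersionedEndpointCache(RingVersionSource ring)
    {
        this.ring = ring;
    }

    ArrayList<InetAddress> getCachedEndpoints(K keyToken)
    {
        long lastVersion = ring.getRingVersion();
        if (lastVersion > lastInvalidatedVersion)
        {
            synchronized (this)
            {
                // double-check under the monitor so the cache is cleared only once per version bump
                if (lastVersion > lastInvalidatedVersion)
                {
                    cachedEndpoints.clear();
                    lastInvalidatedVersion = lastVersion;
                }
            }
        }
        return cachedEndpoints.get(keyToken);
    }
}
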
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4be9e672/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index b724894..be0f7c7 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -585,22 +586,37 @@ public class TokenMetadata
         }
     }
 
+    private final AtomicReference<TokenMetadata> cachedTokenMap = new AtomicReference<TokenMetadata>();
+
     /**
      * Create a copy of TokenMetadata with only tokenToEndpointMap. That is, pending ranges,
      * bootstrap tokens and leaving endpoints are not included in the copy.
+     *
+     * This uses a cached copy that is invalidated when the ring changes, so in the common case
+     * no extra locking is required.
      */
     public TokenMetadata cloneOnlyTokenMap()
     {
-        lock.readLock().lock();
-        try
-        {
-            return new TokenMetadata(SortedBiMultiValMap.<Token, InetAddress>create(tokenToEndpointMap, null, inetaddressCmp),
-                                     HashBiMap.create(endpointToHostIdMap),
-                                     new Topology(topology));
-        }
-        finally
+        TokenMetadata tm = cachedTokenMap.get();
+        if (tm != null)
+            return tm;
+
+        // synchronize is to prevent thundering herd (CASSANDRA-6345); lock.readLock is for correctness vs updates to our internals
+        synchronized (this)
         {
-            lock.readLock().unlock();
+            lock.readLock().lock();
+            try
+            {
+                tm = new TokenMetadata(SortedBiMultiValMap.<Token, InetAddress>create(tokenToEndpointMap, null, inetaddressCmp),
+                                       HashBiMap.create(endpointToHostIdMap),
+                                       new Topology(topology));
+                cachedTokenMap.set(tm);
+                return tm;
+            }
+            finally
+            {
+                lock.readLock().unlock();
+            }
         }
     }
 
@@ -1057,6 +1073,7 @@ public class TokenMetadata
     public void invalidateCachedRings()
     {
         ringVersion++;
+        cachedTokenMap.set(null);
     }
 
     /**

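cloneOnlyTokenMap() is the core of the change: the token-map-only copy is cached in an AtomicReference, returned lock-free in the common case, and dropped whenever invalidateCachedRings() is called, so replication strategies no longer keep clones of their own. Below is a standalone sketch of that cache-and-invalidate shape, with an extra re-check inside the synchronized block; CachedClone and Copier are hypothetical helpers, not Cassandra classes, and the real method builds the copy under TokenMetadata's read lock.

// Sketch of the cached-clone idea behind TokenMetadata.cloneOnlyTokenMap().
// CachedClone and Copier are illustrative names, not part of Cassandra.
import java.util.concurrent.atomic.AtomicReference;

class CachedClone<T>
{
    interface Copier<T2> { T2 copy(); }

    private final AtomicReference<T> cached = new AtomicReference<T>();
    private final Copier<T> copier;

    CachedClone(Copier<T> copier)
    {
        this.copier = copier;
    }

    T get()
    {
        T copy = cached.get();
        if (copy != null)
            return copy; // common case: no locking at all

        // only one thread pays for the expensive copy after an invalidation (avoids a thundering herd)
        synchronized (this)
        {
            copy = cached.get();
            if (copy == null)
            {
                copy = copier.copy();
                cached.set(copy);
            }
            return copy;
        }
    }

    // mirrors invalidateCachedRings(): the next get() rebuilds the copy
    void invalidate()
    {
        cached.set(null);
    }
}
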
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4be9e672/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 3e9f2cb..376edb6 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -23,6 +23,7 @@ import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.Random;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.management.MBeanServer;
@@ -414,30 +415,32 @@ public class StorageProxy implements StorageProxyMBean
      * - replicas should be alive according to the failure detector
      * - replicas should be in the local datacenter
     * - choose min(2, number of qualifying candidates above)
-     * - allow the local node to be the only replica only if it's a single-node cluster
+     * - allow the local node to be the only replica only if it's a single-node DC
      */
-    private static Collection<InetAddress> getBatchlogEndpoints(String localDataCenter, ConsistencyLevel consistencyLevel) throws UnavailableException
+    private static Collection<InetAddress> getBatchlogEndpoints(String localDataCenter, ConsistencyLevel consistencyLevel)
+    throws UnavailableException
     {
-        // will include every known node in the DC, including localhost.
         TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cloneOnlyTokenMap().getTopology();
-        Collection<InetAddress> localMembers = topology.getDatacenterEndpoints().get(localDataCenter);
+        List<InetAddress> localEndpoints = new ArrayList<InetAddress>(topology.getDatacenterEndpoints().get(localDataCenter));
 
         // special case for single-node datacenters
-        if (localMembers.size() == 1)
-            return localMembers;
+        if (localEndpoints.size() == 1)
+            return localEndpoints;
 
-        // not a single-node cluster - don't count the local node.
-        localMembers.remove(FBUtilities.getBroadcastAddress());
+        List<InetAddress> chosenEndpoints = new ArrayList<InetAddress>(2);
+        int startOffset = new Random().nextInt(localEndpoints.size());
 
-        // include only alive nodes
-        List<InetAddress> candidates = new ArrayList<InetAddress>(localMembers.size());
-        for (InetAddress member : localMembers)
+        // starts at some random point in the list, advances forward until the end, then loops
+        // around to the beginning, advancing again until it is back at the starting point again.
+        for (int i = 0; i < localEndpoints.size() && chosenEndpoints.size() < 2; i++)
         {
-            if (FailureDetector.instance.isAlive(member))
-                candidates.add(member);
+            InetAddress endpoint = localEndpoints.get((i + startOffset) % localEndpoints.size());
+            // skip localhost and non-alive nodes
+            if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && FailureDetector.instance.isAlive(endpoint))
+                chosenEndpoints.add(endpoint);
         }
 
-        if (candidates.isEmpty())
+        if (chosenEndpoints.isEmpty())
         {
             if (consistencyLevel == ConsistencyLevel.ANY)
                 return Collections.singleton(FBUtilities.getBroadcastAddress());
@@ -445,13 +448,7 @@ public class StorageProxy implements StorageProxyMBean
             throw new UnavailableException(ConsistencyLevel.ONE, 1, 0);
         }
 
-        if (candidates.size() > 2)
-        {
-            Collections.shuffle(candidates);
-            candidates = candidates.subList(0, 2);
-        }
-
-        return candidates;
+        return chosenEndpoints;
     }
 
     /**

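getBatchlogEndpoints() now walks the datacenter's endpoint list exactly once, starting at a random offset and wrapping around, and stops as soon as two live, non-local candidates have been found, instead of shuffling the full candidate list and taking a sublist. A self-contained sketch of that selection loop follows; pickTwo() and isAlive() are illustrative stand-ins for the FailureDetector and FBUtilities calls in the real method.

// Sketch of the random-offset, wrap-around scan used to pick batchlog replicas.
// pickTwo() and isAlive() are hypothetical stand-ins, not Cassandra methods.
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

final class BatchlogEndpointSketch
{
    static List<InetAddress> pickTwo(List<InetAddress> localEndpoints, InetAddress localAddress, Random random)
    {
        // callers guarantee localEndpoints is non-empty (it includes at least the local node)
        List<InetAddress> chosen = new ArrayList<InetAddress>(2);
        int startOffset = random.nextInt(localEndpoints.size());

        // start at a random index, advance with wrap-around, stop once two replicas are chosen
        for (int i = 0; i < localEndpoints.size() && chosen.size() < 2; i++)
        {
            InetAddress endpoint = localEndpoints.get((i + startOffset) % localEndpoints.size());
            // skip localhost and non-alive nodes
            if (!endpoint.equals(localAddress) && isAlive(endpoint))
                chosen.add(endpoint);
        }
        return chosen;
    }

    // stand-in for FailureDetector.instance.isAlive(endpoint)
    private static boolean isAlive(InetAddress endpoint)
    {
        return true;
    }
}
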

[3/3] git commit: Merge branch 'cassandra-2.0' into trunk

Posted by al...@apache.org.
Merge branch 'cassandra-2.0' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2ee6b8fd
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2ee6b8fd
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2ee6b8fd

Branch: refs/heads/trunk
Commit: 2ee6b8fd91db261dedae6e9a3539342b084056bd
Parents: 5d167cf bdff106
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Sun Dec 15 13:37:13 2013 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Sun Dec 15 13:37:13 2013 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../locator/AbstractReplicationStrategy.java    | 35 ++++--------------
 .../apache/cassandra/locator/TokenMetadata.java | 35 +++++++++++++-----
 .../apache/cassandra/service/StorageProxy.java  | 39 +++++++++-----------
 4 files changed, 53 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ee6b8fd/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ee6b8fd/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ee6b8fd/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------


[2/3] git commit: Merge branch 'cassandra-1.2' into cassandra-2.0

Posted by al...@apache.org.
Merge branch 'cassandra-1.2' into cassandra-2.0

Conflicts:
	CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bdff106a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bdff106a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bdff106a

Branch: refs/heads/trunk
Commit: bdff106aa9698849ba4a82d8a19715e2dbbb7f62
Parents: bb09d3c 4be9e67
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Sun Dec 15 13:36:18 2013 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Sun Dec 15 13:36:18 2013 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../locator/AbstractReplicationStrategy.java    | 35 ++++--------------
 .../apache/cassandra/locator/TokenMetadata.java | 35 +++++++++++++-----
 .../apache/cassandra/service/StorageProxy.java  | 39 +++++++++-----------
 4 files changed, 53 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdff106a/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index a54231e,b55393b..89ef6e1
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -25,46 -17,10 +25,47 @@@ Merged from 1.2
     (CASSANDRA-6413)
   * (Hadoop) add describe_local_ring (CASSANDRA-6268)
   * Fix handling of concurrent directory creation failure (CASSANDRA-6459)
 + * Randomize batchlog candidates selection (CASSANDRA-6481)
+  * Improve batchlog write performance with vnodes (CASSANDRA-6488)
  
  
 -1.2.12
 +2.0.3
 + * Fix FD leak on slice read path (CASSANDRA-6275)
 + * Cancel read meter task when closing SSTR (CASSANDRA-6358)
 + * free off-heap IndexSummary during bulk (CASSANDRA-6359)
 + * Recover from IOException in accept() thread (CASSANDRA-6349)
 + * Improve Gossip tolerance of abnormally slow tasks (CASSANDRA-6338)
 + * Fix trying to hint timed out counter writes (CASSANDRA-6322)
 + * Allow restoring specific columnfamilies from archived CL (CASSANDRA-4809)
 + * Avoid flushing compaction_history after each operation (CASSANDRA-6287)
 + * Fix repair assertion error when tombstones expire (CASSANDRA-6277)
 + * Skip loading corrupt key cache (CASSANDRA-6260)
 + * Fixes for compacting larger-than-memory rows (CASSANDRA-6274)
 + * Compact hottest sstables first and optionally omit coldest from
 +   compaction entirely (CASSANDRA-6109)
 + * Fix modifying column_metadata from thrift (CASSANDRA-6182)
 + * cqlsh: fix LIST USERS output (CASSANDRA-6242)
 + * Add IRequestSink interface (CASSANDRA-6248)
 + * Update memtable size while flushing (CASSANDRA-6249)
 + * Provide hooks around CQL2/CQL3 statement execution (CASSANDRA-6252)
 + * Require Permission.SELECT for CAS updates (CASSANDRA-6247)
 + * New CQL-aware SSTableWriter (CASSANDRA-5894)
 + * Reject CAS operation when the protocol v1 is used (CASSANDRA-6270)
 + * Correctly throw error when frame too large (CASSANDRA-5981)
 + * Fix serialization bug in PagedRange with 2ndary indexes (CASSANDRA-6299)
 + * Fix CQL3 table validation in Thrift (CASSANDRA-6140)
 + * Fix bug missing results with IN clauses (CASSANDRA-6327)
 + * Fix paging with reversed slices (CASSANDRA-6343)
 + * Set minTimestamp correctly to be able to drop expired sstables (CASSANDRA-6337)
 + * Support NaN and Infinity as float literals (CASSANDRA-6003)
 + * Remove RF from nodetool ring output (CASSANDRA-6289)
 + * Fix attempting to flush empty rows (CASSANDRA-6374)
 + * Fix potential out of bounds exception when paging (CASSANDRA-6333)
 +Merged from 1.2:
 + * Optimize FD phi calculation (CASSANDRA-6386)
 + * Improve initial FD phi estimate when starting up (CASSANDRA-6385)
 + * Don't list CQL3 table in CLI describe even if named explicitly 
 +   (CASSANDRA-5750)
   * Invalidate row cache when dropping CF (CASSANDRA-6351)
   * add non-jamm path for cached statements (CASSANDRA-6293)
   * (Hadoop) Require CFRR batchSize to be at least 2 (CASSANDRA-6114)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdff106a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdff106a/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdff106a/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index 9957a37,376edb6..7f97da9
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -767,30 -415,32 +768,32 @@@ public class StorageProxy implements St
       * - replicas should be alive according to the failure detector
       * - replicas should be in the local datacenter
      * - choose min(2, number of qualifying candidates above)
-      * - allow the local node to be the only replica only if it's a single-node cluster
+      * - allow the local node to be the only replica only if it's a single-node DC
       */
-     private static Collection<InetAddress> getBatchlogEndpoints(String localDataCenter, ConsistencyLevel consistencyLevel) throws UnavailableException
+     private static Collection<InetAddress> getBatchlogEndpoints(String localDataCenter, ConsistencyLevel consistencyLevel)
+     throws UnavailableException
      {
-         // will include every known node in the DC, including localhost.
          TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cloneOnlyTokenMap().getTopology();
-         Collection<InetAddress> localMembers = topology.getDatacenterEndpoints().get(localDataCenter);
 -        List<InetAddress> localEndpoints = new ArrayList<InetAddress>(topology.getDatacenterEndpoints().get(localDataCenter));
++        List<InetAddress> localEndpoints = new ArrayList<>(topology.getDatacenterEndpoints().get(localDataCenter));
  
          // special case for single-node datacenters
-         if (localMembers.size() == 1)
-             return localMembers;
+         if (localEndpoints.size() == 1)
+             return localEndpoints;
  
-         // not a single-node cluster - don't count the local node.
-         localMembers.remove(FBUtilities.getBroadcastAddress());
 -        List<InetAddress> chosenEndpoints = new ArrayList<InetAddress>(2);
++        List<InetAddress> chosenEndpoints = new ArrayList<>(2);
+         int startOffset = new Random().nextInt(localEndpoints.size());
  
-         // include only alive nodes
-         List<InetAddress> candidates = new ArrayList<InetAddress>(localMembers.size());
-         for (InetAddress member : localMembers)
+         // starts at some random point in the list, advances forward until the end, then loops
+         // around to the beginning, advancing again until it is back at the starting point again.
+         for (int i = 0; i < localEndpoints.size() && chosenEndpoints.size() < 2; i++)
          {
-             if (FailureDetector.instance.isAlive(member))
-                 candidates.add(member);
+             InetAddress endpoint = localEndpoints.get((i + startOffset) % localEndpoints.size());
+             // skip localhost and non-alive nodes
+             if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && FailureDetector.instance.isAlive(endpoint))
+                 chosenEndpoints.add(endpoint);
          }
  
-         if (candidates.isEmpty())
+         if (chosenEndpoints.isEmpty())
          {
              if (consistencyLevel == ConsistencyLevel.ANY)
                  return Collections.singleton(FBUtilities.getBroadcastAddress());