You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2016/11/18 17:39:06 UTC

[01/10] cassandra git commit: Avoid blocking gossip during pending range calculation

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.2 7d2fdfeb4 -> 9cd7d540d
  refs/heads/cassandra-3.0 eb41380cc -> 59b40b317
  refs/heads/cassandra-3.X f33cd55a5 -> 96d67b109
  refs/heads/trunk 29cb59106 -> f1c3aac76


Avoid blocking gossip during pending range calculation

patch by Stefan Podkowinski; reviewed by Joel Knighton for CASSANDRA-12281


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9cd7d540
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9cd7d540
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9cd7d540

Branch: refs/heads/cassandra-2.2
Commit: 9cd7d540de2ea525982d139a4c8a11233c4e98c9
Parents: 7d2fdfe
Author: Stefan Podkowinski <s....@gmail.com>
Authored: Fri Oct 21 11:34:53 2016 +0200
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri Nov 18 17:29:14 2016 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/locator/TokenMetadata.java | 211 +++++++++++--------
 .../service/PendingRangeCalculatorService.java  |  10 +-
 .../gms/PendingRangeCalculatorServiceTest.java  | 133 ++++++++++++
 4 files changed, 267 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9cd7d540/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 54dc4b5..5a2e0ab 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.9
+ * Avoid blocking gossip during pending range calculation (CASSANDRA-12281)
  * Fix purgeability of tombstones with max timestamp (CASSANDRA-12792)
  * Fail repair if participant dies during sync or anticompaction (CASSANDRA-12901)
  * cqlsh COPY: unprotected pk values before converting them if not using prepared statements (CASSANDRA-12863)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9cd7d540/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index b06c9c8..aafd7f9 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -649,12 +649,7 @@ public class TokenMetadata
         lock.readLock().lock();
         try
         {
-            TokenMetadata allLeftMetadata = cloneOnlyTokenMap();
-
-            for (InetAddress endpoint : leavingEndpoints)
-                allLeftMetadata.removeEndpoint(endpoint);
-
-            return allLeftMetadata;
+            return removeEndpoints(cloneOnlyTokenMap(), leavingEndpoints);
         }
         finally
         {
@@ -662,6 +657,14 @@ public class TokenMetadata
         }
     }
 
+    private static TokenMetadata removeEndpoints(TokenMetadata allLeftMetadata, Set<InetAddress> leavingEndpoints)
+    {
+        for (InetAddress endpoint : leavingEndpoints)
+            allLeftMetadata.removeEndpoint(endpoint);
+
+        return allLeftMetadata;
+    }
+
     /**
      * Create a copy of TokenMetadata with tokenToEndpointMap reflecting situation after all
      * current leave, and move operations have finished.
@@ -787,118 +790,154 @@ public class TokenMetadata
      */
     public void calculatePendingRanges(AbstractReplicationStrategy strategy, String keyspaceName)
     {
-        lock.readLock().lock();
-        try
+        // avoid race between both branches - do not use a lock here as this will block any other unrelated operations!
+        synchronized (pendingRanges)
         {
-            PendingRangeMaps newPendingRanges = new PendingRangeMaps();
-
             if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && movingEndpoints.isEmpty())
             {
                 if (logger.isTraceEnabled())
                     logger.trace("No bootstrapping, leaving or moving nodes -> empty pending ranges for {}", keyspaceName);
 
-                pendingRanges.put(keyspaceName, newPendingRanges);
-                return;
+                pendingRanges.put(keyspaceName, new PendingRangeMaps());
             }
+            else
+            {
+                if (logger.isDebugEnabled())
+                    logger.debug("Starting pending range calculation for {}", keyspaceName);
 
-            Multimap<InetAddress, Range<Token>> addressRanges = strategy.getAddressRanges();
-
-            // Copy of metadata reflecting the situation after all leave operations are finished.
-            TokenMetadata allLeftMetadata = cloneAfterAllLeft();
+                long startedAt = System.currentTimeMillis();
 
-            // get all ranges that will be affected by leaving nodes
-            Set<Range<Token>> affectedRanges = new HashSet<Range<Token>>();
-            for (InetAddress endpoint : leavingEndpoints)
-                affectedRanges.addAll(addressRanges.get(endpoint));
+                // create clone of current state
+                BiMultiValMap<Token, InetAddress> bootstrapTokens = new BiMultiValMap<>();
+                Set<InetAddress> leavingEndpoints = new HashSet<>();
+                Set<Pair<Token, InetAddress>> movingEndpoints = new HashSet<>();
+                TokenMetadata metadata;
 
-            // for each of those ranges, find what new nodes will be responsible for the range when
-            // all leaving nodes are gone.
-            TokenMetadata metadata = cloneOnlyTokenMap(); // don't do this in the loop! #7758
-            for (Range<Token> range : affectedRanges)
-            {
-                Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata));
-                Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
-                for (InetAddress address : Sets.difference(newEndpoints, currentEndpoints))
+                lock.readLock().lock();
+                try
+                {
+                    bootstrapTokens.putAll(this.bootstrapTokens);
+                    leavingEndpoints.addAll(this.leavingEndpoints);
+                    movingEndpoints.addAll(this.movingEndpoints);
+                    metadata = this.cloneOnlyTokenMap();
+                }
+                finally
                 {
-                    newPendingRanges.addPendingRange(range, address);
+                    lock.readLock().unlock();
                 }
+
+                pendingRanges.put(keyspaceName, calculatePendingRanges(strategy, metadata, bootstrapTokens,
+                                                                       leavingEndpoints, movingEndpoints));
+                long took = System.currentTimeMillis() - startedAt;
+
+                if (logger.isDebugEnabled())
+                    logger.debug("Pending range calculation for {} completed (took: {}ms)", keyspaceName, took);
+                if (logger.isTraceEnabled())
+                    logger.trace("Calculated pending ranges for {}:\n{}", keyspaceName, (pendingRanges.isEmpty() ? "<empty>" : printPendingRanges()));
             }
+        }
+    }
+
+    /**
+     * @see TokenMetadata#calculatePendingRanges(AbstractReplicationStrategy, String)
+     */
+    private static PendingRangeMaps calculatePendingRanges(AbstractReplicationStrategy strategy,
+                                                           TokenMetadata metadata,
+                                                           BiMultiValMap<Token, InetAddress> bootstrapTokens,
+                                                           Set<InetAddress> leavingEndpoints,
+                                                           Set<Pair<Token, InetAddress>> movingEndpoints)
+    {
+        PendingRangeMaps newPendingRanges = new PendingRangeMaps();
+
+        Multimap<InetAddress, Range<Token>> addressRanges = strategy.getAddressRanges(metadata);
 
-            // At this stage newPendingRanges has been updated according to leave operations. We can
-            // now continue the calculation by checking bootstrapping nodes.
+        // Copy of metadata reflecting the situation after all leave operations are finished.
+        TokenMetadata allLeftMetadata = removeEndpoints(metadata.cloneOnlyTokenMap(), leavingEndpoints);
 
-            // For each of the bootstrapping nodes, simply add and remove them one by one to
-            // allLeftMetadata and check in between what their ranges would be.
-            Multimap<InetAddress, Token> bootstrapAddresses = bootstrapTokens.inverse();
-            for (InetAddress endpoint : bootstrapAddresses.keySet())
+        // get all ranges that will be affected by leaving nodes
+        Set<Range<Token>> affectedRanges = new HashSet<Range<Token>>();
+        for (InetAddress endpoint : leavingEndpoints)
+            affectedRanges.addAll(addressRanges.get(endpoint));
+
+        // for each of those ranges, find what new nodes will be responsible for the range when
+        // all leaving nodes are gone.
+        for (Range<Token> range : affectedRanges)
+        {
+            Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata));
+            Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
+            for (InetAddress address : Sets.difference(newEndpoints, currentEndpoints))
             {
-                Collection<Token> tokens = bootstrapAddresses.get(endpoint);
+                newPendingRanges.addPendingRange(range, address);
+            }
+        }
 
-                allLeftMetadata.updateNormalTokens(tokens, endpoint);
-                for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
-                {
-                    newPendingRanges.addPendingRange(range, endpoint);
-                }
-                allLeftMetadata.removeEndpoint(endpoint);
+        // At this stage newPendingRanges has been updated according to leave operations. We can
+        // now continue the calculation by checking bootstrapping nodes.
+
+        // For each of the bootstrapping nodes, simply add and remove them one by one to
+        // allLeftMetadata and check in between what their ranges would be.
+        Multimap<InetAddress, Token> bootstrapAddresses = bootstrapTokens.inverse();
+        for (InetAddress endpoint : bootstrapAddresses.keySet())
+        {
+            Collection<Token> tokens = bootstrapAddresses.get(endpoint);
+
+            allLeftMetadata.updateNormalTokens(tokens, endpoint);
+            for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
+            {
+                newPendingRanges.addPendingRange(range, endpoint);
             }
+            allLeftMetadata.removeEndpoint(endpoint);
+        }
 
-            // At this stage newPendingRanges has been updated according to leaving and bootstrapping nodes.
-            // We can now finish the calculation by checking moving nodes.
+        // At this stage newPendingRanges has been updated according to leaving and bootstrapping nodes.
+        // We can now finish the calculation by checking moving nodes.
 
-            // For each of the moving nodes, we do the same thing we did for bootstrapping:
-            // simply add and remove them one by one to allLeftMetadata and check in between what their ranges would be.
-            for (Pair<Token, InetAddress> moving : movingEndpoints)
+        // For each of the moving nodes, we do the same thing we did for bootstrapping:
+        // simply add and remove them one by one to allLeftMetadata and check in between what their ranges would be.
+        for (Pair<Token, InetAddress> moving : movingEndpoints)
+        {
+            //Calculate all the ranges which could be affected. This will include the ranges before and after the move.
+            Set<Range<Token>> moveAffectedRanges = new HashSet<>();
+            InetAddress endpoint = moving.right; // address of the moving node
+            //Add ranges before the move
+            for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
             {
-                //Calculate all the ranges which will could be affected. This will include the ranges before and after the move.
-                Set<Range<Token>> moveAffectedRanges = new HashSet<>();
-                InetAddress endpoint = moving.right; // address of the moving node
-                //Add ranges before the move
-                for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
-                {
-                    moveAffectedRanges.add(range);
-                }
+                moveAffectedRanges.add(range);
+            }
 
-                allLeftMetadata.updateNormalToken(moving.left, endpoint);
-                //Add ranges after the move
-                for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
-                {
-                    moveAffectedRanges.add(range);
-                }
+            allLeftMetadata.updateNormalToken(moving.left, endpoint);
+            //Add ranges after the move
+            for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
+            {
+                moveAffectedRanges.add(range);
+            }
 
-                for(Range<Token> range : moveAffectedRanges)
+            for(Range<Token> range : moveAffectedRanges)
+            {
+                Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata));
+                Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
+                Set<InetAddress> difference = Sets.difference(newEndpoints, currentEndpoints);
+                for(final InetAddress address : difference)
                 {
-                    Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata));
-                    Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
-                    Set<InetAddress> difference = Sets.difference(newEndpoints, currentEndpoints);
-                    for(final InetAddress address : difference)
-                    {
-                        Collection<Range<Token>> newRanges = strategy.getAddressRanges(allLeftMetadata).get(address);
-                        Collection<Range<Token>> oldRanges = strategy.getAddressRanges(metadata).get(address);
-                        //We want to get rid of any ranges which the node is currently getting.
-                        newRanges.removeAll(oldRanges);
+                    Collection<Range<Token>> newRanges = strategy.getAddressRanges(allLeftMetadata).get(address);
+                    Collection<Range<Token>> oldRanges = strategy.getAddressRanges(metadata).get(address);
+                    //We want to get rid of any ranges which the node is currently getting.
+                    newRanges.removeAll(oldRanges);
 
-                        for(Range<Token> newRange : newRanges)
+                    for(Range<Token> newRange : newRanges)
+                    {
+                        for(Range<Token> pendingRange : newRange.subtractAll(oldRanges))
                         {
-                            for(Range<Token> pendingRange : newRange.subtractAll(oldRanges))
-                            {
-                                newPendingRanges.addPendingRange(pendingRange, address);
-                            }
+                            newPendingRanges.addPendingRange(pendingRange, address);
                         }
                     }
                 }
-
-                allLeftMetadata.removeEndpoint(endpoint);
             }
 
-            pendingRanges.put(keyspaceName, newPendingRanges);
-
-            if (logger.isTraceEnabled())
-                logger.trace("Pending ranges:\n{}", (pendingRanges.isEmpty() ? "<empty>" : printPendingRanges()));
-        }
-        finally
-        {
-            lock.readLock().unlock();
+            allLeftMetadata.removeEndpoint(endpoint);
         }
+
+        return newPendingRanges;
     }
 
     public Token getPredecessor(Token token)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9cd7d540/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
index 806f6a5..116cede 100644
--- a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
+++ b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -34,6 +35,9 @@ public class PendingRangeCalculatorService
     public static final PendingRangeCalculatorService instance = new PendingRangeCalculatorService();
 
     private static Logger logger = LoggerFactory.getLogger(PendingRangeCalculatorService.class);
+
+    // the executor will only run a single range calculation at a time while keeping at most one task queued in order
+    // to trigger an update only after the most recent state change and not for each update individually
     private final JMXEnabledThreadPoolExecutor executor = new JMXEnabledThreadPoolExecutor(1, Integer.MAX_VALUE, TimeUnit.SECONDS,
             new LinkedBlockingQueue<Runnable>(1), new NamedThreadFactory("PendingRangeCalculator"), "internal");
 
@@ -58,9 +62,11 @@ public class PendingRangeCalculatorService
             try
             {
                 long start = System.currentTimeMillis();
-                for (String keyspaceName : Schema.instance.getNonSystemKeyspaces())
+                List<String> keyspaces = Schema.instance.getNonSystemKeyspaces();
+                for (String keyspaceName : keyspaces)
                     calculatePendingRanges(Keyspace.open(keyspaceName).getReplicationStrategy(), keyspaceName);
-                logger.debug("finished calculation for {} keyspaces in {}ms", Schema.instance.getNonSystemKeyspaces().size(), System.currentTimeMillis() - start);
+                if (logger.isTraceEnabled())
+                    logger.trace("Finished PendingRangeTask for {} keyspaces in {}ms", keyspaces.size(), System.currentTimeMillis() - start);
             }
             finally
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9cd7d540/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java b/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java
new file mode 100644
index 0000000..507948c
--- /dev/null
+++ b/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.service.StorageService;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+
+/**
+ * Test for "Gossip blocks on startup when another node is bootstrapping" (CASSANDRA-12281).
+ */
+@RunWith(BMUnitRunner.class)
+public class PendingRangeCalculatorServiceTest
+{
+    static ReentrantLock calculationLock = new ReentrantLock();
+
+    @BeforeClass
+    public static void setUp() throws ConfigurationException
+    {
+        SchemaLoader.prepareServer();
+        StorageService.instance.initServer();
+    }
+
+    @Test
+    @BMRule(name = "Block pending range calculation",
+            targetClass = "TokenMetadata",
+            targetMethod = "calculatePendingRanges",
+            targetLocation = "AT INVOKE org.apache.cassandra.locator.AbstractReplicationStrategy.getAddressRanges",
+            action = "org.apache.cassandra.gms.PendingRangeCalculatorServiceTest.calculationLock.lock()")
+    public void testDelayedResponse() throws UnknownHostException, InterruptedException
+    {
+        final InetAddress otherNodeAddr = InetAddress.getByName("127.0.0.2");
+        final UUID otherHostId = UUID.randomUUID();
+
+        // introduce node for first major state change
+        Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 1, false));
+
+        // acquire lock to block pending range calculation via byteman
+        calculationLock.lock();
+        try
+        {
+            // spawn thread that will trigger handling of a bootstrap state change which in turn will trigger
+            // the pending range calculation that will be blocked by our lock
+            Thread t1 = new Thread()
+            {
+                public void run()
+                {
+                    Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 2, true));
+                }
+            };
+            t1.start();
+
+            // busy-spin until t1 is blocked by lock
+            while (!calculationLock.hasQueuedThreads()) ;
+
+            // trigger further state changes to verify that the thread blocked in the
+            // expensive range calculation does not block us here as well
+            Thread t2 = new Thread()
+            {
+                public void run()
+                {
+                    Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 3, false));
+                    Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 4, false));
+                    Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 5, false));
+                }
+            };
+            t2.start();
+            t2.join(2000);
+            assertFalse("Thread still blocked by pending range calculation", t2.isAlive());
+            assertEquals(5, Gossiper.instance.getEndpointStateForEndpoint(otherNodeAddr).getHeartBeatState().getHeartBeatVersion());
+        }
+        finally
+        {
+            calculationLock.unlock();
+        }
+    }
+
+    private Map<InetAddress, EndpointState> getStates(InetAddress otherNodeAddr, UUID hostId, int ver, boolean bootstrapping)
+    {
+        HeartBeatState hb = new HeartBeatState(1, ver);
+        EndpointState state = new EndpointState(hb);
+        Collection<Token> tokens = new ArrayList<>();
+
+        tokens.add(new ByteOrderedPartitioner.BytesToken(new byte[]{1,2,3}));
+        state.addApplicationState(ApplicationState.TOKENS, StorageService.instance.valueFactory.tokens(tokens));
+        state.addApplicationState(ApplicationState.STATUS, bootstrapping ?
+                StorageService.instance.valueFactory.bootstrapping(tokens) : StorageService.instance.valueFactory.normal(tokens));
+        state.addApplicationState(ApplicationState.HOST_ID, StorageService.instance.valueFactory.hostId(hostId));
+        state.addApplicationState(ApplicationState.NET_VERSION, StorageService.instance.valueFactory.networkVersion());
+
+        Map<InetAddress, EndpointState> states = new HashMap<>();
+        states.put(otherNodeAddr, state);
+        return states;
+    }
+}


[02/10] cassandra git commit: Avoid blocking gossip during pending range calculation

Posted by al...@apache.org.
Avoid blocking gossip during pending range calculation

patch by Stefan Podkowinski; reviewed by Joel Knighton for CASSANDRA-12281


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9cd7d540
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9cd7d540
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9cd7d540

Branch: refs/heads/cassandra-3.0
Commit: 9cd7d540de2ea525982d139a4c8a11233c4e98c9
Parents: 7d2fdfe
Author: Stefan Podkowinski <s....@gmail.com>
Authored: Fri Oct 21 11:34:53 2016 +0200
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri Nov 18 17:29:14 2016 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/locator/TokenMetadata.java | 211 +++++++++++--------
 .../service/PendingRangeCalculatorService.java  |  10 +-
 .../gms/PendingRangeCalculatorServiceTest.java  | 133 ++++++++++++
 4 files changed, 267 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9cd7d540/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 54dc4b5..5a2e0ab 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.9
+ * Avoid blocking gossip during pending range calculation (CASSANDRA-12281)
  * Fix purgeability of tombstones with max timestamp (CASSANDRA-12792)
  * Fail repair if participant dies during sync or anticompaction (CASSANDRA-12901)
  * cqlsh COPY: unprotected pk values before converting them if not using prepared statements (CASSANDRA-12863)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9cd7d540/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index b06c9c8..aafd7f9 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -649,12 +649,7 @@ public class TokenMetadata
         lock.readLock().lock();
         try
         {
-            TokenMetadata allLeftMetadata = cloneOnlyTokenMap();
-
-            for (InetAddress endpoint : leavingEndpoints)
-                allLeftMetadata.removeEndpoint(endpoint);
-
-            return allLeftMetadata;
+            return removeEndpoints(cloneOnlyTokenMap(), leavingEndpoints);
         }
         finally
         {
@@ -662,6 +657,14 @@ public class TokenMetadata
         }
     }
 
+    private static TokenMetadata removeEndpoints(TokenMetadata allLeftMetadata, Set<InetAddress> leavingEndpoints)
+    {
+        for (InetAddress endpoint : leavingEndpoints)
+            allLeftMetadata.removeEndpoint(endpoint);
+
+        return allLeftMetadata;
+    }
+
     /**
      * Create a copy of TokenMetadata with tokenToEndpointMap reflecting situation after all
      * current leave, and move operations have finished.
@@ -787,118 +790,154 @@ public class TokenMetadata
      */
     public void calculatePendingRanges(AbstractReplicationStrategy strategy, String keyspaceName)
     {
-        lock.readLock().lock();
-        try
+        // avoid race between both branches - do not use a lock here as this will block any other unrelated operations!
+        synchronized (pendingRanges)
         {
-            PendingRangeMaps newPendingRanges = new PendingRangeMaps();
-
             if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && movingEndpoints.isEmpty())
             {
                 if (logger.isTraceEnabled())
                     logger.trace("No bootstrapping, leaving or moving nodes -> empty pending ranges for {}", keyspaceName);
 
-                pendingRanges.put(keyspaceName, newPendingRanges);
-                return;
+                pendingRanges.put(keyspaceName, new PendingRangeMaps());
             }
+            else
+            {
+                if (logger.isDebugEnabled())
+                    logger.debug("Starting pending range calculation for {}", keyspaceName);
 
-            Multimap<InetAddress, Range<Token>> addressRanges = strategy.getAddressRanges();
-
-            // Copy of metadata reflecting the situation after all leave operations are finished.
-            TokenMetadata allLeftMetadata = cloneAfterAllLeft();
+                long startedAt = System.currentTimeMillis();
 
-            // get all ranges that will be affected by leaving nodes
-            Set<Range<Token>> affectedRanges = new HashSet<Range<Token>>();
-            for (InetAddress endpoint : leavingEndpoints)
-                affectedRanges.addAll(addressRanges.get(endpoint));
+                // create clone of current state
+                BiMultiValMap<Token, InetAddress> bootstrapTokens = new BiMultiValMap<>();
+                Set<InetAddress> leavingEndpoints = new HashSet<>();
+                Set<Pair<Token, InetAddress>> movingEndpoints = new HashSet<>();
+                TokenMetadata metadata;
 
-            // for each of those ranges, find what new nodes will be responsible for the range when
-            // all leaving nodes are gone.
-            TokenMetadata metadata = cloneOnlyTokenMap(); // don't do this in the loop! #7758
-            for (Range<Token> range : affectedRanges)
-            {
-                Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata));
-                Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
-                for (InetAddress address : Sets.difference(newEndpoints, currentEndpoints))
+                lock.readLock().lock();
+                try
+                {
+                    bootstrapTokens.putAll(this.bootstrapTokens);
+                    leavingEndpoints.addAll(this.leavingEndpoints);
+                    movingEndpoints.addAll(this.movingEndpoints);
+                    metadata = this.cloneOnlyTokenMap();
+                }
+                finally
                 {
-                    newPendingRanges.addPendingRange(range, address);
+                    lock.readLock().unlock();
                 }
+
+                pendingRanges.put(keyspaceName, calculatePendingRanges(strategy, metadata, bootstrapTokens,
+                                                                       leavingEndpoints, movingEndpoints));
+                long took = System.currentTimeMillis() - startedAt;
+
+                if (logger.isDebugEnabled())
+                    logger.debug("Pending range calculation for {} completed (took: {}ms)", keyspaceName, took);
+                if (logger.isTraceEnabled())
+                    logger.trace("Calculated pending ranges for {}:\n{}", keyspaceName, (pendingRanges.isEmpty() ? "<empty>" : printPendingRanges()));
             }
+        }
+    }
+
+    /**
+     * @see TokenMetadata#calculatePendingRanges(AbstractReplicationStrategy, String)
+     */
+    private static PendingRangeMaps calculatePendingRanges(AbstractReplicationStrategy strategy,
+                                                           TokenMetadata metadata,
+                                                           BiMultiValMap<Token, InetAddress> bootstrapTokens,
+                                                           Set<InetAddress> leavingEndpoints,
+                                                           Set<Pair<Token, InetAddress>> movingEndpoints)
+    {
+        PendingRangeMaps newPendingRanges = new PendingRangeMaps();
+
+        Multimap<InetAddress, Range<Token>> addressRanges = strategy.getAddressRanges(metadata);
 
-            // At this stage newPendingRanges has been updated according to leave operations. We can
-            // now continue the calculation by checking bootstrapping nodes.
+        // Copy of metadata reflecting the situation after all leave operations are finished.
+        TokenMetadata allLeftMetadata = removeEndpoints(metadata.cloneOnlyTokenMap(), leavingEndpoints);
 
-            // For each of the bootstrapping nodes, simply add and remove them one by one to
-            // allLeftMetadata and check in between what their ranges would be.
-            Multimap<InetAddress, Token> bootstrapAddresses = bootstrapTokens.inverse();
-            for (InetAddress endpoint : bootstrapAddresses.keySet())
+        // get all ranges that will be affected by leaving nodes
+        Set<Range<Token>> affectedRanges = new HashSet<Range<Token>>();
+        for (InetAddress endpoint : leavingEndpoints)
+            affectedRanges.addAll(addressRanges.get(endpoint));
+
+        // for each of those ranges, find what new nodes will be responsible for the range when
+        // all leaving nodes are gone.
+        for (Range<Token> range : affectedRanges)
+        {
+            Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata));
+            Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
+            for (InetAddress address : Sets.difference(newEndpoints, currentEndpoints))
             {
-                Collection<Token> tokens = bootstrapAddresses.get(endpoint);
+                newPendingRanges.addPendingRange(range, address);
+            }
+        }
 
-                allLeftMetadata.updateNormalTokens(tokens, endpoint);
-                for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
-                {
-                    newPendingRanges.addPendingRange(range, endpoint);
-                }
-                allLeftMetadata.removeEndpoint(endpoint);
+        // At this stage newPendingRanges has been updated according to leave operations. We can
+        // now continue the calculation by checking bootstrapping nodes.
+
+        // For each of the bootstrapping nodes, simply add and remove them one by one to
+        // allLeftMetadata and check in between what their ranges would be.
+        Multimap<InetAddress, Token> bootstrapAddresses = bootstrapTokens.inverse();
+        for (InetAddress endpoint : bootstrapAddresses.keySet())
+        {
+            Collection<Token> tokens = bootstrapAddresses.get(endpoint);
+
+            allLeftMetadata.updateNormalTokens(tokens, endpoint);
+            for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
+            {
+                newPendingRanges.addPendingRange(range, endpoint);
             }
+            allLeftMetadata.removeEndpoint(endpoint);
+        }
 
-            // At this stage newPendingRanges has been updated according to leaving and bootstrapping nodes.
-            // We can now finish the calculation by checking moving nodes.
+        // At this stage newPendingRanges has been updated according to leaving and bootstrapping nodes.
+        // We can now finish the calculation by checking moving nodes.
 
-            // For each of the moving nodes, we do the same thing we did for bootstrapping:
-            // simply add and remove them one by one to allLeftMetadata and check in between what their ranges would be.
-            for (Pair<Token, InetAddress> moving : movingEndpoints)
+        // For each of the moving nodes, we do the same thing we did for bootstrapping:
+        // simply add and remove them one by one to allLeftMetadata and check in between what their ranges would be.
+        for (Pair<Token, InetAddress> moving : movingEndpoints)
+        {
+            //Calculate all the ranges which will could be affected. This will include the ranges before and after the move.
+            Set<Range<Token>> moveAffectedRanges = new HashSet<>();
+            InetAddress endpoint = moving.right; // address of the moving node
+            //Add ranges before the move
+            for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
             {
-                //Calculate all the ranges which will could be affected. This will include the ranges before and after the move.
-                Set<Range<Token>> moveAffectedRanges = new HashSet<>();
-                InetAddress endpoint = moving.right; // address of the moving node
-                //Add ranges before the move
-                for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
-                {
-                    moveAffectedRanges.add(range);
-                }
+                moveAffectedRanges.add(range);
+            }
 
-                allLeftMetadata.updateNormalToken(moving.left, endpoint);
-                //Add ranges after the move
-                for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
-                {
-                    moveAffectedRanges.add(range);
-                }
+            allLeftMetadata.updateNormalToken(moving.left, endpoint);
+            //Add ranges after the move
+            for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
+            {
+                moveAffectedRanges.add(range);
+            }
 
-                for(Range<Token> range : moveAffectedRanges)
+            for(Range<Token> range : moveAffectedRanges)
+            {
+                Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata));
+                Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
+                Set<InetAddress> difference = Sets.difference(newEndpoints, currentEndpoints);
+                for(final InetAddress address : difference)
                 {
-                    Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata));
-                    Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
-                    Set<InetAddress> difference = Sets.difference(newEndpoints, currentEndpoints);
-                    for(final InetAddress address : difference)
-                    {
-                        Collection<Range<Token>> newRanges = strategy.getAddressRanges(allLeftMetadata).get(address);
-                        Collection<Range<Token>> oldRanges = strategy.getAddressRanges(metadata).get(address);
-                        //We want to get rid of any ranges which the node is currently getting.
-                        newRanges.removeAll(oldRanges);
+                    Collection<Range<Token>> newRanges = strategy.getAddressRanges(allLeftMetadata).get(address);
+                    Collection<Range<Token>> oldRanges = strategy.getAddressRanges(metadata).get(address);
+                    //We want to get rid of any ranges which the node is currently getting.
+                    newRanges.removeAll(oldRanges);
 
-                        for(Range<Token> newRange : newRanges)
+                    for(Range<Token> newRange : newRanges)
+                    {
+                        for(Range<Token> pendingRange : newRange.subtractAll(oldRanges))
                         {
-                            for(Range<Token> pendingRange : newRange.subtractAll(oldRanges))
-                            {
-                                newPendingRanges.addPendingRange(pendingRange, address);
-                            }
+                            newPendingRanges.addPendingRange(pendingRange, address);
                         }
                     }
                 }
-
-                allLeftMetadata.removeEndpoint(endpoint);
             }
 
-            pendingRanges.put(keyspaceName, newPendingRanges);
-
-            if (logger.isTraceEnabled())
-                logger.trace("Pending ranges:\n{}", (pendingRanges.isEmpty() ? "<empty>" : printPendingRanges()));
-        }
-        finally
-        {
-            lock.readLock().unlock();
+            allLeftMetadata.removeEndpoint(endpoint);
         }
+
+        return newPendingRanges;
     }
 
     public Token getPredecessor(Token token)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9cd7d540/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
index 806f6a5..116cede 100644
--- a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
+++ b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -34,6 +35,9 @@ public class PendingRangeCalculatorService
     public static final PendingRangeCalculatorService instance = new PendingRangeCalculatorService();
 
     private static Logger logger = LoggerFactory.getLogger(PendingRangeCalculatorService.class);
+
+    // the executor will only run a single range calculation at a time while keeping at most one task queued in order
+    // to trigger an update only after the most recent state change and not for each update individually
     private final JMXEnabledThreadPoolExecutor executor = new JMXEnabledThreadPoolExecutor(1, Integer.MAX_VALUE, TimeUnit.SECONDS,
             new LinkedBlockingQueue<Runnable>(1), new NamedThreadFactory("PendingRangeCalculator"), "internal");
 
@@ -58,9 +62,11 @@ public class PendingRangeCalculatorService
             try
             {
                 long start = System.currentTimeMillis();
-                for (String keyspaceName : Schema.instance.getNonSystemKeyspaces())
+                List<String> keyspaces = Schema.instance.getNonSystemKeyspaces();
+                for (String keyspaceName : keyspaces)
                     calculatePendingRanges(Keyspace.open(keyspaceName).getReplicationStrategy(), keyspaceName);
-                logger.debug("finished calculation for {} keyspaces in {}ms", Schema.instance.getNonSystemKeyspaces().size(), System.currentTimeMillis() - start);
+                if (logger.isTraceEnabled())
+                    logger.trace("Finished PendingRangeTask for {} keyspaces in {}ms", keyspaces.size(), System.currentTimeMillis() - start);
             }
             finally
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9cd7d540/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java b/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java
new file mode 100644
index 0000000..507948c
--- /dev/null
+++ b/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.service.StorageService;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+
+/**
+ * Test for "Gossip blocks on startup when another node is bootstrapping" (CASSANDRA-12281).
+ */
+@RunWith(BMUnitRunner.class)
+public class PendingRangeCalculatorServiceTest
+{
+    static ReentrantLock calculationLock = new ReentrantLock();
+
+    @BeforeClass
+    public static void setUp() throws ConfigurationException
+    {
+        SchemaLoader.prepareServer();
+        StorageService.instance.initServer();
+    }
+
+    @Test
+    @BMRule(name = "Block pending range calculation",
+            targetClass = "TokenMetadata",
+            targetMethod = "calculatePendingRanges",
+            targetLocation = "AT INVOKE org.apache.cassandra.locator.AbstractReplicationStrategy.getAddressRanges",
+            action = "org.apache.cassandra.gms.PendingRangeCalculatorServiceTest.calculationLock.lock()")
+    public void testDelayedResponse() throws UnknownHostException, InterruptedException
+    {
+        final InetAddress otherNodeAddr = InetAddress.getByName("127.0.0.2");
+        final UUID otherHostId = UUID.randomUUID();
+
+        // introduce node for first major state change
+        Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 1, false));
+
+        // acquire lock to block pending range calculation via byteman
+        calculationLock.lock();
+        try
+        {
+            // spawn thread that will trigger handling of a bootstrap state change which in turn will trigger
+            // the pending range calculation that will be blocked by our lock
+            Thread t1 = new Thread()
+            {
+                public void run()
+                {
+                    Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 2, true));
+                }
+            };
+            t1.start();
+
+            // busy-spin until t1 is blocked by lock
+            while (!calculationLock.hasQueuedThreads()) ;
+
+            // trigger further state changes in case we don't want the blocked thread from the
+            // expensive range calculation to block us here as well
+            Thread t2 = new Thread()
+            {
+                public void run()
+                {
+                    Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 3, false));
+                    Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 4, false));
+                    Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 5, false));
+                }
+            };
+            t2.start();
+            t2.join(2000);
+            assertFalse("Thread still blocked by pending range calculation", t2.isAlive());
+            assertEquals(5, Gossiper.instance.getEndpointStateForEndpoint(otherNodeAddr).getHeartBeatState().getHeartBeatVersion());
+        }
+        finally
+        {
+            calculationLock.unlock();
+        }
+    }
+
+    private Map<InetAddress, EndpointState> getStates(InetAddress otherNodeAddr, UUID hostId, int ver, boolean bootstrapping)
+    {
+        HeartBeatState hb = new HeartBeatState(1, ver);
+        EndpointState state = new EndpointState(hb);
+        Collection<Token> tokens = new ArrayList<>();
+
+        tokens.add(new ByteOrderedPartitioner.BytesToken(new byte[]{1,2,3}));
+        state.addApplicationState(ApplicationState.TOKENS, StorageService.instance.valueFactory.tokens(tokens));
+        state.addApplicationState(ApplicationState.STATUS, bootstrapping ?
+                StorageService.instance.valueFactory.bootstrapping(tokens) : StorageService.instance.valueFactory.normal(tokens));
+        state.addApplicationState(ApplicationState.HOST_ID, StorageService.instance.valueFactory.hostId(hostId));
+        state.addApplicationState(ApplicationState.NET_VERSION, StorageService.instance.valueFactory.networkVersion());
+
+        Map<InetAddress, EndpointState> states = new HashMap<>();
+        states.put(otherNodeAddr, state);
+        return states;
+    }
+}


[08/10] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.X

Posted by al...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.X


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/96d67b10
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/96d67b10
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/96d67b10

Branch: refs/heads/cassandra-3.X
Commit: 96d67b109a2ef858c2753bbb9853d01460cb8f8e
Parents: f33cd55 59b40b3
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Fri Nov 18 17:31:11 2016 +0000
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri Nov 18 17:38:31 2016 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +-
 .../apache/cassandra/locator/TokenMetadata.java | 211 +++++++++++--------
 .../service/PendingRangeCalculatorService.java  |   6 +-
 .../gms/PendingRangeCalculatorServiceTest.java  | 133 ++++++++++++
 4 files changed, 264 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d67b10/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index ee73b81,bcd0b5c..2826011
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -150,7 -40,6 +150,7 @@@ Merged from 3.0
   * Correct log message for statistics of offheap memtable flush (CASSANDRA-12776)
   * Explicitly set locale for string validation (CASSANDRA-12541,CASSANDRA-12542,CASSANDRA-12543,CASSANDRA-12545)
  Merged from 2.2:
- =======
++ * Avoid blocking gossip during pending range calculation (CASSANDRA-12281)
   * Fix purgeability of tombstones with max timestamp (CASSANDRA-12792)
   * Fail repair if participant dies during sync or anticompaction (CASSANDRA-12901)
   * cqlsh COPY: unprotected pk values before converting them if not using prepared statements (CASSANDRA-12863)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d67b10/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------


[06/10] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by al...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/59b40b31
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/59b40b31
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/59b40b31

Branch: refs/heads/cassandra-3.0
Commit: 59b40b3173933620bc5f30e26366cd09b3a4ca10
Parents: eb41380 9cd7d54
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Fri Nov 18 17:29:41 2016 +0000
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri Nov 18 17:30:46 2016 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../apache/cassandra/locator/TokenMetadata.java | 213 +++++++++++--------
 .../service/PendingRangeCalculatorService.java  |   6 +-
 .../gms/PendingRangeCalculatorServiceTest.java  | 133 ++++++++++++
 4 files changed, 267 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b40b31/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 8a3ac65,5a2e0ab..bcd0b5c
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,42 -1,5 +1,45 @@@
 -2.2.9
 +3.0.11
 + * Prevent reloading of logback.xml from UDF sandbox (CASSANDRA-12535)
++Merged from 2.2:
+  * Avoid blocking gossip during pending range calculation (CASSANDRA-12281)
++
 +
 +3.0.10
 + * Disallow offheap_buffers memtable allocation (CASSANDRA-11039)
 + * Fix CommitLogSegmentManagerTest (CASSANDRA-12283)
 + * Pass root cause to CorruptBlockException when uncompression failed (CASSANDRA-12889)
 + * Fix partition count log during compaction (CASSANDRA-12184)
 + * Batch with multiple conditional updates for the same partition causes AssertionError (CASSANDRA-12867)
 + * Make AbstractReplicationStrategy extendable from outside its package (CASSANDRA-12788)
 + * Fix CommitLogTest.testDeleteIfNotDirty (CASSANDRA-12854)
 + * Don't tell users to turn off consistent rangemovements during rebuild. (CASSANDRA-12296)
 + * Avoid deadlock due to materialized view lock contention (CASSANDRA-12689)
 + * Fix for KeyCacheCqlTest flakiness (CASSANDRA-12801)
 + * Include SSTable filename in compacting large row message (CASSANDRA-12384)
 + * Fix potential socket leak (CASSANDRA-12329, CASSANDRA-12330)
 + * Fix ViewTest.testCompaction (CASSANDRA-12789)
 + * Improve avg aggregate functions (CASSANDRA-12417)
 + * Preserve quoted reserved keyword column names in MV creation (CASSANDRA-11803)
 + * nodetool stopdaemon errors out (CASSANDRA-12646)
 + * Split materialized view mutations on build to prevent OOM (CASSANDRA-12268)
 + * mx4j does not work in 3.0.8 (CASSANDRA-12274)
 + * Abort cqlsh copy-from in case of no answer after prolonged period of time (CASSANDRA-12740)
 + * Avoid sstable corrupt exception due to dropped static column (CASSANDRA-12582)
 + * Make stress use client mode to avoid checking commit log size on startup (CASSANDRA-12478)
 + * Fix exceptions with new vnode allocation (CASSANDRA-12715)
 + * Unify drain and shutdown processes (CASSANDRA-12509)
 + * Fix NPE in ComponentOfSlice.isEQ() (CASSANDRA-12706)
 + * Fix failure in LogTransactionTest (CASSANDRA-12632)
 + * Fix potentially incomplete non-frozen UDT values when querying with the
 +   full primary key specified (CASSANDRA-12605)
 + * Skip writing MV mutations to commitlog on mutation.applyUnsafe() (CASSANDRA-11670)
 + * Establish consistent distinction between non-existing partition and NULL value for LWTs on static columns (CASSANDRA-12060)
 + * Extend ColumnIdentifier.internedInstances key to include the type that generated the byte buffer (CASSANDRA-12516)
 + * Backport CASSANDRA-10756 (race condition in NativeTransportService shutdown) (CASSANDRA-12472)
 + * If CF has no clustering columns, any row cache is full partition cache (CASSANDRA-12499)
 + * Correct log message for statistics of offheap memtable flush (CASSANDRA-12776)
 + * Explicitly set locale for string validation (CASSANDRA-12541,CASSANDRA-12542,CASSANDRA-12543,CASSANDRA-12545)
 +Merged from 2.2:
   * Fix purgeability of tombstones with max timestamp (CASSANDRA-12792)
   * Fail repair if participant dies during sync or anticompaction (CASSANDRA-12901)
   * cqlsh COPY: unprotected pk values before converting them if not using prepared statements (CASSANDRA-12863)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b40b31/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/locator/TokenMetadata.java
index b50db00,aafd7f9..b44a1a1
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@@ -128,15 -123,6 +128,15 @@@ public class TokenMetadat
          sortedTokens = sortTokens();
      }
  
 +    /**
-      * To be used by tests only (via {@link StorageService.setPartitionerUnsafe}).
++     * To be used by tests only (via {@link StorageService#setPartitionerUnsafe}).
 +     */
 +    @VisibleForTesting
 +    public TokenMetadata cloneWithNewPartitioner(IPartitioner newPartitioner)
 +    {
 +        return new TokenMetadata(tokenToEndpointMap, endpointToHostIdMap, topology, newPartitioner);
 +    }
 +
      private ArrayList<Token> sortTokens()
      {
          return new ArrayList<>(tokenToEndpointMap.keySet());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b40b31/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
index 352a763,116cede..5b1aa0d
--- a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
+++ b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
@@@ -35,8 -35,11 +35,11 @@@ public class PendingRangeCalculatorServ
      public static final PendingRangeCalculatorService instance = new PendingRangeCalculatorService();
  
      private static Logger logger = LoggerFactory.getLogger(PendingRangeCalculatorService.class);
+ 
+     // the executor will only run a single range calculation at a time while keeping at most one task queued in order
+     // to trigger an update only after the most recent state change and not for each update individually
      private final JMXEnabledThreadPoolExecutor executor = new JMXEnabledThreadPoolExecutor(1, Integer.MAX_VALUE, TimeUnit.SECONDS,
 -            new LinkedBlockingQueue<Runnable>(1), new NamedThreadFactory("PendingRangeCalculator"), "internal");
 +            new LinkedBlockingQueue<>(1), new NamedThreadFactory("PendingRangeCalculator"), "internal");
  
      private AtomicInteger updateJobs = new AtomicInteger(0);
  
@@@ -59,10 -62,11 +62,11 @@@
              try
              {
                  long start = System.currentTimeMillis();
 -                List<String> keyspaces = Schema.instance.getNonSystemKeyspaces();
 +                List<String> keyspaces = Schema.instance.getNonLocalStrategyKeyspaces();
                  for (String keyspaceName : keyspaces)
                      calculatePendingRanges(Keyspace.open(keyspaceName).getReplicationStrategy(), keyspaceName);
-                 logger.debug("finished calculation for {} keyspaces in {}ms", keyspaces.size(), System.currentTimeMillis() - start);
+                 if (logger.isTraceEnabled())
+                     logger.trace("Finished PendingRangeTask for {} keyspaces in {}ms", keyspaces.size(), System.currentTimeMillis() - start);
              }
              finally
              {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b40b31/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java
index 0000000,507948c..90bbf1d
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java
+++ b/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java
@@@ -1,0 -1,133 +1,133 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.gms;
+ 
+ import java.net.InetAddress;
+ import java.net.UnknownHostException;
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.HashMap;
+ import java.util.Map;
+ import java.util.UUID;
+ import java.util.concurrent.locks.ReentrantLock;
+ 
+ import org.junit.BeforeClass;
+ import org.junit.Test;
+ import org.junit.runner.RunWith;
+ 
+ import org.apache.cassandra.SchemaLoader;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.dht.ByteOrderedPartitioner;
+ import org.apache.cassandra.dht.Token;
+ import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.service.StorageService;
+ import org.jboss.byteman.contrib.bmunit.BMRule;
+ import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+ 
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertFalse;
+ 
+ 
+ /**
+  * Test for "Gossip blocks on startup when another node is bootstrapping" (CASSANDRA-12281).
+  */
+ @RunWith(BMUnitRunner.class)
+ public class PendingRangeCalculatorServiceTest
+ {
+     static ReentrantLock calculationLock = new ReentrantLock();
+ 
+     @BeforeClass
+     public static void setUp() throws ConfigurationException
+     {
+         SchemaLoader.prepareServer();
+         StorageService.instance.initServer();
+     }
+ 
+     @Test
+     @BMRule(name = "Block pending range calculation",
+             targetClass = "TokenMetadata",
+             targetMethod = "calculatePendingRanges",
+             targetLocation = "AT INVOKE org.apache.cassandra.locator.AbstractReplicationStrategy.getAddressRanges",
+             action = "org.apache.cassandra.gms.PendingRangeCalculatorServiceTest.calculationLock.lock()")
+     public void testDelayedResponse() throws UnknownHostException, InterruptedException
+     {
 -        final InetAddress otherNodeAddr = InetAddress.getByName("127.0.0.2");
 -        final UUID otherHostId = UUID.randomUUID();
++        InetAddress otherNodeAddr = InetAddress.getByName("127.0.0.2");
++        UUID otherHostId = UUID.randomUUID();
+ 
+         // introduce node for first major state change
+         Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 1, false));
+ 
+         // acquire lock to block pending range calculation via byteman
+         calculationLock.lock();
+         try
+         {
+             // spawn thread that will trigger handling of a bootstrap state change which in turn will trigger
+             // the pending range calculation that will be blocked by our lock
+             Thread t1 = new Thread()
+             {
+                 public void run()
+                 {
+                     Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 2, true));
+                 }
+             };
+             t1.start();
+ 
+             // busy-spin until t1 is blocked by lock
+             while (!calculationLock.hasQueuedThreads()) ;
+ 
+             // trigger further state changes to verify that the thread blocked in the
+             // expensive range calculation does not block us here as well
+             Thread t2 = new Thread()
+             {
+                 public void run()
+                 {
+                     Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 3, false));
+                     Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 4, false));
+                     Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 5, false));
+                 }
+             };
+             t2.start();
+             t2.join(2000);
+             assertFalse("Thread still blocked by pending range calculation", t2.isAlive());
+             assertEquals(5, Gossiper.instance.getEndpointStateForEndpoint(otherNodeAddr).getHeartBeatState().getHeartBeatVersion());
+         }
+         finally
+         {
+             calculationLock.unlock();
+         }
+     }
+ 
+     private Map<InetAddress, EndpointState> getStates(InetAddress otherNodeAddr, UUID hostId, int ver, boolean bootstrapping)
+     {
+         HeartBeatState hb = new HeartBeatState(1, ver);
+         EndpointState state = new EndpointState(hb);
+         Collection<Token> tokens = new ArrayList<>();
+ 
+         tokens.add(new ByteOrderedPartitioner.BytesToken(new byte[]{1,2,3}));
+         state.addApplicationState(ApplicationState.TOKENS, StorageService.instance.valueFactory.tokens(tokens));
+         state.addApplicationState(ApplicationState.STATUS, bootstrapping ?
+                 StorageService.instance.valueFactory.bootstrapping(tokens) : StorageService.instance.valueFactory.normal(tokens));
+         state.addApplicationState(ApplicationState.HOST_ID, StorageService.instance.valueFactory.hostId(hostId));
+         state.addApplicationState(ApplicationState.NET_VERSION, StorageService.instance.valueFactory.networkVersion());
+ 
+         Map<InetAddress, EndpointState> states = new HashMap<>();
+         states.put(otherNodeAddr, state);
+         return states;
+     }
+ }


[09/10] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.X

Posted by al...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.X


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/96d67b10
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/96d67b10
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/96d67b10

Branch: refs/heads/trunk
Commit: 96d67b109a2ef858c2753bbb9853d01460cb8f8e
Parents: f33cd55 59b40b3
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Fri Nov 18 17:31:11 2016 +0000
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri Nov 18 17:38:31 2016 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +-
 .../apache/cassandra/locator/TokenMetadata.java | 211 +++++++++++--------
 .../service/PendingRangeCalculatorService.java  |   6 +-
 .../gms/PendingRangeCalculatorServiceTest.java  | 133 ++++++++++++
 4 files changed, 264 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d67b10/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index ee73b81,bcd0b5c..2826011
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -150,7 -40,6 +150,7 @@@ Merged from 3.0
   * Correct log message for statistics of offheap memtable flush (CASSANDRA-12776)
   * Explicitly set locale for string validation (CASSANDRA-12541,CASSANDRA-12542,CASSANDRA-12543,CASSANDRA-12545)
  Merged from 2.2:
- =======
++ * Avoid blocking gossip during pending range calculation (CASSANDRA-12281)
   * Fix purgeability of tombstones with max timestamp (CASSANDRA-12792)
   * Fail repair if participant dies during sync or anticompaction (CASSANDRA-12901)
   * cqlsh COPY: unprotected pk values before converting them if not using prepared statements (CASSANDRA-12863)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d67b10/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------


[05/10] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by al...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/59b40b31
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/59b40b31
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/59b40b31

Branch: refs/heads/cassandra-3.X
Commit: 59b40b3173933620bc5f30e26366cd09b3a4ca10
Parents: eb41380 9cd7d54
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Fri Nov 18 17:29:41 2016 +0000
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri Nov 18 17:30:46 2016 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../apache/cassandra/locator/TokenMetadata.java | 213 +++++++++++--------
 .../service/PendingRangeCalculatorService.java  |   6 +-
 .../gms/PendingRangeCalculatorServiceTest.java  | 133 ++++++++++++
 4 files changed, 267 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b40b31/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 8a3ac65,5a2e0ab..bcd0b5c
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,42 -1,5 +1,45 @@@
 -2.2.9
 +3.0.11
 + * Prevent reloading of logback.xml from UDF sandbox (CASSANDRA-12535)
++Merged from 2.2:
+  * Avoid blocking gossip during pending range calculation (CASSANDRA-12281)
++
 +
 +3.0.10
 + * Disallow offheap_buffers memtable allocation (CASSANDRA-11039)
 + * Fix CommitLogSegmentManagerTest (CASSANDRA-12283)
 + * Pass root cause to CorruptBlockException when uncompression failed (CASSANDRA-12889)
 + * Fix partition count log during compaction (CASSANDRA-12184)
 + * Batch with multiple conditional updates for the same partition causes AssertionError (CASSANDRA-12867)
 + * Make AbstractReplicationStrategy extendable from outside its package (CASSANDRA-12788)
 + * Fix CommitLogTest.testDeleteIfNotDirty (CASSANDRA-12854)
 + * Don't tell users to turn off consistent rangemovements during rebuild. (CASSANDRA-12296)
 + * Avoid deadlock due to materialized view lock contention (CASSANDRA-12689)
 + * Fix for KeyCacheCqlTest flakiness (CASSANDRA-12801)
 + * Include SSTable filename in compacting large row message (CASSANDRA-12384)
 + * Fix potential socket leak (CASSANDRA-12329, CASSANDRA-12330)
 + * Fix ViewTest.testCompaction (CASSANDRA-12789)
 + * Improve avg aggregate functions (CASSANDRA-12417)
 + * Preserve quoted reserved keyword column names in MV creation (CASSANDRA-11803)
 + * nodetool stopdaemon errors out (CASSANDRA-12646)
 + * Split materialized view mutations on build to prevent OOM (CASSANDRA-12268)
 + * mx4j does not work in 3.0.8 (CASSANDRA-12274)
 + * Abort cqlsh copy-from in case of no answer after prolonged period of time (CASSANDRA-12740)
 + * Avoid sstable corrupt exception due to dropped static column (CASSANDRA-12582)
 + * Make stress use client mode to avoid checking commit log size on startup (CASSANDRA-12478)
 + * Fix exceptions with new vnode allocation (CASSANDRA-12715)
 + * Unify drain and shutdown processes (CASSANDRA-12509)
 + * Fix NPE in ComponentOfSlice.isEQ() (CASSANDRA-12706)
 + * Fix failure in LogTransactionTest (CASSANDRA-12632)
 + * Fix potentially incomplete non-frozen UDT values when querying with the
 +   full primary key specified (CASSANDRA-12605)
 + * Skip writing MV mutations to commitlog on mutation.applyUnsafe() (CASSANDRA-11670)
 + * Establish consistent distinction between non-existing partition and NULL value for LWTs on static columns (CASSANDRA-12060)
 + * Extend ColumnIdentifier.internedInstances key to include the type that generated the byte buffer (CASSANDRA-12516)
 + * Backport CASSANDRA-10756 (race condition in NativeTransportService shutdown) (CASSANDRA-12472)
 + * If CF has no clustering columns, any row cache is full partition cache (CASSANDRA-12499)
 + * Correct log message for statistics of offheap memtable flush (CASSANDRA-12776)
 + * Explicitly set locale for string validation (CASSANDRA-12541,CASSANDRA-12542,CASSANDRA-12543,CASSANDRA-12545)
 +Merged from 2.2:
   * Fix purgeability of tombstones with max timestamp (CASSANDRA-12792)
   * Fail repair if participant dies during sync or anticompaction (CASSANDRA-12901)
   * cqlsh COPY: unprotected pk values before converting them if not using prepared statements (CASSANDRA-12863)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b40b31/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/locator/TokenMetadata.java
index b50db00,aafd7f9..b44a1a1
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@@ -128,15 -123,6 +128,15 @@@ public class TokenMetadat
          sortedTokens = sortTokens();
      }
  
 +    /**
-      * To be used by tests only (via {@link StorageService.setPartitionerUnsafe}).
++     * To be used by tests only (via {@link StorageService#setPartitionerUnsafe}).
 +     */
 +    @VisibleForTesting
 +    public TokenMetadata cloneWithNewPartitioner(IPartitioner newPartitioner)
 +    {
 +        return new TokenMetadata(tokenToEndpointMap, endpointToHostIdMap, topology, newPartitioner);
 +    }
 +
      private ArrayList<Token> sortTokens()
      {
          return new ArrayList<>(tokenToEndpointMap.keySet());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b40b31/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
index 352a763,116cede..5b1aa0d
--- a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
+++ b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
@@@ -35,8 -35,11 +35,11 @@@ public class PendingRangeCalculatorServ
      public static final PendingRangeCalculatorService instance = new PendingRangeCalculatorService();
  
      private static Logger logger = LoggerFactory.getLogger(PendingRangeCalculatorService.class);
+ 
+     // the executor will only run a single range calculation at a time while keeping at most one task queued in order
+     // to trigger an update only after the most recent state change and not for each update individually
      private final JMXEnabledThreadPoolExecutor executor = new JMXEnabledThreadPoolExecutor(1, Integer.MAX_VALUE, TimeUnit.SECONDS,
 -            new LinkedBlockingQueue<Runnable>(1), new NamedThreadFactory("PendingRangeCalculator"), "internal");
 +            new LinkedBlockingQueue<>(1), new NamedThreadFactory("PendingRangeCalculator"), "internal");
  
      private AtomicInteger updateJobs = new AtomicInteger(0);
  
@@@ -59,10 -62,11 +62,11 @@@
              try
              {
                  long start = System.currentTimeMillis();
 -                List<String> keyspaces = Schema.instance.getNonSystemKeyspaces();
 +                List<String> keyspaces = Schema.instance.getNonLocalStrategyKeyspaces();
                  for (String keyspaceName : keyspaces)
                      calculatePendingRanges(Keyspace.open(keyspaceName).getReplicationStrategy(), keyspaceName);
-                 logger.debug("finished calculation for {} keyspaces in {}ms", keyspaces.size(), System.currentTimeMillis() - start);
+                 if (logger.isTraceEnabled())
+                     logger.trace("Finished PendingRangeTask for {} keyspaces in {}ms", keyspaces.size(), System.currentTimeMillis() - start);
              }
              finally
              {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b40b31/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java
index 0000000,507948c..90bbf1d
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java
+++ b/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java
@@@ -1,0 -1,133 +1,133 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.gms;
+ 
+ import java.net.InetAddress;
+ import java.net.UnknownHostException;
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.HashMap;
+ import java.util.Map;
+ import java.util.UUID;
+ import java.util.concurrent.locks.ReentrantLock;
+ 
+ import org.junit.BeforeClass;
+ import org.junit.Test;
+ import org.junit.runner.RunWith;
+ 
+ import org.apache.cassandra.SchemaLoader;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.dht.ByteOrderedPartitioner;
+ import org.apache.cassandra.dht.Token;
+ import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.service.StorageService;
+ import org.jboss.byteman.contrib.bmunit.BMRule;
+ import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+ 
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertFalse;
+ 
+ 
+ /**
+  * Test for "Gossip blocks on startup when another node is bootstrapping" (CASSANDRA-12281).
+  */
+ @RunWith(BMUnitRunner.class)
+ public class PendingRangeCalculatorServiceTest
+ {
+     static ReentrantLock calculationLock = new ReentrantLock();
+ 
+     @BeforeClass
+     public static void setUp() throws ConfigurationException
+     {
+         SchemaLoader.prepareServer();
+         StorageService.instance.initServer();
+     }
+ 
+     @Test
+     @BMRule(name = "Block pending range calculation",
+             targetClass = "TokenMetadata",
+             targetMethod = "calculatePendingRanges",
+             targetLocation = "AT INVOKE org.apache.cassandra.locator.AbstractReplicationStrategy.getAddressRanges",
+             action = "org.apache.cassandra.gms.PendingRangeCalculatorServiceTest.calculationLock.lock()")
+     public void testDelayedResponse() throws UnknownHostException, InterruptedException
+     {
 -        final InetAddress otherNodeAddr = InetAddress.getByName("127.0.0.2");
 -        final UUID otherHostId = UUID.randomUUID();
++        InetAddress otherNodeAddr = InetAddress.getByName("127.0.0.2");
++        UUID otherHostId = UUID.randomUUID();
+ 
+         // introduce node for first major state change
+         Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 1, false));
+ 
+         // acquire lock to block pending range calculation via byteman
+         calculationLock.lock();
+         try
+         {
+             // spawn thread that will trigger handling of a bootstrap state change which in turn will trigger
+             // the pending range calculation that will be blocked by our lock
+             Thread t1 = new Thread()
+             {
+                 public void run()
+                 {
+                     Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 2, true));
+                 }
+             };
+             t1.start();
+ 
+             // busy-spin until t1 is blocked by lock
+             while (!calculationLock.hasQueuedThreads()) ;
+ 
+             // trigger further state changes to verify that the thread blocked in the
+             // expensive range calculation does not block us here as well
+             Thread t2 = new Thread()
+             {
+                 public void run()
+                 {
+                     Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 3, false));
+                     Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 4, false));
+                     Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 5, false));
+                 }
+             };
+             t2.start();
+             t2.join(2000);
+             assertFalse("Thread still blocked by pending range calculation", t2.isAlive());
+             assertEquals(5, Gossiper.instance.getEndpointStateForEndpoint(otherNodeAddr).getHeartBeatState().getHeartBeatVersion());
+         }
+         finally
+         {
+             calculationLock.unlock();
+         }
+     }
+ 
+     private Map<InetAddress, EndpointState> getStates(InetAddress otherNodeAddr, UUID hostId, int ver, boolean bootstrapping)
+     {
+         HeartBeatState hb = new HeartBeatState(1, ver);
+         EndpointState state = new EndpointState(hb);
+         Collection<Token> tokens = new ArrayList<>();
+ 
+         tokens.add(new ByteOrderedPartitioner.BytesToken(new byte[]{1,2,3}));
+         state.addApplicationState(ApplicationState.TOKENS, StorageService.instance.valueFactory.tokens(tokens));
+         state.addApplicationState(ApplicationState.STATUS, bootstrapping ?
+                 StorageService.instance.valueFactory.bootstrapping(tokens) : StorageService.instance.valueFactory.normal(tokens));
+         state.addApplicationState(ApplicationState.HOST_ID, StorageService.instance.valueFactory.hostId(hostId));
+         state.addApplicationState(ApplicationState.NET_VERSION, StorageService.instance.valueFactory.networkVersion());
+ 
+         Map<InetAddress, EndpointState> states = new HashMap<>();
+         states.put(otherNodeAddr, state);
+         return states;
+     }
+ }


[10/10] cassandra git commit: Merge branch 'cassandra-3.X' into trunk

Posted by al...@apache.org.
Merge branch 'cassandra-3.X' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f1c3aac7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f1c3aac7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f1c3aac7

Branch: refs/heads/trunk
Commit: f1c3aac7685bd7d847fbb846d91d68a4fa8be1fc
Parents: 29cb591 96d67b1
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Fri Nov 18 17:38:38 2016 +0000
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri Nov 18 17:38:38 2016 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +-
 .../apache/cassandra/locator/TokenMetadata.java | 211 +++++++++++--------
 .../service/PendingRangeCalculatorService.java  |   6 +-
 .../gms/PendingRangeCalculatorServiceTest.java  | 133 ++++++++++++
 4 files changed, 264 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1c3aac7/CHANGES.txt
----------------------------------------------------------------------


[04/10] cassandra git commit: Avoid blocking gossip during pending range calculation

Posted by al...@apache.org.
Avoid blocking gossip during pending range calculation

patch by Stefan Podkowinski; reviewed by Joel Knighton for CASSANDRA-12281


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9cd7d540
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9cd7d540
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9cd7d540

Branch: refs/heads/trunk
Commit: 9cd7d540de2ea525982d139a4c8a11233c4e98c9
Parents: 7d2fdfe
Author: Stefan Podkowinski <s....@gmail.com>
Authored: Fri Oct 21 11:34:53 2016 +0200
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri Nov 18 17:29:14 2016 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/locator/TokenMetadata.java | 211 +++++++++++--------
 .../service/PendingRangeCalculatorService.java  |  10 +-
 .../gms/PendingRangeCalculatorServiceTest.java  | 133 ++++++++++++
 4 files changed, 267 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9cd7d540/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 54dc4b5..5a2e0ab 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.9
+ * Avoid blocking gossip during pending range calculation (CASSANDRA-12281)
  * Fix purgeability of tombstones with max timestamp (CASSANDRA-12792)
  * Fail repair if participant dies during sync or anticompaction (CASSANDRA-12901)
  * cqlsh COPY: unprotected pk values before converting them if not using prepared statements (CASSANDRA-12863)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9cd7d540/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index b06c9c8..aafd7f9 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -649,12 +649,7 @@ public class TokenMetadata
         lock.readLock().lock();
         try
         {
-            TokenMetadata allLeftMetadata = cloneOnlyTokenMap();
-
-            for (InetAddress endpoint : leavingEndpoints)
-                allLeftMetadata.removeEndpoint(endpoint);
-
-            return allLeftMetadata;
+            return removeEndpoints(cloneOnlyTokenMap(), leavingEndpoints);
         }
         finally
         {
@@ -662,6 +657,14 @@ public class TokenMetadata
         }
     }
 
+    private static TokenMetadata removeEndpoints(TokenMetadata allLeftMetadata, Set<InetAddress> leavingEndpoints)
+    {
+        for (InetAddress endpoint : leavingEndpoints)
+            allLeftMetadata.removeEndpoint(endpoint);
+
+        return allLeftMetadata;
+    }
+
     /**
      * Create a copy of TokenMetadata with tokenToEndpointMap reflecting situation after all
      * current leave, and move operations have finished.
@@ -787,118 +790,154 @@ public class TokenMetadata
      */
     public void calculatePendingRanges(AbstractReplicationStrategy strategy, String keyspaceName)
     {
-        lock.readLock().lock();
-        try
+        // avoid race between both branches - synchronize on pendingRanges instead of holding the TokenMetadata read/write lock, as that would block any other unrelated operations!
+        synchronized (pendingRanges)
         {
-            PendingRangeMaps newPendingRanges = new PendingRangeMaps();
-
             if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && movingEndpoints.isEmpty())
             {
                 if (logger.isTraceEnabled())
                     logger.trace("No bootstrapping, leaving or moving nodes -> empty pending ranges for {}", keyspaceName);
 
-                pendingRanges.put(keyspaceName, newPendingRanges);
-                return;
+                pendingRanges.put(keyspaceName, new PendingRangeMaps());
             }
+            else
+            {
+                if (logger.isDebugEnabled())
+                    logger.debug("Starting pending range calculation for {}", keyspaceName);
 
-            Multimap<InetAddress, Range<Token>> addressRanges = strategy.getAddressRanges();
-
-            // Copy of metadata reflecting the situation after all leave operations are finished.
-            TokenMetadata allLeftMetadata = cloneAfterAllLeft();
+                long startedAt = System.currentTimeMillis();
 
-            // get all ranges that will be affected by leaving nodes
-            Set<Range<Token>> affectedRanges = new HashSet<Range<Token>>();
-            for (InetAddress endpoint : leavingEndpoints)
-                affectedRanges.addAll(addressRanges.get(endpoint));
+                // create clone of current state
+                BiMultiValMap<Token, InetAddress> bootstrapTokens = new BiMultiValMap<>();
+                Set<InetAddress> leavingEndpoints = new HashSet<>();
+                Set<Pair<Token, InetAddress>> movingEndpoints = new HashSet<>();
+                TokenMetadata metadata;
 
-            // for each of those ranges, find what new nodes will be responsible for the range when
-            // all leaving nodes are gone.
-            TokenMetadata metadata = cloneOnlyTokenMap(); // don't do this in the loop! #7758
-            for (Range<Token> range : affectedRanges)
-            {
-                Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata));
-                Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
-                for (InetAddress address : Sets.difference(newEndpoints, currentEndpoints))
+                lock.readLock().lock();
+                try
+                {
+                    bootstrapTokens.putAll(this.bootstrapTokens);
+                    leavingEndpoints.addAll(this.leavingEndpoints);
+                    movingEndpoints.addAll(this.movingEndpoints);
+                    metadata = this.cloneOnlyTokenMap();
+                }
+                finally
                 {
-                    newPendingRanges.addPendingRange(range, address);
+                    lock.readLock().unlock();
                 }
+
+                pendingRanges.put(keyspaceName, calculatePendingRanges(strategy, metadata, bootstrapTokens,
+                                                                       leavingEndpoints, movingEndpoints));
+                long took = System.currentTimeMillis() - startedAt;
+
+                if (logger.isDebugEnabled())
+                    logger.debug("Pending range calculation for {} completed (took: {}ms)", keyspaceName, took);
+                if (logger.isTraceEnabled())
+                    logger.trace("Calculated pending ranges for {}:\n{}", keyspaceName, (pendingRanges.isEmpty() ? "<empty>" : printPendingRanges()));
             }
+        }
+    }
+
+    /**
+     * @see TokenMetadata#calculatePendingRanges(AbstractReplicationStrategy, String)
+     */
+    private static PendingRangeMaps calculatePendingRanges(AbstractReplicationStrategy strategy,
+                                                           TokenMetadata metadata,
+                                                           BiMultiValMap<Token, InetAddress> bootstrapTokens,
+                                                           Set<InetAddress> leavingEndpoints,
+                                                           Set<Pair<Token, InetAddress>> movingEndpoints)
+    {
+        PendingRangeMaps newPendingRanges = new PendingRangeMaps();
+
+        Multimap<InetAddress, Range<Token>> addressRanges = strategy.getAddressRanges(metadata);
 
-            // At this stage newPendingRanges has been updated according to leave operations. We can
-            // now continue the calculation by checking bootstrapping nodes.
+        // Copy of metadata reflecting the situation after all leave operations are finished.
+        TokenMetadata allLeftMetadata = removeEndpoints(metadata.cloneOnlyTokenMap(), leavingEndpoints);
 
-            // For each of the bootstrapping nodes, simply add and remove them one by one to
-            // allLeftMetadata and check in between what their ranges would be.
-            Multimap<InetAddress, Token> bootstrapAddresses = bootstrapTokens.inverse();
-            for (InetAddress endpoint : bootstrapAddresses.keySet())
+        // get all ranges that will be affected by leaving nodes
+        Set<Range<Token>> affectedRanges = new HashSet<Range<Token>>();
+        for (InetAddress endpoint : leavingEndpoints)
+            affectedRanges.addAll(addressRanges.get(endpoint));
+
+        // for each of those ranges, find what new nodes will be responsible for the range when
+        // all leaving nodes are gone.
+        for (Range<Token> range : affectedRanges)
+        {
+            Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata));
+            Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
+            for (InetAddress address : Sets.difference(newEndpoints, currentEndpoints))
             {
-                Collection<Token> tokens = bootstrapAddresses.get(endpoint);
+                newPendingRanges.addPendingRange(range, address);
+            }
+        }
 
-                allLeftMetadata.updateNormalTokens(tokens, endpoint);
-                for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
-                {
-                    newPendingRanges.addPendingRange(range, endpoint);
-                }
-                allLeftMetadata.removeEndpoint(endpoint);
+        // At this stage newPendingRanges has been updated according to leave operations. We can
+        // now continue the calculation by checking bootstrapping nodes.
+
+        // For each of the bootstrapping nodes, simply add and remove them one by one to
+        // allLeftMetadata and check in between what their ranges would be.
+        Multimap<InetAddress, Token> bootstrapAddresses = bootstrapTokens.inverse();
+        for (InetAddress endpoint : bootstrapAddresses.keySet())
+        {
+            Collection<Token> tokens = bootstrapAddresses.get(endpoint);
+
+            allLeftMetadata.updateNormalTokens(tokens, endpoint);
+            for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
+            {
+                newPendingRanges.addPendingRange(range, endpoint);
             }
+            allLeftMetadata.removeEndpoint(endpoint);
+        }
 
-            // At this stage newPendingRanges has been updated according to leaving and bootstrapping nodes.
-            // We can now finish the calculation by checking moving nodes.
+        // At this stage newPendingRanges has been updated according to leaving and bootstrapping nodes.
+        // We can now finish the calculation by checking moving nodes.
 
-            // For each of the moving nodes, we do the same thing we did for bootstrapping:
-            // simply add and remove them one by one to allLeftMetadata and check in between what their ranges would be.
-            for (Pair<Token, InetAddress> moving : movingEndpoints)
+        // For each of the moving nodes, we do the same thing we did for bootstrapping:
+        // simply add and remove them one by one to allLeftMetadata and check in between what their ranges would be.
+        for (Pair<Token, InetAddress> moving : movingEndpoints)
+        {
+            //Calculate all the ranges which could be affected. This will include the ranges before and after the move.
+            Set<Range<Token>> moveAffectedRanges = new HashSet<>();
+            InetAddress endpoint = moving.right; // address of the moving node
+            //Add ranges before the move
+            for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
             {
-                //Calculate all the ranges which will could be affected. This will include the ranges before and after the move.
-                Set<Range<Token>> moveAffectedRanges = new HashSet<>();
-                InetAddress endpoint = moving.right; // address of the moving node
-                //Add ranges before the move
-                for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
-                {
-                    moveAffectedRanges.add(range);
-                }
+                moveAffectedRanges.add(range);
+            }
 
-                allLeftMetadata.updateNormalToken(moving.left, endpoint);
-                //Add ranges after the move
-                for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
-                {
-                    moveAffectedRanges.add(range);
-                }
+            allLeftMetadata.updateNormalToken(moving.left, endpoint);
+            //Add ranges after the move
+            for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
+            {
+                moveAffectedRanges.add(range);
+            }
 
-                for(Range<Token> range : moveAffectedRanges)
+            for(Range<Token> range : moveAffectedRanges)
+            {
+                Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata));
+                Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
+                Set<InetAddress> difference = Sets.difference(newEndpoints, currentEndpoints);
+                for(final InetAddress address : difference)
                 {
-                    Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata));
-                    Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
-                    Set<InetAddress> difference = Sets.difference(newEndpoints, currentEndpoints);
-                    for(final InetAddress address : difference)
-                    {
-                        Collection<Range<Token>> newRanges = strategy.getAddressRanges(allLeftMetadata).get(address);
-                        Collection<Range<Token>> oldRanges = strategy.getAddressRanges(metadata).get(address);
-                        //We want to get rid of any ranges which the node is currently getting.
-                        newRanges.removeAll(oldRanges);
+                    Collection<Range<Token>> newRanges = strategy.getAddressRanges(allLeftMetadata).get(address);
+                    Collection<Range<Token>> oldRanges = strategy.getAddressRanges(metadata).get(address);
+                    //We want to get rid of any ranges which the node is currently getting.
+                    newRanges.removeAll(oldRanges);
 
-                        for(Range<Token> newRange : newRanges)
+                    for(Range<Token> newRange : newRanges)
+                    {
+                        for(Range<Token> pendingRange : newRange.subtractAll(oldRanges))
                         {
-                            for(Range<Token> pendingRange : newRange.subtractAll(oldRanges))
-                            {
-                                newPendingRanges.addPendingRange(pendingRange, address);
-                            }
+                            newPendingRanges.addPendingRange(pendingRange, address);
                         }
                     }
                 }
-
-                allLeftMetadata.removeEndpoint(endpoint);
             }
 
-            pendingRanges.put(keyspaceName, newPendingRanges);
-
-            if (logger.isTraceEnabled())
-                logger.trace("Pending ranges:\n{}", (pendingRanges.isEmpty() ? "<empty>" : printPendingRanges()));
-        }
-        finally
-        {
-            lock.readLock().unlock();
+            allLeftMetadata.removeEndpoint(endpoint);
         }
+
+        return newPendingRanges;
     }
 
     public Token getPredecessor(Token token)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9cd7d540/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
index 806f6a5..116cede 100644
--- a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
+++ b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -34,6 +35,9 @@ public class PendingRangeCalculatorService
     public static final PendingRangeCalculatorService instance = new PendingRangeCalculatorService();
 
     private static Logger logger = LoggerFactory.getLogger(PendingRangeCalculatorService.class);
+
+    // the executor will only run a single range calculation at a time while keeping at most one task queued in order
+    // to trigger an update only after the most recent state change and not for each update individually
     private final JMXEnabledThreadPoolExecutor executor = new JMXEnabledThreadPoolExecutor(1, Integer.MAX_VALUE, TimeUnit.SECONDS,
             new LinkedBlockingQueue<Runnable>(1), new NamedThreadFactory("PendingRangeCalculator"), "internal");
 
@@ -58,9 +62,11 @@ public class PendingRangeCalculatorService
             try
             {
                 long start = System.currentTimeMillis();
-                for (String keyspaceName : Schema.instance.getNonSystemKeyspaces())
+                List<String> keyspaces = Schema.instance.getNonSystemKeyspaces();
+                for (String keyspaceName : keyspaces)
                     calculatePendingRanges(Keyspace.open(keyspaceName).getReplicationStrategy(), keyspaceName);
-                logger.debug("finished calculation for {} keyspaces in {}ms", Schema.instance.getNonSystemKeyspaces().size(), System.currentTimeMillis() - start);
+                if (logger.isTraceEnabled())
+                    logger.trace("Finished PendingRangeTask for {} keyspaces in {}ms", keyspaces.size(), System.currentTimeMillis() - start);
             }
             finally
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9cd7d540/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java b/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java
new file mode 100644
index 0000000..507948c
--- /dev/null
+++ b/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.service.StorageService;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+
+/**
+ * Test for "Gossip blocks on startup when another node is bootstrapping" (CASSANDRA-12281).
+ */
+@RunWith(BMUnitRunner.class)
+public class PendingRangeCalculatorServiceTest
+{
+    static ReentrantLock calculationLock = new ReentrantLock();
+
+    @BeforeClass
+    public static void setUp() throws ConfigurationException
+    {
+        SchemaLoader.prepareServer();
+        StorageService.instance.initServer();
+    }
+
+    @Test
+    @BMRule(name = "Block pending range calculation",
+            targetClass = "TokenMetadata",
+            targetMethod = "calculatePendingRanges",
+            targetLocation = "AT INVOKE org.apache.cassandra.locator.AbstractReplicationStrategy.getAddressRanges",
+            action = "org.apache.cassandra.gms.PendingRangeCalculatorServiceTest.calculationLock.lock()")
+    public void testDelayedResponse() throws UnknownHostException, InterruptedException
+    {
+        final InetAddress otherNodeAddr = InetAddress.getByName("127.0.0.2");
+        final UUID otherHostId = UUID.randomUUID();
+
+        // introduce node for first major state change
+        Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 1, false));
+
+        // acquire lock to block pending range calculation via byteman
+        calculationLock.lock();
+        try
+        {
+            // spawn thread that will trigger handling of a bootstrap state change which in turn will trigger
+            // the pending range calculation that will be blocked by our lock
+            Thread t1 = new Thread()
+            {
+                public void run()
+                {
+                    Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 2, true));
+                }
+            };
+            t1.start();
+
+            // busy-spin until t1 is blocked by lock
+            while (!calculationLock.hasQueuedThreads()) ;
+
+            // trigger further state changes; the thread that is blocked in the expensive
+            // range calculation must not block us here as well
+            Thread t2 = new Thread()
+            {
+                public void run()
+                {
+                    Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 3, false));
+                    Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 4, false));
+                    Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 5, false));
+                }
+            };
+            t2.start();
+            t2.join(2000);
+            assertFalse("Thread still blocked by pending range calculation", t2.isAlive());
+            assertEquals(5, Gossiper.instance.getEndpointStateForEndpoint(otherNodeAddr).getHeartBeatState().getHeartBeatVersion());
+        }
+        finally
+        {
+            calculationLock.unlock();
+        }
+    }
+
+    private Map<InetAddress, EndpointState> getStates(InetAddress otherNodeAddr, UUID hostId, int ver, boolean bootstrapping)
+    {
+        HeartBeatState hb = new HeartBeatState(1, ver);
+        EndpointState state = new EndpointState(hb);
+        Collection<Token> tokens = new ArrayList<>();
+
+        tokens.add(new ByteOrderedPartitioner.BytesToken(new byte[]{1,2,3}));
+        state.addApplicationState(ApplicationState.TOKENS, StorageService.instance.valueFactory.tokens(tokens));
+        state.addApplicationState(ApplicationState.STATUS, bootstrapping ?
+                StorageService.instance.valueFactory.bootstrapping(tokens) : StorageService.instance.valueFactory.normal(tokens));
+        state.addApplicationState(ApplicationState.HOST_ID, StorageService.instance.valueFactory.hostId(hostId));
+        state.addApplicationState(ApplicationState.NET_VERSION, StorageService.instance.valueFactory.networkVersion());
+
+        Map<InetAddress, EndpointState> states = new HashMap<>();
+        states.put(otherNodeAddr, state);
+        return states;
+    }
+}


[07/10] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by al...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/59b40b31
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/59b40b31
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/59b40b31

Branch: refs/heads/trunk
Commit: 59b40b3173933620bc5f30e26366cd09b3a4ca10
Parents: eb41380 9cd7d54
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Fri Nov 18 17:29:41 2016 +0000
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri Nov 18 17:30:46 2016 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../apache/cassandra/locator/TokenMetadata.java | 213 +++++++++++--------
 .../service/PendingRangeCalculatorService.java  |   6 +-
 .../gms/PendingRangeCalculatorServiceTest.java  | 133 ++++++++++++
 4 files changed, 267 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b40b31/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 8a3ac65,5a2e0ab..bcd0b5c
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,42 -1,5 +1,45 @@@
 -2.2.9
 +3.0.11
 + * Prevent reloading of logback.xml from UDF sandbox (CASSANDRA-12535)
++Merged from 2.2:
+  * Avoid blocking gossip during pending range calculation (CASSANDRA-12281)
++
 +
 +3.0.10
 + * Disallow offheap_buffers memtable allocation (CASSANDRA-11039)
 + * Fix CommitLogSegmentManagerTest (CASSANDRA-12283)
 + * Pass root cause to CorruptBlockException when uncompression failed (CASSANDRA-12889)
 + * Fix partition count log during compaction (CASSANDRA-12184)
 + * Batch with multiple conditional updates for the same partition causes AssertionError (CASSANDRA-12867)
 + * Make AbstractReplicationStrategy extendable from outside its package (CASSANDRA-12788)
 + * Fix CommitLogTest.testDeleteIfNotDirty (CASSANDRA-12854)
 + * Don't tell users to turn off consistent rangemovements during rebuild. (CASSANDRA-12296)
 + * Avoid deadlock due to materialized view lock contention (CASSANDRA-12689)
 + * Fix for KeyCacheCqlTest flakiness (CASSANDRA-12801)
 + * Include SSTable filename in compacting large row message (CASSANDRA-12384)
 + * Fix potential socket leak (CASSANDRA-12329, CASSANDRA-12330)
 + * Fix ViewTest.testCompaction (CASSANDRA-12789)
 + * Improve avg aggregate functions (CASSANDRA-12417)
 + * Preserve quoted reserved keyword column names in MV creation (CASSANDRA-11803)
 + * nodetool stopdaemon errors out (CASSANDRA-12646)
 + * Split materialized view mutations on build to prevent OOM (CASSANDRA-12268)
 + * mx4j does not work in 3.0.8 (CASSANDRA-12274)
 + * Abort cqlsh copy-from in case of no answer after prolonged period of time (CASSANDRA-12740)
 + * Avoid sstable corrupt exception due to dropped static column (CASSANDRA-12582)
 + * Make stress use client mode to avoid checking commit log size on startup (CASSANDRA-12478)
 + * Fix exceptions with new vnode allocation (CASSANDRA-12715)
 + * Unify drain and shutdown processes (CASSANDRA-12509)
 + * Fix NPE in ComponentOfSlice.isEQ() (CASSANDRA-12706)
 + * Fix failure in LogTransactionTest (CASSANDRA-12632)
 + * Fix potentially incomplete non-frozen UDT values when querying with the
 +   full primary key specified (CASSANDRA-12605)
 + * Skip writing MV mutations to commitlog on mutation.applyUnsafe() (CASSANDRA-11670)
 + * Establish consistent distinction between non-existing partition and NULL value for LWTs on static columns (CASSANDRA-12060)
 + * Extend ColumnIdentifier.internedInstances key to include the type that generated the byte buffer (CASSANDRA-12516)
 + * Backport CASSANDRA-10756 (race condition in NativeTransportService shutdown) (CASSANDRA-12472)
 + * If CF has no clustering columns, any row cache is full partition cache (CASSANDRA-12499)
 + * Correct log message for statistics of offheap memtable flush (CASSANDRA-12776)
 + * Explicitly set locale for string validation (CASSANDRA-12541,CASSANDRA-12542,CASSANDRA-12543,CASSANDRA-12545)
 +Merged from 2.2:
   * Fix purgeability of tombstones with max timestamp (CASSANDRA-12792)
   * Fail repair if participant dies during sync or anticompaction (CASSANDRA-12901)
   * cqlsh COPY: unprotected pk values before converting them if not using prepared statements (CASSANDRA-12863)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b40b31/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/locator/TokenMetadata.java
index b50db00,aafd7f9..b44a1a1
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@@ -128,15 -123,6 +128,15 @@@ public class TokenMetadat
          sortedTokens = sortTokens();
      }
  
 +    /**
-      * To be used by tests only (via {@link StorageService.setPartitionerUnsafe}).
++     * To be used by tests only (via {@link StorageService#setPartitionerUnsafe}).
 +     */
 +    @VisibleForTesting
 +    public TokenMetadata cloneWithNewPartitioner(IPartitioner newPartitioner)
 +    {
 +        return new TokenMetadata(tokenToEndpointMap, endpointToHostIdMap, topology, newPartitioner);
 +    }
 +
      private ArrayList<Token> sortTokens()
      {
          return new ArrayList<>(tokenToEndpointMap.keySet());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b40b31/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
index 352a763,116cede..5b1aa0d
--- a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
+++ b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
@@@ -35,8 -35,11 +35,11 @@@ public class PendingRangeCalculatorServ
      public static final PendingRangeCalculatorService instance = new PendingRangeCalculatorService();
  
      private static Logger logger = LoggerFactory.getLogger(PendingRangeCalculatorService.class);
+ 
+     // the executor will only run a single range calculation at a time while keeping at most one task queued in order
+     // to trigger an update only after the most recent state change and not for each update individually
      private final JMXEnabledThreadPoolExecutor executor = new JMXEnabledThreadPoolExecutor(1, Integer.MAX_VALUE, TimeUnit.SECONDS,
 -            new LinkedBlockingQueue<Runnable>(1), new NamedThreadFactory("PendingRangeCalculator"), "internal");
 +            new LinkedBlockingQueue<>(1), new NamedThreadFactory("PendingRangeCalculator"), "internal");
  
      private AtomicInteger updateJobs = new AtomicInteger(0);
  
@@@ -59,10 -62,11 +62,11 @@@
              try
              {
                  long start = System.currentTimeMillis();
 -                List<String> keyspaces = Schema.instance.getNonSystemKeyspaces();
 +                List<String> keyspaces = Schema.instance.getNonLocalStrategyKeyspaces();
                  for (String keyspaceName : keyspaces)
                      calculatePendingRanges(Keyspace.open(keyspaceName).getReplicationStrategy(), keyspaceName);
-                 logger.debug("finished calculation for {} keyspaces in {}ms", keyspaces.size(), System.currentTimeMillis() - start);
+                 if (logger.isTraceEnabled())
+                     logger.trace("Finished PendingRangeTask for {} keyspaces in {}ms", keyspaces.size(), System.currentTimeMillis() - start);
              }
              finally
              {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b40b31/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java
index 0000000,507948c..90bbf1d
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java
+++ b/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java
@@@ -1,0 -1,133 +1,133 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.gms;
+ 
+ import java.net.InetAddress;
+ import java.net.UnknownHostException;
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.HashMap;
+ import java.util.Map;
+ import java.util.UUID;
+ import java.util.concurrent.locks.ReentrantLock;
+ 
+ import org.junit.BeforeClass;
+ import org.junit.Test;
+ import org.junit.runner.RunWith;
+ 
+ import org.apache.cassandra.SchemaLoader;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.dht.ByteOrderedPartitioner;
+ import org.apache.cassandra.dht.Token;
+ import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.service.StorageService;
+ import org.jboss.byteman.contrib.bmunit.BMRule;
+ import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+ 
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertFalse;
+ 
+ 
+ /**
+  * Test for "Gossip blocks on startup when another node is bootstrapping" (CASSANDRA-12281).
+  */
+ @RunWith(BMUnitRunner.class)
+ public class PendingRangeCalculatorServiceTest
+ {
+     static ReentrantLock calculationLock = new ReentrantLock();
+ 
+     @BeforeClass
+     public static void setUp() throws ConfigurationException
+     {
+         SchemaLoader.prepareServer();
+         StorageService.instance.initServer();
+     }
+ 
+     @Test
+     @BMRule(name = "Block pending range calculation",
+             targetClass = "TokenMetadata",
+             targetMethod = "calculatePendingRanges",
+             targetLocation = "AT INVOKE org.apache.cassandra.locator.AbstractReplicationStrategy.getAddressRanges",
+             action = "org.apache.cassandra.gms.PendingRangeCalculatorServiceTest.calculationLock.lock()")
+     public void testDelayedResponse() throws UnknownHostException, InterruptedException
+     {
 -        final InetAddress otherNodeAddr = InetAddress.getByName("127.0.0.2");
 -        final UUID otherHostId = UUID.randomUUID();
++        InetAddress otherNodeAddr = InetAddress.getByName("127.0.0.2");
++        UUID otherHostId = UUID.randomUUID();
+ 
+         // introduce node for first major state change
+         Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 1, false));
+ 
+         // acquire lock to block pending range calculation via byteman
+         calculationLock.lock();
+         try
+         {
+             // spawn thread that will trigger handling of a bootstrap state change which in turn will trigger
+             // the pending range calculation that will be blocked by our lock
+             Thread t1 = new Thread()
+             {
+                 public void run()
+                 {
+                     Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 2, true));
+                 }
+             };
+             t1.start();
+ 
+             // busy-spin until t1 is blocked by lock
+             while (!calculationLock.hasQueuedThreads()) ;
+ 
+             // trigger further state changes in a separate thread, so that a range calculation
+             // that is still blocked by our lock cannot block the test thread here as well
+             Thread t2 = new Thread()
+             {
+                 public void run()
+                 {
+                     Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 3, false));
+                     Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 4, false));
+                     Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 5, false));
+                 }
+             };
+             t2.start();
+             t2.join(2000);
+             assertFalse("Thread still blocked by pending range calculation", t2.isAlive());
+             assertEquals(5, Gossiper.instance.getEndpointStateForEndpoint(otherNodeAddr).getHeartBeatState().getHeartBeatVersion());
+         }
+         finally
+         {
+             calculationLock.unlock();
+         }
+     }
+ 
+     private Map<InetAddress, EndpointState> getStates(InetAddress otherNodeAddr, UUID hostId, int ver, boolean bootstrapping)
+     {
+         HeartBeatState hb = new HeartBeatState(1, ver);
+         EndpointState state = new EndpointState(hb);
+         Collection<Token> tokens = new ArrayList<>();
+ 
+         tokens.add(new ByteOrderedPartitioner.BytesToken(new byte[]{1,2,3}));
+         state.addApplicationState(ApplicationState.TOKENS, StorageService.instance.valueFactory.tokens(tokens));
+         state.addApplicationState(ApplicationState.STATUS, bootstrapping ?
+                 StorageService.instance.valueFactory.bootstrapping(tokens) : StorageService.instance.valueFactory.normal(tokens));
+         state.addApplicationState(ApplicationState.HOST_ID, StorageService.instance.valueFactory.hostId(hostId));
+         state.addApplicationState(ApplicationState.NET_VERSION, StorageService.instance.valueFactory.networkVersion());
+ 
+         Map<InetAddress, EndpointState> states = new HashMap<>();
+         states.put(otherNodeAddr, state);
+         return states;
+     }
+ }


[03/10] cassandra git commit: Avoid blocking gossip during pending range calculation

Posted by al...@apache.org.
Avoid blocking gossip during pending range calculation

patch by Stefan Podkowinski; reviewed by Joel Knighton for CASSANDRA-12281


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9cd7d540
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9cd7d540
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9cd7d540

Branch: refs/heads/cassandra-3.X
Commit: 9cd7d540de2ea525982d139a4c8a11233c4e98c9
Parents: 7d2fdfe
Author: Stefan Podkowinski <s....@gmail.com>
Authored: Fri Oct 21 11:34:53 2016 +0200
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri Nov 18 17:29:14 2016 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/locator/TokenMetadata.java | 211 +++++++++++--------
 .../service/PendingRangeCalculatorService.java  |  10 +-
 .../gms/PendingRangeCalculatorServiceTest.java  | 133 ++++++++++++
 4 files changed, 267 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9cd7d540/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 54dc4b5..5a2e0ab 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.9
+ * Avoid blocking gossip during pending range calculation (CASSANDRA-12281)
  * Fix purgeability of tombstones with max timestamp (CASSANDRA-12792)
  * Fail repair if participant dies during sync or anticompaction (CASSANDRA-12901)
  * cqlsh COPY: unprotected pk values before converting them if not using prepared statements (CASSANDRA-12863)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9cd7d540/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index b06c9c8..aafd7f9 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -649,12 +649,7 @@ public class TokenMetadata
         lock.readLock().lock();
         try
         {
-            TokenMetadata allLeftMetadata = cloneOnlyTokenMap();
-
-            for (InetAddress endpoint : leavingEndpoints)
-                allLeftMetadata.removeEndpoint(endpoint);
-
-            return allLeftMetadata;
+            return removeEndpoints(cloneOnlyTokenMap(), leavingEndpoints);
         }
         finally
         {
@@ -662,6 +657,14 @@ public class TokenMetadata
         }
     }
 
+    private static TokenMetadata removeEndpoints(TokenMetadata allLeftMetadata, Set<InetAddress> leavingEndpoints)
+    {
+        for (InetAddress endpoint : leavingEndpoints)
+            allLeftMetadata.removeEndpoint(endpoint);
+
+        return allLeftMetadata;
+    }
+
     /**
      * Create a copy of TokenMetadata with tokenToEndpointMap reflecting situation after all
      * current leave, and move operations have finished.
@@ -787,118 +790,154 @@ public class TokenMetadata
      */
     public void calculatePendingRanges(AbstractReplicationStrategy strategy, String keyspaceName)
     {
-        lock.readLock().lock();
-        try
+        // avoid race between both branches - do not use a lock here as this will block any other unrelated operations!
+        synchronized (pendingRanges)
         {
-            PendingRangeMaps newPendingRanges = new PendingRangeMaps();
-
             if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && movingEndpoints.isEmpty())
             {
                 if (logger.isTraceEnabled())
                     logger.trace("No bootstrapping, leaving or moving nodes -> empty pending ranges for {}", keyspaceName);
 
-                pendingRanges.put(keyspaceName, newPendingRanges);
-                return;
+                pendingRanges.put(keyspaceName, new PendingRangeMaps());
             }
+            else
+            {
+                if (logger.isDebugEnabled())
+                    logger.debug("Starting pending range calculation for {}", keyspaceName);
 
-            Multimap<InetAddress, Range<Token>> addressRanges = strategy.getAddressRanges();
-
-            // Copy of metadata reflecting the situation after all leave operations are finished.
-            TokenMetadata allLeftMetadata = cloneAfterAllLeft();
+                long startedAt = System.currentTimeMillis();
 
-            // get all ranges that will be affected by leaving nodes
-            Set<Range<Token>> affectedRanges = new HashSet<Range<Token>>();
-            for (InetAddress endpoint : leavingEndpoints)
-                affectedRanges.addAll(addressRanges.get(endpoint));
+                // create clone of current state
+                BiMultiValMap<Token, InetAddress> bootstrapTokens = new BiMultiValMap<>();
+                Set<InetAddress> leavingEndpoints = new HashSet<>();
+                Set<Pair<Token, InetAddress>> movingEndpoints = new HashSet<>();
+                TokenMetadata metadata;
 
-            // for each of those ranges, find what new nodes will be responsible for the range when
-            // all leaving nodes are gone.
-            TokenMetadata metadata = cloneOnlyTokenMap(); // don't do this in the loop! #7758
-            for (Range<Token> range : affectedRanges)
-            {
-                Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata));
-                Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
-                for (InetAddress address : Sets.difference(newEndpoints, currentEndpoints))
+                lock.readLock().lock();
+                try
+                {
+                    bootstrapTokens.putAll(this.bootstrapTokens);
+                    leavingEndpoints.addAll(this.leavingEndpoints);
+                    movingEndpoints.addAll(this.movingEndpoints);
+                    metadata = this.cloneOnlyTokenMap();
+                }
+                finally
                 {
-                    newPendingRanges.addPendingRange(range, address);
+                    lock.readLock().unlock();
                 }
+
+                pendingRanges.put(keyspaceName, calculatePendingRanges(strategy, metadata, bootstrapTokens,
+                                                                       leavingEndpoints, movingEndpoints));
+                long took = System.currentTimeMillis() - startedAt;
+
+                if (logger.isDebugEnabled())
+                    logger.debug("Pending range calculation for {} completed (took: {}ms)", keyspaceName, took);
+                if (logger.isTraceEnabled())
+                    logger.trace("Calculated pending ranges for {}:\n{}", keyspaceName, (pendingRanges.isEmpty() ? "<empty>" : printPendingRanges()));
             }
+        }
+    }
+
+    /**
+     * @see TokenMetadata#calculatePendingRanges(AbstractReplicationStrategy, String)
+     */
+    private static PendingRangeMaps calculatePendingRanges(AbstractReplicationStrategy strategy,
+                                                           TokenMetadata metadata,
+                                                           BiMultiValMap<Token, InetAddress> bootstrapTokens,
+                                                           Set<InetAddress> leavingEndpoints,
+                                                           Set<Pair<Token, InetAddress>> movingEndpoints)
+    {
+        PendingRangeMaps newPendingRanges = new PendingRangeMaps();
+
+        Multimap<InetAddress, Range<Token>> addressRanges = strategy.getAddressRanges(metadata);
 
-            // At this stage newPendingRanges has been updated according to leave operations. We can
-            // now continue the calculation by checking bootstrapping nodes.
+        // Copy of metadata reflecting the situation after all leave operations are finished.
+        TokenMetadata allLeftMetadata = removeEndpoints(metadata.cloneOnlyTokenMap(), leavingEndpoints);
 
-            // For each of the bootstrapping nodes, simply add and remove them one by one to
-            // allLeftMetadata and check in between what their ranges would be.
-            Multimap<InetAddress, Token> bootstrapAddresses = bootstrapTokens.inverse();
-            for (InetAddress endpoint : bootstrapAddresses.keySet())
+        // get all ranges that will be affected by leaving nodes
+        Set<Range<Token>> affectedRanges = new HashSet<Range<Token>>();
+        for (InetAddress endpoint : leavingEndpoints)
+            affectedRanges.addAll(addressRanges.get(endpoint));
+
+        // for each of those ranges, find what new nodes will be responsible for the range when
+        // all leaving nodes are gone.
+        for (Range<Token> range : affectedRanges)
+        {
+            Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata));
+            Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
+            for (InetAddress address : Sets.difference(newEndpoints, currentEndpoints))
             {
-                Collection<Token> tokens = bootstrapAddresses.get(endpoint);
+                newPendingRanges.addPendingRange(range, address);
+            }
+        }
 
-                allLeftMetadata.updateNormalTokens(tokens, endpoint);
-                for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
-                {
-                    newPendingRanges.addPendingRange(range, endpoint);
-                }
-                allLeftMetadata.removeEndpoint(endpoint);
+        // At this stage newPendingRanges has been updated according to leave operations. We can
+        // now continue the calculation by checking bootstrapping nodes.
+
+        // For each of the bootstrapping nodes, simply add and remove them one by one to
+        // allLeftMetadata and check in between what their ranges would be.
+        Multimap<InetAddress, Token> bootstrapAddresses = bootstrapTokens.inverse();
+        for (InetAddress endpoint : bootstrapAddresses.keySet())
+        {
+            Collection<Token> tokens = bootstrapAddresses.get(endpoint);
+
+            allLeftMetadata.updateNormalTokens(tokens, endpoint);
+            for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
+            {
+                newPendingRanges.addPendingRange(range, endpoint);
             }
+            allLeftMetadata.removeEndpoint(endpoint);
+        }
 
-            // At this stage newPendingRanges has been updated according to leaving and bootstrapping nodes.
-            // We can now finish the calculation by checking moving nodes.
+        // At this stage newPendingRanges has been updated according to leaving and bootstrapping nodes.
+        // We can now finish the calculation by checking moving nodes.
 
-            // For each of the moving nodes, we do the same thing we did for bootstrapping:
-            // simply add and remove them one by one to allLeftMetadata and check in between what their ranges would be.
-            for (Pair<Token, InetAddress> moving : movingEndpoints)
+        // For each of the moving nodes, we do the same thing we did for bootstrapping:
+        // simply add and remove them one by one to allLeftMetadata and check in between what their ranges would be.
+        for (Pair<Token, InetAddress> moving : movingEndpoints)
+        {
+            //Calculate all the ranges which could be affected. This will include the ranges before and after the move.
+            Set<Range<Token>> moveAffectedRanges = new HashSet<>();
+            InetAddress endpoint = moving.right; // address of the moving node
+            //Add ranges before the move
+            for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
             {
-                //Calculate all the ranges which will could be affected. This will include the ranges before and after the move.
-                Set<Range<Token>> moveAffectedRanges = new HashSet<>();
-                InetAddress endpoint = moving.right; // address of the moving node
-                //Add ranges before the move
-                for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
-                {
-                    moveAffectedRanges.add(range);
-                }
+                moveAffectedRanges.add(range);
+            }
 
-                allLeftMetadata.updateNormalToken(moving.left, endpoint);
-                //Add ranges after the move
-                for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
-                {
-                    moveAffectedRanges.add(range);
-                }
+            allLeftMetadata.updateNormalToken(moving.left, endpoint);
+            //Add ranges after the move
+            for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
+            {
+                moveAffectedRanges.add(range);
+            }
 
-                for(Range<Token> range : moveAffectedRanges)
+            for(Range<Token> range : moveAffectedRanges)
+            {
+                Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata));
+                Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
+                Set<InetAddress> difference = Sets.difference(newEndpoints, currentEndpoints);
+                for(final InetAddress address : difference)
                 {
-                    Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata));
-                    Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
-                    Set<InetAddress> difference = Sets.difference(newEndpoints, currentEndpoints);
-                    for(final InetAddress address : difference)
-                    {
-                        Collection<Range<Token>> newRanges = strategy.getAddressRanges(allLeftMetadata).get(address);
-                        Collection<Range<Token>> oldRanges = strategy.getAddressRanges(metadata).get(address);
-                        //We want to get rid of any ranges which the node is currently getting.
-                        newRanges.removeAll(oldRanges);
+                    Collection<Range<Token>> newRanges = strategy.getAddressRanges(allLeftMetadata).get(address);
+                    Collection<Range<Token>> oldRanges = strategy.getAddressRanges(metadata).get(address);
+                    //We want to get rid of any ranges which the node is currently getting.
+                    newRanges.removeAll(oldRanges);
 
-                        for(Range<Token> newRange : newRanges)
+                    for(Range<Token> newRange : newRanges)
+                    {
+                        for(Range<Token> pendingRange : newRange.subtractAll(oldRanges))
                         {
-                            for(Range<Token> pendingRange : newRange.subtractAll(oldRanges))
-                            {
-                                newPendingRanges.addPendingRange(pendingRange, address);
-                            }
+                            newPendingRanges.addPendingRange(pendingRange, address);
                         }
                     }
                 }
-
-                allLeftMetadata.removeEndpoint(endpoint);
             }
 
-            pendingRanges.put(keyspaceName, newPendingRanges);
-
-            if (logger.isTraceEnabled())
-                logger.trace("Pending ranges:\n{}", (pendingRanges.isEmpty() ? "<empty>" : printPendingRanges()));
-        }
-        finally
-        {
-            lock.readLock().unlock();
+            allLeftMetadata.removeEndpoint(endpoint);
         }
+
+        return newPendingRanges;
     }
 
     public Token getPredecessor(Token token)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9cd7d540/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
index 806f6a5..116cede 100644
--- a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
+++ b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -34,6 +35,9 @@ public class PendingRangeCalculatorService
     public static final PendingRangeCalculatorService instance = new PendingRangeCalculatorService();
 
     private static Logger logger = LoggerFactory.getLogger(PendingRangeCalculatorService.class);
+
+    // the executor will only run a single range calculation at a time while keeping at most one task queued in order
+    // to trigger an update only after the most recent state change and not for each update individually
     private final JMXEnabledThreadPoolExecutor executor = new JMXEnabledThreadPoolExecutor(1, Integer.MAX_VALUE, TimeUnit.SECONDS,
             new LinkedBlockingQueue<Runnable>(1), new NamedThreadFactory("PendingRangeCalculator"), "internal");
 
@@ -58,9 +62,11 @@ public class PendingRangeCalculatorService
             try
             {
                 long start = System.currentTimeMillis();
-                for (String keyspaceName : Schema.instance.getNonSystemKeyspaces())
+                List<String> keyspaces = Schema.instance.getNonSystemKeyspaces();
+                for (String keyspaceName : keyspaces)
                     calculatePendingRanges(Keyspace.open(keyspaceName).getReplicationStrategy(), keyspaceName);
-                logger.debug("finished calculation for {} keyspaces in {}ms", Schema.instance.getNonSystemKeyspaces().size(), System.currentTimeMillis() - start);
+                if (logger.isTraceEnabled())
+                    logger.trace("Finished PendingRangeTask for {} keyspaces in {}ms", keyspaces.size(), System.currentTimeMillis() - start);
             }
             finally
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9cd7d540/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java b/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java
new file mode 100644
index 0000000..507948c
--- /dev/null
+++ b/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.service.StorageService;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+
+/**
+ * Test for "Gossip blocks on startup when another node is bootstrapping" (CASSANDRA-12281).
+ */
+@RunWith(BMUnitRunner.class)
+public class PendingRangeCalculatorServiceTest
+{
+    static ReentrantLock calculationLock = new ReentrantLock();
+
+    @BeforeClass
+    public static void setUp() throws ConfigurationException
+    {
+        SchemaLoader.prepareServer();
+        StorageService.instance.initServer();
+    }
+
+    @Test
+    @BMRule(name = "Block pending range calculation",
+            targetClass = "TokenMetadata",
+            targetMethod = "calculatePendingRanges",
+            targetLocation = "AT INVOKE org.apache.cassandra.locator.AbstractReplicationStrategy.getAddressRanges",
+            action = "org.apache.cassandra.gms.PendingRangeCalculatorServiceTest.calculationLock.lock()")
+    public void testDelayedResponse() throws UnknownHostException, InterruptedException
+    {
+        final InetAddress otherNodeAddr = InetAddress.getByName("127.0.0.2");
+        final UUID otherHostId = UUID.randomUUID();
+
+        // introduce node for first major state change
+        Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 1, false));
+
+        // acquire lock to block pending range calculation via byteman
+        calculationLock.lock();
+        try
+        {
+            // spawn thread that will trigger handling of a bootstrap state change which in turn will trigger
+            // the pending range calculation that will be blocked by our lock
+            Thread t1 = new Thread()
+            {
+                public void run()
+                {
+                    Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 2, true));
+                }
+            };
+            t1.start();
+
+            // busy-spin until t1 is blocked by lock
+            while (!calculationLock.hasQueuedThreads()) ;
+
+            // trigger further state changes in a separate thread, so that a range calculation
+            // that is still blocked by our lock cannot block the test thread here as well
+            Thread t2 = new Thread()
+            {
+                public void run()
+                {
+                    Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 3, false));
+                    Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 4, false));
+                    Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 5, false));
+                }
+            };
+            t2.start();
+            t2.join(2000);
+            assertFalse("Thread still blocked by pending range calculation", t2.isAlive());
+            assertEquals(5, Gossiper.instance.getEndpointStateForEndpoint(otherNodeAddr).getHeartBeatState().getHeartBeatVersion());
+        }
+        finally
+        {
+            calculationLock.unlock();
+        }
+    }
+
+    private Map<InetAddress, EndpointState> getStates(InetAddress otherNodeAddr, UUID hostId, int ver, boolean bootstrapping)
+    {
+        HeartBeatState hb = new HeartBeatState(1, ver);
+        EndpointState state = new EndpointState(hb);
+        Collection<Token> tokens = new ArrayList<>();
+
+        tokens.add(new ByteOrderedPartitioner.BytesToken(new byte[]{1,2,3}));
+        state.addApplicationState(ApplicationState.TOKENS, StorageService.instance.valueFactory.tokens(tokens));
+        state.addApplicationState(ApplicationState.STATUS, bootstrapping ?
+                StorageService.instance.valueFactory.bootstrapping(tokens) : StorageService.instance.valueFactory.normal(tokens));
+        state.addApplicationState(ApplicationState.HOST_ID, StorageService.instance.valueFactory.hostId(hostId));
+        state.addApplicationState(ApplicationState.NET_VERSION, StorageService.instance.valueFactory.networkVersion());
+
+        Map<InetAddress, EndpointState> states = new HashMap<>();
+        states.put(otherNodeAddr, state);
+        return states;
+    }
+}