Posted to commits@cassandra.apache.org by al...@apache.org on 2016/11/18 17:39:09 UTC

[04/10] cassandra git commit: Avoid blocking gossip during pending range calculation

Avoid blocking gossip during pending range calculation

patch by Stefan Podkowinski; reviewed by Joel Knighton for CASSANDRA-12281


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9cd7d540
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9cd7d540
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9cd7d540

Branch: refs/heads/trunk
Commit: 9cd7d540de2ea525982d139a4c8a11233c4e98c9
Parents: 7d2fdfe
Author: Stefan Podkowinski <s....@gmail.com>
Authored: Fri Oct 21 11:34:53 2016 +0200
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri Nov 18 17:29:14 2016 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/locator/TokenMetadata.java | 211 +++++++++++--------
 .../service/PendingRangeCalculatorService.java  |  10 +-
 .../gms/PendingRangeCalculatorServiceTest.java  | 133 ++++++++++++
 4 files changed, 267 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9cd7d540/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 54dc4b5..5a2e0ab 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.9
+ * Avoid blocking gossip during pending range calculation (CASSANDRA-12281)
  * Fix purgeability of tombstones with max timestamp (CASSANDRA-12792)
  * Fail repair if participant dies during sync or anticompaction (CASSANDRA-12901)
  * cqlsh COPY: unprotected pk values before converting them if not using prepared statements (CASSANDRA-12863)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9cd7d540/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index b06c9c8..aafd7f9 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -649,12 +649,7 @@ public class TokenMetadata
         lock.readLock().lock();
         try
         {
-            TokenMetadata allLeftMetadata = cloneOnlyTokenMap();
-
-            for (InetAddress endpoint : leavingEndpoints)
-                allLeftMetadata.removeEndpoint(endpoint);
-
-            return allLeftMetadata;
+            return removeEndpoints(cloneOnlyTokenMap(), leavingEndpoints);
         }
         finally
         {
@@ -662,6 +657,14 @@ public class TokenMetadata
         }
     }
 
+    private static TokenMetadata removeEndpoints(TokenMetadata allLeftMetadata, Set<InetAddress> leavingEndpoints)
+    {
+        for (InetAddress endpoint : leavingEndpoints)
+            allLeftMetadata.removeEndpoint(endpoint);
+
+        return allLeftMetadata;
+    }
+
     /**
      * Create a copy of TokenMetadata with tokenToEndpointMap reflecting situation after all
      * current leave, and move operations have finished.
@@ -787,118 +790,154 @@ public class TokenMetadata
      */
     public void calculatePendingRanges(AbstractReplicationStrategy strategy, String keyspaceName)
     {
-        lock.readLock().lock();
-        try
+        // avoid a race between the two branches below - do not take the shared read/write lock here, as that would block unrelated operations!
+        synchronized (pendingRanges)
         {
-            PendingRangeMaps newPendingRanges = new PendingRangeMaps();
-
             if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && movingEndpoints.isEmpty())
             {
                 if (logger.isTraceEnabled())
                     logger.trace("No bootstrapping, leaving or moving nodes -> empty pending ranges for {}", keyspaceName);
 
-                pendingRanges.put(keyspaceName, newPendingRanges);
-                return;
+                pendingRanges.put(keyspaceName, new PendingRangeMaps());
             }
+            else
+            {
+                if (logger.isDebugEnabled())
+                    logger.debug("Starting pending range calculation for {}", keyspaceName);
 
-            Multimap<InetAddress, Range<Token>> addressRanges = strategy.getAddressRanges();
-
-            // Copy of metadata reflecting the situation after all leave operations are finished.
-            TokenMetadata allLeftMetadata = cloneAfterAllLeft();
+                long startedAt = System.currentTimeMillis();
 
-            // get all ranges that will be affected by leaving nodes
-            Set<Range<Token>> affectedRanges = new HashSet<Range<Token>>();
-            for (InetAddress endpoint : leavingEndpoints)
-                affectedRanges.addAll(addressRanges.get(endpoint));
+                // create clone of current state
+                BiMultiValMap<Token, InetAddress> bootstrapTokens = new BiMultiValMap<>();
+                Set<InetAddress> leavingEndpoints = new HashSet<>();
+                Set<Pair<Token, InetAddress>> movingEndpoints = new HashSet<>();
+                TokenMetadata metadata;
 
-            // for each of those ranges, find what new nodes will be responsible for the range when
-            // all leaving nodes are gone.
-            TokenMetadata metadata = cloneOnlyTokenMap(); // don't do this in the loop! #7758
-            for (Range<Token> range : affectedRanges)
-            {
-                Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata));
-                Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
-                for (InetAddress address : Sets.difference(newEndpoints, currentEndpoints))
+                lock.readLock().lock();
+                try
+                {
+                    bootstrapTokens.putAll(this.bootstrapTokens);
+                    leavingEndpoints.addAll(this.leavingEndpoints);
+                    movingEndpoints.addAll(this.movingEndpoints);
+                    metadata = this.cloneOnlyTokenMap();
+                }
+                finally
                 {
-                    newPendingRanges.addPendingRange(range, address);
+                    lock.readLock().unlock();
                 }
+
+                pendingRanges.put(keyspaceName, calculatePendingRanges(strategy, metadata, bootstrapTokens,
+                                                                       leavingEndpoints, movingEndpoints));
+                long took = System.currentTimeMillis() - startedAt;
+
+                if (logger.isDebugEnabled())
+                    logger.debug("Pending range calculation for {} completed (took: {}ms)", keyspaceName, took);
+                if (logger.isTraceEnabled())
+                    logger.trace("Calculated pending ranges for {}:\n{}", keyspaceName, (pendingRanges.isEmpty() ? "<empty>" : printPendingRanges()));
             }
+        }
+    }
+
+    /**
+     * @see TokenMetadata#calculatePendingRanges(AbstractReplicationStrategy, String)
+     */
+    private static PendingRangeMaps calculatePendingRanges(AbstractReplicationStrategy strategy,
+                                                           TokenMetadata metadata,
+                                                           BiMultiValMap<Token, InetAddress> bootstrapTokens,
+                                                           Set<InetAddress> leavingEndpoints,
+                                                           Set<Pair<Token, InetAddress>> movingEndpoints)
+    {
+        PendingRangeMaps newPendingRanges = new PendingRangeMaps();
+
+        Multimap<InetAddress, Range<Token>> addressRanges = strategy.getAddressRanges(metadata);
 
-            // At this stage newPendingRanges has been updated according to leave operations. We can
-            // now continue the calculation by checking bootstrapping nodes.
+        // Copy of metadata reflecting the situation after all leave operations are finished.
+        TokenMetadata allLeftMetadata = removeEndpoints(metadata.cloneOnlyTokenMap(), leavingEndpoints);
 
-            // For each of the bootstrapping nodes, simply add and remove them one by one to
-            // allLeftMetadata and check in between what their ranges would be.
-            Multimap<InetAddress, Token> bootstrapAddresses = bootstrapTokens.inverse();
-            for (InetAddress endpoint : bootstrapAddresses.keySet())
+        // get all ranges that will be affected by leaving nodes
+        Set<Range<Token>> affectedRanges = new HashSet<Range<Token>>();
+        for (InetAddress endpoint : leavingEndpoints)
+            affectedRanges.addAll(addressRanges.get(endpoint));
+
+        // for each of those ranges, find what new nodes will be responsible for the range when
+        // all leaving nodes are gone.
+        for (Range<Token> range : affectedRanges)
+        {
+            Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata));
+            Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
+            for (InetAddress address : Sets.difference(newEndpoints, currentEndpoints))
             {
-                Collection<Token> tokens = bootstrapAddresses.get(endpoint);
+                newPendingRanges.addPendingRange(range, address);
+            }
+        }
 
-                allLeftMetadata.updateNormalTokens(tokens, endpoint);
-                for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
-                {
-                    newPendingRanges.addPendingRange(range, endpoint);
-                }
-                allLeftMetadata.removeEndpoint(endpoint);
+        // At this stage newPendingRanges has been updated according to leave operations. We can
+        // now continue the calculation by checking bootstrapping nodes.
+
+        // For each of the bootstrapping nodes, simply add and remove them one by one to
+        // allLeftMetadata and check in between what their ranges would be.
+        Multimap<InetAddress, Token> bootstrapAddresses = bootstrapTokens.inverse();
+        for (InetAddress endpoint : bootstrapAddresses.keySet())
+        {
+            Collection<Token> tokens = bootstrapAddresses.get(endpoint);
+
+            allLeftMetadata.updateNormalTokens(tokens, endpoint);
+            for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
+            {
+                newPendingRanges.addPendingRange(range, endpoint);
             }
+            allLeftMetadata.removeEndpoint(endpoint);
+        }
 
-            // At this stage newPendingRanges has been updated according to leaving and bootstrapping nodes.
-            // We can now finish the calculation by checking moving nodes.
+        // At this stage newPendingRanges has been updated according to leaving and bootstrapping nodes.
+        // We can now finish the calculation by checking moving nodes.
 
-            // For each of the moving nodes, we do the same thing we did for bootstrapping:
-            // simply add and remove them one by one to allLeftMetadata and check in between what their ranges would be.
-            for (Pair<Token, InetAddress> moving : movingEndpoints)
+        // For each of the moving nodes, we do the same thing we did for bootstrapping:
+        // simply add and remove them one by one to allLeftMetadata and check in between what their ranges would be.
+        for (Pair<Token, InetAddress> moving : movingEndpoints)
+        {
+            //Calculate all the ranges which could be affected. This will include the ranges before and after the move.
+            Set<Range<Token>> moveAffectedRanges = new HashSet<>();
+            InetAddress endpoint = moving.right; // address of the moving node
+            //Add ranges before the move
+            for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
             {
-                //Calculate all the ranges which will could be affected. This will include the ranges before and after the move.
-                Set<Range<Token>> moveAffectedRanges = new HashSet<>();
-                InetAddress endpoint = moving.right; // address of the moving node
-                //Add ranges before the move
-                for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
-                {
-                    moveAffectedRanges.add(range);
-                }
+                moveAffectedRanges.add(range);
+            }
 
-                allLeftMetadata.updateNormalToken(moving.left, endpoint);
-                //Add ranges after the move
-                for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
-                {
-                    moveAffectedRanges.add(range);
-                }
+            allLeftMetadata.updateNormalToken(moving.left, endpoint);
+            //Add ranges after the move
+            for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
+            {
+                moveAffectedRanges.add(range);
+            }
 
-                for(Range<Token> range : moveAffectedRanges)
+            for(Range<Token> range : moveAffectedRanges)
+            {
+                Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata));
+                Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
+                Set<InetAddress> difference = Sets.difference(newEndpoints, currentEndpoints);
+                for(final InetAddress address : difference)
                 {
-                    Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata));
-                    Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
-                    Set<InetAddress> difference = Sets.difference(newEndpoints, currentEndpoints);
-                    for(final InetAddress address : difference)
-                    {
-                        Collection<Range<Token>> newRanges = strategy.getAddressRanges(allLeftMetadata).get(address);
-                        Collection<Range<Token>> oldRanges = strategy.getAddressRanges(metadata).get(address);
-                        //We want to get rid of any ranges which the node is currently getting.
-                        newRanges.removeAll(oldRanges);
+                    Collection<Range<Token>> newRanges = strategy.getAddressRanges(allLeftMetadata).get(address);
+                    Collection<Range<Token>> oldRanges = strategy.getAddressRanges(metadata).get(address);
+                    //We want to get rid of any ranges which the node is currently getting.
+                    newRanges.removeAll(oldRanges);
 
-                        for(Range<Token> newRange : newRanges)
+                    for(Range<Token> newRange : newRanges)
+                    {
+                        for(Range<Token> pendingRange : newRange.subtractAll(oldRanges))
                         {
-                            for(Range<Token> pendingRange : newRange.subtractAll(oldRanges))
-                            {
-                                newPendingRanges.addPendingRange(pendingRange, address);
-                            }
+                            newPendingRanges.addPendingRange(pendingRange, address);
                         }
                     }
                 }
-
-                allLeftMetadata.removeEndpoint(endpoint);
             }
 
-            pendingRanges.put(keyspaceName, newPendingRanges);
-
-            if (logger.isTraceEnabled())
-                logger.trace("Pending ranges:\n{}", (pendingRanges.isEmpty() ? "<empty>" : printPendingRanges()));
-        }
-        finally
-        {
-            lock.readLock().unlock();
+            allLeftMetadata.removeEndpoint(endpoint);
         }
+
+        return newPendingRanges;
     }
 
     public Token getPredecessor(Token token)

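The essence of the TokenMetadata change above is that the expensive per-range work no longer runs while holding the class's shared read/write lock. The bootstrap/leaving/moving state is copied under a short-lived read lock, the lock is released, and the new static calculatePendingRanges overload does the heavy lifting on that snapshot; concurrent calculations are serialized by synchronizing on the pendingRanges map instead. A minimal, self-contained sketch of that locking pattern, using hypothetical names rather than the real Cassandra classes:

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch of "snapshot under the read lock, compute without it" (hypothetical names).
class PendingRangeCalculationSketch
{
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Set<String> leavingEndpoints = new HashSet<>();                  // guarded by lock
    private final Map<String, Set<String>> pendingRanges = new ConcurrentHashMap<>();

    void calculatePendingRanges(String keyspaceName)
    {
        // serialize concurrent calculations without holding the shared lock
        synchronized (pendingRanges)
        {
            Set<String> leavingSnapshot;
            lock.readLock().lock();
            try
            {
                // cheap: copy only the state the calculation needs
                leavingSnapshot = new HashSet<>(leavingEndpoints);
            }
            finally
            {
                lock.readLock().unlock();
            }

            // the expensive part runs with no lock held, so gossip writers are not blocked on it
            pendingRanges.put(keyspaceName, computeRanges(leavingSnapshot));
        }
    }

    private static Set<String> computeRanges(Set<String> leaving)
    {
        return new HashSet<>(leaving); // placeholder for the real range calculation
    }
}
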
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9cd7d540/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
index 806f6a5..116cede 100644
--- a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
+++ b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -34,6 +35,9 @@ public class PendingRangeCalculatorService
     public static final PendingRangeCalculatorService instance = new PendingRangeCalculatorService();
 
     private static Logger logger = LoggerFactory.getLogger(PendingRangeCalculatorService.class);
+
+    // the executor will only run a single range calculation at a time, keeping at most one task queued,
+    // so that an update is triggered once after the most recent state change rather than once per individual change
     private final JMXEnabledThreadPoolExecutor executor = new JMXEnabledThreadPoolExecutor(1, Integer.MAX_VALUE, TimeUnit.SECONDS,
             new LinkedBlockingQueue<Runnable>(1), new NamedThreadFactory("PendingRangeCalculator"), "internal");
 
@@ -58,9 +62,11 @@ public class PendingRangeCalculatorService
             try
             {
                 long start = System.currentTimeMillis();
-                for (String keyspaceName : Schema.instance.getNonSystemKeyspaces())
+                List<String> keyspaces = Schema.instance.getNonSystemKeyspaces();
+                for (String keyspaceName : keyspaces)
                     calculatePendingRanges(Keyspace.open(keyspaceName).getReplicationStrategy(), keyspaceName);
-                logger.debug("finished calculation for {} keyspaces in {}ms", Schema.instance.getNonSystemKeyspaces().size(), System.currentTimeMillis() - start);
+                if (logger.isTraceEnabled())
+                    logger.trace("Finished PendingRangeTask for {} keyspaces in {}ms", keyspaces.size(), System.currentTimeMillis() - start);
             }
             finally
             {

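A note on the executor configuration above: a single worker combined with a one-element LinkedBlockingQueue means that a burst of gossip state changes collapses into at most one running and one queued recalculation, and the queued run will observe the most recent state. A standalone sketch of that coalescing idea with a plain ThreadPoolExecutor (the DiscardPolicy rejection handler is an assumption made only to keep the sketch self-contained; it is not taken from the Cassandra code):

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Sketch: one worker, at most one queued task; further submissions are dropped while a run is pending.
public class CoalescingRecalculationSketch
{
    public static void main(String[] args) throws InterruptedException
    {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1,                                    // a single calculation thread
                60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1),            // keep at most one task queued
                new ThreadPoolExecutor.DiscardPolicy()); // assumption: silently drop the rest

        Runnable recalculate = () ->
        {
            try { Thread.sleep(100); }                   // stand-in for an expensive range calculation
            catch (InterruptedException e) { Thread.currentThread().interrupt(); }
            System.out.println("recalculated");
        };

        // ten rapid state changes trigger at most two runs: the one in flight
        // plus the single queued follow-up, which sees the latest state
        for (int i = 0; i < 10; i++)
            executor.execute(recalculate);

        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }
}
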
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9cd7d540/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java b/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java
new file mode 100644
index 0000000..507948c
--- /dev/null
+++ b/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.service.StorageService;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+
+/**
+ * Test for "Gossip blocks on startup when another node is bootstrapping" (CASSANDRA-12281).
+ */
+@RunWith(BMUnitRunner.class)
+public class PendingRangeCalculatorServiceTest
+{
+    static ReentrantLock calculationLock = new ReentrantLock();
+
+    @BeforeClass
+    public static void setUp() throws ConfigurationException
+    {
+        SchemaLoader.prepareServer();
+        StorageService.instance.initServer();
+    }
+
+    @Test
+    @BMRule(name = "Block pending range calculation",
+            targetClass = "TokenMetadata",
+            targetMethod = "calculatePendingRanges",
+            targetLocation = "AT INVOKE org.apache.cassandra.locator.AbstractReplicationStrategy.getAddressRanges",
+            action = "org.apache.cassandra.gms.PendingRangeCalculatorServiceTest.calculationLock.lock()")
+    public void testDelayedResponse() throws UnknownHostException, InterruptedException
+    {
+        final InetAddress otherNodeAddr = InetAddress.getByName("127.0.0.2");
+        final UUID otherHostId = UUID.randomUUID();
+
+        // introduce node for first major state change
+        Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 1, false));
+
+        // acquire lock to block pending range calculation via byteman
+        calculationLock.lock();
+        try
+        {
+            // spawn thread that will trigger handling of a bootstrap state change which in turn will trigger
+            // the pending range calculation that will be blocked by our lock
+            Thread t1 = new Thread()
+            {
+                public void run()
+                {
+                    Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 2, true));
+                }
+            };
+            t1.start();
+
+            // busy-spin until t1 is blocked by lock
+            while (!calculationLock.hasQueuedThreads()) ;
+
+            // trigger further state changes to verify that the thread blocked in the expensive
+            // range calculation does not block us here as well
+            Thread t2 = new Thread()
+            {
+                public void run()
+                {
+                    Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 3, false));
+                    Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 4, false));
+                    Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 5, false));
+                }
+            };
+            t2.start();
+            t2.join(2000);
+            assertFalse("Thread still blocked by pending range calculation", t2.isAlive());
+            assertEquals(5, Gossiper.instance.getEndpointStateForEndpoint(otherNodeAddr).getHeartBeatState().getHeartBeatVersion());
+        }
+        finally
+        {
+            calculationLock.unlock();
+        }
+    }
+
+    private Map<InetAddress, EndpointState> getStates(InetAddress otherNodeAddr, UUID hostId, int ver, boolean bootstrapping)
+    {
+        HeartBeatState hb = new HeartBeatState(1, ver);
+        EndpointState state = new EndpointState(hb);
+        Collection<Token> tokens = new ArrayList<>();
+
+        tokens.add(new ByteOrderedPartitioner.BytesToken(new byte[]{1,2,3}));
+        state.addApplicationState(ApplicationState.TOKENS, StorageService.instance.valueFactory.tokens(tokens));
+        state.addApplicationState(ApplicationState.STATUS, bootstrapping ?
+                StorageService.instance.valueFactory.bootstrapping(tokens) : StorageService.instance.valueFactory.normal(tokens));
+        state.addApplicationState(ApplicationState.HOST_ID, StorageService.instance.valueFactory.hostId(hostId));
+        state.addApplicationState(ApplicationState.NET_VERSION, StorageService.instance.valueFactory.networkVersion());
+
+        Map<InetAddress, EndpointState> states = new HashMap<>();
+        states.put(otherNodeAddr, state);
+        return states;
+    }
+}
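
The @BMRule in the test injects calculationLock.lock() at the point where calculatePendingRanges is about to invoke AbstractReplicationStrategy.getAddressRanges, so while the test holds calculationLock the calculation thread parks exactly where the pre-patch code would still have been holding TokenMetadata's read lock. The assertions then check that the additional gossip state changes (heartbeat versions 3-5) are applied within two seconds, i.e. that gossip is no longer blocked behind a stalled pending range calculation.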