You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2022/01/17 13:19:46 UTC

[cassandra] branch cassandra-3.0 updated: Avoid race in AbstractReplicationStrategy endpoint caching

This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
     new b1a8a56  Avoid race in AbstractReplicationStrategy endpoint caching
b1a8a56 is described below

commit b1a8a56c563b85ab9a34d3bbf9c16278dd441157
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Tue May 4 09:56:32 2021 +0200

    Avoid race in AbstractReplicationStrategy endpoint caching
    
    Patch by marcuse; reviewed by Alex Petrov and Jon Meredith for CASSANDRA-16673
    
    Co-authored-by: Jon Meredith <jo...@apache.org>
---
 CHANGES.txt                                        |   1 +
 .../locator/AbstractReplicationStrategy.java       |  98 ++++++++++++-----
 .../apache/cassandra/locator/TokenMetadata.java    |   9 +-
 .../distributed/test/ReadsDuringBootstrapTest.java | 120 +++++++++++++++++++++
 .../locator/AbstractReplicationStrategyTest.java   |  45 ++++++++
 5 files changed, 246 insertions(+), 27 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 20fbb32..e23f5e7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.26:
+ * Avoid race in AbstractReplicationStrategy endpoint caching (CASSANDRA-16673)
  * Fix abort when window resizing during cqlsh COPY (CASSANDRA-15230)
  * Fix slow keycache load which blocks startup for tables with many sstables (CASSANDRA-14898)
  * Fix rare NPE caused by batchlog replay / node decomission races (CASSANDRA-17049)
diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index b326e1c..709de20 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@ -21,6 +21,10 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.net.InetAddress;
 import java.util.*;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.HashMultimap;
@@ -54,10 +58,7 @@ public abstract class AbstractReplicationStrategy
     private Keyspace keyspace;
     public final Map<String, String> configOptions;
     private final TokenMetadata tokenMetadata;
-
-    // track when the token range changes, signaling we need to invalidate our endpoint cache
-    private volatile long lastInvalidatedVersion = 0;
-
+    private final ReplicaCache<Token, ArrayList<InetAddress>> replicas = new ReplicaCache<>(); // token -> endpoints, tagged with the ring version they were computed for
     public IEndpointSnitch snitch;
 
     protected AbstractReplicationStrategy(String keyspaceName, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions)
@@ -72,26 +73,9 @@ public abstract class AbstractReplicationStrategy
         // lazy-initialize keyspace itself since we don't create them until after the replication strategies
     }
 
-    private final Map<Token, ArrayList<InetAddress>> cachedEndpoints = new NonBlockingHashMap<Token, ArrayList<InetAddress>>();
-
-    public ArrayList<InetAddress> getCachedEndpoints(Token t)
+    private ArrayList<InetAddress> getCachedEndpoints(long ringVersion, Token t)
     {
-        long lastVersion = tokenMetadata.getRingVersion();
-
-        if (lastVersion > lastInvalidatedVersion)
-        {
-            synchronized (this)
-            {
-                if (lastVersion > lastInvalidatedVersion)
-                {
-                    logger.trace("clearing cached endpoints");
-                    cachedEndpoints.clear();
-                    lastInvalidatedVersion = lastVersion;
-                }
-            }
-        }
-
-        return cachedEndpoints.get(t);
+        return replicas.get(ringVersion, t); // null when ringVersion is stale or nothing is cached for t under it yet
     }
 
     /**
@@ -104,18 +88,19 @@ public abstract class AbstractReplicationStrategy
     public ArrayList<InetAddress> getNaturalEndpoints(RingPosition searchPosition)
     {
         Token searchToken = searchPosition.getToken();
+        long currentRingVersion = tokenMetadata.getRingVersion(); // NOTE(review): read before sortedTokens(), presumably so the lookup never uses a version newer than the tokens it searched
         Token keyToken = TokenMetadata.firstToken(tokenMetadata.sortedTokens(), searchToken);
-        ArrayList<InetAddress> endpoints = getCachedEndpoints(keyToken);
+        ArrayList<InetAddress> endpoints = getCachedEndpoints(currentRingVersion, keyToken);
         if (endpoints == null)
         {
             TokenMetadata tm = tokenMetadata.cachedOnlyTokenMap();
             // if our cache got invalidated, it's possible there is a new token to account for too
             keyToken = TokenMetadata.firstToken(tm.sortedTokens(), searchToken);
             endpoints = new ArrayList<InetAddress>(calculateNaturalEndpoints(searchToken, tm));
-            cachedEndpoints.put(keyToken, endpoints);
+            replicas.put(tm.getRingVersion(), keyToken, endpoints); // tagged with the snapshot's own version; dropped if the cache has already moved past it
         }
 
-        return new ArrayList<InetAddress>(endpoints);
+        return new ArrayList<>(endpoints); // defensive copy: callers never get the cached list itself
     }
 
     /**
@@ -331,4 +316,65 @@ public abstract class AbstractReplicationStrategy
                 throw new ConfigurationException(String.format("Unrecognized strategy option {%s} passed to %s for keyspace %s", key, getClass().getSimpleName(), keyspaceName));
         }
     }
+
+    @VisibleForTesting
+    public static class ReplicaCache<K, V> // entries are discarded wholesale whenever a newer ring version is seen
+    {
+        private final AtomicReference<ReplicaHolder<K, V>> cachedReplicas = new AtomicReference<>(new ReplicaHolder<>(0, 4));
+        // Looks up keyToken for ringVersion (a newer version first discards all entries); null if absent or the version is stale.
+        V get(long ringVersion, K keyToken)
+        {
+            ReplicaHolder<K, V> replicaHolder = maybeClearAndGet(ringVersion);
+            if (replicaHolder == null)
+                return null;
+
+            return replicaHolder.replicas.get(keyToken);
+        }
+        // Caches endpoints for keyToken under ringVersion; silently ignored if the cache already holds a newer version.
+        void put(long ringVersion, K keyToken, V endpoints)
+        {
+            ReplicaHolder<K, V> current = maybeClearAndGet(ringVersion);
+            if (current != null)
+            {
+                // same ringVersion and keyToken already cached: the endpoints must be identical, so keep the existing entry
+                current.replicas.putIfAbsent(keyToken, endpoints);
+            }
+        }
+        // Returns the holder for exactly ringVersion (replacing an older one if needed), or null if a newer version is cached.
+        ReplicaHolder<K, V> maybeClearAndGet(long ringVersion)
+        {
+            ReplicaHolder<K, V> current = cachedReplicas.get();
+            if (ringVersion == current.ringVersion)
+                return current;
+            else if (ringVersion < current.ringVersion) // things have already moved on
+                return null;
+
+            // The ring version has advanced: build an empty holder (pre-sized from the old map) and try to install it.
+            // Other threads may race here with the same new version; exactly one CAS wins and the losing holders are
+            // simply garbage collected. A failed CAS is fine either way, the re-read below decides what to use.
+            ReplicaHolder<K, V> cleaned = new ReplicaHolder<>(ringVersion, current.replicas.size());
+            cachedReplicas.compareAndSet(current, cleaned);
+
+            // Another (possibly newer) ring version may have been installed meanwhile, so re-read the reference and
+            // return the holder only if it is for exactly our ring version; otherwise return null, since a holder
+            // for a different version is of no use to this caller.
+            current = cachedReplicas.get();
+            if (ringVersion == current.ringVersion)
+                return current;
+            else
+                return null;
+        }
+    }
+
+    static class ReplicaHolder<K, V> // pairs one ring version with the map of replicas cached for it
+    {
+        private final long ringVersion;
+        private final NonBlockingHashMap<K, V> replicas;
+        // version: the ring version this map belongs to; sizeHint: initial capacity hint passed to the map
+        ReplicaHolder(long version, int sizeHint)
+        {
+            replicas = new NonBlockingHashMap<>(sizeHint);
+            ringVersion = version;
+        }
+    }
 }
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index da5f5e3..2d7afcd 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -125,11 +125,17 @@ public class TokenMetadata
 
     private TokenMetadata(BiMultiValMap<Token, InetAddress> tokenToEndpointMap, BiMap<InetAddress, UUID> endpointsMap, Topology topology, IPartitioner partitioner)
     {
+        this(tokenToEndpointMap, endpointsMap, topology, partitioner, 0); // this overload always starts at ring version 0
+    }
+    // Overload that also sets ringVersion, so a copy of an existing TokenMetadata can keep the version it was copied at.
+    private TokenMetadata(BiMultiValMap<Token, InetAddress> tokenToEndpointMap, BiMap<InetAddress, UUID> endpointsMap, Topology topology, IPartitioner partitioner, long ringVersion)
+    {
         this.tokenToEndpointMap = tokenToEndpointMap;
         this.topology = topology;
         this.partitioner = partitioner;
         endpointToHostIdMap = endpointsMap;
         sortedTokens = sortTokens();
+        this.ringVersion = ringVersion;
     }
 
     /**
@@ -643,7 +649,8 @@ public class TokenMetadata
             return new TokenMetadata(SortedBiMultiValMap.create(tokenToEndpointMap, null, inetaddressCmp),
                                      HashBiMap.create(endpointToHostIdMap),
                                      topology,
-                                     partitioner);
+                                     partitioner,
+                                     ringVersion);
         }
         finally
         {
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ReadsDuringBootstrapTest.java b/test/distributed/org/apache/cassandra/distributed/test/ReadsDuringBootstrapTest.java
new file mode 100644
index 0000000..92f892c
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/ReadsDuringBootstrapTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.FieldValue;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+
+public class ReadsDuringBootstrapTest extends TestBaseImpl
+{
+    @Test // a read parked mid endpoint-lookup while a node bootstraps must still complete (CASSANDRA-16673)
+    public void readsDuringBootstrapTest() throws IOException, ExecutionException, InterruptedException, TimeoutException
+    {
+        int originalNodeCount = 3;
+        int expandedNodeCount = originalNodeCount + 1;
+        ExecutorService es = Executors.newSingleThreadExecutor();
+        try (Cluster cluster = builder().withNodes(originalNodeCount)
+                                        .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
+                                        .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0"))
+                                        .withConfig(config -> config.with(NETWORK, GOSSIP)
+                                                                    .set("read_request_timeout_in_ms", Integer.MAX_VALUE)
+                                                                    .set("request_timeout_in_ms", Integer.MAX_VALUE))
+                                        .withInstanceInitializer(BB::install)
+                                        .start())
+        {
+            String query = withKeyspace("SELECT * FROM %s.tbl WHERE id = ?");
+            cluster.schemaChange(withKeyspace("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2};"));
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (id int PRIMARY KEY)"));
+            cluster.get(1).runOnInstance(() -> BB.block.set(true)); // reads of KEYSPACE coordinated by node 1 now park inside getCachedEndpoints
+            Future<?> read = es.submit(() -> cluster.coordinator(1).execute(query, ConsistencyLevel.QUORUM, 3));
+            long mark = cluster.get(1).logs().mark();
+
+            IInstanceConfig config = cluster.newInstanceConfig();
+            config.set("auto_bootstrap", true);
+            IInvokableInstance bootstrapInstance = cluster.bootstrap(config);
+            bootstrapInstance.startup();
+
+            cluster.get(1).logs().watchFor(mark, "New node /127.0.0.4");
+            cluster.get(1).runOnInstance(() -> BB.block.set(false));
+            // populate the endpoint cache now that node 1 has seen the new node
+            for (int i = 0; i < 10; i++)
+                cluster.coordinator(1).execute(query, ConsistencyLevel.QUORUM, i);
+            cluster.get(1).runOnInstance(() -> BB.latch.countDown()); // release the parked read
+            read.get();
+        }
+        finally
+        {
+            es.shutdown();
+        }
+    }
+    // ByteBuddy hook for node 1 only: lets the test park a read inside AbstractReplicationStrategy.getCachedEndpoints
+    public static class BB
+    {
+        public static final AtomicBoolean block = new AtomicBoolean();
+        public static final CountDownLatch latch = new CountDownLatch(1);
+        private static void install(ClassLoader cl, Integer instanceId)
+        {
+            if (instanceId != 1)
+                return;
+            new ByteBuddy().rebase(AbstractReplicationStrategy.class)
+                           .method(named("getCachedEndpoints")) // must match the real method name, or nothing is intercepted and the race is never exercised
+                           .intercept(MethodDelegation.to(BB.class))
+                           .make()
+                           .load(cl, ClassLoadingStrategy.Default.INJECTION);
+        }
+
+        public static ArrayList<InetAddress> getCachedEndpoints(long ringVersion, Token t,
+                                                                @FieldValue("keyspaceName") String keyspaceName,
+                                                                @SuperCall Callable<ArrayList<InetAddress>> zuper) throws Exception
+        {
+            if (keyspaceName.equals(KEYSPACE) && block.get())
+                latch.await();
+            return zuper.call();
+        }
+    }
+}
diff --git a/test/unit/org/apache/cassandra/locator/AbstractReplicationStrategyTest.java b/test/unit/org/apache/cassandra/locator/AbstractReplicationStrategyTest.java
new file mode 100644
index 0000000..62206ce
--- /dev/null
+++ b/test/unit/org/apache/cassandra/locator/AbstractReplicationStrategyTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class AbstractReplicationStrategyTest
+{
+    @Test
+    public void testReplicaCache()
+    {
+        AbstractReplicationStrategy.ReplicaCache<Integer, Integer> cache = new AbstractReplicationStrategy.ReplicaCache<>();
+        // Part 1: reads against the current, an older and a newer ring version.
+        cache.put(10, 1, 1); // newer than the cache's initial version 0, so the cache moves to version 10 first
+        assertEquals(1, (int)cache.get(10, 1)); // same version: hit
+        assertNull(cache.get(9,1)); // get with an older ring version returns null to force a recalculation
+        assertNull(cache.get(11,1)); // newer ring version: the cache is cleared, so even key 1 misses
+        assertNull(cache.get(10,1)); // version 10 is now older than the cache (11), so it misses as well
+        // Part 2: puts are only kept for the cache's current (or a newer) ring version.
+        cache.put(11, 1, 100);
+        cache.put(10, 1, 99); // stale version: silently dropped
+        assertEquals(100, (int)cache.get(11, 1)); // the stale put did not overwrite the entry
+        assertNull(cache.get(12, 55)); // newer version clears the cache again
+        assertNull(cache.get(11, 1)); // and version 11 is now stale
+    }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org