You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2022/01/17 13:19:46 UTC
[cassandra] branch cassandra-3.0 updated: Avoid race in AbstractReplicationStrategy endpoint caching
This is an automated email from the ASF dual-hosted git repository.
marcuse pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
new b1a8a56 Avoid race in AbstractReplicationStrategy endpoint caching
b1a8a56 is described below
commit b1a8a56c563b85ab9a34d3bbf9c16278dd441157
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Tue May 4 09:56:32 2021 +0200
Avoid race in AbstractReplicationStrategy endpoint caching
Patch by marcuse; reviewed by Alex Petrov and Jon Meredith for CASSANDRA-16673
Co-authored-by: Jon Meredith <jo...@apache.org>
---
CHANGES.txt | 1 +
.../locator/AbstractReplicationStrategy.java | 98 ++++++++++++-----
.../apache/cassandra/locator/TokenMetadata.java | 9 +-
.../distributed/test/ReadsDuringBootstrapTest.java | 120 +++++++++++++++++++++
.../locator/AbstractReplicationStrategyTest.java | 45 ++++++++
5 files changed, 246 insertions(+), 27 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 20fbb32..e23f5e7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.26:
+ * Avoid race in AbstractReplicationStrategy endpoint caching (CASSANDRA-16673)
* Fix abort when window resizing during cqlsh COPY (CASSANDRA-15230)
* Fix slow keycache load which blocks startup for tables with many sstables (CASSANDRA-14898)
* Fix rare NPE caused by batchlog replay / node decomission races (CASSANDRA-17049)
diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index b326e1c..709de20 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@ -21,6 +21,10 @@ import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.InetAddress;
import java.util.*;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashMultimap;
@@ -54,10 +58,7 @@ public abstract class AbstractReplicationStrategy
private Keyspace keyspace;
public final Map<String, String> configOptions;
private final TokenMetadata tokenMetadata;
-
- // track when the token range changes, signaling we need to invalidate our endpoint cache
- private volatile long lastInvalidatedVersion = 0;
-
+ private final ReplicaCache<Token, ArrayList<InetAddress>> replicas = new ReplicaCache<>();
public IEndpointSnitch snitch;
protected AbstractReplicationStrategy(String keyspaceName, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions)
@@ -72,26 +73,9 @@ public abstract class AbstractReplicationStrategy
// lazy-initialize keyspace itself since we don't create them until after the replication strategies
}
- private final Map<Token, ArrayList<InetAddress>> cachedEndpoints = new NonBlockingHashMap<Token, ArrayList<InetAddress>>();
-
- public ArrayList<InetAddress> getCachedEndpoints(Token t)
+ private ArrayList<InetAddress> getCachedEndpoints(long ringVersion, Token t)
{
- long lastVersion = tokenMetadata.getRingVersion();
-
- if (lastVersion > lastInvalidatedVersion)
- {
- synchronized (this)
- {
- if (lastVersion > lastInvalidatedVersion)
- {
- logger.trace("clearing cached endpoints");
- cachedEndpoints.clear();
- lastInvalidatedVersion = lastVersion;
- }
- }
- }
-
- return cachedEndpoints.get(t);
+ return replicas.get(ringVersion, t);
}
/**
@@ -104,18 +88,19 @@ public abstract class AbstractReplicationStrategy
public ArrayList<InetAddress> getNaturalEndpoints(RingPosition searchPosition)
{
Token searchToken = searchPosition.getToken();
+ long currentRingVersion = tokenMetadata.getRingVersion();
Token keyToken = TokenMetadata.firstToken(tokenMetadata.sortedTokens(), searchToken);
- ArrayList<InetAddress> endpoints = getCachedEndpoints(keyToken);
+ ArrayList<InetAddress> endpoints = getCachedEndpoints(currentRingVersion, keyToken);
if (endpoints == null)
{
TokenMetadata tm = tokenMetadata.cachedOnlyTokenMap();
// if our cache got invalidated, it's possible there is a new token to account for too
keyToken = TokenMetadata.firstToken(tm.sortedTokens(), searchToken);
endpoints = new ArrayList<InetAddress>(calculateNaturalEndpoints(searchToken, tm));
- cachedEndpoints.put(keyToken, endpoints);
+ replicas.put(tm.getRingVersion(), keyToken, endpoints);
}
- return new ArrayList<InetAddress>(endpoints);
+ return new ArrayList<>(endpoints);
}
/**
@@ -331,4 +316,65 @@ public abstract class AbstractReplicationStrategy
throw new ConfigurationException(String.format("Unrecognized strategy option {%s} passed to %s for keyspace %s", key, getClass().getSimpleName(), keyspaceName));
}
}
+
+ @VisibleForTesting
+ public static class ReplicaCache<K, V>
+ {
+ private final AtomicReference<ReplicaHolder<K, V>> cachedReplicas = new AtomicReference<>(new ReplicaHolder<>(0, 4)); // starts as an empty holder for ring version 0
+
+ V get(long ringVersion, K keyToken) // returns null when nothing is cached for keyToken or no holder is usable for ringVersion
+ {
+ ReplicaHolder<K, V> replicaHolder = maybeClearAndGet(ringVersion);
+ if (replicaHolder == null)
+ return null;
+
+ return replicaHolder.replicas.get(keyToken);
+ }
+
+ void put(long ringVersion, K keyToken, V endpoints) // silently does nothing when no holder is usable for ringVersion
+ {
+ ReplicaHolder<K, V> current = maybeClearAndGet(ringVersion);
+ if (current != null)
+ {
+ // if we have the same ringVersion, but already know about the keyToken, the endpoints should be the same
+ current.replicas.putIfAbsent(keyToken, endpoints);
+ }
+ }
+
+ ReplicaHolder<K, V> maybeClearAndGet(long ringVersion) // returns the holder matching ringVersion, or null if the cache holds a different version
+ {
+ ReplicaHolder<K, V> current = cachedReplicas.get();
+ if (ringVersion == current.ringVersion)
+ return current;
+ else if (ringVersion < current.ringVersion) // things have already moved on
+ return null;
+
+ // If the ring version has changed, create a fresh replica holder and try to replace the current one.
+ // This may race with other threads that have the same new ring version; one will win and the losers'
+ // holders will be garbage collected
+ ReplicaHolder<K, V> cleaned = new ReplicaHolder<>(ringVersion, current.replicas.size()); // sized from the map being replaced
+ cachedReplicas.compareAndSet(current, cleaned); // result deliberately ignored; the re-read below decides
+
+ // A new ring version may have come along while making the new holder, so re-check the
+ // reference and return the holder if its ring version is the same, otherwise return null as
+ // there is no point in using it.
+ current = cachedReplicas.get();
+ if (ringVersion == current.ringVersion)
+ return current;
+ else
+ return null;
+ }
+ }
+
+ static class ReplicaHolder<K, V> // pairs a cache map with the ring version it was created for
+ {
+ private final long ringVersion; // compared against the caller's ring version in ReplicaCache.maybeClearAndGet
+ private final NonBlockingHashMap<K, V> replicas; // the cached entries for this ring version
+
+ ReplicaHolder(long ringVersion, int expectedEntries)
+ {
+ this.ringVersion = ringVersion;
+ this.replicas = new NonBlockingHashMap<>(expectedEntries); // presumably the map's initial size hint - confirm against NonBlockingHashMap
+ }
+ }
}
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index da5f5e3..2d7afcd 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -125,11 +125,17 @@ public class TokenMetadata
private TokenMetadata(BiMultiValMap<Token, InetAddress> tokenToEndpointMap, BiMap<InetAddress, UUID> endpointsMap, Topology topology, IPartitioner partitioner)
{
+ this(tokenToEndpointMap, endpointsMap, topology, partitioner, 0);
+ }
+
+ private TokenMetadata(BiMultiValMap<Token, InetAddress> tokenToEndpointMap, BiMap<InetAddress, UUID> endpointsMap, Topology topology, IPartitioner partitioner, long ringVersion)
+ {
this.tokenToEndpointMap = tokenToEndpointMap;
this.topology = topology;
this.partitioner = partitioner;
endpointToHostIdMap = endpointsMap;
sortedTokens = sortTokens();
+ this.ringVersion = ringVersion;
}
/**
@@ -643,7 +649,8 @@ public class TokenMetadata
return new TokenMetadata(SortedBiMultiValMap.create(tokenToEndpointMap, null, inetaddressCmp),
HashBiMap.create(endpointToHostIdMap),
topology,
- partitioner);
+ partitioner,
+ ringVersion);
}
finally
{
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ReadsDuringBootstrapTest.java b/test/distributed/org/apache/cassandra/distributed/test/ReadsDuringBootstrapTest.java
new file mode 100644
index 0000000..92f892c
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/ReadsDuringBootstrapTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.FieldValue;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+
+public class ReadsDuringBootstrapTest extends TestBaseImpl
+{
+ @Test
+ public void readsDuringBootstrapTest() throws IOException, ExecutionException, InterruptedException, TimeoutException
+ {
+ int originalNodeCount = 3;
+ int expandedNodeCount = originalNodeCount + 1; // tokens and topology are laid out for the node added later
+ ExecutorService es = Executors.newSingleThreadExecutor();
+ try (Cluster cluster = builder().withNodes(originalNodeCount)
+ .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
+ .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0"))
+ .withConfig(config -> config.with(NETWORK, GOSSIP)
+ .set("read_request_timeout_in_ms", Integer.MAX_VALUE) // presumably so a read held up by BB cannot time out
+ .set("request_timeout_in_ms", Integer.MAX_VALUE))
+ .withInstanceInitializer(BB::install)
+ .start())
+ {
+ String query = withKeyspace("SELECT * FROM %s.tbl WHERE id = ?");
+ cluster.schemaChange(withKeyspace("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2};"));
+ cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (id int PRIMARY KEY)"));
+ cluster.get(1).runOnInstance(() -> BB.block.set(true)); // while set, BB.getCachedEndpoints waits on BB.latch for this keyspace
+ Future<?> read = es.submit(() -> cluster.coordinator(1).execute(query, ConsistencyLevel.QUORUM, 3)); // runs on its own thread so the test can continue
+ long mark = cluster.get(1).logs().mark();
+
+ IInstanceConfig config = cluster.newInstanceConfig();
+ config.set("auto_bootstrap", true);
+ IInvokableInstance bootstrapInstance = cluster.bootstrap(config);
+ bootstrapInstance.startup();
+
+ cluster.get(1).logs().watchFor(mark, "New node /127.0.0.4"); // wait until node 1 has logged the new node
+ cluster.get(1).runOnInstance(() -> BB.block.set(false)); // reads issued from here on are not held by BB
+ // populate cache
+ for (int i = 0; i < 10; i++)
+ cluster.coordinator(1).execute(query, ConsistencyLevel.QUORUM, i);
+ cluster.get(1).runOnInstance(() -> BB.latch.countDown()); // release anything waiting on the latch
+ read.get(); // surfaces any failure from the background read
+ }
+ finally
+ {
+ es.shutdown();
+ }
+ }
+
+ public static class BB
+ {
+ public static final AtomicBoolean block = new AtomicBoolean();
+ public static final CountDownLatch latch = new CountDownLatch(1);
+ private static void install(ClassLoader cl, Integer instanceId) // instance initializer: instruments AbstractReplicationStrategy on node 1 only
+ {
+ if (instanceId != 1)
+ return;
+ new ByteBuddy().rebase(AbstractReplicationStrategy.class)
+ .method(named("getCachedEndpoints")) // must name the strategy method that the delegate BB.getCachedEndpoints mirrors
+ .intercept(MethodDelegation.to(BB.class))
+ .make()
+ .load(cl, ClassLoadingStrategy.Default.INJECTION);
+ }
+
+ public static ArrayList<InetAddress> getCachedEndpoints(long ringVersion, Token t, // same leading parameters as AbstractReplicationStrategy.getCachedEndpoints
+ @FieldValue("keyspaceName") String keyspaceName, // value of the strategy instance's keyspaceName field
+ @SuperCall Callable<ArrayList<InetAddress>> zuper) throws Exception
+ {
+ if (keyspaceName.equals(KEYSPACE) && block.get()) // only hold lookups for the test keyspace, and only while the flag is set
+ latch.await();
+ return zuper.call(); // then continue with the original method
+ }
+ }
+}
diff --git a/test/unit/org/apache/cassandra/locator/AbstractReplicationStrategyTest.java b/test/unit/org/apache/cassandra/locator/AbstractReplicationStrategyTest.java
new file mode 100644
index 0000000..62206ce
--- /dev/null
+++ b/test/unit/org/apache/cassandra/locator/AbstractReplicationStrategyTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class AbstractReplicationStrategyTest
+{
+ @Test
+ public void testReplicaCache()
+ {
+ AbstractReplicationStrategy.ReplicaCache<Integer, Integer> cache = new AbstractReplicationStrategy.ReplicaCache<>();
+
+ cache.put(10, 1, 1); // 10 is newer than the initial version 0, so a holder for version 10 is installed and the entry stored
+ assertEquals(1, (int)cache.get(10, 1));
+ assertNull(cache.get(9,1)); // get with old ringversion, return null to force a recalculation
+ assertNull(cache.get(11,1)); // newer ringVersion - holder is replaced with an empty one
+ assertNull(cache.get(10,1)); // 10 is now older than the cached version 11, so the lookup is rejected
+
+ cache.put(11, 1, 100);
+ cache.put(10, 1, 99); // stale ringVersion - the put is dropped
+ assertEquals(100, (int)cache.get(11, 1)); // the version-11 entry is untouched by the stale put
+ assertNull(cache.get(12, 55)); // version 12 installs a fresh, empty holder
+ assertNull(cache.get(11, 1)); // 11 is now stale as well
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org