Posted to commits@cassandra.apache.org by ma...@apache.org on 2022/01/17 13:19:49 UTC

[cassandra] branch cassandra-4.0 updated (98e798f -> 9a1bb62)

This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a change to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from 98e798f  Don't block gossip when clearing repair snapshots
     new b1a8a56  Avoid race in AbstractReplicationStrategy endpoint caching
     new b5d0626  Merge branch 'cassandra-3.0' into cassandra-3.11
     new 9a1bb62  Merge branch 'cassandra-3.11' into cassandra-4.0

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.txt                                        |   1 +
 .../locator/AbstractReplicationStrategy.java       |  92 ++++++++++++-----
 .../apache/cassandra/locator/TokenMetadata.java    |   9 +-
 .../distributed/test/ring/BootstrapTest.java       |   1 +
 .../test/ring/ReadsDuringBootstrapTest.java        | 114 +++++++++++++++++++++
 .../AbstractReplicationStrategyTest.java}          |  36 +++----
 6 files changed, 208 insertions(+), 45 deletions(-)
 create mode 100644 test/distributed/org/apache/cassandra/distributed/test/ring/ReadsDuringBootstrapTest.java
 copy test/unit/org/apache/cassandra/{tools/cqlsh/CqlshTest.java => locator/AbstractReplicationStrategyTest.java} (52%)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/01: Merge branch 'cassandra-3.11' into cassandra-4.0

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 9a1bb62822806ea947236b2f5491ecb3669fabde
Merge: 98e798f b5d0626
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Mon Jan 17 14:09:10 2022 +0100

    Merge branch 'cassandra-3.11' into cassandra-4.0

 CHANGES.txt                                        |   1 +
 .../locator/AbstractReplicationStrategy.java       |  92 ++++++++++++-----
 .../apache/cassandra/locator/TokenMetadata.java    |   9 +-
 .../distributed/test/ring/BootstrapTest.java       |   1 +
 .../test/ring/ReadsDuringBootstrapTest.java        | 114 +++++++++++++++++++++
 .../locator/AbstractReplicationStrategyTest.java   |  45 ++++++++
 6 files changed, 236 insertions(+), 26 deletions(-)

diff --cc CHANGES.txt
index 0896d56,5e8213f..d9303f8
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,36 -1,17 +1,37 @@@
 -3.11.12
 - * Upgrade snakeyaml to 1.26 in 3.11 (CASSANDRA=17028)
 +4.0.2
 + * Don't block gossip when clearing repair snapshots (CASSANDRA-17168)
 + * Deduplicate warnings for deprecated parameters (changed names) (CASSANDRA-17160)
 + * Update ant-junit to version 1.10.12 (CASSANDRA-17218)
 + * Add droppable tombstone metrics to nodetool tablestats (CASSANDRA-16308)
 + * Fix disk failure triggered when enabling FQL on an unclean directory (CASSANDRA-17136)
 + * Fixed broken classpath when multiple jars in build directory (CASSANDRA-17129)
 + * DebuggableThreadPoolExecutor does not propagate client warnings (CASSANDRA-17072)
 + * internode_send_buff_size_in_bytes and internode_recv_buff_size_in_bytes have new names. Backward compatibility with the old names added (CASSANDRA-17141)
 + * Remove unused configuration parameters from cassandra.yaml (CASSANDRA-17132)
 + * Queries performed with NODE_LOCAL consistency level do not update request metrics (CASSANDRA-17052)
 + * Fix multiple full sources can be select unexpectedly for bootstrap streaming (CASSANDRA-16945)
 + * Fix cassandra.yaml formatting of parameters (CASSANDRA-17131)
 + * Add backward compatibility for CQLSSTableWriter Date fields (CASSANDRA-17117)
 + * Push initial client connection messages to trace (CASSANDRA-17038)
 + * Correct the internode message timestamp if sending node has wrapped (CASSANDRA-16997)
 + * Avoid race causing us to return null in RangesAtEndpoint (CASSANDRA-16965)
 + * Avoid rewriting all sstables during cleanup when transient replication is enabled (CASSANDRA-16966)
 + * Prevent CQLSH from failure on Python 3.10 (CASSANDRA-16987)
 + * Avoid trying to acquire 0 permits from the rate limiter when taking snapshot (CASSANDRA-16872)
 + * Upgrade Caffeine to 2.5.6 (CASSANDRA-15153)
 + * Include SASI components to snapshots (CASSANDRA-15134)
 + * Fix missed wait latencies in the output of `nodetool tpstats -F` (CASSANDRA-16938)
 + * Remove all the state pollution between tests in SSTableReaderTest (CASSANDRA-16888)
 + * Delay auth setup until after gossip has settled to avoid unavailables on startup (CASSANDRA-16783)
 + * Fix clustering order logic in CREATE MATERIALIZED VIEW (CASSANDRA-16898)
 + * org.apache.cassandra.db.rows.ArrayCell#unsharedHeapSizeExcludingData includes data twice (CASSANDRA-16900)
 + * Exclude Jackson 1.x transitive dependency of hadoop* provided dependencies (CASSANDRA-16854)
 +Merged from 3.11:
   * Add key validation to ssstablescrub (CASSANDRA-16969)
   * Update Jackson from 2.9.10 to 2.12.5 (CASSANDRA-16851)
 - * Include SASI components to snapshots (CASSANDRA-15134)
   * Make assassinate more resilient to missing tokens (CASSANDRA-16847)
 - * Exclude Jackson 1.x transitive dependency of hadoop* provided dependencies (CASSANDRA-16854)
 - * Validate SASI tokenizer options before adding index to schema (CASSANDRA-15135)
 - * Fixup scrub output when no data post-scrub and clear up old use of row, which really means partition (CASSANDRA-16835)
 - * Fix ant-junit dependency issue (CASSANDRA-16827)
 - * Reduce thread contention in CommitLogSegment and HintsBuffer (CASSANDRA-16072)
 - * Avoid sending CDC column if not enabled (CASSANDRA-16770)
  Merged from 3.0:
+  * Avoid race in AbstractReplicationStrategy endpoint caching (CASSANDRA-16673)
   * Fix abort when window resizing during cqlsh COPY (CASSANDRA-15230)
   * Fix slow keycache load which blocks startup for tables with many sstables (CASSANDRA-14898)
   * Fix rare NPE caused by batchlog replay / node decomission races (CASSANDRA-17049)
diff --cc src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index 7891895,f4dc3b6..909a7f6
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@@ -23,9 -24,11 +23,10 @@@ import java.lang.reflect.Method
  import java.util.Collection;
  import java.util.Collections;
  import java.util.Map;
+ import java.util.concurrent.atomic.AtomicReference;
  
 -import com.google.common.annotations.VisibleForTesting;
 -import com.google.common.collect.HashMultimap;
 -import com.google.common.collect.Multimap;
 +import com.google.common.base.Preconditions;
 +
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -51,13 -53,12 +52,10 @@@ public abstract class AbstractReplicati
  {
      private static final Logger logger = LoggerFactory.getLogger(AbstractReplicationStrategy.class);
  
 -    @VisibleForTesting
 -    final String keyspaceName;
 -    private Keyspace keyspace;
      public final Map<String, String> configOptions;
 +    protected final String keyspaceName;
      private final TokenMetadata tokenMetadata;
- 
-     // track when the token range changes, signaling we need to invalidate our endpoint cache
-     private volatile long lastInvalidatedVersion = 0;
- 
 -    private final ReplicaCache<Token, ArrayList<InetAddress>> replicas = new ReplicaCache<>();
++    private final ReplicaCache<Token, EndpointsForRange> replicas = new ReplicaCache<>();
      public IEndpointSnitch snitch;
  
      protected AbstractReplicationStrategy(String keyspaceName, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions)
@@@ -68,28 -70,12 +66,11 @@@
          this.snitch = snitch;
          this.configOptions = configOptions == null ? Collections.<String, String>emptyMap() : configOptions;
          this.keyspaceName = keyspaceName;
 -        // lazy-initialize keyspace itself since we don't create them until after the replication strategies
      }
  
-     private final Map<Token, EndpointsForRange> cachedReplicas = new NonBlockingHashMap<>();
- 
-     public EndpointsForRange getCachedReplicas(Token t)
 -    private ArrayList<InetAddress> getCachedEndpoints(long ringVersion, Token t)
++    public EndpointsForRange getCachedReplicas(long ringVersion, Token t)
      {
-         long lastVersion = tokenMetadata.getRingVersion();
- 
-         if (lastVersion > lastInvalidatedVersion)
-         {
-             synchronized (this)
-             {
-                 if (lastVersion > lastInvalidatedVersion)
-                 {
-                     logger.trace("clearing cached endpoints");
-                     cachedReplicas.clear();
-                     lastInvalidatedVersion = lastVersion;
-                 }
-             }
-         }
- 
-         return cachedReplicas.get(t);
+         return replicas.get(ringVersion, t);
      }
  
      /**
@@@ -99,33 -85,22 +80,34 @@@
       * @param searchPosition the position the natural endpoints are requested for
       * @return a copy of the natural endpoints for the given token
       */
 -    public ArrayList<InetAddress> getNaturalEndpoints(RingPosition searchPosition)
 +    public EndpointsForToken getNaturalReplicasForToken(RingPosition<?> searchPosition)
 +    {
 +        return getNaturalReplicas(searchPosition).forToken(searchPosition.getToken());
 +    }
 +
 +    public EndpointsForRange getNaturalReplicas(RingPosition<?> searchPosition)
      {
          Token searchToken = searchPosition.getToken();
+         long currentRingVersion = tokenMetadata.getRingVersion();
          Token keyToken = TokenMetadata.firstToken(tokenMetadata.sortedTokens(), searchToken);
-         EndpointsForRange endpoints = getCachedReplicas(keyToken);
 -        ArrayList<InetAddress> endpoints = getCachedEndpoints(currentRingVersion, keyToken);
++        EndpointsForRange endpoints = getCachedReplicas(currentRingVersion, keyToken);
          if (endpoints == null)
          {
              TokenMetadata tm = tokenMetadata.cachedOnlyTokenMap();
              // if our cache got invalidated, it's possible there is a new token to account for too
              keyToken = TokenMetadata.firstToken(tm.sortedTokens(), searchToken);
 -            endpoints = new ArrayList<InetAddress>(calculateNaturalEndpoints(searchToken, tm));
 +            endpoints = calculateNaturalReplicas(searchToken, tm);
-             cachedReplicas.put(keyToken, endpoints);
+             replicas.put(tm.getRingVersion(), keyToken, endpoints);
          }
  
 -        return new ArrayList<>(endpoints);
 +        return endpoints;
 +    }
 +
 +    public Replica getLocalReplicaFor(RingPosition<?> searchPosition)
 +    {
 +        return getNaturalReplicas(searchPosition)
 +               .byEndpoint()
 +               .get(FBUtilities.getBroadcastAddressAndPort());
      }
  
      /**
@@@ -457,4 -317,65 +439,64 @@@
                  throw new ConfigurationException(String.format("Unrecognized strategy option {%s} passed to %s for keyspace %s", key, getClass().getSimpleName(), keyspaceName));
          }
      }
+ 
 -    @VisibleForTesting
 -    public static class ReplicaCache<K, V>
++    static class ReplicaCache<K, V>
+     {
+         private final AtomicReference<ReplicaHolder<K, V>> cachedReplicas = new AtomicReference<>(new ReplicaHolder<>(0, 4));
+ 
+         V get(long ringVersion, K keyToken)
+         {
+             ReplicaHolder<K, V> replicaHolder = maybeClearAndGet(ringVersion);
+             if (replicaHolder == null)
+                 return null;
+ 
+             return replicaHolder.replicas.get(keyToken);
+         }
+ 
+         void put(long ringVersion, K keyToken, V endpoints)
+         {
+             ReplicaHolder<K, V> current = maybeClearAndGet(ringVersion);
+             if (current != null)
+             {
+                 // if we have the same ringVersion, but already know about the keyToken the endpoints should be the same
+                 current.replicas.putIfAbsent(keyToken, endpoints);
+             }
+         }
+ 
+         ReplicaHolder<K, V> maybeClearAndGet(long ringVersion)
+         {
+             ReplicaHolder<K, V> current = cachedReplicas.get();
+             if (ringVersion == current.ringVersion)
+                 return current;
+             else if (ringVersion < current.ringVersion) // things have already moved on
+                 return null;
+ 
+             // If ring version has changed, create a fresh replica holder and try to replace the current one.
+             // This may race with other threads that have the same new ring version; one will win and the losers
+             // will be garbage collected
+             ReplicaHolder<K, V> cleaned = new ReplicaHolder<>(ringVersion, current.replicas.size());
+             cachedReplicas.compareAndSet(current, cleaned);
+ 
+             // A new ring version may have come along while making the new holder, so re-check the
+             // reference and return the current holder if its ring version matches, otherwise return null
+             // as there is no point in using it.
+             current = cachedReplicas.get();
+             if (ringVersion == current.ringVersion)
+                 return current;
+             else
+                 return null;
+         }
+     }
+ 
+     static class ReplicaHolder<K, V>
+     {
+         private final long ringVersion;
+         private final NonBlockingHashMap<K, V> replicas;
+ 
+         ReplicaHolder(long ringVersion, int expectedEntries)
+         {
+             this.ringVersion = ringVersion;
+             this.replicas = new NonBlockingHashMap<>(expectedEntries);
+         }
+     }
  }
diff --cc src/java/org/apache/cassandra/locator/TokenMetadata.java
index ab21045,0221187..108e218
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@@ -120,16 -123,13 +120,21 @@@ public class TokenMetadat
               DatabaseDescriptor.getPartitioner());
      }
  
 -    private TokenMetadata(BiMultiValMap<Token, InetAddress> tokenToEndpointMap, BiMap<InetAddress, UUID> endpointsMap, Topology topology, IPartitioner partitioner)
 +    public TokenMetadata(IEndpointSnitch snitch)
 +    {
 +        this(SortedBiMultiValMap.create(),
 +             HashBiMap.create(),
 +             Topology.builder(() -> snitch).build(),
 +             DatabaseDescriptor.getPartitioner());
 +    }
 +
 +    private TokenMetadata(BiMultiValMap<Token, InetAddressAndPort> tokenToEndpointMap, BiMap<InetAddressAndPort, UUID> endpointsMap, Topology topology, IPartitioner partitioner)
      {
+         this(tokenToEndpointMap, endpointsMap, topology, partitioner, 0);
+     }
+ 
 -    private TokenMetadata(BiMultiValMap<Token, InetAddress> tokenToEndpointMap, BiMap<InetAddress, UUID> endpointsMap, Topology topology, IPartitioner partitioner, long ringVersion)
++    private TokenMetadata(BiMultiValMap<Token, InetAddressAndPort> tokenToEndpointMap, BiMap<InetAddressAndPort, UUID> endpointsMap, Topology topology, IPartitioner partitioner, long ringVersion)
+     {
          this.tokenToEndpointMap = tokenToEndpointMap;
          this.topology = topology;
          this.partitioner = partitioner;
@@@ -645,10 -646,11 +651,11 @@@
          lock.readLock().lock();
          try
          {
 -            return new TokenMetadata(SortedBiMultiValMap.create(tokenToEndpointMap, null, inetaddressCmp),
 +            return new TokenMetadata(SortedBiMultiValMap.create(tokenToEndpointMap),
                                       HashBiMap.create(endpointToHostIdMap),
                                       topology,
-                                      partitioner);
+                                      partitioner,
+                                      ringVersion);
          }
          finally
          {
diff --cc test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java
index 065ef0e,0000000..52d0f16
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java
@@@ -1,148 -1,0 +1,149 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.distributed.test.ring;
 +
 +import java.util.Map;
 +import java.util.stream.Collectors;
 +import java.util.stream.IntStream;
 +
 +import org.junit.Assert;
 +import org.junit.Test;
 +
 +import org.apache.cassandra.distributed.Cluster;
 +import org.apache.cassandra.distributed.api.ConsistencyLevel;
 +import org.apache.cassandra.distributed.api.ICluster;
 +import org.apache.cassandra.distributed.api.IInstanceConfig;
 +import org.apache.cassandra.distributed.api.IInvokableInstance;
 +import org.apache.cassandra.distributed.api.TokenSupplier;
 +import org.apache.cassandra.distributed.shared.NetworkTopology;
 +import org.apache.cassandra.distributed.test.TestBaseImpl;
 +
 +import static java.util.Arrays.asList;
 +import static org.apache.cassandra.distributed.action.GossipHelper.bootstrap;
 +import static org.apache.cassandra.distributed.action.GossipHelper.pullSchemaFrom;
 +import static org.apache.cassandra.distributed.action.GossipHelper.statusToBootstrap;
 +import static org.apache.cassandra.distributed.action.GossipHelper.withProperty;
 +import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
 +import static org.apache.cassandra.distributed.api.Feature.NETWORK;
 +
 +public class BootstrapTest extends TestBaseImpl
 +{
 +    @Test
 +    public void bootstrapTest() throws Throwable
 +    {
 +        int originalNodeCount = 2;
 +        int expandedNodeCount = originalNodeCount + 1;
 +
 +        try (Cluster cluster = builder().withNodes(originalNodeCount)
 +                                        .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
 +                                        .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0"))
 +                                        .withConfig(config -> config.with(NETWORK, GOSSIP))
 +                                        .start())
 +        {
 +            populate(cluster,0, 100);
 +
 +            IInstanceConfig config = cluster.newInstanceConfig();
 +            IInvokableInstance newInstance = cluster.bootstrap(config);
 +            withProperty("cassandra.join_ring", false,
 +                         () -> newInstance.startup(cluster));
 +
 +            cluster.forEach(statusToBootstrap(newInstance));
 +
 +            cluster.run(asList(pullSchemaFrom(cluster.get(1)),
 +                               bootstrap()),
 +                        newInstance.config().num());
 +
 +            for (Map.Entry<Integer, Long> e : count(cluster).entrySet())
 +                Assert.assertEquals("Node " + e.getKey() + " has incorrect row state",
 +                                    100L,
 +                                    e.getValue().longValue());
 +        }
 +    }
 +
 +    @Test
 +    public void readWriteDuringBootstrapTest() throws Throwable
 +    {
 +        int originalNodeCount = 2;
 +        int expandedNodeCount = originalNodeCount + 1;
 +
 +        try (Cluster cluster = builder().withNodes(originalNodeCount)
 +                                        .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
 +                                        .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0"))
 +                                        .withConfig(config -> config.with(NETWORK, GOSSIP))
 +                                        .start())
 +        {
 +            IInstanceConfig config = cluster.newInstanceConfig();
 +            IInvokableInstance newInstance = cluster.bootstrap(config);
 +            withProperty("cassandra.join_ring", false,
 +                         () -> newInstance.startup(cluster));
 +
 +            cluster.forEach(statusToBootstrap(newInstance));
 +
 +            populate(cluster,0, 100);
 +
 +            Assert.assertEquals(100, newInstance.executeInternal("SELECT *FROM " + KEYSPACE + ".tbl").length);
 +        }
 +    }
 +
 +    @Test
 +    public void autoBootstrapTest() throws Throwable
 +    {
 +        int originalNodeCount = 2;
 +        int expandedNodeCount = originalNodeCount + 1;
 +
 +        try (Cluster cluster = builder().withNodes(originalNodeCount)
 +                                        .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
 +                                        .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0"))
 +                                        .withConfig(config -> config.with(NETWORK, GOSSIP))
 +                                        .start())
 +        {
 +            populate(cluster,0, 100);
 +            bootstrapAndJoinNode(cluster);
 +
 +            for (Map.Entry<Integer, Long> e : count(cluster).entrySet())
 +                Assert.assertEquals("Node " + e.getKey() + " has incorrect row state", e.getValue().longValue(), 100L);
 +        }
 +    }
 +
 +    public static void populate(ICluster cluster, int from, int to)
 +    {
 +        populate(cluster, from, to, 1, 3, ConsistencyLevel.QUORUM);
 +    }
 +
 +    public static void populate(ICluster cluster, int from, int to, int coord, int rf, ConsistencyLevel cl)
 +    {
 +        cluster.schemaChange("CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + rf + "};");
 +        cluster.schemaChange("CREATE TABLE IF NOT EXISTS " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
 +        for (int i = from; i < to; i++)
 +        {
 +            cluster.coordinator(coord).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?, ?, ?)",
 +                                               cl,
 +                                               i, i, i);
 +        }
 +    }
 +
 +    public static Map<Integer, Long> count(ICluster cluster)
 +    {
 +        return IntStream.rangeClosed(1, cluster.size())
 +                        .boxed()
 +                        .collect(Collectors.toMap(nodeId -> nodeId,
 +                                                  nodeId -> (Long) cluster.get(nodeId).executeInternal("SELECT count(*) FROM " + KEYSPACE + ".tbl")[0][0]));
 +    }
++
 +}
diff --cc test/distributed/org/apache/cassandra/distributed/test/ring/ReadsDuringBootstrapTest.java
index 0000000,0000000..4898479
new file mode 100644
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/ring/ReadsDuringBootstrapTest.java
@@@ -1,0 -1,0 +1,114 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.cassandra.distributed.test.ring;
++
++import java.io.IOException;
++import java.util.concurrent.Callable;
++import java.util.concurrent.CountDownLatch;
++import java.util.concurrent.ExecutionException;
++import java.util.concurrent.ExecutorService;
++import java.util.concurrent.Executors;
++import java.util.concurrent.Future;
++import java.util.concurrent.TimeoutException;
++import java.util.concurrent.atomic.AtomicBoolean;
++
++import org.junit.Test;
++
++import net.bytebuddy.ByteBuddy;
++import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
++import net.bytebuddy.implementation.MethodDelegation;
++import net.bytebuddy.implementation.bind.annotation.FieldValue;
++import net.bytebuddy.implementation.bind.annotation.SuperCall;
++import org.apache.cassandra.dht.Token;
++import org.apache.cassandra.distributed.Cluster;
++import org.apache.cassandra.distributed.api.ConsistencyLevel;
++import org.apache.cassandra.distributed.api.TokenSupplier;
++import org.apache.cassandra.distributed.shared.NetworkTopology;
++import org.apache.cassandra.distributed.test.TestBaseImpl;
++import org.apache.cassandra.locator.AbstractReplicationStrategy;
++import org.apache.cassandra.locator.EndpointsForRange;
++
++import static net.bytebuddy.matcher.ElementMatchers.named;
++import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
++import static org.apache.cassandra.distributed.api.Feature.NETWORK;
++
++public class ReadsDuringBootstrapTest extends TestBaseImpl
++{
++    @Test
++    public void readsDuringBootstrapTest() throws IOException, ExecutionException, InterruptedException, TimeoutException
++    {
++        int originalNodeCount = 3;
++        int expandedNodeCount = originalNodeCount + 1;
++        ExecutorService es = Executors.newSingleThreadExecutor();
++        try (Cluster cluster = builder().withNodes(originalNodeCount)
++                                        .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
++                                        .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0"))
++                                        .withConfig(config -> config.with(NETWORK, GOSSIP)
++                                                                    .set("read_request_timeout_in_ms", Integer.MAX_VALUE)
++                                                                    .set("request_timeout_in_ms", Integer.MAX_VALUE))
++                                        .withInstanceInitializer(BB::install)
++                                        .start())
++        {
++            String query = withKeyspace("SELECT * FROM %s.tbl WHERE id = ?");
++            cluster.schemaChange(withKeyspace("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2};"));
++            cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (id int PRIMARY KEY)"));
++            cluster.get(1).runOnInstance(() -> BB.block.set(true));
++            Future<?> read = es.submit(() -> cluster.coordinator(1).execute(query, ConsistencyLevel.QUORUM, 3));
++            long mark = cluster.get(1).logs().mark();
++            bootstrapAndJoinNode(cluster);
++            cluster.get(1).logs().watchFor(mark, "New node /127.0.0.4");
++            cluster.get(1).runOnInstance(() -> BB.block.set(false));
++            // populate cache
++            for (int i = 0; i < 10; i++)
++                cluster.coordinator(1).execute(query, ConsistencyLevel.QUORUM, i);
++            cluster.get(1).runOnInstance(() -> BB.latch.countDown());
++            read.get();
++        }
++        finally
++        {
++            es.shutdown();
++        }
++    }
++
++    public static class BB
++    {
++        public static final AtomicBoolean block = new AtomicBoolean();
++        public static final CountDownLatch latch = new CountDownLatch(1);
++        private static void install(ClassLoader cl, Integer instanceId)
++        {
++            if (instanceId != 1)
++                return;
++            new ByteBuddy().rebase(AbstractReplicationStrategy.class)
++                           .method(named("getCachedReplicas"))
++                           .intercept(MethodDelegation.to(BB.class))
++                           .make()
++                           .load(cl, ClassLoadingStrategy.Default.INJECTION);
++        }
++
++        public static EndpointsForRange getCachedReplicas(long ringVersion, Token t,
++                                                          @FieldValue("keyspaceName") String keyspaceName,
++                                                          @SuperCall Callable<EndpointsForRange> zuper) throws Exception
++        {
++            if (keyspaceName.equals(KEYSPACE) && block.get())
++                latch.await();
++            return zuper.call();
++        }
++    }
++
++}
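
The caching change in AbstractReplicationStrategy above replaces a
check-then-clear on a shared map with a holder object that pairs a ring
version with its own map and is swapped atomically. The following is a
minimal, dependency-free sketch of that idea, not the committed code: the
names VersionedCache and Holder are hypothetical, and
java.util.concurrent.ConcurrentHashMap stands in for the NonBlockingHashMap
used in the diff.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

// Illustrative sketch only: hypothetical names, not the Cassandra classes.
final class VersionedCache<K, V>
{
    // Pairs a version with the map that is valid only for that version.
    private static final class Holder<K, V>
    {
        final long version;
        final ConcurrentHashMap<K, V> entries = new ConcurrentHashMap<>();

        Holder(long version)
        {
            this.version = version;
        }
    }

    private final AtomicReference<Holder<K, V>> ref = new AtomicReference<>(new Holder<>(0));

    V get(long version, K key)
    {
        Holder<K, V> holder = holderFor(version);
        return holder == null ? null : holder.entries.get(key);
    }

    void put(long version, K key, V value)
    {
        Holder<K, V> holder = holderFor(version);
        if (holder != null)
            holder.entries.putIfAbsent(key, value);
    }

    // Returns the holder matching the caller's version, or null if the caller is stale.
    private Holder<K, V> holderFor(long version)
    {
        Holder<K, V> current = ref.get();
        if (version == current.version)
            return current;
        if (version < current.version)
            return null; // the cache has already moved on to a newer version

        // Newer version: try to install a fresh, empty holder. Losing the race is fine.
        ref.compareAndSet(current, new Holder<>(version));

        // Re-read: another thread may have installed the same or an even newer version.
        current = ref.get();
        return version == current.version ? current : null;
    }

    public static void main(String[] args)
    {
        VersionedCache<String, String> cache = new VersionedCache<>();
        cache.put(1, "token-a", "replicas@v1");
        System.out.println(cache.get(1, "token-a")); // replicas@v1
        cache.put(2, "token-b", "replicas@v2");      // installs a fresh holder for version 2
        cache.put(1, "token-a", "stale");            // dropped: version 1 is behind the cache
        System.out.println(cache.get(2, "token-a")); // null
        System.out.println(cache.get(1, "token-a")); // null
    }
}
```

Because each entry lives in a map that belongs to exactly one version, a
writer that computed its value against an older ring version cannot
repopulate the cache after it has been replaced; its put is simply ignored.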