You are viewing a plain-text version of this content. The canonical version is linked from the original archive page (the link itself is not preserved in this plain-text copy).
Posted to commits@cassandra.apache.org by dc...@apache.org on 2021/05/13 20:21:08 UTC

[cassandra] branch trunk updated: Move CASSANDRA-14559's bootstrap_test.py::TestBootstrap::test_node_cannot_join_as_hibernating_node_without_replace_address into a jvm-dtest

This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f91418b  Move CASSANDRA-14559s bootstrap_test.py::TestBootstrap::test_node_cannot_join_as_hibernating_node_without_replace_address into a jvm-dtest
f91418b is described below

commit f91418bbbedf4e0d5396becf14a5e884ac7f0d3b
Author: David Capwell <dc...@apache.org>
AuthorDate: Thu May 13 12:43:06 2021 -0700

    Move CASSANDRA-14559s bootstrap_test.py::TestBootstrap::test_node_cannot_join_as_hibernating_node_without_replace_address into a jvm-dtest
    
    patch by David Capwell; reviewed by Stefan Miklosovic for CASSANDRA-16662
---
 .../org/apache/cassandra/distributed/Cluster.java  |   2 +
 .../distributed/impl/AbstractCluster.java          |   3 +
 .../cassandra/distributed/shared/ClusterUtils.java |  61 +++++++++-
 .../cassandra/distributed/test/TestBaseImpl.java   |   2 +-
 ...AsHibernatingNodeWithoutReplaceAddressTest.java | 133 +++++++++++++++++++++
 5 files changed, 197 insertions(+), 4 deletions(-)

diff --git a/test/distributed/org/apache/cassandra/distributed/Cluster.java b/test/distributed/org/apache/cassandra/distributed/Cluster.java
index 5c5a954..a613fc5 100644
--- a/test/distributed/org/apache/cassandra/distributed/Cluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/Cluster.java
@@ -24,12 +24,14 @@ import java.util.function.Consumer;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
 import org.apache.cassandra.distributed.impl.AbstractCluster;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.shared.Shared;
 import org.apache.cassandra.distributed.shared.Versions;
 
 /**
  * A simple cluster supporting only the 'current' Cassandra version, offering easy access to the convenience methods
  * of IInvokableInstance on each node.
  */
+@Shared
 public class Cluster extends AbstractCluster<IInvokableInstance>
 {
 
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 8026357..ba584f3 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -109,6 +109,7 @@ import static org.apache.cassandra.distributed.shared.NetworkTopology.addressAnd
  * handlers for internode to have more control over it. Messaging is wired by passing verbs manually.
  * coordinator-handling code and hooks to the callbacks can be found in {@link Coordinator}.
  */
+@Shared
 public abstract class AbstractCluster<I extends IInstance> implements ICluster<I>, AutoCloseable
 {
     public static Versions.Version CURRENT_VERSION = new Versions.Version(FBUtilities.getReleaseVersionString(), Versions.getClassPath());
@@ -254,6 +255,8 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
                 instanceMap.put(newAddress, (I) this); // if the broadcast address changes, update
                 instanceMap.remove(previous);
                 broadcastAddress = newAddress;
+                // remove delegate to make sure static state is reset
+                delegate = null;
             }
             try
             {
diff --git a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
index a68e819..cda742c 100644
--- a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
+++ b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.distributed.shared;
 
 import java.io.File;
+import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -136,6 +137,22 @@ public class ClusterUtils
      * Create a new instance and add it to the cluster, without starting it.
      *
      * @param cluster to add to
+     * @param other config to copy from
+     * @param fn function to add to the config before starting
+     * @param <I> instance type
+     * @return the instance added
+     */
+    public static <I extends IInstance> I addInstance(AbstractCluster<I> cluster,
+                                                      IInstanceConfig other,
+                                                      Consumer<IInstanceConfig> fn)
+    {
+        return addInstance(cluster, other.localDatacenter(), other.localRack(), fn);
+    }
+
+    /**
+     * Create a new instance and add it to the cluster, without starting it.
+     *
+     * @param cluster to add to
      * @param dc the instance should be in
      * @param rack the instance should be in
      * @param <I> instance type
@@ -205,8 +222,26 @@ public class ClusterUtils
                                                               IInstance toReplace,
                                                               Consumer<WithProperties> fn)
     {
+        return replaceHostAndStart(cluster, toReplace, (ignore, prop) -> fn.accept(prop));
+    }
+
+    /**
+     * Create and start a new instance that replaces an existing instance.
+     *
+     * The instance will be in the same datacenter and rack as the existing instance.
+     *
+     * @param cluster to add to
+     * @param toReplace instance to replace
+     * @param fn lambda to add additional properties or modify instance
+     * @param <I> instance type
+     * @return the instance added
+     */
+    public static <I extends IInstance> I replaceHostAndStart(AbstractCluster<I> cluster,
+                                                              IInstance toReplace,
+                                                              BiConsumer<I, WithProperties> fn)
+    {
         IInstanceConfig toReplaceConf = toReplace.config();
-        I inst = addInstance(cluster, toReplaceConf.localDatacenter(), toReplaceConf.localRack(), c -> c.set("auto_bootstrap", true));
+        I inst = addInstance(cluster, toReplaceConf, c -> c.set("auto_bootstrap", true));
 
         return start(inst, properties -> {
             // lower this so the replacement waits less time
@@ -218,7 +253,7 @@ public class ClusterUtils
             // state which node to replace
             properties.setProperty("cassandra.replace_address_first_boot", toReplace.config().broadcastAddress().getAddress().getHostAddress());
 
-            fn.accept(properties);
+            fn.accept(inst, properties);
         });
     }
 
@@ -671,13 +706,33 @@ public class ClusterUtils
      */
     private static void updateAddress(IInstanceConfig conf, String address)
     {
+        InetSocketAddress previous = conf.broadcastAddress();
+
         for (String key : Arrays.asList("broadcast_address", "listen_address", "broadcast_rpc_address", "rpc_address"))
             conf.set(key, address);
 
         // InstanceConfig caches InetSocketAddress -> InetAddressAndPort
         // this causes issues as startup now ignores config, so force reset it to pull from conf.
         ((InstanceConfig) conf).unsetBroadcastAddressAndPort(); //TODO remove the need to null out the cache...
-        conf.networkTopology().put(conf.broadcastAddress(), NetworkTopology.dcAndRack(conf.localDatacenter(), conf.localRack()));
+
+        //TODO NetworkTopology class isn't flexible and doesn't handle adding/removing nodes well...
+        // it also uses a HashMap which makes the class not thread safe... so mutating AFTER starting nodes
+        // are a risk
+        if (!conf.broadcastAddress().equals(previous))
+        {
+            conf.networkTopology().put(conf.broadcastAddress(), NetworkTopology.dcAndRack(conf.localDatacenter(), conf.localRack()));
+            try
+            {
+                Field field = NetworkTopology.class.getDeclaredField("map");
+                field.setAccessible(true);
+                Map<InetSocketAddress, NetworkTopology.DcAndRack> map = (Map<InetSocketAddress, NetworkTopology.DcAndRack>) field.get(conf.networkTopology());
+                map.remove(previous);
+            }
+            catch (NoSuchFieldException | IllegalAccessException e)
+            {
+                throw new AssertionError(e);
+            }
+        }
     }
 
     /**
diff --git a/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java b/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
index 343ccc8..4755d70 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
@@ -178,7 +178,7 @@ public class TestBaseImpl extends DistributedTestBase
 
     public static void fixDistributedSchemas(Cluster cluster)
     {
-        // These keyspaces are under replicated by default, so must be updated when doing a mulit-node cluster;
+        // These keyspaces are under replicated by default, so must be updated when doing a multi-node cluster;
         // else bootstrap will fail with 'Unable to find sufficient sources for streaming range <range> in keyspace <name>'
         for (String ks : Arrays.asList("system_auth", "system_traces"))
         {
diff --git a/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/NodeCannotJoinAsHibernatingNodeWithoutReplaceAddressTest.java b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/NodeCannotJoinAsHibernatingNodeWithoutReplaceAddressTest.java
new file mode 100644
index 0000000..78d55e3
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/NodeCannotJoinAsHibernatingNodeWithoutReplaceAddressTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.hostreplacement;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.Assert;
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.Constants;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.impl.InstanceIDDefiner;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.distributed.shared.Shared;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.service.PendingRangeCalculatorService;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.assertj.core.api.Assertions;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+
+public class NodeCannotJoinAsHibernatingNodeWithoutReplaceAddressTest extends TestBaseImpl
+{
+    @Test
+    public void test() throws IOException, InterruptedException
+    {
+        TokenSupplier even = TokenSupplier.evenlyDistributedTokens(2);
+        try (Cluster cluster = init(Cluster.build(2)
+                                           .withConfig(c -> c.with(Feature.values()).set(Constants.KEY_DTEST_API_STARTUP_FAILURE_AS_SHUTDOWN, false))
+                                           .withInstanceInitializer(BBHelper::install)
+                                           .withTokenSupplier(node -> even.token((node == 3 || node == 4) ? 2 : node))
+                                           .start()))
+        {
+            final IInvokableInstance toReplace = cluster.get(2);
+            final String toReplaceAddress = toReplace.broadcastAddress().getAddress().getHostAddress();
+
+            SharedState.cluster = cluster;
+            cluster.setUncaughtExceptionsFilter((nodeId, cause) -> nodeId > 2); // ignore host replacement errors
+            fixDistributedSchemas(cluster);
+
+            ClusterUtils.stopUnchecked(toReplace);
+
+            try
+            {
+                ClusterUtils.replaceHostAndStart(cluster, toReplace, (inst, ignore) -> ClusterUtils.updateAddress(inst, toReplaceAddress));
+                Assert.fail("Host replacement should exit with an error");
+            }
+            catch (Exception e)
+            {
+                // the instance is expected to fail, but it may not have finished shutdown yet, so wait for it to shutdown
+                SharedState.shutdownComplete.await(1, TimeUnit.MINUTES);
+            }
+
+            IInvokableInstance inst = ClusterUtils.addInstance(cluster, toReplace.config(), c -> c.set("auto_bootstrap", true));
+            ClusterUtils.updateAddress(inst, toReplaceAddress);
+            Assertions.assertThatThrownBy(() -> inst.startup())
+                      .hasMessageContaining("A node with address")
+                      .hasMessageContaining("already exists, cancelling join");
+        }
+    }
+
+    public static class BBHelper
+    {
+        static void install(ClassLoader cl, int nodeNumber)
+        {
+            if (nodeNumber != 3)
+                return;
+            shutdownBeforeNormal(cl);
+        }
+
+        private static void shutdownBeforeNormal(ClassLoader cl)
+        {
+            new ByteBuddy().rebase(PendingRangeCalculatorService.class)
+                           .method(named("blockUntilFinished"))
+                           .intercept(MethodDelegation.to(ShutdownBeforeNormal.class))
+                           .make()
+                           .load(cl, ClassLoadingStrategy.Default.INJECTION);
+        }
+    }
+
+    @Shared
+    public static class SharedState
+    {
+        public static volatile Cluster cluster;
+        // Instance.shutdown can only be called once so only the caller knows when its done (isShutdown looks at a field set BEFORE shutting down..)
+        // since the test needs to know when shutdown completes, add this static state so the caller (bytebuddy rewrite) can update it
+        public static final CountDownLatch shutdownComplete = new CountDownLatch(1);
+    }
+
+    public static class ShutdownBeforeNormal
+    {
+        public static void blockUntilFinished(@SuperCall Runnable fn)
+        {
+            fn.run();
+            int id = Integer.parseInt(InstanceIDDefiner.getInstanceId().replace("node", ""));
+            Cluster cluster = Objects.requireNonNull(SharedState.cluster);
+            // can't stop here as the stop method and start method share a lock; and block gets called in start...
+            ForkJoinPool.commonPool().execute(() -> {
+                ClusterUtils.stopAbrupt(cluster, cluster.get(id));
+                SharedState.shutdownComplete.countDown();
+            });
+            JVMStabilityInspector.killCurrentJVM(new RuntimeException("Attempting to stop the instance"), false);
+        }
+    }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org