You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by dc...@apache.org on 2021/05/13 20:21:08 UTC
[cassandra] branch trunk updated: Move CASSANDRA-14559s
bootstrap_test.py::TestBootstrap::test_node_cannot_join_as_hibernating_node_without_replace_address
into a jvm-dtest
This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new f91418b Move CASSANDRA-14559s bootstrap_test.py::TestBootstrap::test_node_cannot_join_as_hibernating_node_without_replace_address into a jvm-dtest
f91418b is described below
commit f91418bbbedf4e0d5396becf14a5e884ac7f0d3b
Author: David Capwell <dc...@apache.org>
AuthorDate: Thu May 13 12:43:06 2021 -0700
Move CASSANDRA-14559s bootstrap_test.py::TestBootstrap::test_node_cannot_join_as_hibernating_node_without_replace_address into a jvm-dtest
patch by David Capwell; reviewed by Stefan Miklosovic for CASSANDRA-16662
---
.../org/apache/cassandra/distributed/Cluster.java | 2 +
.../distributed/impl/AbstractCluster.java | 3 +
.../cassandra/distributed/shared/ClusterUtils.java | 61 +++++++++-
.../cassandra/distributed/test/TestBaseImpl.java | 2 +-
...AsHibernatingNodeWithoutReplaceAddressTest.java | 133 +++++++++++++++++++++
5 files changed, 197 insertions(+), 4 deletions(-)
diff --git a/test/distributed/org/apache/cassandra/distributed/Cluster.java b/test/distributed/org/apache/cassandra/distributed/Cluster.java
index 5c5a954..a613fc5 100644
--- a/test/distributed/org/apache/cassandra/distributed/Cluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/Cluster.java
@@ -24,12 +24,14 @@ import java.util.function.Consumer;
import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.impl.AbstractCluster;
import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.shared.Shared;
import org.apache.cassandra.distributed.shared.Versions;
/**
* A simple cluster supporting only the 'current' Cassandra version, offering easy access to the convenience methods
* of IInvokableInstance on each node.
*/
+@Shared
public class Cluster extends AbstractCluster<IInvokableInstance>
{
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 8026357..ba584f3 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -109,6 +109,7 @@ import static org.apache.cassandra.distributed.shared.NetworkTopology.addressAnd
* handlers for internode to have more control over it. Messaging is wired by passing verbs manually.
* coordinator-handling code and hooks to the callbacks can be found in {@link Coordinator}.
*/
+@Shared
public abstract class AbstractCluster<I extends IInstance> implements ICluster<I>, AutoCloseable
{
public static Versions.Version CURRENT_VERSION = new Versions.Version(FBUtilities.getReleaseVersionString(), Versions.getClassPath());
@@ -254,6 +255,8 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
instanceMap.put(newAddress, (I) this); // if the broadcast address changes, update
instanceMap.remove(previous);
broadcastAddress = newAddress;
+ // remove delegate to make sure static state is reset
+ delegate = null;
}
try
{
diff --git a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
index a68e819..cda742c 100644
--- a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
+++ b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
@@ -19,6 +19,7 @@
package org.apache.cassandra.distributed.shared;
import java.io.File;
+import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
@@ -136,6 +137,22 @@ public class ClusterUtils
* Create a new instance and add it to the cluster, without starting it.
*
* @param cluster to add to
+ * @param other config to copy from
+ * @param fn function to add to the config before starting
+ * @param <I> instance type
+ * @return the instance added
+ */
+ public static <I extends IInstance> I addInstance(AbstractCluster<I> cluster,
+ IInstanceConfig other,
+ Consumer<IInstanceConfig> fn)
+ {
+ return addInstance(cluster, other.localDatacenter(), other.localRack(), fn);
+ }
+
+ /**
+ * Create a new instance and add it to the cluster, without starting it.
+ *
+ * @param cluster to add to
* @param dc the instance should be in
* @param rack the instance should be in
* @param <I> instance type
@@ -205,8 +222,26 @@ public class ClusterUtils
IInstance toReplace,
Consumer<WithProperties> fn)
{
+ return replaceHostAndStart(cluster, toReplace, (ignore, prop) -> fn.accept(prop));
+ }
+
+ /**
+ * Create and start a new instance that replaces an existing instance.
+ *
+ * The instance will be in the same datacenter and rack as the existing instance.
+ *
+ * @param cluster to add to
+ * @param toReplace instance to replace
+ * @param fn lambda to add additional properties or modify instance
+ * @param <I> instance type
+ * @return the instance added
+ */
+ public static <I extends IInstance> I replaceHostAndStart(AbstractCluster<I> cluster,
+ IInstance toReplace,
+ BiConsumer<I, WithProperties> fn)
+ {
IInstanceConfig toReplaceConf = toReplace.config();
- I inst = addInstance(cluster, toReplaceConf.localDatacenter(), toReplaceConf.localRack(), c -> c.set("auto_bootstrap", true));
+ I inst = addInstance(cluster, toReplaceConf, c -> c.set("auto_bootstrap", true));
return start(inst, properties -> {
// lower this so the replacement waits less time
@@ -218,7 +253,7 @@ public class ClusterUtils
// state which node to replace
properties.setProperty("cassandra.replace_address_first_boot", toReplace.config().broadcastAddress().getAddress().getHostAddress());
- fn.accept(properties);
+ fn.accept(inst, properties);
});
}
@@ -671,13 +706,33 @@ public class ClusterUtils
*/
private static void updateAddress(IInstanceConfig conf, String address)
{
+ InetSocketAddress previous = conf.broadcastAddress();
+
for (String key : Arrays.asList("broadcast_address", "listen_address", "broadcast_rpc_address", "rpc_address"))
conf.set(key, address);
// InstanceConfig caches InetSocketAddress -> InetAddressAndPort
// this causes issues as startup now ignores config, so force reset it to pull from conf.
((InstanceConfig) conf).unsetBroadcastAddressAndPort(); //TODO remove the need to null out the cache...
- conf.networkTopology().put(conf.broadcastAddress(), NetworkTopology.dcAndRack(conf.localDatacenter(), conf.localRack()));
+
+ //TODO NetworkTopology class isn't flexible and doesn't handle adding/removing nodes well...
+ // it also uses a HashMap, which makes the class not thread safe... so mutating it AFTER starting nodes
+ // is a risk
+ if (!conf.broadcastAddress().equals(previous))
+ {
+ conf.networkTopology().put(conf.broadcastAddress(), NetworkTopology.dcAndRack(conf.localDatacenter(), conf.localRack()));
+ try
+ {
+ Field field = NetworkTopology.class.getDeclaredField("map");
+ field.setAccessible(true);
+ Map<InetSocketAddress, NetworkTopology.DcAndRack> map = (Map<InetSocketAddress, NetworkTopology.DcAndRack>) field.get(conf.networkTopology());
+ map.remove(previous);
+ }
+ catch (NoSuchFieldException | IllegalAccessException e)
+ {
+ throw new AssertionError(e);
+ }
+ }
}
/**
diff --git a/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java b/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
index 343ccc8..4755d70 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
@@ -178,7 +178,7 @@ public class TestBaseImpl extends DistributedTestBase
public static void fixDistributedSchemas(Cluster cluster)
{
- // These keyspaces are under replicated by default, so must be updated when doing a mulit-node cluster;
+ // These keyspaces are under replicated by default, so must be updated when doing a multi-node cluster;
// else bootstrap will fail with 'Unable to find sufficient sources for streaming range <range> in keyspace <name>'
for (String ks : Arrays.asList("system_auth", "system_traces"))
{
diff --git a/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/NodeCannotJoinAsHibernatingNodeWithoutReplaceAddressTest.java b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/NodeCannotJoinAsHibernatingNodeWithoutReplaceAddressTest.java
new file mode 100644
index 0000000..78d55e3
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/NodeCannotJoinAsHibernatingNodeWithoutReplaceAddressTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.hostreplacement;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.Assert;
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.Constants;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.impl.InstanceIDDefiner;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.distributed.shared.Shared;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.service.PendingRangeCalculatorService;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.assertj.core.api.Assertions;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+
+public class NodeCannotJoinAsHibernatingNodeWithoutReplaceAddressTest extends TestBaseImpl
+{
+ @Test
+ public void test() throws IOException, InterruptedException
+ {
+ TokenSupplier even = TokenSupplier.evenlyDistributedTokens(2);
+ try (Cluster cluster = init(Cluster.build(2)
+ .withConfig(c -> c.with(Feature.values()).set(Constants.KEY_DTEST_API_STARTUP_FAILURE_AS_SHUTDOWN, false))
+ .withInstanceInitializer(BBHelper::install)
+ .withTokenSupplier(node -> even.token((node == 3 || node == 4) ? 2 : node))
+ .start()))
+ {
+ final IInvokableInstance toReplace = cluster.get(2);
+ final String toReplaceAddress = toReplace.broadcastAddress().getAddress().getHostAddress();
+
+ SharedState.cluster = cluster;
+ cluster.setUncaughtExceptionsFilter((nodeId, cause) -> nodeId > 2); // ignore host replacement errors
+ fixDistributedSchemas(cluster);
+
+ ClusterUtils.stopUnchecked(toReplace);
+
+ try
+ {
+ ClusterUtils.replaceHostAndStart(cluster, toReplace, (inst, ignore) -> ClusterUtils.updateAddress(inst, toReplaceAddress));
+ Assert.fail("Host replacement should exit with an error");
+ }
+ catch (Exception e)
+ {
+ // the instance is expected to fail, but it may not have finished shutting down yet, so wait for shutdown to complete
+ SharedState.shutdownComplete.await(1, TimeUnit.MINUTES);
+ }
+
+ IInvokableInstance inst = ClusterUtils.addInstance(cluster, toReplace.config(), c -> c.set("auto_bootstrap", true));
+ ClusterUtils.updateAddress(inst, toReplaceAddress);
+ Assertions.assertThatThrownBy(() -> inst.startup())
+ .hasMessageContaining("A node with address")
+ .hasMessageContaining("already exists, cancelling join");
+ }
+ }
+
+ public static class BBHelper
+ {
+ static void install(ClassLoader cl, int nodeNumber)
+ {
+ if (nodeNumber != 3)
+ return;
+ shutdownBeforeNormal(cl);
+ }
+
+ private static void shutdownBeforeNormal(ClassLoader cl)
+ {
+ new ByteBuddy().rebase(PendingRangeCalculatorService.class)
+ .method(named("blockUntilFinished"))
+ .intercept(MethodDelegation.to(ShutdownBeforeNormal.class))
+ .make()
+ .load(cl, ClassLoadingStrategy.Default.INJECTION);
+ }
+ }
+
+ @Shared
+ public static class SharedState
+ {
+ public static volatile Cluster cluster;
+ // Instance.shutdown can only be called once, so only the caller knows when it's done (isShutdown looks at a field set BEFORE shutting down...)
+ // since the test needs to know when shutdown completes, add this static state so the caller (bytebuddy rewrite) can update it
+ public static final CountDownLatch shutdownComplete = new CountDownLatch(1);
+ }
+
+ public static class ShutdownBeforeNormal
+ {
+ public static void blockUntilFinished(@SuperCall Runnable fn)
+ {
+ fn.run();
+ int id = Integer.parseInt(InstanceIDDefiner.getInstanceId().replace("node", ""));
+ Cluster cluster = Objects.requireNonNull(SharedState.cluster);
+ // can't stop here as the stop method and start method share a lock; and block gets called in start...
+ ForkJoinPool.commonPool().execute(() -> {
+ ClusterUtils.stopAbrupt(cluster, cluster.get(id));
+ SharedState.shutdownComplete.countDown();
+ });
+ JVMStabilityInspector.killCurrentJVM(new RuntimeException("Attempting to stop the instance"), false);
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org