You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2020/03/27 18:24:26 UTC
[cassandra] branch cassandra-2.2 updated: Extract in-jvm-dtest API
This is an automated email from the ASF dual-hosted git repository.
ifesdjeen pushed a commit to branch cassandra-2.2
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-2.2 by this push:
new 1f72cc6 Extract in-jvm-dtest API
1f72cc6 is described below
commit 1f72cc6197187abac5b1f70a19589dd4883e8d98
Author: Alex Petrov <ol...@gmail.com>
AuthorDate: Fri Feb 7 18:47:33 2020 -0800
Extract in-jvm-dtest API
Patch by Alex Petrov for CASSANDRA-15539; reviewed by David Capwell and Dinesh Joshi.
---
build.xml | 5 +
.../org/apache/cassandra/net/MessagingService.java | 2 +-
src/java/org/apache/cassandra/tools/NodeProbe.java | 16 +
src/java/org/apache/cassandra/tools/NodeTool.java | 4 +-
.../org/apache/cassandra/distributed/Cluster.java | 32 +-
.../cassandra/distributed/UpgradeableCluster.java | 32 +-
.../apache/cassandra/distributed/api/Feature.java | 24 --
.../apache/cassandra/distributed/api/ICluster.java | 36 --
.../cassandra/distributed/api/ICoordinator.java | 36 --
.../cassandra/distributed/api/IInstance.java | 57 ----
.../cassandra/distributed/api/IInstanceConfig.java | 58 ----
.../distributed/api/IIsolatedExecutor.java | 126 -------
.../apache/cassandra/distributed/api/IListen.java | 28 --
.../apache/cassandra/distributed/api/IMessage.java | 37 --
.../cassandra/distributed/api/IMessageFilters.java | 56 ----
.../distributed/impl/AbstractCluster.java | 373 +++++----------------
.../cassandra/distributed/impl/Coordinator.java | 52 ++-
.../impl/DelegatingInvokableInstance.java | 10 +-
.../distributed/impl/DistributedTestSnitch.java | 33 +-
.../distributed/impl/IInvokableInstance.java | 67 ----
.../distributed/impl/IUpgradeableInstance.java | 1 +
.../cassandra/distributed/impl/Instance.java | 290 ++++++++++++----
.../distributed/impl/InstanceClassLoader.java | 132 --------
.../cassandra/distributed/impl/InstanceConfig.java | 98 +++---
.../cassandra/distributed/impl/InstanceKiller.java | 50 +++
.../distributed/impl/IsolatedExecutor.java | 4 +
.../apache/cassandra/distributed/impl/Listen.java | 1 -
.../cassandra/distributed/impl/MessageFilters.java | 165 ---------
.../impl/{Message.java => MessageImpl.java} | 27 +-
.../distributed/impl/NetworkTopology.java | 137 --------
.../apache/cassandra/distributed/impl/RowUtil.java | 7 +
.../cassandra/distributed/impl/TracingUtil.java | 2 +-
.../cassandra/distributed/impl/Versions.java | 190 -----------
.../mock/nodetool/InternalNodeProbe.java | 33 +-
.../mock/nodetool/InternalNodeProbeFactory.java | 11 +-
.../cassandra/distributed/test/BootstrapTest.java | 50 +--
.../test/DistributedReadWritePathTest.java | 210 ------------
.../distributed/test/DistributedTestBase.java | 169 ----------
.../distributed/test/GossipSettlesTest.java | 16 +-
.../distributed/test/MessageFiltersTest.java | 92 +++--
.../distributed/test/MessageForwardingTest.java | 10 +-
.../distributed/test/NativeProtocolTest.java | 49 +--
.../distributed/test/NetworkTopologyTest.java | 40 ++-
.../cassandra/distributed/test/NodeToolTest.java | 2 +-
.../distributed/test/ResourceLeakTest.java | 13 +-
.../distributed/test/SimpleReadWritePathTest.java | 225 +++++++++++++
.../cassandra/distributed/test/TestBaseImpl.java | 49 +++
.../upgrade/MixedModeReadRepairTest.java | 17 +-
.../cassandra/distributed/upgrade/UpgradeTest.java | 42 +--
.../distributed/upgrade/UpgradeTestBase.java | 20 +-
50 files changed, 1099 insertions(+), 2137 deletions(-)
diff --git a/build.xml b/build.xml
index d86cc99..ed9c1a2 100644
--- a/build.xml
+++ b/build.xml
@@ -394,6 +394,8 @@
<exclusion groupId="commons-logging" artifactId="commons-logging"/>
</dependency>
<dependency groupId="junit" artifactId="junit" version="4.6" />
+ <dependency groupId="org.mockito" artifactId="mockito-core" version="3.2.4" />
+ <dependency groupId="org.apache.cassandra" artifactId="dtest-api" version="0.0.1" />
<dependency groupId="org.apache.rat" artifactId="apache-rat" version="0.10">
<exclusion groupId="commons-lang" artifactId="commons-lang"/>
</dependency>
@@ -506,6 +508,8 @@
artifactId="cassandra-parent"
version="${version}"/>
<dependency groupId="junit" artifactId="junit"/>
+ <dependency groupId="org.mockito" artifactId="mockito-core" />
+ <dependency groupId="org.apache.cassandra" artifactId="dtest-api" />
<dependency groupId="org.apache.rat" artifactId="apache-rat"/>
<dependency groupId="org.apache.hadoop" artifactId="hadoop-core"/>
<dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster"/>
@@ -530,6 +534,7 @@
artifactId="cassandra-parent"
version="${version}"/>
<dependency groupId="junit" artifactId="junit"/>
+ <dependency groupId="org.mockito" artifactId="mockito-core" />
<dependency groupId="org.apache.pig" artifactId="pig">
<exclusion groupId="xmlenc" artifactId="xmlenc"/>
<exclusion groupId="tomcat" artifactId="jasper-runtime"/>
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index e42b91b..f125b09 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -254,7 +254,7 @@ public final class MessagingService implements MessagingServiceMBean
* a placeholder class that means "deserialize using the callback." We can't implement this without
* special-case code in InboundTcpConnection because there is no way to pass the message id to IVersionedSerializer.
*/
- static class CallbackDeterminedSerializer implements IVersionedSerializer<Object>
+ public static class CallbackDeterminedSerializer implements IVersionedSerializer<Object>
{
public static final CallbackDeterminedSerializer instance = new CallbackDeterminedSerializer();
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 3d7db43..9798763 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -163,6 +163,13 @@ public class NodeProbe implements AutoCloseable
connect();
}
+ protected NodeProbe()
+ {
+ // Intended only for subclasses that supply their own connect() logic: host/port get placeholder values and connect() is not invoked here.
+ this.host = "";
+ this.port = 0;
+ }
+
/**
* Create a connection to the JMX agent and setup the M[X]Bean proxies.
*
@@ -763,6 +770,15 @@ public class NodeProbe implements AutoCloseable
return spProxy;
}
+ public StorageServiceMBean getStorageService() {
+ return ssProxy;
+ }
+
+ public GossiperMBean getGossProxy()
+ {
+ return gossProxy;
+ }
+
public String getEndpoint()
{
Map<String, String> hostIdToEndpoint = ssProxy.getHostIdToEndpoint();
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index a11b80e..b6dadd6 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -210,13 +210,13 @@ public class NodeTool
}
}
- private static void badUse(Exception e)
+ protected void badUse(Exception e)
{
System.out.println("nodetool: " + e.getMessage());
System.out.println("See 'nodetool help' or 'nodetool help <command>'.");
}
- private static void err(Throwable e)
+ protected void err(Throwable e)
{
System.err.println("error: " + e.getMessage());
System.err.println("-- StackTrace --");
diff --git a/test/distributed/org/apache/cassandra/distributed/Cluster.java b/test/distributed/org/apache/cassandra/distributed/Cluster.java
index d3533d3..d657638 100644
--- a/test/distributed/org/apache/cassandra/distributed/Cluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/Cluster.java
@@ -23,31 +23,44 @@ import java.io.IOException;
import java.util.List;
import java.util.function.Consumer;
-import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.impl.AbstractCluster;
-import org.apache.cassandra.distributed.impl.IInvokableInstance;
import org.apache.cassandra.distributed.impl.InstanceConfig;
-import org.apache.cassandra.distributed.impl.Versions;
+import org.apache.cassandra.distributed.shared.Builder;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
+import org.apache.cassandra.distributed.shared.Versions;
/**
* A simple cluster supporting only the 'current' Cassandra version, offering easy access to the convenience methods
* of IInvokableInstance on each node.
*/
-public class Cluster extends AbstractCluster<IInvokableInstance> implements ICluster, AutoCloseable
+public class Cluster extends AbstractCluster<IInvokableInstance>
{
- private Cluster(File root, Versions.Version version, List<InstanceConfig> configs, ClassLoader sharedClassLoader)
+
+ private Cluster(File root, Versions.Version version, List<IInstanceConfig> configs, ClassLoader sharedClassLoader)
{
super(root, version, configs, sharedClassLoader);
}
- protected IInvokableInstance newInstanceWrapper(int generation, Versions.Version version, InstanceConfig config)
+ protected IInvokableInstance newInstanceWrapper(int generation, Versions.Version version, IInstanceConfig config)
{
return new Wrapper(generation, version, config);
}
public static Builder<IInvokableInstance, Cluster> build()
{
- return new Builder<>(Cluster::new);
+ return new Builder<IInvokableInstance, Cluster>(Cluster::new)
+ {
+ {
+ withVersion(CURRENT_VERSION);
+ }
+
+ protected IInstanceConfig generateConfig(int nodeNum, String ipAddress, NetworkTopology networkTopology, File root, String token, String seedIp)
+ {
+ return InstanceConfig.generate(nodeNum, ipAddress, networkTopology, root, token, seedIp);
+ }
+ };
}
public static Builder<IInvokableInstance, Cluster> build(int nodeCount)
@@ -55,7 +68,7 @@ public class Cluster extends AbstractCluster<IInvokableInstance> implements IClu
return build().withNodes(nodeCount);
}
- public static Cluster create(int nodeCount, Consumer<InstanceConfig> configUpdater) throws IOException
+ public static Cluster create(int nodeCount, Consumer<IInstanceConfig> configUpdater) throws IOException
{
return build(nodeCount).withConfig(configUpdater).start();
}
@@ -64,5 +77,4 @@ public class Cluster extends AbstractCluster<IInvokableInstance> implements IClu
{
return build(nodeCount).start();
}
-}
-
+}
\ No newline at end of file
diff --git a/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java b/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
index 1fe960a..a7899fe 100644
--- a/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
@@ -21,11 +21,13 @@ package org.apache.cassandra.distributed;
import java.io.File;
import java.util.List;
-import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.impl.AbstractCluster;
-import org.apache.cassandra.distributed.impl.IUpgradeableInstance;
+import org.apache.cassandra.distributed.api.IUpgradeableInstance;
import org.apache.cassandra.distributed.impl.InstanceConfig;
-import org.apache.cassandra.distributed.impl.Versions;
+import org.apache.cassandra.distributed.shared.Builder;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
+import org.apache.cassandra.distributed.shared.Versions;
/**
* A multi-version cluster, offering only the cross-version API
@@ -34,21 +36,36 @@ import org.apache.cassandra.distributed.impl.Versions;
* to permit upgrade tests to perform cluster operations without updating the cross-version API,
* so long as one node is up-to-date.
*/
-public class UpgradeableCluster extends AbstractCluster<IUpgradeableInstance> implements ICluster, AutoCloseable
+public class UpgradeableCluster extends AbstractCluster<IUpgradeableInstance> implements AutoCloseable
{
- private UpgradeableCluster(File root, Versions.Version version, List<InstanceConfig> configs, ClassLoader sharedClassLoader)
+ private UpgradeableCluster(File root, Versions.Version version, List<IInstanceConfig> configs, ClassLoader sharedClassLoader)
{
super(root, version, configs, sharedClassLoader);
}
- protected IUpgradeableInstance newInstanceWrapper(int generation, Versions.Version version, InstanceConfig config)
+ protected IUpgradeableInstance newInstanceWrapper(int generation, Versions.Version version, IInstanceConfig config)
{
return new Wrapper(generation, version, config);
}
public static Builder<IUpgradeableInstance, UpgradeableCluster> build()
{
- return new Builder<>(UpgradeableCluster::new);
+ return new Builder<IUpgradeableInstance, UpgradeableCluster>(UpgradeableCluster::new)
+ {
+ {
+ withVersion(CURRENT_VERSION);
+ }
+
+ protected void setupLogging(File file)
+ {
+ setupLogging(file); // NOTE(review): this unqualified call binds to this same override, so it recurses without terminating; a delegate (e.g. the superclass implementation) was presumably intended - confirm
+ }
+
+ protected IInstanceConfig generateConfig(int nodeNum, String ipAddress, NetworkTopology networkTopology, File root, String token, String seedIp)
+ {
+ return InstanceConfig.generate(nodeNum, ipAddress, networkTopology, root, token, seedIp);
+ }
+ };
}
public static Builder<IUpgradeableInstance, UpgradeableCluster> build(int nodeCount)
@@ -66,4 +83,3 @@ public class UpgradeableCluster extends AbstractCluster<IUpgradeableInstance> im
return build(nodeCount).withVersion(version).start();
}
}
-
diff --git a/test/distributed/org/apache/cassandra/distributed/api/Feature.java b/test/distributed/org/apache/cassandra/distributed/api/Feature.java
deleted file mode 100644
index b4ba036..0000000
--- a/test/distributed/org/apache/cassandra/distributed/api/Feature.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.distributed.api;
-
-public enum Feature
-{
- NETWORK, GOSSIP, NATIVE_PROTOCOL
-}
diff --git a/test/distributed/org/apache/cassandra/distributed/api/ICluster.java b/test/distributed/org/apache/cassandra/distributed/api/ICluster.java
deleted file mode 100644
index 091e5f0..0000000
--- a/test/distributed/org/apache/cassandra/distributed/api/ICluster.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.distributed.api;
-
-import org.apache.cassandra.locator.InetAddressAndPort;
-
-import java.util.stream.Stream;
-
-public interface ICluster
-{
-
- IInstance get(int i);
- IInstance get(InetAddressAndPort endpoint);
- int size();
- Stream<? extends IInstance> stream();
- Stream<? extends IInstance> stream(String dcName);
- Stream<? extends IInstance> stream(String dcName, String rackName);
- IMessageFilters filters();
-
-}
diff --git a/test/distributed/org/apache/cassandra/distributed/api/ICoordinator.java b/test/distributed/org/apache/cassandra/distributed/api/ICoordinator.java
deleted file mode 100644
index ef44853..0000000
--- a/test/distributed/org/apache/cassandra/distributed/api/ICoordinator.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.distributed.api;
-
-import java.util.Iterator;
-import java.util.UUID;
-import java.util.concurrent.Future;
-
-// The cross-version API requires that a Coordinator can be constructed without any constructor arguments
-public interface ICoordinator
-{
- // a bit hacky, but ConsistencyLevel draws in too many dependent classes, so we cannot have a cross-version
- // method signature that accepts ConsistencyLevel directly. So we just accept an Enum<?> and cast.
- Object[][] execute(String query, Enum<?> consistencyLevel, Object... boundValues);
-
- Iterator<Object[]> executeWithPaging(String query, Enum<?> consistencyLevel, int pageSize, Object... boundValues);
-
- Future<Object[][]> asyncExecuteWithTracing(UUID sessionId, String query, Enum<?> consistencyLevel, Object... boundValues);
- Object[][] executeWithTracing(UUID sessionId, String query, Enum<?> consistencyLevel, Object... boundValues);
-}
diff --git a/test/distributed/org/apache/cassandra/distributed/api/IInstance.java b/test/distributed/org/apache/cassandra/distributed/api/IInstance.java
deleted file mode 100644
index 240c7ec..0000000
--- a/test/distributed/org/apache/cassandra/distributed/api/IInstance.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.distributed.api;
-
-import org.apache.cassandra.locator.InetAddressAndPort;
-
-import java.util.UUID;
-import java.util.concurrent.Future;
-
-// The cross-version API requires that an Instance has a constructor signature of (IInstanceConfig, ClassLoader)
-public interface IInstance extends IIsolatedExecutor
-{
- ICoordinator coordinator();
- IListen listen();
-
- void schemaChangeInternal(String query);
- public Object[][] executeInternal(String query, Object... args);
-
- IInstanceConfig config();
- public InetAddressAndPort broadcastAddressAndPort();
- UUID schemaVersion();
-
- void startup();
- boolean isShutdown();
- Future<Void> shutdown();
- Future<Void> shutdown(boolean graceful);
-
- int liveMemberCount();
-
- int nodetool(String... commandAndArgs);
-
- // these methods are not for external use, but for simplicity we leave them public and on the normal IInstance interface
- void startup(ICluster cluster);
- void receiveMessage(IMessage message);
-
- int getMessagingVersion();
- void setMessagingVersion(InetAddressAndPort endpoint, int version);
-
- void flush(String keyspace);
- void forceCompact(String keyspace, String table);
-}
diff --git a/test/distributed/org/apache/cassandra/distributed/api/IInstanceConfig.java b/test/distributed/org/apache/cassandra/distributed/api/IInstanceConfig.java
deleted file mode 100644
index d2804c2..0000000
--- a/test/distributed/org/apache/cassandra/distributed/api/IInstanceConfig.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.distributed.api;
-
-import java.util.UUID;
-
-import org.apache.cassandra.distributed.impl.NetworkTopology;
-import org.apache.cassandra.locator.InetAddressAndPort;
-
-public interface IInstanceConfig
-{
- int num();
- UUID hostId();
- InetAddressAndPort broadcastAddressAndPort();
- NetworkTopology networkTopology();
-
- default public String localRack()
- {
- return networkTopology().localRack(broadcastAddressAndPort());
- }
-
- default public String localDatacenter()
- {
- return networkTopology().localDC(broadcastAddressAndPort());
- }
-
- /**
- * write the specified parameters to the Config object; we do not specify Config as the type to support a Config
- * from any ClassLoader; the implementation must not directly access any fields of the Object, or cast it, but
- * must use the reflection API to modify the state
- */
- void propagate(Object writeToConfig);
-
- /**
- * Validates whether the config properties are within range of accepted values.
- */
- void validate();
- Object get(String fieldName);
- String getString(String fieldName);
- int getInt(String fieldName);
- boolean has(Feature featureFlag);
-}
diff --git a/test/distributed/org/apache/cassandra/distributed/api/IIsolatedExecutor.java b/test/distributed/org/apache/cassandra/distributed/api/IIsolatedExecutor.java
deleted file mode 100644
index 6bb41d7..0000000
--- a/test/distributed/org/apache/cassandra/distributed/api/IIsolatedExecutor.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.distributed.api;
-
-import java.io.Serializable;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Future;
-import java.util.function.BiConsumer;
-import java.util.function.BiFunction;
-import java.util.function.Consumer;
-import java.util.function.Function;
-
-/**
- * Represents a clean way to handoff evaluation of some work to an executor associated
- * with a node's lifetime.
- *
- * There is no transfer of execution to the parallel class hierarchy.
- *
- * Classes, such as Instance, that are themselves instantiated on the correct ClassLoader, utilise this class
- * to ensure the lifetime of any thread evaluating one of its method invocations matches the lifetime of the class itself.
- * Since they are instantiated on the correct ClassLoader, sharing only the interface, there is no serialization necessary.
- */
-public interface IIsolatedExecutor
-{
- public interface CallableNoExcept<O> extends Callable<O> { public O call(); }
- public interface SerializableCallable<O> extends CallableNoExcept<O>, Serializable { }
- public interface SerializableRunnable extends Runnable, Serializable {}
- public interface SerializableConsumer<O> extends Consumer<O>, Serializable {}
- public interface SerializableBiConsumer<I1, I2> extends BiConsumer<I1, I2>, Serializable {}
- public interface SerializableFunction<I, O> extends Function<I, O>, Serializable {}
- public interface SerializableBiFunction<I1, I2, O> extends BiFunction<I1, I2, O>, Serializable {}
- public interface TriFunction<I1, I2, I3, O>
- {
- O apply(I1 i1, I2 i2, I3 i3);
- }
- public interface SerializableTriFunction<I1, I2, I3, O> extends Serializable, TriFunction<I1, I2, I3, O> { }
-
- Future<Void> shutdown();
-
- /**
- * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result
- */
- <O> CallableNoExcept<Future<O>> async(CallableNoExcept<O> call);
-
- /**
- * Convert the execution to one performed synchronously on the IsolatedExecutor
- */
- <O> CallableNoExcept<O> sync(CallableNoExcept<O> call);
-
- /**
- * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result
- */
- CallableNoExcept<Future<?>> async(Runnable run);
-
- /**
- * Convert the execution to one performed synchronously on the IsolatedExecutor
- */
- Runnable sync(Runnable run);
-
- /**
- * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result
- */
- <I> Function<I, Future<?>> async(Consumer<I> consumer);
-
- /**
- * Convert the execution to one performed synchronously on the IsolatedExecutor
- */
- <I> Consumer<I> sync(Consumer<I> consumer);
-
- /**
- * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result
- */
- <I1, I2> BiFunction<I1, I2, Future<?>> async(BiConsumer<I1, I2> consumer);
-
- /**
- * Convert the execution to one performed synchronously on the IsolatedExecutor
- */
- <I1, I2> BiConsumer<I1, I2> sync(BiConsumer<I1, I2> consumer);
-
- /**
- * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result
- */
- <I, O> Function<I, Future<O>> async(Function<I, O> f);
-
- /**
- * Convert the execution to one performed synchronously on the IsolatedExecutor
- */
- <I, O> Function<I, O> sync(Function<I, O> f);
-
- /**
- * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result
- */
- <I1, I2, O> BiFunction<I1, I2, Future<O>> async(BiFunction<I1, I2, O> f);
-
- /**
- * Convert the execution to one performed synchronously on the IsolatedExecutor
- */
- <I1, I2, O> BiFunction<I1, I2, O> sync(BiFunction<I1, I2, O> f);
-
- /**
- * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result
- */
- <I1, I2, I3, O> TriFunction<I1, I2, I3, Future<O>> async(TriFunction<I1, I2, I3, O> f);
-
- /**
- * Convert the execution to one performed synchronously on the IsolatedExecutor
- */
- <I1, I2, I3, O> TriFunction<I1, I2, I3, O> sync(TriFunction<I1, I2, I3, O> f);
-
-}
diff --git a/test/distributed/org/apache/cassandra/distributed/api/IListen.java b/test/distributed/org/apache/cassandra/distributed/api/IListen.java
deleted file mode 100644
index c2e8dd6..0000000
--- a/test/distributed/org/apache/cassandra/distributed/api/IListen.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.distributed.api;
-
-public interface IListen
-{
- public interface Cancel { void cancel(); }
-
- Cancel schema(Runnable onChange);
-
- Cancel liveMembers(Runnable onChange);
-}
diff --git a/test/distributed/org/apache/cassandra/distributed/api/IMessage.java b/test/distributed/org/apache/cassandra/distributed/api/IMessage.java
deleted file mode 100644
index cd98543..0000000
--- a/test/distributed/org/apache/cassandra/distributed/api/IMessage.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.distributed.api;
-
-import java.io.Serializable;
-
-import org.apache.cassandra.locator.InetAddressAndPort;
-
-/**
- * A cross-version interface for delivering internode messages via message sinks.
- *
- * Message implementations should be serializable so we could load into instances.
- */
-public interface IMessage extends Serializable
-{
- int verb();
- byte[] bytes();
- int id();
- int version();
- InetAddressAndPort from();
-}
diff --git a/test/distributed/org/apache/cassandra/distributed/api/IMessageFilters.java b/test/distributed/org/apache/cassandra/distributed/api/IMessageFilters.java
deleted file mode 100644
index 01fe972..0000000
--- a/test/distributed/org/apache/cassandra/distributed/api/IMessageFilters.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.distributed.api;
-
-public interface IMessageFilters
-{
- public interface Filter
- {
- Filter off();
- Filter on();
- }
-
- public interface Builder
- {
- Builder from(int ... nums);
- Builder to(int ... nums);
-
- /**
- * Every message for which matcher returns `true` will be _dropped_ (assuming all
- * other matchers in the chain will return `true` as well).
- */
- Builder messagesMatching(Matcher filter);
- Filter drop();
- }
-
- public interface Matcher
- {
- boolean matches(int from, int to, IMessage message);
- }
-
- Builder verbs(int... verbs);
- Builder allVerbs();
- void reset();
-
- /**
- * {@code true} value returned by the implementation implies that the message was
- * not matched by any filters and therefore should be delivered.
- */
- boolean permit(int from, int to, IMessage msg);
-}
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 474ade8..05c8af8 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -19,10 +19,7 @@
package org.apache.cassandra.distributed.impl;
import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
+import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -35,28 +32,34 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
-import java.util.stream.IntStream;
import java.util.stream.Stream;
import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.distributed.api.ICoordinator;
import org.apache.cassandra.distributed.api.IInstance;
import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.IIsolatedExecutor;
import org.apache.cassandra.distributed.api.IListen;
import org.apache.cassandra.distributed.api.IMessage;
import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.distributed.api.IUpgradeableInstance;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.shared.InstanceClassLoader;
+import org.apache.cassandra.distributed.shared.MessageFilters;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
+import org.apache.cassandra.distributed.shared.Versions;
import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.concurrent.SimpleCondition;
@@ -85,8 +88,10 @@ import org.apache.cassandra.utils.concurrent.SimpleCondition;
* handlers for internode to have more control over it. Messaging is wired by passing verbs manually.
* coordinator-handling code and hooks to the callbacks can be found in {@link Coordinator}.
*/
-public abstract class AbstractCluster<I extends IInstance> implements ICluster, AutoCloseable
+public abstract class AbstractCluster<I extends IInstance> implements ICluster<I>, AutoCloseable
{
+ public static Versions.Version CURRENT_VERSION = new Versions.Version(FBUtilities.getReleaseVersionString(), Versions.getClassPath()); // version descriptor built from the release version string and class path
+
// WARNING: we have this logger not (necessarily) for logging, but
// to ensure we have instantiated the main classloader's LoggerFactory (and any LogbackStatusListener)
// before we instantiate any for a new instance
@@ -98,17 +103,18 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
// mutated by starting/stopping a node
private final List<I> instances;
- private final Map<InetAddressAndPort, I> instanceMap;
+ private final Map<InetSocketAddress, I> instanceMap;
private final Versions.Version initialVersion;
// mutated by user-facing API
private final MessageFilters filters;
+ private volatile Thread.UncaughtExceptionHandler previousHandler = null;
protected class Wrapper extends DelegatingInvokableInstance implements IUpgradeableInstance
{
private final int generation;
- private final InstanceConfig config;
+ private final IInstanceConfig config;
private volatile IInvokableInstance delegate;
private volatile Versions.Version version;
private volatile boolean isShutdown = true;
@@ -120,7 +126,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
return delegate;
}
- public Wrapper(int generation, Versions.Version version, InstanceConfig config)
+ public Wrapper(int generation, Versions.Version version, IInstanceConfig config)
{
this.generation = generation;
this.config = config;
@@ -131,9 +137,9 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
private IInvokableInstance newInstance(int generation)
{
- ClassLoader classLoader = new InstanceClassLoader(generation, config.num, version.classpath, sharedClassLoader);
- return Instance.transferAdhoc((SerializableBiFunction<IInstanceConfig, ClassLoader, Instance>) Instance::new, classLoader)
- .apply(config, classLoader);
+ ClassLoader classLoader = new InstanceClassLoader(generation, config.num(), version.classpath, sharedClassLoader);
+ return Instance.transferAdhoc((SerializableBiFunction<IInstanceConfig, ClassLoader, IInvokableInstance>)Instance::new, classLoader)
+ .apply(config, classLoader);
}
public IInstanceConfig config()
@@ -181,10 +187,18 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
throw new IllegalStateException("Cannot get live member count on shutdown instance");
}
- public int nodetool(String... commandAndArgs)
+ public NodeToolResult nodetoolResult(boolean withNotifications, String... commandAndArgs)
{
+ return delegate().nodetoolResult(withNotifications, commandAndArgs);
+ }
- return delegate().nodetool(commandAndArgs);
+ public long killAttempts()
+ {
+ IInvokableInstance local = delegate;
+ // if shutdown cleared the delegate, then no longer know how many kill attempts happened, so return -1
+ if (local == null)
+ return -1;
+ return local.killAttempts();
}
@Override
@@ -209,9 +223,18 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
delegate = null;
}
}
+
+ public void uncaughtException(Thread thread, Throwable throwable)
+ {
+ IInvokableInstance delegate = this.delegate;
+ if (delegate != null)
+ delegate.uncaughtException(thread, throwable);
+ else
+ logger.error("uncaught exception in thread {}", thread, throwable);
+ }
}
- protected AbstractCluster(File root, Versions.Version initialVersion, List<InstanceConfig> configs,
+ protected AbstractCluster(File root, Versions.Version initialVersion, List<IInstanceConfig> configs,
ClassLoader sharedClassLoader)
{
this.root = root;
@@ -221,27 +244,27 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
this.initialVersion = initialVersion;
int generation = AbstractCluster.generation.incrementAndGet();
- for (InstanceConfig config : configs)
+ for (IInstanceConfig config : configs)
{
I instance = newInstanceWrapperInternal(generation, initialVersion, config);
instances.add(instance);
// we use the config().broadcastAddressAndPort() here because we have not initialised the Instance
- I prev = instanceMap.put(instance.broadcastAddressAndPort(), instance);
+ I prev = instanceMap.put(instance.broadcastAddress(), instance);
if (null != prev)
- throw new IllegalStateException("Cluster cannot have multiple nodes with same InetAddressAndPort: " + instance.broadcastAddressAndPort() + " vs " + prev.broadcastAddressAndPort());
+ throw new IllegalStateException("Cluster cannot have multiple nodes with same InetAddressAndPort: " + instance.broadcastAddress() + " vs " + prev.broadcastAddress());
}
this.filters = new MessageFilters();
}
- protected abstract I newInstanceWrapper(int generation, Versions.Version version, InstanceConfig config);
+ protected abstract I newInstanceWrapper(int generation, Versions.Version version, IInstanceConfig config);
- protected I newInstanceWrapperInternal(int generation, Versions.Version version, InstanceConfig config)
+ protected I newInstanceWrapperInternal(int generation, Versions.Version version, IInstanceConfig config)
{
config.validate();
return newInstanceWrapper(generation, version, config);
}
- public I bootstrap(InstanceConfig config)
+ public I bootstrap(IInstanceConfig config)
{
if (!config.has(Feature.GOSSIP) || !config.has(Feature.NETWORK))
throw new IllegalStateException("New nodes can only be bootstrapped when gossip and networking is enabled.");
@@ -249,12 +272,13 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
I instance = newInstanceWrapperInternal(0, initialVersion, config);
instances.add(instance);
- I prev = instanceMap.put(config.broadcastAddressAndPort(), instance);
+
+ I prev = instanceMap.put(config.broadcastAddress(), instance);
if (null != prev)
{
throw new IllegalStateException(String.format("This cluster already contains a node (%d) with with same address and port: %s",
- config.num,
+ config.num(),
instance));
}
@@ -277,7 +301,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
return instances.get(node - 1);
}
- public I get(InetAddressAndPort addr)
+ public I get(InetSocketAddress addr)
{
return instanceMap.get(addr);
}
@@ -336,7 +360,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
return filters;
}
- public MessageFilters.Builder verbs(MessagingService.Verb... verbs)
+ public IMessageFilters.Builder verbs(MessagingService.Verb... verbs)
{
int[] ids = new int[verbs.length];
for (int i = 0; i < verbs.length; ++i)
@@ -364,6 +388,11 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
}).run();
}
+ public void schemaChange(String statement, int instance) // runs the statement on one node only, via schemaChangeInternal
+ {
+ get(instance).schemaChangeInternal(statement); // NOTE(review): instance is presumably the 1-based node number accepted by get(int) - confirm
+ }
+
private void updateMessagingVersions()
{
for (IInstance reportTo : instances)
@@ -377,7 +406,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
continue;
int minVersion = Math.min(reportFrom.getMessagingVersion(), reportTo.getMessagingVersion());
- reportTo.setMessagingVersion(reportFrom.broadcastAddressAndPort(), minVersion);
+ reportTo.setMessagingVersion(reportFrom.broadcastAddress(), minVersion);
}
}
}
@@ -497,13 +526,10 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
}
}
- public void schemaChange(String statement, int instance)
- {
- get(instance).schemaChangeInternal(statement);
- }
-
- void startup()
+ public void startup()
{
+ previousHandler = Thread.getDefaultUncaughtExceptionHandler();
+ Thread.setDefaultUncaughtExceptionHandler(this::uncaughtExceptions);
try (AllMembersAliveMonitor monitor = new AllMembersAliveMonitor())
{
// Start any instances with auto_bootstrap enabled first, and in series to avoid issues
@@ -527,255 +553,17 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
}
}
- protected interface Factory<I extends IInstance, C extends AbstractCluster<I>>
- {
- C newCluster(File root, Versions.Version version, List<InstanceConfig> configs, ClassLoader sharedClassLoader);
- }
-
- public static class Builder<I extends IInstance, C extends AbstractCluster<I>>
- {
- private final Factory<I, C> factory;
- private int nodeCount;
- private int subnet;
- private Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology;
- private TokenSupplier tokenSupplier;
- private File root;
- private Versions.Version version = Versions.CURRENT;
- private Consumer<InstanceConfig> configUpdater;
-
- public Builder(Factory<I, C> factory)
- {
- this.factory = factory;
- }
-
- public Builder<I, C> withTokenSupplier(TokenSupplier tokenSupplier)
- {
- this.tokenSupplier = tokenSupplier;
- return this;
- }
-
- public Builder<I, C> withSubnet(int subnet)
- {
- this.subnet = subnet;
- return this;
- }
-
- public Builder<I, C> withNodes(int nodeCount)
- {
- this.nodeCount = nodeCount;
- return this;
- }
-
- public Builder<I, C> withDCs(int dcCount)
- {
- return withRacks(dcCount, 1);
- }
-
- public Builder<I, C> withRacks(int dcCount, int racksPerDC)
- {
- if (nodeCount == 0)
- throw new IllegalStateException("Node count will be calculated. Do not supply total node count in the builder");
-
- int totalRacks = dcCount * racksPerDC;
- int nodesPerRack = (nodeCount + totalRacks - 1) / totalRacks; // round up to next integer
- return withRacks(dcCount, racksPerDC, nodesPerRack);
- }
-
- public Builder<I, C> withRacks(int dcCount, int racksPerDC, int nodesPerRack)
- {
- if (nodeIdTopology != null)
- throw new IllegalStateException("Network topology already created. Call withDCs/withRacks once or before withDC/withRack calls");
-
- nodeIdTopology = new HashMap<>();
- int nodeId = 1;
- for (int dc = 1; dc <= dcCount; dc++)
- {
- for (int rack = 1; rack <= racksPerDC; rack++)
- {
- for (int rackNodeIdx = 0; rackNodeIdx < nodesPerRack; rackNodeIdx++)
- nodeIdTopology.put(nodeId++, NetworkTopology.dcAndRack(dcName(dc), rackName(rack)));
- }
- }
- // adjust the node count to match the allocatation
- final int adjustedNodeCount = dcCount * racksPerDC * nodesPerRack;
- if (adjustedNodeCount != nodeCount)
- {
- assert adjustedNodeCount > nodeCount : "withRacks should only ever increase the node count";
- logger.info("Network topology of {} DCs with {} racks per DC and {} nodes per rack required increasing total nodes to {}",
- dcCount, racksPerDC, nodesPerRack, adjustedNodeCount);
- nodeCount = adjustedNodeCount;
- }
- return this;
- }
-
- public Builder<I, C> withDC(String dcName, int nodeCount)
- {
- return withRack(dcName, rackName(1), nodeCount);
- }
-
- public Builder<I, C> withRack(String dcName, String rackName, int nodesInRack)
- {
- if (nodeIdTopology == null)
- {
- if (nodeCount > 0)
- throw new IllegalStateException("Node count must not be explicitly set, or allocated using withDCs/withRacks");
-
- nodeIdTopology = new HashMap<>();
- }
- for (int nodeId = nodeCount + 1; nodeId <= nodeCount + nodesInRack; nodeId++)
- nodeIdTopology.put(nodeId, NetworkTopology.dcAndRack(dcName, rackName));
-
- nodeCount += nodesInRack;
- return this;
- }
-
- // Map of node ids to dc and rack - must be contiguous with an entry nodeId 1 to nodeCount
- public Builder<I, C> withNodeIdTopology(Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology)
- {
- if (nodeIdTopology.isEmpty())
- throw new IllegalStateException("Topology is empty. It must have an entry for every nodeId.");
-
- IntStream.rangeClosed(1, nodeIdTopology.size()).forEach(nodeId -> {
- if (nodeIdTopology.get(nodeId) == null)
- throw new IllegalStateException("Topology is missing entry for nodeId " + nodeId);
- });
-
- if (nodeCount != nodeIdTopology.size())
- {
- nodeCount = nodeIdTopology.size();
- logger.info("Adjusting node count to {} for supplied network topology", nodeCount);
- }
-
- this.nodeIdTopology = new HashMap<>(nodeIdTopology);
-
- return this;
- }
-
- public Builder<I, C> withRoot(File root)
- {
- this.root = root;
- return this;
- }
-
- public Builder<I, C> withVersion(Versions.Version version)
- {
- this.version = version;
- return this;
- }
-
- public Builder<I, C> withConfig(Consumer<InstanceConfig> updater)
- {
- this.configUpdater = updater;
- return this;
- }
-
- public C createWithoutStarting() throws IOException
- {
- if (root == null)
- root = Files.createTempDirectory("dtests").toFile();
-
- if (nodeCount <= 0)
- throw new IllegalStateException("Cluster must have at least one node");
-
- if (nodeIdTopology == null)
- {
- nodeIdTopology = IntStream.rangeClosed(1, nodeCount).boxed()
- .collect(Collectors.toMap(nodeId -> nodeId,
- nodeId -> NetworkTopology.dcAndRack(dcName(0), rackName(0))));
- }
-
- root.mkdirs();
- setupLogging(root);
-
- ClassLoader sharedClassLoader = Thread.currentThread().getContextClassLoader();
-
- List<InstanceConfig> configs = new ArrayList<>();
-
- if (tokenSupplier == null)
- tokenSupplier = evenlyDistributedTokens(nodeCount);
-
- for (int i = 0; i < nodeCount; ++i)
- {
- int nodeNum = i + 1;
- configs.add(createInstanceConfig(nodeNum));
- }
-
- return factory.newCluster(root, version, configs, sharedClassLoader);
- }
-
- public InstanceConfig newInstanceConfig(C cluster)
- {
- return createInstanceConfig(cluster.size() + 1);
- }
-
- private InstanceConfig createInstanceConfig(int nodeNum)
- {
- String ipPrefix = "127.0." + subnet + ".";
- String seedIp = ipPrefix + "1";
- String ipAddress = ipPrefix + nodeNum;
- long token = tokenSupplier.token(nodeNum);
-
- NetworkTopology topology = NetworkTopology.build(ipPrefix, 7012, nodeIdTopology);
-
- InstanceConfig config = InstanceConfig.generate(nodeNum, ipAddress, topology, root, String.valueOf(token), seedIp);
- if (configUpdater != null)
- configUpdater.accept(config);
-
- return config;
- }
-
- public C start() throws IOException
- {
- C cluster = createWithoutStarting();
- cluster.startup();
- return cluster;
- }
- }
-
- public static TokenSupplier evenlyDistributedTokens(int numNodes)
- {
- long increment = (Long.MAX_VALUE / numNodes) * 2;
- return (int nodeId) -> {
- assert nodeId <= numNodes : String.format("Can not allocate a token for a node %s, since only %s nodes are allowed by the token allocation strategy",
- nodeId, numNodes);
- return Long.MIN_VALUE + 1 + nodeId * increment;
- };
- }
-
- public static interface TokenSupplier
- {
- public long token(int nodeId);
- }
-
- static String dcName(int index)
- {
- return "datacenter" + index;
- }
-
- static String rackName(int index)
+ private void uncaughtExceptions(Thread thread, Throwable error)
{
- return "rack" + index;
- }
-
- private static void setupLogging(File root)
- {
- try
- {
- String testConfPath = "test/conf/logback-dtest.xml";
- Path logConfPath = Paths.get(root.getPath(), "/logback-dtest.xml");
-
- if (!logConfPath.toFile().exists())
- {
- Files.copy(new File(testConfPath).toPath(),
- logConfPath);
- }
-
- System.setProperty("logback.configurationFile", "file://" + logConfPath);
- }
- catch (IOException e)
+ if (!(thread.getContextClassLoader() instanceof InstanceClassLoader))
{
- throw new RuntimeException(e);
+ Thread.UncaughtExceptionHandler handler = previousHandler;
+ if (null != handler)
+ handler.uncaughtException(thread, error);
+ return;
}
+ InstanceClassLoader cl = (InstanceClassLoader) thread.getContextClassLoader();
+ get(cl.getInstanceId()).uncaughtException(thread, error);
}
@Override
@@ -790,7 +578,10 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
instances.clear();
instanceMap.clear();
// Make sure to only delete directory when threads are stopped
- FileUtils.deleteRecursive(root);
+ if (root.exists())
+ FileUtils.deleteRecursive(root);
+ Thread.setDefaultUncaughtExceptionHandler(previousHandler);
+ previousHandler = null;
//withThreadLeakCheck(futures);
}
@@ -813,5 +604,23 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
}
}
+ public List<Token> tokens()
+ {
+     // one token per instance: its configured "initial_token" parsed by its configured "partitioner"
+     return stream().map(node -> {
+         try
+         {
+             String partitionerClass = node.config().getString("partitioner");
+             IPartitioner partitioner = (IPartitioner) Class.forName(partitionerClass).newInstance();
+             return partitioner.getTokenFactory().fromString(node.config().getString("initial_token"));
+         }
+         catch (Throwable failure)
+         {
+             throw new RuntimeException(failure);
+         }
+     }).collect(Collectors.toList());
+     // any failure inside the lambda (reflection or parsing) is rethrown wrapped in a RuntimeException
+ }
+
}
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java b/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
index cca3643..91a2aaf 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.distributed.impl;
+import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
@@ -30,8 +31,10 @@ import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.cql3.statements.SelectStatement;
-import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.distributed.api.QueryResult;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.pager.Pageable;
@@ -41,6 +44,7 @@ import org.apache.cassandra.transport.Server;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
public class Coordinator implements ICoordinator
{
@@ -51,18 +55,18 @@ public class Coordinator implements ICoordinator
}
@Override
- public Object[][] execute(String query, Enum<?> consistencyLevelOrigin, Object... boundValues)
+ public QueryResult executeWithResult(String query, ConsistencyLevel consistencyLevel, Object... boundValues)
{
- return instance.sync(() -> executeInternal(query, consistencyLevelOrigin, boundValues)).call();
+ return instance().sync(() -> executeInternal(query, consistencyLevel, boundValues)).call();
}
- public Future<Object[][]> asyncExecuteWithTracing(UUID sessionId, String query, Enum<?> consistencyLevelOrigin, Object... boundValues)
+ public Future<Object[][]> asyncExecuteWithTracing(UUID sessionId, String query, ConsistencyLevel consistencyLevelOrigin, Object... boundValues)
{
return instance.async(() -> {
try
{
Tracing.instance.newSession(sessionId);
- return executeInternal(query, consistencyLevelOrigin, boundValues);
+ return executeInternal(query, consistencyLevelOrigin, boundValues).toObjectArrays();
}
finally
{
@@ -71,7 +75,12 @@ public class Coordinator implements ICoordinator
}).call();
}
- private Object[][] executeInternal(String query, Enum<?> consistencyLevelOrigin, Object[] boundValues)
+ protected org.apache.cassandra.db.ConsistencyLevel toCassandraCL(ConsistencyLevel cl) // converts the dtest API enum to Cassandra's own enum
+ {
+     return org.apache.cassandra.db.ConsistencyLevel.valueOf(cl.name()); // map by constant name, not ordinal(): reordering either enum can no longer silently select a different level
+ }
+
+ private QueryResult executeInternal(String query, ConsistencyLevel consistencyLevelOrigin, Object[] boundValues)
{
ClientState clientState = ClientState.forInternalCalls();
CQLStatement prepared = QueryProcessor.getStatement(query, clientState).statement;
@@ -82,7 +91,7 @@ public class Coordinator implements ICoordinator
prepared.validate(QueryState.forInternalCalls().getClientState());
ResultMessage res = prepared.execute(QueryState.forInternalCalls(),
- QueryOptions.create(consistencyLevel,
+ QueryOptions.create(toCassandraCL(consistencyLevel),
boundBBValues,
false,
Integer.MAX_VALUE,
@@ -91,18 +100,30 @@ public class Coordinator implements ICoordinator
Server.CURRENT_VERSION));
if (res != null && res.kind == ResultMessage.Kind.ROWS)
- return RowUtil.toObjects((ResultMessage.Rows) res);
+ {
+ ResultMessage.Rows rows = (ResultMessage.Rows) res;
+ String[] names = rows.result.metadata.names.stream().map(c -> c.name.toString()).toArray(String[]::new);
+ Object[][] results = RowUtil.toObjects(rows);
+ return new QueryResult(names, results);
+ }
else
- return new Object[][]{};
+ {
+ return QueryResult.EMPTY;
+ }
}
- public Object[][] executeWithTracing(UUID sessionId, String query, Enum<?> consistencyLevelOrigin, Object... boundValues)
+ public Object[][] executeWithTracing(UUID sessionId, String query, ConsistencyLevel consistencyLevelOrigin, Object... boundValues)
{
return IsolatedExecutor.waitOn(asyncExecuteWithTracing(sessionId, query, consistencyLevelOrigin, boundValues));
}
+ public IInstance instance() // accessor for the instance field; executeWithResult runs its queries through it
+ {
+ return instance;
+ }
+
@Override
- public Iterator<Object[]> executeWithPaging(String query, Enum<?> consistencyLevelOrigin, int pageSize, Object... boundValues)
+ public Iterator<Object[]> executeWithPaging(String query, ConsistencyLevel consistencyLevelOrigin, int pageSize, Object... boundValues)
{
if (pageSize <= 0)
throw new IllegalArgumentException("Page size should be strictly positive but was " + pageSize);
@@ -121,7 +142,7 @@ public class Coordinator implements ICoordinator
ClientState clientState = QueryState.forInternalCalls().getClientState();
SelectStatement selectStatement = (SelectStatement) prepared;
- QueryOptions queryOptions = QueryOptions.create(consistencyLevel,
+ QueryOptions queryOptions = QueryOptions.create(toCassandraCL(consistencyLevel),
boundBBValues,
false,
pageSize,
@@ -132,7 +153,7 @@ public class Coordinator implements ICoordinator
// Usually pager fetches a single page (see SelectStatement#execute). We need to iterate over all
// of the results lazily.
- QueryPager pager = QueryPagers.pager(pageable, consistencyLevel, clientState, null);
+ QueryPager pager = QueryPagers.pager(pageable, toCassandraCL(consistencyLevel), clientState, null);
Iterator<Object[]> iter = RowUtil.toObjects(selectStatement.getResultMetadata().names,
UntypedResultSet.create(selectStatement,
pager,
@@ -152,4 +173,9 @@ public class Coordinator implements ICoordinator
};
}).call();
}
+
+ private static ClientState makeFakeClientState() // NOTE(review): no caller appears in the visible hunks - confirm this helper is still needed
+ {
+ return ClientState.forExternalCalls(new InetSocketAddress(FBUtilities.getLocalAddress(), 9042)); // external-call client state bound to the local address, port 9042
+ }
}
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java b/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java
index 7294944..2f7a043 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java
@@ -19,6 +19,7 @@
package org.apache.cassandra.distributed.impl;
import java.io.Serializable;
+import java.net.InetSocketAddress;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
@@ -29,9 +30,10 @@ import java.util.function.Function;
import org.apache.cassandra.distributed.api.ICluster;
import org.apache.cassandra.distributed.api.ICoordinator;
import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.IListen;
import org.apache.cassandra.distributed.api.IMessage;
-import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
public abstract class DelegatingInvokableInstance implements IInvokableInstance
{
@@ -44,9 +46,9 @@ public abstract class DelegatingInvokableInstance implements IInvokableInstance
}
@Override
- public InetAddressAndPort broadcastAddressAndPort()
+ public InetSocketAddress broadcastAddress()
{
- return delegate().broadcastAddressAndPort();
+ return delegate().broadcastAddress();
}
@Override
@@ -80,7 +82,7 @@ public abstract class DelegatingInvokableInstance implements IInvokableInstance
}
@Override
- public void setMessagingVersion(InetAddressAndPort endpoint, int version)
+ public void setMessagingVersion(InetSocketAddress endpoint, int version)
{
delegate().setMessagingVersion(endpoint, version);
}
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/DistributedTestSnitch.java b/test/distributed/org/apache/cassandra/distributed/impl/DistributedTestSnitch.java
index f8f157a..8652bbc 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/DistributedTestSnitch.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/DistributedTestSnitch.java
@@ -19,11 +19,14 @@
package org.apache.cassandra.distributed.impl;
import java.net.InetAddress;
+import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.Gossiper;
@@ -35,6 +38,30 @@ import org.apache.cassandra.utils.FBUtilities;
public class DistributedTestSnitch extends AbstractNetworkTopologySnitch
{
private static NetworkTopology mapping = null;
+ private static final Map<InetAddressAndPort, InetSocketAddress> cache = new ConcurrentHashMap<>();
+ private static final Map<InetSocketAddress, InetAddressAndPort> cacheInverse = new ConcurrentHashMap<>();
+
+ static InetAddressAndPort toCassandraInetAddressAndPort(InetSocketAddress addressAndPort) // InetSocketAddress -> InetAddressAndPort, memoised in cacheInverse
+ {
+     InetAddressAndPort cached = cacheInverse.get(addressAndPort);
+     if (cached != null)
+         return cached;
+     InetAddressAndPort converted = InetAddressAndPort.getByAddressOverrideDefaults(addressAndPort.getAddress(), addressAndPort.getPort());
+     cacheInverse.put(addressAndPort, converted); // fill the map this method reads; previously it was never written, so every call missed
+     cache.put(converted, addressAndPort);        // still seed the forward map, as the original did
+     return converted;
+ }
+
+ static InetSocketAddress fromCassandraInetAddressAndPort(InetAddressAndPort addressAndPort)
+ {
+     InetSocketAddress known = cache.get(addressAndPort);
+     if (known != null)
+         return known;
+     // cache miss: convert once and remember the result for later lookups
+     InetSocketAddress converted = NetworkTopology.addressAndPort(addressAndPort.address, addressAndPort.port);
+     cache.put(addressAndPort, converted);
+     return converted;
+ }
private Map<InetAddressAndPort, Map<String, String>> savedEndpoints;
private static final String DEFAULT_DC = "UNKNOWN_DC";
@@ -49,7 +76,7 @@ public class DistributedTestSnitch extends AbstractNetworkTopologySnitch
public String getRack(InetAddressAndPort endpoint)
{
assert mapping != null : "network topology must be assigned before using snitch";
- return maybeGetFromEndpointState(mapping.localRack(endpoint), endpoint, ApplicationState.RACK, DEFAULT_RACK);
+ return maybeGetFromEndpointState(mapping.localRack(fromCassandraInetAddressAndPort(endpoint)), endpoint, ApplicationState.RACK, DEFAULT_RACK);
}
public String getDatacenter(InetAddress endpoint)
@@ -61,7 +88,7 @@ public class DistributedTestSnitch extends AbstractNetworkTopologySnitch
public String getDatacenter(InetAddressAndPort endpoint)
{
assert mapping != null : "network topology must be assigned before using snitch";
- return maybeGetFromEndpointState(mapping.localDC(endpoint), endpoint, ApplicationState.DC, DEFAULT_DC);
+ return maybeGetFromEndpointState(mapping.localDC(fromCassandraInetAddressAndPort(endpoint)), endpoint, ApplicationState.DC, DEFAULT_DC);
}
// Here, the logic is slightly different from what we have in GossipingPropertyFileSnitch since we have a different
@@ -84,7 +111,6 @@ public class DistributedTestSnitch extends AbstractNetworkTopologySnitch
entry.getValue());
}
}
-
if (savedEndpoints.containsKey(endpoint))
return savedEndpoints.get(endpoint).get("data_center");
@@ -103,6 +129,7 @@ public class DistributedTestSnitch extends AbstractNetworkTopologySnitch
{
super.gossiperStarting();
+
Gossiper.instance.addLocalApplicationState(ApplicationState.INTERNAL_IP,
StorageService.instance.valueFactory.internalIP(FBUtilities.getLocalAddress().getHostAddress()));
}
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/IInvokableInstance.java b/test/distributed/org/apache/cassandra/distributed/impl/IInvokableInstance.java
deleted file mode 100644
index d0f8a5f..0000000
--- a/test/distributed/org/apache/cassandra/distributed/impl/IInvokableInstance.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.distributed.impl;
-
-import java.io.Serializable;
-import java.util.concurrent.Future;
-import java.util.function.BiConsumer;
-import java.util.function.BiFunction;
-import java.util.function.Consumer;
-import java.util.function.Function;
-
-import org.apache.cassandra.distributed.api.IInstance;
-
-/**
- * This version is only supported for a Cluster running the same code as the test environment, and permits
- * ergonomic cross-node behaviours, without editing the cross-version API.
- *
- * A lambda can be written tto be invoked on any or all of the nodes.
- *
- * The reason this cannot (easily) be made cross-version is that the lambda is tied to the declaring class, which will
- * not be the same in the alternate version. Even were it not, there would likely be a runtime linkage error given
- * any code divergence.
- */
-public interface IInvokableInstance extends IInstance
-{
- public default <O> CallableNoExcept<Future<O>> asyncCallsOnInstance(SerializableCallable<O> call) { return async(transfer(call)); }
- public default <O> CallableNoExcept<O> callsOnInstance(SerializableCallable<O> call) { return sync(transfer(call)); }
- public default <O> O callOnInstance(SerializableCallable<O> call) { return callsOnInstance(call).call(); }
-
- public default CallableNoExcept<Future<?>> asyncRunsOnInstance(SerializableRunnable run) { return async(transfer(run)); }
- public default Runnable runsOnInstance(SerializableRunnable run) { return sync(transfer(run)); }
- public default void runOnInstance(SerializableRunnable run) { runsOnInstance(run).run(); }
-
- public default <I> Function<I, Future<?>> asyncAcceptsOnInstance(SerializableConsumer<I> consumer) { return async(transfer(consumer)); }
- public default <I> Consumer<I> acceptsOnInstance(SerializableConsumer<I> consumer) { return sync(transfer(consumer)); }
-
- public default <I1, I2> BiFunction<I1, I2, Future<?>> asyncAcceptsOnInstance(SerializableBiConsumer<I1, I2> consumer) { return async(transfer(consumer)); }
- public default <I1, I2> BiConsumer<I1, I2> acceptsOnInstance(SerializableBiConsumer<I1, I2> consumer) { return sync(transfer(consumer)); }
-
- public default <I, O> Function<I, Future<O>> asyncAppliesOnInstance(SerializableFunction<I, O> f) { return async(transfer(f)); }
- public default <I, O> Function<I, O> appliesOnInstance(SerializableFunction<I, O> f) { return sync(transfer(f)); }
-
- public default <I1, I2, O> BiFunction<I1, I2, Future<O>> asyncAppliesOnInstance(SerializableBiFunction<I1, I2, O> f) { return async(transfer(f)); }
- public default <I1, I2, O> BiFunction<I1, I2, O> appliesOnInstance(SerializableBiFunction<I1, I2, O> f) { return sync(transfer(f)); }
-
- public default <I1, I2, I3, O> TriFunction<I1, I2, I3, Future<O>> asyncAppliesOnInstance(SerializableTriFunction<I1, I2, I3, O> f) { return async(transfer(f)); }
- public default <I1, I2, I3, O> TriFunction<I1, I2, I3, O> appliesOnInstance(SerializableTriFunction<I1, I2, I3, O> f) { return sync(transfer(f)); }
-
- public <E extends Serializable> E transfer(E object);
-
-}
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/IUpgradeableInstance.java b/test/distributed/org/apache/cassandra/distributed/impl/IUpgradeableInstance.java
index 3eb3657..d42e799 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/IUpgradeableInstance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/IUpgradeableInstance.java
@@ -19,6 +19,7 @@
package org.apache.cassandra.distributed.impl;
import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.distributed.shared.Versions;
// this lives outside the api package so that we do not have to worry about inter-version compatibility
public interface IUpgradeableInstance extends IInstance
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 0647198..1c19bca 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -23,6 +23,7 @@ import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
+import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
@@ -31,11 +32,17 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Function;
+import javax.management.ListenerNotFoundException;
+import javax.management.Notification;
+import javax.management.NotificationListener;
+
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.concurrent.SharedExecutorPool;
import org.apache.cassandra.concurrent.StageManager;
@@ -60,16 +67,21 @@ import org.apache.cassandra.distributed.api.ICluster;
import org.apache.cassandra.distributed.api.ICoordinator;
import org.apache.cassandra.distributed.api.IInstance;
import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.IListen;
import org.apache.cassandra.distributed.api.IMessage;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.mock.nodetool.InternalNodeProbe;
import org.apache.cassandra.distributed.mock.nodetool.InternalNodeProbeFactory;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.sstable.IndexSummaryManager;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.CompactEndpointSerializationHelper;
import org.apache.cassandra.net.IMessageSink;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
@@ -79,18 +91,19 @@ import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.PendingRangeCalculatorService;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.StorageServiceMBean;
import org.apache.cassandra.streaming.StreamCoordinator;
import org.apache.cassandra.tools.NodeTool;
import org.apache.cassandra.tracing.TraceState;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.transport.messages.ResultMessage;
+import org.apache.cassandra.utils.ExecutorUtils;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.NanoTimeToCurrentTimeMillis;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.UUIDGen;
import org.apache.cassandra.utils.concurrent.Ref;
-import org.w3c.dom.events.UIEvent;
import static java.util.concurrent.TimeUnit.MINUTES;
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
@@ -99,6 +112,13 @@ import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
public class Instance extends IsolatedExecutor implements IInvokableInstance
{
+ private static final Map<Class<?>, Function<Object, Object>> mapper = new HashMap<Class<?>, Function<Object, Object>>() {{
+ this.put(IInstanceConfig.ParameterizedClass.class, (obj) -> {
+ IInstanceConfig.ParameterizedClass pc = (IInstanceConfig.ParameterizedClass) obj;
+ return new org.apache.cassandra.config.ParameterizedClass(pc.class_name, pc.parameters);
+ });
+ }};
+
public final IInstanceConfig config;
// should never be invoked directly, so that it is instantiated on other class loader;
@@ -108,18 +128,21 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
super("node" + config.num(), classLoader);
this.config = config;
InstanceIDDefiner.setInstanceId(config.num());
- FBUtilities.setBroadcastInetAddress(config.broadcastAddressAndPort().address);
+ FBUtilities.setBroadcastInetAddress(config.broadcastAddress().getAddress());
+
// Set the config at instance creation, possibly before startup() has run on all other instances.
// setMessagingVersions below will call runOnInstance which will instantiate
// the MessagingService and dependencies preventing later changes to network parameters.
Config.setOverrideLoadConfig(() -> loadConfig(config));
}
+ @Override
public IInstanceConfig config()
{
return config;
}
+ @Override
public ICoordinator coordinator()
{
return new Coordinator(this);
@@ -131,8 +154,9 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
}
@Override
- public InetAddressAndPort broadcastAddressAndPort() { return config.broadcastAddressAndPort(); }
+ public InetSocketAddress broadcastAddress() { return config.broadcastAddress(); }
+ @Override
public Object[][] executeInternal(String query, Object... args)
{
return sync(() -> {
@@ -162,7 +186,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
public boolean isShutdown()
{
- throw new UnsupportedOperationException();
+ return isolatedExecutor.isShutdown();
}
@Override
@@ -188,32 +212,34 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
}).run();
}
- private void registerMockMessaging(ICluster cluster)
+ private void registerMockMessaging(ICluster<IInstance> cluster)
{
- BiConsumer<InetAddressAndPort, IMessage> deliverToInstance = (to, message) -> cluster.get(to).receiveMessage(message);
- BiConsumer<InetAddressAndPort, IMessage> deliverToInstanceIfNotFiltered = (to, message) -> {
+ BiConsumer<InetSocketAddress, IMessage> deliverToInstance = (to, message) -> cluster.get(to).receiveMessage(message);
+ BiConsumer<InetSocketAddress, IMessage> deliverToInstanceIfNotFiltered = (to, message) -> {
int fromNum = config().num();
int toNum = cluster.get(to).config().num();
- if (cluster.filters().permit(fromNum, toNum, message))
+
+ if (cluster.filters().permitOutbound(fromNum, toNum, message)
+ && cluster.filters().permitInbound(fromNum, toNum, message))
deliverToInstance.accept(to, message);
};
- Map<InetAddress, InetAddressAndPort> addressAndPortMap = new HashMap<>();
+ Map<InetAddress, InetSocketAddress> addressAndPortMap = new HashMap<>();
cluster.stream().forEach(instance -> {
- InetAddressAndPort addressAndPort = instance.broadcastAddressAndPort();
- if (!addressAndPort.equals(instance.config().broadcastAddressAndPort()))
- throw new IllegalStateException("addressAndPort mismatch: " + addressAndPort + " vs " + instance.config().broadcastAddressAndPort());
- InetAddressAndPort prev = addressAndPortMap.put(addressAndPort.address, addressAndPort);
+ InetSocketAddress addressAndPort = instance.broadcastAddress();
+ if (!addressAndPort.equals(instance.config().broadcastAddress()))
+ throw new IllegalStateException("addressAndPort mismatch: " + addressAndPort + " vs " + instance.config().broadcastAddress());
+ InetSocketAddress prev = addressAndPortMap.put(addressAndPort.getAddress(),
+ addressAndPort);
if (null != prev)
throw new IllegalStateException("This version of Cassandra does not support multiple nodes with the same InetAddress: " + addressAndPort + " vs " + prev);
});
- MessagingService.instance().addMessageSink(
- new MessageDeliverySink(deliverToInstanceIfNotFiltered, addressAndPortMap::get));
+ MessagingService.instance().addMessageSink(new MessageDeliverySink(deliverToInstanceIfNotFiltered, addressAndPortMap::get));
}
// unnecessary if registerMockMessaging used
- private void registerFilter(ICluster cluster)
+ private void registerFilters(ICluster cluster)
{
IInstance instance = this;
MessagingService.instance().addMessageSink(new IMessageSink()
@@ -221,31 +247,93 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress toAddress)
{
// Port is not passed in, so take a best guess at the destination port from this instance
- IInstance to = cluster.get(InetAddressAndPort.getByAddressOverrideDefaults(toAddress, instance.config().broadcastAddressAndPort().port));
+ IInstance to = cluster.get(NetworkTopology.addressAndPort(toAddress,
+ instance.config().broadcastAddress().getPort()));
int fromNum = config().num();
int toNum = to.config().num();
- return cluster.filters().permit(fromNum, toNum, serializeMessage(message, id, broadcastAddressAndPort(), to.broadcastAddressAndPort()));
+ return cluster.filters().permitOutbound(fromNum, toNum, serializeMessage(message, id,
+ broadcastAddress(),
+ to.broadcastAddress()));
}
public boolean allowIncomingMessage(MessageIn message, int id)
{
- return true;
+ // Port is not passed in, so take a best guess at the sender's port from this instance
+ IInstance from = cluster.get(NetworkTopology.addressAndPort(message.from,
+ instance.config().broadcastAddress().getPort()));
+ int fromNum = from.config().num();
+ int toNum = config().num();
+
+
+ IMessage msg = serializeMessage(message, id, from.broadcastAddress(), broadcastAddress());
+
+ return cluster.filters().permitInbound(fromNum, toNum, msg);
}
});
}
- public static IMessage serializeMessage(MessageOut messageOut, int id, InetAddressAndPort from, InetAddressAndPort to)
+ public static IMessage serializeMessage(MessageOut messageOut, int id, InetSocketAddress from, InetSocketAddress to)
{
try (DataOutputBuffer out = new DataOutputBuffer(1024))
{
- int version = MessagingService.instance().getVersion(to.address);
+ int version = MessagingService.instance().getVersion(to.getAddress());
out.writeInt(MessagingService.PROTOCOL_MAGIC);
out.writeInt(id);
long timestamp = System.currentTimeMillis();
out.writeInt((int) timestamp);
messageOut.serialize(out, version);
- return new Message(messageOut.verb.ordinal(), out.toByteArray(), id, version, from);
+ return new MessageImpl(messageOut.verb.ordinal(), out.toByteArray(), id, version, from);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static IMessage serializeMessage(MessageIn<?> messageIn, int id, InetSocketAddress from, InetSocketAddress to)
+ {
+ try (DataOutputBuffer out = new DataOutputBuffer(1024))
+ {
+ // Serialize header
+ int version = MessagingService.instance().getVersion(to.getAddress());
+
+ out.writeInt(MessagingService.PROTOCOL_MAGIC);
+ out.writeInt(id);
+ long timestamp = System.currentTimeMillis();
+ out.writeInt((int) timestamp);
+
+ // Serialize the message itself
+ IVersionedSerializer serializer = MessagingService.instance().verbSerializers.get(messageIn.verb);
+ CompactEndpointSerializationHelper.serialize(from.getAddress(), out);
+
+ out.writeInt(messageIn.verb.ordinal());
+ out.writeInt(messageIn.parameters.size());
+ for (Map.Entry<String, byte[]> entry : messageIn.parameters.entrySet())
+ {
+ out.writeUTF(entry.getKey());
+ out.writeInt(entry.getValue().length);
+ out.write(entry.getValue());
+ }
+
+ if (messageIn.payload != null && serializer != MessagingService.CallbackDeterminedSerializer.instance)
+ {
+ try (DataOutputBuffer dob = new DataOutputBuffer())
+ {
+ serializer.serialize(messageIn.payload, dob, version);
+
+ int size = dob.getLength();
+ out.writeInt(size);
+ out.write(dob.getData(), 0, size);
+ }
+ }
+ else
+ {
+ out.writeInt(0);
+ }
+
+
+ return new MessageImpl(messageIn.verb.ordinal(), out.toByteArray(), id, version, from);
}
catch (IOException e)
{
@@ -255,9 +343,11 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
private class MessageDeliverySink implements IMessageSink
{
- private final BiConsumer<InetAddressAndPort, IMessage> deliver;
- private final Function<InetAddress, InetAddressAndPort> lookupAddressAndPort;
- MessageDeliverySink(BiConsumer<InetAddressAndPort, IMessage> deliver, Function<InetAddress, InetAddressAndPort> lookupAddressAndPort)
+ private final BiConsumer<InetSocketAddress, IMessage> deliver;
+ private final Function<InetAddress, InetSocketAddress> lookupAddressAndPort;
+
+ MessageDeliverySink(BiConsumer<InetSocketAddress, IMessage> deliver,
+ Function<InetAddress, InetSocketAddress> lookupAddressAndPort)
{
this.deliver = deliver;
this.lookupAddressAndPort = lookupAddressAndPort;
@@ -265,35 +355,35 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
public boolean allowOutgoingMessage(MessageOut messageOut, int id, InetAddress to)
{
- InetAddressAndPort from = broadcastAddressAndPort();
- InetAddressAndPort toFull = lookupAddressAndPort.apply(to);
+ InetSocketAddress from = broadcastAddress();
assert from.equals(lookupAddressAndPort.apply(messageOut.from));
- IMessage serialized = serializeMessage(messageOut, id, broadcastAddressAndPort(), lookupAddressAndPort.apply(messageOut.from));
-
// Tracing logic - similar to org.apache.cassandra.net.OutboundTcpConnection.writeConnected
byte[] sessionBytes = (byte[]) messageOut.parameters.get(Tracing.TRACE_HEADER);
if (sessionBytes != null)
{
UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes));
TraceState state = Tracing.instance.get(sessionId);
- String message = String.format("Sending %s message to %s", messageOut.verb, to);
+ String traceMessage = String.format("Sending %s message to %s", messageOut.verb, to);
// session may have already finished; see CASSANDRA-5668
if (state == null)
{
byte[] traceTypeBytes = (byte[]) messageOut.parameters.get(Tracing.TRACE_TYPE);
Tracing.TraceType traceType = traceTypeBytes == null ? Tracing.TraceType.QUERY : Tracing.TraceType.deserialize(traceTypeBytes[0]);
- TraceState.mutateWithTracing(ByteBuffer.wrap(sessionBytes), message, -1, traceType.getTTL());
+ TraceState.mutateWithTracing(ByteBuffer.wrap(sessionBytes), traceMessage, -1, traceType.getTTL());
}
else
{
- state.trace(message);
+ state.trace(traceMessage);
if (messageOut.verb == MessagingService.Verb.REQUEST_RESPONSE)
Tracing.instance.doneWithNonLocalSession(state);
}
}
- deliver.accept(toFull, serialized);
+ InetSocketAddress toFull = lookupAddressAndPort.apply(to);
+ deliver.accept(toFull,
+ serializeMessage(messageOut, id, broadcastAddress(), toFull));
+
return false;
}
@@ -304,12 +394,12 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
}
}
- public static Pair<MessageIn<Object>, Integer> deserializeMessage(IMessage msg)
+ public static Pair<MessageIn<Object>, Integer> deserializeMessage(IMessage imessage)
{
// Based on org.apache.cassandra.net.IncomingTcpConnection.receiveMessage
- try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(msg.bytes())))
+ try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(imessage.bytes())))
{
- int version = msg.version();
+ int version = imessage.version();
if (version > MessagingService.current_version)
{
throw new IllegalStateException(String.format("Received message version %d but current version is %d",
@@ -323,8 +413,8 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
id = Integer.parseInt(input.readUTF());
else
id = input.readInt();
- if (msg.id() != id)
- throw new IllegalStateException(String.format("Message id mismatch: %d != %d", msg.id(), id));
+ if (imessage.id() != id)
+ throw new IllegalStateException(String.format("Message id mismatch: %d != %d", imessage.id(), id));
// make sure to readInt, even if cross_node_to is not enabled
int partial = input.readInt();
@@ -347,7 +437,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
}
catch (Throwable t)
{
- throw new RuntimeException("Exception occurred on node " + broadcastAddressAndPort(), t);
+ throw new RuntimeException("Exception occurred on node " + broadcastAddress(), t);
}
MessageIn<Object> message = deserialized.left;
@@ -380,9 +470,9 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
return callsOnInstance(() -> MessagingService.current_version).call();
}
- public void setMessagingVersion(InetAddressAndPort endpoint, int version)
+ public void setMessagingVersion(InetSocketAddress endpoint, int version)
{
- runOnInstance(() -> MessagingService.instance().setVersion(endpoint.address, version));
+ runOnInstance(() -> MessagingService.instance().setVersion(endpoint.getAddress(), version));
}
public void flush(String keyspace)
@@ -412,7 +502,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
{
mkdirs();
- assert config.networkTopology().contains(config.broadcastAddressAndPort());
+ assert config.networkTopology().contains(config.broadcastAddress());
DistributedTestSnitch.assign(config.networkTopology());
DatabaseDescriptor.setDaemonInitialized();
@@ -446,7 +536,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
if (config.has(NETWORK))
{
- registerFilter(cluster);
+ registerFilters(cluster);
MessagingService.instance().listen();
}
else
@@ -479,9 +569,9 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
StorageService.instance.setRpcReady(true);
}
- if (!FBUtilities.getBroadcastAddress().equals(broadcastAddressAndPort().address))
+ if (!FBUtilities.getBroadcastAddress().equals(broadcastAddress().getAddress()))
throw new IllegalStateException();
- if (DatabaseDescriptor.getStoragePort() != broadcastAddressAndPort().port)
+ if (DatabaseDescriptor.getStoragePort() != broadcastAddress().getPort())
throw new IllegalStateException();
}
catch (Throwable t)
@@ -504,7 +594,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
private static Config loadConfig(IInstanceConfig overrides)
{
Config config = new Config();
- overrides.propagate(config);
+ overrides.propagate(config, mapper);
return config;
}
@@ -513,13 +603,13 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
// This should be done outside instance in order to avoid serializing config
String partitionerName = config.getString("partitioner");
List<String> initialTokens = new ArrayList<>();
- List<InetAddressAndPort> hosts = new ArrayList<>();
+ List<InetSocketAddress> hosts = new ArrayList<>();
List<UUID> hostIds = new ArrayList<>();
for (int i = 1 ; i <= cluster.size() ; ++i)
{
IInstanceConfig config = cluster.get(i).config();
initialTokens.add(config.getString("initial_token"));
- hosts.add(config.broadcastAddressAndPort());
+ hosts.add(config.broadcastAddress());
hostIds.add(config.hostId());
}
@@ -533,24 +623,24 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
for (int i = 0; i < tokens.size(); i++)
{
- InetAddressAndPort ep = hosts.get(i);
- Gossiper.instance.initializeNodeUnsafe(ep.address, hostIds.get(i), 1);
- Gossiper.instance.injectApplicationState(ep.address,
+ InetSocketAddress ep = hosts.get(i);
+ Gossiper.instance.initializeNodeUnsafe(ep.getAddress(), hostIds.get(i), 1);
+ Gossiper.instance.injectApplicationState(ep.getAddress(),
ApplicationState.TOKENS,
new VersionedValue.VersionedValueFactory(partitioner).tokens(Collections.singleton(tokens.get(i))));
- storageService.onChange(ep.address,
+ storageService.onChange(ep.getAddress(),
ApplicationState.STATUS,
new VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(tokens.get(i))));
- Gossiper.instance.realMarkAlive(ep.address, Gossiper.instance.getEndpointStateForEndpoint(ep.address));
+ Gossiper.instance.realMarkAlive(ep.getAddress(), Gossiper.instance.getEndpointStateForEndpoint(ep.getAddress()));
int messagingVersion = cluster.get(ep).isShutdown()
? MessagingService.current_version
: Math.min(MessagingService.current_version, cluster.get(ep).getMessagingVersion());
- MessagingService.instance().setVersion(ep.address, messagingVersion);
+ MessagingService.instance().setVersion(ep.getAddress(), messagingVersion);
}
// check that all nodes are in token metadata
for (int i = 0; i < tokens.size(); ++i)
- assert storageService.getTokenMetadata().isMember(hosts.get(i).address);
+ assert storageService.getTokenMetadata().isMember(hosts.get(i).getAddress());
}
catch (Throwable e) // UnknownHostException
{
@@ -563,6 +653,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
return shutdown(true);
}
+ @Override
public Future<Void> shutdown(boolean graceful)
{
Future<?> future = async((ExecutorService executor) -> {
@@ -582,6 +673,10 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
}
error = parallelRun(error, executor,
+ MessagingService.instance()::shutdown
+ );
+
+ error = parallelRun(error, executor,
() -> Gossiper.instance.stopShutdownAndWait(1L, MINUTES),
CompactionManager.instance::forceShutdown,
() -> BatchlogManager.shutdownAndWait(1L, MINUTES),
@@ -597,8 +692,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
() -> ScheduledExecutors.shutdownAndWait(1L, MINUTES)
);
error = parallelRun(error, executor,
- CommitLog.instance::shutdownBlocking,
- MessagingService.instance()::shutdown
+ CommitLog.instance::shutdownBlocking
);
error = parallelRun(error, executor,
() -> StageManager.shutdownAndWait(1L, MINUTES),
@@ -621,11 +715,85 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
}).call();
}
- public int nodetool(String... commandAndArgs)
+ /**
+  * Runs a nodetool command inside this instance and packages the outcome.
+  *
+  * @param withNotifications forwarded to the {@code DTestNodeTool} constructor
+  * @param commandAndArgs    the nodetool command followed by its arguments
+  * @return a result holding the arguments, the return code, a copy of the notifications
+  *         collected during the run, and the last error the tool recorded (may be null)
+  */
+ public NodeToolResult nodetoolResult(boolean withNotifications, String... commandAndArgs)
+ {
+     return sync(() -> {
+         DTestNodeTool tool = new DTestNodeTool(withNotifications);
+         int exitCode = tool.execute(commandAndArgs);
+         // Copy the collected notifications so the result does not share the listener's list.
+         return new NodeToolResult(commandAndArgs,
+                                   exitCode,
+                                   new ArrayList<>(tool.notifications.notifications),
+                                   tool.latestError);
+     }).call();
+ }
+
+ /**
+  * NodeTool variant used by {@code nodetoolResult}. It registers a collecting listener on the
+  * storage service proxy and remembers the most recent error passed to {@code badUse} or {@code err}.
+  *
+  * The listener is added once in the constructor and removed when {@code execute} finishes, so
+  * notifications are gathered for a single {@code execute} call per instance.
+  */
+ private static class DTestNodeTool extends NodeTool {
+ // Proxy the notification listener is attached to; obtained from a separate InternalNodeProbe.
+ private final StorageServiceMBean storageProxy;
+ // Accumulates every notification delivered while the listener is registered.
+ private final CollectingNotificationListener notifications = new CollectingNotificationListener();
+
+ // Last exception seen by badUse/err; stays null if neither was invoked.
+ private Throwable latestError;
+
+ /**
+  * @param withNotifications passed to both the probe factory given to the superclass and the
+  *                          probe used to obtain the storage service proxy
+  */
+ DTestNodeTool(boolean withNotifications) {
+ super(new InternalNodeProbeFactory(withNotifications));
+ storageProxy = new InternalNodeProbe(withNotifications).getStorageService();
+ storageProxy.addNotificationListener(notifications, null, null);
+ }
+
+ /**
+  * Delegates to the superclass and always detaches the notification listener afterwards.
+  *
+  * @return whatever {@code super.execute(args)} returns
+  */
+ public int execute(String... args)
+ {
+ try
+ {
+ return super.execute(args);
+ }
+ finally
+ {
+ try
+ {
+ storageProxy.removeNotificationListener(notifications, null, null);
+ }
+ catch (ListenerNotFoundException e)
+ {
+ // ignored
+ // (the listener is already gone, e.g. after an earlier execute call, so there is nothing to undo)
+ }
+ }
+ }
+
+ /** Lets the superclass handle the bad-usage error, then records it as the latest error. */
+ protected void badUse(Exception e)
+ {
+ super.badUse(e);
+ latestError = e;
+ }
+
+ /** Lets the superclass handle the error, then records it as the latest error. */
+ protected void err(Throwable e)
+ {
+ super.err(e);
+ latestError = e;
+ }
+ }
+
+ private static final class CollectingNotificationListener implements NotificationListener
{
- return sync(() -> new NodeTool(new InternalNodeProbeFactory()).execute(commandAndArgs)).call();
+ private final List<Notification> notifications = new CopyOnWriteArrayList<>();
+
+ public void handleNotification(Notification notification, Object handback)
+ {
+ notifications.add(notification);
+ }
+ }
+
+ public void uncaughtException(Thread thread, Throwable throwable)
+ {
+ System.out.println(String.format("Exception %s occurred on thread %s", throwable.getMessage(), thread.getName()));
+ throwable.printStackTrace();
}
+ /**
+  * Returns the value of {@code InstanceKiller.getKillAttempts}, evaluated via {@code callOnInstance}.
+  * NOTE(review): presumably this runs under the instance's own class loader so the counter read
+  * is the instance-local one; confirm against {@code callOnInstance}.
+  */
+ public long killAttempts()
+ {
+ return callOnInstance(InstanceKiller::getKillAttempts);
+ }
+
+ /**
+  * Calls {@code ExecutorUtils.shutdownNow} on the given executors, then
+  * {@code ExecutorUtils.awaitTermination} with a limit of one minute.
+  *
+  * @param executors executors to stop
+  * @throws TimeoutException     declared for the termination wait
+  * @throws InterruptedException declared for the termination wait
+  */
+ private static void shutdownAndWait(List<ExecutorService> executors) throws TimeoutException, InterruptedException
+ {
+ ExecutorUtils.shutdownNow(executors);
+ ExecutorUtils.awaitTermination(1L, MINUTES, executors);
+ }
private static Throwable parallelRun(Throwable accumulate, ExecutorService runOn, ThrowingRunnable ... runnables)
{
@@ -659,4 +827,4 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
}
return accumulate;
}
-}
+}
\ No newline at end of file
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java b/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java
deleted file mode 100644
index fb78902..0000000
--- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.distributed.impl;
-
-import com.google.common.base.Predicate;
-import org.apache.cassandra.config.ParameterizedClass;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.SigarLibrary;
-
-import java.io.IOException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.Arrays;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-public class InstanceClassLoader extends URLClassLoader
-{
- // Classes that have to be shared between instances, for configuration or returning values
- private static final Set<String> sharedClassNames = Arrays.stream(new Class[]
- {
- Pair.class,
- InetAddressAndPort.class,
- ParameterizedClass.class,
- SigarLibrary.class,
- IInvokableInstance.class,
- NetworkTopology.class
- })
- .map(Class::getName)
- .collect(Collectors.toSet());
-
- private static final Predicate<String> sharePackage = name ->
- name.startsWith("org.apache.cassandra.distributed.api.")
- || name.startsWith("sun.")
- || name.startsWith("oracle.")
- || name.startsWith("com.intellij.")
- || name.startsWith("com.sun.")
- || name.startsWith("com.oracle.")
- || name.startsWith("java.")
- || name.startsWith("javax.")
- || name.startsWith("jdk.")
- || name.startsWith("netscape.")
- || name.startsWith("org.xml.sax.");
-
- private static final Predicate<String> shareClass = name -> sharePackage.apply(name) || sharedClassNames.contains(name);
-
- public static interface Factory
- {
- InstanceClassLoader create(int id, URL[] urls, ClassLoader sharedClassLoader);
- }
-
- private volatile boolean isClosed = false;
- private final URL[] urls;
- private final int generation; // used to help debug class loader leaks, by helping determine which classloaders should have been collected
- private final int id;
- private final ClassLoader sharedClassLoader;
-
- InstanceClassLoader(int generation, int id, URL[] urls, ClassLoader sharedClassLoader)
- {
- super(urls, null);
- this.urls = urls;
- this.sharedClassLoader = sharedClassLoader;
- this.generation = generation;
- this.id = id;
- }
-
- @Override
- public Class<?> loadClass(String name) throws ClassNotFoundException
- {
- if (shareClass.apply(name))
- return sharedClassLoader.loadClass(name);
-
- return loadClassInternal(name);
- }
-
- Class<?> loadClassInternal(String name) throws ClassNotFoundException
- {
- if (isClosed)
- throw new IllegalStateException(String.format("Can't load %s. Instance class loader is already closed.", name));
-
- synchronized (getClassLoadingLock(name))
- {
- // First, check if the class has already been loaded
- Class<?> c = findLoadedClass(name);
-
- if (c == null)
- c = findClass(name);
-
- return c;
- }
- }
-
- /**
- * @return true iff this class was loaded by an InstanceClassLoader, and as such is used by a dtest node
- */
- public static boolean wasLoadedByAnInstanceClassLoader(Class<?> clazz)
- {
- return clazz.getClassLoader().getClass().getName().equals(InstanceClassLoader.class.getName());
- }
-
- public String toString()
- {
- return "InstanceClassLoader{" +
- "generation=" + generation +
- ", id = " + id +
- ", urls=" + Arrays.toString(urls) +
- '}';
- }
-
- public void close() throws IOException
- {
- isClosed = true;
- super.close();
- }
-}
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
index 6d668e6..0aafbca 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
@@ -18,16 +18,9 @@
package org.apache.cassandra.distributed.impl;
-import org.apache.cassandra.config.Config;
-import org.apache.cassandra.config.ParameterizedClass;
-import org.apache.cassandra.distributed.api.Feature;
-import org.apache.cassandra.distributed.api.IInstanceConfig;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.locator.SimpleSeedProvider;
-
import java.io.File;
import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collections;
@@ -35,6 +28,14 @@ import java.util.EnumSet;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
+import java.util.function.Function;
+
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
+import org.apache.cassandra.distributed.shared.Versions;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.SimpleSeedProvider;
public class InstanceConfig implements IInstanceConfig
{
@@ -54,28 +55,6 @@ public class InstanceConfig implements IInstanceConfig
private volatile InetAddressAndPort broadcastAddressAndPort;
- @Override
- public InetAddressAndPort broadcastAddressAndPort()
- {
- if (broadcastAddressAndPort == null)
- {
- broadcastAddressAndPort = getAddressAndPortFromConfig("broadcast_address", "storage_port");
- }
- return broadcastAddressAndPort;
- }
-
- private InetAddressAndPort getAddressAndPortFromConfig(String addressProp, String portProp)
- {
- try
- {
- return InetAddressAndPort.getByNameOverrideDefaults(getString(addressProp), getInt(portProp));
- }
- catch (UnknownHostException e)
- {
- throw new IllegalStateException(e);
- }
- }
-
private InstanceConfig(int num,
NetworkTopology networkTopology,
String broadcast_address,
@@ -138,6 +117,44 @@ public class InstanceConfig implements IInstanceConfig
this.broadcastAddressAndPort = copy.broadcastAddressAndPort;
}
+
+ @Override
+ public InetSocketAddress broadcastAddress()
+ {
+ return DistributedTestSnitch.fromCassandraInetAddressAndPort(getBroadcastAddressAndPort());
+ }
+
+ protected InetAddressAndPort getBroadcastAddressAndPort()
+ {
+ if (broadcastAddressAndPort == null)
+ {
+ broadcastAddressAndPort = getAddressAndPortFromConfig("broadcast_address", "storage_port");
+ }
+ return broadcastAddressAndPort;
+ }
+
+ private InetAddressAndPort getAddressAndPortFromConfig(String addressProp, String portProp)
+ {
+ try
+ {
+ return InetAddressAndPort.getByNameOverrideDefaults(getString(addressProp), getInt(portProp));
+ }
+ catch (UnknownHostException e)
+ {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public String localRack()
+ {
+ return networkTopology().localRack(broadcastAddress());
+ }
+
+ public String localDatacenter()
+ {
+ return networkTopology().localDC(broadcastAddress());
+ }
+
public InstanceConfig with(Feature featureFlag)
{
featureFlags.add(featureFlag);
@@ -161,8 +178,6 @@ public class InstanceConfig implements IInstanceConfig
if (value == null)
value = NULL;
- // test value
- propagate(new Config(), fieldName, value, false);
params.put(fieldName, value);
return this;
}
@@ -177,16 +192,10 @@ public class InstanceConfig implements IInstanceConfig
return this;
}
- public void propagateIfSet(Object writeToConfig, String fieldName)
- {
- if (params.containsKey(fieldName))
- propagate(writeToConfig, fieldName, params.get(fieldName), true);
- }
-
- public void propagate(Object writeToConfig)
+ public void propagate(Object writeToConfig, Map<Class<?>, Function<Object, Object>> mapping)
{
for (Map.Entry<String, Object> e : params.entrySet())
- propagate(writeToConfig, e.getKey(), e.getValue(), true);
+ propagate(writeToConfig, e.getKey(), e.getValue(), mapping);
}
public void validate()
@@ -195,11 +204,14 @@ public class InstanceConfig implements IInstanceConfig
throw new IllegalArgumentException("In-JVM dtests do not support vnodes as of now.");
}
- private void propagate(Object writeToConfig, String fieldName, Object value, boolean ignoreMissing)
+ private void propagate(Object writeToConfig, String fieldName, Object value, Map<Class<?>, Function<Object, Object>> mapping)
{
if (value == NULL)
value = null;
+ if (value != null && mapping != null && mapping.containsKey(value.getClass())) // value is null when the NULL sentinel was stored; null has no class to map
+ value = mapping.get(value.getClass()).apply(value);
+
Class<?> configClass = writeToConfig.getClass();
Field valueField;
try
@@ -208,9 +220,7 @@ public class InstanceConfig implements IInstanceConfig
}
catch (NoSuchFieldException e)
{
- if (!ignoreMissing)
- throw new IllegalStateException(e);
- return;
+ throw new IllegalStateException(e);
}
if (valueField.getType().isEnum() && value instanceof String)
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/InstanceKiller.java b/test/distributed/org/apache/cassandra/distributed/impl/InstanceKiller.java
new file mode 100644
index 0000000..e7ca49b
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceKiller.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.impl;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.cassandra.utils.JVMStabilityInspector;
+
+public class InstanceKiller extends JVMStabilityInspector.Killer
+{
+ private static final AtomicLong KILL_ATTEMPTS = new AtomicLong(0);
+
+ public static long getKillAttempts()
+ {
+ return KILL_ATTEMPTS.get();
+ }
+
+ public static void clear()
+ {
+ KILL_ATTEMPTS.set(0);
+ }
+
+ @Override
+ protected void killCurrentJVM(Throwable t, boolean quiet)
+ {
+ KILL_ATTEMPTS.incrementAndGet();
+ // Normally a kill ends in System.exit, so code that calls kill never reaches its next line.
+ // In in-JVM dtests System.exit is not desirable, so a runtime exception is thrown instead,
+ // as a best-effort way to stop the caller's execution.
+ throw new InstanceShutdown();
+ }
+
+ public static final class InstanceShutdown extends RuntimeException { }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java b/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java
index 248155a..fc31fdf 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java
@@ -39,6 +39,7 @@ import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
+import java.util.function.Supplier;
import org.slf4j.LoggerFactory;
@@ -101,6 +102,9 @@ public class IsolatedExecutor implements IIsolatedExecutor
});
}
+ public <O> Supplier<Future<O>> supplyAsync(SerializableSupplier<O> call) { return () -> isolatedExecutor.submit(call::get); }
+ public <O> Supplier<O> supplySync(SerializableSupplier<O> call) { return () -> waitOn(supplyAsync(call).get()); }
+
public <O> CallableNoExcept<Future<O>> async(CallableNoExcept<O> call) { return () -> isolatedExecutor.submit(call); }
public <O> CallableNoExcept<O> sync(CallableNoExcept<O> call) { return () -> waitOn(async(call).call()); }
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Listen.java b/test/distributed/org/apache/cassandra/distributed/impl/Listen.java
index 27ae156..9d9beea 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Listen.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Listen.java
@@ -24,7 +24,6 @@ import java.util.concurrent.locks.LockSupport;
import java.util.function.Supplier;
import org.apache.cassandra.distributed.api.IListen;
-import org.apache.cassandra.gms.Gossiper;
public class Listen implements IListen
{
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/MessageFilters.java b/test/distributed/org/apache/cassandra/distributed/impl/MessageFilters.java
deleted file mode 100644
index c92553f..0000000
--- a/test/distributed/org/apache/cassandra/distributed/impl/MessageFilters.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.distributed.impl;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-import org.apache.cassandra.distributed.api.IMessage;
-import org.apache.cassandra.distributed.api.IMessageFilters;
-
-public class MessageFilters implements IMessageFilters
-{
- private final List<Filter> filters = new CopyOnWriteArrayList<>();
-
- public boolean permit(int from, int to, IMessage msg)
- {
- for (Filter filter : filters)
- {
- if (filter.matches(from, to, msg))
- return false;
- }
- return true;
- }
-
- public class Filter implements IMessageFilters.Filter
- {
- final int[] from;
- final int[] to;
- final int[] verbs;
- final Matcher matcher;
-
- Filter(int[] from, int[] to, int[] verbs, Matcher matcher)
- {
- if (from != null)
- {
- from = from.clone();
- Arrays.sort(from);
- }
- if (to != null)
- {
- to = to.clone();
- Arrays.sort(to);
- }
- if (verbs != null)
- {
- verbs = verbs.clone();
- Arrays.sort(verbs);
- }
- this.from = from;
- this.to = to;
- this.verbs = verbs;
- this.matcher = matcher;
- }
-
- public int hashCode()
- {
- return (from == null ? 0 : Arrays.hashCode(from))
- + (to == null ? 0 : Arrays.hashCode(to))
- + (verbs == null ? 0 : Arrays.hashCode(verbs));
- }
-
- public boolean equals(Object that)
- {
- return that instanceof Filter && equals((Filter) that);
- }
-
- public boolean equals(Filter that)
- {
- return Arrays.equals(from, that.from)
- && Arrays.equals(to, that.to)
- && Arrays.equals(verbs, that.verbs);
- }
-
- public Filter off()
- {
- filters.remove(this);
- return this;
- }
-
- public Filter on()
- {
- filters.add(this);
- return this;
- }
-
- public boolean matches(int from, int to, IMessage msg)
- {
- return (this.from == null || Arrays.binarySearch(this.from, from) >= 0)
- && (this.to == null || Arrays.binarySearch(this.to, to) >= 0)
- && (this.verbs == null || Arrays.binarySearch(this.verbs, msg.verb()) >= 0)
- && (this.matcher == null || this.matcher.matches(from, to, msg));
- }
- }
-
- public class Builder implements IMessageFilters.Builder
- {
- int[] from;
- int[] to;
- int[] verbs;
- Matcher matcher;
-
- private Builder(int[] verbs)
- {
- this.verbs = verbs;
- }
-
- public Builder from(int... nums)
- {
- from = nums;
- return this;
- }
-
- public Builder to(int... nums)
- {
- to = nums;
- return this;
- }
-
- public IMessageFilters.Builder messagesMatching(Matcher matcher)
- {
- this.matcher = matcher;
- return this;
- }
-
- public Filter drop()
- {
- return new Filter(from, to, verbs, matcher).on();
- }
- }
-
-
- public Builder verbs(int... verbs)
- {
- return new Builder(verbs);
- }
-
- @Override
- public Builder allVerbs()
- {
- return new Builder(null);
- }
-
- @Override
- public void reset()
- {
- filters.clear();
- }
-}
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Message.java b/test/distributed/org/apache/cassandra/distributed/impl/MessageImpl.java
similarity index 74%
rename from test/distributed/org/apache/cassandra/distributed/impl/Message.java
rename to test/distributed/org/apache/cassandra/distributed/impl/MessageImpl.java
index 6f8085c..ebc31b1 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Message.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/MessageImpl.java
@@ -18,19 +18,21 @@
package org.apache.cassandra.distributed.impl;
+import java.net.InetSocketAddress;
+
import org.apache.cassandra.distributed.api.IMessage;
-import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
// a container for simplifying the method signature for per-instance message handling/delivery
-public class Message implements IMessage
+public class MessageImpl implements IMessage
{
- private final int verb;
- private final byte[] bytes;
- private final int id;
- private final int version;
- private final InetAddressAndPort from;
+ public final int verb;
+ public final byte[] bytes;
+ public final long id;
+ public final int version;
+ public final InetSocketAddress from;
- public Message(int verb, byte[] bytes, int id, int version, InetAddressAndPort from)
+ public MessageImpl(int verb, byte[] bytes, long id, int version, InetSocketAddress from)
{
this.verb = verb;
this.bytes = bytes;
@@ -39,32 +41,27 @@ public class Message implements IMessage
this.from = from;
}
- @Override
public int verb()
{
return verb;
}
- @Override
public byte[] bytes()
{
return bytes;
}
- @Override
public int id()
{
- return id;
+ return (int) id;
}
- @Override
public int version()
{
return version;
}
- @Override
- public InetAddressAndPort from()
+ public InetSocketAddress from()
{
return from;
}
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/NetworkTopology.java b/test/distributed/org/apache/cassandra/distributed/impl/NetworkTopology.java
deleted file mode 100644
index f7c31ff..0000000
--- a/test/distributed/org/apache/cassandra/distributed/impl/NetworkTopology.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.distributed.impl;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.function.IntFunction;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.utils.Pair;
-
-public class NetworkTopology
-{
- private final Map<InetAddressAndPort, DcAndRack> map;
-
- public static class DcAndRack
- {
- private final String dc;
- private final String rack;
-
- private DcAndRack(String dc, String rack)
- {
- this.dc = dc;
- this.rack = rack;
- }
- }
-
- public static DcAndRack dcAndRack(String dc, String rack)
- {
- return new DcAndRack(dc, rack);
- }
-
- private NetworkTopology() {
- map = new HashMap<>();
- }
-
- @SuppressWarnings("WeakerAccess")
- public NetworkTopology(NetworkTopology networkTopology)
- {
- map = new HashMap<>(networkTopology.map);
- }
-
- public static NetworkTopology build(String ipPrefix, int broadcastPort, Map<Integer, DcAndRack> nodeIdTopology)
- {
- final NetworkTopology topology = new NetworkTopology();
-
- for (int nodeId = 1; nodeId <= nodeIdTopology.size(); nodeId++)
- {
- String broadcastAddress = ipPrefix + nodeId;
-
- try
- {
- DcAndRack dcAndRack = nodeIdTopology.get(nodeId);
- if (dcAndRack == null)
- throw new IllegalStateException("nodeId " + nodeId + "not found in instanceMap");
-
- InetAddressAndPort broadcastAddressAndPort = InetAddressAndPort.getByAddressOverrideDefaults(
- InetAddress.getByName(broadcastAddress), broadcastPort);
- topology.put(broadcastAddressAndPort, dcAndRack);
- }
- catch (UnknownHostException e)
- {
- throw new ConfigurationException("Unknown broadcast_address '" + broadcastAddress + '\'', false);
- }
- }
- return topology;
- }
-
- public DcAndRack put(InetAddressAndPort key, DcAndRack value)
- {
- return map.put(key, value);
- }
-
- public String localRack(InetAddressAndPort key)
- {
- DcAndRack p = map.get(key);
- if (p == null)
- return null;
- return p.rack;
- }
-
- public String localDC(InetAddressAndPort key)
- {
- DcAndRack p = map.get(key);
- if (p == null)
- return null;
- return p.dc;
- }
-
- public boolean contains(InetAddressAndPort key)
- {
- return map.containsKey(key);
- }
-
- public String toString()
- {
- return "NetworkTopology{" + map + '}';
- }
-
-
- public static Map<Integer, NetworkTopology.DcAndRack> singleDcNetworkTopology(int nodeCount,
- String dc,
- String rack)
- {
- return networkTopology(nodeCount, (nodeid) -> NetworkTopology.dcAndRack(dc, rack));
- }
-
- public static Map<Integer, NetworkTopology.DcAndRack> networkTopology(int nodeCount,
- IntFunction<DcAndRack> dcAndRackSupplier)
- {
-
- return IntStream.rangeClosed(1, nodeCount).boxed()
- .collect(Collectors.toMap(nodeId -> nodeId,
- dcAndRackSupplier::apply));
- }
-}
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/RowUtil.java b/test/distributed/org/apache/cassandra/distributed/impl/RowUtil.java
index ded3708..2da3676 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/RowUtil.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/RowUtil.java
@@ -51,6 +51,11 @@ public class RowUtil
return result;
}
+ public static Iterator<Object[]> toObjects(UntypedResultSet rs)
+ {
+ return toObjects(rs.metadata(), rs.iterator());
+ }
+
public static Iterator<Object[]> toObjects(List<ColumnSpecification> columnSpecs, Iterator<UntypedResultSet.Row> rs)
{
return Iterators.transform(rs,
@@ -60,8 +65,10 @@ public class RowUtil
{
ColumnSpecification columnSpec = columnSpecs.get(i);
ByteBuffer bb = row.getBytes(columnSpec.name.toString());
+
if (bb != null)
objectRow[i] = columnSpec.type.getSerializer().deserialize(bb);
+
}
return objectRow;
});
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/TracingUtil.java b/test/distributed/org/apache/cassandra/distributed/impl/TracingUtil.java
index d671e55..439d4b7 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/TracingUtil.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/TracingUtil.java
@@ -24,7 +24,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
-import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
/**
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Versions.java b/test/distributed/org/apache/cassandra/distributed/impl/Versions.java
deleted file mode 100644
index d5d6e7d..0000000
--- a/test/distributed/org/apache/cassandra/distributed/impl/Versions.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.distributed.impl;
-
-import java.io.File;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.Config;
-import org.apache.cassandra.utils.FBUtilities;
-
-public class Versions
-{
- private static final Logger logger = LoggerFactory.getLogger(Versions.class);
- public static Version CURRENT = new Version(FBUtilities.getReleaseVersionString(), ((URLClassLoader)Versions.class.getClassLoader()).getURLs());
-
- public enum Major
- {
- v22("2\\.2\\.([0-9]+)"),
- v30("3\\.0\\.([0-9]+)"),
- v3X("3\\.([1-9]|1[01])(\\.([0-9]+))?"),
- v4("4\\.([0-9]+)");
- final Pattern pattern;
- Major(String verify)
- {
- this.pattern = Pattern.compile(verify);
- }
-
- static Major fromFull(String version)
- {
- if (version.isEmpty())
- throw new IllegalArgumentException(version);
- switch (version.charAt(0))
- {
- case '2':
- if (version.startsWith("2.2"))
- return v22;
- throw new IllegalArgumentException(version);
- case '3':
- if (version.startsWith("3.0"))
- return v30;
- return v3X;
- case '4':
- return v4;
- default:
- throw new IllegalArgumentException(version);
- }
- }
-
- // verify that the version string is valid for this major version
- boolean verify(String version)
- {
- return pattern.matcher(version).matches();
- }
-
- // sort two strings of the same major version as this enum instance
- int compare(String a, String b)
- {
- Matcher ma = pattern.matcher(a);
- Matcher mb = pattern.matcher(a);
- if (!ma.matches()) throw new IllegalArgumentException(a);
- if (!mb.matches()) throw new IllegalArgumentException(b);
- int result = Integer.compare(Integer.parseInt(ma.group(1)), Integer.parseInt(mb.group(1)));
- if (result == 0 && this == v3X && (ma.group(3) != null || mb.group(3) != null))
- {
- if (ma.group(3) != null && mb.group(3) != null)
- {
- result = Integer.compare(Integer.parseInt(ma.group(3)), Integer.parseInt(mb.group(3)));
- }
- else
- {
- result = ma.group(3) != null ? 1 : -1;
- }
- }
- // sort descending
- return -result;
- }
- }
-
- public static class Version
- {
- public final Major major;
- public final String version;
- public final URL[] classpath;
-
- public Version(String version, URL[] classpath)
- {
- this(Major.fromFull(version), version, classpath);
- }
- public Version(Major major, String version, URL[] classpath)
- {
- this.major = major;
- this.version = version;
- this.classpath = classpath;
- }
- }
-
- private final Map<Major, List<Version>> versions;
- public Versions(Map<Major, List<Version>> versions)
- {
- this.versions = versions;
- }
-
- public Version get(String full)
- {
- return versions.get(Major.fromFull(full))
- .stream()
- .filter(v -> full.equals(v.version))
- .findFirst()
- .orElseThrow(() -> new RuntimeException("No version " + full + " found"));
- }
-
- public Version getLatest(Major major)
- {
- return versions.get(major).stream().findFirst().orElseThrow(() -> new RuntimeException("No " + major + " versions found"));
- }
-
- public static Versions find()
- {
- final String dtestJarDirectory = System.getProperty(Config.PROPERTY_PREFIX + "test.dtest_jar_path","build");
- final File sourceDirectory = new File(dtestJarDirectory);
- logger.info("Looking for dtest jars in " + sourceDirectory.getAbsolutePath());
- final Pattern pattern = Pattern.compile("dtest-(?<fullversion>(\\d+)\\.(\\d+)(\\.\\d+)?(\\.\\d+)?)([~\\-]\\w[.\\w]*(?:\\-\\w[.\\w]*)*)?(\\+[.\\w]+)?\\.jar");
- final Map<Major, List<Version>> versions = new HashMap<>();
- for (Major major : Major.values())
- versions.put(major, new ArrayList<>());
-
- for (File file : sourceDirectory.listFiles())
- {
- Matcher m = pattern.matcher(file.getName());
- if (!m.matches())
- continue;
- String version = m.group("fullversion");
- Major major = Major.fromFull(version);
- versions.get(major).add(new Version(major, version, new URL[] { toURL(file) }));
- }
-
- for (Map.Entry<Major, List<Version>> e : versions.entrySet())
- {
- if (e.getValue().isEmpty())
- continue;
- Collections.sort(e.getValue(), Comparator.comparing(v -> v.version, e.getKey()::compare));
- logger.info("Found " + e.getValue().stream().map(v -> v.version).collect(Collectors.joining(", ")));
- }
-
- return new Versions(versions);
- }
-
- public static URL toURL(File file)
- {
- try
- {
- return file.toURI().toURL();
- }
- catch (MalformedURLException e)
- {
- throw new IllegalArgumentException(e);
- }
- }
-
-}
diff --git a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbe.java b/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbe.java
index ca1d53a..625b4aa 100644
--- a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbe.java
+++ b/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbe.java
@@ -23,6 +23,8 @@ import java.lang.management.ManagementFactory;
import java.util.Iterator;
import java.util.Map;
+import javax.management.ListenerNotFoundException;
+
import com.google.common.collect.Multimap;
import org.apache.cassandra.db.ColumnFamilyStoreMBean;
@@ -41,14 +43,19 @@ import org.apache.cassandra.service.CacheServiceMBean;
import org.apache.cassandra.service.GCInspector;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.StorageServiceMBean;
import org.apache.cassandra.streaming.StreamManager;
import org.apache.cassandra.tools.NodeProbe;
+import org.mockito.Mockito;
public class InternalNodeProbe extends NodeProbe
{
- public InternalNodeProbe() throws IOException
+ private final boolean withNotifications;
+
+ public InternalNodeProbe(boolean withNotifications)
{
- super("", 0);
+ this.withNotifications = withNotifications;
+ connect();
}
protected void connect()
@@ -57,6 +64,28 @@ public class InternalNodeProbe extends NodeProbe
mbeanServerConn = null;
jmxc = null;
+
+ if (withNotifications)
+ {
+ ssProxy = StorageService.instance;
+ }
+ else
+ {
+ // replace the notification apis with a no-op method
+ StorageServiceMBean mock = Mockito.spy(StorageService.instance);
+ Mockito.doNothing().when(mock).addNotificationListener(Mockito.any(), Mockito.any(), Mockito.any());
+ try
+ {
+ Mockito.doNothing().when(mock).removeNotificationListener(Mockito.any(), Mockito.any(), Mockito.any());
+ Mockito.doNothing().when(mock).removeNotificationListener(Mockito.any());
+ }
+ catch (ListenerNotFoundException e)
+ {
+ throw new AssertionError(e);
+ }
+ ssProxy = mock;
+ }
+
ssProxy = StorageService.instance;
msProxy = MessagingService.instance();
streamProxy = StreamManager.instance;
diff --git a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java b/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java
index f7c9dcf..1904aa7 100644
--- a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java
+++ b/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java
@@ -25,11 +25,18 @@ import org.apache.cassandra.tools.INodeProbeFactory;
public class InternalNodeProbeFactory implements INodeProbeFactory
{
+ private final boolean withNotifications;
+
+ public InternalNodeProbeFactory(boolean withNotifications)
+ {
+ this.withNotifications = withNotifications;
+ }
+
public NodeProbe create(String host, int port) throws IOException {
- return new InternalNodeProbe();
+ return new InternalNodeProbe(withNotifications);
}
public NodeProbe create(String host, int port, String username, String password) throws IOException {
- return new InternalNodeProbe();
+ return new InternalNodeProbe(withNotifications);
}
}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/BootstrapTest.java b/test/distributed/org/apache/cassandra/distributed/test/BootstrapTest.java
index 0af26ea..11c2f9d 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/BootstrapTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/BootstrapTest.java
@@ -25,17 +25,21 @@ import java.util.stream.IntStream;
import org.junit.Assert;
import org.junit.Test;
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICluster;
import org.apache.cassandra.distributed.api.IInstance;
-import org.apache.cassandra.distributed.impl.IInvokableInstance;
-import org.apache.cassandra.distributed.impl.InstanceConfig;
-import org.apache.cassandra.distributed.impl.NetworkTopology;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.shared.Builder;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.apache.cassandra.distributed.shared.DistributedTestBase.KEYSPACE;
-public class BootstrapTest extends DistributedTestBase
+// TODO: this test should be removed after running in-jvm dtests is set up via the shared API repository
+public class BootstrapTest extends TestBaseImpl
{
@Test
@@ -43,24 +47,22 @@ public class BootstrapTest extends DistributedTestBase
{
int originalNodeCount = 2;
int expandedNodeCount = originalNodeCount + 1;
- Cluster.Builder<IInvokableInstance, Cluster> builder = Cluster.build(originalNodeCount)
- .withTokenSupplier(Cluster.evenlyDistributedTokens(expandedNodeCount))
- .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(originalNodeCount, "dc0", "rack0"))
- .withConfig(config -> config.with(NETWORK, GOSSIP));
+ Builder<IInstance, ICluster> builder = builder().withNodes(originalNodeCount)
+ .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
+ .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(originalNodeCount, "dc0", "rack0"))
+ .withConfig(config -> config.with(NETWORK, GOSSIP));
Map<Integer, Long> withBootstrap = null;
Map<Integer, Long> naturally = null;
-
- try (Cluster cluster = builder.start())
+ try (ICluster<IInvokableInstance> cluster = builder.withNodes(originalNodeCount).start())
{
populate(cluster);
- InstanceConfig config = builder.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0"))
- .newInstanceConfig(cluster);
+ IInstanceConfig config = builder.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0"))
+ .newInstanceConfig(cluster);
config.set("auto_bootstrap", true);
- IInstance newInstance = cluster.bootstrap(config);
- newInstance.startup();
+ cluster.bootstrap(config).startup();
cluster.stream().forEach(instance -> {
instance.nodetool("cleanup", KEYSPACE, "tbl");
@@ -69,20 +71,21 @@ public class BootstrapTest extends DistributedTestBase
withBootstrap = count(cluster);
}
- builder = Cluster.build(expandedNodeCount)
- .withTokenSupplier(Cluster.evenlyDistributedTokens(expandedNodeCount))
+ builder = builder.withNodes(expandedNodeCount)
+ .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
.withConfig(config -> config.with(NETWORK, GOSSIP));
- try (Cluster cluster = builder.start())
+ try (ICluster cluster = builder.start())
{
populate(cluster);
naturally = count(cluster);
}
- Assert.assertEquals(withBootstrap, naturally);
+ for (Map.Entry<Integer, Long> e : withBootstrap.entrySet())
+ Assert.assertTrue(e.getValue() >= naturally.get(e.getKey()));
}
- public void populate(Cluster cluster)
+ public void populate(ICluster cluster)
{
cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + 3 + "};");
cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
@@ -93,12 +96,11 @@ public class BootstrapTest extends DistributedTestBase
i, i, i);
}
- public Map<Integer, Long> count(Cluster cluster)
+ public Map<Integer, Long> count(ICluster cluster)
{
return IntStream.rangeClosed(1, cluster.size())
.boxed()
.collect(Collectors.toMap(nodeId -> nodeId,
nodeId -> (Long) cluster.get(nodeId).executeInternal("SELECT count(*) FROM " + KEYSPACE + ".tbl")[0][0]));
}
-
-}
+}
\ No newline at end of file
diff --git a/test/distributed/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java b/test/distributed/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java
deleted file mode 100644
index 8cd731d..0000000
--- a/test/distributed/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.distributed.test;
-
-import org.junit.Test;
-
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.distributed.Cluster;
-
-public class DistributedReadWritePathTest extends DistributedTestBase
-{
- @Test
- public void coordinatorReadTest() throws Throwable
- {
- try (Cluster cluster = init(Cluster.create(3)))
- {
- cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
-
- cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
- cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, 2)");
- cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, 3)");
-
- assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
- ConsistencyLevel.ALL,
- 1),
- row(1, 1, 1),
- row(1, 2, 2),
- row(1, 3, 3));
- }
- }
-
- @Test
- public void coordinatorWriteTest() throws Throwable
- {
- try (Cluster cluster = init(Cluster.create(3)))
- {
- cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
-
- cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)",
- ConsistencyLevel.QUORUM);
-
- for (int i = 0; i < 3; i++)
- {
- assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"),
- row(1, 1, 1));
- }
-
- assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
- ConsistencyLevel.QUORUM),
- row(1, 1, 1));
- }
- }
-
- @Test
- public void readRepairTest() throws Throwable
- {
- try (Cluster cluster = init(Cluster.create(3)))
- {
- cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
-
- cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
- cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
-
- assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"));
-
- assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
- ConsistencyLevel.ALL), // ensure node3 in preflist
- row(1, 1, 1));
-
- // Verify that data got repaired to the third node
- assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"),
- row(1, 1, 1));
- }
- }
-
- @Test
- public void simplePagedReadsTest() throws Throwable
- {
- try (Cluster cluster = init(Cluster.create(3)))
- {
- cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
-
- int size = 100;
- Object[][] results = new Object[size][];
- for (int i = 0; i < size; i++)
- {
- cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
- ConsistencyLevel.QUORUM,
- i, i);
- results[i] = new Object[] { 1, i, i};
- }
-
- // First, make sure that non-paged reads are able to fetch the results
- assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.QUORUM),
- results);
-
- // Make sure paged read returns same results with different page sizes
- for (int pageSize : new int[] { 1, 2, 3, 5, 10, 20, 50})
- {
- assertRows(cluster.coordinator(1).executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl",
- ConsistencyLevel.QUORUM,
- pageSize),
- results);
- }
- }
- }
-
- @Test
- public void pagingWithRepairTest() throws Throwable
- {
- try (Cluster cluster = init(Cluster.create(3)))
- {
- cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
-
- int size = 10;
- Object[][] results = new Object[size][];
- for (int i = 0; i < size; i++)
- {
- // Make sure that data lands on different nodes and not coordinator
- cluster.get(i % 2 == 0 ? 2 : 3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
- i, i);
-
- results[i] = new Object[] { 1, i, i};
- }
-
- // Make sure paged read returns same results with different page sizes
- for (int pageSize : new int[] { 1, 2, 3, 5, 10, 20, 50})
- {
- assertRows(cluster.coordinator(1).executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl",
- ConsistencyLevel.ALL,
- pageSize),
- results);
- }
-
- assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl"),
- results);
- }
- }
-
- @Test
- public void pagingTests() throws Throwable
- {
- try (Cluster cluster = init(Cluster.create(3));
- Cluster singleNode = init(Cluster.create(1)))
- {
- cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
- singleNode.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
-
- for (int i = 0; i < 10; i++)
- {
- for (int j = 0; j < 10; j++)
- {
- cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
- ConsistencyLevel.QUORUM,
- i, j, i + i);
- singleNode.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
- ConsistencyLevel.QUORUM,
- i, j, i + i);
- }
- }
-
- int[] pageSizes = new int[] { 1, 2, 3, 5, 10, 20, 50};
- String[] statements = new String [] {"SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5",
- "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5",
- "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10",
- "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 LIMIT 3",
- "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5 LIMIT 2",
- "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 LIMIT 2",
- "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 ORDER BY ck DESC",
- "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5 ORDER BY ck DESC",
- "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 ORDER BY ck DESC",
- "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 ORDER BY ck DESC LIMIT 3",
- "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5 ORDER BY ck DESC LIMIT 2",
- "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 ORDER BY ck DESC LIMIT 2",
- "SELECT DISTINCT pk FROM " + KEYSPACE + ".tbl LIMIT 3",
- "SELECT DISTINCT pk FROM " + KEYSPACE + ".tbl WHERE pk IN (3,5,8,10)",
- "SELECT DISTINCT pk FROM " + KEYSPACE + ".tbl WHERE pk IN (3,5,8,10) LIMIT 2"
- };
- for (String statement : statements)
- {
- for (int pageSize : pageSizes)
- {
- assertRows(cluster.coordinator(1)
- .executeWithPaging(statement,
- ConsistencyLevel.QUORUM, pageSize),
- singleNode.coordinator(1)
- .executeWithPaging(statement,
- ConsistencyLevel.QUORUM, Integer.MAX_VALUE));
- }
- }
-
- }
- }
-}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
deleted file mode 100644
index 7a3d52d..0000000
--- a/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.distributed.test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-
-import com.google.common.collect.Iterators;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-
-import com.datastax.driver.core.ResultSet;
-import org.apache.cassandra.distributed.impl.AbstractCluster;
-import org.apache.cassandra.distributed.impl.IsolatedExecutor;
-import org.apache.cassandra.distributed.impl.RowUtil;
-
-public class DistributedTestBase
-{
- @After
- public void afterEach()
- {
- System.runFinalization();
- System.gc();
- }
-
- public static String KEYSPACE = "distributed_test_keyspace";
-
- public static void nativeLibraryWorkaround()
- {
- // Disable the C library for in-JVM dtests otherwise it holds a gcroot against the InstanceClassLoader
- System.setProperty("cassandra.disable_clibrary", "true");
-
- // Disable the Netty tcnative library otherwise the io.netty.internal.tcnative.CertificateCallbackTask,
- // CertificateVerifierTask, SSLPrivateKeyMethodDecryptTask, SSLPrivateKeyMethodSignTask,
- // SSLPrivateKeyMethodTask, and SSLTask hold a gcroot against the InstanceClassLoader.
- System.setProperty("cassandra.disable_tcactive_openssl", "true");
- System.setProperty("io.netty.transport.noNative", "true");
- }
-
- public static void processReaperWorkaround()
- {
- // Make sure the 'process reaper' thread is initially created under the main classloader,
- // otherwise it gets created with the contextClassLoader pointing to an InstanceClassLoader
- // which prevents it from being garbage collected.
- IsolatedExecutor.ThrowingRunnable.toRunnable(() -> new ProcessBuilder().command("true").start().waitFor()).run();
- }
-
- @BeforeClass
- public static void setup()
- {
- System.setProperty("cassandra.ring_delay_ms", Integer.toString(10 * 1000));
- System.setProperty("org.apache.cassandra.disable_mbean_registration", "true");
- nativeLibraryWorkaround();
- processReaperWorkaround();
- }
-
- static String withKeyspace(String replaceIn)
- {
- return String.format(replaceIn, KEYSPACE);
- }
-
- protected static <C extends AbstractCluster<?>> C init(C cluster)
- {
- cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + cluster.size() + "};");
- return cluster;
- }
-
- public static void assertRows(ResultSet actual,Object[]... expected)
- {
- assertRows(RowUtil.toObjects(actual), expected);
- }
-
- public static void assertRows(Object[][] actual, Object[]... expected)
- {
- Assert.assertEquals(rowsNotEqualErrorMessage(actual, expected),
- expected.length, actual.length);
-
- for (int i = 0; i < expected.length; i++)
- {
- Object[] expectedRow = expected[i];
- Object[] actualRow = actual[i];
- Assert.assertTrue(rowsNotEqualErrorMessage(actual, expected),
- Arrays.equals(expectedRow, actualRow));
- }
- }
-
- public static void assertRow(Object[] actual, Object... expected)
- {
- Assert.assertTrue(rowNotEqualErrorMessage(actual, expected),
- Arrays.equals(actual, expected));
- }
-
- public static void assertRows(Iterator<Object[]> actual, Iterator<Object[]> expected)
- {
- while (actual.hasNext() && expected.hasNext())
- assertRow(actual.next(), expected.next());
-
- Assert.assertEquals("Resultsets have different sizes", actual.hasNext(), expected.hasNext());
- }
-
- public static void assertRows(Iterator<Object[]> actual, Object[]... expected)
- {
- assertRows(actual, Iterators.forArray(expected));
- }
-
- public static String rowNotEqualErrorMessage(Object[] actual, Object[] expected)
- {
- return String.format("Expected: %s\nActual:%s\n",
- Arrays.toString(expected),
- Arrays.toString(actual));
- }
-
- public static String rowsNotEqualErrorMessage(Object[][] actual, Object[][] expected)
- {
- return String.format("Expected: %s\nActual: %s\n",
- rowsToString(expected),
- rowsToString(actual));
- }
-
- public static String rowsToString(Object[][] rows)
- {
- StringBuilder builder = new StringBuilder();
- builder.append("[");
- boolean isFirst = true;
- for (Object[] row : rows)
- {
- if (isFirst)
- isFirst = false;
- else
- builder.append(",");
- builder.append(Arrays.toString(row));
- }
- builder.append("]");
- return builder.toString();
- }
-
- public static Object[][] toObjectArray(Iterator<Object[]> iter)
- {
- List<Object[]> res = new ArrayList<>();
- while (iter.hasNext())
- res.add(iter.next());
-
- return res.toArray(new Object[res.size()][]);
- }
-
- public static Object[] row(Object... expected)
- {
- return expected;
- }
-}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/GossipSettlesTest.java b/test/distributed/org/apache/cassandra/distributed/test/GossipSettlesTest.java
index 5f1263a..509fe6f 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/GossipSettlesTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/GossipSettlesTest.java
@@ -18,23 +18,25 @@
package org.apache.cassandra.distributed.test;
-import java.io.IOException;
-
import org.junit.Test;
-import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ICluster;
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
-public class GossipSettlesTest extends DistributedTestBase
+// TODO: this test should be removed after running in-jvm dtests is set up via the shared API repository
+public class GossipSettlesTest extends TestBaseImpl
{
@Test
- public void testGossipSettles() throws IOException
+ public void testGossipSettles() throws Throwable
{
- // Use withSubnet(1) to prove seed provider is set correctly - without the fix to pass a seed provider, this test fails
- try (Cluster cluster = Cluster.build(3).withConfig(config -> config.with(GOSSIP).with(NETWORK)).withSubnet(1).start())
+ /* Use withSubnet(1) to prove seed provider is set correctly - without the fix to pass a seed provider, this test fails */
+ try (ICluster cluster = builder().withNodes(3)
+ .withConfig(config -> config.with(GOSSIP).with(NETWORK))
+ .withSubnet(1)
+ .start())
{
}
}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java b/test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java
index 96974d8..062f401 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java
@@ -18,29 +18,46 @@
package org.apache.cassandra.distributed.test;
+import java.net.InetSocketAddress;
import java.util.Arrays;
-import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.collect.Sets;
import org.junit.Assert;
import org.junit.Test;
-import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.IIsolatedExecutor;
import org.apache.cassandra.distributed.api.IMessage;
import org.apache.cassandra.distributed.api.IMessageFilters;
import org.apache.cassandra.distributed.impl.Instance;
-import org.apache.cassandra.distributed.impl.MessageFilters;
-import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.distributed.shared.MessageFilters;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
-public class MessageFiltersTest extends DistributedTestBase
+public class MessageFiltersTest extends TestBaseImpl
{
+
+ @Test
+ public void simpleInboundFiltersTest()
+ {
+ simpleFiltersTest(true);
+ }
+
@Test
- public void simpleFiltersTest() throws Throwable
+ public void simpleOutboundFiltersTest()
+ {
+ simpleFiltersTest(false);
+ }
+
+ private interface Permit
+ {
+ boolean test(int from, int to, IMessage msg);
+ }
+
+ private static void simpleFiltersTest(boolean inbound)
{
int VERB1 = MessagingService.Verb.READ.ordinal();
int VERB2 = MessagingService.Verb.REQUEST_RESPONSE.ordinal();
@@ -52,61 +69,62 @@ public class MessageFiltersTest extends DistributedTestBase
String MSG2 = "msg2";
MessageFilters filters = new MessageFilters();
- MessageFilters.Filter filter = filters.allVerbs().from(1).drop();
+ Permit permit = inbound ? filters::permitInbound : filters::permitOutbound;
- Assert.assertFalse(filters.permit(i1, i2, msg(VERB1, MSG1)));
- Assert.assertFalse(filters.permit(i1, i2, msg(VERB2, MSG1)));
- Assert.assertFalse(filters.permit(i1, i2, msg(VERB3, MSG1)));
- Assert.assertTrue(filters.permit(i2, i1, msg(VERB1, MSG1)));
+ IMessageFilters.Filter filter = filters.allVerbs().inbound(inbound).from(1).drop();
+ Assert.assertFalse(permit.test(i1, i2, msg(VERB1, MSG1)));
+ Assert.assertFalse(permit.test(i1, i2, msg(VERB2, MSG1)));
+ Assert.assertFalse(permit.test(i1, i2, msg(VERB3, MSG1)));
+ Assert.assertTrue(permit.test(i2, i1, msg(VERB1, MSG1)));
filter.off();
- Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1)));
+ Assert.assertTrue(permit.test(i1, i2, msg(VERB1, MSG1)));
filters.reset();
- filters.verbs(VERB1).from(1).to(2).drop();
- Assert.assertFalse(filters.permit(i1, i2, msg(VERB1, MSG1)));
- Assert.assertTrue(filters.permit(i1, i2, msg(VERB2, MSG1)));
- Assert.assertTrue(filters.permit(i2, i1, msg(VERB1, MSG1)));
- Assert.assertTrue(filters.permit(i2, i3, msg(VERB2, MSG1)));
+ filters.verbs(VERB1).inbound(inbound).from(1).to(2).drop();
+ Assert.assertFalse(permit.test(i1, i2, msg(VERB1, MSG1)));
+ Assert.assertTrue(permit.test(i1, i2, msg(VERB2, MSG1)));
+ Assert.assertTrue(permit.test(i2, i1, msg(VERB1, MSG1)));
+ Assert.assertTrue(permit.test(i2, i3, msg(VERB2, MSG1)));
filters.reset();
AtomicInteger counter = new AtomicInteger();
- filters.verbs(VERB1).from(1).to(2).messagesMatching((from, to, msg) -> {
+ filters.verbs(VERB1).inbound(inbound).from(1).to(2).messagesMatching((from, to, msg) -> {
counter.incrementAndGet();
return Arrays.equals(msg.bytes(), MSG1.getBytes());
}).drop();
- Assert.assertFalse(filters.permit(i1, i2, msg(VERB1, MSG1)));
+ Assert.assertFalse(permit.test(i1, i2, msg(VERB1, MSG1)));
Assert.assertEquals(counter.get(), 1);
- Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG2)));
+ Assert.assertTrue(permit.test(i1, i2, msg(VERB1, MSG2)));
Assert.assertEquals(counter.get(), 2);
// filter chain gets interrupted because a higher level filter returns no match
- Assert.assertTrue(filters.permit(i2, i1, msg(VERB1, MSG1)));
+ Assert.assertTrue(permit.test(i2, i1, msg(VERB1, MSG1)));
Assert.assertEquals(counter.get(), 2);
- Assert.assertTrue(filters.permit(i2, i1, msg(VERB2, MSG1)));
+ Assert.assertTrue(permit.test(i2, i1, msg(VERB2, MSG1)));
Assert.assertEquals(counter.get(), 2);
filters.reset();
- filters.allVerbs().from(3, 2).to(2, 1).drop();
- Assert.assertFalse(filters.permit(i3, i1, msg(VERB1, MSG1)));
- Assert.assertFalse(filters.permit(i3, i2, msg(VERB1, MSG1)));
- Assert.assertFalse(filters.permit(i2, i1, msg(VERB1, MSG1)));
- Assert.assertTrue(filters.permit(i2, i3, msg(VERB1, MSG1)));
- Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1)));
- Assert.assertTrue(filters.permit(i1, i3, msg(VERB1, MSG1)));
+ filters.allVerbs().inbound(inbound).from(3, 2).to(2, 1).drop();
+ Assert.assertFalse(permit.test(i3, i1, msg(VERB1, MSG1)));
+ Assert.assertFalse(permit.test(i3, i2, msg(VERB1, MSG1)));
+ Assert.assertFalse(permit.test(i2, i1, msg(VERB1, MSG1)));
+ Assert.assertTrue(permit.test(i2, i3, msg(VERB1, MSG1)));
+ Assert.assertTrue(permit.test(i1, i2, msg(VERB1, MSG1)));
+ Assert.assertTrue(permit.test(i1, i3, msg(VERB1, MSG1)));
filters.reset();
counter.set(0);
- filters.allVerbs().from(1).to(2).messagesMatching((from, to, msg) -> {
+ filters.allVerbs().inbound(inbound).from(1).to(2).messagesMatching((from, to, msg) -> {
counter.incrementAndGet();
return false;
}).drop();
- Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1)));
- Assert.assertTrue(filters.permit(i1, i3, msg(VERB1, MSG1)));
- Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1)));
+ Assert.assertTrue(permit.test(i1, i2, msg(VERB1, MSG1)));
+ Assert.assertTrue(permit.test(i1, i3, msg(VERB1, MSG1)));
+ Assert.assertTrue(permit.test(i1, i2, msg(VERB1, MSG1)));
Assert.assertEquals(2, counter.get());
}
- IMessage msg(int verb, String msg)
+ private static IMessage msg(int verb, String msg)
{
return new IMessage()
{
@@ -114,7 +132,7 @@ public class MessageFiltersTest extends DistributedTestBase
public byte[] bytes() { return msg.getBytes(); }
public int id() { return 0; }
public int version() { return 0; }
- public InetAddressAndPort from() { return null; }
+ public InetSocketAddress from() { return null; }
};
}
@@ -161,8 +179,8 @@ public class MessageFiltersTest extends DistributedTestBase
AtomicInteger counter = new AtomicInteger();
- Set<Integer> verbs = new HashSet<>(Arrays.asList(MessagingService.Verb.RANGE_SLICE.ordinal(),
- MessagingService.Verb.MUTATION.ordinal()));
+ Set<Integer> verbs = Sets.newHashSet(Arrays.asList(MessagingService.Verb.RANGE_SLICE.ordinal(),
+ MessagingService.Verb.MUTATION.ordinal()));
// Reads and writes are going to time out in both directions
IMessageFilters.Filter filter = cluster.filters()
diff --git a/test/distributed/org/apache/cassandra/distributed/test/MessageForwardingTest.java b/test/distributed/org/apache/cassandra/distributed/test/MessageForwardingTest.java
index 72928e4..61ccb5f 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/MessageForwardingTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/MessageForwardingTest.java
@@ -31,13 +31,13 @@ import java.util.stream.Stream;
import org.junit.Assert;
import org.junit.Test;
-import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.impl.IsolatedExecutor;
import org.apache.cassandra.distributed.impl.TracingUtil;
import org.apache.cassandra.utils.UUIDGen;
-public class MessageForwardingTest extends DistributedTestBase
+public class MessageForwardingTest extends TestBaseImpl
{
@Test
public void mutationsForwardedToAllReplicasTest()
@@ -53,12 +53,12 @@ public class MessageForwardingTest extends DistributedTestBase
{
cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck))");
- cluster.forEach(instance -> commitCounts.put(instance.broadcastAddressAndPort().address, 0));
+ cluster.forEach(instance -> commitCounts.put(instance.broadcastAddress().getAddress(), 0));
final UUID sessionId = UUIDGen.getTimeUUID();
Stream<Future<Object[][]>> inserts = IntStream.range(0, numInserts).mapToObj((idx) ->
cluster.coordinator(1).asyncExecuteWithTracing(sessionId,
"INSERT INTO " + KEYSPACE + ".tbl(pk,ck,v) VALUES (1, 1, 'x')",
- ConsistencyLevel.ALL)
+ ConsistencyLevel.ALL)
);
// Wait for each of the futures to complete before checking the traces, don't care
@@ -66,7 +66,7 @@ public class MessageForwardingTest extends DistributedTestBase
//noinspection ResultOfMethodCallIgnored
inserts.map(IsolatedExecutor::waitOn).count();
- cluster.forEach(instance -> commitCounts.put(instance.broadcastAddressAndPort().address, 0));
+ cluster.forEach(instance -> commitCounts.put(instance.broadcastAddress().getAddress(), 0));
List<TracingUtil.TraceEntry> traces = TracingUtil.getTrace(cluster, sessionId, ConsistencyLevel.ALL);
traces.forEach(traceEntry -> {
if (traceEntry.activity.contains("Appending to commitlog"))
diff --git a/test/distributed/org/apache/cassandra/distributed/test/NativeProtocolTest.java b/test/distributed/org/apache/cassandra/distributed/test/NativeProtocolTest.java
index 45d9840..0905b92 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/NativeProtocolTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/NativeProtocolTest.java
@@ -18,50 +18,53 @@
package org.apache.cassandra.distributed.test;
+import org.apache.cassandra.distributed.impl.RowUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;
-import org.apache.cassandra.distributed.Cluster;
-import org.apache.cassandra.distributed.impl.RowUtil;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Iterator;
+import org.apache.cassandra.distributed.api.ICluster;
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.apache.cassandra.distributed.shared.AssertUtils.*;
-public class NativeProtocolTest extends DistributedTestBase
+// TODO: this test should be removed after running in-jvm dtests is set up via the shared API repository
+public class NativeProtocolTest extends TestBaseImpl
{
@Test
public void withClientRequests() throws Throwable
{
- try (Cluster ignored = init(Cluster.create(3,
- config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL))))
+ try (ICluster ignored = init(builder().withNodes(3)
+ .withConfig(config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL))
+ .start()))
{
- final com.datastax.driver.core.Cluster cluster = com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1").build();
- Session session = cluster.connect();
- session.execute("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck));");
- session.execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) values (1,1,1);");
- Statement select = new SimpleStatement("select * from " + KEYSPACE + ".tbl;").setConsistencyLevel(ConsistencyLevel.ALL);
- final ResultSet resultSet = session.execute(select);
- assertRows(resultSet, row(1, 1, 1));
- Assert.assertEquals(3, cluster.getMetadata().getAllHosts().size());
- session.close();
- cluster.close();
+
+ try (com.datastax.driver.core.Cluster cluster = com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1").build();
+ Session session = cluster.connect())
+ {
+ session.execute("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck));");
+ session.execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) values (1,1,1);");
+ Statement select = new SimpleStatement("select * from " + KEYSPACE + ".tbl;").setConsistencyLevel(ConsistencyLevel.ALL);
+ final ResultSet resultSet = session.execute(select);
+ assertRows(RowUtil.toObjects(resultSet), row(1, 1, 1));
+ Assert.assertEquals(3, cluster.getMetadata().getAllHosts().size());
+ }
}
}
@Test
public void withCounters() throws Throwable
{
- try (Cluster dtCluster = init(Cluster.create(3,
- config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL))))
+ try (ICluster ignored = init(builder().withNodes(3)
+ .withConfig(config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL))
+ .start()))
{
final com.datastax.driver.core.Cluster cluster = com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1").build();
Session session = cluster.connect();
@@ -69,7 +72,7 @@ public class NativeProtocolTest extends DistributedTestBase
session.execute("UPDATE " + KEYSPACE + ".tbl set ck = ck + 10 where pk = 1;");
Statement select = new SimpleStatement("select * from " + KEYSPACE + ".tbl;").setConsistencyLevel(ConsistencyLevel.ALL);
final ResultSet resultSet = session.execute(select);
- assertRows(resultSet, row(1, 10L));
+ assertRows(RowUtil.toObjects(resultSet), row(1, 10L));
Assert.assertEquals(3, cluster.getMetadata().getAllHosts().size());
session.close();
cluster.close();
diff --git a/test/distributed/org/apache/cassandra/distributed/test/NetworkTopologyTest.java b/test/distributed/org/apache/cassandra/distributed/test/NetworkTopologyTest.java
index a9c2cee..8230fd5 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/NetworkTopologyTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/NetworkTopologyTest.java
@@ -26,22 +26,23 @@ import java.util.stream.Stream;
import org.junit.Assert;
import org.junit.Test;
-import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ICluster;
import org.apache.cassandra.distributed.api.IInstance;
-import org.apache.cassandra.distributed.impl.NetworkTopology;
-import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
-public class NetworkTopologyTest extends DistributedTestBase
+// TODO: this test should be removed after running in-jvm dtests is set up via the shared API repository
+public class NetworkTopologyTest extends TestBaseImpl
{
@Test
public void namedDcTest() throws Throwable
{
- try (Cluster cluster = Cluster.build()
- .withNodeIdTopology(Collections.singletonMap(1, NetworkTopology.dcAndRack("somewhere", "rack0")))
- .withRack("elsewhere", "firstrack", 1)
- .withRack("elsewhere", "secondrack", 2)
- .withDC("nearthere", 4)
- .start())
+ try (ICluster<IInvokableInstance> cluster = builder()
+ .withNodeIdTopology(Collections.singletonMap(1, NetworkTopology.dcAndRack("somewhere", "rack0")))
+ .withRack("elsewhere", "firstrack", 1)
+ .withRack("elsewhere", "secondrack", 2)
+ .withDC("nearthere", 4)
+ .start())
{
Assert.assertEquals(1, cluster.stream("somewhere").count());
Assert.assertEquals(1, cluster.stream("elsewhere", "firstrack").count());
@@ -61,9 +62,8 @@ public class NetworkTopologyTest extends DistributedTestBase
public void automaticNamedDcTest() throws Throwable
{
- try (Cluster cluster = Cluster.build()
- .withRacks(2, 1, 3)
- .start())
+ try (ICluster cluster = builder().withRacks(2, 1, 3)
+ .start())
{
Assert.assertEquals(6, cluster.stream().count());
Assert.assertEquals(3, cluster.stream("datacenter1").count());
@@ -74,27 +74,25 @@ public class NetworkTopologyTest extends DistributedTestBase
@Test(expected = IllegalStateException.class)
public void noCountsAfterNamingDCsTest()
{
- Cluster.build()
- .withDC("nameddc", 1)
- .withDCs(1);
+ builder().withDC("nameddc", 1)
+ .withDCs(1);
}
@Test(expected = IllegalStateException.class)
public void mustProvideNodeCountBeforeWithDCsTest()
{
- Cluster.build()
- .withDCs(1);
+ builder().withDCs(1);
}
@Test(expected = IllegalStateException.class)
public void noEmptyNodeIdTopologyTest()
{
- Cluster.build().withNodeIdTopology(Collections.emptyMap());
+ builder().withNodeIdTopology(Collections.emptyMap());
}
@Test(expected = IllegalStateException.class)
public void noHolesInNodeIdTopologyTest()
{
- Cluster.build().withNodeIdTopology(Collections.singletonMap(2, NetworkTopology.dcAndRack("doomed", "rack")));
+ builder().withNodeIdTopology(Collections.singletonMap(2, NetworkTopology.dcAndRack("doomed", "rack")));
}
-}
+}
\ No newline at end of file
diff --git a/test/distributed/org/apache/cassandra/distributed/test/NodeToolTest.java b/test/distributed/org/apache/cassandra/distributed/test/NodeToolTest.java
index e209d1d..1d78152 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/NodeToolTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/NodeToolTest.java
@@ -24,7 +24,7 @@ import org.apache.cassandra.distributed.Cluster;
import static org.junit.Assert.assertEquals;
-public class NodeToolTest extends DistributedTestBase
+public class NodeToolTest extends TestBaseImpl
{
@Test
public void test() throws Throwable
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java b/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java
index 09f40e4..808b95c 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java
@@ -26,7 +26,6 @@ import java.nio.file.Path;
import java.sql.Date;
import java.text.SimpleDateFormat;
import java.time.Instant;
-import java.util.List;
import java.util.function.Consumer;
import javax.management.MBeanServer;
@@ -34,17 +33,17 @@ import org.junit.Ignore;
import org.junit.Test;
import com.sun.management.HotSpotDiagnosticMXBean;
-import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.distributed.Cluster;
-import org.apache.cassandra.distributed.impl.InstanceConfig;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.service.CassandraDaemon;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.SigarLibrary;
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
-import static org.apache.cassandra.distributed.api.Feature.NETWORK;
import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
/* Resource Leak Test - useful when tracking down issues with in-JVM framework cleanup.
* All objects referencing the InstanceClassLoader need to be garbage collected or
@@ -59,7 +58,7 @@ import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
* but it shows that the file handles for Data/Index files are being leaked.
*/
@Ignore
-public class ResourceLeakTest extends DistributedTestBase
+public class ResourceLeakTest extends TestBaseImpl
{
// Parameters to adjust while hunting for leaks
final int numTestLoops = 1; // Set this value high to crash on leaks, or low when tracking down an issue.
@@ -141,7 +140,7 @@ public class ResourceLeakTest extends DistributedTestBase
}
}
- void doTest(int numClusterNodes, Consumer<InstanceConfig> updater) throws Throwable
+ void doTest(int numClusterNodes, Consumer<IInstanceConfig> updater) throws Throwable
{
for (int loop = 0; loop < numTestLoops; loop++)
{
@@ -149,7 +148,7 @@ public class ResourceLeakTest extends DistributedTestBase
try (Cluster cluster = Cluster.build(numClusterNodes).withConfig(updater).start())
{
if (cluster.get(1).config().has(GOSSIP)) // Wait for gossip to settle on the seed node
- cluster.get(1).runOnInstance(() -> CassandraDaemon.waitForGossipToSettle());
+ cluster.get(1).runOnInstance(CassandraDaemon::waitForGossipToSettle);
init(cluster);
String tableName = "tbl" + loop;
diff --git a/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWritePathTest.java b/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWritePathTest.java
new file mode 100644
index 0000000..8c9e8af
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWritePathTest.java
@@ -0,0 +1,225 @@
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICluster;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+
+import static org.apache.cassandra.distributed.shared.AssertUtils.*;
+
+// TODO: this test should be removed after running in-jvm dtests is set up via the shared API repository
+public class SimpleReadWritePathTest extends TestBaseImpl
+{
+ private static final TestBaseImpl impl = new TestBaseImpl();
+ private static ICluster cluster;
+
+ @BeforeClass
+ public static void before() throws IOException
+ {
+ cluster = init(impl.builder().withNodes(3).start());
+ }
+
+ @AfterClass
+ public static void after() throws Exception
+ {
+ cluster.close();
+ }
+
+ @After
+ public void afterEach()
+ {
+ cluster.schemaChange("DROP KEYSPACE IF EXISTS " + KEYSPACE);
+ init(cluster);
+ }
+
+ @Test
+ public void coordinatorReadTest() throws Throwable
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+ cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
+ cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, 2)");
+ cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, 3)");
+
+ assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
+ ConsistencyLevel.ALL,
+ 1),
+ row(1, 1, 1),
+ row(1, 2, 2),
+ row(1, 3, 3));
+ }
+
+ @Test
+ public void largeMessageTest() throws Throwable
+ {
+ int largeMessageThreshold = 1024 * 64;
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck))");
+ StringBuilder builder = new StringBuilder();
+ for (int i = 0; i < largeMessageThreshold; i++)
+ builder.append('a');
+ String s = builder.toString();
+ cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, ?)",
+ ConsistencyLevel.ALL,
+ s);
+ assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
+ ConsistencyLevel.ALL,
+ 1),
+ row(1, 1, s));
+ }
+
+ @Test
+ public void coordinatorWriteTest() throws Throwable
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+ cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)",
+ ConsistencyLevel.QUORUM);
+
+ for (int i = 0; i < 3; i++)
+ {
+ assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"),
+ row(1, 1, 1));
+ }
+
+ assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+ ConsistencyLevel.QUORUM),
+ row(1, 1, 1));
+ }
+
+ @Test
+ public void readRepairTest() throws Throwable
+ {
+
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+ cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
+ cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
+
+ assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"));
+
+ assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+ ConsistencyLevel.ALL), // ensure node3 is in the preference list
+ row(1, 1, 1));
+
+ // Verify that data got repaired to the third node
+ assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"),
+ row(1, 1, 1));
+ }
+
+ @Test
+ public void simplePagedReadsTest() throws Throwable
+ {
+
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+ int size = 100;
+ Object[][] results = new Object[size][];
+ for (int i = 0; i < size; i++)
+ {
+ cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
+ ConsistencyLevel.QUORUM,
+ i, i);
+ results[i] = new Object[]{ 1, i, i };
+ }
+
+ // Make sure paged reads return the same results with different page sizes
+ for (int pageSize : new int[]{ 1, 2, 3, 5, 10, 20, 50 })
+ {
+ assertRows(cluster.coordinator(1).executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl",
+ ConsistencyLevel.QUORUM,
+ pageSize),
+ results);
+ }
+ }
+
+ @Test
+ public void pagingWithRepairTest() throws Throwable
+ {
+
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+ int size = 100;
+ Object[][] results = new Object[size][];
+ for (int i = 0; i < size; i++)
+ {
+ // Make sure that data lands on different nodes and not on the coordinator
+ cluster.get(i % 2 == 0 ? 2 : 3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
+ i, i);
+
+ results[i] = new Object[]{ 1, i, i };
+ }
+
+ // Make sure paged reads return the same results with different page sizes
+ for (int pageSize : new int[]{ 1, 2, 3, 5, 10, 20, 50 })
+ {
+ assertRows(cluster.coordinator(1).executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl",
+ ConsistencyLevel.ALL,
+ pageSize),
+ results);
+ }
+
+ assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl"),
+ results);
+ }
+
+ @Test
+ public void pagingTests() throws Throwable
+ {
+ try (ICluster singleNode = init(builder().withNodes(1).withSubnet(1).start()))
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+ singleNode.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+ for (int i = 0; i < 10; i++)
+ {
+ for (int j = 0; j < 10; j++)
+ {
+ cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
+ ConsistencyLevel.QUORUM,
+ i, j, i + i);
+ singleNode.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
+ ConsistencyLevel.QUORUM,
+ i, j, i + i);
+ }
+ }
+
+ int[] pageSizes = new int[]{ 1, 2, 3, 5, 10, 20, 50 };
+ String[] statements = new String[]{ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5",
+ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5",
+ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10",
+ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 LIMIT 3",
+ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5 LIMIT 2",
+ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 LIMIT 2",
+ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 ORDER BY ck DESC",
+ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5 ORDER BY ck DESC",
+ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 ORDER BY ck DESC",
+ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 ORDER BY ck DESC LIMIT 3",
+ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5 ORDER BY ck DESC LIMIT 2",
+ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 ORDER BY ck DESC LIMIT 2",
+ "SELECT DISTINCT pk FROM " + KEYSPACE + ".tbl LIMIT 3",
+ "SELECT DISTINCT pk FROM " + KEYSPACE + ".tbl WHERE pk IN (3,5,8,10)",
+ "SELECT DISTINCT pk FROM " + KEYSPACE + ".tbl WHERE pk IN (3,5,8,10) LIMIT 2"
+ };
+ for (String statement : statements)
+ {
+ for (int pageSize : pageSizes)
+ {
+ assertRows(cluster.coordinator(1)
+ .executeWithPaging(statement,
+ ConsistencyLevel.QUORUM, pageSize),
+ singleNode.coordinator(1)
+ .executeWithPaging(statement,
+ ConsistencyLevel.QUORUM, Integer.MAX_VALUE));
+ }
+ }
+ }
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java b/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
new file mode 100644
index 0000000..a89a352
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import org.junit.After;
+import org.junit.BeforeClass;
+
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.distributed.shared.Builder;
+import org.apache.cassandra.distributed.shared.DistributedTestBase;
+
+public class TestBaseImpl extends DistributedTestBase
+{
+ protected static final TestBaseImpl impl = new TestBaseImpl();
+
+ @After
+ public void afterEach() {
+ super.afterEach();
+ }
+
+ @BeforeClass
+ public static void beforeClass() throws Throwable {
+ ICluster.setup();
+ }
+
+ @Override
+ public <I extends IInstance, C extends ICluster> Builder<I, C> builder() {
+ // This is definitely not the smartest solution, but given the complexity of the alternatives and the low risk, we can just rely on the
+ // fact that this code is going to work across _all_ versions.
+ return (Builder<I, C>) org.apache.cassandra.distributed.Cluster.build();
+ }
+}
\ No newline at end of file
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadRepairTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadRepairTest.java
index 31f4b84..e69e38a 100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadRepairTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadRepairTest.java
@@ -20,11 +20,10 @@ package org.apache.cassandra.distributed.upgrade;
import org.junit.Test;
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.distributed.impl.Versions;
-import org.apache.cassandra.distributed.test.DistributedTestBase;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.shared.Versions;
-import static org.apache.cassandra.distributed.impl.Versions.find;
+import static org.apache.cassandra.distributed.shared.Versions.find;
public class MixedModeReadRepairTest extends UpgradeTestBase
{
@@ -35,17 +34,17 @@ public class MixedModeReadRepairTest extends UpgradeTestBase
.nodes(2)
.upgrade(Versions.Major.v22, Versions.Major.v30)
.nodesToUpgrade(2)
- .setup((cluster) -> cluster.schemaChange("CREATE TABLE " + DistributedTestBase.KEYSPACE + ".tbl (pk ascii, b boolean, v blob, PRIMARY KEY (pk)) WITH COMPACT STORAGE"))
+ .setup((cluster) -> cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk ascii, b boolean, v blob, PRIMARY KEY (pk)) WITH COMPACT STORAGE"))
.runAfterClusterUpgrade((cluster) -> {
// now node2 is 3.0 and node1 is 2.2
// make sure 2.2 side does not get the mutation
- cluster.get(2).executeInternal("DELETE FROM " + DistributedTestBase.KEYSPACE + ".tbl WHERE pk = ?",
+ cluster.get(2).executeInternal("DELETE FROM " + KEYSPACE + ".tbl WHERE pk = ?",
"something");
// trigger a read repair
- cluster.coordinator(1).execute("SELECT * FROM " + DistributedTestBase.KEYSPACE + ".tbl WHERE pk = ?",
+ cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
ConsistencyLevel.ALL,
"something");
- cluster.get(1).flush(DistributedTestBase.KEYSPACE);
+ cluster.get(1).flush(KEYSPACE);
// upgrade node1 to 3.0
cluster.get(1).shutdown().get();
Versions allVersions = find();
@@ -53,7 +52,7 @@ public class MixedModeReadRepairTest extends UpgradeTestBase
cluster.get(1).startup();
// and make sure the sstables are readable
- cluster.get(1).forceCompact(DistributedTestBase.KEYSPACE, "tbl");
+ cluster.get(1).forceCompact(KEYSPACE, "tbl");
}).run();
}
}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTest.java
index 9d2534a..93ae78e 100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTest.java
@@ -20,9 +20,11 @@ package org.apache.cassandra.distributed.upgrade;
import org.junit.Test;
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.distributed.impl.Versions;
-import org.apache.cassandra.distributed.test.DistributedTestBase;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.shared.Versions;
+
+import junit.framework.Assert;
+import static org.apache.cassandra.distributed.shared.AssertUtils.*;
public class UpgradeTest extends UpgradeTestBase
{
@@ -31,22 +33,22 @@ public class UpgradeTest extends UpgradeTestBase
public void upgradeTest() throws Throwable
{
new TestCase()
- .upgrade(Versions.Major.v22, Versions.Major.v30, Versions.Major.v3X)
- .setup((cluster) -> {
- cluster.schemaChange("CREATE TABLE " + DistributedTestBase.KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
-
- cluster.get(1).executeInternal("INSERT INTO " + DistributedTestBase.KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
- cluster.get(2).executeInternal("INSERT INTO " + DistributedTestBase.KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, 2)");
- cluster.get(3).executeInternal("INSERT INTO " + DistributedTestBase.KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, 3)");
- })
- .runAfterClusterUpgrade((cluster) -> {
- DistributedTestBase.assertRows(cluster.coordinator(1).execute("SELECT * FROM " + DistributedTestBase.KEYSPACE + ".tbl WHERE pk = ?",
- ConsistencyLevel.ALL,
- 1),
- DistributedTestBase.row(1, 1, 1),
- DistributedTestBase.row(1, 2, 2),
- DistributedTestBase.row(1, 3, 3));
- }).run();
+ .upgrade(Versions.Major.v22, Versions.Major.v30, Versions.Major.v3X)
+ .setup((cluster) -> {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+ cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
+ cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, 2)");
+ cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, 3)");
+ })
+ .runAfterClusterUpgrade((cluster) -> {
+ assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
+ ConsistencyLevel.ALL,
+ 1),
+ row(1, 1, 1),
+ row(1, 2, 2),
+ row(1, 3, 3));
+ }).run();
}
-}
+}
\ No newline at end of file
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
index 27094d8..6ae1d7b 100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
@@ -25,16 +25,24 @@ import java.util.List;
import java.util.Set;
import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInstance;
import org.apache.cassandra.distributed.impl.Instance;
-import org.apache.cassandra.distributed.impl.Versions;
-import org.apache.cassandra.distributed.impl.Versions.Version;
-import org.apache.cassandra.distributed.test.DistributedTestBase;
-import static org.apache.cassandra.distributed.impl.Versions.Major;
-import static org.apache.cassandra.distributed.impl.Versions.find;
+import org.apache.cassandra.distributed.shared.Builder;
+import org.apache.cassandra.distributed.shared.DistributedTestBase;
+import org.apache.cassandra.distributed.shared.Versions;
+import static org.apache.cassandra.distributed.shared.Versions.Version;
+import static org.apache.cassandra.distributed.shared.Versions.Major;
+import static org.apache.cassandra.distributed.shared.Versions.find;
public class UpgradeTestBase extends DistributedTestBase
{
+ public <I extends IInstance, C extends ICluster> Builder<I, C> builder()
+ {
+ return (Builder<I, C>) UpgradeableCluster.build();
+ }
+
public static interface RunOnCluster
{
public void run(UpgradeableCluster cluster) throws Throwable;
@@ -164,4 +172,4 @@ public class UpgradeTestBase extends DistributedTestBase
}
}
-}
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org