You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2020/03/20 15:49:26 UTC
[cassandra-in-jvm-dtest-api] 01/03: Introduce the extracted in-JVM
DTest API
This is an automated email from the ASF dual-hosted git repository.
ifesdjeen pushed a commit to annotated tag 0.0.1-1
in repository https://gitbox.apache.org/repos/asf/cassandra-in-jvm-dtest-api.git
commit 10a3729e259c47101da2c53505177f5159e603da
Author: Alex Petrov <ol...@gmail.com>
AuthorDate: Mon Feb 24 12:06:09 2020 +0100
Introduce the extracted in-JVM DTest API
Patch by Alex Petrov; reviewed by David Capwell for CASSANDRA-15539.
---
pom.xml | 94 +++++++
.../distributed/api/ConsistencyLevel.java | 34 +++
.../apache/cassandra/distributed/api/Feature.java | 24 ++
.../apache/cassandra/distributed/api/ICluster.java | 41 +++
.../cassandra/distributed/api/ICoordinator.java | 34 +++
.../cassandra/distributed/api/IInstance.java | 66 +++++
.../cassandra/distributed/api/IInstanceConfig.java | 101 +++++++
.../distributed/api/IInvokableInstance.java | 67 +++++
.../distributed/api/IIsolatedExecutor.java | 128 +++++++++
.../apache/cassandra/distributed/api/IListen.java | 28 ++
.../apache/cassandra/distributed/api/IMessage.java | 38 +++
.../cassandra/distributed/api/IMessageFilters.java | 56 ++++
.../distributed/api/IUpgradeableInstance.java | 29 +++
.../cassandra/distributed/api/TokenSupplier.java | 32 +++
.../cassandra/distributed/shared/Builder.java | 233 +++++++++++++++++
.../distributed/shared/DistributedTestBase.java | 205 +++++++++++++++
.../distributed/shared/InstanceClassLoader.java | 116 +++++++++
.../distributed/shared/MessageFilters.java | 165 ++++++++++++
.../distributed/shared/MessageFiltersTest.java | 113 ++++++++
.../distributed/shared/NetworkTopology.java | 216 +++++++++++++++
.../distributed/shared/ThrowingRunnable.java | 38 +++
.../cassandra/distributed/shared/Versions.java | 201 ++++++++++++++
.../cassandra/distributed/test/BootstrapTest.java | 104 ++++++++
.../test/DistributedReadWritePathTest.java | 290 +++++++++++++++++++++
.../distributed/test/GossipSettlesTest.java | 43 +++
.../distributed/test/LargeColumnTest.java | 96 +++++++
.../distributed/test/NativeProtocolTest.java | 79 ++++++
.../distributed/test/NetworkTopologyTest.java | 98 +++++++
.../distributed/test/SimpleReadWriteTest.java | 270 +++++++++++++++++++
test/conf/logback-dtest.xml | 77 ++++++
30 files changed, 3116 insertions(+)
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..4b92a62
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,94 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache</groupId>
+ <artifactId>apache</artifactId>
+ <version>10</version>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.cassandra</groupId>
+ <artifactId>dtest-api</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ <name>In JVM Test API</name>
+ <description>In JVM Test API</description>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <licenses>
+ <license>
+ <name>Apache License, Version 2.0</name>
+ <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ <distribution>repo</distribution>
+ </license>
+ </licenses>
+
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.12</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cassandra</groupId>
+ <artifactId>in-jvm-dtest-cassandra-tryout</artifactId>
+ <version>0.0.7-3.11-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.8.0</version>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ </configuration>
+ </plugin>
+
+
+ <plugin>
+ <artifactId>maven-deploy-plugin</artifactId>
+ <version>2.8.2</version>
+ <executions>
+ <execution>
+ <id>default-deploy</id>
+ <phase>deploy</phase>
+ <goals>
+ <goal>deploy</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-gpg-plugin</artifactId>
+ <version>1.5</version>
+ <executions>
+ <execution>
+ <id>sign-artifacts</id>
+ <phase>verify</phase>
+ <goals>
+ <goal>sign</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ </plugins>
+ </build>
+
+ <scm>
+ <connection>scm:git:git@github.com:apache/cassandra-in-jvm-dtests.git</connection>
+ <developerConnection>scm:git:git@github.com:apache/cassandra-in-jvm-dtests.git</developerConnection>
+ <url>git@github.com:apache/cassandra-in-jvm-dtests.git</url>
+ <tag>HEAD</tag>
+ </scm>
+</project>
+
diff --git a/src/main/java/org/apache/cassandra/distributed/api/ConsistencyLevel.java b/src/main/java/org/apache/cassandra/distributed/api/ConsistencyLevel.java
new file mode 100644
index 0000000..3c057f8
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/ConsistencyLevel.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+public enum ConsistencyLevel {
+ ANY,
+ ONE,
+ TWO,
+ THREE,
+ QUORUM,
+ ALL,
+ LOCAL_QUORUM,
+ EACH_QUORUM,
+ SERIAL,
+ LOCAL_SERIAL,
+ LOCAL_ONE,
+ NODE_LOCAL
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/Feature.java b/src/main/java/org/apache/cassandra/distributed/api/Feature.java
new file mode 100644
index 0000000..b4ba036
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/Feature.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+public enum Feature
+{
+ NETWORK, GOSSIP, NATIVE_PROTOCOL
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/ICluster.java b/src/main/java/org/apache/cassandra/distributed/api/ICluster.java
new file mode 100644
index 0000000..6d86e99
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/ICluster.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+import org.apache.cassandra.distributed.shared.NetworkTopology;
+
+import java.util.stream.Stream;
+
+public interface ICluster<I extends IInstance> extends AutoCloseable
+{
+ void startup();
+ I bootstrap(IInstanceConfig config);
+ I get(int i);
+ I get(NetworkTopology.AddressAndPort endpoint);
+ ICoordinator coordinator(int node);
+ void schemaChange(String query);
+ void schemaChange(String statement, int instance);
+
+ int size();
+
+ Stream<I> stream();
+ Stream<I> stream(String dcName);
+ Stream<I> stream(String dcName, String rackName);
+ IMessageFilters filters();
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/ICoordinator.java b/src/main/java/org/apache/cassandra/distributed/api/ICoordinator.java
new file mode 100644
index 0000000..da17df6
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/ICoordinator.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+import java.util.Iterator;
+import java.util.UUID;
+import java.util.concurrent.Future;
+
+// The cross-version API requires that a Coordinator can be constructed without any constructor arguments
+public interface ICoordinator
+{
+ Object[][] execute(String query, ConsistencyLevel consistencyLevel, Object... boundValues);
+ Iterator<Object[]> executeWithPaging(String query, ConsistencyLevel consistencyLevel, int pageSize, Object... boundValues);
+
+ Future<Object[][]> asyncExecuteWithTracing(UUID sessionId, String query, ConsistencyLevel consistencyLevel, Object... boundValues);
+ Object[][] executeWithTracing(UUID sessionId, String query, ConsistencyLevel consistencyLevel, Object... boundValues);
+ IInstance instance();
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IInstance.java b/src/main/java/org/apache/cassandra/distributed/api/IInstance.java
new file mode 100644
index 0000000..dec3549
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/IInstance.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+import org.apache.cassandra.distributed.shared.NetworkTopology;
+
+import java.util.UUID;
+import java.util.concurrent.Future;
+
+// The cross-version API requires that an Instance has a constructor signature of (IInstanceConfig, ClassLoader)
+public interface IInstance extends IIsolatedExecutor
+{
+ ICoordinator coordinator();
+ IListen listen();
+
+ void schemaChangeInternal(String query);
+ public Object[][] executeInternal(String query, Object... args);
+
+ IInstanceConfig config();
+ NetworkTopology.AddressAndPort broadcastAddress();
+ UUID schemaVersion();
+
+ void startup();
+ boolean isShutdown();
+ Future<Void> shutdown();
+ Future<Void> shutdown(boolean graceful);
+
+ int liveMemberCount();
+
+ int nodetool(String... commandAndArgs);
+ void uncaughtException(Thread t, Throwable e);
+
+ /**
+ * Return the number of times the instance tried to call {@link System#exit(int)}.
+ *
+ * When the instance is shutdown, this state should be saved, but in case not possible should return {@code -1}
+ * to indicate "unknown".
+ */
+ long killAttempts();
+
+ // these methods are not for external use, but for simplicity we leave them public and on the normal IInstance interface
+ void startup(ICluster cluster);
+ void receiveMessage(IMessage message);
+
+ int getMessagingVersion();
+ void setMessagingVersion(NetworkTopology.AddressAndPort addressAndPort, int version);
+
+ void flush(String keyspace);
+ void forceCompact(String keyspace, String table);
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IInstanceConfig.java b/src/main/java/org/apache/cassandra/distributed/api/IInstanceConfig.java
new file mode 100644
index 0000000..05969f8
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/IInstanceConfig.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.function.Function;
+
+import org.apache.cassandra.distributed.shared.NetworkTopology;
+import org.apache.cassandra.distributed.shared.Versions;
+
+public interface IInstanceConfig
+{
+ IInstanceConfig with(Feature featureFlag);
+ IInstanceConfig with(Feature... flags);
+
+ int num();
+ UUID hostId();
+ NetworkTopology.AddressAndPort broadcastAddress();
+ NetworkTopology networkTopology();
+
+ String localRack();
+ String localDatacenter();
+
+ /**
+ * write the specified parameters to the Config object; we do not specify Config as the type to support a Config
+ * from any ClassLoader; the implementation must not directly access any fields of the Object, or cast it, but
+ * must use the reflection API to modify the state
+ */
+ void propagate(Object writeToConfig, Map<Class<?>, Function<Object, Object>>executor);
+
+ /**
+ * Validates whether the config properties are within range of accepted values.
+ */
+ void validate();
+ IInstanceConfig set(String fieldName, Object value);
+ Object get(String fieldName);
+ String getString(String fieldName);
+ int getInt(String fieldName);
+ boolean has(Feature featureFlag);
+
+ public IInstanceConfig forVersion(Versions.Major major);
+
+ public static class ParameterizedClass
+ {
+ public static final String CLASS_NAME = "class_name";
+ public static final String PARAMETERS = "parameters";
+
+ public String class_name;
+ public Map<String, String> parameters;
+
+ public ParameterizedClass(String class_name, Map<String, String> parameters)
+ {
+ this.class_name = class_name;
+ this.parameters = parameters;
+ }
+
+ @SuppressWarnings("unchecked")
+ public ParameterizedClass(Map<String, ?> p)
+ {
+ this((String)p.get(CLASS_NAME),
+ p.containsKey(PARAMETERS) ? (Map<String, String>)((List<?>)p.get(PARAMETERS)).get(0) : null);
+ }
+
+ @Override
+ public boolean equals(Object that)
+ {
+ return that instanceof ParameterizedClass && equals((ParameterizedClass) that);
+ }
+
+ public boolean equals(ParameterizedClass that)
+ {
+ return Objects.equals(class_name, that.class_name) && Objects.equals(parameters, that.parameters);
+ }
+
+ @Override
+ public String toString()
+ {
+ return class_name + (parameters == null ? "" : parameters.toString());
+ }
+ }
+
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IInvokableInstance.java b/src/main/java/org/apache/cassandra/distributed/api/IInvokableInstance.java
new file mode 100644
index 0000000..1dbc4cc
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/IInvokableInstance.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+import java.io.Serializable;
+import java.util.concurrent.Future;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import org.apache.cassandra.distributed.api.IInstance;
+
+/**
+ * This version is only supported for a Cluster running the same code as the test environment, and permits
+ * ergonomic cross-node behaviours, without editing the cross-version API.
+ *
+ * A lambda can be written tto be invoked on any or all of the nodes.
+ *
+ * The reason this cannot (easily) be made cross-version is that the lambda is tied to the declaring class, which will
+ * not be the same in the alternate version. Even were it not, there would likely be a runtime linkage error given
+ * any code divergence.
+ */
+public interface IInvokableInstance extends IInstance
+{
+ public default <O> CallableNoExcept<Future<O>> asyncCallsOnInstance(SerializableCallable<O> call) { return async(transfer(call)); }
+ public default <O> CallableNoExcept<O> callsOnInstance(SerializableCallable<O> call) { return sync(transfer(call)); }
+ public default <O> O callOnInstance(SerializableCallable<O> call) { return callsOnInstance(call).call(); }
+
+ public default CallableNoExcept<Future<?>> asyncRunsOnInstance(SerializableRunnable run) { return async(transfer(run)); }
+ public default Runnable runsOnInstance(SerializableRunnable run) { return sync(transfer(run)); }
+ public default void runOnInstance(SerializableRunnable run) { runsOnInstance(run).run(); }
+
+ public default <I> Function<I, Future<?>> asyncAcceptsOnInstance(SerializableConsumer<I> consumer) { return async(transfer(consumer)); }
+ public default <I> Consumer<I> acceptsOnInstance(SerializableConsumer<I> consumer) { return sync(transfer(consumer)); }
+
+ public default <I1, I2> BiFunction<I1, I2, Future<?>> asyncAcceptsOnInstance(SerializableBiConsumer<I1, I2> consumer) { return async(transfer(consumer)); }
+ public default <I1, I2> BiConsumer<I1, I2> acceptsOnInstance(SerializableBiConsumer<I1, I2> consumer) { return sync(transfer(consumer)); }
+
+ public default <I, O> Function<I, Future<O>> asyncAppliesOnInstance(SerializableFunction<I, O> f) { return async(transfer(f)); }
+ public default <I, O> Function<I, O> appliesOnInstance(SerializableFunction<I, O> f) { return sync(transfer(f)); }
+
+ public default <I1, I2, O> BiFunction<I1, I2, Future<O>> asyncAppliesOnInstance(SerializableBiFunction<I1, I2, O> f) { return async(transfer(f)); }
+ public default <I1, I2, O> BiFunction<I1, I2, O> appliesOnInstance(SerializableBiFunction<I1, I2, O> f) { return sync(transfer(f)); }
+
+ public default <I1, I2, I3, O> TriFunction<I1, I2, I3, Future<O>> asyncAppliesOnInstance(SerializableTriFunction<I1, I2, I3, O> f) { return async(transfer(f)); }
+ public default <I1, I2, I3, O> TriFunction<I1, I2, I3, O> appliesOnInstance(SerializableTriFunction<I1, I2, I3, O> f) { return sync(transfer(f)); }
+
+ public <E extends Serializable> E transfer(E object);
+
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IIsolatedExecutor.java b/src/main/java/org/apache/cassandra/distributed/api/IIsolatedExecutor.java
new file mode 100644
index 0000000..d811b17
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/IIsolatedExecutor.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+import java.io.Serializable;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * Represents a clean way to handoff evaluation of some work to an executor associated
+ * with a node's lifetime.
+ *
+ * There is no transfer of execution to the parallel class hierarchy.
+ *
+ * Classes, such as Instance, that are themselves instantiated on the correct ClassLoader, utilise this class
+ * to ensure the lifetime of any thread evaluating one of its method invocations matches the lifetime of the class itself.
+ * Since they are instantiated on the correct ClassLoader, sharing only the interface, there is no serialization necessary.
+ */
+public interface IIsolatedExecutor
+{
+ public interface CallableNoExcept<O> extends Callable<O> { public O call(); }
+ public interface SerializableCallable<O> extends CallableNoExcept<O>, Serializable { }
+ public interface SerializableRunnable extends Runnable, Serializable {}
+ public interface SerializableConsumer<O> extends Consumer<O>, Serializable {}
+ public interface SerializableSupplier<O> extends Supplier<O>, Serializable {}
+ public interface SerializableBiConsumer<I1, I2> extends BiConsumer<I1, I2>, Serializable {}
+ public interface SerializableFunction<I, O> extends Function<I, O>, Serializable {}
+ public interface SerializableBiFunction<I1, I2, O> extends BiFunction<I1, I2, O>, Serializable {}
+ public interface TriFunction<I1, I2, I3, O>
+ {
+ O apply(I1 i1, I2 i2, I3 i3);
+ }
+ public interface SerializableTriFunction<I1, I2, I3, O> extends Serializable, TriFunction<I1, I2, I3, O> { }
+
+ Future<Void> shutdown();
+
+ /**
+ * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result
+ */
+ <O> CallableNoExcept<Future<O>> async(CallableNoExcept<O> call);
+
+ /**
+ * Convert the execution to one performed synchronously on the IsolatedExecutor
+ */
+ <O> CallableNoExcept<O> sync(CallableNoExcept<O> call);
+
+ /**
+ * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result
+ */
+ CallableNoExcept<Future<?>> async(Runnable run);
+
+ /**
+ * Convert the execution to one performed synchronously on the IsolatedExecutor
+ */
+ Runnable sync(Runnable run);
+
+ /**
+ * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result
+ */
+ <I> Function<I, Future<?>> async(Consumer<I> consumer);
+
+ /**
+ * Convert the execution to one performed synchronously on the IsolatedExecutor
+ */
+ <I> Consumer<I> sync(Consumer<I> consumer);
+
+ /**
+ * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result
+ */
+ <I1, I2> BiFunction<I1, I2, Future<?>> async(BiConsumer<I1, I2> consumer);
+
+ /**
+ * Convert the execution to one performed synchronously on the IsolatedExecutor
+ */
+ <I1, I2> BiConsumer<I1, I2> sync(BiConsumer<I1, I2> consumer);
+
+ /**
+ * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result
+ */
+ <I, O> Function<I, Future<O>> async(Function<I, O> f);
+
+ /**
+ * Convert the execution to one performed synchronously on the IsolatedExecutor
+ */
+ <I, O> Function<I, O> sync(Function<I, O> f);
+
+ /**
+ * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result
+ */
+ <I1, I2, O> BiFunction<I1, I2, Future<O>> async(BiFunction<I1, I2, O> f);
+
+ /**
+ * Convert the execution to one performed synchronously on the IsolatedExecutor
+ */
+ <I1, I2, O> BiFunction<I1, I2, O> sync(BiFunction<I1, I2, O> f);
+
+ /**
+ * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result
+ */
+ <I1, I2, I3, O> TriFunction<I1, I2, I3, Future<O>> async(TriFunction<I1, I2, I3, O> f);
+
+ /**
+ * Convert the execution to one performed synchronously on the IsolatedExecutor
+ */
+ <I1, I2, I3, O> TriFunction<I1, I2, I3, O> sync(TriFunction<I1, I2, I3, O> f);
+
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IListen.java b/src/main/java/org/apache/cassandra/distributed/api/IListen.java
new file mode 100644
index 0000000..c2e8dd6
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/IListen.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+public interface IListen
+{
+ public interface Cancel { void cancel(); }
+
+ Cancel schema(Runnable onChange);
+
+ Cancel liveMembers(Runnable onChange);
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IMessage.java b/src/main/java/org/apache/cassandra/distributed/api/IMessage.java
new file mode 100644
index 0000000..da536cc
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/IMessage.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+import org.apache.cassandra.distributed.shared.NetworkTopology;
+
+import java.io.Serializable;
+
+/**
+ * A cross-version interface for delivering internode messages via message sinks.
+ *
+ * Message implementations should be serializable so we could load into instances.
+ */
+public interface IMessage extends Serializable
+{
+ int verb();
+ byte[] bytes();
+ // TODO: need to make this a long
+ int id();
+ int version();
+ NetworkTopology.AddressAndPort from();
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IMessageFilters.java b/src/main/java/org/apache/cassandra/distributed/api/IMessageFilters.java
new file mode 100644
index 0000000..01fe972
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/IMessageFilters.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+public interface IMessageFilters
+{
+ public interface Filter
+ {
+ Filter off();
+ Filter on();
+ }
+
+ public interface Builder
+ {
+ Builder from(int ... nums);
+ Builder to(int ... nums);
+
+ /**
+ * Every message for which matcher returns `true` will be _dropped_ (assuming all
+ * other matchers in the chain will return `true` as well).
+ */
+ Builder messagesMatching(Matcher filter);
+ Filter drop();
+ }
+
+ public interface Matcher
+ {
+ boolean matches(int from, int to, IMessage message);
+ }
+
+ Builder verbs(int... verbs);
+ Builder allVerbs();
+ void reset();
+
+ /**
+ * {@code true} value returned by the implementation implies that the message was
+ * not matched by any filters and therefore should be delivered.
+ */
+ boolean permit(int from, int to, IMessage msg);
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IUpgradeableInstance.java b/src/main/java/org/apache/cassandra/distributed/api/IUpgradeableInstance.java
new file mode 100644
index 0000000..da864e0
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/IUpgradeableInstance.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.distributed.shared.Versions;
+
+// this lives outside the api package so that we do not have to worry about inter-version compatibility
+public interface IUpgradeableInstance extends IInstance
+{
+ // only to be invoked while the node is shutdown!
+ public void setVersion(Versions.Version version);
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/TokenSupplier.java b/src/main/java/org/apache/cassandra/distributed/api/TokenSupplier.java
new file mode 100644
index 0000000..a714cd5
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/TokenSupplier.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+public interface TokenSupplier {
+ long token(int nodeId);
+
+ static TokenSupplier evenlyDistributedTokens(int numNodes) {
+ long increment = (Long.MAX_VALUE / numNodes) * 2;
+ return (int nodeId) -> {
+ assert nodeId <= numNodes : String.format("Can not allocate a token for a node %s, since only %s nodes are allowed by the token allocation strategy",
+ nodeId, numNodes);
+ return Long.MIN_VALUE + 1 + nodeId * increment;
+ };
+ }
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/Builder.java b/src/main/java/org/apache/cassandra/distributed/shared/Builder.java
new file mode 100644
index 0000000..6f6adfb
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/shared/Builder.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.shared;
+
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.cassandra.distributed.api.TokenSupplier.evenlyDistributedTokens;
+
+public abstract class Builder<I extends IInstance, C extends ICluster> {
+
+ private final int BROADCAST_PORT = 7012;
+
+ public interface Factory<I extends IInstance, C extends ICluster> {
+ C newCluster(File root, Versions.Version version, List<IInstanceConfig> configs, ClassLoader sharedClassLoader);
+ }
+
+ private final Factory<I, C> factory;
+ private int nodeCount;
+ private int subnet;
+ private Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology;
+ private TokenSupplier tokenSupplier;
+ private File root;
+ private Versions.Version version;
+ private Consumer<IInstanceConfig> configUpdater;
+
+ public Builder(Factory<I, C> factory) {
+ this.factory = factory;
+ }
+
+ public C start() throws IOException {
+ C cluster = createWithoutStarting();
+ cluster.startup();
+ return cluster;
+ }
+
+ public C createWithoutStarting() throws IOException {
+ if (root == null)
+ root = Files.createTempDirectory("dtests").toFile();
+
+ if (nodeCount <= 0)
+ throw new IllegalStateException("Cluster must have at least one node");
+
+ if (nodeIdTopology == null) {
+ nodeIdTopology = IntStream.rangeClosed(1, nodeCount).boxed()
+ .collect(Collectors.toMap(nodeId -> nodeId,
+ nodeId -> NetworkTopology.dcAndRack(dcName(0), rackName(0))));
+ }
+
+ root.mkdirs();
+
+ ClassLoader sharedClassLoader = Thread.currentThread().getContextClassLoader();
+
+ List<IInstanceConfig> configs = new ArrayList<>();
+
+ // TODO: make token allocation strategy configurable
+ if (tokenSupplier == null)
+ tokenSupplier = evenlyDistributedTokens(nodeCount);
+
+ for (int i = 0; i < nodeCount; ++i) {
+ int nodeNum = i + 1;
+ configs.add(createInstanceConfig(nodeNum));
+ }
+
+ return factory.newCluster(root, version, configs, sharedClassLoader);
+ }
+
+ public IInstanceConfig newInstanceConfig(C cluster) {
+ return createInstanceConfig(cluster.size() + 1);
+ }
+
+ protected IInstanceConfig createInstanceConfig(int nodeNum) {
+ String ipPrefix = "127.0." + subnet + ".";
+ String seedIp = ipPrefix + "1";
+ String ipAddress = ipPrefix + nodeNum;
+ long token = tokenSupplier.token(nodeNum);
+
+ NetworkTopology topology = NetworkTopology.build(ipPrefix, BROADCAST_PORT, nodeIdTopology);
+
+ IInstanceConfig config = generateConfig(nodeNum, ipAddress, topology, root, String.valueOf(token), seedIp);
+ if (configUpdater != null)
+ configUpdater.accept(config);
+
+ return config;
+ }
+
+ protected abstract IInstanceConfig generateConfig(int nodeNum, String ipAddress, NetworkTopology networkTopology, File root, String token, String seedIp);
+
+ public Builder<I, C> withTokenSupplier(TokenSupplier tokenSupplier) {
+ this.tokenSupplier = tokenSupplier;
+ return this;
+ }
+
+ public Builder<I, C> withSubnet(int subnet) {
+ this.subnet = subnet;
+ return this;
+ }
+
+ public Builder<I, C> withNodes(int nodeCount) {
+ this.nodeCount = nodeCount;
+ return this;
+ }
+
+ public Builder<I, C> withDCs(int dcCount) {
+ return withRacks(dcCount, 1);
+ }
+
+ public Builder<I, C> withRacks(int dcCount, int racksPerDC) {
+ if (nodeCount == 0)
+ throw new IllegalStateException("Node count will be calculated. Do not supply total node count in the builder");
+
+ int totalRacks = dcCount * racksPerDC;
+ int nodesPerRack = (nodeCount + totalRacks - 1) / totalRacks; // round up to next integer
+ return withRacks(dcCount, racksPerDC, nodesPerRack);
+ }
+
+ public Builder<I, C> withRacks(int dcCount, int racksPerDC, int nodesPerRack) {
+ if (nodeIdTopology != null)
+ throw new IllegalStateException("Network topology already created. Call withDCs/withRacks once or before withDC/withRack calls");
+
+ nodeIdTopology = new HashMap<>();
+ int nodeId = 1;
+ for (int dc = 1; dc <= dcCount; dc++) {
+ for (int rack = 1; rack <= racksPerDC; rack++) {
+ for (int rackNodeIdx = 0; rackNodeIdx < nodesPerRack; rackNodeIdx++)
+ nodeIdTopology.put(nodeId++, NetworkTopology.dcAndRack(dcName(dc), rackName(rack)));
+ }
+ }
+ // adjust the node count to match the allocatation
+ final int adjustedNodeCount = dcCount * racksPerDC * nodesPerRack;
+ if (adjustedNodeCount != nodeCount) {
+ assert adjustedNodeCount > nodeCount : "withRacks should only ever increase the node count";
+ System.out.println(String.format("Network topology of %s DCs with %s racks per DC and %s nodes per rack required increasing total nodes to %s",
+ dcCount, racksPerDC, nodesPerRack, adjustedNodeCount));
+ nodeCount = adjustedNodeCount;
+ }
+ return this;
+ }
+
+ public Builder<I, C> withDC(String dcName, int nodeCount) {
+ return withRack(dcName, rackName(1), nodeCount);
+ }
+
+ public Builder<I, C> withRack(String dcName, String rackName, int nodesInRack) {
+ if (nodeIdTopology == null) {
+ if (nodeCount > 0)
+ throw new IllegalStateException("Node count must not be explicitly set, or allocated using withDCs/withRacks");
+
+ nodeIdTopology = new HashMap<>();
+ }
+ for (int nodeId = nodeCount + 1; nodeId <= nodeCount + nodesInRack; nodeId++)
+ nodeIdTopology.put(nodeId, NetworkTopology.dcAndRack(dcName, rackName));
+
+ nodeCount += nodesInRack;
+ return this;
+ }
+
+ // Map of node ids to dc and rack - must be contiguous with an entry nodeId 1 to nodeCount
+ public Builder<I, C> withNodeIdTopology(Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology) {
+ if (nodeIdTopology.isEmpty())
+ throw new IllegalStateException("Topology is empty. It must have an entry for every nodeId.");
+
+ IntStream.rangeClosed(1, nodeIdTopology.size()).forEach(nodeId -> {
+ if (nodeIdTopology.get(nodeId) == null)
+ throw new IllegalStateException("Topology is missing entry for nodeId " + nodeId);
+ });
+
+ if (nodeCount != nodeIdTopology.size()) {
+ nodeCount = nodeIdTopology.size();
+ System.out.println(String.format("Adjusting node count to %s for supplied network topology", nodeCount));
+ }
+
+ this.nodeIdTopology = new HashMap<>(nodeIdTopology);
+
+ return this;
+ }
+
+ public Builder<I, C> withRoot(File root) {
+ this.root = root;
+ return this;
+ }
+
+ public Builder<I, C> withVersion(Versions.Version version) {
+ this.version = version;
+ return this;
+ }
+
+ public Builder<I, C> withConfig(Consumer<IInstanceConfig> updater) {
+ this.configUpdater = updater;
+ return this;
+ }
+
+ static String dcName(int index)
+ {
+ return "datacenter" + index;
+ }
+
+ static String rackName(int index)
+ {
+ return "rack" + index;
+ }
+}
+
+
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/DistributedTestBase.java b/src/main/java/org/apache/cassandra/distributed/shared/DistributedTestBase.java
new file mode 100644
index 0000000..89dc0cd
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/shared/DistributedTestBase.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.shared;
+
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+public abstract class DistributedTestBase
+{
+ @After
+ public void afterEach()
+ {
+ System.runFinalization();
+ System.gc();
+ }
+
+ public abstract <I extends IInstance, C extends ICluster> Builder<I, C> builder();
+
+ public static String KEYSPACE = "distributed_test_keyspace";
+
+ // TODO: move this to Cluster.java?
+ public static void nativeLibraryWorkaround()
+ {
+ // Disable the Netty tcnative library otherwise the io.netty.internal.tcnative.CertificateCallbackTask,
+ // CertificateVerifierTask, SSLPrivateKeyMethodDecryptTask, SSLPrivateKeyMethodSignTask,
+ // SSLPrivateKeyMethodTask, and SSLTask hold a gcroot against the InstanceClassLoader.
+ System.setProperty("cassandra.disable_tcactive_openssl", "true");
+ System.setProperty("io.netty.transport.noNative", "true");
+ }
+
+ public static void processReaperWorkaround() throws Throwable {
+ // Make sure the 'process reaper' thread is initially created under the main classloader,
+ // otherwise it gets created with the contextClassLoader pointing to an InstanceClassLoader
+ // which prevents it from being garbage collected.
+ new ProcessBuilder().command("true").start().waitFor();
+ }
+
+ public static void setupLogging()
+ {
+ try
+ {
+ File root = Files.createTempDirectory("in-jvm-dtest").toFile();
+ root.deleteOnExit();
+ String testConfPath = "test/conf/logback-dtest.xml";
+ Path logConfPath = Paths.get(root.getPath(), "/logback-dtest.xml");
+
+ if (!logConfPath.toFile().exists())
+ {
+ Files.copy(new File(testConfPath).toPath(),
+ logConfPath);
+ }
+
+ System.setProperty("logback.configurationFile", "file://" + logConfPath);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @BeforeClass
+ public static void setup() throws Throwable {
+ setupLogging();
+ System.setProperty("cassandra.ring_delay_ms", Integer.toString(10 * 1000));
+ System.setProperty("org.apache.cassandra.disable_mbean_registration", "true");
+ nativeLibraryWorkaround();
+ processReaperWorkaround();
+ }
+
+ public static String withKeyspace(String replaceIn)
+ {
+ return String.format(replaceIn, KEYSPACE);
+ }
+
+ protected static <C extends ICluster<?>> C init(C cluster)
+ {
+ return init(cluster, cluster.size());
+ }
+
+ protected static <C extends ICluster<?>> C init(C cluster, int replicationFactor)
+ {
+ cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + replicationFactor + "};");
+ return cluster;
+ }
+
+ public static void assertRows(Object[][] actual, Object[]... expected)
+ {
+ Assert.assertEquals(rowsNotEqualErrorMessage(actual, expected),
+ expected.length, actual.length);
+
+ for (int i = 0; i < expected.length; i++)
+ {
+ Object[] expectedRow = expected[i];
+ Object[] actualRow = actual[i];
+ Assert.assertTrue(rowsNotEqualErrorMessage(actual, expected),
+ Arrays.equals(expectedRow, actualRow));
+ }
+ }
+
+ public static void assertRow(Object[] actual, Object... expected)
+ {
+ Assert.assertTrue(rowNotEqualErrorMessage(actual, expected),
+ Arrays.equals(actual, expected));
+ }
+
+ public static void assertRows(Iterator<Object[]> actual, Iterator<Object[]> expected)
+ {
+ while (actual.hasNext() && expected.hasNext())
+ assertRow(actual.next(), expected.next());
+
+ Assert.assertEquals("Resultsets have different sizes", actual.hasNext(), expected.hasNext());
+ }
+
+ public static void assertRows(Iterator<Object[]> actual, Object[]... expected)
+ {
+ assertRows(actual, new Iterator<Object[]>() {
+
+ int i = 0;
+ @Override
+ public boolean hasNext() {
+ return i < expected.length;
+ }
+
+ @Override
+ public Object[] next() {
+ return expected[i++];
+ }
+ });
+ }
+
+ public static String rowNotEqualErrorMessage(Object[] actual, Object[] expected)
+ {
+ return String.format("Expected: %s\nActual:%s\n",
+ Arrays.toString(expected),
+ Arrays.toString(actual));
+ }
+
+ public static String rowsNotEqualErrorMessage(Object[][] actual, Object[][] expected)
+ {
+ return String.format("Expected: %s\nActual: %s\n",
+ rowsToString(expected),
+ rowsToString(actual));
+ }
+
+ public static String rowsToString(Object[][] rows)
+ {
+ StringBuilder builder = new StringBuilder();
+ builder.append("[");
+ boolean isFirst = true;
+ for (Object[] row : rows)
+ {
+ if (isFirst)
+ isFirst = false;
+ else
+ builder.append(",");
+ builder.append(Arrays.toString(row));
+ }
+ builder.append("]");
+ return builder.toString();
+ }
+
+ public static Object[][] toObjectArray(Iterator<Object[]> iter)
+ {
+ List<Object[]> res = new ArrayList<>();
+ while (iter.hasNext())
+ res.add(iter.next());
+
+ return res.toArray(new Object[res.size()][]);
+ }
+
+ public static Object[] row(Object... expected)
+ {
+ return expected;
+ }
+
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/InstanceClassLoader.java b/src/main/java/org/apache/cassandra/distributed/shared/InstanceClassLoader.java
new file mode 100644
index 0000000..c37a520
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/shared/InstanceClassLoader.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.shared;
+
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Arrays;
+import java.util.function.Predicate;
+
+public class InstanceClassLoader extends URLClassLoader
+{
+ private static final Predicate<String> sharePackage = name ->
+ name.startsWith("org.apache.cassandra.distributed.api.")
+ || name.startsWith("org.apache.cassandra.distributed.shared.")
+ || name.startsWith("sun.")
+ || name.startsWith("oracle.")
+ || name.startsWith("com.intellij.")
+ || name.startsWith("com.sun.")
+ || name.startsWith("com.oracle.")
+ || name.startsWith("java.")
+ || name.startsWith("javax.")
+ || name.startsWith("jdk.")
+ || name.startsWith("netscape.")
+ || name.startsWith("org.xml.sax.");
+
+ private volatile boolean isClosed = false;
+ private final URL[] urls;
+ private final int generation; // used to help debug class loader leaks, by helping determine which classloaders should have been collected
+ private final int id;
+ private final ClassLoader sharedClassLoader;
+
+ public InstanceClassLoader(int generation, int id, URL[] urls, ClassLoader sharedClassLoader)
+ {
+ super(urls, null);
+ this.urls = urls;
+ this.sharedClassLoader = sharedClassLoader;
+ this.generation = generation;
+ this.id = id;
+ }
+
+ public int getClusterGeneration()
+ {
+ return generation;
+ }
+
+ public int getInstanceId()
+ {
+ return id;
+ }
+
+ @Override
+ public Class<?> loadClass(String name) throws ClassNotFoundException
+ {
+ if (sharePackage.test(name))
+ return sharedClassLoader.loadClass(name);
+
+ return loadClassInternal(name);
+ }
+
+ Class<?> loadClassInternal(String name) throws ClassNotFoundException
+ {
+ if (isClosed)
+ throw new IllegalStateException(String.format("Can't load %s. Instance class loader is already closed.", name));
+
+ synchronized (getClassLoadingLock(name))
+ {
+ // First, check if the class has already been loaded
+ Class<?> c = findLoadedClass(name);
+
+ if (c == null)
+ c = findClass(name);
+
+ return c;
+ }
+ }
+
+ /**
+ * @return true iff this class was loaded by an InstanceClassLoader, and as such is used by a dtest node
+ */
+ public static boolean wasLoadedByAnInstanceClassLoader(Class<?> clazz)
+ {
+ return clazz.getClassLoader().getClass().getName().equals(InstanceClassLoader.class.getName());
+ }
+
+ public String toString()
+ {
+ return "InstanceClassLoader{" +
+ "generation=" + generation +
+ ", id = " + id +
+ ", urls=" + Arrays.toString(urls) +
+ '}';
+ }
+
+ public void close() throws IOException
+ {
+ isClosed = true;
+ super.close();
+ }
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/MessageFilters.java b/src/main/java/org/apache/cassandra/distributed/shared/MessageFilters.java
new file mode 100644
index 0000000..57b8f57
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/shared/MessageFilters.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.shared;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.apache.cassandra.distributed.api.IMessage;
+import org.apache.cassandra.distributed.api.IMessageFilters;
+
+public class MessageFilters implements IMessageFilters
+{
+ private final List<Filter> filters = new CopyOnWriteArrayList<>();
+
+ public boolean permit(int from, int to, IMessage msg)
+ {
+ for (Filter filter : filters)
+ {
+ if (filter.matches(from, to, msg))
+ return false;
+ }
+ return true;
+ }
+
+ public class Filter implements IMessageFilters.Filter
+ {
+ final int[] from;
+ final int[] to;
+ final int[] verbs;
+ final Matcher matcher;
+
+ Filter(int[] from, int[] to, int[] verbs, Matcher matcher)
+ {
+ if (from != null)
+ {
+ from = from.clone();
+ Arrays.sort(from);
+ }
+ if (to != null)
+ {
+ to = to.clone();
+ Arrays.sort(to);
+ }
+ if (verbs != null)
+ {
+ verbs = verbs.clone();
+ Arrays.sort(verbs);
+ }
+ this.from = from;
+ this.to = to;
+ this.verbs = verbs;
+ this.matcher = matcher;
+ }
+
+ public int hashCode()
+ {
+ return (from == null ? 0 : Arrays.hashCode(from))
+ + (to == null ? 0 : Arrays.hashCode(to))
+ + (verbs == null ? 0 : Arrays.hashCode(verbs));
+ }
+
+ public boolean equals(Object that)
+ {
+ return that instanceof Filter && equals((Filter) that);
+ }
+
+ public boolean equals(Filter that)
+ {
+ return Arrays.equals(from, that.from)
+ && Arrays.equals(to, that.to)
+ && Arrays.equals(verbs, that.verbs);
+ }
+
+ public Filter off()
+ {
+ filters.remove(this);
+ return this;
+ }
+
+ public Filter on()
+ {
+ filters.add(this);
+ return this;
+ }
+
+ public boolean matches(int from, int to, IMessage msg)
+ {
+ return (this.from == null || Arrays.binarySearch(this.from, from) >= 0)
+ && (this.to == null || Arrays.binarySearch(this.to, to) >= 0)
+ && (this.verbs == null || Arrays.binarySearch(this.verbs, msg.verb()) >= 0)
+ && (this.matcher == null || this.matcher.matches(from, to, msg));
+ }
+ }
+
+ public class Builder implements IMessageFilters.Builder
+ {
+ int[] from;
+ int[] to;
+ int[] verbs;
+ Matcher matcher;
+
+ private Builder(int[] verbs)
+ {
+ this.verbs = verbs;
+ }
+
+ public Builder from(int... nums)
+ {
+ from = nums;
+ return this;
+ }
+
+ public Builder to(int... nums)
+ {
+ to = nums;
+ return this;
+ }
+
+ public IMessageFilters.Builder messagesMatching(Matcher matcher)
+ {
+ this.matcher = matcher;
+ return this;
+ }
+
+ public Filter drop()
+ {
+ return new Filter(from, to, verbs, matcher).on();
+ }
+ }
+
+
+ public Builder verbs(int... verbs)
+ {
+ return new Builder(verbs);
+ }
+
+ @Override
+ public Builder allVerbs()
+ {
+ return new Builder(null);
+ }
+
+ @Override
+ public void reset()
+ {
+ filters.clear();
+ }
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/MessageFiltersTest.java b/src/main/java/org/apache/cassandra/distributed/shared/MessageFiltersTest.java
new file mode 100644
index 0000000..6eedb16
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/shared/MessageFiltersTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.shared;
+
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.api.IMessage;
+
+public class MessageFiltersTest
+{
+ @Test
+ public void simpleFiltersTest() throws Throwable
+ {
+ int VERB1 = 1;
+ int VERB2 = 2;
+ int VERB3 = 3;
+ int i1 = 1;
+ int i2 = 2;
+ int i3 = 3;
+ String MSG1 = "msg1";
+ String MSG2 = "msg2";
+
+ MessageFilters filters = new MessageFilters();
+ MessageFilters.Filter filter = filters.allVerbs().from(1).drop();
+
+ Assert.assertFalse(filters.permit(i1, i2, msg(VERB1, MSG1)));
+ Assert.assertFalse(filters.permit(i1, i2, msg(VERB2, MSG1)));
+ Assert.assertFalse(filters.permit(i1, i2, msg(VERB3, MSG1)));
+ Assert.assertTrue(filters.permit(i2, i1, msg(VERB1, MSG1)));
+ filter.off();
+ Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1)));
+ filters.reset();
+
+ filters.verbs(VERB1).from(1).to(2).drop();
+ Assert.assertFalse(filters.permit(i1, i2, msg(VERB1, MSG1)));
+ Assert.assertTrue(filters.permit(i1, i2, msg(VERB2, MSG1)));
+ Assert.assertTrue(filters.permit(i2, i1, msg(VERB1, MSG1)));
+ Assert.assertTrue(filters.permit(i2, i3, msg(VERB2, MSG1)));
+
+ filters.reset();
+ AtomicInteger counter = new AtomicInteger();
+ filters.verbs(VERB1).from(1).to(2).messagesMatching((from, to, msg) -> {
+ counter.incrementAndGet();
+ return Arrays.equals(msg.bytes(), MSG1.getBytes());
+ }).drop();
+ Assert.assertFalse(filters.permit(i1, i2, msg(VERB1, MSG1)));
+ Assert.assertEquals(counter.get(), 1);
+ Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG2)));
+ Assert.assertEquals(counter.get(), 2);
+
+ // filter chain gets interrupted because a higher level filter returns no match
+ Assert.assertTrue(filters.permit(i2, i1, msg(VERB1, MSG1)));
+ Assert.assertEquals(counter.get(), 2);
+ Assert.assertTrue(filters.permit(i2, i1, msg(VERB2, MSG1)));
+ Assert.assertEquals(counter.get(), 2);
+ filters.reset();
+
+ filters.allVerbs().from(3, 2).to(2, 1).drop();
+ Assert.assertFalse(filters.permit(i3, i1, msg(VERB1, MSG1)));
+ Assert.assertFalse(filters.permit(i3, i2, msg(VERB1, MSG1)));
+ Assert.assertFalse(filters.permit(i2, i1, msg(VERB1, MSG1)));
+ Assert.assertTrue(filters.permit(i2, i3, msg(VERB1, MSG1)));
+ Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1)));
+ Assert.assertTrue(filters.permit(i1, i3, msg(VERB1, MSG1)));
+ filters.reset();
+
+ counter.set(0);
+ filters.allVerbs().from(1).to(2).messagesMatching((from, to, msg) -> {
+ counter.incrementAndGet();
+ return false;
+ }).drop();
+ Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1)));
+ Assert.assertTrue(filters.permit(i1, i3, msg(VERB1, MSG1)));
+ Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1)));
+ Assert.assertEquals(2, counter.get());
+ }
+
+ IMessage msg(int verb, String msg)
+ {
+ return new IMessage()
+ {
+ public int verb() { return verb; }
+ public byte[] bytes() { return msg.getBytes(); }
+ public int id() { return 0; }
+ public int version() { return 0; }
+ public NetworkTopology.AddressAndPort from() { return null; }
+ public int fromPort()
+ {
+ return 0;
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/NetworkTopology.java b/src/main/java/org/apache/cassandra/distributed/shared/NetworkTopology.java
new file mode 100644
index 0000000..9a8e8f6
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/shared/NetworkTopology.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.shared;
+
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.IntFunction;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class NetworkTopology
+{
+ private final Map<AddressAndPort, DcAndRack> map;
+
+ public static class DcAndRack
+ {
+ private final String dc;
+ private final String rack;
+
+ private DcAndRack(String dc, String rack)
+ {
+ this.dc = dc;
+ this.rack = rack;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "DcAndRack{" +
+ "dc='" + dc + '\'' +
+ ", rack='" + rack + '\'' +
+ '}';
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ DcAndRack dcAndRack = (DcAndRack) o;
+ return Objects.equals(dc, dcAndRack.dc) &&
+ Objects.equals(rack, dcAndRack.rack);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(dc, rack);
+ }
+ }
+
+ public static class AddressAndPort implements Serializable
+ {
+ private final InetAddress address;
+ private final int port;
+
+ public AddressAndPort(InetAddress address, int port)
+ {
+ this.address = address;
+ this.port = port;
+ }
+
+ public int getPort()
+ {
+ return port;
+ }
+
+ public InetAddress getAddress()
+ {
+ return address;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "AddressAndPort{" +
+ "address=" + address +
+ ", port=" + port +
+ '}';
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ AddressAndPort that = (AddressAndPort) o;
+ return port == that.port &&
+ Objects.equals(address, that.address);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(address, port);
+ }
+ }
+
+ public static DcAndRack dcAndRack(String dc, String rack)
+ {
+ return new DcAndRack(dc, rack);
+ }
+
+ public static AddressAndPort addressAndPort(InetAddress address, int port)
+ {
+ return new AddressAndPort(address, port);
+ }
+
+ public static AddressAndPort addressAndPort(String address, int port)
+ {
+ try
+ {
+ return new AddressAndPort(InetAddress.getByName(address), port);
+ }
+ catch (UnknownHostException e)
+ {
+ throw new IllegalArgumentException("Unknown address '" + address + '\'');
+ }
+ }
+
+ private NetworkTopology()
+ {
+ map = new HashMap<>();
+ }
+
+ @SuppressWarnings("WeakerAccess")
+ public NetworkTopology(NetworkTopology networkTopology)
+ {
+ map = new HashMap<>(networkTopology.map);
+ }
+
+ public static NetworkTopology build(String ipPrefix, int broadcastPort, Map<Integer, DcAndRack> nodeIdTopology)
+ {
+ final NetworkTopology topology = new NetworkTopology();
+
+ for (int nodeId = 1; nodeId <= nodeIdTopology.size(); nodeId++)
+ {
+ String broadcastAddress = ipPrefix + nodeId;
+
+ DcAndRack dcAndRack = nodeIdTopology.get(nodeId);
+ if (dcAndRack == null)
+ throw new IllegalStateException("nodeId " + nodeId + "not found in instanceMap");
+
+ topology.put(addressAndPort(broadcastAddress, broadcastPort), dcAndRack);
+ }
+ return topology;
+ }
+
+ public DcAndRack put(AddressAndPort addressAndPort, DcAndRack value)
+ {
+ return map.put(addressAndPort, value);
+ }
+
+ public String localRack(NetworkTopology.AddressAndPort key)
+ {
+ DcAndRack p = map.get(key);
+ if (p == null)
+ return null;
+ return p.rack;
+ }
+
+ public String localDC(NetworkTopology.AddressAndPort key)
+ {
+ DcAndRack p = map.get(key);
+ if (p == null)
+ return null;
+ return p.dc;
+ }
+
+ public boolean contains(NetworkTopology.AddressAndPort key)
+ {
+ return map.containsKey(key);
+ }
+
+ public String toString()
+ {
+ return "NetworkTopology{" + map + '}';
+ }
+
+
+ public static Map<Integer, NetworkTopology.DcAndRack> singleDcNetworkTopology(int nodeCount,
+ String dc,
+ String rack)
+ {
+ return networkTopology(nodeCount, (nodeid) -> NetworkTopology.dcAndRack(dc, rack));
+ }
+
+ public static Map<Integer, NetworkTopology.DcAndRack> networkTopology(int nodeCount,
+ IntFunction<DcAndRack> dcAndRackSupplier)
+ {
+
+ return IntStream.rangeClosed(1, nodeCount).boxed()
+ .collect(Collectors.toMap(nodeId -> nodeId,
+ dcAndRackSupplier::apply));
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/ThrowingRunnable.java b/src/main/java/org/apache/cassandra/distributed/shared/ThrowingRunnable.java
new file mode 100644
index 0000000..01f8d29
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/shared/ThrowingRunnable.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.shared;
+
+public interface ThrowingRunnable
+{
+ public void run() throws Throwable;
+
+ public static Runnable toRunnable(ThrowingRunnable runnable)
+ {
+ return () -> {
+ try
+ {
+ runnable.run();
+ }
+ catch (Throwable throwable)
+ {
+ throw new RuntimeException(throwable);
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/Versions.java b/src/main/java/org/apache/cassandra/distributed/shared/Versions.java
new file mode 100644
index 0000000..30c0c7c
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/shared/Versions.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.shared;
+
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.file.Paths;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+public class Versions
+{
+ public static final String PROPERTY_PREFIX = "cassandra.";
+
+ public static URL[] getClassPath()
+ {
+ // In Java 9 the default system ClassLoader got changed from URLClassLoader to AppClassLoader which
+ // does not extend URLClassLoader nor does it give access to the class path array.
+ // Java requires the system property 'java.class.path' to be setup with the classpath seperated by :
+ // so this logic parses that string into the URL[] needed to build later
+ String cp = System.getProperty("java.class.path");
+ assert cp != null && !cp.isEmpty();
+ String[] split = cp.split(File.pathSeparator);
+ assert split.length > 0;
+ URL[] urls = new URL[split.length];
+ try
+ {
+ for (int i = 0; i < split.length; i++)
+ urls[i] = Paths.get(split[i]).toUri().toURL();
+ }
+ catch (MalformedURLException e)
+ {
+ throw new RuntimeException(e);
+ }
+ return urls;
+ }
+
+ public enum Major
+ {
+ v22("2\\.2\\.([0-9]+)"),
+ v30("3\\.0\\.([0-9]+)"),
+ v3X("3\\.([1-9]|1[01])(\\.([0-9]+))?"),
+ v4("4\\.([0-9]+)");
+ final Pattern pattern;
+ Major(String verify)
+ {
+ this.pattern = Pattern.compile(verify);
+ }
+
+ static Major fromFull(String version)
+ {
+ if (version.isEmpty())
+ throw new IllegalArgumentException(version);
+ switch (version.charAt(0))
+ {
+ case '2':
+ if (version.startsWith("2.2"))
+ return v22;
+ throw new IllegalArgumentException(version);
+ case '3':
+ if (version.startsWith("3.0"))
+ return v30;
+ return v3X;
+ case '4':
+ return v4;
+ default:
+ throw new IllegalArgumentException(version);
+ }
+ }
+
+ // verify that the version string is valid for this major version
+ boolean verify(String version)
+ {
+ return pattern.matcher(version).matches();
+ }
+
+ // sort two strings of the same major version as this enum instance
+ int compare(String a, String b)
+ {
+ Matcher ma = pattern.matcher(a);
+ Matcher mb = pattern.matcher(a);
+ if (!ma.matches()) throw new IllegalArgumentException(a);
+ if (!mb.matches()) throw new IllegalArgumentException(b);
+ int result = Integer.compare(Integer.parseInt(ma.group(1)), Integer.parseInt(mb.group(1)));
+ if (result == 0 && this == v3X && (ma.group(3) != null || mb.group(3) != null))
+ {
+ if (ma.group(3) != null && mb.group(3) != null)
+ {
+ result = Integer.compare(Integer.parseInt(ma.group(3)), Integer.parseInt(mb.group(3)));
+ }
+ else
+ {
+ result = ma.group(3) != null ? 1 : -1;
+ }
+ }
+ // sort descending
+ return -result;
+ }
+ }
+
+ public static class Version
+ {
+ public final Major major;
+ public final String version;
+ public final URL[] classpath;
+
+ public Version(String version, URL[] classpath)
+ {
+ this(Major.fromFull(version), version, classpath);
+ }
+ public Version(Major major, String version, URL[] classpath)
+ {
+ this.major = major;
+ this.version = version;
+ this.classpath = classpath;
+ }
+ }
+
+ private final Map<Major, List<Version>> versions;
+ public Versions(Map<Major, List<Version>> versions)
+ {
+ this.versions = versions;
+ }
+
+ public Version get(String full)
+ {
+ return versions.get(Major.fromFull(full))
+ .stream()
+ .filter(v -> full.equals(v.version))
+ .findFirst()
+ .orElseThrow(() -> new RuntimeException("No version " + full + " found"));
+ }
+
+ public Version getLatest(Major major)
+ {
+ return versions.get(major).stream().findFirst().orElseThrow(() -> new RuntimeException("No " + major + " versions found"));
+ }
+
+ public static Versions find()
+ {
+ final String dtestJarDirectory = System.getProperty(PROPERTY_PREFIX + "test.dtest_jar_path","build");
+ final File sourceDirectory = new File(dtestJarDirectory);
+ System.out.println("Looking for dtest jars in " + sourceDirectory.getAbsolutePath());
+ final Pattern pattern = Pattern.compile("dtest-(?<fullversion>(\\d+)\\.(\\d+)(\\.\\d+)?(\\.\\d+)?)([~\\-]\\w[.\\w]*(?:\\-\\w[.\\w]*)*)?(\\+[.\\w]+)?\\.jar");
+ final Map<Major, List<Version>> versions = new HashMap<>();
+ for (Major major : Major.values())
+ versions.put(major, new ArrayList<>());
+
+ for (File file : sourceDirectory.listFiles())
+ {
+ Matcher m = pattern.matcher(file.getName());
+ if (!m.matches())
+ continue;
+ String version = m.group(1);
+ Major major = Major.fromFull(version);
+ versions.get(major).add(new Version(major, version, new URL[] { toURL(file) }));
+ }
+
+ for (Map.Entry<Major, List<Version>> e : versions.entrySet())
+ {
+ if (e.getValue().isEmpty())
+ continue;
+ Collections.sort(e.getValue(), Comparator.comparing(v -> v.version, e.getKey()::compare));
+ System.out.println("Found " + e.getValue().stream().map(v -> v.version).collect(Collectors.joining(", ")));
+ }
+
+ return new Versions(versions);
+ }
+
+ public static URL toURL(File file)
+ {
+ try
+ {
+ return file.toURI().toURL();
+ }
+ catch (MalformedURLException e)
+ {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+}
diff --git a/src/test/java/org/apache/cassandra/distributed/test/BootstrapTest.java b/src/test/java/org/apache/cassandra/distributed/test/BootstrapTest.java
new file mode 100644
index 0000000..e61faa6
--- /dev/null
+++ b/src/test/java/org/apache/cassandra/distributed/test/BootstrapTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.shared.Builder;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
+
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.apache.cassandra.distributed.shared.DistributedTestBase.KEYSPACE;
+
+public class BootstrapTest extends TestBaseImpl
+{
+
+ @Test
+ public void bootstrapTest() throws Throwable
+ {
+ int originalNodeCount = 2;
+ int expandedNodeCount = originalNodeCount + 1;
+ Builder<IInstance, ICluster> builder = builder().withNodes(originalNodeCount)
+ .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
+ .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(originalNodeCount, "dc0", "rack0"))
+ .withConfig(config -> config.with(NETWORK, GOSSIP));
+
+ Map<Integer, Long> withBootstrap = null;
+ Map<Integer, Long> naturally = null;
+ try (ICluster<IInvokableInstance> cluster = builder.withNodes(originalNodeCount).start())
+ {
+ populate(cluster);
+
+ IInstanceConfig config = builder.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0"))
+ .newInstanceConfig(cluster);
+ config.set("auto_bootstrap", true);
+
+ cluster.bootstrap(config).startup();
+
+ cluster.stream().forEach(instance -> {
+ instance.nodetool("cleanup", KEYSPACE, "tbl");
+ });
+
+ withBootstrap = count(cluster);
+ }
+
+ builder = builder.withNodes(expandedNodeCount)
+ .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
+ .withConfig(config -> config.with(NETWORK, GOSSIP));
+
+ try (ICluster cluster = builder.start())
+ {
+ populate(cluster);
+ naturally = count(cluster);
+ }
+
+ Assert.assertEquals(withBootstrap, naturally);
+ }
+
+ public void populate(ICluster cluster)
+ {
+ cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + 3 + "};");
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+ for (int i = 0; i < 1000; i++)
+ cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?, ?, ?)",
+ ConsistencyLevel.QUORUM,
+ i, i, i);
+ }
+
+ public Map<Integer, Long> count(ICluster cluster)
+ {
+ return IntStream.rangeClosed(1, cluster.size())
+ .boxed()
+ .collect(Collectors.toMap(nodeId -> nodeId,
+ nodeId -> (Long) cluster.get(nodeId).executeInternal("SELECT count(*) FROM " + KEYSPACE + ".tbl")[0][0]));
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java b/src/test/java/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java
new file mode 100644
index 0000000..0ce0e74
--- /dev/null
+++ b/src/test/java/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICluster;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+
+public class DistributedReadWritePathTest extends TestBaseImpl
+{
+ @Test
+ public void coordinatorReadTest() throws Throwable
+ {
+ try (ICluster cluster = init(builder().withNodes(3).start()))
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='none'");
+
+ cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
+ cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, 2)");
+ cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, 3)");
+
+ assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
+ ConsistencyLevel.ALL,
+ 1),
+ row(1, 1, 1),
+ row(1, 2, 2),
+ row(1, 3, 3));
+ }
+ }
+
+ @Test
+ public void largeMessageTest() throws Throwable
+ {
+ int largeMessageThreshold = 1024 * 64;
+ try (ICluster cluster = init(builder().withNodes(2).start()))
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck))");
+ StringBuilder builder = new StringBuilder();
+ for (int i = 0; i < largeMessageThreshold ; i++)
+ builder.append('a');
+ String s = builder.toString();
+ cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, ?)",
+ ConsistencyLevel.ALL,
+ s);
+ assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
+ ConsistencyLevel.ALL,
+ 1),
+ row(1, 1, s));
+ }
+ }
+
+ @Test
+ public void coordinatorWriteTest() throws Throwable
+ {
+ try (ICluster cluster = init(builder().withNodes(3).start()))
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='none'");
+
+ cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)",
+ ConsistencyLevel.QUORUM);
+
+ for (int i = 0; i < 3; i++)
+ {
+ assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"),
+ row(1, 1, 1));
+ }
+
+ assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+ ConsistencyLevel.QUORUM),
+ row(1, 1, 1));
+ }
+ }
+
+ @Test
+ public void readRepairTest() throws Throwable
+ {
+ try (ICluster cluster = init(builder().withNodes(3).start()))
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'");
+
+ cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
+ cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
+
+ assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"));
+
+ assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+ ConsistencyLevel.ALL), // ensure node3 in preflist
+ row(1, 1, 1));
+
+ // Verify that data got repaired to the third node
+ assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"),
+ row(1, 1, 1));
+ }
+ }
+
+ @Test
+ public void writeWithSchemaDisagreement() throws Throwable
+ {
+ try (ICluster cluster = init(builder().withNodes(3).withConfig(config -> config.with(NETWORK)).start()))
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
+
+ cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+ cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+ cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+
+ // Introduce schema disagreement
+ cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
+
+ Exception thrown = null;
+ try
+ {
+ cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)",
+ ConsistencyLevel.QUORUM);
+ }
+ catch (RuntimeException e)
+ {
+ thrown = e;
+ }
+
+ Assert.assertTrue(thrown.getMessage().contains("INCOMPATIBLE_SCHEMA from 127.0.0.2"));
+ Assert.assertTrue(thrown.getMessage().contains("INCOMPATIBLE_SCHEMA from 127.0.0.3"));
+ }
+ }
+
+ @Test
+ public void readWithSchemaDisagreement() throws Throwable
+ {
+ try (ICluster cluster = init(builder().withNodes(3).withConfig(config -> config.with(NETWORK)).start()))
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
+
+ cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+ cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+ cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+
+ // Introduce schema disagreement
+ cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
+
+ Exception thrown = null;
+ try
+ {
+ assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+ ConsistencyLevel.ALL),
+ row(1, 1, 1, null));
+ }
+ catch (Exception e)
+ {
+ thrown = e;
+ }
+
+ Assert.assertTrue(thrown.getMessage().contains("INCOMPATIBLE_SCHEMA from 127.0.0.2"));
+ Assert.assertTrue(thrown.getMessage().contains("INCOMPATIBLE_SCHEMA from 127.0.0.3"));
+ }
+ }
+
+ @Test
+ public void simplePagedReadsTest() throws Throwable
+ {
+ try (ICluster cluster = init(builder().withNodes(3).start()))
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+ int size = 100;
+ Object[][] results = new Object[size][];
+ for (int i = 0; i < size; i++)
+ {
+ cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
+ ConsistencyLevel.QUORUM,
+ i, i);
+ results[i] = new Object[] { 1, i, i};
+ }
+
+ // Make sure paged read returns same results with different page sizes
+ for (int pageSize : new int[] { 1, 2, 3, 5, 10, 20, 50})
+ {
+ assertRows(cluster.coordinator(1).executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl",
+ ConsistencyLevel.QUORUM,
+ pageSize),
+ results);
+ }
+ }
+ }
+
+ @Test
+ public void pagingWithRepairTest() throws Throwable
+ {
+ try (ICluster cluster = init(builder().withNodes(3).start()))
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+ int size = 100;
+ Object[][] results = new Object[size][];
+ for (int i = 0; i < size; i++)
+ {
+ // Make sure that data lands on different nodes and not coordinator
+ cluster.get(i % 2 == 0 ? 2 : 3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
+ i, i);
+
+ results[i] = new Object[] { 1, i, i};
+ }
+
+ // Make sure paged read returns same results with different page sizes
+ for (int pageSize : new int[] { 1, 2, 3, 5, 10, 20, 50})
+ {
+ assertRows(cluster.coordinator(1).executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl",
+ ConsistencyLevel.ALL,
+ pageSize),
+ results);
+ }
+
+ assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl"),
+ results);
+ }
+ }
+
+ @Test
+ public void pagingTests() throws Throwable
+ {
+ try (ICluster cluster = init(builder().withNodes(3).start());
+ ICluster singleNode = init(builder().withNodes(1).withSubnet(1).start()))
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+ singleNode.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+ for (int i = 0; i < 10; i++)
+ {
+ for (int j = 0; j < 10; j++)
+ {
+ cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
+ ConsistencyLevel.QUORUM,
+ i, j, i + i);
+ singleNode.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
+ ConsistencyLevel.QUORUM,
+ i, j, i + i);
+ }
+ }
+
+ int[] pageSizes = new int[] { 1, 2, 3, 5, 10, 20, 50};
+ String[] statements = new String [] {"SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5",
+ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5",
+ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10",
+ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 LIMIT 3",
+ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5 LIMIT 2",
+ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 LIMIT 2",
+ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 ORDER BY ck DESC",
+ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5 ORDER BY ck DESC",
+ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 ORDER BY ck DESC",
+ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 ORDER BY ck DESC LIMIT 3",
+ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5 ORDER BY ck DESC LIMIT 2",
+ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 ORDER BY ck DESC LIMIT 2",
+ "SELECT DISTINCT pk FROM " + KEYSPACE + ".tbl LIMIT 3",
+ "SELECT DISTINCT pk FROM " + KEYSPACE + ".tbl WHERE pk IN (3,5,8,10)",
+ "SELECT DISTINCT pk FROM " + KEYSPACE + ".tbl WHERE pk IN (3,5,8,10) LIMIT 2"
+ };
+ for (String statement : statements)
+ {
+ for (int pageSize : pageSizes)
+ {
+ assertRows(cluster.coordinator(1)
+ .executeWithPaging(statement,
+ ConsistencyLevel.QUORUM, pageSize),
+ singleNode.coordinator(1)
+ .executeWithPaging(statement,
+ ConsistencyLevel.QUORUM, Integer.MAX_VALUE));
+ }
+ }
+ }
+ }
+}
+
diff --git a/src/test/java/org/apache/cassandra/distributed/test/GossipSettlesTest.java b/src/test/java/org/apache/cassandra/distributed/test/GossipSettlesTest.java
new file mode 100644
index 0000000..e3d3c68
--- /dev/null
+++ b/src/test/java/org/apache/cassandra/distributed/test/GossipSettlesTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.api.ICluster;
+
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+
+public class GossipSettlesTest extends TestBaseImpl
+{
+
+ @Test
+ public void testGossipSettles() throws Throwable
+ {
+ /* Use withSubnet(1) to prove seed provider is set correctly - without the fix to pass a seed provider, this test fails */
+ try (ICluster cluster = builder().withNodes(3)
+ .withConfig(config -> config.with(GOSSIP).with(NETWORK))
+ .withSubnet(1)
+ .start())
+ {
+ }
+ }
+
+}
diff --git a/src/test/java/org/apache/cassandra/distributed/test/LargeColumnTest.java b/src/test/java/org/apache/cassandra/distributed/test/LargeColumnTest.java
new file mode 100644
index 0000000..3c3ee47
--- /dev/null
+++ b/src/test/java/org/apache/cassandra/distributed/test/LargeColumnTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICluster;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+public class LargeColumnTest extends TestBaseImpl
+{
+ private static final Logger logger = LoggerFactory.getLogger(LargeColumnTest.class);
+
+ private static String str(int length, Random random, long seed)
+ {
+ random.setSeed(seed);
+ char[] chars = new char[length];
+ int i = 0;
+ int s = 0;
+ long v = 0;
+ while (i < length)
+ {
+ if (s == 0)
+ {
+ v = random.nextLong();
+ s = 8;
+ }
+ chars[i] = (char) (((v & 127) + 32) & 127);
+ v >>= 8;
+ --s;
+ ++i;
+ }
+ return new String(chars);
+ }
+
+ private void testLargeColumns(int nodes, int columnSize, int rowCount) throws Throwable
+ {
+ Random random = new Random();
+ long seed = ThreadLocalRandom.current().nextLong();
+ logger.info("Using seed {}", seed);
+
+ try (ICluster cluster = init(builder()
+ .withNodes(nodes)
+ .withConfig(config ->
+ config.set("commitlog_segment_size_in_mb", (columnSize * 3) >> 20)
+ .set("internode_application_send_queue_reserve_endpoint_capacity_in_bytes", columnSize * 2)
+ .set("internode_application_send_queue_reserve_global_capacity_in_bytes", columnSize * 3)
+ .set("write_request_timeout_in_ms", SECONDS.toMillis(30L))
+ .set("read_request_timeout_in_ms", SECONDS.toMillis(30L))
+ .set("memtable_heap_space_in_mb", 1024)
+ )
+ .start()))
+ {
+ cluster.schemaChange(String.format("CREATE TABLE %s.cf (k int, c text, PRIMARY KEY (k))", KEYSPACE));
+
+ for (int i = 0; i < rowCount; ++i)
+ cluster.coordinator(1).execute(String.format("INSERT INTO %s.cf (k, c) VALUES (?, ?);", KEYSPACE), ConsistencyLevel.ALL, i, str(columnSize, random, seed | i));
+
+ for (int i = 0; i < rowCount; ++i)
+ {
+ Object[][] results = cluster.coordinator(1).execute(String.format("SELECT k, c FROM %s.cf WHERE k = ?;", KEYSPACE), ConsistencyLevel.ALL, i);
+ Assert.assertTrue(str(columnSize, random, seed | i).equals(results[0][1]));
+ }
+ }
+ }
+
+ @Test
+ public void test() throws Throwable
+ {
+ testLargeColumns(2, 16 << 20, 5);
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/cassandra/distributed/test/NativeProtocolTest.java b/src/test/java/org/apache/cassandra/distributed/test/NativeProtocolTest.java
new file mode 100644
index 0000000..c7e9b26
--- /dev/null
+++ b/src/test/java/org/apache/cassandra/distributed/test/NativeProtocolTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import org.apache.cassandra.distributed.impl.RowUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.Statement;
+import org.apache.cassandra.distributed.api.ICluster;
+
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+
+public class NativeProtocolTest extends TestBaseImpl
+{
+
+ @Test
+ public void withClientRequests() throws Throwable
+ {
+ try (ICluster ignored = init(builder().withNodes(3)
+ .withConfig(config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL))
+ .start()))
+ {
+
+ try (com.datastax.driver.core.Cluster cluster = com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1").build();
+ Session session = cluster.connect())
+ {
+ session.execute("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck));");
+ session.execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) values (1,1,1);");
+ Statement select = new SimpleStatement("select * from " + KEYSPACE + ".tbl;").setConsistencyLevel(ConsistencyLevel.ALL);
+ final ResultSet resultSet = session.execute(select);
+ assertRows(RowUtil.toObjects(resultSet), row(1, 1, 1));
+ Assert.assertEquals(3, cluster.getMetadata().getAllHosts().size());
+ }
+ }
+ }
+
+ @Test
+ public void withCounters() throws Throwable
+ {
+ try (ICluster ignored = init(builder().withNodes(3)
+ .withConfig(config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL))
+ .start()))
+ {
+ final com.datastax.driver.core.Cluster cluster = com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1").build();
+ Session session = cluster.connect();
+ session.execute("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck counter, PRIMARY KEY (pk));");
+ session.execute("UPDATE " + KEYSPACE + ".tbl set ck = ck + 10 where pk = 1;");
+ Statement select = new SimpleStatement("select * from " + KEYSPACE + ".tbl;").setConsistencyLevel(ConsistencyLevel.ALL);
+ final ResultSet resultSet = session.execute(select);
+ assertRows(RowUtil.toObjects(resultSet), row(1, 10L));
+ Assert.assertEquals(3, cluster.getMetadata().getAllHosts().size());
+ session.close();
+ cluster.close();
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/cassandra/distributed/test/NetworkTopologyTest.java b/src/test/java/org/apache/cassandra/distributed/test/NetworkTopologyTest.java
new file mode 100644
index 0000000..53154e3
--- /dev/null
+++ b/src/test/java/org/apache/cassandra/distributed/test/NetworkTopologyTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
+
+public class NetworkTopologyTest extends TestBaseImpl
+{
+ @Test
+ public void namedDcTest() throws Throwable
+ {
+ try (ICluster<IInvokableInstance> cluster = builder()
+ .withNodeIdTopology(Collections.singletonMap(1, NetworkTopology.dcAndRack("somewhere", "rack0")))
+ .withRack("elsewhere", "firstrack", 1)
+ .withRack("elsewhere", "secondrack", 2)
+ .withDC("nearthere", 4)
+ .start())
+ {
+ Assert.assertEquals(1, cluster.stream("somewhere").count());
+ Assert.assertEquals(1, cluster.stream("elsewhere", "firstrack").count());
+ Assert.assertEquals(2, cluster.stream("elsewhere", "secondrack").count());
+ Assert.assertEquals(3, cluster.stream("elsewhere").count());
+ Assert.assertEquals(4, cluster.stream("nearthere").count());
+
+ Set<IInstance> expect = cluster.stream().collect(Collectors.toSet());
+ Set<IInstance> result = Stream.concat(Stream.concat(cluster.stream("somewhere"),
+ cluster.stream("elsewhere")),
+ cluster.stream("nearthere")).collect(Collectors.toSet());
+ Assert.assertEquals(expect, result);
+ }
+ }
+
+ @Test
+ public void automaticNamedDcTest() throws Throwable
+
+ {
+ try (ICluster cluster = builder()
+ .withRacks(2, 1, 3)
+ .start())
+ {
+ Assert.assertEquals(6, cluster.stream().count());
+ Assert.assertEquals(3, cluster.stream("datacenter1").count());
+ Assert.assertEquals(3, cluster.stream("datacenter2", "rack1").count());
+ }
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void noCountsAfterNamingDCsTest()
+ {
+ builder().withDC("nameddc", 1)
+ .withDCs(1);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void mustProvideNodeCountBeforeWithDCsTest()
+ {
+ builder().withDCs(1);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void noEmptyNodeIdTopologyTest()
+ {
+ builder().withNodeIdTopology(Collections.emptyMap());
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void noHolesInNodeIdTopologyTest()
+ {
+ builder().withNodeIdTopology(Collections.singletonMap(2, NetworkTopology.dcAndRack("doomed", "rack")));
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java b/src/test/java/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
new file mode 100644
index 0000000..70790bc
--- /dev/null
+++ b/src/test/java/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICluster;
+
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+
+public class SimpleReadWriteTest extends TestBaseImpl
+{
+ @Test
+ public void coordinatorReadTest() throws Throwable
+ {
+ try (ICluster cluster = init(builder().withNodes(3).start()))
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='none'");
+
+ cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
+ cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, 2)");
+ cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, 3)");
+
+ assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
+ ConsistencyLevel.ALL,
+ 1),
+ row(1, 1, 1),
+ row(1, 2, 2),
+ row(1, 3, 3));
+ }
+ }
+
+ @Test
+ public void coordinatorWriteTest() throws Throwable
+ {
+ try (ICluster cluster = init(builder().withNodes(3).start()))
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='none'");
+
+ cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)",
+ ConsistencyLevel.QUORUM);
+
+ for (int i = 0; i < 3; i++)
+ {
+ assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"),
+ row(1, 1, 1));
+ }
+
+ assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+ ConsistencyLevel.QUORUM),
+ row(1, 1, 1));
+ }
+ }
+
+ @Test
+ public void readRepairTest() throws Throwable
+ {
+ try (ICluster cluster = init(builder().withNodes(3).start()))
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'");
+
+ cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
+ cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
+
+ assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"));
+
+ assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+ ConsistencyLevel.ALL), // ensure node3 in preflist
+ row(1, 1, 1));
+
+ // Verify that data got repaired to the third node
+ assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"),
+ row(1, 1, 1));
+ }
+ }
+
+ @Test
+ public void writeWithSchemaDisagreement() throws Throwable
+ {
+ try (ICluster cluster = init(builder().withNodes(3).withConfig(config -> config.with(NETWORK)).start()))
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
+
+ cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+ cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+ cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+
+ // Introduce schema disagreement
+ cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
+
+ Exception thrown = null;
+ try
+ {
+ cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)",
+ ConsistencyLevel.QUORUM);
+ }
+ catch (RuntimeException e)
+ {
+ thrown = e;
+ }
+
+ Assert.assertTrue(thrown.getMessage().contains("INCOMPATIBLE_SCHEMA from 127.0.0.2"));
+ Assert.assertTrue(thrown.getMessage().contains("INCOMPATIBLE_SCHEMA from 127.0.0.3"));
+ }
+ }
+
+ @Test
+ public void readWithSchemaDisagreement() throws Throwable
+ {
+ try (ICluster cluster = init(builder().withNodes(3).withConfig(config -> config.with(NETWORK)).start()))
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
+
+ cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+ cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+ cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+
+ // Introduce schema disagreement
+ cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
+
+ Exception thrown = null;
+ try
+ {
+ assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+ ConsistencyLevel.ALL),
+ row(1, 1, 1, null));
+ }
+ catch (Exception e)
+ {
+ thrown = e;
+ }
+
+ Assert.assertTrue(thrown.getMessage(), thrown.getMessage().contains("INCOMPATIBLE_SCHEMA from 127.0.0.2"));
+ Assert.assertTrue(thrown.getMessage(), thrown.getMessage().contains("INCOMPATIBLE_SCHEMA from 127.0.0.3"));
+ }
+ }
+
+ @Test
+ public void simplePagedReadsTest() throws Throwable
+ {
+ try (ICluster cluster = init(builder().withNodes(3).start()))
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+ int size = 100;
+ Object[][] results = new Object[size][];
+ for (int i = 0; i < size; i++)
+ {
+ cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
+ ConsistencyLevel.QUORUM,
+ i, i);
+ results[i] = new Object[] { 1, i, i};
+ }
+
+ // Make sure paged read returns same results with different page sizes
+ for (int pageSize : new int[] { 1, 2, 3, 5, 10, 20, 50})
+ {
+ assertRows(cluster.coordinator(1).executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl",
+ ConsistencyLevel.QUORUM,
+ pageSize),
+ results);
+ }
+ }
+ }
+
+ @Test
+ public void pagingWithRepairTest() throws Throwable
+ {
+ try (ICluster cluster = init(builder().withNodes(3).start()))
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+ int size = 100;
+ Object[][] results = new Object[size][];
+ for (int i = 0; i < size; i++)
+ {
+ // Make sure that data lands on different nodes and not coordinator
+ cluster.get(i % 2 == 0 ? 2 : 3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
+ i, i);
+
+ results[i] = new Object[] { 1, i, i};
+ }
+
+ // Make sure paged read returns same results with different page sizes
+ for (int pageSize : new int[] { 1, 2, 3, 5, 10, 20, 50})
+ {
+ assertRows(cluster.coordinator(1).executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl",
+ ConsistencyLevel.ALL,
+ pageSize),
+ results);
+ }
+
+ assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl"),
+ results);
+ }
+ }
+
+ @Test
+ public void pagingTests() throws Throwable
+ {
+ try (ICluster cluster = init(builder().withNodes(3).start());
+ ICluster singleNode = init(builder().withNodes(1).withSubnet(1).start()))
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+ singleNode.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+ for (int i = 0; i < 10; i++)
+ {
+ for (int j = 0; j < 10; j++)
+ {
+ cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
+ ConsistencyLevel.QUORUM,
+ i, j, i + i);
+ singleNode.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, ?)",
+ ConsistencyLevel.QUORUM,
+ i, j, i + i);
+ }
+ }
+
+ int[] pageSizes = new int[] { 1, 2, 3, 5, 10, 20, 50};
+ String[] statements = new String [] {"SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5",
+ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5",
+ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10",
+ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 LIMIT 3",
+ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5 LIMIT 2",
+ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 LIMIT 2",
+ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 ORDER BY ck DESC",
+ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5 ORDER BY ck DESC",
+ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 ORDER BY ck DESC",
+ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 ORDER BY ck DESC LIMIT 3",
+ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck >= 5 ORDER BY ck DESC LIMIT 2",
+ "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1 AND ck > 5 AND ck <= 10 ORDER BY ck DESC LIMIT 2",
+ "SELECT DISTINCT pk FROM " + KEYSPACE + ".tbl LIMIT 3",
+ "SELECT DISTINCT pk FROM " + KEYSPACE + ".tbl WHERE pk IN (3,5,8,10)",
+ "SELECT DISTINCT pk FROM " + KEYSPACE + ".tbl WHERE pk IN (3,5,8,10) LIMIT 2"
+ };
+ for (String statement : statements)
+ {
+ for (int pageSize : pageSizes)
+ {
+ assertRows(cluster.coordinator(1)
+ .executeWithPaging(statement,
+ ConsistencyLevel.QUORUM, pageSize),
+ singleNode.coordinator(1)
+ .executeWithPaging(statement,
+ ConsistencyLevel.QUORUM, Integer.MAX_VALUE));
+ }
+ }
+
+ }
+ }
+}
diff --git a/test/conf/logback-dtest.xml b/test/conf/logback-dtest.xml
new file mode 100644
index 0000000..370e1e5
--- /dev/null
+++ b/test/conf/logback-dtest.xml
@@ -0,0 +1,77 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<configuration debug="false" scan="true" scanPeriod="60 seconds">
+ <define name="instance_id" class="org.apache.cassandra.distributed.impl.InstanceIDDefiner" />
+
+ <!-- Shutdown hook ensures that async appender flushes -->
+ <shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/>
+
+ <appender name="INSTANCEFILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
+
+ <file>./build/test/logs/${cassandra.testtag}/TEST-${suitename}.log</file>
+ <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+ <fileNamePattern>./build/test/logs/${cassandra.testtag}/TEST-${suitename}.log.%i.gz</fileNamePattern>
+ <minIndex>1</minIndex>
+ <maxIndex>20</maxIndex>
+ </rollingPolicy>
+
+ <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+ <maxFileSize>20MB</maxFileSize>
+ </triggeringPolicy>
+
+ <encoder>
+ <pattern>%-5level [%thread] ${instance_id} %date{ISO8601} %msg%n</pattern>
+ </encoder>
+ <immediateFlush>false</immediateFlush>
+ </appender>
+
+ <appender name="INSTANCEASYNCFILE" class="ch.qos.logback.classic.AsyncAppender">
+ <discardingThreshold>0</discardingThreshold>
+ <maxFlushTime>0</maxFlushTime>
+ <queueSize>1024</queueSize>
+ <appender-ref ref="INSTANCEFILE"/>
+ </appender>
+
+ <appender name="INSTANCESTDERR" target="System.err" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%-5level %date{HH:mm:ss,SSS} %msg%n</pattern>
+ </encoder>
+ <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+ <level>WARN</level>
+ </filter>
+ </appender>
+
+ <appender name="INSTANCESTDOUT" target="System.out" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%-5level [%thread] ${instance_id} %date{ISO8601} %F:%L - %msg%n</pattern>
+ </encoder>
+ <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+ <level>DEBUG</level>
+ </filter>
+ </appender>
+
+ <logger name="org.apache.hadoop" level="WARN"/>
+
+ <root level="DEBUG">
+ <appender-ref ref="INSTANCEASYNCFILE" />
+ <appender-ref ref="INSTANCESTDERR" />
+ <appender-ref ref="INSTANCESTDOUT" />
+ </root>
+</configuration>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org