You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2020/03/21 10:00:10 UTC
[cassandra-in-jvm-dtest-api] branch master updated: Introduce the
extracted in-JVM DTest API
This is an automated email from the ASF dual-hosted git repository.
ifesdjeen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cassandra-in-jvm-dtest-api.git
The following commit(s) were added to refs/heads/master by this push:
new a562fd5 Introduce the extracted in-JVM DTest API
a562fd5 is described below
commit a562fd56b302e0573b2af9371aa948689714dcbc
Author: Alex Petrov <ol...@gmail.com>
AuthorDate: Mon Feb 24 12:06:09 2020 +0100
Introduce the extracted in-JVM DTest API
Patch by Alex Petrov; reviewed by David Capwell for CASSANDRA-15539.
---
pom.xml | 99 +++++++++
.../distributed/api/ConsistencyLevel.java | 34 +++
.../apache/cassandra/distributed/api/Feature.java | 24 +++
.../apache/cassandra/distributed/api/ICluster.java | 98 +++++++++
.../cassandra/distributed/api/ICoordinator.java | 38 ++++
.../cassandra/distributed/api/IInstance.java | 72 +++++++
.../cassandra/distributed/api/IInstanceConfig.java | 102 +++++++++
.../distributed/api/IInvokableInstance.java | 67 ++++++
.../distributed/api/IIsolatedExecutor.java | 128 +++++++++++
.../apache/cassandra/distributed/api/IListen.java | 28 +++
.../apache/cassandra/distributed/api/IMessage.java | 37 ++++
.../cassandra/distributed/api/IMessageFilters.java | 100 +++++++++
.../distributed/api/IUpgradeableInstance.java | 29 +++
.../cassandra/distributed/api/LongTokenRange.java | 38 ++++
.../cassandra/distributed/api/NodeToolResult.java | 182 ++++++++++++++++
.../cassandra/distributed/api/QueryResult.java | 139 ++++++++++++
.../org/apache/cassandra/distributed/api/Row.java | 110 ++++++++++
.../cassandra/distributed/api/TokenSupplier.java | 32 +++
.../cassandra/distributed/shared/AssertUtils.java | 130 ++++++++++++
.../cassandra/distributed/shared/Builder.java | 233 +++++++++++++++++++++
.../distributed/shared/DistributedTestBase.java | 57 +++++
.../distributed/shared/InstanceClassLoader.java | 116 ++++++++++
.../distributed/shared/MessageFilters.java | 194 +++++++++++++++++
.../distributed/shared/NetworkTopology.java | 169 +++++++++++++++
.../distributed/shared/ThrowingRunnable.java | 38 ++++
.../cassandra/distributed/shared/Versions.java | 206 ++++++++++++++++++
test/conf/logback-dtest.xml | 77 +++++++
27 files changed, 2577 insertions(+)
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..61fe4b8
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,99 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache</groupId>
+ <artifactId>apache</artifactId>
+ <version>23</version>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.cassandra</groupId>
+ <artifactId>dtest-api</artifactId>
+ <version>0.0.2-SNAPSHOT</version>
+ <name>In JVM Test API</name>
+ <description>In JVM Test API</description>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <licenses>
+ <license>
+ <name>Apache License, Version 2.0</name>
+ <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ <distribution>repo</distribution>
+ </license>
+ </licenses>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.25</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.12</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cassandra</groupId>
+ <artifactId>in-jvm-dtest-cassandra-tryout</artifactId>
+ <version>0.0.1-2.2-1</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.8.0</version>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ </configuration>
+ </plugin>
+
+
+ <plugin>
+ <artifactId>maven-deploy-plugin</artifactId>
+ <version>2.8.2</version>
+ <executions>
+ <execution>
+ <id>default-deploy</id>
+ <phase>deploy</phase>
+ <goals>
+ <goal>deploy</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-gpg-plugin</artifactId>
+ <version>1.5</version>
+ <executions>
+ <execution>
+ <id>sign-artifacts</id>
+ <phase>verify</phase>
+ <goals>
+ <goal>sign</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ </plugins>
+ </build>
+
+ <scm>
+ <connection>scm:git:https://gitbox.apache.org/repos/asf/cassandra-in-jvm-dtest-api.git</connection>
+ <developerConnection>scm:git:https://gitbox.apache.org/repos/asf/cassandra-in-jvm-dtest-api.git</developerConnection>
+ <url>https://gitbox.apache.org/repos/asf/cassandra-in-jvm-dtest-api.git</url>
+ <tag>0.0.4</tag>
+ </scm>
+</project>
+
diff --git a/src/main/java/org/apache/cassandra/distributed/api/ConsistencyLevel.java b/src/main/java/org/apache/cassandra/distributed/api/ConsistencyLevel.java
new file mode 100644
index 0000000..3c057f8
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/ConsistencyLevel.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+public enum ConsistencyLevel {
+ ANY,
+ ONE,
+ TWO,
+ THREE,
+ QUORUM,
+ ALL,
+ LOCAL_QUORUM,
+ EACH_QUORUM,
+ SERIAL,
+ LOCAL_SERIAL,
+ LOCAL_ONE,
+ NODE_LOCAL
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/Feature.java b/src/main/java/org/apache/cassandra/distributed/api/Feature.java
new file mode 100644
index 0000000..b4ba036
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/Feature.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+public enum Feature
+{
+ NETWORK, GOSSIP, NATIVE_PROTOCOL
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/ICluster.java b/src/main/java/org/apache/cassandra/distributed/api/ICluster.java
new file mode 100644
index 0000000..dffd980
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/ICluster.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.stream.Stream;
+
+public interface ICluster<I extends IInstance> extends AutoCloseable {
+ void startup();
+
+ I bootstrap(IInstanceConfig config);
+
+ I get(int i);
+
+ I get(InetSocketAddress endpoint);
+
+ ICoordinator coordinator(int node);
+
+ void schemaChange(String query);
+
+ void schemaChange(String statement, int instance);
+
+ int size();
+
+ Stream<I> stream();
+
+ Stream<I> stream(String dcName);
+
+ Stream<I> stream(String dcName, String rackName);
+
+ IMessageFilters filters();
+
+ static void setup() throws Throwable {
+ setupLogging();
+ setSystemProperties();
+ nativeLibraryWorkaround();
+ processReaperWorkaround();
+ }
+
+ static void nativeLibraryWorkaround() {
+ // Disable the Netty tcnative library otherwise the io.netty.internal.tcnative.CertificateCallbackTask,
+ // CertificateVerifierTask, SSLPrivateKeyMethodDecryptTask, SSLPrivateKeyMethodSignTask,
+ // SSLPrivateKeyMethodTask, and SSLTask hold a gcroot against the InstanceClassLoader.
+ System.setProperty("cassandra.disable_tcactive_openssl", "true");
+ System.setProperty("io.netty.transport.noNative", "true");
+ }
+
+ static void processReaperWorkaround() throws Throwable {
+ // Make sure the 'process reaper' thread is initially created under the main classloader,
+ // otherwise it gets created with the contextClassLoader pointing to an InstanceClassLoader
+ // which prevents it from being garbage collected.
+ new ProcessBuilder().command("true").start().waitFor();
+ }
+
+ static void setSystemProperties() {
+ System.setProperty("cassandra.ring_delay_ms", Integer.toString(30 * 1000));
+ System.setProperty("org.apache.cassandra.disable_mbean_registration", "true");
+ }
+
+ static void setupLogging() {
+ try {
+ File root = Files.createTempDirectory("in-jvm-dtest").toFile();
+ root.deleteOnExit();
+ String testConfPath = "test/conf/logback-dtest.xml";
+ Path logConfPath = Paths.get(root.getPath(), "/logback-dtest.xml");
+
+ if (!logConfPath.toFile().exists()) {
+ Files.copy(new File(testConfPath).toPath(),
+ logConfPath);
+ }
+
+ System.setProperty("logback.configurationFile", "file://" + logConfPath);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/cassandra/distributed/api/ICoordinator.java b/src/main/java/org/apache/cassandra/distributed/api/ICoordinator.java
new file mode 100644
index 0000000..3d07a3d
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/ICoordinator.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+import java.util.Iterator;
+import java.util.UUID;
+import java.util.concurrent.Future;
+
+// The cross-version API requires that a Coordinator can be constructed without any constructor arguments
+public interface ICoordinator
+{
+ default Object[][] execute(String query, ConsistencyLevel consistencyLevel, Object... boundValues)
+ {
+ return executeWithResult(query, consistencyLevel, boundValues).toObjectArrays();
+ }
+ QueryResult executeWithResult(String query, ConsistencyLevel consistencyLevel, Object... boundValues);
+ Iterator<Object[]> executeWithPaging(String query, ConsistencyLevel consistencyLevel, int pageSize, Object... boundValues);
+
+ Future<Object[][]> asyncExecuteWithTracing(UUID sessionId, String query, ConsistencyLevel consistencyLevel, Object... boundValues);
+ Object[][] executeWithTracing(UUID sessionId, String query, ConsistencyLevel consistencyLevel, Object... boundValues);
+ IInstance instance();
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IInstance.java b/src/main/java/org/apache/cassandra/distributed/api/IInstance.java
new file mode 100644
index 0000000..0dd4865
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/IInstance.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+import java.net.InetSocketAddress;
+import java.util.UUID;
+import java.util.concurrent.Future;
+
+// The cross-version API requires that an Instance has a constructor signature of (IInstanceConfig, ClassLoader)
+public interface IInstance extends IIsolatedExecutor
+{
+ ICoordinator coordinator();
+ IListen listen();
+
+ void schemaChangeInternal(String query);
+ public Object[][] executeInternal(String query, Object... args);
+
+ IInstanceConfig config();
+ InetSocketAddress broadcastAddress();
+ UUID schemaVersion();
+
+ void startup();
+ boolean isShutdown();
+ Future<Void> shutdown();
+ Future<Void> shutdown(boolean graceful);
+
+ int liveMemberCount();
+
+ NodeToolResult nodetoolResult(boolean withNotifications, String... commandAndArgs);
+ default NodeToolResult nodetoolResult(String... commandAndArgs)
+ {
+ return nodetoolResult(true, commandAndArgs);
+ }
+ default int nodetool(String... commandAndArgs) {
+ return nodetoolResult(commandAndArgs).getRc();
+ }
+ void uncaughtException(Thread t, Throwable e);
+
+ /**
+ * Return the number of times the instance tried to call {@link System#exit(int)}.
+ *
+ * When the instance is shutdown, this state should be saved, but in case not possible should return {@code -1}
+ * to indicate "unknown".
+ */
+ long killAttempts();
+
+ // these methods are not for external use, but for simplicity we leave them public and on the normal IInstance interface
+ void startup(ICluster cluster);
+ void receiveMessage(IMessage message);
+
+ int getMessagingVersion();
+ void setMessagingVersion(InetSocketAddress addressAndPort, int version);
+
+ void flush(String keyspace);
+ void forceCompact(String keyspace, String table);
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IInstanceConfig.java b/src/main/java/org/apache/cassandra/distributed/api/IInstanceConfig.java
new file mode 100644
index 0000000..97c1534
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/IInstanceConfig.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.function.Function;
+
+import org.apache.cassandra.distributed.shared.NetworkTopology;
+import org.apache.cassandra.distributed.shared.Versions;
+
+public interface IInstanceConfig
+{
+ IInstanceConfig with(Feature featureFlag);
+ IInstanceConfig with(Feature... flags);
+
+ int num();
+ UUID hostId();
+ InetSocketAddress broadcastAddress();
+ NetworkTopology networkTopology();
+
+ String localRack();
+ String localDatacenter();
+
+ /**
+ * write the specified parameters to the Config object; we do not specify Config as the type to support a Config
+ * from any ClassLoader; the implementation must not directly access any fields of the Object, or cast it, but
+ * must use the reflection API to modify the state
+ */
+ void propagate(Object writeToConfig, Map<Class<?>, Function<Object, Object>>executor);
+
+ /**
+ * Validates whether the config properties are within range of accepted values.
+ */
+ void validate();
+ IInstanceConfig set(String fieldName, Object value);
+ Object get(String fieldName);
+ String getString(String fieldName);
+ int getInt(String fieldName);
+ boolean has(Feature featureFlag);
+
+ public IInstanceConfig forVersion(Versions.Major major);
+
+ public static class ParameterizedClass
+ {
+ public static final String CLASS_NAME = "class_name";
+ public static final String PARAMETERS = "parameters";
+
+ public String class_name;
+ public Map<String, String> parameters;
+
+ public ParameterizedClass(String class_name, Map<String, String> parameters)
+ {
+ this.class_name = class_name;
+ this.parameters = parameters;
+ }
+
+ @SuppressWarnings("unchecked")
+ public ParameterizedClass(Map<String, ?> p)
+ {
+ this((String)p.get(CLASS_NAME),
+ p.containsKey(PARAMETERS) ? (Map<String, String>)((List<?>)p.get(PARAMETERS)).get(0) : null);
+ }
+
+ @Override
+ public boolean equals(Object that)
+ {
+ return that instanceof ParameterizedClass && equals((ParameterizedClass) that);
+ }
+
+ public boolean equals(ParameterizedClass that)
+ {
+ return Objects.equals(class_name, that.class_name) && Objects.equals(parameters, that.parameters);
+ }
+
+ @Override
+ public String toString()
+ {
+ return class_name + (parameters == null ? "" : parameters.toString());
+ }
+ }
+
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IInvokableInstance.java b/src/main/java/org/apache/cassandra/distributed/api/IInvokableInstance.java
new file mode 100644
index 0000000..1dbc4cc
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/IInvokableInstance.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+import java.io.Serializable;
+import java.util.concurrent.Future;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import org.apache.cassandra.distributed.api.IInstance;
+
+/**
+ * This version is only supported for a Cluster running the same code as the test environment, and permits
+ * ergonomic cross-node behaviours, without editing the cross-version API.
+ *
+ * A lambda can be written tto be invoked on any or all of the nodes.
+ *
+ * The reason this cannot (easily) be made cross-version is that the lambda is tied to the declaring class, which will
+ * not be the same in the alternate version. Even were it not, there would likely be a runtime linkage error given
+ * any code divergence.
+ */
+public interface IInvokableInstance extends IInstance
+{
+ public default <O> CallableNoExcept<Future<O>> asyncCallsOnInstance(SerializableCallable<O> call) { return async(transfer(call)); }
+ public default <O> CallableNoExcept<O> callsOnInstance(SerializableCallable<O> call) { return sync(transfer(call)); }
+ public default <O> O callOnInstance(SerializableCallable<O> call) { return callsOnInstance(call).call(); }
+
+ public default CallableNoExcept<Future<?>> asyncRunsOnInstance(SerializableRunnable run) { return async(transfer(run)); }
+ public default Runnable runsOnInstance(SerializableRunnable run) { return sync(transfer(run)); }
+ public default void runOnInstance(SerializableRunnable run) { runsOnInstance(run).run(); }
+
+ public default <I> Function<I, Future<?>> asyncAcceptsOnInstance(SerializableConsumer<I> consumer) { return async(transfer(consumer)); }
+ public default <I> Consumer<I> acceptsOnInstance(SerializableConsumer<I> consumer) { return sync(transfer(consumer)); }
+
+ public default <I1, I2> BiFunction<I1, I2, Future<?>> asyncAcceptsOnInstance(SerializableBiConsumer<I1, I2> consumer) { return async(transfer(consumer)); }
+ public default <I1, I2> BiConsumer<I1, I2> acceptsOnInstance(SerializableBiConsumer<I1, I2> consumer) { return sync(transfer(consumer)); }
+
+ public default <I, O> Function<I, Future<O>> asyncAppliesOnInstance(SerializableFunction<I, O> f) { return async(transfer(f)); }
+ public default <I, O> Function<I, O> appliesOnInstance(SerializableFunction<I, O> f) { return sync(transfer(f)); }
+
+ public default <I1, I2, O> BiFunction<I1, I2, Future<O>> asyncAppliesOnInstance(SerializableBiFunction<I1, I2, O> f) { return async(transfer(f)); }
+ public default <I1, I2, O> BiFunction<I1, I2, O> appliesOnInstance(SerializableBiFunction<I1, I2, O> f) { return sync(transfer(f)); }
+
+ public default <I1, I2, I3, O> TriFunction<I1, I2, I3, Future<O>> asyncAppliesOnInstance(SerializableTriFunction<I1, I2, I3, O> f) { return async(transfer(f)); }
+ public default <I1, I2, I3, O> TriFunction<I1, I2, I3, O> appliesOnInstance(SerializableTriFunction<I1, I2, I3, O> f) { return sync(transfer(f)); }
+
+ public <E extends Serializable> E transfer(E object);
+
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IIsolatedExecutor.java b/src/main/java/org/apache/cassandra/distributed/api/IIsolatedExecutor.java
new file mode 100644
index 0000000..d811b17
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/IIsolatedExecutor.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+import java.io.Serializable;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * Represents a clean way to handoff evaluation of some work to an executor associated
+ * with a node's lifetime.
+ *
+ * There is no transfer of execution to the parallel class hierarchy.
+ *
+ * Classes, such as Instance, that are themselves instantiated on the correct ClassLoader, utilise this class
+ * to ensure the lifetime of any thread evaluating one of its method invocations matches the lifetime of the class itself.
+ * Since they are instantiated on the correct ClassLoader, sharing only the interface, there is no serialization necessary.
+ */
+public interface IIsolatedExecutor
+{
+ public interface CallableNoExcept<O> extends Callable<O> { public O call(); }
+ public interface SerializableCallable<O> extends CallableNoExcept<O>, Serializable { }
+ public interface SerializableRunnable extends Runnable, Serializable {}
+ public interface SerializableConsumer<O> extends Consumer<O>, Serializable {}
+ public interface SerializableSupplier<O> extends Supplier<O>, Serializable {}
+ public interface SerializableBiConsumer<I1, I2> extends BiConsumer<I1, I2>, Serializable {}
+ public interface SerializableFunction<I, O> extends Function<I, O>, Serializable {}
+ public interface SerializableBiFunction<I1, I2, O> extends BiFunction<I1, I2, O>, Serializable {}
+ public interface TriFunction<I1, I2, I3, O>
+ {
+ O apply(I1 i1, I2 i2, I3 i3);
+ }
+ public interface SerializableTriFunction<I1, I2, I3, O> extends Serializable, TriFunction<I1, I2, I3, O> { }
+
+ Future<Void> shutdown();
+
+ /**
+ * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result
+ */
+ <O> CallableNoExcept<Future<O>> async(CallableNoExcept<O> call);
+
+ /**
+ * Convert the execution to one performed synchronously on the IsolatedExecutor
+ */
+ <O> CallableNoExcept<O> sync(CallableNoExcept<O> call);
+
+ /**
+ * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result
+ */
+ CallableNoExcept<Future<?>> async(Runnable run);
+
+ /**
+ * Convert the execution to one performed synchronously on the IsolatedExecutor
+ */
+ Runnable sync(Runnable run);
+
+ /**
+ * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result
+ */
+ <I> Function<I, Future<?>> async(Consumer<I> consumer);
+
+ /**
+ * Convert the execution to one performed synchronously on the IsolatedExecutor
+ */
+ <I> Consumer<I> sync(Consumer<I> consumer);
+
+ /**
+ * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result
+ */
+ <I1, I2> BiFunction<I1, I2, Future<?>> async(BiConsumer<I1, I2> consumer);
+
+ /**
+ * Convert the execution to one performed synchronously on the IsolatedExecutor
+ */
+ <I1, I2> BiConsumer<I1, I2> sync(BiConsumer<I1, I2> consumer);
+
+ /**
+ * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result
+ */
+ <I, O> Function<I, Future<O>> async(Function<I, O> f);
+
+ /**
+ * Convert the execution to one performed synchronously on the IsolatedExecutor
+ */
+ <I, O> Function<I, O> sync(Function<I, O> f);
+
+ /**
+ * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result
+ */
+ <I1, I2, O> BiFunction<I1, I2, Future<O>> async(BiFunction<I1, I2, O> f);
+
+ /**
+ * Convert the execution to one performed synchronously on the IsolatedExecutor
+ */
+ <I1, I2, O> BiFunction<I1, I2, O> sync(BiFunction<I1, I2, O> f);
+
+ /**
+ * Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result
+ */
+ <I1, I2, I3, O> TriFunction<I1, I2, I3, Future<O>> async(TriFunction<I1, I2, I3, O> f);
+
+ /**
+ * Convert the execution to one performed synchronously on the IsolatedExecutor
+ */
+ <I1, I2, I3, O> TriFunction<I1, I2, I3, O> sync(TriFunction<I1, I2, I3, O> f);
+
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IListen.java b/src/main/java/org/apache/cassandra/distributed/api/IListen.java
new file mode 100644
index 0000000..d21c594
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/IListen.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+public interface IListen
+{
+ interface Cancel { void cancel(); }
+
+ Cancel schema(Runnable onChange);
+
+ Cancel liveMembers(Runnable onChange);
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IMessage.java b/src/main/java/org/apache/cassandra/distributed/api/IMessage.java
new file mode 100644
index 0000000..f75861e
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/IMessage.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+
+/**
+ * A cross-version interface for delivering internode messages via message sinks.
+ *
+ * Message implementations should be serializable so we could load into instances.
+ */
+public interface IMessage extends Serializable
+{
+ int verb();
+ byte[] bytes();
+ // TODO: need to make this a long
+ int id();
+ int version();
+ InetSocketAddress from();
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IMessageFilters.java b/src/main/java/org/apache/cassandra/distributed/api/IMessageFilters.java
new file mode 100644
index 0000000..f2cd6ee
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/IMessageFilters.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+import java.util.function.Predicate;
+
+public interface IMessageFilters
+{
+ public interface Filter
+ {
+ Filter off();
+ Filter on();
+ }
+
+ public interface Builder
+ {
+ Builder from(int ... nums);
+ Builder to(int ... nums);
+
+ Builder verbs(int... verbs);
+ Builder allVerbs();
+
+ Builder inbound(boolean inbound);
+
+ default Builder inbound()
+ {
+ return inbound(true);
+ }
+
+ default Builder outbound()
+ {
+ return inbound(false);
+ }
+
+ /**
+ * Every message for which matcher returns `true` will be _dropped_ (assuming all
+ * other matchers in the chain will return `true` as well).
+ */
+ Builder messagesMatching(Matcher filter);
+ Filter drop();
+ }
+
+ public interface Matcher
+ {
+ boolean matches(int from, int to, IMessage message);
+
+ static Matcher of(Predicate<IMessage> fn) {
+ return (from, to, m) -> fn.test(m);
+ }
+ }
+
+ Builder inbound(boolean inbound);
+ default Builder inbound() {
+ return inbound(true);
+ }
+ default Builder outbound() {
+ return inbound(false);
+ }
+ default Builder verbs(int... verbs) {
+ return inbound().verbs(verbs);
+ }
+ default Builder allVerbs() {
+ return inbound().allVerbs();
+ }
+ void reset();
+
+ /**
+ * Checks if the message should be delivered. This is expected to run on "inbound", or on the reciever of
+ * the message (instance.config.num == to).
+ *
+ * @return {@code true} value returned by the implementation implies that the message was
+ * not matched by any filters and therefore should be delivered.
+ */
+ boolean permitInbound(int from, int to, IMessage msg);
+
+ /**
+ * Checks if the message should be delivered. This is expected to run on "outbound", or on the sender of
+ * the message (instance.config.num == from).
+ *
+ * @return {@code true} value returned by the implementation implies that the message was
+ * not matched by any filters and therefore should be delivered.
+ */
+ boolean permitOutbound(int from, int to, IMessage msg);
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/IUpgradeableInstance.java b/src/main/java/org/apache/cassandra/distributed/api/IUpgradeableInstance.java
new file mode 100644
index 0000000..da864e0
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/IUpgradeableInstance.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.distributed.shared.Versions;
+
+// this lives outside the api package so that we do not have to worry about inter-version compatibility
+public interface IUpgradeableInstance extends IInstance
+{
+ // only to be invoked while the node is shutdown!
+ public void setVersion(Versions.Version version);
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/LongTokenRange.java b/src/main/java/org/apache/cassandra/distributed/api/LongTokenRange.java
new file mode 100644
index 0000000..06327e8
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/LongTokenRange.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+import java.io.Serializable;
+
+public final class LongTokenRange implements Serializable
+{
+ public final long minExclusive;
+ public final long maxInclusive;
+
+ public LongTokenRange(long minExclusive, long maxInclusive)
+ {
+ this.minExclusive = minExclusive;
+ this.maxInclusive = maxInclusive;
+ }
+
+ public String toString()
+ {
+ return "(" + minExclusive + "," + maxInclusive + "]";
+ }
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/NodeToolResult.java b/src/main/java/org/apache/cassandra/distributed/api/NodeToolResult.java
new file mode 100644
index 0000000..773f617
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/NodeToolResult.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+import org.apache.cassandra.distributed.shared.AssertUtils;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import javax.management.Notification;
+
+public class NodeToolResult
+{
+ private final String[] commandAndArgs;
+ private final int rc;
+ private final List<Notification> notifications;
+ private final Throwable error;
+
+ public NodeToolResult(String[] commandAndArgs, int rc, List<Notification> notifications, Throwable error)
+ {
+ this.commandAndArgs = commandAndArgs;
+ this.rc = rc;
+ this.notifications = notifications;
+ this.error = error;
+ }
+
+ public String[] getCommandAndArgs()
+ {
+ return commandAndArgs;
+ }
+
+ public int getRc()
+ {
+ return rc;
+ }
+
+ public List<Notification> getNotifications()
+ {
+ return notifications;
+ }
+
+ public Throwable getError()
+ {
+ return error;
+ }
+
+ public Asserts asserts()
+ {
+ return new Asserts();
+ }
+
+ public final class Asserts {
+ public Asserts success() {
+ AssertUtils.assertEquals("nodetool command " + commandAndArgs[0] + " was not successful", 0, rc);
+ return this;
+ }
+
+ public Asserts failure() {
+ AssertUtils.assertNotEquals("nodetool command " + commandAndArgs[0] + " was successful but not expected to be", 0, rc);
+ return this;
+ }
+
+ public Asserts errorContains(String msg) {
+ AssertUtils.assertNotNull("No exception was found but expected one", error);
+ AssertUtils.assertTrue("Error message '" + error.getMessage() + "' does not contain '" + msg + "'", error.getMessage().contains(msg));
+ return this;
+ }
+
+ public Asserts notificationContains(String msg) {
+ AssertUtils.assertNotNull("notifications not defined", notifications);
+ AssertUtils.assertFalse("notifications not defined", notifications.isEmpty());
+ for (Notification n : notifications) {
+ if (n.getMessage().contains(msg)) {
+ return this;
+ }
+ }
+ AssertUtils.fail("Unable to locate message " + msg + " in notifications: " + notifications);
+ return this; // unreachable
+ }
+
+ public Asserts notificationContains(ProgressEventType type, String msg) {
+ int userType = type.ordinal();
+ AssertUtils.assertNotNull("notifications not defined", notifications);
+ AssertUtils.assertFalse("notifications not defined", notifications.isEmpty());
+ for (Notification n : notifications) {
+ if (notificationType(n) == userType) {
+ if (n.getMessage().contains(msg)) {
+ return this;
+ }
+ }
+ }
+ AssertUtils.fail("Unable to locate message '" + msg + "' in notifications: " + notifications);
+ return this; // unreachable
+ }
+ }
+
+ private static int notificationType(Notification n)
+ {
+ return ((Map<String, Integer>) n.getUserData()).get("type").intValue();
+ }
+
+ public String toString()
+ {
+ return "NodeToolResult{" +
+ "commandAndArgs=" + Arrays.toString(commandAndArgs) +
+ ", rc=" + rc +
+ ", notifications=[" + notifications.stream().map(n -> ProgressEventType.values()[notificationType(n)].name()).collect(Collectors.joining(", ")) + "]" +
+ ", error=" + error +
+ '}';
+ }
+
+ /**
+ * Progress event type.
+ *
+ * <p>
+ * Progress starts by emitting {@link #START}, followed by emitting zero or more {@link #PROGRESS} events,
+ * then it emits either one of {@link #ERROR}/{@link #ABORT}/{@link #SUCCESS}.
+ * Progress indicates its completion by emitting {@link #COMPLETE} at the end of process.
+ * </p>
+ * <p>
+ * {@link #NOTIFICATION} event type is used to just notify message without progress.
+ * </p>
+ */
+ public enum ProgressEventType
+ {
+ /**
+ * Fired first when progress starts.
+ * Happens only once.
+ */
+ START,
+
+ /**
+ * Fire when progress happens.
+ * This can be zero or more time after START.
+ */
+ PROGRESS,
+
+ /**
+ * When observing process completes with error, this is sent once before COMPLETE.
+ */
+ ERROR,
+
+ /**
+ * When observing process is aborted by user, this is sent once before COMPLETE.
+ */
+ ABORT,
+
+ /**
+ * When observing process completes successfully, this is sent once before COMPLETE.
+ */
+ SUCCESS,
+
+ /**
+ * Fire when progress complete.
+ * This is fired once, after ERROR/ABORT/SUCCESS is fired.
+ * After this, no more ProgressEvent should be fired for the same event.
+ */
+ COMPLETE,
+
+ /**
+ * Used when sending message without progress.
+ */
+ NOTIFICATION
+ }
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/QueryResult.java b/src/main/java/org/apache/cassandra/distributed/api/QueryResult.java
new file mode 100644
index 0000000..dcdfa14
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/QueryResult.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.function.Predicate;
+
+/**
+ * A table of data representing a complete query result.
+ *
+ * A <code>QueryResult</code> is different from {@link java.sql.ResultSet} in several key ways:
+ *
+ * <ul>
+ * <li>represents a complete result rather than a cursor</li>
+ * <li>returns a {@link Row} to access the current row of data</li>
+ * <li>relies on object pooling; {@link #hasNext()} may return the same object just with different data, accessing a
+ * {@link Row} from a previous {@link #hasNext()} call has undefined behavior.</li>
+ * <li>includes {@link #filter(Predicate)}, this will do client side filtering since Apache Cassandra is more
+ * restrictive on server side filtering</li>
+ * </ul>
+ *
+ * <h2>Unsafe patterns</h2>
+ *
+ * Below are a few unsafe patterns which may lead to unexpected results
+ *
+ * <code>{@code
+ * while (rs.hasNext()) {
+ * list.add(rs.next());
+ * }
+ * }</code>
+ *
+ * <code>{@code
+ * rs.forEach(list::add)
+ * }</code>
+ *
+ * Both cases have the same issue; reference to a row from a previous call to {@link #hasNext()}. Since the same {@link Row}
+ * object can be used accross different calls to {@link #hasNext()} this would mean any attempt to access after the fact
+ * points to newer data. If this behavior is not desirable and access is needed between calls, then {@link Row#copy()}
+ * should be used; this will clone the {@link Row} and return a new object pointing to the same data.
+ */
+public class QueryResult implements Iterator<Row>
+{
+ public static final QueryResult EMPTY = new QueryResult(new String[0], null);
+
+ private final String[] names;
+ private final Object[][] results;
+ private final Predicate<Row> filter;
+ private final Row row;
+ private int offset = -1;
+
+ public QueryResult(String[] names, Object[][] results)
+ {
+ this.names = Objects.requireNonNull(names, "names");
+ this.results = results;
+ this.row = new Row(names);
+ this.filter = ignore -> true;
+ }
+
+ private QueryResult(String[] names, Object[][] results, Predicate<Row> filter, int offset)
+ {
+ this.names = names;
+ this.results = results;
+ this.filter = filter;
+ this.offset = offset;
+ this.row = new Row(names);
+ }
+
+ public String[] getNames()
+ {
+ return names;
+ }
+
+ public boolean isEmpty()
+ {
+ return results.length == 0;
+ }
+
+ public int size()
+ {
+ return results.length;
+ }
+
+ public QueryResult filter(Predicate<Row> fn)
+ {
+ return new QueryResult(names, results, filter.and(fn), offset);
+ }
+
+ /**
+ * Get all rows as a 2d array. Any calls to {@link #filter(Predicate)} will be ignored and the array returned will
+ * be the full set from the query.
+ */
+ public Object[][] toObjectArrays()
+ {
+ return results;
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ if (results == null)
+ return false;
+ while ((offset += 1) < results.length)
+ {
+ row.setResults(results[offset]);
+ if (filter.test(row))
+ {
+ return true;
+ }
+ }
+ row.setResults(null);
+ return false;
+ }
+
+ @Override
+ public Row next()
+ {
+ if (offset < 0 || offset >= results.length)
+ throw new NoSuchElementException();
+ return row;
+ }
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/Row.java b/src/main/java/org/apache/cassandra/distributed/api/Row.java
new file mode 100644
index 0000000..1dd05fe
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/Row.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+import java.util.*;
+
+/**
+ * Data representing a single row in a query result.
+ *
+ * This class is mutable from the parent {@link QueryResult} and can have the row it points to changed between calls
+ * to {@link QueryResult#hasNext()}, for this reason it is unsafe to hold reference to this class after that call;
+ * to get around this, a call to {@link #copy()} will return a new object pointing to the same row.
+ */
+public class Row
+{
+ private final Map<String, Integer> nameIndex;
+ private Object[] results; // mutable to avoid allocations in loops
+
+ public Row(String[] names)
+ {
+ Objects.requireNonNull(names, "names");
+ this.nameIndex = new HashMap<>(names.length);
+ for (int i = 0; i < names.length; i++) {
+ nameIndex.put(names[i], i);
+ }
+ }
+
+ private Row(Map<String, Integer> nameIndex)
+ {
+ this.nameIndex = nameIndex;
+ }
+
+ void setResults(Object[] results)
+ {
+ this.results = results;
+ }
+
+ /**
+ * Creates a copy of the current row; can be used past calls to {@link QueryResult#hasNext()}.
+ */
+ public Row copy() {
+ Row copy = new Row(nameIndex);
+ copy.setResults(results);
+ return copy;
+ }
+
+ public <T> T get(String name)
+ {
+ checkAccess();
+ int idx = findIndex(name);
+ if (idx == -1)
+ return null;
+ return (T) results[idx];
+ }
+
+ public String getString(String name)
+ {
+ return get(name);
+ }
+
+ public UUID getUUID(String name)
+ {
+ return get(name);
+ }
+
+ public Date getTimestamp(String name)
+ {
+ return get(name);
+ }
+
+ public <T> Set<T> getSet(String name)
+ {
+ return get(name);
+ }
+
+ public String toString()
+ {
+ return "Row{" +
+ "names=" + nameIndex.keySet() +
+ ", results=" + Arrays.toString(results) +
+ '}';
+ }
+
+ private void checkAccess()
+ {
+ if (results == null)
+ throw new NoSuchElementException();
+ }
+
+ private int findIndex(String name)
+ {
+ return nameIndex.getOrDefault(name, -1);
+ }
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/api/TokenSupplier.java b/src/main/java/org/apache/cassandra/distributed/api/TokenSupplier.java
new file mode 100644
index 0000000..a714cd5
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/api/TokenSupplier.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+public interface TokenSupplier {
+ long token(int nodeId);
+
+ static TokenSupplier evenlyDistributedTokens(int numNodes) {
+ long increment = (Long.MAX_VALUE / numNodes) * 2;
+ return (int nodeId) -> {
+ assert nodeId <= numNodes : String.format("Can not allocate a token for a node %s, since only %s nodes are allowed by the token allocation strategy",
+ nodeId, numNodes);
+ return Long.MIN_VALUE + 1 + nodeId * increment;
+ };
+ }
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/AssertUtils.java b/src/main/java/org/apache/cassandra/distributed/shared/AssertUtils.java
new file mode 100644
index 0000000..f914e90
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/shared/AssertUtils.java
@@ -0,0 +1,130 @@
+package org.apache.cassandra.distributed.shared;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+public class AssertUtils {
+
+ public static void assertRows(Object[][] actual, Object[]... expected)
+ {
+ assertEquals(rowsNotEqualErrorMessage(actual, expected),
+ expected.length, actual.length);
+
+ for (int i = 0; i < expected.length; i++)
+ {
+ Object[] expectedRow = expected[i];
+ Object[] actualRow = actual[i];
+ assertTrue(rowsNotEqualErrorMessage(actual, expected),
+ Arrays.equals(expectedRow, actualRow));
+ }
+ }
+
+ public static void assertRow(Object[] actual, Object... expected)
+ {
+ assertTrue(rowNotEqualErrorMessage(actual, expected),
+ Arrays.equals(actual, expected));
+ }
+
+ public static void assertRows(Iterator<Object[]> actual, Iterator<Object[]> expected)
+ {
+ while (actual.hasNext() && expected.hasNext())
+ assertRow(actual.next(), expected.next());
+
+ assertTrue("Resultsets have different sizes", actual.hasNext() == expected.hasNext());
+ }
+
+ public static void assertRows(Iterator<Object[]> actual, Object[]... expected)
+ {
+ assertRows(actual, new Iterator<Object[]>() {
+
+ int i = 0;
+ @Override
+ public boolean hasNext() {
+ return i < expected.length;
+ }
+
+ @Override
+ public Object[] next() {
+ return expected[i++];
+ }
+ });
+ }
+
+ public static String rowNotEqualErrorMessage(Object[] actual, Object[] expected)
+ {
+ return String.format("Expected: %s\nActual:%s\n",
+ Arrays.toString(expected),
+ Arrays.toString(actual));
+ }
+
+ public static String rowsNotEqualErrorMessage(Object[][] actual, Object[][] expected)
+ {
+ return String.format("Expected: %s\nActual: %s\n",
+ rowsToString(expected),
+ rowsToString(actual));
+ }
+
+ public static String rowsToString(Object[][] rows)
+ {
+ StringBuilder builder = new StringBuilder();
+ builder.append("[");
+ boolean isFirst = true;
+ for (Object[] row : rows)
+ {
+ if (isFirst)
+ isFirst = false;
+ else
+ builder.append(",");
+ builder.append(Arrays.toString(row));
+ }
+ builder.append("]");
+ return builder.toString();
+ }
+
+ public static Object[][] toObjectArray(Iterator<Object[]> iter)
+ {
+ List<Object[]> res = new ArrayList<>();
+ while (iter.hasNext())
+ res.add(iter.next());
+
+ return res.toArray(new Object[res.size()][]);
+ }
+
+ public static Object[] row(Object... expected)
+ {
+ return expected;
+ }
+
+ public static void assertEquals(String message, long expected, long actual) {
+ if (expected != actual)
+ fail(message);
+ }
+
+ public static void assertNotEquals(String message, long expected, long actual) {
+ if (expected == actual)
+ fail(message);
+ }
+
+ public static void assertNotNull(String message, Object object) {
+ if (object == null)
+ fail(message);
+ }
+
+ public static void assertTrue(String message, boolean condition) {
+ if (!condition)
+ fail(message);
+ }
+
+ public static void assertFalse(String message, boolean condition) {
+ if (condition)
+ fail(message);
+ }
+
+
+ public static void fail(String message) {
+ throw new AssertionError(message);
+ }
+
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/Builder.java b/src/main/java/org/apache/cassandra/distributed/shared/Builder.java
new file mode 100644
index 0000000..6f6adfb
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/shared/Builder.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.shared;
+
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.cassandra.distributed.api.TokenSupplier.evenlyDistributedTokens;
+
+public abstract class Builder<I extends IInstance, C extends ICluster> {
+
+ private final int BROADCAST_PORT = 7012;
+
+ public interface Factory<I extends IInstance, C extends ICluster> {
+ C newCluster(File root, Versions.Version version, List<IInstanceConfig> configs, ClassLoader sharedClassLoader);
+ }
+
+ private final Factory<I, C> factory;
+ private int nodeCount;
+ private int subnet;
+ private Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology;
+ private TokenSupplier tokenSupplier;
+ private File root;
+ private Versions.Version version;
+ private Consumer<IInstanceConfig> configUpdater;
+
+ public Builder(Factory<I, C> factory) {
+ this.factory = factory;
+ }
+
+ public C start() throws IOException {
+ C cluster = createWithoutStarting();
+ cluster.startup();
+ return cluster;
+ }
+
+ public C createWithoutStarting() throws IOException {
+ if (root == null)
+ root = Files.createTempDirectory("dtests").toFile();
+
+ if (nodeCount <= 0)
+ throw new IllegalStateException("Cluster must have at least one node");
+
+ if (nodeIdTopology == null) {
+ nodeIdTopology = IntStream.rangeClosed(1, nodeCount).boxed()
+ .collect(Collectors.toMap(nodeId -> nodeId,
+ nodeId -> NetworkTopology.dcAndRack(dcName(0), rackName(0))));
+ }
+
+ root.mkdirs();
+
+ ClassLoader sharedClassLoader = Thread.currentThread().getContextClassLoader();
+
+ List<IInstanceConfig> configs = new ArrayList<>();
+
+ // TODO: make token allocation strategy configurable
+ if (tokenSupplier == null)
+ tokenSupplier = evenlyDistributedTokens(nodeCount);
+
+ for (int i = 0; i < nodeCount; ++i) {
+ int nodeNum = i + 1;
+ configs.add(createInstanceConfig(nodeNum));
+ }
+
+ return factory.newCluster(root, version, configs, sharedClassLoader);
+ }
+
+ public IInstanceConfig newInstanceConfig(C cluster) {
+ return createInstanceConfig(cluster.size() + 1);
+ }
+
+ protected IInstanceConfig createInstanceConfig(int nodeNum) {
+ String ipPrefix = "127.0." + subnet + ".";
+ String seedIp = ipPrefix + "1";
+ String ipAddress = ipPrefix + nodeNum;
+ long token = tokenSupplier.token(nodeNum);
+
+ NetworkTopology topology = NetworkTopology.build(ipPrefix, BROADCAST_PORT, nodeIdTopology);
+
+ IInstanceConfig config = generateConfig(nodeNum, ipAddress, topology, root, String.valueOf(token), seedIp);
+ if (configUpdater != null)
+ configUpdater.accept(config);
+
+ return config;
+ }
+
+ protected abstract IInstanceConfig generateConfig(int nodeNum, String ipAddress, NetworkTopology networkTopology, File root, String token, String seedIp);
+
+ public Builder<I, C> withTokenSupplier(TokenSupplier tokenSupplier) {
+ this.tokenSupplier = tokenSupplier;
+ return this;
+ }
+
+ public Builder<I, C> withSubnet(int subnet) {
+ this.subnet = subnet;
+ return this;
+ }
+
+ public Builder<I, C> withNodes(int nodeCount) {
+ this.nodeCount = nodeCount;
+ return this;
+ }
+
+ public Builder<I, C> withDCs(int dcCount) {
+ return withRacks(dcCount, 1);
+ }
+
+ public Builder<I, C> withRacks(int dcCount, int racksPerDC) {
+ if (nodeCount == 0)
+ throw new IllegalStateException("Node count will be calculated. Do not supply total node count in the builder");
+
+ int totalRacks = dcCount * racksPerDC;
+ int nodesPerRack = (nodeCount + totalRacks - 1) / totalRacks; // round up to next integer
+ return withRacks(dcCount, racksPerDC, nodesPerRack);
+ }
+
+ public Builder<I, C> withRacks(int dcCount, int racksPerDC, int nodesPerRack) {
+ if (nodeIdTopology != null)
+ throw new IllegalStateException("Network topology already created. Call withDCs/withRacks once or before withDC/withRack calls");
+
+ nodeIdTopology = new HashMap<>();
+ int nodeId = 1;
+ for (int dc = 1; dc <= dcCount; dc++) {
+ for (int rack = 1; rack <= racksPerDC; rack++) {
+ for (int rackNodeIdx = 0; rackNodeIdx < nodesPerRack; rackNodeIdx++)
+ nodeIdTopology.put(nodeId++, NetworkTopology.dcAndRack(dcName(dc), rackName(rack)));
+ }
+ }
+ // adjust the node count to match the allocatation
+ final int adjustedNodeCount = dcCount * racksPerDC * nodesPerRack;
+ if (adjustedNodeCount != nodeCount) {
+ assert adjustedNodeCount > nodeCount : "withRacks should only ever increase the node count";
+ System.out.println(String.format("Network topology of %s DCs with %s racks per DC and %s nodes per rack required increasing total nodes to %s",
+ dcCount, racksPerDC, nodesPerRack, adjustedNodeCount));
+ nodeCount = adjustedNodeCount;
+ }
+ return this;
+ }
+
+ public Builder<I, C> withDC(String dcName, int nodeCount) {
+ return withRack(dcName, rackName(1), nodeCount);
+ }
+
+ public Builder<I, C> withRack(String dcName, String rackName, int nodesInRack) {
+ if (nodeIdTopology == null) {
+ if (nodeCount > 0)
+ throw new IllegalStateException("Node count must not be explicitly set, or allocated using withDCs/withRacks");
+
+ nodeIdTopology = new HashMap<>();
+ }
+ for (int nodeId = nodeCount + 1; nodeId <= nodeCount + nodesInRack; nodeId++)
+ nodeIdTopology.put(nodeId, NetworkTopology.dcAndRack(dcName, rackName));
+
+ nodeCount += nodesInRack;
+ return this;
+ }
+
+ // Map of node ids to dc and rack - must be contiguous with an entry nodeId 1 to nodeCount
+ public Builder<I, C> withNodeIdTopology(Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology) {
+ if (nodeIdTopology.isEmpty())
+ throw new IllegalStateException("Topology is empty. It must have an entry for every nodeId.");
+
+ IntStream.rangeClosed(1, nodeIdTopology.size()).forEach(nodeId -> {
+ if (nodeIdTopology.get(nodeId) == null)
+ throw new IllegalStateException("Topology is missing entry for nodeId " + nodeId);
+ });
+
+ if (nodeCount != nodeIdTopology.size()) {
+ nodeCount = nodeIdTopology.size();
+ System.out.println(String.format("Adjusting node count to %s for supplied network topology", nodeCount));
+ }
+
+ this.nodeIdTopology = new HashMap<>(nodeIdTopology);
+
+ return this;
+ }
+
+ public Builder<I, C> withRoot(File root) {
+ this.root = root;
+ return this;
+ }
+
+ public Builder<I, C> withVersion(Versions.Version version) {
+ this.version = version;
+ return this;
+ }
+
+ public Builder<I, C> withConfig(Consumer<IInstanceConfig> updater) {
+ this.configUpdater = updater;
+ return this;
+ }
+
+ static String dcName(int index)
+ {
+ return "datacenter" + index;
+ }
+
+ static String rackName(int index)
+ {
+ return "rack" + index;
+ }
+}
+
+
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/DistributedTestBase.java b/src/main/java/org/apache/cassandra/distributed/shared/DistributedTestBase.java
new file mode 100644
index 0000000..d28af2a
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/shared/DistributedTestBase.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.shared;
+
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInstance;
+
+public abstract class DistributedTestBase
+{
+ public void afterEach()
+ {
+ System.runFinalization();
+ System.gc();
+ }
+
+ public static void beforeClass() throws Throwable {
+ ICluster.setup();
+ }
+
+ public abstract <I extends IInstance, C extends ICluster> Builder<I, C> builder();
+
+ public static String KEYSPACE = "distributed_test_keyspace";
+
+ public static String withKeyspace(String replaceIn)
+ {
+ return String.format(replaceIn, KEYSPACE);
+ }
+
+ protected static <C extends ICluster<?>> C init(C cluster)
+ {
+ return init(cluster, cluster.size());
+ }
+
+ protected static <C extends ICluster<?>> C init(C cluster, int replicationFactor)
+ {
+ cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + replicationFactor + "};");
+ return cluster;
+ }
+
+
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/InstanceClassLoader.java b/src/main/java/org/apache/cassandra/distributed/shared/InstanceClassLoader.java
new file mode 100644
index 0000000..c37a520
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/shared/InstanceClassLoader.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.shared;
+
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Arrays;
+import java.util.function.Predicate;
+
+public class InstanceClassLoader extends URLClassLoader
+{
+ private static final Predicate<String> sharePackage = name ->
+ name.startsWith("org.apache.cassandra.distributed.api.")
+ || name.startsWith("org.apache.cassandra.distributed.shared.")
+ || name.startsWith("sun.")
+ || name.startsWith("oracle.")
+ || name.startsWith("com.intellij.")
+ || name.startsWith("com.sun.")
+ || name.startsWith("com.oracle.")
+ || name.startsWith("java.")
+ || name.startsWith("javax.")
+ || name.startsWith("jdk.")
+ || name.startsWith("netscape.")
+ || name.startsWith("org.xml.sax.");
+
+ private volatile boolean isClosed = false;
+ private final URL[] urls;
+ private final int generation; // used to help debug class loader leaks, by helping determine which classloaders should have been collected
+ private final int id;
+ private final ClassLoader sharedClassLoader;
+
+ public InstanceClassLoader(int generation, int id, URL[] urls, ClassLoader sharedClassLoader)
+ {
+ super(urls, null);
+ this.urls = urls;
+ this.sharedClassLoader = sharedClassLoader;
+ this.generation = generation;
+ this.id = id;
+ }
+
+ public int getClusterGeneration()
+ {
+ return generation;
+ }
+
+ public int getInstanceId()
+ {
+ return id;
+ }
+
+ @Override
+ public Class<?> loadClass(String name) throws ClassNotFoundException
+ {
+ if (sharePackage.test(name))
+ return sharedClassLoader.loadClass(name);
+
+ return loadClassInternal(name);
+ }
+
+ Class<?> loadClassInternal(String name) throws ClassNotFoundException
+ {
+ if (isClosed)
+ throw new IllegalStateException(String.format("Can't load %s. Instance class loader is already closed.", name));
+
+ synchronized (getClassLoadingLock(name))
+ {
+ // First, check if the class has already been loaded
+ Class<?> c = findLoadedClass(name);
+
+ if (c == null)
+ c = findClass(name);
+
+ return c;
+ }
+ }
+
+ /**
+ * @return true iff this class was loaded by an InstanceClassLoader, and as such is used by a dtest node
+ */
+ public static boolean wasLoadedByAnInstanceClassLoader(Class<?> clazz)
+ {
+ return clazz.getClassLoader().getClass().getName().equals(InstanceClassLoader.class.getName());
+ }
+
+ public String toString()
+ {
+ return "InstanceClassLoader{" +
+ "generation=" + generation +
+ ", id = " + id +
+ ", urls=" + Arrays.toString(urls) +
+ '}';
+ }
+
+ public void close() throws IOException
+ {
+ isClosed = true;
+ super.close();
+ }
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/MessageFilters.java b/src/main/java/org/apache/cassandra/distributed/shared/MessageFilters.java
new file mode 100644
index 0000000..c1731db
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/shared/MessageFilters.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.shared;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.apache.cassandra.distributed.api.IMessage;
+import org.apache.cassandra.distributed.api.IMessageFilters;
+
+public class MessageFilters implements IMessageFilters
+{
+ private final List<Filter> inboundFilters = new CopyOnWriteArrayList<>();
+ private final List<Filter> outboundFilters = new CopyOnWriteArrayList<>();
+
+ public boolean permitInbound(int from, int to, IMessage msg)
+ {
+ return permit(inboundFilters, from, to, msg);
+ }
+
+ public boolean permitOutbound(int from, int to, IMessage msg)
+ {
+ return permit(outboundFilters, from, to, msg);
+ }
+
+ private static boolean permit(List<Filter> filters, int from, int to, IMessage msg)
+ {
+ for (Filter filter : filters)
+ {
+ if (filter.matches(from, to, msg))
+ return false;
+ }
+ return true;
+ }
+
+ public static class Filter implements IMessageFilters.Filter
+ {
+ final int[] from;
+ final int[] to;
+ final int[] verbs;
+ final Matcher matcher;
+ final List<Filter> parent;
+
+ Filter(int[] from, int[] to, int[] verbs, Matcher matcher, List<Filter> parent)
+ {
+ if (from != null)
+ {
+ from = from.clone();
+ Arrays.sort(from);
+ }
+ if (to != null)
+ {
+ to = to.clone();
+ Arrays.sort(to);
+ }
+ if (verbs != null)
+ {
+ verbs = verbs.clone();
+ Arrays.sort(verbs);
+ }
+ this.from = from;
+ this.to = to;
+ this.verbs = verbs;
+ this.matcher = matcher;
+ this.parent = Objects.requireNonNull(parent, "parent");
+ }
+
+ public int hashCode()
+ {
+ return (from == null ? 0 : Arrays.hashCode(from))
+ + (to == null ? 0 : Arrays.hashCode(to))
+ + (verbs == null ? 0 : Arrays.hashCode(verbs)
+ + parent.hashCode());
+ }
+
+ public boolean equals(Object that)
+ {
+ return that instanceof Filter && equals((Filter) that);
+ }
+
+ public boolean equals(Filter that)
+ {
+ return Arrays.equals(from, that.from)
+ && Arrays.equals(to, that.to)
+ && Arrays.equals(verbs, that.verbs)
+ && parent.equals(that.parent);
+ }
+
+ public Filter off()
+ {
+ parent.remove(this);
+ return this;
+ }
+
+ public Filter on()
+ {
+ parent.add(this);
+ return this;
+ }
+
+ public boolean matches(int from, int to, IMessage msg)
+ {
+ return (this.from == null || Arrays.binarySearch(this.from, from) >= 0)
+ && (this.to == null || Arrays.binarySearch(this.to, to) >= 0)
+ && (this.verbs == null || Arrays.binarySearch(this.verbs, msg.verb()) >= 0)
+ && (this.matcher == null || this.matcher.matches(from, to, msg));
+ }
+ }
+
+ public class Builder implements IMessageFilters.Builder
+ {
+ int[] from;
+ int[] to;
+ int[] verbs;
+ Matcher matcher;
+ boolean inbound;
+
+ private Builder(boolean inbound)
+ {
+ this.inbound = inbound;
+ }
+
+ public Builder from(int... nums)
+ {
+ from = nums;
+ return this;
+ }
+
+ public Builder to(int... nums)
+ {
+ to = nums;
+ return this;
+ }
+
+ public IMessageFilters.Builder verbs(int... verbs)
+ {
+ this.verbs = verbs;
+ return this;
+ }
+
+ public IMessageFilters.Builder allVerbs()
+ {
+ this.verbs = null;
+ return this;
+ }
+
+ public IMessageFilters.Builder inbound(boolean inbound)
+ {
+ this.inbound = inbound;
+ return this;
+ }
+
+ public IMessageFilters.Builder messagesMatching(Matcher matcher)
+ {
+ this.matcher = matcher;
+ return this;
+ }
+
+ public IMessageFilters.Filter drop()
+ {
+ return new Filter(from, to, verbs, matcher, inbound ? inboundFilters : outboundFilters).on();
+ }
+ }
+
+ public IMessageFilters.Builder inbound(boolean inbound)
+ {
+ return new Builder(inbound);
+ }
+
+ @Override
+ public void reset()
+ {
+ inboundFilters.clear();
+ outboundFilters.clear();
+ }
+}
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/NetworkTopology.java b/src/main/java/org/apache/cassandra/distributed/shared/NetworkTopology.java
new file mode 100644
index 0000000..7bd91d3
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/shared/NetworkTopology.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.shared;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.IntFunction;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class NetworkTopology
+{
+ private final Map<InetSocketAddress, DcAndRack> map;
+
+ public static class DcAndRack
+ {
+ private final String dc;
+ private final String rack;
+
+ private DcAndRack(String dc, String rack)
+ {
+ this.dc = dc;
+ this.rack = rack;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "DcAndRack{" +
+ "dc='" + dc + '\'' +
+ ", rack='" + rack + '\'' +
+ '}';
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ DcAndRack dcAndRack = (DcAndRack) o;
+ return Objects.equals(dc, dcAndRack.dc) &&
+ Objects.equals(rack, dcAndRack.rack);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(dc, rack);
+ }
+ }
+
+ public static DcAndRack dcAndRack(String dc, String rack)
+ {
+ return new DcAndRack(dc, rack);
+ }
+
+ public static InetSocketAddress addressAndPort(InetAddress address, int port)
+ {
+ return new InetSocketAddress(address, port);
+ }
+
+ public static InetSocketAddress addressAndPort(String address, int port)
+ {
+ try
+ {
+ return new InetSocketAddress(InetAddress.getByName(address), port);
+ }
+ catch (UnknownHostException e)
+ {
+ throw new IllegalArgumentException("Unknown address '" + address + '\'');
+ }
+ }
+
+ private NetworkTopology()
+ {
+ map = new HashMap<>();
+ }
+
+ @SuppressWarnings("WeakerAccess")
+ public NetworkTopology(NetworkTopology networkTopology)
+ {
+ map = new HashMap<>(networkTopology.map);
+ }
+
+ public static NetworkTopology build(String ipPrefix, int broadcastPort, Map<Integer, DcAndRack> nodeIdTopology)
+ {
+ final NetworkTopology topology = new NetworkTopology();
+
+ for (int nodeId = 1; nodeId <= nodeIdTopology.size(); nodeId++)
+ {
+ String broadcastAddress = ipPrefix + nodeId;
+
+ DcAndRack dcAndRack = nodeIdTopology.get(nodeId);
+ if (dcAndRack == null)
+ throw new IllegalStateException("nodeId " + nodeId + "not found in instanceMap");
+
+ topology.put(addressAndPort(broadcastAddress, broadcastPort), dcAndRack);
+ }
+ return topology;
+ }
+
+ public DcAndRack put(InetSocketAddress addressAndPort, DcAndRack value)
+ {
+ return map.put(addressAndPort, value);
+ }
+
+ public String localRack(InetSocketAddress key)
+ {
+ DcAndRack p = map.get(key);
+ if (p == null)
+ return null;
+ return p.rack;
+ }
+
+ public String localDC(InetSocketAddress key)
+ {
+ DcAndRack p = map.get(key);
+ if (p == null)
+ return null;
+ return p.dc;
+ }
+
+ public boolean contains(InetSocketAddress key)
+ {
+ return map.containsKey(key);
+ }
+
+ public String toString()
+ {
+ return "NetworkTopology{" + map + '}';
+ }
+
+
+ public static Map<Integer, NetworkTopology.DcAndRack> singleDcNetworkTopology(int nodeCount,
+ String dc,
+ String rack)
+ {
+ return networkTopology(nodeCount, (nodeid) -> NetworkTopology.dcAndRack(dc, rack));
+ }
+
+ public static Map<Integer, NetworkTopology.DcAndRack> networkTopology(int nodeCount,
+ IntFunction<DcAndRack> dcAndRackSupplier)
+ {
+
+ return IntStream.rangeClosed(1, nodeCount).boxed()
+ .collect(Collectors.toMap(nodeId -> nodeId,
+ dcAndRackSupplier::apply));
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/ThrowingRunnable.java b/src/main/java/org/apache/cassandra/distributed/shared/ThrowingRunnable.java
new file mode 100644
index 0000000..01f8d29
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/shared/ThrowingRunnable.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.shared;
+
+public interface ThrowingRunnable
+{
+ public void run() throws Throwable;
+
+ public static Runnable toRunnable(ThrowingRunnable runnable)
+ {
+ return () -> {
+ try
+ {
+ runnable.run();
+ }
+ catch (Throwable throwable)
+ {
+ throw new RuntimeException(throwable);
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/Versions.java b/src/main/java/org/apache/cassandra/distributed/shared/Versions.java
new file mode 100644
index 0000000..51c9e49
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/shared/Versions.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.shared;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.file.Paths;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+public class Versions
+{
+ private static final Logger logger = LoggerFactory.getLogger(Versions.class);
+
+ public static final String PROPERTY_PREFIX = "cassandra.";
+
+ public static URL[] getClassPath()
+ {
+ // In Java 9 the default system ClassLoader got changed from URLClassLoader to AppClassLoader which
+ // does not extend URLClassLoader nor does it give access to the class path array.
+ // Java requires the system property 'java.class.path' to be setup with the classpath seperated by :
+ // so this logic parses that string into the URL[] needed to build later
+ String cp = System.getProperty("java.class.path");
+ assert cp != null && !cp.isEmpty();
+ String[] split = cp.split(File.pathSeparator);
+ assert split.length > 0;
+ URL[] urls = new URL[split.length];
+ try
+ {
+ for (int i = 0; i < split.length; i++)
+ urls[i] = Paths.get(split[i]).toUri().toURL();
+ }
+ catch (MalformedURLException e)
+ {
+ throw new RuntimeException(e);
+ }
+ return urls;
+ }
+
+ public enum Major
+ {
+ v22("2\\.2\\.([0-9]+)"),
+ v30("3\\.0\\.([0-9]+)"),
+ v3X("3\\.([1-9]|1[01])(\\.([0-9]+))?"),
+ v4("4\\.([0-9]+)");
+ final Pattern pattern;
+ Major(String verify)
+ {
+ this.pattern = Pattern.compile(verify);
+ }
+
+ static Major fromFull(String version)
+ {
+ if (version.isEmpty())
+ throw new IllegalArgumentException(version);
+ switch (version.charAt(0))
+ {
+ case '2':
+ if (version.startsWith("2.2"))
+ return v22;
+ throw new IllegalArgumentException(version);
+ case '3':
+ if (version.startsWith("3.0"))
+ return v30;
+ return v3X;
+ case '4':
+ return v4;
+ default:
+ throw new IllegalArgumentException(version);
+ }
+ }
+
+ // verify that the version string is valid for this major version
+ boolean verify(String version)
+ {
+ return pattern.matcher(version).matches();
+ }
+
+ // sort two strings of the same major version as this enum instance
+ int compare(String a, String b)
+ {
+ Matcher ma = pattern.matcher(a);
+ Matcher mb = pattern.matcher(a);
+ if (!ma.matches()) throw new IllegalArgumentException(a);
+ if (!mb.matches()) throw new IllegalArgumentException(b);
+ int result = Integer.compare(Integer.parseInt(ma.group(1)), Integer.parseInt(mb.group(1)));
+ if (result == 0 && this == v3X && (ma.group(3) != null || mb.group(3) != null))
+ {
+ if (ma.group(3) != null && mb.group(3) != null)
+ {
+ result = Integer.compare(Integer.parseInt(ma.group(3)), Integer.parseInt(mb.group(3)));
+ }
+ else
+ {
+ result = ma.group(3) != null ? 1 : -1;
+ }
+ }
+ // sort descending
+ return -result;
+ }
+ }
+
+ public static class Version
+ {
+ public final Major major;
+ public final String version;
+ public final URL[] classpath;
+
+ public Version(String version, URL[] classpath)
+ {
+ this(Major.fromFull(version), version, classpath);
+ }
+ public Version(Major major, String version, URL[] classpath)
+ {
+ this.major = major;
+ this.version = version;
+ this.classpath = classpath;
+ }
+ }
+
+ private final Map<Major, List<Version>> versions;
+ public Versions(Map<Major, List<Version>> versions)
+ {
+ this.versions = versions;
+ }
+
+ public Version get(String full)
+ {
+ return versions.get(Major.fromFull(full))
+ .stream()
+ .filter(v -> full.equals(v.version))
+ .findFirst()
+ .orElseThrow(() -> new RuntimeException("No version " + full + " found"));
+ }
+
+ public Version getLatest(Major major)
+ {
+ return versions.get(major).stream().findFirst().orElseThrow(() -> new RuntimeException("No " + major + " versions found"));
+ }
+
+ public static Versions find()
+ {
+ final String dtestJarDirectory = System.getProperty(PROPERTY_PREFIX + "test.dtest_jar_path","build");
+ final File sourceDirectory = new File(dtestJarDirectory);
+ logger.info("Looking for dtest jars in " + sourceDirectory.getAbsolutePath());
+ final Pattern pattern = Pattern.compile("dtest-(?<fullversion>(\\d+)\\.(\\d+)(\\.\\d+)?(\\.\\d+)?)([~\\-]\\w[.\\w]*(?:\\-\\w[.\\w]*)*)?(\\+[.\\w]+)?\\.jar");
+ final Map<Major, List<Version>> versions = new HashMap<>();
+ for (Major major : Major.values())
+ versions.put(major, new ArrayList<>());
+
+ for (File file : sourceDirectory.listFiles())
+ {
+ Matcher m = pattern.matcher(file.getName());
+ if (!m.matches())
+ continue;
+ String version = m.group(1);
+ Major major = Major.fromFull(version);
+ versions.get(major).add(new Version(major, version, new URL[] { toURL(file) }));
+ }
+
+ for (Map.Entry<Major, List<Version>> e : versions.entrySet())
+ {
+ if (e.getValue().isEmpty())
+ continue;
+ Collections.sort(e.getValue(), Comparator.comparing(v -> v.version, e.getKey()::compare));
+ System.out.println("Found " + e.getValue().stream().map(v -> v.version).collect(Collectors.joining(", ")));
+ }
+
+ return new Versions(versions);
+ }
+
+ public static URL toURL(File file)
+ {
+ try
+ {
+ return file.toURI().toURL();
+ }
+ catch (MalformedURLException e)
+ {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+}
diff --git a/test/conf/logback-dtest.xml b/test/conf/logback-dtest.xml
new file mode 100644
index 0000000..370e1e5
--- /dev/null
+++ b/test/conf/logback-dtest.xml
@@ -0,0 +1,77 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<configuration debug="false" scan="true" scanPeriod="60 seconds">
+ <define name="instance_id" class="org.apache.cassandra.distributed.impl.InstanceIDDefiner" />
+
+ <!-- Shutdown hook ensures that async appender flushes -->
+ <shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/>
+
+ <appender name="INSTANCEFILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
+
+ <file>./build/test/logs/${cassandra.testtag}/TEST-${suitename}.log</file>
+ <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+ <fileNamePattern>./build/test/logs/${cassandra.testtag}/TEST-${suitename}.log.%i.gz</fileNamePattern>
+ <minIndex>1</minIndex>
+ <maxIndex>20</maxIndex>
+ </rollingPolicy>
+
+ <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+ <maxFileSize>20MB</maxFileSize>
+ </triggeringPolicy>
+
+ <encoder>
+ <pattern>%-5level [%thread] ${instance_id} %date{ISO8601} %msg%n</pattern>
+ </encoder>
+ <immediateFlush>false</immediateFlush>
+ </appender>
+
+ <appender name="INSTANCEASYNCFILE" class="ch.qos.logback.classic.AsyncAppender">
+ <discardingThreshold>0</discardingThreshold>
+ <maxFlushTime>0</maxFlushTime>
+ <queueSize>1024</queueSize>
+ <appender-ref ref="INSTANCEFILE"/>
+ </appender>
+
+ <appender name="INSTANCESTDERR" target="System.err" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%-5level %date{HH:mm:ss,SSS} %msg%n</pattern>
+ </encoder>
+ <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+ <level>WARN</level>
+ </filter>
+ </appender>
+
+ <appender name="INSTANCESTDOUT" target="System.out" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%-5level [%thread] ${instance_id} %date{ISO8601} %F:%L - %msg%n</pattern>
+ </encoder>
+ <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+ <level>DEBUG</level>
+ </filter>
+ </appender>
+
+ <logger name="org.apache.hadoop" level="WARN"/>
+
+ <root level="DEBUG">
+ <appender-ref ref="INSTANCEASYNCFILE" />
+ <appender-ref ref="INSTANCESTDERR" />
+ <appender-ref ref="INSTANCESTDOUT" />
+ </root>
+</configuration>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org