You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2019/11/11 14:56:14 UTC
[cassandra] branch cassandra-2.2 updated: Add client testing
capabilities to in-jvm tests
This is an automated email from the ASF dual-hosted git repository.
ifesdjeen pushed a commit to branch cassandra-2.2
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-2.2 by this push:
new 50b7094 Add client testing capabilities to in-jvm tests
50b7094 is described below
commit 50b7094278241f389d3b0b49b02e893fd4322b12
Author: Doug Rohrer <dr...@apple.com>
AuthorDate: Mon Oct 14 13:42:35 2019 -0400
Add client testing capabilities to in-jvm tests
Patch by Doug Rohrer, reviewed by Alex Petrov for CASSANDRA-15347.
Co-authored-by: Alex Petrov <ol...@gmail.com>
---
.../apache/cassandra/service/CassandraDaemon.java | 96 +++++++++----
.../org/apache/cassandra/thrift/ThriftServer.java | 5 +
.../org/apache/cassandra/transport/Server.java | 41 +++++-
.../apache/cassandra/distributed/api/Feature.java | 2 +-
.../cassandra/distributed/api/IInstance.java | 2 +
.../apache/cassandra/distributed/api/IListen.java | 2 +
.../distributed/impl/AbstractCluster.java | 158 +++++++++++++++++----
.../cassandra/distributed/impl/Instance.java | 21 +++
.../distributed/impl/InstanceClassLoader.java | 16 ++-
.../cassandra/distributed/impl/InstanceConfig.java | 34 +++--
.../apache/cassandra/distributed/impl/Listen.java | 20 ++-
.../apache/cassandra/distributed/impl/RowUtil.java | 17 +++
.../distributed/test/DistributedTestBase.java | 7 +
.../distributed/test/NativeProtocolTest.java | 59 ++++++++
.../distributed/test/ResourceLeakTest.java | 16 +++
15 files changed, 424 insertions(+), 72 deletions(-)
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 1380f43..c0ba38e 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -31,7 +31,6 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
-import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.StandardMBean;
import javax.management.remote.JMXConnectorServer;
@@ -83,6 +82,13 @@ public class CassandraDaemon
private static JMXConnectorServer jmxServer = null;
private static final Logger logger;
+
+ @VisibleForTesting
+ public static CassandraDaemon getInstanceForTesting()
+ {
+ return instance;
+ }
+
static {
// Need to register metrics before instrumented appender is created(first access to LoggerFactory).
SharedMetricRegistries.getOrCreate("logback-metrics").addListener(new MetricRegistryListener.Base()
@@ -357,13 +363,53 @@ public class CassandraDaemon
int rpcPort = DatabaseDescriptor.getRpcPort();
int listenBacklog = DatabaseDescriptor.getRpcListenBacklog();
thriftServer = new ThriftServer(rpcAddr, rpcPort, listenBacklog);
+ initializeNativeTransport();
+
+ completeSetup();
+ }
+ public void initializeNativeTransport()
+ {
// Native transport
InetAddress nativeAddr = DatabaseDescriptor.getRpcAddress();
int nativePort = DatabaseDescriptor.getNativeTransportPort();
nativeServer = new org.apache.cassandra.transport.Server(nativeAddr, nativePort);
+ }
- completeSetup();
+ public void startNativeTransport()
+ {
+ validateTransportsCanStart();
+
+ if (nativeServer == null)
+ throw new IllegalStateException("native transport should be set up before it can be started");
+
+ nativeServer.start();
+ }
+
+ private void validateTransportsCanStart()
+ {
+ // We only start transports if bootstrap has completed and we're not in survey mode, OR if we are in
+ // survey mode and streaming has completed but we're not using auth.
+ // OR if we have not joined the ring yet.
+ if (StorageService.instance.hasJoined())
+ {
+ if (StorageService.instance.isSurveyMode())
+ {
+ if (StorageService.instance.isBootstrapMode() || DatabaseDescriptor.getAuthenticator().requireAuthentication())
+ {
+ throw new IllegalStateException("Not starting client transports in write_survey mode as it's bootstrapping or " +
+ "auth is enabled");
+ }
+ }
+ else
+ {
+ if (!SystemKeyspace.bootstrapComplete())
+ {
+ throw new IllegalStateException("Node is not yet bootstrapped completely. Use nodetool to check bootstrap" +
+ " state and resume. For more, see `nodetool help bootstrap`");
+ }
+ }
+ }
}
/*
@@ -440,28 +486,15 @@ public class CassandraDaemon
*/
public void start()
{
- // We only start transports if bootstrap has completed and we're not in survey mode, OR if we are in
- // survey mode and streaming has completed but we're not using auth.
- // OR if we have not joined the ring yet.
- if (StorageService.instance.hasJoined())
+ try
{
- if (StorageService.instance.isSurveyMode())
- {
- if (StorageService.instance.isBootstrapMode() || DatabaseDescriptor.getAuthenticator().requireAuthentication())
- {
- logger.info("Not starting client transports in write_survey mode as it's bootstrapping or " +
- "auth is enabled");
- return;
- }
- }
- else
- {
- if (!SystemKeyspace.bootstrapComplete())
- {
- logger.info("Not starting client transports as bootstrap has not completed");
- return;
- }
- }
+ validateTransportsCanStart();
+ }
+ catch (IllegalStateException isx)
+ {
+ // If there are any errors, we just log and return in this case
+ logger.info(isx.getMessage());
+ return;
}
String nativeFlag = System.getProperty("cassandra.start_native_transport");
@@ -510,6 +543,18 @@ public class CassandraDaemon
}
}
+ @VisibleForTesting
+ public void destroyNativeTransport() throws InterruptedException
+ {
+ // In 2.2, just stopping the server works. Future versions require `destroy` to be called
+ // so we maintain the name for consistency
+ if (nativeServer != null)
+ {
+ nativeServer.stopAndAwaitTermination();
+ nativeServer = null;
+ }
+ }
+
/**
* Clean up all resources obtained during the lifetime of the daemon. This
@@ -648,7 +693,7 @@ public class CassandraDaemon
logger.info("No gossip backlog; proceeding");
}
- public static void stop(String[] args)
+ public static void stop(String[] args) throws InterruptedException
{
instance.deactivate();
}
@@ -703,6 +748,9 @@ public class CassandraDaemon
*/
public void stop();
+ @VisibleForTesting
+ public void stopAndAwaitTermination();
+
/**
* Returns whether the server is currently running.
*/
diff --git a/src/java/org/apache/cassandra/thrift/ThriftServer.java b/src/java/org/apache/cassandra/thrift/ThriftServer.java
index 44ec524..87dcd3e 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftServer.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftServer.java
@@ -77,6 +77,11 @@ public class ThriftServer implements CassandraDaemon.Server
}
}
+ public void stopAndAwaitTermination()
+ {
+ stop();
+ }
+
public boolean isRunning()
{
return server != null;
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index c91d37d..418f6f7 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -24,10 +24,12 @@ import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
+import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -124,7 +126,14 @@ public class Server implements CassandraDaemon.Server
public void stop()
{
if (isRunning.compareAndSet(true, false))
- close();
+ close(false);
+ }
+
+ @VisibleForTesting
+ public void stopAndAwaitTermination()
+ {
+ if (isRunning.compareAndSet(true, false))
+ close(true);
}
public boolean isRunning()
@@ -206,15 +215,39 @@ public class Server implements CassandraDaemon.Server
private void close()
{
+ close(false);
+ }
+
+ private void closeAndAwait()
+ {
+ close(true);
+ }
+
+ private void close(boolean awaitTermination)
+ {
// Close opened connections
connectionTracker.closeAll();
workerGroup.shutdownGracefully();
- workerGroup = null;
-
eventExecutorGroup.shutdown();
- eventExecutorGroup = null;
+
logger.info("Stop listening for CQL clients");
+ if (awaitTermination)
+ {
+ try
+ {
+ workerGroup.awaitTermination(1, TimeUnit.MINUTES);
+ eventExecutorGroup.awaitTermination(1, TimeUnit.MINUTES);
+ }
+ catch (InterruptedException e)
+ {
+ logger.error(e.getMessage());
+ }
+ }
+
+ workerGroup = null;
+ eventExecutorGroup = null;
+
StorageService.instance.setRpcReady(false);
}
diff --git a/test/distributed/org/apache/cassandra/distributed/api/Feature.java b/test/distributed/org/apache/cassandra/distributed/api/Feature.java
index a5c9316..b4ba036 100644
--- a/test/distributed/org/apache/cassandra/distributed/api/Feature.java
+++ b/test/distributed/org/apache/cassandra/distributed/api/Feature.java
@@ -20,5 +20,5 @@ package org.apache.cassandra.distributed.api;
public enum Feature
{
- NETWORK, GOSSIP
+ NETWORK, GOSSIP, NATIVE_PROTOCOL
}
diff --git a/test/distributed/org/apache/cassandra/distributed/api/IInstance.java b/test/distributed/org/apache/cassandra/distributed/api/IInstance.java
index 6a9e33a..e6c705c 100644
--- a/test/distributed/org/apache/cassandra/distributed/api/IInstance.java
+++ b/test/distributed/org/apache/cassandra/distributed/api/IInstance.java
@@ -41,6 +41,8 @@ public interface IInstance extends IIsolatedExecutor
Future<Void> shutdown();
Future<Void> shutdown(boolean graceful);
+ int liveMemberCount();
+
// these methods are not for external use, but for simplicity we leave them public and on the normal IInstance interface
void startup(ICluster cluster);
void receiveMessage(IMessage message);
diff --git a/test/distributed/org/apache/cassandra/distributed/api/IListen.java b/test/distributed/org/apache/cassandra/distributed/api/IListen.java
index d3a80da..c2e8dd6 100644
--- a/test/distributed/org/apache/cassandra/distributed/api/IListen.java
+++ b/test/distributed/org/apache/cassandra/distributed/api/IListen.java
@@ -23,4 +23,6 @@ public interface IListen
public interface Cancel { void cancel(); }
Cancel schema(Runnable onChange);
+
+ Cancel liveMembers(Runnable onChange);
}
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index f603497..0283457 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -43,9 +43,11 @@ import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.ICoordinator;
import org.apache.cassandra.distributed.api.IInstance;
import org.apache.cassandra.distributed.api.IInstanceConfig;
@@ -129,7 +131,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
private IInvokableInstance newInstance(int generation)
{
- ClassLoader classLoader = new InstanceClassLoader(generation, version.classpath, sharedClassLoader);
+ ClassLoader classLoader = new InstanceClassLoader(generation, config.num, version.classpath, sharedClassLoader);
return Instance.transferAdhoc((SerializableBiFunction<IInstanceConfig, ClassLoader, Instance>)Instance::new, classLoader)
.apply(config, classLoader);
}
@@ -171,6 +173,14 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
return future;
}
+ public int liveMemberCount()
+ {
+ if (!isShutdown && delegate != null)
+ return delegate().liveMemberCount();
+
+ throw new IllegalStateException("Cannot get live member count on shutdown instance");
+ }
+
@Override
public void receiveMessage(IMessage message)
{
@@ -250,16 +260,22 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
}
public void forEach(IIsolatedExecutor.SerializableRunnable runnable) { forEach(i -> i.sync(runnable)); }
- public void forEach(Consumer<? super I> consumer) { instances.forEach(consumer); }
- public void parallelForEach(IIsolatedExecutor.SerializableConsumer<? super I> consumer, long timeout, TimeUnit units)
+ public void forEach(Consumer<? super I> consumer) { forEach(instances, consumer); }
+ public void forEach(List<I> instancesForOp, Consumer<? super I> consumer) { instancesForOp.forEach(consumer); }
+
+ public void parallelForEach(IIsolatedExecutor.SerializableConsumer<? super I> consumer, long timeout, TimeUnit unit)
+ {
+ parallelForEach(instances, consumer, timeout, unit);
+ }
+
+ public void parallelForEach(List<I> instances, IIsolatedExecutor.SerializableConsumer<? super I> consumer, long timeout, TimeUnit unit)
{
FBUtilities.waitOnFutures(instances.stream()
.map(i -> i.async(consumer).apply(i))
.collect(Collectors.toList()),
- timeout, units);
+ timeout, unit);
}
-
public IMessageFilters filters() { return filters; }
public MessageFilters.Builder verbs(MessagingService.Verb ... verbs) { return filters.verbs(verbs); }
@@ -278,7 +294,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
{
// execute the schema change
coordinator(1).execute(query, ConsistencyLevel.ALL);
- monitor.waitForAgreement();
+ monitor.waitForCompletion();
}
}).run();
}
@@ -301,33 +317,26 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
}
}
- /**
- * Will wait for a schema change AND agreement that occurs after it is created
- * (and precedes the invocation to waitForAgreement)
- *
- * Works by simply checking if all UUIDs agree after any schema version change event,
- * so long as the waitForAgreement method has been entered (indicating the change has
- * taken place on the coordinator)
- *
- * This could perhaps be made a little more robust, but this should more than suffice.
- */
- public class SchemaChangeMonitor implements AutoCloseable
+ public abstract class ChangeMonitor implements AutoCloseable
{
final List<IListen.Cancel> cleanup;
- volatile boolean schemaHasChanged;
- final SimpleCondition agreement = new SimpleCondition();
+ final SimpleCondition completed;
+ private final long timeOut;
+ private final TimeUnit timeoutUnit;
+ volatile boolean changed;
- public SchemaChangeMonitor()
+ public ChangeMonitor(long timeOut, TimeUnit timeoutUnit)
{
+ this.timeOut = timeOut;
+ this.timeoutUnit = timeoutUnit;
this.cleanup = new ArrayList<>(instances.size());
- for (IInstance instance : instances)
- cleanup.add(instance.listen().schema(this::signal));
+ this.completed = new SimpleCondition();
}
- private void signal()
+ protected void signal()
{
- if (schemaHasChanged && 1 == instances.stream().map(IInstance::schemaVersion).distinct().count())
- agreement.signalAll();
+ if (changed && isCompleted())
+ completed.signalAll();
}
@Override
@@ -337,20 +346,91 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
cancel.cancel();
}
- public void waitForAgreement()
+ public void waitForCompletion()
{
- schemaHasChanged = true;
+ startPolling();
+ changed = true;
signal();
try
{
- if (!agreement.await(1L, TimeUnit.MINUTES))
+ if (!completed.await(timeOut, timeoutUnit))
throw new InterruptedException();
}
catch (InterruptedException e)
{
- throw new IllegalStateException("Schema agreement not reached");
+ throw new IllegalStateException(getMonitorTimeoutMessage());
}
}
+
+ private void startPolling()
+ {
+ for (IInstance instance : instances)
+ cleanup.add(startPolling(instance));
+ }
+
+ protected abstract IListen.Cancel startPolling(IInstance instance);
+
+ protected abstract boolean isCompleted();
+
+ protected abstract String getMonitorTimeoutMessage();
+ }
+
+
+
+ /**
+ * Will wait for a schema change AND agreement that occurs after it is created
+ * (and precedes the invocation to waitForAgreement)
+ *
+ * Works by simply checking if all UUIDs agree after any schema version change event,
+ * so long as the waitForAgreement method has been entered (indicating the change has
+ * taken place on the coordinator)
+ *
+ * This could perhaps be made a little more robust, but this should more than suffice.
+ */
+ public class SchemaChangeMonitor extends ChangeMonitor
+ {
+ public SchemaChangeMonitor()
+ {
+ super(70, TimeUnit.SECONDS);
+ }
+
+ protected IListen.Cancel startPolling(IInstance instance)
+ {
+ return instance.listen().schema(this::signal);
+ }
+
+ protected boolean isCompleted()
+ {
+ return 1 == instances.stream().map(IInstance::schemaVersion).distinct().count();
+ }
+
+ protected String getMonitorTimeoutMessage()
+ {
+ return "Schema agreement not reached";
+ }
+ }
+
+ public class AllMembersAliveMonitor extends ChangeMonitor
+ {
+ public AllMembersAliveMonitor()
+ {
+ super(60, TimeUnit.SECONDS);
+ }
+
+ protected IListen.Cancel startPolling(IInstance instance)
+ {
+ return instance.listen().liveMembers(this::signal);
+ }
+
+ protected boolean isCompleted()
+ {
+ return instances.stream().allMatch(i -> !i.config().has(Feature.GOSSIP) || i.liveMemberCount() == instances.size());
+ }
+
+ protected String getMonitorTimeoutMessage()
+ {
+ return "Live member count did not converge across all instances";
+ }
}
public void schemaChange(String statement, int instance)
@@ -360,7 +440,25 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
void startup()
{
- parallelForEach(I::startup, 0, null);
+ try (AllMembersAliveMonitor monitor = new AllMembersAliveMonitor())
+ {
+ // Start any instances with auto_bootstrap enabled first, and in series to avoid issues
+ // with multiple nodes bootstrapping with consistent range movement enabled,
+ // and then start any instances with it disabled in parallel.
+ List<I> startSequentially = new ArrayList<>();
+ List<I> startParallel = new ArrayList<>();
+ for (I instance : instances)
+ {
+ if ((boolean) instance.config().get("auto_bootstrap"))
+ startSequentially.add(instance);
+ else
+ startParallel.add(instance);
+ }
+
+ forEach(startSequentially, I::startup);
+ parallelForEach(startParallel, I::startup, 0, null);
+ monitor.waitForCompletion();
+ }
}
protected interface Factory<I extends IInstance, C extends AbstractCluster<I>>
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index d3f2955..c8613f7 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -73,6 +73,7 @@ import org.apache.cassandra.net.IMessageSink;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.CassandraDaemon;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.PendingRangeCalculatorService;
import org.apache.cassandra.service.QueryState;
@@ -90,6 +91,7 @@ import org.apache.cassandra.utils.concurrent.Ref;
import static java.util.concurrent.TimeUnit.MINUTES;
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
public class Instance extends IsolatedExecutor implements IInvokableInstance
{
@@ -426,6 +428,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
if (config.has(GOSSIP))
{
StorageService.instance.initServer();
+ StorageService.instance.removeShutdownHook();
}
else
{
@@ -436,6 +439,12 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
SystemKeyspace.finishStartup();
+ if (config.has(NATIVE_PROTOCOL))
+ {
+ CassandraDaemon.getInstanceForTesting().initializeNativeTransport();
+ CassandraDaemon.getInstanceForTesting().startNativeTransport();
+ }
+
if (!FBUtilities.getBroadcastAddress().equals(broadcastAddressAndPort().address))
throw new IllegalStateException();
if (DatabaseDescriptor.getStoragePort() != broadcastAddressAndPort().port)
@@ -525,6 +534,8 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
Future<?> future = async((ExecutorService executor) -> {
Throwable error = null;
+ error = parallelRun(error, executor, CassandraDaemon.getInstanceForTesting()::destroyNativeTransport);
+
if (config.has(GOSSIP) || config.has(NETWORK))
{
StorageService.instance.shutdownServer();
@@ -565,6 +576,16 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
.thenRun(super::shutdown);
}
+ public int liveMemberCount()
+ {
+ return sync(() -> {
+ if (!DatabaseDescriptor.isDaemonInitialized() || !Gossiper.instance.isEnabled())
+ return 0;
+ return Gossiper.instance.getLiveMembers().size();
+ }).call();
+ }
+
+
private static Throwable parallelRun(Throwable accumulate, ExecutorService runOn, ThrowingRunnable ... runnables)
{
List<Future<Throwable>> results = new ArrayList<>();
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java b/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java
index ca6d713..fb78902 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java
@@ -24,6 +24,7 @@ import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.SigarLibrary;
+import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Arrays;
@@ -65,16 +66,19 @@ public class InstanceClassLoader extends URLClassLoader
InstanceClassLoader create(int id, URL[] urls, ClassLoader sharedClassLoader);
}
+ private volatile boolean isClosed = false;
private final URL[] urls;
private final int generation; // used to help debug class loader leaks, by helping determine which classloaders should have been collected
+ private final int id;
private final ClassLoader sharedClassLoader;
- InstanceClassLoader(int generation, URL[] urls, ClassLoader sharedClassLoader)
+ InstanceClassLoader(int generation, int id, URL[] urls, ClassLoader sharedClassLoader)
{
super(urls, null);
this.urls = urls;
this.sharedClassLoader = sharedClassLoader;
this.generation = generation;
+ this.id = id;
}
@Override
@@ -88,6 +92,9 @@ public class InstanceClassLoader extends URLClassLoader
Class<?> loadClassInternal(String name) throws ClassNotFoundException
{
+ if (isClosed)
+ throw new IllegalStateException(String.format("Can't load %s. Instance class loader is already closed.", name));
+
synchronized (getClassLoadingLock(name))
{
// First, check if the class has already been loaded
@@ -112,7 +119,14 @@ public class InstanceClassLoader extends URLClassLoader
{
return "InstanceClassLoader{" +
"generation=" + generation +
+ ", id = " + id +
", urls=" + Arrays.toString(urls) +
'}';
}
+
+ public void close() throws IOException
+ {
+ isClosed = true;
+ super.close();
+ }
}
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
index 88299d6..97e1a18 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
@@ -58,18 +58,23 @@ public class InstanceConfig implements IInstanceConfig
{
if (broadcastAddressAndPort == null)
{
- try
- {
- broadcastAddressAndPort = InetAddressAndPort.getByNameOverrideDefaults(getString("broadcast_address"), getInt("storage_port"));
- }
- catch (UnknownHostException e)
- {
- throw new IllegalStateException(e);
- }
+ broadcastAddressAndPort = getAddressAndPortFromConfig("broadcast_address", "storage_port");
}
return broadcastAddressAndPort;
}
+ private InetAddressAndPort getAddressAndPortFromConfig(String addressProp, String portProp)
+ {
+ try
+ {
+ return InetAddressAndPort.getByNameOverrideDefaults(getString(addressProp), getInt(portProp));
+ }
+ catch (UnknownHostException e)
+ {
+ throw new IllegalStateException(e);
+ }
+ }
+
private InstanceConfig(int num,
NetworkTopology networkTopology,
String broadcast_address,
@@ -97,6 +102,7 @@ public class InstanceConfig implements IInstanceConfig
// .set("cdc_directory", cdc_directory)
.set("initial_token", initial_token)
.set("partitioner", "org.apache.cassandra.dht.Murmur3Partitioner")
+ .set("start_native_transport", true)
.set("concurrent_writes", 2)
.set("concurrent_counter_writes", 2)
// .set("concurrent_materialized_view_writes", 2)
@@ -109,6 +115,11 @@ public class InstanceConfig implements IInstanceConfig
.set("endpoint_snitch", DistributedTestSnitch.class.getName())
.set("seed_provider", new ParameterizedClass(SimpleSeedProvider.class.getName(),
Collections.singletonMap("seeds", "127.0.0.1")))
+ .set("auto_bootstrap", false)
+ // capacities that are based on `totalMemory` that should be fixed size
+ .set("index_summary_capacity_in_mb", 50l)
+ .set("counter_cache_size_in_mb", 50l)
+ .set("key_cache_size_in_mb", 50l)
// legacy parameters
.forceSet("commitlog_sync_batch_window_in_ms", 1.0);
this.featureFlags = EnumSet.noneOf(Feature.class);
@@ -130,6 +141,13 @@ public class InstanceConfig implements IInstanceConfig
return this;
}
+ public InstanceConfig with(Feature... flags)
+ {
+ for (Feature flag : flags)
+ featureFlags.add(flag);
+ return this;
+ }
+
public boolean has(Feature featureFlag)
{
return featureFlags.contains(featureFlag);
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Listen.java b/test/distributed/org/apache/cassandra/distributed/impl/Listen.java
index cf208a1..27ae156 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Listen.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Listen.java
@@ -18,15 +18,18 @@
package org.apache.cassandra.distributed.impl;
-import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;
+import java.util.function.Supplier;
+
import org.apache.cassandra.distributed.api.IListen;
+import org.apache.cassandra.gms.Gossiper;
public class Listen implements IListen
{
final Instance instance;
+
public Listen(Instance instance)
{
this.instance = instance;
@@ -34,9 +37,18 @@ public class Listen implements IListen
public Cancel schema(Runnable onChange)
{
- final AtomicBoolean cancel = new AtomicBoolean();
+ return start(onChange, instance::schemaVersion);
+ }
+
+ public Cancel liveMembers(Runnable onChange)
+ {
+ return start(onChange, instance::liveMemberCount);
+ }
+
+ protected <T> Cancel start(Runnable onChange, Supplier<T> valueSupplier) {
+ AtomicBoolean cancel = new AtomicBoolean(false);
instance.isolatedExecutor.execute(() -> {
- UUID prev = instance.schemaVersion();
+ T prev = valueSupplier.get();
while (true)
{
if (cancel.get())
@@ -44,7 +56,7 @@ public class Listen implements IListen
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10L));
- UUID cur = instance.schemaVersion();
+ T cur = valueSupplier.get();
if (!prev.equals(cur))
onChange.run();
prev = cur;
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/RowUtil.java b/test/distributed/org/apache/cassandra/distributed/impl/RowUtil.java
index d84c4a9..ded3708 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/RowUtil.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/RowUtil.java
@@ -24,6 +24,8 @@ import java.util.List;
import com.google.common.collect.Iterators;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
import org.apache.cassandra.cql3.ColumnSpecification;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.transport.messages.ResultMessage;
@@ -64,4 +66,19 @@ public class RowUtil
return objectRow;
});
}
+
+ public static Iterator<Object[]> toObjects(ResultSet rs)
+ {
+ return Iterators.transform(rs.iterator(), (Row row) -> {
+ final int numColumns = rs.getColumnDefinitions().size();
+ Object[] objectRow = new Object[numColumns];
+ for (int i = 0; i < numColumns; i++)
+ {
+ objectRow[i] = row.getObject(i);
+ }
+ return objectRow;
+ });
+ }
+
+
}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
index 757c17f..745e1ab 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
@@ -28,8 +28,10 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
+import com.datastax.driver.core.ResultSet;
import org.apache.cassandra.distributed.impl.AbstractCluster;
import org.apache.cassandra.distributed.impl.IsolatedExecutor;
+import org.apache.cassandra.distributed.impl.RowUtil;
public class DistributedTestBase
{
@@ -81,6 +83,11 @@ public class DistributedTestBase
return cluster;
}
+ public static void assertRows(ResultSet actual,Object[]... expected)
+ {
+ assertRows(RowUtil.toObjects(actual), expected);
+ }
+
public static void assertRows(Object[][] actual, Object[]... expected)
{
Assert.assertEquals(rowsNotEqualErrorMessage(actual, expected),
diff --git a/test/distributed/org/apache/cassandra/distributed/test/NativeProtocolTest.java b/test/distributed/org/apache/cassandra/distributed/test/NativeProtocolTest.java
new file mode 100644
index 0000000..15392b1
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/NativeProtocolTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.Statement;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.impl.RowUtil;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Iterator;
+
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+
+public class NativeProtocolTest extends DistributedTestBase
+{
+
+ @Test
+ public void withClientRequests() throws Throwable
+ {
+ try (Cluster ignored = init(Cluster.create(3,
+ config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL))))
+ {
+ final com.datastax.driver.core.Cluster cluster = com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1").build();
+ Session session = cluster.connect();
+ session.execute("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck));");
+ session.execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) values (1,1,1);");
+ Statement select = new SimpleStatement("select * from " + KEYSPACE + ".tbl;").setConsistencyLevel(ConsistencyLevel.ALL);
+ final ResultSet resultSet = session.execute(select);
+ assertRows(resultSet, row(1, 1, 1));
+ Assert.assertEquals(3, cluster.getMetadata().getAllHosts().size());
+ session.close();
+ cluster.close();
+ }
+ }
+}
\ No newline at end of file
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java b/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java
index 55c700c..09f40e4 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java
@@ -44,6 +44,7 @@ import org.apache.cassandra.utils.SigarLibrary;
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
/* Resource Leak Test - useful when tracking down issues with in-JVM framework cleanup.
* All objects referencing the InstanceClassLoader need to be garbage collected or
@@ -144,6 +145,7 @@ public class ResourceLeakTest extends DistributedTestBase
{
for (int loop = 0; loop < numTestLoops; loop++)
{
+ System.out.println(String.format("========== Starting loop %03d ========", loop));
try (Cluster cluster = Cluster.build(numClusterNodes).withConfig(updater).start())
{
if (cluster.get(1).config().has(GOSSIP)) // Wait for gossip to settle on the seed node
@@ -170,6 +172,7 @@ public class ResourceLeakTest extends DistributedTestBase
System.runFinalization();
System.gc();
}
+ System.out.println(String.format("========== Completed loop %03d ========", loop));
}
}
@@ -198,4 +201,17 @@ public class ResourceLeakTest extends DistributedTestBase
}
dumpResources("final-gossip-network");
}
+
+ @Test
+ public void looperNativeTest() throws Throwable
+ {
+ doTest(2, config -> config.with(NATIVE_PROTOCOL));
+ if (forceCollection)
+ {
+ System.runFinalization();
+ System.gc();
+ Thread.sleep(finalWaitMillis);
+ }
+ dumpResources("final-native");
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org