You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2019/11/11 14:56:14 UTC

[cassandra] branch cassandra-2.2 updated: Add client testing capabilities to in-jvm tests

This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch cassandra-2.2
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-2.2 by this push:
     new 50b7094  Add client testing capabilities to in-jvm tests
50b7094 is described below

commit 50b7094278241f389d3b0b49b02e893fd4322b12
Author: Doug Rohrer <dr...@apple.com>
AuthorDate: Mon Oct 14 13:42:35 2019 -0400

    Add client testing capabilities to in-jvm tests
    
    Patch by Doug Rohrer, reviewed by Alex Petrov for CASSANDRA-15347.
    
    Co-authored-by: Alex Petrov <ol...@gmail.com>
---
 .../apache/cassandra/service/CassandraDaemon.java  |  96 +++++++++----
 .../org/apache/cassandra/thrift/ThriftServer.java  |   5 +
 .../org/apache/cassandra/transport/Server.java     |  41 +++++-
 .../apache/cassandra/distributed/api/Feature.java  |   2 +-
 .../cassandra/distributed/api/IInstance.java       |   2 +
 .../apache/cassandra/distributed/api/IListen.java  |   2 +
 .../distributed/impl/AbstractCluster.java          | 158 +++++++++++++++++----
 .../cassandra/distributed/impl/Instance.java       |  21 +++
 .../distributed/impl/InstanceClassLoader.java      |  16 ++-
 .../cassandra/distributed/impl/InstanceConfig.java |  34 +++--
 .../apache/cassandra/distributed/impl/Listen.java  |  20 ++-
 .../apache/cassandra/distributed/impl/RowUtil.java |  17 +++
 .../distributed/test/DistributedTestBase.java      |   7 +
 .../distributed/test/NativeProtocolTest.java       |  59 ++++++++
 .../distributed/test/ResourceLeakTest.java         |  16 +++
 15 files changed, 424 insertions(+), 72 deletions(-)

diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 1380f43..c0ba38e 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -31,7 +31,6 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
-import javax.management.MBeanServer;
 import javax.management.ObjectName;
 import javax.management.StandardMBean;
 import javax.management.remote.JMXConnectorServer;
@@ -83,6 +82,13 @@ public class CassandraDaemon
     private static JMXConnectorServer jmxServer = null;
 
     private static final Logger logger;
+
+    @VisibleForTesting
+    public static CassandraDaemon getInstanceForTesting()
+    {
+        return instance;
+    }
+
     static {
         // Need to register metrics before instrumented appender is created(first access to LoggerFactory).
         SharedMetricRegistries.getOrCreate("logback-metrics").addListener(new MetricRegistryListener.Base()
@@ -357,13 +363,53 @@ public class CassandraDaemon
         int rpcPort = DatabaseDescriptor.getRpcPort();
         int listenBacklog = DatabaseDescriptor.getRpcListenBacklog();
         thriftServer = new ThriftServer(rpcAddr, rpcPort, listenBacklog);
+        initializeNativeTransport();
+
+        completeSetup();
+    }
 
+    public void initializeNativeTransport()
+    {
         // Native transport
         InetAddress nativeAddr = DatabaseDescriptor.getRpcAddress();
         int nativePort = DatabaseDescriptor.getNativeTransportPort();
         nativeServer = new org.apache.cassandra.transport.Server(nativeAddr, nativePort);
+    }
 
-        completeSetup();
+    public void startNativeTransport()
+    {
+        validateTransportsCanStart();
+
+        if (nativeServer == null)
+            throw new IllegalStateException("native transport should be set up before it can be started");
+
+        nativeServer.start();
+    }
+
+    private void validateTransportsCanStart()
+    {
+        // We only start transports if bootstrap has completed and we're not in survey mode, OR if we are in
+        // survey mode and streaming has completed but we're not using auth.
+        // OR if we have not joined the ring yet.
+        if (StorageService.instance.hasJoined())
+        {
+            if (StorageService.instance.isSurveyMode())
+            {
+                if (StorageService.instance.isBootstrapMode() || DatabaseDescriptor.getAuthenticator().requireAuthentication())
+                {
+                    throw new IllegalStateException("Not starting client transports in write_survey mode as it's bootstrapping or " +
+                                                    "auth is enabled");
+                }
+            }
+            else
+            {
+                if (!SystemKeyspace.bootstrapComplete())
+                {
+                    throw new IllegalStateException("Node is not yet bootstrapped completely. Use nodetool to check bootstrap" +
+                                                    " state and resume. For more, see `nodetool help bootstrap`");
+                }
+            }
+        }
     }
 
     /*
@@ -440,28 +486,15 @@ public class CassandraDaemon
      */
     public void start()
     {
-        // We only start transports if bootstrap has completed and we're not in survey mode, OR if we are in
-        // survey mode and streaming has completed but we're not using auth.
-        // OR if we have not joined the ring yet.
-        if (StorageService.instance.hasJoined())
+        try
         {
-            if (StorageService.instance.isSurveyMode())
-            {
-                if (StorageService.instance.isBootstrapMode() || DatabaseDescriptor.getAuthenticator().requireAuthentication())
-                {
-                    logger.info("Not starting client transports in write_survey mode as it's bootstrapping or " +
-                            "auth is enabled");
-                    return;
-                }
-            }
-            else
-            {
-                if (!SystemKeyspace.bootstrapComplete())
-                {
-                    logger.info("Not starting client transports as bootstrap has not completed");
-                    return;
-                }
-            }
+            validateTransportsCanStart();
+        }
+        catch (IllegalStateException isx)
+        {
+            // If there are any errors, we just log and return in this case
+            logger.info(isx.getMessage());
+            return;
         }
 
         String nativeFlag = System.getProperty("cassandra.start_native_transport");
@@ -510,6 +543,18 @@ public class CassandraDaemon
         }
     }
 
+    @VisibleForTesting
+    public void destroyNativeTransport() throws InterruptedException
+    {
+        // In 2.2, just stopping the server works. Future versions require `destroy` to be called
+        // so we maintain the name for consistency
+        if (nativeServer != null)
+        {
+            nativeServer.stopAndAwaitTermination();
+            nativeServer = null;
+        }
+    }
+
 
     /**
      * Clean up all resources obtained during the lifetime of the daemon. This
@@ -648,7 +693,7 @@ public class CassandraDaemon
             logger.info("No gossip backlog; proceeding");
     }
 
-    public static void stop(String[] args)
+    public static void stop(String[] args) throws InterruptedException
     {
         instance.deactivate();
     }
@@ -703,6 +748,9 @@ public class CassandraDaemon
          */
         public void stop();
 
+        @VisibleForTesting
+        public void stopAndAwaitTermination();
+
         /**
          * Returns whether the server is currently running.
          */
diff --git a/src/java/org/apache/cassandra/thrift/ThriftServer.java b/src/java/org/apache/cassandra/thrift/ThriftServer.java
index 44ec524..87dcd3e 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftServer.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftServer.java
@@ -77,6 +77,11 @@ public class ThriftServer implements CassandraDaemon.Server
         }
     }
 
+    public void stopAndAwaitTermination()
+    {
+        stop();
+    }
+
     public boolean isRunning()
     {
         return server != null;
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index c91d37d..418f6f7 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -24,10 +24,12 @@ import java.net.UnknownHostException;
 import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -124,7 +126,14 @@ public class Server implements CassandraDaemon.Server
     public void stop()
     {
         if (isRunning.compareAndSet(true, false))
-            close();
+            close(false);
+    }
+
+    @VisibleForTesting
+    public void stopAndAwaitTermination()
+    {
+        if (isRunning.compareAndSet(true, false))
+            close(true);
     }
 
     public boolean isRunning()
@@ -206,15 +215,39 @@ public class Server implements CassandraDaemon.Server
 
     private void close()
     {
+        close(false);
+    }
+
+    private void closeAndAwait()
+    {
+        close(true);
+    }
+
+    private void close(boolean awaitTermination)
+    {
         // Close opened connections
         connectionTracker.closeAll();
         workerGroup.shutdownGracefully();
-        workerGroup = null;
-
         eventExecutorGroup.shutdown();
-        eventExecutorGroup = null;
+
         logger.info("Stop listening for CQL clients");
 
+        if (awaitTermination)
+        {
+            try
+            {
+                workerGroup.awaitTermination(1, TimeUnit.MINUTES);
+                eventExecutorGroup.awaitTermination(1, TimeUnit.MINUTES);
+            }
+            catch (InterruptedException e)
+            {
+                logger.error(e.getMessage());
+            }
+        }
+
+        workerGroup = null;
+        eventExecutorGroup = null;
+
         StorageService.instance.setRpcReady(false);
     }
 
diff --git a/test/distributed/org/apache/cassandra/distributed/api/Feature.java b/test/distributed/org/apache/cassandra/distributed/api/Feature.java
index a5c9316..b4ba036 100644
--- a/test/distributed/org/apache/cassandra/distributed/api/Feature.java
+++ b/test/distributed/org/apache/cassandra/distributed/api/Feature.java
@@ -20,5 +20,5 @@ package org.apache.cassandra.distributed.api;
 
 public enum Feature
 {
-    NETWORK, GOSSIP
+    NETWORK, GOSSIP, NATIVE_PROTOCOL
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/api/IInstance.java b/test/distributed/org/apache/cassandra/distributed/api/IInstance.java
index 6a9e33a..e6c705c 100644
--- a/test/distributed/org/apache/cassandra/distributed/api/IInstance.java
+++ b/test/distributed/org/apache/cassandra/distributed/api/IInstance.java
@@ -41,6 +41,8 @@ public interface IInstance extends IIsolatedExecutor
     Future<Void> shutdown();
     Future<Void> shutdown(boolean graceful);
 
+    int liveMemberCount();
+
     // these methods are not for external use, but for simplicity we leave them public and on the normal IInstance interface
     void startup(ICluster cluster);
     void receiveMessage(IMessage message);
diff --git a/test/distributed/org/apache/cassandra/distributed/api/IListen.java b/test/distributed/org/apache/cassandra/distributed/api/IListen.java
index d3a80da..c2e8dd6 100644
--- a/test/distributed/org/apache/cassandra/distributed/api/IListen.java
+++ b/test/distributed/org/apache/cassandra/distributed/api/IListen.java
@@ -23,4 +23,6 @@ public interface IListen
     public interface Cancel { void cancel(); }
 
     Cancel schema(Runnable onChange);
+
+    Cancel liveMembers(Runnable onChange);
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index f603497..0283457 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -43,9 +43,11 @@ import com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.distributed.api.ICoordinator;
 import org.apache.cassandra.distributed.api.IInstance;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
@@ -129,7 +131,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
 
         private IInvokableInstance newInstance(int generation)
         {
-            ClassLoader classLoader = new InstanceClassLoader(generation, version.classpath, sharedClassLoader);
+            ClassLoader classLoader = new InstanceClassLoader(generation, config.num, version.classpath, sharedClassLoader);
             return Instance.transferAdhoc((SerializableBiFunction<IInstanceConfig, ClassLoader, Instance>)Instance::new, classLoader)
                            .apply(config, classLoader);
         }
@@ -171,6 +173,14 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
             return future;
         }
 
+        public int liveMemberCount()
+        {
+            if (!isShutdown && delegate != null)
+                return delegate().liveMemberCount();
+
+            throw new IllegalStateException("Cannot get live member count on shutdown instance");
+        }
+
         @Override
         public void receiveMessage(IMessage message)
         {
@@ -250,16 +260,22 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
     }
 
     public void forEach(IIsolatedExecutor.SerializableRunnable runnable) { forEach(i -> i.sync(runnable)); }
-    public void forEach(Consumer<? super I> consumer) { instances.forEach(consumer); }
-    public void parallelForEach(IIsolatedExecutor.SerializableConsumer<? super I> consumer, long timeout, TimeUnit units)
+    public void forEach(Consumer<? super I> consumer) { forEach(instances, consumer); }
+    public void forEach(List<I> instancesForOp, Consumer<? super I> consumer) { instancesForOp.forEach(consumer); }
+
+    public void parallelForEach(IIsolatedExecutor.SerializableConsumer<? super I> consumer, long timeout, TimeUnit unit)
+    {
+            parallelForEach(instances, consumer, timeout, unit);
+    }
+
+    public void parallelForEach(List<I> instances, IIsolatedExecutor.SerializableConsumer<? super I> consumer, long timeout, TimeUnit unit)
     {
         FBUtilities.waitOnFutures(instances.stream()
                                            .map(i -> i.async(consumer).apply(i))
                                            .collect(Collectors.toList()),
-                                  timeout, units);
+                                  timeout, unit);
     }
 
-
     public IMessageFilters filters() { return filters; }
     public MessageFilters.Builder verbs(MessagingService.Verb ... verbs) { return filters.verbs(verbs); }
 
@@ -278,7 +294,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
             {
                 // execute the schema change
                 coordinator(1).execute(query, ConsistencyLevel.ALL);
-                monitor.waitForAgreement();
+                monitor.waitForCompletion();
             }
         }).run();
     }
@@ -301,33 +317,26 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
         }
     }
 
-    /**
-     * Will wait for a schema change AND agreement that occurs after it is created
-     * (and precedes the invocation to waitForAgreement)
-     *
-     * Works by simply checking if all UUIDs agree after any schema version change event,
-     * so long as the waitForAgreement method has been entered (indicating the change has
-     * taken place on the coordinator)
-     *
-     * This could perhaps be made a little more robust, but this should more than suffice.
-     */
-    public class SchemaChangeMonitor implements AutoCloseable
+    public abstract class ChangeMonitor implements AutoCloseable
     {
         final List<IListen.Cancel> cleanup;
-        volatile boolean schemaHasChanged;
-        final SimpleCondition agreement = new SimpleCondition();
+        final SimpleCondition completed;
+        private final long timeOut;
+        private final TimeUnit timeoutUnit;
+        volatile boolean changed;
 
-        public SchemaChangeMonitor()
+        public ChangeMonitor(long timeOut, TimeUnit timeoutUnit)
         {
+            this.timeOut = timeOut;
+            this.timeoutUnit = timeoutUnit;
             this.cleanup = new ArrayList<>(instances.size());
-            for (IInstance instance : instances)
-                cleanup.add(instance.listen().schema(this::signal));
+            this.completed = new SimpleCondition();
         }
 
-        private void signal()
+        protected void signal()
         {
-            if (schemaHasChanged && 1 == instances.stream().map(IInstance::schemaVersion).distinct().count())
-                agreement.signalAll();
+            if (changed && isCompleted())
+                completed.signalAll();
         }
 
         @Override
@@ -337,20 +346,91 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
                 cancel.cancel();
         }
 
-        public void waitForAgreement()
+        public void waitForCompletion()
         {
-            schemaHasChanged = true;
+            startPolling();
+            changed = true;
             signal();
             try
             {
-                if (!agreement.await(1L, TimeUnit.MINUTES))
+                if (!completed.await(timeOut, timeoutUnit))
                     throw new InterruptedException();
             }
             catch (InterruptedException e)
             {
-                throw new IllegalStateException("Schema agreement not reached");
+                throw new IllegalStateException(getMonitorTimeoutMessage());
             }
         }
+
+        private void startPolling()
+        {
+            for (IInstance instance : instances)
+                cleanup.add(startPolling(instance));
+        }
+
+        protected abstract IListen.Cancel startPolling(IInstance instance);
+
+        protected abstract boolean isCompleted();
+
+        protected abstract String getMonitorTimeoutMessage();
+    }
+
+
+
+    /**
+     * Will wait for a schema change AND agreement that occurs after it is created
+     * (and precedes the invocation to waitForCompletion)
+     *
+     * Works by simply checking if all UUIDs agree after any schema version change event,
+     * so long as the waitForCompletion method has been entered (indicating the change has
+     * taken place on the coordinator)
+     *
+     * This could perhaps be made a little more robust, but this should more than suffice.
+     */
+    public class SchemaChangeMonitor extends ChangeMonitor
+    {
+        public SchemaChangeMonitor()
+        {
+            super(70, TimeUnit.SECONDS);
+        }
+
+        protected IListen.Cancel startPolling(IInstance instance)
+        {
+            return instance.listen().schema(this::signal);
+        }
+
+        protected boolean isCompleted()
+        {
+            return 1 == instances.stream().map(IInstance::schemaVersion).distinct().count();
+        }
+
+        protected String getMonitorTimeoutMessage()
+        {
+            return "Schema agreement not reached";
+        }
+    }
+
+    public class AllMembersAliveMonitor extends ChangeMonitor
+    {
+        public AllMembersAliveMonitor()
+        {
+            super(60, TimeUnit.SECONDS);
+        }
+
+        protected IListen.Cancel startPolling(IInstance instance)
+        {
+            return instance.listen().liveMembers(this::signal);
+        }
+
+        protected boolean isCompleted()
+        {
+            return instances.stream().allMatch(i -> !i.config().has(Feature.GOSSIP) || i.liveMemberCount() == instances.size());
+        }
+
+        protected String getMonitorTimeoutMessage()
+        {
+            return "Live member count did not converge across all instances";
+        }
     }
 
     public void schemaChange(String statement, int instance)
@@ -360,7 +440,25 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
 
     void startup()
     {
-        parallelForEach(I::startup, 0, null);
+        try (AllMembersAliveMonitor monitor = new AllMembersAliveMonitor())
+        {
+            // Start any instances with auto_bootstrap enabled first, and in series to avoid issues
+            // with multiple nodes bootstrapping with consistent range movement enabled,
+            // and then start any instances with it disabled in parallel.
+            List<I> startSequentially = new ArrayList<>();
+            List<I> startParallel = new ArrayList<>();
+            for (I instance : instances)
+            {
+                if ((boolean) instance.config().get("auto_bootstrap"))
+                    startSequentially.add(instance);
+                else
+                    startParallel.add(instance);
+            }
+
+            forEach(startSequentially, I::startup);
+            parallelForEach(startParallel, I::startup, 0, null);
+            monitor.waitForCompletion();
+        }
     }
 
     protected interface Factory<I extends IInstance, C extends AbstractCluster<I>>
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index d3f2955..c8613f7 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -73,6 +73,7 @@ import org.apache.cassandra.net.IMessageSink;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.CassandraDaemon;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.PendingRangeCalculatorService;
 import org.apache.cassandra.service.QueryState;
@@ -90,6 +91,7 @@ import org.apache.cassandra.utils.concurrent.Ref;
 import static java.util.concurrent.TimeUnit.MINUTES;
 import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
 import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
 
 public class Instance extends IsolatedExecutor implements IInvokableInstance
 {
@@ -426,6 +428,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
                 if (config.has(GOSSIP))
                 {
                     StorageService.instance.initServer();
+                    StorageService.instance.removeShutdownHook();
                 }
                 else
                 {
@@ -436,6 +439,12 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
 
                 SystemKeyspace.finishStartup();
 
+                if (config.has(NATIVE_PROTOCOL))
+                {
+                    CassandraDaemon.getInstanceForTesting().initializeNativeTransport();
+                    CassandraDaemon.getInstanceForTesting().startNativeTransport();
+                }
+
                 if (!FBUtilities.getBroadcastAddress().equals(broadcastAddressAndPort().address))
                     throw new IllegalStateException();
                 if (DatabaseDescriptor.getStoragePort() != broadcastAddressAndPort().port)
@@ -525,6 +534,8 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
         Future<?> future = async((ExecutorService executor) -> {
             Throwable error = null;
 
+            error = parallelRun(error, executor, CassandraDaemon.getInstanceForTesting()::destroyNativeTransport);
+
             if (config.has(GOSSIP) || config.has(NETWORK))
             {
                 StorageService.instance.shutdownServer();
@@ -565,6 +576,16 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
                                 .thenRun(super::shutdown);
     }
 
+    public int liveMemberCount()
+    {
+        return sync(() -> {
+            if (!DatabaseDescriptor.isDaemonInitialized() || !Gossiper.instance.isEnabled())
+                return 0;
+            return Gossiper.instance.getLiveMembers().size();
+        }).call();
+    }
+
+
     private static Throwable parallelRun(Throwable accumulate, ExecutorService runOn, ThrowingRunnable ... runnables)
     {
         List<Future<Throwable>> results = new ArrayList<>();
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java b/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java
index ca6d713..fb78902 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java
@@ -24,6 +24,7 @@ import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.SigarLibrary;
 
+import java.io.IOException;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.Arrays;
@@ -65,16 +66,19 @@ public class InstanceClassLoader extends URLClassLoader
         InstanceClassLoader create(int id, URL[] urls, ClassLoader sharedClassLoader);
     }
 
+    private volatile boolean isClosed = false;
     private final URL[] urls;
     private final int generation; // used to help debug class loader leaks, by helping determine which classloaders should have been collected
+    private final int id;
     private final ClassLoader sharedClassLoader;
 
-    InstanceClassLoader(int generation, URL[] urls, ClassLoader sharedClassLoader)
+    InstanceClassLoader(int generation, int id, URL[] urls, ClassLoader sharedClassLoader)
     {
         super(urls, null);
         this.urls = urls;
         this.sharedClassLoader = sharedClassLoader;
         this.generation = generation;
+        this.id = id;
     }
 
     @Override
@@ -88,6 +92,9 @@ public class InstanceClassLoader extends URLClassLoader
 
     Class<?> loadClassInternal(String name) throws ClassNotFoundException
     {
+        if (isClosed)
+            throw new IllegalStateException(String.format("Can't load %s. Instance class loader is already closed.", name));
+
         synchronized (getClassLoadingLock(name))
         {
             // First, check if the class has already been loaded
@@ -112,7 +119,14 @@ public class InstanceClassLoader extends URLClassLoader
     {
         return "InstanceClassLoader{" +
                "generation=" + generation +
+               ", id = " + id +
                ", urls=" + Arrays.toString(urls) +
                '}';
     }
+
+    public void close() throws IOException
+    {
+        isClosed = true;
+        super.close();
+    }
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
index 88299d6..97e1a18 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
@@ -58,18 +58,23 @@ public class InstanceConfig implements IInstanceConfig
     {
         if (broadcastAddressAndPort == null)
         {
-            try
-            {
-                broadcastAddressAndPort = InetAddressAndPort.getByNameOverrideDefaults(getString("broadcast_address"), getInt("storage_port"));
-            }
-            catch (UnknownHostException e)
-            {
-                throw new IllegalStateException(e);
-            }
+            broadcastAddressAndPort = getAddressAndPortFromConfig("broadcast_address", "storage_port");
         }
         return broadcastAddressAndPort;
     }
 
+    private InetAddressAndPort getAddressAndPortFromConfig(String addressProp, String portProp)
+    {
+        try
+        {
+            return InetAddressAndPort.getByNameOverrideDefaults(getString(addressProp), getInt(portProp));
+        }
+        catch (UnknownHostException e)
+        {
+            throw new IllegalStateException(e);
+        }
+    }
+
     private InstanceConfig(int num,
                            NetworkTopology networkTopology,
                            String broadcast_address,
@@ -97,6 +102,7 @@ public class InstanceConfig implements IInstanceConfig
 //                .set("cdc_directory", cdc_directory)
                 .set("initial_token", initial_token)
                 .set("partitioner", "org.apache.cassandra.dht.Murmur3Partitioner")
+                .set("start_native_transport", true)
                 .set("concurrent_writes", 2)
                 .set("concurrent_counter_writes", 2)
 //                .set("concurrent_materialized_view_writes", 2)
@@ -109,6 +115,11 @@ public class InstanceConfig implements IInstanceConfig
                 .set("endpoint_snitch", DistributedTestSnitch.class.getName())
                 .set("seed_provider", new ParameterizedClass(SimpleSeedProvider.class.getName(),
                         Collections.singletonMap("seeds", "127.0.0.1")))
+                .set("auto_bootstrap", false)
+                // capacities that are based on `totalMemory` and should be fixed size
+                .set("index_summary_capacity_in_mb", 50l)
+                .set("counter_cache_size_in_mb", 50l)
+                .set("key_cache_size_in_mb", 50l)
                 // legacy parameters
                 .forceSet("commitlog_sync_batch_window_in_ms", 1.0);
         this.featureFlags = EnumSet.noneOf(Feature.class);
@@ -130,6 +141,13 @@ public class InstanceConfig implements IInstanceConfig
         return this;
     }
 
+    public InstanceConfig with(Feature... flags)
+    {
+        for (Feature flag : flags)
+            featureFlags.add(flag);
+        return this;
+    }
+
     public boolean has(Feature featureFlag)
     {
         return featureFlags.contains(featureFlag);
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Listen.java b/test/distributed/org/apache/cassandra/distributed/impl/Listen.java
index cf208a1..27ae156 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Listen.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Listen.java
@@ -18,15 +18,18 @@
 
 package org.apache.cassandra.distributed.impl;
 
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.LockSupport;
+import java.util.function.Supplier;
+
 import org.apache.cassandra.distributed.api.IListen;
+import org.apache.cassandra.gms.Gossiper;
 
 public class Listen implements IListen
 {
     final Instance instance;
+
     public Listen(Instance instance)
     {
         this.instance = instance;
@@ -34,9 +37,18 @@ public class Listen implements IListen
 
     public Cancel schema(Runnable onChange)
     {
-        final AtomicBoolean cancel = new AtomicBoolean();
+        return start(onChange, instance::schemaVersion); // run onChange whenever the polled schema version changes
+    }
+
+    public Cancel liveMembers(Runnable onChange)
+    {
+        return start(onChange, instance::liveMemberCount); // run onChange whenever the polled live member count changes
+    }
+
+    protected <T> Cancel start(Runnable onChange, Supplier<T> valueSupplier) {
+        AtomicBoolean cancel = new AtomicBoolean(false);
         instance.isolatedExecutor.execute(() -> {
-            UUID prev = instance.schemaVersion();
+            T prev = valueSupplier.get();
             while (true)
             {
                 if (cancel.get())
@@ -44,7 +56,7 @@ public class Listen implements IListen
 
                 LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10L));
 
-                UUID cur = instance.schemaVersion();
+                T cur = valueSupplier.get();
                 if (!prev.equals(cur))
                     onChange.run();
                 prev = cur;
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/RowUtil.java b/test/distributed/org/apache/cassandra/distributed/impl/RowUtil.java
index d84c4a9..ded3708 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/RowUtil.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/RowUtil.java
@@ -24,6 +24,8 @@ import java.util.List;
 
 import com.google.common.collect.Iterators;
 
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
 import org.apache.cassandra.cql3.ColumnSpecification;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.transport.messages.ResultMessage;
@@ -64,4 +66,19 @@ public class RowUtil
                                        return objectRow;
                                    });
     }
+
+    /** Lazily converts each driver {@link Row} of {@code rs} into an {@code Object[]} of its column values. */
+    public static Iterator<Object[]> toObjects(ResultSet rs)
+    {
+        // the column definitions belong to the result set, not to a row, so look the count up once
+        final int numColumns = rs.getColumnDefinitions().size();
+        return Iterators.transform(rs.iterator(), (Row row) -> {
+            Object[] objectRow = new Object[numColumns];
+            for (int i = 0; i < numColumns; i++)
+                objectRow[i] = row.getObject(i);
+            return objectRow;
+        });
+    }
+
+
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
index 757c17f..745e1ab 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
@@ -28,8 +28,10 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 
+import com.datastax.driver.core.ResultSet;
 import org.apache.cassandra.distributed.impl.AbstractCluster;
 import org.apache.cassandra.distributed.impl.IsolatedExecutor;
+import org.apache.cassandra.distributed.impl.RowUtil;
 
 public class DistributedTestBase
 {
@@ -81,6 +83,11 @@ public class DistributedTestBase
         return cluster;
     }
 
+    public static void assertRows(ResultSet actual, Object[]... expected)
+    {
+        assertRows(RowUtil.toObjects(actual), expected); // adapt driver rows to Object[] rows, then reuse the existing comparison
+    }
+
     public static void assertRows(Object[][] actual, Object[]... expected)
     {
         Assert.assertEquals(rowsNotEqualErrorMessage(actual, expected),
diff --git a/test/distributed/org/apache/cassandra/distributed/test/NativeProtocolTest.java b/test/distributed/org/apache/cassandra/distributed/test/NativeProtocolTest.java
new file mode 100644
index 0000000..15392b1
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/NativeProtocolTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.Statement;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.impl.RowUtil;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Iterator;
+
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+
+/** Exercises an in-JVM cluster through the native protocol using the DataStax Java driver. */
+public class NativeProtocolTest extends DistributedTestBase
+{
+    /** Creates a table, writes one row, reads it back at consistency level ALL and checks the driver sees 3 hosts. */
+    @Test
+    public void withClientRequests() throws Throwable
+    {
+        // resources close in reverse order (session, driver cluster, in-JVM cluster) even if a statement or assertion fails
+        try (Cluster ignored = init(Cluster.create(3,
+                                                   config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL)));
+             com.datastax.driver.core.Cluster cluster = com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1").build();
+             Session session = cluster.connect())
+        {
+            session.execute("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck));");
+            session.execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) values (1,1,1);");
+            Statement select = new SimpleStatement("select * from " + KEYSPACE + ".tbl;").setConsistencyLevel(ConsistencyLevel.ALL);
+            final ResultSet resultSet = session.execute(select);
+            assertRows(resultSet, row(1, 1, 1));
+            Assert.assertEquals(3, cluster.getMetadata().getAllHosts().size());
+        }
+    }
+}
\ No newline at end of file
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java b/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java
index 55c700c..09f40e4 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java
@@ -44,6 +44,7 @@ import org.apache.cassandra.utils.SigarLibrary;
 
 import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
 import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
 
 /* Resource Leak Test - useful when tracking down issues with in-JVM framework cleanup.
  * All objects referencing the InstanceClassLoader need to be garbage collected or
@@ -144,6 +145,7 @@ public class ResourceLeakTest extends DistributedTestBase
     {
         for (int loop = 0; loop < numTestLoops; loop++)
         {
+            System.out.println(String.format("========== Starting loop %03d ========", loop));
             try (Cluster cluster = Cluster.build(numClusterNodes).withConfig(updater).start())
             {
                 if (cluster.get(1).config().has(GOSSIP)) // Wait for gossip to settle on the seed node
@@ -170,6 +172,7 @@ public class ResourceLeakTest extends DistributedTestBase
                 System.runFinalization();
                 System.gc();
             }
+            System.out.println(String.format("========== Completed loop %03d ========", loop));
         }
     }
 
@@ -198,4 +201,17 @@ public class ResourceLeakTest extends DistributedTestBase
         }
         dumpResources("final-gossip-network");
     }
+
+    @Test
+    public void looperNativeTest() throws Throwable
+    {
+        doTest(2, config -> config.with(NATIVE_PROTOCOL)); // cycle clusters with only the NATIVE_PROTOCOL feature added; 2 is presumably the node count
+        if (forceCollection) // optionally force one last finalization + GC pass before the final dump
+        {
+            System.runFinalization();
+            System.gc();
+            Thread.sleep(finalWaitMillis); // pause after requesting collection, before dumping
+        }
+        dumpResources("final-native");
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org