You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2019/08/20 13:56:04 UTC

[cassandra] branch cassandra-3.11 updated (bb126c0 -> 3df63ed)

This is an automated email from the ASF dual-hosted git repository.

benedict pushed a change to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from bb126c0  Merge branch 'cassandra-3.0' into cassandra-3.11
     new 8dcaa12  Allow instance class loaders to be garbage collected for inJVM dtest
     new 51c0f6b  Merge branch 'cassandra-2.2' into cassandra-3.0
     new 3df63ed  Merge branch 'cassandra-3.0' into cassandra-3.11

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.txt                                        |   1 +
 build.xml                                          |  18 +-
 .../apache/cassandra/batchlog/BatchlogManager.java |   7 +-
 .../cassandra/concurrent/InfiniteLoopExecutor.java |   2 +-
 .../cassandra/concurrent/ScheduledExecutors.java   |  11 +-
 .../cassandra/concurrent/SharedExecutorPool.java   |   4 +-
 .../apache/cassandra/concurrent/StageManager.java  |   8 +-
 .../org/apache/cassandra/db/ColumnFamilyStore.java |  18 +-
 .../apache/cassandra/db/commitlog/CommitLog.java   |   1 +
 src/java/org/apache/cassandra/gms/Gossiper.java    |   9 +
 .../org/apache/cassandra/hints/HintsCatalog.java   |   5 +-
 .../cassandra/index/SecondaryIndexManager.java     |  11 +-
 .../cassandra/io/sstable/IndexSummaryManager.java  |  11 ++
 .../cassandra/io/sstable/format/SSTableReader.java |   6 +-
 .../org/apache/cassandra/net/MessagingService.java |   2 +
 .../cassandra/net/OutboundTcpConnection.java       |   2 +-
 .../service/PendingRangeCalculatorService.java     |  11 +-
 .../apache/cassandra/service/StorageService.java   |  24 ++-
 .../cassandra/streaming/StreamCoordinator.java     |  13 ++
 .../apache/cassandra/streaming/StreamSession.java  |   9 +
 .../org/apache/cassandra/utils/ExecutorUtils.java  | 151 +++++++++++++++
 .../org/apache/cassandra/utils/concurrent/Ref.java |  14 +-
 .../apache/cassandra/utils/memory/BufferPool.java  |  10 +-
 .../cassandra/utils/memory/MemtablePool.java       |   8 +-
 test/conf/logback-dtest.xml                        |  18 +-
 .../org/apache/cassandra/distributed/Cluster.java  |  30 ++-
 .../cassandra/distributed/UpgradeableCluster.java  |  30 +--
 .../cassandra/distributed/api/IInstance.java       |   3 +-
 .../cassandra/distributed/api/IInstanceConfig.java |   1 +
 .../cassandra/distributed/api/IMessageFilters.java |   6 +-
 .../distributed/impl/AbstractCluster.java          | 154 +++++++++-------
 .../impl/DelegatingInvokableInstance.java          |   5 +-
 .../cassandra/distributed/impl/Instance.java       | 146 +++++++++++----
 .../distributed/impl/InstanceClassLoader.java      |   9 +-
 .../cassandra/distributed/impl/InstanceConfig.java |  28 ++-
 .../distributed/impl/IsolatedExecutor.java         |  47 ++++-
 .../cassandra/distributed/impl/MessageFilters.java |  31 +---
 .../distributed/test/DistributedTestBase.java      |  26 ++-
 .../cassandra/distributed/test/GossipTest.java     |  14 +-
 .../distributed/test/ResourceLeakTest.java         | 202 +++++++++++++++++++++
 .../cassandra/concurrent/SEPExecutorTest.java      |   3 +-
 41 files changed, 848 insertions(+), 261 deletions(-)
 create mode 100644 src/java/org/apache/cassandra/utils/ExecutorUtils.java
 create mode 100644 test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/01: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by be...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 3df63ed054ebaaf058fb6f93d149fca38eb687e1
Merge: bb126c0 51c0f6b
Author: Jon Meredith <jm...@gmail.com>
AuthorDate: Thu Aug 15 14:21:23 2019 -0600

    Merge branch 'cassandra-3.0' into cassandra-3.11

 CHANGES.txt                                        |   1 +
 build.xml                                          |  18 +-
 .../apache/cassandra/batchlog/BatchlogManager.java |   7 +-
 .../cassandra/concurrent/InfiniteLoopExecutor.java |   2 +-
 .../cassandra/concurrent/ScheduledExecutors.java   |  11 +-
 .../cassandra/concurrent/SharedExecutorPool.java   |   4 +-
 .../apache/cassandra/concurrent/StageManager.java  |   8 +-
 .../org/apache/cassandra/db/ColumnFamilyStore.java |  18 +-
 .../apache/cassandra/db/commitlog/CommitLog.java   |   1 +
 src/java/org/apache/cassandra/gms/Gossiper.java    |   9 +
 .../org/apache/cassandra/hints/HintsCatalog.java   |   5 +-
 .../cassandra/index/SecondaryIndexManager.java     |  11 +-
 .../cassandra/io/sstable/IndexSummaryManager.java  |  11 ++
 .../cassandra/io/sstable/format/SSTableReader.java |   6 +-
 .../org/apache/cassandra/net/MessagingService.java |   2 +
 .../cassandra/net/OutboundTcpConnection.java       |   2 +-
 .../service/PendingRangeCalculatorService.java     |  11 +-
 .../apache/cassandra/service/StorageService.java   |  24 ++-
 .../cassandra/streaming/StreamCoordinator.java     |  13 ++
 .../apache/cassandra/streaming/StreamSession.java  |   9 +
 .../org/apache/cassandra/utils/ExecutorUtils.java  | 151 +++++++++++++++
 .../org/apache/cassandra/utils/concurrent/Ref.java |  14 +-
 .../apache/cassandra/utils/memory/BufferPool.java  |  10 +-
 .../cassandra/utils/memory/MemtablePool.java       |   8 +-
 test/conf/logback-dtest.xml                        |  18 +-
 .../org/apache/cassandra/distributed/Cluster.java  |  30 ++-
 .../cassandra/distributed/UpgradeableCluster.java  |  30 +--
 .../cassandra/distributed/api/IInstance.java       |   3 +-
 .../cassandra/distributed/api/IInstanceConfig.java |   1 +
 .../cassandra/distributed/api/IMessageFilters.java |   6 +-
 .../distributed/impl/AbstractCluster.java          | 154 +++++++++-------
 .../impl/DelegatingInvokableInstance.java          |   5 +-
 .../cassandra/distributed/impl/Instance.java       | 146 +++++++++++----
 .../distributed/impl/InstanceClassLoader.java      |   9 +-
 .../cassandra/distributed/impl/InstanceConfig.java |  28 ++-
 .../distributed/impl/IsolatedExecutor.java         |  47 ++++-
 .../cassandra/distributed/impl/MessageFilters.java |  31 +---
 .../distributed/test/DistributedTestBase.java      |  26 ++-
 .../cassandra/distributed/test/GossipTest.java     |  14 +-
 .../distributed/test/ResourceLeakTest.java         | 202 +++++++++++++++++++++
 .../cassandra/concurrent/SEPExecutorTest.java      |   3 +-
 41 files changed, 848 insertions(+), 261 deletions(-)

diff --cc CHANGES.txt
index 617bcc4,e956796..772446d
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -37,12 -33,10 +37,13 @@@ Merged from 2.2
   * Refactor Circle CI configuration (CASSANDRA-14806)
   * Fixing invalid CQL in security documentation (CASSANDRA-15020)
   * Multi-version in-JVM dtests (CASSANDRA-14937)
+  * Allow instance class loaders to be garbage collected for inJVM dtest (CASSANDRA-15170)
  
  
 -3.0.18
 +3.11.4
 + * Make stop-server.bat wait for Cassandra to terminate (CASSANDRA-14829)
 + * Correct sstable sorting for garbagecollect and levelled compaction (CASSANDRA-14870)
 +Merged from 3.0:
   * Severe concurrency issues in STCS,DTCS,TWCS,TMD.Topology,TypeParser
   * Add a script to make running the cqlsh tests in cassandra repo easier (CASSANDRA-14951)
   * If SizeEstimatesRecorder misses a 'onDropTable' notification, the size_estimates table will never be cleared for that table. (CASSANDRA-14905)
diff --cc src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java
index 5e3e5cf,13d27a8..3da4569
--- a/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java
+++ b/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java
@@@ -48,12 -46,8 +51,8 @@@ public class ScheduledExecutor
      public static final DebuggableScheduledThreadPoolExecutor optionalTasks = new DebuggableScheduledThreadPoolExecutor("OptionalTasks");
  
      @VisibleForTesting
-     public static void shutdownAndWait() throws InterruptedException
+     public static void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
      {
-         ExecutorService[] executors = new ExecutorService[] { scheduledFastTasks, scheduledTasks, nonPeriodicTasks, optionalTasks };
-         for (ExecutorService executor : executors)
-             executor.shutdownNow();
-         for (ExecutorService executor : executors)
-             executor.awaitTermination(60, TimeUnit.SECONDS);
 -        ExecutorUtils.shutdownNowAndWait(timeout, unit, scheduledTasks, nonPeriodicTasks, optionalTasks);
++        ExecutorUtils.shutdownNowAndWait(timeout, unit, scheduledFastTasks, scheduledTasks, nonPeriodicTasks, optionalTasks);
      }
  }
diff --cc src/java/org/apache/cassandra/concurrent/StageManager.java
index 7f59b4b,457e801..857c5b7
--- a/src/java/org/apache/cassandra/concurrent/StageManager.java
+++ b/src/java/org/apache/cassandra/concurrent/StageManager.java
@@@ -113,13 -114,18 +114,10 @@@ public class StageManage
          }
      }
  
 -    public final static Runnable NO_OP_TASK = new Runnable()
 -    {
 -        public void run()
 -        {
 -
 -        }
 -    };
 -
      @VisibleForTesting
-     public static void shutdownAndWait() throws InterruptedException
+     public static void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
      {
-         for (Stage stage : Stage.values())
-             StageManager.stages.get(stage).shutdownNow();
-         for (Stage stage : Stage.values())
-             StageManager.stages.get(stage).awaitTermination(60, TimeUnit.SECONDS);
+         ExecutorUtils.shutdownNowAndWait(timeout, unit, StageManager.stages.values());
      }
  
      /**
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 41b5e73,c5e81f0..89fb271
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -264,20 -230,12 +266,14 @@@ public class ColumnFamilyStore implemen
          postFlushExecutor.awaitTermination(60, TimeUnit.SECONDS);
      }
  
-     public static void shutdownReclaimExecutor() throws InterruptedException
+     public static void shutdownExecutorsAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
      {
-         reclaimExecutor.shutdown();
-         reclaimExecutor.awaitTermination(60, TimeUnit.SECONDS);
-     }
- 
-     public static void shutdownPerDiskFlushExecutors() throws InterruptedException
-     {
-         for (ExecutorService executorService : perDiskflushExecutors)
-             executorService.shutdown();
-         for (ExecutorService executorService : perDiskflushExecutors)
-             executorService.awaitTermination(60, TimeUnit.SECONDS);
 -        ExecutorUtils.shutdownAndWait(timeout, unit, reclaimExecutor, postFlushExecutor, flushExecutor);
++        List<ExecutorService> executors = new ArrayList<>(perDiskflushExecutors.length + 3);
++        Collections.addAll(executors, reclaimExecutor, postFlushExecutor, flushExecutor);
++        Collections.addAll(executors, perDiskflushExecutors);
++        ExecutorUtils.shutdownAndWait(timeout, unit, executors);
      }
  
 -
      public void reload()
      {
          // metadata object has been mutated directly. make all the members jibe with new settings.
diff --cc src/java/org/apache/cassandra/gms/Gossiper.java
index 6a862e5,e9e0648..a205140
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@@ -53,9 -51,10 +54,11 @@@ import org.apache.cassandra.net.Message
  import org.apache.cassandra.net.MessageOut;
  import org.apache.cassandra.net.MessagingService;
  import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.utils.CassandraVersion;
  import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.JVMStabilityInspector;
+ import static org.apache.cassandra.utils.ExecutorUtils.awaitTermination;
+ import static org.apache.cassandra.utils.ExecutorUtils.shutdown;
  
  /**
   * This module is responsible for Gossiping information for the local endpoint. This abstraction
@@@ -1749,63 -1653,10 +1752,69 @@@ public class Gossiper implements IFailu
          return System.currentTimeMillis() + Gossiper.aVeryLongTime;
      }
  
 +    @Nullable
 +    public CassandraVersion getReleaseVersion(InetAddress ep)
 +    {
 +        EndpointState state = getEndpointStateForEndpoint(ep);
 +        return state != null ? state.getReleaseVersion() : null;
 +    }
 +
 +    @Nullable
 +    public UUID getSchemaVersion(InetAddress ep)
 +    {
 +        EndpointState state = getEndpointStateForEndpoint(ep);
 +        return state != null ? state.getSchemaVersion() : null;
 +    }
 +
 +    public static void waitToSettle()
 +    {
 +        int forceAfter = Integer.getInteger("cassandra.skip_wait_for_gossip_to_settle", -1);
 +        if (forceAfter == 0)
 +        {
 +            return;
 +        }
 +        final int GOSSIP_SETTLE_MIN_WAIT_MS = 5000;
 +        final int GOSSIP_SETTLE_POLL_INTERVAL_MS = 1000;
 +        final int GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED = 3;
 +
 +        logger.info("Waiting for gossip to settle...");
 +        Uninterruptibles.sleepUninterruptibly(GOSSIP_SETTLE_MIN_WAIT_MS, TimeUnit.MILLISECONDS);
 +        int totalPolls = 0;
 +        int numOkay = 0;
 +        int epSize = Gossiper.instance.getEndpointStates().size();
 +        while (numOkay < GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED)
 +        {
 +            Uninterruptibles.sleepUninterruptibly(GOSSIP_SETTLE_POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
 +            int currentSize = Gossiper.instance.getEndpointStates().size();
 +            totalPolls++;
 +            if (currentSize == epSize)
 +            {
 +                logger.debug("Gossip looks settled.");
 +                numOkay++;
 +            }
 +            else
 +            {
 +                logger.info("Gossip not settled after {} polls.", totalPolls);
 +                numOkay = 0;
 +            }
 +            epSize = currentSize;
 +            if (forceAfter > 0 && totalPolls > forceAfter)
 +            {
 +                logger.warn("Gossip not settled but startup forced by cassandra.skip_wait_for_gossip_to_settle. Gossip total polls: {}",
 +                            totalPolls);
 +                break;
 +            }
 +        }
 +        if (totalPolls > GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED)
 +            logger.info("Gossip settled after {} extra polls; proceeding", totalPolls - GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED);
 +        else
 +            logger.info("No gossip backlog; proceeding");
 +    }
 +
+     @VisibleForTesting
+     public void stopShutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
+     {
+         stop();
+         ExecutorUtils.shutdownAndWait(timeout, unit, executor);
+     }
  }
diff --cc src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index bd52bca,d66a18b..c603404
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@@ -65,6 -65,10 +65,9 @@@ import org.apache.cassandra.utils.FBUti
  import org.apache.cassandra.utils.concurrent.OpOrder;
  import org.apache.cassandra.utils.concurrent.Refs;
  
+ import static org.apache.cassandra.utils.ExecutorUtils.awaitTermination;
+ import static org.apache.cassandra.utils.ExecutorUtils.shutdown;
 -import static org.apache.cassandra.utils.ExecutorUtils.shutdownNow;
+ 
  /**
   * Handles the core maintenance functionality associated with indexes: adding/removing them to or from
   * a table, (re)building during bootstrap or other streaming operations, flushing, reloading metadata
diff --cc src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 6866ff9,c094e0b..06f0072
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@@ -2371,10 -2339,9 +2371,10 @@@ public abstract class SSTableReader ext
  
      }
  
-     public static void shutdownBlocking() throws InterruptedException
+     public static void shutdownBlocking(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
      {
-         syncExecutor.shutdownNow();
-         syncExecutor.awaitTermination(0, TimeUnit.SECONDS);
++
+         ExecutorUtils.shutdownNowAndWait(timeout, unit, syncExecutor);
          resetTidying();
      }
  }
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index c340db6,d349b4b..000c2fb
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -18,7 -18,7 +18,6 @@@
  package org.apache.cassandra.service;
  
  import java.io.*;
--import java.lang.management.ManagementFactory;
  import java.net.InetAddress;
  import java.net.UnknownHostException;
  import java.nio.ByteBuffer;
@@@ -5284,6 -4837,15 +5293,15 @@@ public class StorageService extends Not
      public void setHintedHandoffThrottleInKB(int throttleInKB)
      {
          DatabaseDescriptor.setHintedHandoffThrottleInKB(throttleInKB);
 -        logger.info(String.format("Updated hinted_handoff_throttle_in_kb to %d", throttleInKB));
 +        logger.info("Updated hinted_handoff_throttle_in_kb to {}", throttleInKB);
      }
+ 
+     @VisibleForTesting
+     public void shutdownServer()
+     {
+         if (drainOnShutdown != null)
+         {
+             Runtime.getRuntime().removeShutdownHook(drainOnShutdown);
+         }
+     }
  }
diff --cc src/java/org/apache/cassandra/streaming/StreamSession.java
index a426207,c79a711..b405185
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@@ -47,7 -46,8 +47,8 @@@ import org.apache.cassandra.gms.*
  import org.apache.cassandra.metrics.StreamingMetrics;
  import org.apache.cassandra.service.ActiveRepairService;
  import org.apache.cassandra.streaming.messages.*;
 +import org.apache.cassandra.utils.CassandraVersion;
++import org.apache.cassandra.utils.ExecutorUtils;
  import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.JVMStabilityInspector;
  import org.apache.cassandra.utils.Pair;
@@@ -810,31 -743,4 +811,39 @@@ public class StreamSession implements I
                  taskCompleted(task); // there is no file to send
          }
      }
 +
 +    class KeepAliveTask implements Runnable
 +    {
 +        private KeepAliveMessage last = null;
 +
 +        public void run()
 +        {
 +            //to avoid jamming the message queue, we only send if the last one was sent
 +            if (last == null || last.wasSent())
 +            {
 +                logger.trace("[Stream #{}] Sending keep-alive to {}.", planId(), peer);
 +                last = new KeepAliveMessage();
 +                try
 +                {
 +                    handler.sendMessage(last);
 +                }
 +                catch (RuntimeException e) //connection handler is closed
 +                {
 +                    logger.debug("[Stream #{}] Could not send keep-alive message (perhaps stream session is finished?).", planId(), e);
 +                }
 +            }
 +            else
 +            {
 +                logger.trace("[Stream #{}] Skip sending keep-alive to {} (previous was not yet sent).", planId(), peer);
 +            }
 +        }
 +    }
++
++    @VisibleForTesting
++    public static void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
++    {
++        List<ExecutorService> executors = ImmutableList.of(keepAliveExecutor);
++        ExecutorUtils.shutdownNow(executors);
++        ExecutorUtils.awaitTermination(timeout, unit, executors);
++    }
  }
diff --cc src/java/org/apache/cassandra/utils/memory/BufferPool.java
index c8ad078,d0cea0f..e91c9e2
--- a/src/java/org/apache/cassandra/utils/memory/BufferPool.java
+++ b/src/java/org/apache/cassandra/utils/memory/BufferPool.java
@@@ -21,7 -21,7 +21,8 @@@ package org.apache.cassandra.utils.memo
  import java.lang.ref.PhantomReference;
  import java.lang.ref.ReferenceQueue;
  import java.nio.ByteBuffer;
 -import java.util.*;
++import java.util.Arrays;
 +import java.util.Queue;
  import java.util.concurrent.*;
  import java.util.concurrent.atomic.AtomicLong;
  import java.util.concurrent.atomic.AtomicLongFieldUpdater;
@@@ -31,15 -36,13 +32,18 @@@ import org.apache.cassandra.concurrent.
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
 +import io.netty.util.concurrent.FastThreadLocal;
  import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.io.compress.BufferType;
 +import org.apache.cassandra.io.util.FileUtils;
  import org.apache.cassandra.metrics.BufferPoolMetrics;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.NoSpamLogger;
  import org.apache.cassandra.utils.concurrent.Ref;
  
+ import static org.apache.cassandra.utils.ExecutorUtils.awaitTermination;
+ import static org.apache.cassandra.utils.ExecutorUtils.shutdownNow;
+ 
  /**
   * A pool of ByteBuffers that can be recycled.
   */
diff --cc src/java/org/apache/cassandra/utils/memory/MemtablePool.java
index 50b6f10,8061566..bd17f78
--- a/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
@@@ -66,13 -68,14 +68,13 @@@ public abstract class MemtablePoo
          return cleaner == null ? null : new MemtableCleanerThread<>(this, cleaner);
      }
  
 -    public abstract boolean needToCopyOnHeap();
 -
      @VisibleForTesting
-     public void shutdown() throws InterruptedException
+     public void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
      {
-         cleaner.shutdown();
-         cleaner.awaitTermination(60, TimeUnit.SECONDS);
+         ExecutorUtils.shutdownNowAndWait(timeout, unit, cleaner);
      }
  
++
      public abstract MemtableAllocator newAllocator();
  
      /**
diff --cc test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index f209d53,aea21e2..784eb3f
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@@ -55,11 -51,11 +52,12 @@@ import org.apache.cassandra.db.Memtable
  import org.apache.cassandra.db.SystemKeyspace;
  import org.apache.cassandra.db.commitlog.CommitLog;
  import org.apache.cassandra.db.compaction.CompactionManager;
++import org.apache.cassandra.db.monitoring.ApproximateTime;
  import org.apache.cassandra.dht.IPartitioner;
  import org.apache.cassandra.dht.Token;
- import org.apache.cassandra.distributed.api.Feature;
  import org.apache.cassandra.distributed.api.ICluster;
  import org.apache.cassandra.distributed.api.ICoordinator;
+ import org.apache.cassandra.distributed.api.IInstance;
  import org.apache.cassandra.distributed.api.IInstanceConfig;
  import org.apache.cassandra.distributed.api.IListen;
  import org.apache.cassandra.distributed.api.IMessage;
@@@ -82,8 -79,10 +81,11 @@@ import org.apache.cassandra.service.Cli
  import org.apache.cassandra.service.PendingRangeCalculatorService;
  import org.apache.cassandra.service.QueryState;
  import org.apache.cassandra.service.StorageService;
+ import org.apache.cassandra.streaming.StreamCoordinator;
++import org.apache.cassandra.streaming.StreamSession;
  import org.apache.cassandra.transport.messages.ResultMessage;
  import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.NanoTimeToCurrentTimeMillis;
  import org.apache.cassandra.utils.Throwables;
  import org.apache.cassandra.utils.concurrent.Ref;
  import org.apache.cassandra.utils.memory.BufferPool;
@@@ -233,14 -264,43 +267,32 @@@ public class Instance extends IsolatedE
          }
      }
  
-     public void receiveMessage(IMessage message)
+     public void receiveMessage(IMessage imessage)
      {
          sync(() -> {
-             try (DataInputBuffer in = new DataInputBuffer(message.bytes()))
+             // Based on org.apache.cassandra.net.IncomingTcpConnection.receiveMessage
+             try (DataInputBuffer input = new DataInputBuffer(imessage.bytes()))
              {
-                 MessageIn<?> messageIn = MessageIn.read(in, message.version(), message.id());
-                 Runnable deliver = new MessageDeliveryTask(messageIn, message.id());
-                 deliver.run();
+                 int version = imessage.version();
+ 
+                 MessagingService.validateMagic(input.readInt());
+                 int id;
+                 if (version < MessagingService.VERSION_20)
+                     id = Integer.parseInt(input.readUTF());
+                 else
+                     id = input.readInt();
 -
 -                long timestamp = System.currentTimeMillis();
 -                boolean isCrossNodeTimestamp = false;
 -                // make sure to readInt, even if cross_node_to is not enabled
 -                int partial = input.readInt();
 -                if (DatabaseDescriptor.hasCrossNodeTimeout())
 -                {
 -                    long crossNodeTimestamp = (timestamp & 0xFFFFFFFF00000000L) | (((partial & 0xFFFFFFFFL) << 2) >> 2);
 -                    isCrossNodeTimestamp = (timestamp != crossNodeTimestamp);
 -                    timestamp = crossNodeTimestamp;
 -                }
 -
 -                MessageIn message = MessageIn.read(input, version, id);
++                long currentTime = ApproximateTime.currentTimeMillis();
++                MessageIn message = MessageIn.read(input, version, id, MessageIn.readConstructionTime(imessage.from().address, input, currentTime));
+                 if (message == null)
+                 {
+                     // callback expired; nothing to do
+                     return;
+                 }
+                 if (version <= MessagingService.current_version)
+                 {
 -                    MessagingService.instance().receive(message, id, timestamp, isCrossNodeTimestamp);
++                    MessagingService.instance().receive(message, id);
+                 }
+                 // else ignore message
              }
              catch (Throwable t)
              {
@@@ -266,6 -326,8 +318,8 @@@
              try
              {
                  mkdirs();
+ 
 -                DatabaseDescriptor.setDaemonInitialized();
++                DatabaseDescriptor.daemonInitialization();
                  DatabaseDescriptor.createAllDirectories();
  
                  // We need to  persist this as soon as possible after startup checks.
@@@ -404,24 -475,34 +468,30 @@@
  
          Future<?> future = async((ExecutorService executor) -> {
              Throwable error = null;
+ 
+             if (config.has(GOSSIP) || config.has(NETWORK))
+             {
+                 StorageService.instance.shutdownServer();
 -
 -                error = parallelRun(error, executor,
 -                                    () -> NanoTimeToCurrentTimeMillis.shutdown(MINUTES.toMillis(1L))
 -                );
+             }
+ 
              error = parallelRun(error, executor,
-                     Gossiper.instance::stop,
-                     CompactionManager.instance::forceShutdown,
-                     BatchlogManager.instance::shutdown,
-                     HintsService.instance::shutdownBlocking,
-                     SecondaryIndexManager::shutdownExecutors,
-                     ColumnFamilyStore::shutdownFlushExecutor,
-                     ColumnFamilyStore::shutdownPostFlushExecutor,
-                     ColumnFamilyStore::shutdownReclaimExecutor,
-                     ColumnFamilyStore::shutdownPerDiskFlushExecutors,
-                     PendingRangeCalculatorService.instance::shutdownExecutor,
-                     BufferPool::shutdownLocalCleaner,
-                     Ref::shutdownReferenceReaper,
-                     Memtable.MEMORY_POOL::shutdown,
-                     ScheduledExecutors::shutdownAndWait,
-                     SSTableReader::shutdownBlocking
+                                 () -> Gossiper.instance.stopShutdownAndWait(1L, MINUTES),
+                                 CompactionManager.instance::forceShutdown,
+                                 () -> BatchlogManager.instance.shutdownAndWait(1L, MINUTES),
+                                 HintsService.instance::shutdownBlocking,
+                                 () -> StreamCoordinator.shutdownAndWait(1L, MINUTES),
++                                () -> StreamSession.shutdownAndWait(1L, MINUTES),
+                                 () -> SecondaryIndexManager.shutdownAndWait(1L, MINUTES),
+                                 () -> IndexSummaryManager.instance.shutdownAndWait(1L, MINUTES),
+                                 () -> ColumnFamilyStore.shutdownExecutorsAndWait(1L, MINUTES),
+                                 () -> PendingRangeCalculatorService.instance.shutdownExecutor(1L, MINUTES),
+                                 () -> BufferPool.shutdownLocalCleaner(1L, MINUTES),
 -                                () -> StorageService.instance.shutdownBGMonitorAndWait(1L, MINUTES),
+                                 () -> Ref.shutdownReferenceReaper(1L, MINUTES),
+                                 () -> Memtable.MEMORY_POOL.shutdownAndWait(1L, MINUTES),
+                                 () -> SSTableReader.shutdownBlocking(1L, MINUTES)
              );
              error = parallelRun(error, executor,
+                                 () -> ScheduledExecutors.shutdownAndWait(1L, MINUTES),
                                  MessagingService.instance()::shutdown
              );
              error = parallelRun(error, executor,
diff --cc test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java
index 0000000,55c700c..4bfbdc9
mode 000000,100644..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java
@@@ -1,0 -1,201 +1,202 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.distributed.test;
+ 
+ import java.io.File;
+ import java.io.IOException;
+ import java.lang.management.ManagementFactory;
+ import java.nio.file.FileSystems;
+ import java.nio.file.Path;
+ import java.sql.Date;
+ import java.text.SimpleDateFormat;
+ import java.time.Instant;
+ import java.util.List;
+ import java.util.function.Consumer;
+ import javax.management.MBeanServer;
+ 
+ import org.junit.Ignore;
+ import org.junit.Test;
+ 
+ import com.sun.management.HotSpotDiagnosticMXBean;
+ import org.apache.cassandra.db.ConsistencyLevel;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.distributed.Cluster;
+ import org.apache.cassandra.distributed.impl.InstanceConfig;
++import org.apache.cassandra.gms.Gossiper;
+ import org.apache.cassandra.service.CassandraDaemon;
+ import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.SigarLibrary;
+ 
+ import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+ import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+ 
+ /* Resource Leak Test - useful when tracking down issues with in-JVM framework cleanup.
+  * All objects referencing the InstanceClassLoader need to be garbage collected or
+  * the JVM runs out of metaspace. This test also calls out to lsof to check which
+  * file handles are still opened.
+  *
+  * This is intended to be a burn type test where it is run outside of the test suites
+  * when a problem is detected (like OutOfMetaspace exceptions).
+  *
+  * Currently this test demonstrates that the InstanceClassLoader is cleaned up (load up
+  * the final hprof and check that the class loaders are not reachable from a GC root),
+  * but it shows that the file handles for Data/Index files are being leaked.
+  */
+ @Ignore
+ public class ResourceLeakTest extends DistributedTestBase
+ {
+     // Parameters to adjust while hunting for leaks
+     final int numTestLoops = 1;            // Set this value high to crash on leaks, or low when tracking down an issue.
+     final boolean dumpEveryLoop = false;   // Dump heap & possibly files every loop
+     final boolean dumpFileHandles = false; // Call lsof whenever dumping resources
+     final boolean forceCollection = false; // Whether to explicitly force finalization/gc for smaller heap dumps
+     final long finalWaitMillis = 0l;       // Number of millis to wait before final resource dump to give gc a chance
+ 
+     static final SimpleDateFormat format = new SimpleDateFormat("yyyyMMddHHmmss");
+     static final String when = format.format(Date.from(Instant.now()));
+ 
+     static String outputFilename(String base, String description, String extension)
+     {
+         Path p = FileSystems.getDefault().getPath("build", "test",
+                                                   String.join("-", when, base, description) + extension);
+         return p.toString();
+     }
+ 
+     /**
+      * Retrieves the process ID or <code>null</code> if the process ID cannot be retrieved.
+      * @return the process ID or <code>null</code> if the process ID cannot be retrieved.
+      *
+      * (Duplicated from HeapUtils to avoid refactoring older releases where this test is useful).
+      */
+     private static Long getProcessId()
+     {
+         // Once Java 9 is ready the process API should provide a better way to get the process ID.
+         long pid = SigarLibrary.instance.getPid();
+ 
+         if (pid >= 0)
+             return Long.valueOf(pid);
+ 
+         return getProcessIdFromJvmName();
+     }
+ 
+     /**
+      * Retrieves the process ID from the JVM name.
+      * @return the process ID or <code>null</code> if the process ID cannot be retrieved.
+      */
+     private static Long getProcessIdFromJvmName()
+     {
+         // the JVM name in Oracle JVMs is: '<pid>@<hostname>' but this might not be the case on all JVMs
+         String jvmName = ManagementFactory.getRuntimeMXBean().getName();
+         try
+         {
+             return Long.parseLong(jvmName.split("@")[0]);
+         }
+         catch (NumberFormatException e)
+         {
+             // ignore
+         }
+         return null;
+     }
+ 
+     static void dumpHeap(String description, boolean live) throws IOException
+     {
+         MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+         HotSpotDiagnosticMXBean mxBean = ManagementFactory.newPlatformMXBeanProxy(
+         server, "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class);
+         mxBean.dumpHeap(outputFilename("heap", description, ".hprof"), live);
+     }
+ 
+     static void dumpOpenFiles(String description) throws IOException, InterruptedException
+     {
+         long pid = getProcessId();
+         ProcessBuilder map = new ProcessBuilder("/usr/sbin/lsof", "-p", Long.toString(pid));
+         File output = new File(outputFilename("lsof", description, ".txt"));
+         map.redirectOutput(output);
+         map.redirectErrorStream(true);
+         map.start().waitFor();
+     }
+ 
+     void dumpResources(String description) throws IOException, InterruptedException
+     {
+         dumpHeap(description, false);
+         if (dumpFileHandles)
+         {
+             dumpOpenFiles(description);
+         }
+     }
+ 
+     void doTest(int numClusterNodes, Consumer<InstanceConfig> updater) throws Throwable
+     {
+         for (int loop = 0; loop < numTestLoops; loop++)
+         {
+             try (Cluster cluster = Cluster.build(numClusterNodes).withConfig(updater).start())
+             {
+                 if (cluster.get(1).config().has(GOSSIP)) // Wait for gossip to settle on the seed node
 -                    cluster.get(1).runOnInstance(() -> CassandraDaemon.waitForGossipToSettle());
++                    cluster.get(1).runOnInstance(() -> Gossiper.waitToSettle());
+ 
+                 init(cluster);
+                 String tableName = "tbl" + loop;
+                 cluster.schemaChange("CREATE TABLE " + KEYSPACE + "." + tableName + " (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+                 cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + "." + tableName + "(pk,ck,v) VALUES (0,0,0)", ConsistencyLevel.ALL);
+                 cluster.get(1).callOnInstance(() -> FBUtilities.waitOnFutures(Keyspace.open(KEYSPACE).flush()));
+                 if (dumpEveryLoop)
+                 {
+                     dumpResources(String.format("loop%03d", loop));
+                 }
+             }
+             catch (Throwable tr)
+             {
+                 System.out.println("Dumping resources for exception: " + tr.getMessage());
+                 tr.printStackTrace();
+                 dumpResources("exception");
+             }
+             if (forceCollection)
+             {
+                 System.runFinalization();
+                 System.gc();
+             }
+         }
+     }
+ 
+     @Test
+     public void looperTest() throws Throwable
+     {
+         doTest(1, config -> {});
+         if (forceCollection)
+         {
+             System.runFinalization();
+             System.gc();
+             Thread.sleep(finalWaitMillis);
+         }
+         dumpResources("final");
+     }
+ 
+     @Test
+     public void looperGossipNetworkTest() throws Throwable
+     {
+         doTest(2, config -> config.with(GOSSIP).with(NETWORK));
+         if (forceCollection)
+         {
+             System.runFinalization();
+             System.gc();
+             Thread.sleep(finalWaitMillis);
+         }
+         dumpResources("final-gossip-network");
+     }
+ }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org