You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2019/01/09 14:05:52 UTC

[cassandra] branch trunk updated: In JVM dtests need to clean up after instance shutdown

This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d500562  In JVM dtests need to clean up after instance shutdown
d500562 is described below

commit d5005627b02b4e716947fa05a40473368017c0f9
Author: Joseph Lynch <jo...@gmail.com>
AuthorDate: Fri Dec 7 18:29:08 2018 -0800

    In JVM dtests need to clean up after instance shutdown
    
    Adds additional cleanup logic to ensure we don't leak classloaders and
    their associated objects when running the in JVM dtests.
    
    Patch by Joseph Lynch; reviewed by Alex Petrov for CASSANDRA-14922
---
 build.xml                                          |  7 +++++
 ide/idea/workspace.xml                             |  2 +-
 .../cassandra/concurrent/ScheduledExecutors.java   |  2 +-
 .../apache/cassandra/hints/HintsBufferPool.java    | 10 ++++++-
 .../org/apache/cassandra/hints/HintsService.java   |  1 +
 .../cassandra/io/sstable/format/SSTableReader.java |  9 ++++++
 .../org/apache/cassandra/net/MessagingService.java |  2 ++
 .../apache/cassandra/net/async/NettyFactory.java   |  2 +-
 .../cassandra/utils/NativeLibraryDarwin.java       |  4 ++-
 .../apache/cassandra/utils/NativeLibraryLinux.java |  4 ++-
 .../cassandra/utils/NativeLibraryWindows.java      |  4 ++-
 .../org/apache/cassandra/utils/concurrent/Ref.java |  5 ++++
 .../distributed/DistributedReadWritePathTest.java  | 11 ++++++-
 .../org/apache/cassandra/distributed/Instance.java | 34 +++++++++++-----------
 .../apache/cassandra/distributed/TestCluster.java  | 33 +++++++++++++++++----
 15 files changed, 100 insertions(+), 30 deletions(-)

diff --git a/build.xml b/build.xml
index 3973689..60b66fc 100644
--- a/build.xml
+++ b/build.xml
@@ -1300,6 +1300,13 @@
         <jvmarg value="-ea"/>
         <jvmarg value="-Dcassandra.debugrefcount=true"/>
         <jvmarg value="-Xss256k"/>
+        <!-- When we do classloader manipulation SoftReferences can cause memory leaks
+             that can OOM our test runs. The next two settings inform our GC
+             algorithm to limit the metaspace size and clean up SoftReferences
+             more aggressively rather than waiting. See CASSANDRA-14922 for more details.
+        -->
+        <jvmarg value="-XX:MaxMetaspaceSize=256M" />
+        <jvmarg value="-XX:SoftRefLRUPolicyMSPerMB=0" />
         <jvmarg value="-Dcassandra.memtable_row_overhead_computation_step=100"/>
         <jvmarg value="-Dcassandra.test.use_prepared=${cassandra.test.use_prepared}"/>
         <jvmarg value="-Dcassandra.test.offsetseed=@{poffset}"/>
diff --git a/ide/idea/workspace.xml b/ide/idea/workspace.xml
index a2dea2a..150f1a0 100644
--- a/ide/idea/workspace.xml
+++ b/ide/idea/workspace.xml
@@ -167,7 +167,7 @@
       <option name="MAIN_CLASS_NAME" value="" />
       <option name="METHOD_NAME" value="" />
       <option name="TEST_OBJECT" value="class" />
-      <option name="VM_PARAMETERS" value="-Dcassandra.config=file://$PROJECT_DIR$/test/conf/cassandra.yaml -Dlogback.configurationFile=file://$PROJECT_DIR$/test/conf/logback-test.xml -Dcassandra.logdir=$PROJECT_DIR$/build/test/logs -Djava.library.path=$PROJECT_DIR$/lib/sigar-bin -Dlegacy-sstable-root=$PROJECT_DIR$/test/data/legacy-sstables -Dinvalid-legacy-sstable-root=$PROJECT_DIR$/test/data/invalid-legacy-sstables -Dcassandra.ring_delay_ms=1000 -Dcassandra.skip_sync=true -ea" />
+      <option name="VM_PARAMETERS" value="-Dcassandra.config=file://$PROJECT_DIR$/test/conf/cassandra.yaml -Dlogback.configurationFile=file://$PROJECT_DIR$/test/conf/logback-test.xml -Dcassandra.logdir=$PROJECT_DIR$/build/test/logs -Djava.library.path=$PROJECT_DIR$/lib/sigar-bin -Dlegacy-sstable-root=$PROJECT_DIR$/test/data/legacy-sstables -Dinvalid-legacy-sstable-root=$PROJECT_DIR$/test/data/invalid-legacy-sstables -Dcassandra.ring_delay_ms=1000 -Dcassandra.skip_sync=true -ea -XX:MaxMet [...]
       <option name="PARAMETERS" value="" />
       <option name="WORKING_DIRECTORY" value="" />
       <option name="ENV_VARIABLES" />
diff --git a/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java b/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java
index e51e4c2..5e3e5cf 100644
--- a/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java
+++ b/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java
@@ -52,7 +52,7 @@ public class ScheduledExecutors
     {
         ExecutorService[] executors = new ExecutorService[] { scheduledFastTasks, scheduledTasks, nonPeriodicTasks, optionalTasks };
         for (ExecutorService executor : executors)
-            executor.shutdown();
+            executor.shutdownNow();
         for (ExecutorService executor : executors)
             executor.awaitTermination(60, TimeUnit.SECONDS);
     }
diff --git a/src/java/org/apache/cassandra/hints/HintsBufferPool.java b/src/java/org/apache/cassandra/hints/HintsBufferPool.java
index 25f9bc1..f705de1 100644
--- a/src/java/org/apache/cassandra/hints/HintsBufferPool.java
+++ b/src/java/org/apache/cassandra/hints/HintsBufferPool.java
@@ -17,18 +17,21 @@
  */
 package org.apache.cassandra.hints;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.net.MessagingService;
+import sun.nio.ch.DirectBuffer;
 
 /**
  * A primitive pool of {@link HintsBuffer} buffers. Under normal conditions should only hold two buffers - the currently
  * written to one, and a reserve buffer to switch to when the first one is beyond capacity.
  */
-final class HintsBufferPool
+final class HintsBufferPool implements Closeable
 {
     interface FlushCallback
     {
@@ -129,4 +132,9 @@ final class HintsBufferPool
         allocatedBuffers++;
         return HintsBuffer.create(bufferSize);
     }
+
+    public void close()
+    {
+        currentBuffer.free();
+    }
 }
diff --git a/src/java/org/apache/cassandra/hints/HintsService.java b/src/java/org/apache/cassandra/hints/HintsService.java
index 1a352c2..1fd2d1a 100644
--- a/src/java/org/apache/cassandra/hints/HintsService.java
+++ b/src/java/org/apache/cassandra/hints/HintsService.java
@@ -256,6 +256,7 @@ public final class HintsService implements HintsServiceMBean
         writeExecutor.shutdownBlocking();
 
         HintsServiceDiagnostics.dispatchingShutdown(this);
+        bufferPool.close();
     }
 
     /**
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index cf14c3d..01e30d6 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -2475,4 +2475,13 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         return reader;
     }
 
+    public static void shutdownBlocking() throws InterruptedException
+    {
+        if (syncExecutor != null)
+        {
+            syncExecutor.shutdownNow();
+            syncExecutor.awaitTermination(0, TimeUnit.SECONDS);
+        }
+        resetTidying();
+    }
 }
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 761e210..f5c064e 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -1142,6 +1142,8 @@ public final class MessagingService implements MessagingServiceMBean
 
             if (!isTest)
                 NettyFactory.instance.close();
+
+            clearMessageSinks();
         }
         catch (Exception e)
         {
diff --git a/src/java/org/apache/cassandra/net/async/NettyFactory.java b/src/java/org/apache/cassandra/net/async/NettyFactory.java
index 2366722..81de5d8 100644
--- a/src/java/org/apache/cassandra/net/async/NettyFactory.java
+++ b/src/java/org/apache/cassandra/net/async/NettyFactory.java
@@ -389,7 +389,7 @@ public final class NettyFactory
     {
         EventLoopGroup[] groups = new EventLoopGroup[] { acceptGroup, outboundGroup, inboundGroup, streamingGroup };
         for (EventLoopGroup group : groups)
-            group.shutdownGracefully();
+            group.shutdownGracefully(0, 2, TimeUnit.SECONDS);
         for (EventLoopGroup group : groups)
             group.awaitTermination(60, TimeUnit.SECONDS);
     }
diff --git a/src/java/org/apache/cassandra/utils/NativeLibraryDarwin.java b/src/java/org/apache/cassandra/utils/NativeLibraryDarwin.java
index d6f1a9e..6ed18d1 100644
--- a/src/java/org/apache/cassandra/utils/NativeLibraryDarwin.java
+++ b/src/java/org/apache/cassandra/utils/NativeLibraryDarwin.java
@@ -18,6 +18,8 @@
 
 package org.apache.cassandra.utils;
 
+import java.util.Collections;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,7 +52,7 @@ public class NativeLibraryDarwin implements NativeLibraryWrapper
     {
         try
         {
-            Native.register("c");
+            Native.register(com.sun.jna.NativeLibrary.getInstance("c", Collections.emptyMap()));
             available = true;
         }
         catch (NoClassDefFoundError e)
diff --git a/src/java/org/apache/cassandra/utils/NativeLibraryLinux.java b/src/java/org/apache/cassandra/utils/NativeLibraryLinux.java
index b6667e4..3f21d17 100644
--- a/src/java/org/apache/cassandra/utils/NativeLibraryLinux.java
+++ b/src/java/org/apache/cassandra/utils/NativeLibraryLinux.java
@@ -18,6 +18,8 @@
 
 package org.apache.cassandra.utils;
 
+import java.util.Collections;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,7 +52,7 @@ public class NativeLibraryLinux implements NativeLibraryWrapper
     {
         try
         {
-            Native.register("c");
+            Native.register(com.sun.jna.NativeLibrary.getInstance("c", Collections.emptyMap()));
             available = true;
         }
         catch (NoClassDefFoundError e)
diff --git a/src/java/org/apache/cassandra/utils/NativeLibraryWindows.java b/src/java/org/apache/cassandra/utils/NativeLibraryWindows.java
index e6e823c..b8304c7 100644
--- a/src/java/org/apache/cassandra/utils/NativeLibraryWindows.java
+++ b/src/java/org/apache/cassandra/utils/NativeLibraryWindows.java
@@ -18,6 +18,8 @@
 
 package org.apache.cassandra.utils;
 
+import java.util.Collections;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,7 +45,7 @@ public class NativeLibraryWindows implements NativeLibraryWrapper
     {
         try
         {
-            Native.register("kernel32");
+            Native.register(com.sun.jna.NativeLibrary.getInstance("kernel32", Collections.emptyMap()));
             available = true;
         }
         catch (NoClassDefFoundError e)
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Ref.java b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
index 1a17a1f..3c1b7cc 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Ref.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
@@ -709,5 +709,10 @@ public final class Ref<T> implements RefCounted<T>
     {
         EXEC.shutdown();
         EXEC.awaitTermination(60, TimeUnit.SECONDS);
+        if (STRONG_LEAK_DETECTOR != null)
+        {
+            STRONG_LEAK_DETECTOR.shutdownNow();
+            STRONG_LEAK_DETECTOR.awaitTermination(60, TimeUnit.SECONDS);
+        }
     }
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/DistributedReadWritePathTest.java b/test/distributed/org/apache/cassandra/distributed/DistributedReadWritePathTest.java
index d03ef4f..04ea8b0 100644
--- a/test/distributed/org/apache/cassandra/distributed/DistributedReadWritePathTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/DistributedReadWritePathTest.java
@@ -28,7 +28,16 @@ import static org.apache.cassandra.net.MessagingService.Verb.READ_REPAIR;
 public class DistributedReadWritePathTest extends DistributedTestBase
 {
     @Test
-    public void coordinatorRead() throws Throwable
+    public void coordinatorReadTest() throws Throwable
+    {
+        for (int i = 0; i < 10; i++)
+        {
+            System.out.println(i);
+            coordinatorRead();
+        }
+    }
+
+    private void coordinatorRead() throws Throwable
     {
         try (TestCluster cluster = createCluster(3))
         {
diff --git a/test/distributed/org/apache/cassandra/distributed/Instance.java b/test/distributed/org/apache/cassandra/distributed/Instance.java
index f344411..c68b961 100644
--- a/test/distributed/org/apache/cassandra/distributed/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/Instance.java
@@ -29,6 +29,9 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
 
+import org.slf4j.LoggerFactory;
+
+import ch.qos.logback.classic.LoggerContext;
 import org.apache.cassandra.batchlog.BatchlogManager;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.concurrent.SharedExecutorPool;
@@ -53,6 +56,7 @@ import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.gms.VersionedValue;
 import org.apache.cassandra.hints.HintsService;
 import org.apache.cassandra.index.SecondaryIndexManager;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.locator.InetAddressAndPort;
@@ -64,6 +68,7 @@ import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.async.MessageInHandler;
+import org.apache.cassandra.net.async.NettyFactory;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.service.ActiveRepairService;
@@ -229,6 +234,11 @@ public class Instance extends InvokableInstance
             DatabaseDescriptor.createAllDirectories();
             Keyspace.setInitialized();
             SystemKeyspace.persistLocalMetadata();
+            // Even though we don't use MessagingService, access the static NettyFactory
+            // instance here so that we start the static event loop state
+            // (e.g. acceptGroup, inboundGroup, outboundGroup, etc ...). We can remove this
+            // once we actually use the MessagingService to communicate between nodes
+            NettyFactory.instance.getClass();
         }).accept(config);
     }
 
@@ -330,10 +340,10 @@ public class Instance extends InvokableInstance
         runOnInstance(() -> {
             Throwable error = null;
             error = runAndMergeThrowable(error,
+                    CompactionManager.instance::forceShutdown,
                     BatchlogManager.instance::shutdown,
                     HintsService.instance::shutdownBlocking,
                     CommitLog.instance::shutdownBlocking,
-                    CompactionManager.instance::forceShutdown,
                     Gossiper.instance::stop,
                     SecondaryIndexManager::shutdownExecutors,
                     MessagingService.instance()::shutdown,
@@ -347,8 +357,12 @@ public class Instance extends InvokableInstance
                     StageManager::shutdownAndWait,
                     SharedExecutorPool.SHARED::shutdown,
                     Memtable.MEMORY_POOL::shutdown,
-                    ScheduledExecutors::shutdownAndWait);
+                    ScheduledExecutors::shutdownAndWait,
+                    SSTableReader::shutdownBlocking);
+
             error = shutdownAndWait(error, ActiveRepairService.repairCommandExecutor);
+            LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
+            loggerContext.stop();
             Throwables.maybeFail(error);
         });
     }
@@ -357,25 +371,11 @@ public class Instance extends InvokableInstance
     {
         return runAndMergeThrowable(existing, () -> {
             executor.shutdownNow();
-            executor.awaitTermination(5, TimeUnit.SECONDS);
+            executor.awaitTermination(20, TimeUnit.SECONDS);
             assert executor.isTerminated() && executor.isShutdown() : executor;
         });
     }
 
-    private static Throwable runAndMergeThrowable(Throwable existing, ThrowingRunnable runnable)
-    {
-        try
-        {
-            runnable.run();
-        }
-        catch (Throwable t)
-        {
-            return Throwables.merge(existing, t);
-        }
-
-        return existing;
-    }
-
     private static Throwable runAndMergeThrowable(Throwable existing, ThrowingRunnable ... runnables)
     {
         for (ThrowingRunnable runnable : runnables)
diff --git a/test/distributed/org/apache/cassandra/distributed/TestCluster.java b/test/distributed/org/apache/cassandra/distributed/TestCluster.java
index 2b979ee..cc6cf81 100644
--- a/test/distributed/org/apache/cassandra/distributed/TestCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/TestCluster.java
@@ -31,16 +31,21 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
 import java.util.function.IntFunction;
 import java.util.stream.Collectors;
 
 import com.google.common.collect.Sets;
 
+import io.netty.util.concurrent.FastThreadLocal;
+import io.netty.util.concurrent.FastThreadLocalThread;
+import io.netty.util.internal.InternalThreadLocalMap;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.ConsistencyLevel;
@@ -79,7 +84,7 @@ import org.apache.cassandra.utils.concurrent.SimpleCondition;
  */
 public class TestCluster implements AutoCloseable
 {
-    private static ExecutorService exec = Executors.newCachedThreadPool(new NamedThreadFactory("cluster-async-tasks"));
+    private final ExecutorService exec = Executors.newCachedThreadPool(new NamedThreadFactory("cluster-async-tasks"));
 
     private final File root;
     private final List<Instance> instances;
@@ -271,19 +276,37 @@ public class TestCluster implements AutoCloseable
     }
 
     @Override
-    public void close()
+    public void close() throws InterruptedException, TimeoutException, ExecutionException
     {
         List<Future<?>> futures = instances.stream()
                 .map(i -> exec.submit(i::shutdown))
                 .collect(Collectors.toList());
 
-//        withThreadLeakCheck(futures);
-
         // Make sure to only delete directory when threads are stopped
-        exec.submit(() -> {
+        Future combined = exec.submit(() -> {
             FBUtilities.waitOnFutures(futures);
             FileUtils.deleteRecursive(root);
         });
+
+        combined.get(60, TimeUnit.SECONDS);
+
+        exec.shutdownNow();
+        exec.awaitTermination(10, TimeUnit.SECONDS);
+
+        //withThreadLeakCheck(futures);
+        Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
+        for (Thread thread : threadSet)
+        {
+            if (thread instanceof FastThreadLocalThread)
+                ((FastThreadLocalThread)thread).setThreadLocalMap(null);
+        }
+
+        InternalThreadLocalMap.remove();
+        InternalThreadLocalMap.destroy();
+
+        FastThreadLocal.removeAll();
+        FastThreadLocal.destroy();
+        System.gc();
     }
 
     // We do not want this check to run every time until we fix problems with thread stops


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org