You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/01/12 02:31:15 UTC

[2/3] activemq-artemis git commit: Dealing with expected IBM JDK thread and refactoring the Thread check as a Rule

Dealing with expected IBM JDK thread and refactoring the Thread check as a Rule


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e56ca95f
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e56ca95f
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e56ca95f

Branch: refs/heads/master
Commit: e56ca95fdc9f813b339e6cb182a94f833161202f
Parents: 4e5ec13
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon Jan 11 16:11:19 2016 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Jan 11 20:02:46 2016 -0500

----------------------------------------------------------------------
 .../artemis/tests/util/ActiveMQTestBase.java    | 130 +------------
 .../artemis/tests/util/ThreadLeakCheckRule.java | 189 +++++++++++++++++++
 ...MDBMultipleHandlersServerDisconnectTest.java |   2 +-
 .../broadcast/JGroupsBroadcastTest.java         |  12 +-
 4 files changed, 200 insertions(+), 133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e56ca95f/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index 4e0c35f..bd2a156 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -143,6 +143,9 @@ import org.junit.runner.Description;
  */
 public abstract class ActiveMQTestBase extends Assert {
 
+   @Rule
+   public ThreadLeakCheckRule leakCheckRule = new ThreadLeakCheckRule();
+
    public static final String TARGET_TMP = "./target/tmp";
    public static final String INVM_ACCEPTOR_FACTORY = InVMAcceptorFactory.class.getCanonicalName();
    public static final String INVM_CONNECTOR_FACTORY = InVMConnectorFactory.class.getCanonicalName();
@@ -173,10 +176,8 @@ public abstract class ActiveMQTestBase extends Assert {
    private final Collection<ActiveMQComponent> otherComponents = new HashSet<>();
    private final Set<ExecutorService> executorSet = new HashSet<>();
 
-   private boolean checkThread = true;
    private String testDir;
    private int sendMsgCount = 0;
-   private Map<Thread, StackTraceElement[]> previousThreads;
 
    @Rule
    public TestName name = new TestName();
@@ -306,46 +307,6 @@ public abstract class ActiveMQTestBase extends Assert {
             }
          }
 
-         if (checkThread) {
-            StringBuffer buffer = null;
-
-            boolean failed = true;
-
-            boolean failedOnce = false;
-
-            long timeout = System.currentTimeMillis() + 60000;
-            while (failed && timeout > System.currentTimeMillis()) {
-               buffer = new StringBuffer();
-
-               failed = checkThread(buffer);
-
-               if (failed) {
-                  failedOnce = true;
-                  forceGC();
-                  Thread.sleep(500);
-                  log.info("There are still threads running, trying again");
-                  System.out.println(buffer);
-               }
-            }
-
-            if (failed) {
-               logAndSystemOut("Thread leaked on test " + this.getClass().getName() + "::" + this.getName() + "\n" +
-                                  buffer);
-               logAndSystemOut("Thread leakage! Failure!!!");
-
-               fail("Thread leaked");
-            }
-            else if (failedOnce) {
-               System.out.println("******************** Threads cleared after retries ********************");
-               System.out.println();
-            }
-
-
-         }
-         else {
-            checkThread = true;
-         }
-
          if (Thread.currentThread().getContextClassLoader() == null) {
             Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
             fail("Thread Context ClassLoader was set to null at some point before this test. We will set to this.getClass().getClassLoader(), but you are supposed to fix your tests");
@@ -370,8 +331,6 @@ public abstract class ActiveMQTestBase extends Assert {
 
       // checkFreePort(TransportConstants.DEFAULT_PORT);
 
-      previousThreads = Thread.getAllStackTraces();
-
       logAndSystemOut("#test " + getName());
    }
 
@@ -401,7 +360,7 @@ public abstract class ActiveMQTestBase extends Assert {
    }
 
    protected void disableCheckThread() {
-      checkThread = false;
+      leakCheckRule.disable();
    }
 
    protected String getName() {
@@ -1960,87 +1919,6 @@ public abstract class ActiveMQTestBase extends Assert {
       }
    }
 
-   /**
-    * @param buffer
-    * @return
-    */
-   private boolean checkThread(StringBuffer buffer) {
-      boolean failedThread = false;
-
-      Map<Thread, StackTraceElement[]> postThreads = Thread.getAllStackTraces();
-
-      if (postThreads != null && previousThreads != null && postThreads.size() > previousThreads.size()) {
-
-         buffer.append("*********************************************************************************\n");
-         buffer.append("LEAKING THREADS\n");
-
-         for (Thread aliveThread : postThreads.keySet()) {
-            if (!isExpectedThread(aliveThread) && !previousThreads.containsKey(aliveThread)) {
-               failedThread = true;
-               buffer.append("=============================================================================\n");
-               buffer.append("Thread " + aliveThread + " is still alive with the following stackTrace:\n");
-               StackTraceElement[] elements = postThreads.get(aliveThread);
-               for (StackTraceElement el : elements) {
-                  buffer.append(el + "\n");
-               }
-            }
-
-         }
-         buffer.append("*********************************************************************************\n");
-
-      }
-      return failedThread;
-   }
-
-   /**
-    * if it's an expected thread... we will just move along ignoring it
-    *
-    * @param thread
-    * @return
-    */
-   private boolean isExpectedThread(Thread thread) {
-      final String threadName = thread.getName();
-      final ThreadGroup group = thread.getThreadGroup();
-      final boolean isSystemThread = group != null && "system".equals(group.getName());
-      final String javaVendor = System.getProperty("java.vendor");
-
-      if (threadName.contains("SunPKCS11")) {
-         return true;
-      }
-      else if (threadName.contains("Attach Listener")) {
-         return true;
-      }
-      else if ((javaVendor.contains("IBM") || isSystemThread) && threadName.equals("process reaper")) {
-         return true;
-      }
-      else if (javaVendor.contains("IBM") && threadName.equals("MemoryPoolMXBean notification dispatcher")) {
-         return true;
-      }
-      else if (threadName.contains("globalEventExecutor")) {
-         return true;
-      }
-      else if (threadName.contains("threadDeathWatcher")) {
-         return true;
-      }
-      else if (threadName.contains("netty-threads")) {
-         // This is ok as we use EventLoopGroup.shutdownGracefully() which will shutdown things with a bit of delay
-         // if the EventLoop's are still busy.
-         return true;
-      }
-      else if (threadName.contains("threadDeathWatcher")) {
-         //another netty thread
-         return true;
-      }
-      else {
-         for (StackTraceElement element : thread.getStackTrace()) {
-            if (element.getClassName().contains("org.jboss.byteman.agent.TransformListener")) {
-               return true;
-            }
-         }
-         return false;
-      }
-   }
-
    private void checkFilesUsage() {
 
       long timeout = System.currentTimeMillis() + 15000;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e56ca95f/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ThreadLeakCheckRule.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ThreadLeakCheckRule.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ThreadLeakCheckRule.java
new file mode 100644
index 0000000..3f371a8
--- /dev/null
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ThreadLeakCheckRule.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.util;
+
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.rules.ExternalResource;
+
+/**
+ * JUnit rule that fails a test when it leaves behind threads that were not alive before it started.
+ */
+public class ThreadLeakCheckRule extends ExternalResource {
+
+   boolean enabled = true; // cleared by disable(); after() sets it back to true once a check was skipped
+
+   private Map<Thread, StackTraceElement[]> previousThreads;
+
+   public void disable() {
+      enabled = false;
+   }
+
+   /**
+    * Records the threads that are alive before the test, for comparison in {@code after}.
+    *
+    * @throws Throwable if setup fails (which will disable {@code after})
+    */
+   protected void before() throws Throwable {
+      // snapshot of the live threads; checkThread() reports only threads missing from it
+
+      previousThreads = Thread.getAllStackTraces();
+
+   }
+
+   /**
+    * Fails with "Thread leaked" if new threads outlive a 60s retry window (GC + 500ms sleep per try).
+    */
+   protected void after() {
+      if (enabled) {
+         StringBuffer buffer = null;
+
+         boolean failed = true;
+
+         boolean failedOnce = false;
+
+         long timeout = System.currentTimeMillis() + 60000;
+         while (failed && timeout > System.currentTimeMillis()) {
+            buffer = new StringBuffer();
+
+            failed = checkThread(buffer);
+
+            if (failed) {
+               failedOnce = true;
+               ActiveMQTestBase.forceGC();
+               try {
+                  Thread.sleep(500);
+               }
+               catch (Throwable e) { // NOTE(review): swallows everything, including InterruptedException
+               }
+
+               System.out.println("There are still threads running, trying again");
+               System.out.println(buffer);
+            }
+         }
+
+         if (failed) {
+            System.out.println("Thread leaked on test \n" +
+                               buffer);
+            System.out.println("Thread leakage! Failure!!!");
+
+            Assert.fail("Thread leaked");
+         }
+         else if (failedOnce) {
+            System.out.println("******************** Threads cleared after retries ********************");
+            System.out.println();
+         }
+
+
+      }
+      else {
+         enabled = true;
+      }
+
+   }
+
+
+
+   /**
+    * @param buffer receives the name and stack trace of every unexpected new thread
+    * @return true if the live-thread count grew and a new thread that is not an expected one exists
+    */
+   private boolean checkThread(StringBuffer buffer) {
+      boolean failedThread = false;
+
+      Map<Thread, StackTraceElement[]> postThreads = Thread.getAllStackTraces();
+
+      if (postThreads != null && previousThreads != null && postThreads.size() > previousThreads.size()) {
+
+         buffer.append("*********************************************************************************\n");
+         buffer.append("LEAKING THREADS\n");
+
+         for (Thread aliveThread : postThreads.keySet()) {
+            if (!isExpectedThread(aliveThread) && !previousThreads.containsKey(aliveThread)) {
+               failedThread = true;
+               buffer.append("=============================================================================\n");
+               buffer.append("Thread " + aliveThread + " is still alive with the following stackTrace:\n");
+               StackTraceElement[] elements = postThreads.get(aliveThread);
+               for (StackTraceElement el : elements) {
+                  buffer.append(el + "\n");
+               }
+            }
+
+         }
+         buffer.append("*********************************************************************************\n");
+
+      }
+      return failedThread;
+   }
+
+
+   /**
+    * Tells whether a thread is one of the expected background threads that the leak check ignores.
+    *
+    * @param thread the live thread to classify
+    * @return true if its name is on the ignore list or its stack holds a Byteman TransformListener frame
+    */
+   private boolean isExpectedThread(Thread thread) {
+      final String threadName = thread.getName();
+      final ThreadGroup group = thread.getThreadGroup();
+      final boolean isSystemThread = group != null && "system".equals(group.getName());
+      final String javaVendor = System.getProperty("java.vendor");
+
+      if (threadName.contains("SunPKCS11")) {
+         return true;
+      }
+      else if (threadName.contains("Attach Listener")) {
+         return true;
+      }
+      else if ((javaVendor.contains("IBM") || isSystemThread) && threadName.equals("process reaper")) {
+         return true;
+      }
+      else if ((javaVendor.contains("IBM") || isSystemThread) && threadName.equals("ClassCache Reaper")) {
+         return true;
+      }
+      else if (javaVendor.contains("IBM") && threadName.equals("MemoryPoolMXBean notification dispatcher")) {
+         return true;
+      }
+      else if (threadName.contains("globalEventExecutor")) {
+         return true;
+      }
+      else if (threadName.contains("threadDeathWatcher")) {
+         return true;
+      }
+      else if (threadName.contains("netty-threads")) {
+         // This is OK: we use EventLoopGroup.shutdownGracefully(), which shuts things down with a bit of
+         // delay if the EventLoops are still busy.
+         return true;
+      }
+      else if (threadName.contains("threadDeathWatcher")) {
+         // another netty thread; NOTE(review): unreachable, same test as the threadDeathWatcher branch above
+         return true;
+      }
+      else {
+         for (StackTraceElement element : thread.getStackTrace()) {
+            if (element.getClassName().contains("org.jboss.byteman.agent.TransformListener")) {
+               return true;
+            }
+         }
+         return false;
+      }
+   }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e56ca95f/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/ra/MDBMultipleHandlersServerDisconnectTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/ra/MDBMultipleHandlersServerDisconnectTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/ra/MDBMultipleHandlersServerDisconnectTest.java
index 8077a33..5c4e854 100644
--- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/ra/MDBMultipleHandlersServerDisconnectTest.java
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/ra/MDBMultipleHandlersServerDisconnectTest.java
@@ -345,7 +345,7 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
          System.out.println(writer.toString());
       }
 
-      Assert.assertFalse(failed);
+      Assert.assertFalse(writer.toString(), failed);
 
       System.out.println("Received " + NUMBER_OF_MESSAGES + " messages");
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e56ca95f/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java
index cae7437..53a6783 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java
@@ -19,18 +19,19 @@ package org.apache.activemq.artemis.tests.integration.broadcast;
 import org.apache.activemq.artemis.api.core.BroadcastEndpoint;
 import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory;
 import org.apache.activemq.artemis.api.core.ChannelBroadcastEndpointFactory;
-import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.ThreadLeakCheckRule;
 import org.jgroups.JChannel;
 import org.jgroups.conf.PlainConfigurator;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
 
-public class JGroupsBroadcastTest extends ActiveMQTestBase {
-
-   private final String jgroupsConfigString = "UDP(oob_thread_pool.max_threads=300;" + "bind_addr=127.0.0.1;oob_thread_pool.keep_alive_time=1000;" + "max_bundle_size=31k;mcast_send_buf_size=640000;" + "internal_thread_pool.keep_alive_time=60000;" + "internal_thread_pool.rejection_policy=discard;" + "mcast_recv_buf_size=25000000;bind_port=55200;" + "internal_thread_pool.queue_max_size=100;" + "mcast_port=45688;thread_pool.min_threads=20;" + "oob_thread_pool.rejection_policy=discard;" + "thread_pool.max_threads=300;enable_diagnostics=false;" + "thread_pool.enabled=true;internal_thread_pool.queue_enabled=true;" + "ucast_recv_buf_size=20000000;ucast_send_buf_size=640000;" + "internal_thread_pool.enabled=true;oob_thread_pool.enabled=true;" + "ip_ttl=2;thread_pool.rejection_policy=discard;thread_pool.keep_alive_time=5000;" + "internal_thread_pool.max_threads=10;thread_pool.queue_enabled=true;" + "mcast_addr=230.0.0.4;singleton_name=udp;max_bundle_timeout=30;" + "oob_thread_pool.queue_enab
 led=false;internal_thread_pool.min_threads=1;" + "bundler_type=old;oob_thread_pool.min_threads=20;" + "thread_pool.queue_max_size=1000):PING(num_initial_members=3;" + "timeout=2000):MERGE3(min_interval=20000;max_interval=100000)" + ":FD_SOCK(bind_addr=127.0.0.1;start_port=54200):FD_ALL(interval=3000;" + "timeout=15000):VERIFY_SUSPECT(bind_addr=127.0.0.1;" + "timeout=1500):pbcast.NAKACK2(max_msg_batch_size=100;" + "xmit_table_msgs_per_row=10000;xmit_table_max_compaction_time=10000;" + "xmit_table_num_rows=100;xmit_interval=1000):UNICAST3(xmit_table_msgs_per_row=10000;" + "xmit_table_max_compaction_time=10000;xmit_table_num_rows=20)" + ":pbcast.STABLE(desired_avg_gossip=50000;max_bytes=400000;" + "stability_delay=1000):pbcast.GMS(print_local_addr=true;" + "view_bundling=true;join_timeout=3000;view_ack_collection_timeout=5000;" + "resume_task_timeout=7500):UFC(max_credits=1m;min_threshold=0.40)" + ":MFC(max_credits=1m;min_threshold=0.40):FRAG2(frag_size=30k)" + ":RSVP(resend_interval=5
 00;ack_on_delivery=false;timeout=60000)";
-
+public class JGroupsBroadcastTest {
 
+   @Rule
+   public ThreadLeakCheckRule threadLeakCheckRule = new ThreadLeakCheckRule();
 
+   private final String jgroupsConfigString = "UDP(oob_thread_pool.max_threads=300;" + "bind_addr=127.0.0.1;oob_thread_pool.keep_alive_time=1000;" + "max_bundle_size=31k;mcast_send_buf_size=640000;" + "internal_thread_pool.keep_alive_time=60000;" + "internal_thread_pool.rejection_policy=discard;" + "mcast_recv_buf_size=25000000;bind_port=55200;" + "internal_thread_pool.queue_max_size=100;" + "mcast_port=45688;thread_pool.min_threads=20;" + "oob_thread_pool.rejection_policy=discard;" + "thread_pool.max_threads=300;enable_diagnostics=false;" + "thread_pool.enabled=true;internal_thread_pool.queue_enabled=true;" + "ucast_recv_buf_size=20000000;ucast_send_buf_size=640000;" + "internal_thread_pool.enabled=true;oob_thread_pool.enabled=true;" + "ip_ttl=2;thread_pool.rejection_policy=discard;thread_pool.keep_alive_time=5000;" + "internal_thread_pool.max_threads=10;thread_pool.queue_enabled=true;" + "mcast_addr=230.0.0.4;singleton_name=udp;max_bundle_timeout=30;" + "oob_thread_pool.queue_enab
 led=false;internal_thread_pool.min_threads=1;" + "bundler_type=old;oob_thread_pool.min_threads=20;" + "thread_pool.queue_max_size=1000):PING(num_initial_members=3;" + "timeout=2000):MERGE3(min_interval=20000;max_interval=100000)" + ":FD_SOCK(bind_addr=127.0.0.1;start_port=54200):FD_ALL(interval=3000;" + "timeout=15000):VERIFY_SUSPECT(bind_addr=127.0.0.1;" + "timeout=1500):pbcast.NAKACK2(max_msg_batch_size=100;" + "xmit_table_msgs_per_row=10000;xmit_table_max_compaction_time=10000;" + "xmit_table_num_rows=100;xmit_interval=1000):UNICAST3(xmit_table_msgs_per_row=10000;" + "xmit_table_max_compaction_time=10000;xmit_table_num_rows=20)" + ":pbcast.STABLE(desired_avg_gossip=50000;max_bytes=400000;" + "stability_delay=1000):pbcast.GMS(print_local_addr=true;" + "view_bundling=true;join_timeout=3000;view_ack_collection_timeout=5000;" + "resume_task_timeout=7500):UFC(max_credits=1m;min_threshold=0.40)" + ":MFC(max_credits=1m;min_threshold=0.40):FRAG2(frag_size=30k)" + ":RSVP(resend_interval=5
 00;ack_on_delivery=false;timeout=60000)";
 
    @Test
    public void testRefCount() throws Exception {
@@ -83,7 +84,6 @@ public class JGroupsBroadcastTest extends ActiveMQTestBase {
 
          channelEndpoint1.openClient();
 
-
       }
       catch (Exception e) {
          e.printStackTrace();