You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2015/09/11 16:06:06 UTC

[6/8] activemq-artemis git commit: ARTEMIS-222 fixing a deadlock that appeared in the test suite (MultipleThreadsOpeningTest)

ARTEMIS-222 fixing a deadlock that appeared in the test suite (MultipleThreadsOpeningTest)

https://issues.apache.org/jira/browse/ARTEMIS-222


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f5a72725
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f5a72725
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f5a72725

Branch: refs/heads/master
Commit: f5a727259e7d913c49130e57155ad7aa2616f79f
Parents: d5a0128
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Sep 10 16:22:38 2015 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Sep 11 09:07:49 2015 -0400

----------------------------------------------------------------------
 .../core/client/impl/ServerLocatorImpl.java     | 71 +++++++++++---------
 .../jms/cluster/MultipleThreadsOpeningTest.java | 33 ++++++---
 2 files changed, 64 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f5a72725/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
index bed47b7..c979246 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
@@ -561,32 +561,34 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       clusterTransportConfiguration = locator.clusterTransportConfiguration;
    }
 
-   private synchronized TransportConfiguration selectConnector() {
+   private TransportConfiguration selectConnector() {
       Pair<TransportConfiguration, TransportConfiguration>[] usedTopology;
 
       synchronized (topologyArrayGuard) {
          usedTopology = topologyArray;
       }
 
-      // if the topologyArray is null, we will use the initialConnectors
-      if (usedTopology != null) {
-         if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
-            ActiveMQClientLogger.LOGGER.trace("Selecting connector from toplogy.");
-         }
-         int pos = loadBalancingPolicy.select(usedTopology.length);
-         Pair<TransportConfiguration, TransportConfiguration> pair = usedTopology[pos];
+      synchronized (this) {
+         // if the topologyArray is null, we will use the initialConnectors
+         if (usedTopology != null) {
+            if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
+               ActiveMQClientLogger.LOGGER.trace("Selecting connector from toplogy.");
+            }
+            int pos = loadBalancingPolicy.select(usedTopology.length);
+            Pair<TransportConfiguration, TransportConfiguration> pair = usedTopology[pos];
 
-         return pair.getA();
-      }
-      else {
-         // Get from initialconnectors
-         if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
-            ActiveMQClientLogger.LOGGER.trace("Selecting connector from initial connectors.");
+            return pair.getA();
          }
+         else {
+            // Get from initialconnectors
+            if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
+               ActiveMQClientLogger.LOGGER.trace("Selecting connector from initial connectors.");
+            }
 
-         int pos = loadBalancingPolicy.select(initialConnectors.length);
+            int pos = loadBalancingPolicy.select(initialConnectors.length);
 
-         return initialConnectors[pos];
+            return initialConnectors[pos];
+         }
       }
    }
 
@@ -637,16 +639,23 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
    }
 
    private ClientSessionFactoryInternal connect(final boolean skipWarnings) throws ActiveMQException {
+      ClientSessionFactoryInternal returnFactory = null;
+
       synchronized (this) {
          // static list of initial connectors
          if (getNumInitialConnectors() > 0 && discoveryGroup == null) {
-            ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) staticConnector.connect(skipWarnings);
-            addFactory(sf);
-            return sf;
+            returnFactory = (ClientSessionFactoryInternal) staticConnector.connect(skipWarnings);
          }
       }
-      // wait for discovery group to get the list of initial connectors
-      return (ClientSessionFactoryInternal) createSessionFactory();
+
+      if (returnFactory != null) {
+         addFactory(returnFactory);
+         return returnFactory;
+      }
+      else {
+         // wait for discovery group to get the list of initial connectors
+         return (ClientSessionFactoryInternal) createSessionFactory();
+      }
    }
 
    @Override
@@ -844,11 +853,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
                factory.cleanup();
             throw ActiveMQClientMessageBundle.BUNDLE.connectionTimedOutOnReceiveTopology(discoveryGroup);
          }
+      }
 
-         addFactory(factory);
+      addFactory(factory);
 
-         return factory;
-      }
+      return factory;
    }
 
    public boolean isHA() {
@@ -1494,16 +1503,18 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
    }
 
    public void factoryClosed(final ClientSessionFactory factory) {
+      boolean isEmpty;
       synchronized (factories) {
          factories.remove(factory);
+         isEmpty = factories.isEmpty();
+      }
 
-         if (!clusterConnection && factories.isEmpty()) {
-            // Go back to using the broadcast or static list
-            synchronized (topologyArrayGuard) {
-               receivedTopology = false;
+      if (!clusterConnection && isEmpty) {
+         // Go back to using the broadcast or static list
+         synchronized (topologyArrayGuard) {
+            receivedTopology = false;
 
-               topologyArray = null;
-            }
+            topologyArray = null;
          }
       }
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f5a72725/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/MultipleThreadsOpeningTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/MultipleThreadsOpeningTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/MultipleThreadsOpeningTest.java
index 47669da..6475a13 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/MultipleThreadsOpeningTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/MultipleThreadsOpeningTest.java
@@ -23,8 +23,8 @@ import java.util.concurrent.CountDownLatch;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
 import org.apache.activemq.artemis.api.jms.JMSFactoryType;
-import org.apache.activemq.artemis.tests.util.JMSClusteredTestBase;
 import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
+import org.apache.activemq.artemis.tests.util.JMSClusteredTestBase;
 import org.junit.Test;
 
 public class MultipleThreadsOpeningTest extends JMSClusteredTestBase {
@@ -33,7 +33,7 @@ public class MultipleThreadsOpeningTest extends JMSClusteredTestBase {
    public void testMultipleOpen() throws Exception {
       cf1 = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.CF, new TransportConfiguration(InVMConnectorFactory.class.getName(), generateInVMParams(1)));
 
-      final int numberOfOpens = 2000;
+      final int numberOfOpens = 500;
       int numberOfThreads = 20;
       // I want all the threads aligned, just ready to start creating connections like in a car race
       final CountDownLatch flagAlignSemaphore = new CountDownLatch(numberOfThreads);
@@ -41,6 +41,10 @@ public class MultipleThreadsOpeningTest extends JMSClusteredTestBase {
 
       class ThreadOpen extends Thread {
 
+         ThreadOpen(int i) {
+            super("MultipleThreadsOpeningTest/ThreadOpen::" + i);
+         }
+
          int errors = 0;
 
          public void run() {
@@ -50,8 +54,8 @@ public class MultipleThreadsOpeningTest extends JMSClusteredTestBase {
                flagStartRace.await();
 
                for (int i = 0; i < numberOfOpens; i++) {
-                  if (i % 1000 == 0)
-                     System.out.println("tests " + i);
+                  if (i % 100 == 0)
+                     System.out.println("connections created on Thread " + Thread.currentThread() + " " + i);
                   Connection conn = cf1.createConnection();
                   Session sess = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
                   sess.close();
@@ -68,18 +72,27 @@ public class MultipleThreadsOpeningTest extends JMSClusteredTestBase {
       ThreadOpen[] threads = new ThreadOpen[numberOfThreads];
 
       for (int i = 0; i < numberOfThreads; i++) {
-         threads[i] = new ThreadOpen();
+         threads[i] = new ThreadOpen(i);
          threads[i].start();
       }
 
       flagAlignSemaphore.await();
       flagStartRace.countDown();
 
-      for (ThreadOpen t : threads) {
-         // 5 minutes seems long but this may take a bit of time in a slower box
-         t.join(300000);
-         assertFalse(t.isAlive());
-         assertEquals("There are Errors on the test thread", 0, t.errors);
+      try {
+         for (ThreadOpen t : threads) {
+            t.join(60000);
+            assertFalse(t.isAlive());
+            assertEquals("There are Errors on the test thread", 0, t.errors);
+         }
+      }
+      finally {
+         for (ThreadOpen t : threads) {
+            if (t.isAlive()) {
+               t.interrupt();
+            }
+            t.join(1000);
+         }
       }
    }
 }