You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2015/09/11 16:06:06 UTC
[6/8] activemq-artemis git commit: ARTEMIS-222 fixing a deadlock that
appeared on the testsuite (MultipleThreadsOpeningTest)
ARTEMIS-222 fixing a deadlock that appeared on the testsuite (MultipleThreadsOpeningTest)
https://issues.apache.org/jira/browse/ARTEMIS-222
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f5a72725
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f5a72725
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f5a72725
Branch: refs/heads/master
Commit: f5a727259e7d913c49130e57155ad7aa2616f79f
Parents: d5a0128
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Sep 10 16:22:38 2015 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Sep 11 09:07:49 2015 -0400
----------------------------------------------------------------------
.../core/client/impl/ServerLocatorImpl.java | 71 +++++++++++---------
.../jms/cluster/MultipleThreadsOpeningTest.java | 33 ++++++---
2 files changed, 64 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f5a72725/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
index bed47b7..c979246 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
@@ -561,32 +561,34 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
clusterTransportConfiguration = locator.clusterTransportConfiguration;
}
- private synchronized TransportConfiguration selectConnector() {
+ private TransportConfiguration selectConnector() {
Pair<TransportConfiguration, TransportConfiguration>[] usedTopology;
synchronized (topologyArrayGuard) {
usedTopology = topologyArray;
}
- // if the topologyArray is null, we will use the initialConnectors
- if (usedTopology != null) {
- if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
- ActiveMQClientLogger.LOGGER.trace("Selecting connector from toplogy.");
- }
- int pos = loadBalancingPolicy.select(usedTopology.length);
- Pair<TransportConfiguration, TransportConfiguration> pair = usedTopology[pos];
+ synchronized (this) {
+ // if the topologyArray is null, we will use the initialConnectors
+ if (usedTopology != null) {
+ if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
+ ActiveMQClientLogger.LOGGER.trace("Selecting connector from toplogy.");
+ }
+ int pos = loadBalancingPolicy.select(usedTopology.length);
+ Pair<TransportConfiguration, TransportConfiguration> pair = usedTopology[pos];
- return pair.getA();
- }
- else {
- // Get from initialconnectors
- if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
- ActiveMQClientLogger.LOGGER.trace("Selecting connector from initial connectors.");
+ return pair.getA();
}
+ else {
+ // Get from initialconnectors
+ if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
+ ActiveMQClientLogger.LOGGER.trace("Selecting connector from initial connectors.");
+ }
- int pos = loadBalancingPolicy.select(initialConnectors.length);
+ int pos = loadBalancingPolicy.select(initialConnectors.length);
- return initialConnectors[pos];
+ return initialConnectors[pos];
+ }
}
}
@@ -637,16 +639,23 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
}
private ClientSessionFactoryInternal connect(final boolean skipWarnings) throws ActiveMQException {
+ ClientSessionFactoryInternal returnFactory = null;
+
synchronized (this) {
// static list of initial connectors
if (getNumInitialConnectors() > 0 && discoveryGroup == null) {
- ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) staticConnector.connect(skipWarnings);
- addFactory(sf);
- return sf;
+ returnFactory = (ClientSessionFactoryInternal) staticConnector.connect(skipWarnings);
}
}
- // wait for discovery group to get the list of initial connectors
- return (ClientSessionFactoryInternal) createSessionFactory();
+
+ if (returnFactory != null) {
+ addFactory(returnFactory);
+ return returnFactory;
+ }
+ else {
+ // wait for discovery group to get the list of initial connectors
+ return (ClientSessionFactoryInternal) createSessionFactory();
+ }
}
@Override
@@ -844,11 +853,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
factory.cleanup();
throw ActiveMQClientMessageBundle.BUNDLE.connectionTimedOutOnReceiveTopology(discoveryGroup);
}
+ }
- addFactory(factory);
+ addFactory(factory);
- return factory;
- }
+ return factory;
}
public boolean isHA() {
@@ -1494,16 +1503,18 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
}
public void factoryClosed(final ClientSessionFactory factory) {
+ boolean isEmpty;
synchronized (factories) {
factories.remove(factory);
+ isEmpty = factories.isEmpty();
+ }
- if (!clusterConnection && factories.isEmpty()) {
- // Go back to using the broadcast or static list
- synchronized (topologyArrayGuard) {
- receivedTopology = false;
+ if (!clusterConnection && isEmpty) {
+ // Go back to using the broadcast or static list
+ synchronized (topologyArrayGuard) {
+ receivedTopology = false;
- topologyArray = null;
- }
+ topologyArray = null;
}
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f5a72725/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/MultipleThreadsOpeningTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/MultipleThreadsOpeningTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/MultipleThreadsOpeningTest.java
index 47669da..6475a13 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/MultipleThreadsOpeningTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/MultipleThreadsOpeningTest.java
@@ -23,8 +23,8 @@ import java.util.concurrent.CountDownLatch;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
-import org.apache.activemq.artemis.tests.util.JMSClusteredTestBase;
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
+import org.apache.activemq.artemis.tests.util.JMSClusteredTestBase;
import org.junit.Test;
public class MultipleThreadsOpeningTest extends JMSClusteredTestBase {
@@ -33,7 +33,7 @@ public class MultipleThreadsOpeningTest extends JMSClusteredTestBase {
public void testMultipleOpen() throws Exception {
cf1 = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.CF, new TransportConfiguration(InVMConnectorFactory.class.getName(), generateInVMParams(1)));
- final int numberOfOpens = 2000;
+ final int numberOfOpens = 500;
int numberOfThreads = 20;
// I want all the threads aligned, just ready to start creating connections like in a car race
final CountDownLatch flagAlignSemaphore = new CountDownLatch(numberOfThreads);
@@ -41,6 +41,10 @@ public class MultipleThreadsOpeningTest extends JMSClusteredTestBase {
class ThreadOpen extends Thread {
+ ThreadOpen(int i) {
+ super("MultipleThreadsOpeningTest/ThreadOpen::" + i);
+ }
+
int errors = 0;
public void run() {
@@ -50,8 +54,8 @@ public class MultipleThreadsOpeningTest extends JMSClusteredTestBase {
flagStartRace.await();
for (int i = 0; i < numberOfOpens; i++) {
- if (i % 1000 == 0)
- System.out.println("tests " + i);
+ if (i % 100 == 0)
+ System.out.println("connections created on Thread " + Thread.currentThread() + " " + i);
Connection conn = cf1.createConnection();
Session sess = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
sess.close();
@@ -68,18 +72,27 @@ public class MultipleThreadsOpeningTest extends JMSClusteredTestBase {
ThreadOpen[] threads = new ThreadOpen[numberOfThreads];
for (int i = 0; i < numberOfThreads; i++) {
- threads[i] = new ThreadOpen();
+ threads[i] = new ThreadOpen(i);
threads[i].start();
}
flagAlignSemaphore.await();
flagStartRace.countDown();
- for (ThreadOpen t : threads) {
- // 5 minutes seems long but this may take a bit of time in a slower box
- t.join(300000);
- assertFalse(t.isAlive());
- assertEquals("There are Errors on the test thread", 0, t.errors);
+ try {
+ for (ThreadOpen t : threads) {
+ t.join(60000);
+ assertFalse(t.isAlive());
+ assertEquals("There are Errors on the test thread", 0, t.errors);
+ }
+ }
+ finally {
+ for (ThreadOpen t : threads) {
+ if (t.isAlive()) {
+ t.interrupt();
+ }
+ t.join(1000);
+ }
}
}
}