You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/09/10 19:02:36 UTC
[activemq-artemis] branch master updated: ARTEMIS-2479 Initial connection will not work with infinite retry and multiple nodes
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 60b6294 ARTEMIS-2479 Initial connection will not work with infinite retry and multiple nodes
new b583ba7 This closes #2830
60b6294 is described below
commit 60b62940b9887b962efd7f2e54ba324042e67c0b
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Mon Sep 9 15:36:06 2019 -0400
ARTEMIS-2479 Initial connection will not work with infinite retry and multiple nodes
---
.../core/client/impl/ClientSessionFactoryImpl.java | 21 ++++--
.../client/impl/ClientSessionFactoryInternal.java | 2 +
.../core/client/impl/ServerLocatorImpl.java | 49 ++++++++++----
.../integration/client/InitialConnectionTest.java | 79 ++++++++++++++++++++++
.../integration/ra/ActiveMQClusteredTest.java | 37 +++++-----
5 files changed, 147 insertions(+), 41 deletions(-)
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
index 47237b5..4ca1af1 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
@@ -826,13 +826,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
ClientSessionFactoryImpl.logger.trace("Waiting " + interval + " milliseconds before next retry. RetryInterval=" + retryInterval + " and multiplier=" + retryIntervalMultiplier);
}
- try {
- if (clientProtocolManager.waitOnLatch(interval)) {
- return;
- }
- } catch (InterruptedException ignore) {
- throw new ActiveMQInterruptedException(createTrace);
- }
+ if (waitForRetry(interval))
+ return;
// Exponential back-off
long newInterval = (long) (interval * retryIntervalMultiplier);
@@ -850,6 +845,18 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
}
}
+ @Override
+ public boolean waitForRetry(long interval) {
+ try {
+ if (clientProtocolManager.waitOnLatch(interval)) {
+ return true;
+ }
+ } catch (InterruptedException ignore) {
+ throw new ActiveMQInterruptedException(createTrace);
+ }
+ return false;
+ }
+
private void cancelScheduledTasks() {
Future<?> pingerFutureLocal = pingerFuture;
if (pingerFutureLocal != null) {
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryInternal.java
index f72ca5e..408c8e7 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryInternal.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryInternal.java
@@ -60,4 +60,6 @@ public interface ClientSessionFactoryInternal extends ClientSessionFactory {
ConfirmationWindowWarning getConfirmationWindowWarning();
Lock lockFailover();
+
+ boolean waitForRetry(long interval);
}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
index a05a8af..fd3507e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
@@ -563,6 +563,24 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
}
}
+ private int getConnectorsSize() {
+ Pair<TransportConfiguration, TransportConfiguration>[] usedTopology;
+
+ flushTopology();
+
+ synchronized (topologyArrayGuard) {
+ usedTopology = topologyArray;
+ }
+
+ synchronized (this) {
+ if (usedTopology != null && useTopologyForLoadBalancing) {
+ return usedTopology.length;
+ } else {
+ return initialConnectors.length;
+ }
+ }
+ }
+
@Override
public void start(Executor executor) throws Exception {
initialize();
@@ -764,9 +782,9 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
ClientSessionFactoryInternal factory = null;
synchronized (this) {
- boolean retry;
+ boolean retry = true;
int attempts = 0;
- do {
+ while (retry && !isClosed()) {
retry = false;
TransportConfiguration tc = selectConnector();
@@ -780,31 +798,36 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
factory = new ClientSessionFactoryImpl(this, tc, callTimeout, callFailoverTimeout, clientFailureCheckPeriod, connectionTTL, retryInterval, retryIntervalMultiplier, maxRetryInterval, reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors);
try {
addToConnecting(factory);
- factory.connect(initialConnectAttempts, failoverOnInitialConnection);
+ // We always try to connect here with only one attempt,
+ // as we will perform the initial retry here, looking for all possible connectors
+ factory.connect(1, false);
} finally {
removeFromConnecting(factory);
}
} catch (ActiveMQException e) {
- factory.close();
- if (e.getType() == ActiveMQExceptionType.NOT_CONNECTED) {
- attempts++;
+ try {
+ if (e.getType() == ActiveMQExceptionType.NOT_CONNECTED) {
+ attempts++;
- synchronized (topologyArrayGuard) {
+ int connectorsSize = getConnectorsSize();
+ int maxAttempts = initialConnectAttempts == 0 ? 1 : initialConnectAttempts;
- if (topologyArray != null && attempts == topologyArray.length) {
+ if (initialConnectAttempts >= 0 && attempts >= maxAttempts * connectorsSize) {
throw ActiveMQClientMessageBundle.BUNDLE.cannotConnectToServers();
}
- if (topologyArray == null && attempts == this.getNumInitialConnectors()) {
+ if (factory.waitForRetry(retryInterval)) {
throw ActiveMQClientMessageBundle.BUNDLE.cannotConnectToServers();
}
+ retry = true;
+ } else {
+ throw e;
}
- retry = true;
- } else {
- throw e;
+ } finally {
+
+ factory.close();
}
}
}
- while (retry);
}
// ATM topology is never != null. Checking here just to be consistent with
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InitialConnectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InitialConnectionTest.java
new file mode 100644
index 0000000..dc93db4
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InitialConnectionTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.client;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class InitialConnectionTest extends ActiveMQTestBase {
+
+ @Test
+ public void testInitialInfinite() throws Exception {
+ AtomicInteger errors = new AtomicInteger(0);
+
+ ActiveMQServer server = createServer(false, true);
+
+ Thread t = new Thread() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(500);
+ server.start();
+ } catch (Throwable e) {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+ }
+ };
+ t.start();
+ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("(tcp://localhost:61618,tcp://localhost:61616,tcp://localhost:61610)?ha=true&retryInterval=100&retryIntervalMultiplier=1.0&reconnectAttempts=-1&initialConnectAttempts=-1&useTopologyForLoadBalancing=true");
+ connectionFactory.createConnection().close();
+ connectionFactory.close();
+
+
+ t.join();
+
+ Assert.assertEquals(0, errors.get());
+ }
+
+
+ @Test
+ public void testNegativeMaxTries() throws Exception {
+
+ long timeStart = System.currentTimeMillis();
+ ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("(tcp://localhost:61618,tcp://localhost:61616,tcp://localhost:61610)?ha=true&retryInterval=100&retryIntervalMultiplier=1.0&reconnectAttempts=-1&initialConnectAttempts=2&useTopologyForLoadBalancing=true");
+ boolean failed = false;
+ try {
+ connectionFactory.createConnection();
+ } catch (JMSException e) {
+ // expected
+ failed = true;
+ }
+ Assert.assertTrue(failed);
+ long timeEnd = System.currentTimeMillis();
+ Assert.assertTrue("3 connectors, at 100 milliseconds each try, initialConnectAttempt=2, it should have waited at least 600 (- 100 from the last try that we don't actually wait, just throw ) milliseconds", timeEnd - timeStart >= 500);
+ }
+}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQClusteredTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQClusteredTest.java
index 8d886cb..a19b79e 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQClusteredTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQClusteredTest.java
@@ -39,6 +39,7 @@ import org.apache.activemq.artemis.ra.ActiveMQRASession;
import org.apache.activemq.artemis.ra.ActiveMQResourceAdapter;
import org.apache.activemq.artemis.ra.inflow.ActiveMQActivation;
import org.apache.activemq.artemis.ra.inflow.ActiveMQActivationSpec;
+import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Test;
public class ActiveMQClusteredTest extends ActiveMQRAClusteredTestBase {
@@ -193,6 +194,7 @@ public class ActiveMQClusteredTest extends ActiveMQRAClusteredTestBase {
spec.setMaxSession(CONSUMER_COUNT);
spec.setSetupAttempts(5);
spec.setSetupInterval(200L);
+ spec.setRetryInterval(100L);
spec.setReconnectAttempts(reconnectAttempts);
spec.setHA(true); // if this isn't true then the topology listener won't get nodeDown notifications
spec.setCallTimeout(500L); // if this isn't set then it may take a long time for tearDown to occur on the MDB connection
@@ -220,32 +222,25 @@ public class ActiveMQClusteredTest extends ActiveMQRAClusteredTestBase {
assertNotNull(endpoint.lastMessage);
assertEquals(endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString(), "test");
- for (int i = 0; i < 10; i++) {
- secondaryServer.stop();
-
- long mark = System.currentTimeMillis();
- long timeout = 5000;
- while (primaryQueue.getConsumerCount() < CONSUMER_COUNT && (System.currentTimeMillis() - mark) < timeout) {
- Thread.sleep(100);
- }
+ try {
+ for (int i = 0; i < 10; i++) {
+ secondaryServer.stop();
- assertTrue(primaryQueue.getConsumerCount() == CONSUMER_COUNT);
+ Wait.assertTrue(() -> primaryQueue.getConsumerCount() == CONSUMER_COUNT);
- secondaryServer.start();
- waitForServerToStart(secondaryServer);
- secondaryQueue = secondaryServer.locateQueue(MDBQUEUEPREFIXEDSIMPLE);
+ secondaryServer.start();
+ waitForServerToStart(secondaryServer);
+ secondaryQueue = secondaryServer.locateQueue(MDBQUEUEPREFIXEDSIMPLE);
- mark = System.currentTimeMillis();
- while (((primaryQueue.getConsumerCount() + secondaryQueue.getConsumerCount()) < (CONSUMER_COUNT) || primaryQueue.getConsumerCount() == CONSUMER_COUNT) && (System.currentTimeMillis() - mark) <= timeout) {
- Thread.sleep(100);
+ Queue secondaryQueueRef = secondaryQueue;
+ Wait.assertTrue(() -> primaryQueue.getConsumerCount() <= CONSUMER_COUNT);
+ Wait.assertTrue(() -> secondaryQueueRef.getConsumerCount() <= CONSUMER_COUNT);
+ Wait.assertTrue(() -> primaryQueue.getConsumerCount() + secondaryQueueRef.getConsumerCount() == CONSUMER_COUNT);
}
-
- assertTrue(primaryQueue.getConsumerCount() < CONSUMER_COUNT);
- assertTrue(secondaryQueue.getConsumerCount() < CONSUMER_COUNT);
- assertTrue(primaryQueue.getConsumerCount() + secondaryQueue.getConsumerCount() == CONSUMER_COUNT);
+ } finally {
+ qResourceAdapter.endpointDeactivation(endpointFactory, spec);
+ qResourceAdapter.stop();
}
- qResourceAdapter.endpointDeactivation(endpointFactory, spec);
- qResourceAdapter.stop();
}
}