You are viewing a plain text version of this content. The canonical link for it is here.
Posted to gitbox@activemq.apache.org by "jbertram (via GitHub)" <gi...@apache.org> on 2023/05/22 02:36:04 UTC

[GitHub] [activemq-artemis] jbertram opened a new pull request, #4485: ARTEMIS-4286 sometimes federated consumer won't stop

jbertram opened a new pull request, #4485:
URL: https://github.com/apache/activemq-artemis/pull/4485

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] jbertram commented on a diff in pull request #4485: ARTEMIS-4286 sometimes federated consumer won't stop

Posted by "jbertram (via GitHub)" <gi...@apache.org>.
jbertram commented on code in PR #4485:
URL: https://github.com/apache/activemq-artemis/pull/4485#discussion_r1204398694


##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueConsumerTest.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.federation;
+
+import java.util.concurrent.ScheduledFuture;
+
+import org.apache.activemq.artemis.core.config.FederationConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumerImpl;
+import org.apache.activemq.artemis.core.server.federation.Federation;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Test;
+
+public class FederatedQueueConsumerTest extends ActiveMQTestBase {
+
+   @Test
+   public void testClose() throws Exception {
+      ActiveMQServer server = createServer(false, createDefaultInVMConfig());
+      server.start();
+      Federation federation = new Federation(server, new FederationConfiguration().setName(RandomUtil.randomString()));
+      federation.start();
+      FederatedQueueConsumerImpl consumer = new FederatedQueueConsumerImpl(federation, server, null, null, null, null);
+      assertNull(consumer.getCurrentConnectTask());
+      consumer.start();
+      assertNotNull(consumer.getCurrentConnectTask());
+      consumer.close();
+      Wait.assertTrue(() -> {
+         ScheduledFuture task = consumer.getCurrentConnectTask();
+         return task.isDone() || task.isCancelled() && task == consumer.getCurrentConnectTask();

Review Comment:
   Ah yes. I forgot my parentheses.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] jbertram commented on a diff in pull request #4485: ARTEMIS-4286 sometimes federated consumer won't stop

Posted by "jbertram (via GitHub)" <gi...@apache.org>.
jbertram commented on code in PR #4485:
URL: https://github.com/apache/activemq-artemis/pull/4485#discussion_r1204262075


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java:
##########
@@ -49,6 +55,8 @@ public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, Sessi
    private final int intialConnectDelayMultiplier = 2;
    private final int intialConnectDelayMax = 30;
    private final ClientSessionCallback clientSessionCallback;
+   private volatile boolean started = false;

Review Comment:
   Understood.



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueConsumerTest.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.federation;
+
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.activemq.artemis.core.config.FederationConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumerImpl;
+import org.apache.activemq.artemis.core.server.federation.Federation;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Test;
+
+public class FederatedQueueConsumerTest extends ActiveMQTestBase {
+
+   @Test
+   public void testClose() throws Exception {
+      ActiveMQServer server = createServer(false, createDefaultInVMConfig());
+      server.start();
+      Federation federation = new Federation(server, new FederationConfiguration().setName(RandomUtil.randomString()));
+      federation.start();
+      FederatedQueueConsumerImpl consumer = new FederatedQueueConsumerImpl(federation, server, null, null, null, null);
+      assertNull(consumer.getCurrentConnectTask());
+      consumer.start();
+      assertNotNull(consumer.getCurrentConnectTask());
+      consumer.close();
+      AtomicReference<ScheduledFuture> task = new AtomicReference<>();
+      Wait.assertTrue(() -> {
+         task.set(consumer.getCurrentConnectTask());
+         return task.get().isDone() || task.get().isCancelled();
+      }, 2000, 50);
+      assertTrue(task.get() == consumer.getCurrentConnectTask());

Review Comment:
   I see what you mean.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] gemmellr merged pull request #4485: ARTEMIS-4286 sometimes federated consumer won't stop

Posted by "gemmellr (via GitHub)" <gi...@apache.org>.
gemmellr merged PR #4485:
URL: https://github.com/apache/activemq-artemis/pull/4485


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] gemmellr commented on a diff in pull request #4485: ARTEMIS-4286 sometimes federated consumer won't stop

Posted by "gemmellr (via GitHub)" <gi...@apache.org>.
gemmellr commented on code in PR #4485:
URL: https://github.com/apache/activemq-artemis/pull/4485#discussion_r1204342852


##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueConsumerTest.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.federation;
+
+import java.util.concurrent.ScheduledFuture;
+
+import org.apache.activemq.artemis.core.config.FederationConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumerImpl;
+import org.apache.activemq.artemis.core.server.federation.Federation;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Test;
+
+public class FederatedQueueConsumerTest extends ActiveMQTestBase {
+
+   @Test
+   public void testClose() throws Exception {
+      ActiveMQServer server = createServer(false, createDefaultInVMConfig());
+      server.start();
+      Federation federation = new Federation(server, new FederationConfiguration().setName(RandomUtil.randomString()));
+      federation.start();
+      FederatedQueueConsumerImpl consumer = new FederatedQueueConsumerImpl(federation, server, null, null, null, null);
+      assertNull(consumer.getCurrentConnectTask());
+      consumer.start();
+      assertNotNull(consumer.getCurrentConnectTask());
+      consumer.close();
+      Wait.assertTrue(() -> {
+         ScheduledFuture task = consumer.getCurrentConnectTask();
+         return task.isDone() || task.isCancelled() && task == consumer.getCurrentConnectTask();

Review Comment:
   This only checks the future when when it is cancelled, but needs to check it is the same when it is 'done', since it may reschedule while running, after we got it initially.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] gemmellr commented on a diff in pull request #4485: ARTEMIS-4286 sometimes federated consumer won't stop

Posted by "gemmellr (via GitHub)" <gi...@apache.org>.
gemmellr commented on code in PR #4485:
URL: https://github.com/apache/activemq-artemis/pull/4485#discussion_r1202740824


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java:
##########
@@ -95,24 +103,31 @@ public int decrementCount() {
    }
 
    @Override
-   public void start() {
-      scheduleConnect(0);
+   public synchronized void start() {
+      if (!started) {
+         started = true;
+         scheduleConnect(0);
+      }
    }
 
    private void scheduleConnect(int delay) {
-      scheduledExecutorService.schedule(() -> {
+      currentConnectTask = scheduledExecutorService.schedule(() -> {
          try {
             connect();
          } catch (Exception e) {
-            scheduleConnect(FederatedQueueConsumer.getNextDelay(delay, intialConnectDelayMultiplier, intialConnectDelayMax));
+            int nextDelay = FederatedQueueConsumer.getNextDelay(delay, intialConnectDelayMultiplier, intialConnectDelayMax);
+            if (logger.isTraceEnabled()) {
+               logger.trace(this + " failed to connect. Scheduling reconnect in " + nextDelay + " seconds.", e);

Review Comment:
   All the methods take the last vararg to be an exception if it is one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] gemmellr commented on a diff in pull request #4485: ARTEMIS-4286 sometimes federated consumer won't stop

Posted by "gemmellr (via GitHub)" <gi...@apache.org>.
gemmellr commented on code in PR #4485:
URL: https://github.com/apache/activemq-artemis/pull/4485#discussion_r1200650735


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java:
##########
@@ -109,41 +113,46 @@ private void scheduleConnect(int delay) {
       }, delay, TimeUnit.SECONDS);
    }
 
-   private void connect() throws Exception {
-      try {
-         if (clientConsumer == null) {
-            synchronized (this) {
-               this.clientSessionFactory = (ClientSessionFactoryInternal) upstream.getConnection().clientSessionFactory();
-               this.clientSession = clientSessionFactory.createSession(upstream.getUser(), upstream.getPassword(), false, true, true, clientSessionFactory.getServerLocator().isPreAcknowledge(), clientSessionFactory.getServerLocator().getAckBatchSize());
-               this.clientSession.addFailureListener(this);
-               this.clientSession.addMetaData(FEDERATION_NAME, federation.getName().toString());
-               this.clientSession.addMetaData(FEDERATION_UPSTREAM_NAME, upstream.getName().toString());
-               this.clientSession.start();
-               if (clientSessionCallback != null) {
-                  clientSessionCallback.callback(clientSession);
-               }
-               if (clientSession.queueQuery(key.getQueueName()).isExists()) {
-                  this.clientConsumer = clientSession.createConsumer(key.getQueueName(), key.getFilterString(), key.getPriority(), false);
-                  this.clientConsumer.setMessageHandler(this);
-               } else {
-                  throw new ActiveMQNonExistentQueueException("Queue " + key.getQueueName() + " does not exist on remote");
+   private synchronized void connect() throws Exception {
+      if (started) {
+         connectionAttemptTimestamp.set(System.currentTimeMillis());
+         try {
+            if (clientConsumer == null) {
+               synchronized (this) {
+                  this.clientSessionFactory = (ClientSessionFactoryInternal) upstream.getConnection().clientSessionFactory();
+                  this.clientSession = clientSessionFactory.createSession(upstream.getUser(), upstream.getPassword(), false, true, true, clientSessionFactory.getServerLocator().isPreAcknowledge(), clientSessionFactory.getServerLocator().getAckBatchSize());
+                  this.clientSession.addFailureListener(this);
+                  this.clientSession.addMetaData(FEDERATION_NAME, federation.getName().toString());
+                  this.clientSession.addMetaData(FEDERATION_UPSTREAM_NAME, upstream.getName().toString());
+                  this.clientSession.start();
+                  if (clientSessionCallback != null) {
+                     clientSessionCallback.callback(clientSession);
+                  }
+                  if (clientSession.queueQuery(key.getQueueName()).isExists()) {
+                     this.clientConsumer = clientSession.createConsumer(key.getQueueName(), key.getFilterString(), key.getPriority(), false);
+                     this.clientConsumer.setMessageHandler(this);
+                  } else {
+                     throw new ActiveMQNonExistentQueueException("Queue " + key.getQueueName() + " does not exist on remote");
+                  }
                }
             }
-         }
-      } catch (Exception e) {
-         try {
-            if (clientSessionFactory != null) {
-               clientSessionFactory.cleanup();
+         } catch (Exception e) {
+            try {
+               if (clientSessionFactory != null) {
+                  clientSessionFactory.cleanup();
+               }
+               disconnect();
+            } catch (ActiveMQException ignored) {
             }
-            disconnect();
-         } catch (ActiveMQException ignored) {
+            throw e;
          }
-         throw e;
       }
    }
 
    @Override
-   public void close() {
+   public synchronized void close() {
+      started = false;

Review Comment:
   Could also check that it was started originally, so that close fully noops rather than scheduling a task if there is nothing to do.



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueConsumerTest.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.federation;
+
+import org.apache.activemq.artemis.core.config.FederationConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumerImpl;
+import org.apache.activemq.artemis.core.server.federation.Federation;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Test;
+
+public class FederatedQueueConsumerTest extends ActiveMQTestBase {
+
+   @Test
+   public void testClose() throws Exception {
+      ActiveMQServer server = createServer(false, createDefaultInVMConfig());
+      server.start();
+      Federation federation = new Federation(server, new FederationConfiguration().setName(RandomUtil.randomString()));
+      federation.start();
+      FederatedQueueConsumerImpl consumer = new FederatedQueueConsumerImpl(federation, server, null, null, null, null);
+      consumer.start();
+      Wait.waitFor(() -> consumer.getConnectionAttemptTimestamp() > 0);

Review Comment:
   Its unclear this should wait up to 30 seconds at 100ms intervals. A lower timeout and interval would seem appropriate for what it is checking in this case.



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueConsumerTest.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.federation;
+
+import org.apache.activemq.artemis.core.config.FederationConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumerImpl;
+import org.apache.activemq.artemis.core.server.federation.Federation;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Test;
+
+public class FederatedQueueConsumerTest extends ActiveMQTestBase {
+
+   @Test
+   public void testClose() throws Exception {
+      ActiveMQServer server = createServer(false, createDefaultInVMConfig());
+      server.start();
+      Federation federation = new Federation(server, new FederationConfiguration().setName(RandomUtil.randomString()));
+      federation.start();
+      FederatedQueueConsumerImpl consumer = new FederatedQueueConsumerImpl(federation, server, null, null, null, null);
+      consumer.start();
+      Wait.waitFor(() -> consumer.getConnectionAttemptTimestamp() > 0);
+      consumer.close();
+      long closed = System.currentTimeMillis();
+      assertFalse(Wait.waitFor(() -> consumer.getConnectionAttemptTimestamp() > closed, 5000, 100));

Review Comment:
   Is it necessary to burn 5 seconds waiting for it not to be set to something else? Even with a couple of exceptions+retries after an initial connect failure it would only get to ~3sec total, though it seems likely the test would complete inside the first retry, so 1sec typically.
   
   Verifying somehow (inc/dec a counter?) that there become no outstanding connect tasks to run would seem like a way to ensure more quickly and reliably that its not going to continue making new attempts.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] jbertram commented on a diff in pull request #4485: ARTEMIS-4286 sometimes federated consumer won't stop

Posted by "jbertram (via GitHub)" <gi...@apache.org>.
jbertram commented on code in PR #4485:
URL: https://github.com/apache/activemq-artemis/pull/4485#discussion_r1200812876


##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueConsumerTest.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.federation;
+
+import org.apache.activemq.artemis.core.config.FederationConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumerImpl;
+import org.apache.activemq.artemis.core.server.federation.Federation;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Test;
+
+public class FederatedQueueConsumerTest extends ActiveMQTestBase {
+
+   @Test
+   public void testClose() throws Exception {
+      ActiveMQServer server = createServer(false, createDefaultInVMConfig());
+      server.start();
+      Federation federation = new Federation(server, new FederationConfiguration().setName(RandomUtil.randomString()));
+      federation.start();
+      FederatedQueueConsumerImpl consumer = new FederatedQueueConsumerImpl(federation, server, null, null, null, null);
+      consumer.start();
+      Wait.waitFor(() -> consumer.getConnectionAttemptTimestamp() > 0);
+      consumer.close();
+      long closed = System.currentTimeMillis();
+      assertFalse(Wait.waitFor(() -> consumer.getConnectionAttemptTimestamp() > closed, 5000, 100));

Review Comment:
   I lowered the timeout and the interval. I couldn't think of a better way to make the check faster or more reliable. If you have any ideas I'm open.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] jbertram commented on a diff in pull request #4485: ARTEMIS-4286 sometimes federated consumer won't stop

Posted by "jbertram (via GitHub)" <gi...@apache.org>.
jbertram commented on code in PR #4485:
URL: https://github.com/apache/activemq-artemis/pull/4485#discussion_r1202764815


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java:
##########
@@ -95,24 +103,31 @@ public int decrementCount() {
    }
 
    @Override
-   public void start() {
-      scheduleConnect(0);
+   public synchronized void start() {
+      if (!started) {
+         started = true;
+         scheduleConnect(0);
+      }
    }
 
    private void scheduleConnect(int delay) {
-      scheduledExecutorService.schedule(() -> {
+      currentConnectTask = scheduledExecutorService.schedule(() -> {
          try {
             connect();
          } catch (Exception e) {
-            scheduleConnect(FederatedQueueConsumer.getNextDelay(delay, intialConnectDelayMultiplier, intialConnectDelayMax));
+            int nextDelay = FederatedQueueConsumer.getNextDelay(delay, intialConnectDelayMultiplier, intialConnectDelayMax);
+            if (logger.isTraceEnabled()) {
+               logger.trace(this + " failed to connect. Scheduling reconnect in " + nextDelay + " seconds.", e);

Review Comment:
   That's very cool, if true. Out of curiosity, how do you know that? I looked in the JavaDoc but I didn't see any mention of that semantic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] brusdev commented on a diff in pull request #4485: ARTEMIS-4286 sometimes federated consumer won't stop

Posted by "brusdev (via GitHub)" <gi...@apache.org>.
brusdev commented on code in PR #4485:
URL: https://github.com/apache/activemq-artemis/pull/4485#discussion_r1200019774


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java:
##########
@@ -144,6 +152,8 @@ private void connect() throws Exception {
 
    @Override
    public void close() {
+      started.set(false);

Review Comment:
   The connect could be already scheduled and not yet executed while a disconnect could be scheduled with zero delay. In this case, the disconnect would be executed before a connect.



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueConsumerTest.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.federation;
+
+import org.apache.activemq.artemis.core.config.FederationConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumerImpl;
+import org.apache.activemq.artemis.core.server.federation.Federation;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Test;
+
+public class FederatedQueueConsumerTest extends ActiveMQTestBase {
+
+   @Test
+   public void testClose() throws Exception {
+      ActiveMQServer server = createServer(false, createDefaultInVMConfig());
+      server.start();
+      Federation federation = new Federation(server, new FederationConfiguration().setName(RandomUtil.randomString()));
+      federation.start();
+      FederatedQueueConsumerImpl consumer = new FederatedQueueConsumerImpl(federation, server, null, null, null, null);
+      consumer.start();
+      Wait.waitFor(() -> consumer.getConnectionAttemptTimestamp() > 0);
+      consumer.close();

Review Comment:
   Is this test checking a close while the consumer is trying to connect?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] jbertram commented on a diff in pull request #4485: ARTEMIS-4286 sometimes federated consumer won't stop

Posted by "jbertram (via GitHub)" <gi...@apache.org>.
jbertram commented on code in PR #4485:
URL: https://github.com/apache/activemq-artemis/pull/4485#discussion_r1202732696


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java:
##########
@@ -95,24 +103,31 @@ public int decrementCount() {
    }
 
    @Override
-   public void start() {
-      scheduleConnect(0);
+   public synchronized void start() {
+      if (!started) {
+         started = true;
+         scheduleConnect(0);
+      }
    }
 
    private void scheduleConnect(int delay) {
-      scheduledExecutorService.schedule(() -> {
+      currentConnectTask = scheduledExecutorService.schedule(() -> {
          try {
             connect();
          } catch (Exception e) {
-            scheduleConnect(FederatedQueueConsumer.getNextDelay(delay, intialConnectDelayMultiplier, intialConnectDelayMax));
+            int nextDelay = FederatedQueueConsumer.getNextDelay(delay, intialConnectDelayMultiplier, intialConnectDelayMax);
+            if (logger.isTraceEnabled()) {
+               logger.trace(this + " failed to connect. Scheduling reconnect in " + nextDelay + " seconds.", e);

Review Comment:
   The problem with using place-holders is that there's no method which takes place-holders which also takes the cause and will log the whole stack-trace.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] gemmellr commented on a diff in pull request #4485: ARTEMIS-4286 sometimes federated consumer won't stop

Posted by "gemmellr (via GitHub)" <gi...@apache.org>.
gemmellr commented on code in PR #4485:
URL: https://github.com/apache/activemq-artemis/pull/4485#discussion_r1202581258


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java:
##########
@@ -129,22 +144,26 @@ private void connect() throws Exception {
                   throw new ActiveMQNonExistentQueueException("Queue " + key.getQueueName() + " does not exist on remote");
                }
             }
-         }
-      } catch (Exception e) {
-         try {
-            if (clientSessionFactory != null) {
-               clientSessionFactory.cleanup();
+         } catch (Exception e) {
+            try {
+               if (clientSessionFactory != null) {
+                  clientSessionFactory.cleanup();
+               }
+               disconnect();

Review Comment:
   Can cleanup() throw? Should disconnect() perhaps be in a finally, in case?



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java:
##########
@@ -49,6 +55,8 @@ public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, Sessi
    private final int intialConnectDelayMultiplier = 2;
    private final int intialConnectDelayMax = 30;
    private final ClientSessionCallback clientSessionCallback;
+   private boolean started = false;
+   private ScheduledFuture currentConnectTask;

Review Comment:
   its set/used from different threads so should probably be volatile 



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueConsumerTest.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.federation;
+
+import org.apache.activemq.artemis.core.config.FederationConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumerImpl;
+import org.apache.activemq.artemis.core.server.federation.Federation;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Test;
+
+public class FederatedQueueConsumerTest extends ActiveMQTestBase {
+
+   @Test
+   public void testClose() throws Exception {
+      ActiveMQServer server = createServer(false, createDefaultInVMConfig());
+      server.start();
+      Federation federation = new Federation(server, new FederationConfiguration().setName(RandomUtil.randomString()));
+      federation.start();
+      FederatedQueueConsumerImpl consumer = new FederatedQueueConsumerImpl(federation, server, null, null, null, null);
+      consumer.start();
+      Wait.assertTrue(() -> consumer.getCurrentConnectTask() != null, 2000, 20);

Review Comment:
   As its being set before return from start(), could probably just verify it was null before, not-null after.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java:
##########
@@ -129,22 +144,26 @@ private void connect() throws Exception {
                   throw new ActiveMQNonExistentQueueException("Queue " + key.getQueueName() + " does not exist on remote");
                }
             }
-         }
-      } catch (Exception e) {
-         try {
-            if (clientSessionFactory != null) {
-               clientSessionFactory.cleanup();
+         } catch (Exception e) {
+            try {
+               if (clientSessionFactory != null) {
+                  clientSessionFactory.cleanup();
+               }
+               disconnect();
+            } catch (ActiveMQException ignored) {
             }
-            disconnect();
-         } catch (ActiveMQException ignored) {
+            throw e;
          }
-         throw e;
       }
    }
 
    @Override
-   public void close() {
-      scheduleDisconnect(0);
+   public synchronized void close() {
+      if (started) {
+         started = false;
+         currentConnectTask.cancel(true);

Review Comment:
   Not sure it should try to bother interrupt a running connect? Wondering if that could make it throw, and then handle it by scheduling another connect hehe. Since this goes on to schedule a disconnect anyway it doesnt seem like it would necessarily be gaining much to try interrupting?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] gemmellr commented on a diff in pull request #4485: ARTEMIS-4286 sometimes federated consumer won't stop

Posted by "gemmellr (via GitHub)" <gi...@apache.org>.
gemmellr commented on code in PR #4485:
URL: https://github.com/apache/activemq-artemis/pull/4485#discussion_r1202678262


##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueConsumerTest.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.federation;
+
+import org.apache.activemq.artemis.core.config.FederationConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumerImpl;
+import org.apache.activemq.artemis.core.server.federation.Federation;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Test;
+
+public class FederatedQueueConsumerTest extends ActiveMQTestBase {
+
+   @Test
+   public void testClose() throws Exception {
+      ActiveMQServer server = createServer(false, createDefaultInVMConfig());
+      server.start();
+      Federation federation = new Federation(server, new FederationConfiguration().setName(RandomUtil.randomString()));
+      federation.start();
+      FederatedQueueConsumerImpl consumer = new FederatedQueueConsumerImpl(federation, server, null, null, null, null);
+      consumer.start();
+      Wait.assertTrue(() -> consumer.getCurrentConnectTask() != null, 2000, 20);
+      consumer.close();
+      Wait.assertTrue(() -> consumer.getCurrentConnectTask().isDone() || consumer.getCurrentConnectTask().isCancelled(), 2000, 50);

Review Comment:
   To cover the potential re-scheduling race, may be worth adding a check that calling consumer.getCurrentConnectTask() again returns the same future. So, its done and you get the same one, or its cancelled and you get the same one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] jbertram commented on a diff in pull request #4485: ARTEMIS-4286 sometimes federated consumer won't stop

Posted by "jbertram (via GitHub)" <gi...@apache.org>.
jbertram commented on code in PR #4485:
URL: https://github.com/apache/activemq-artemis/pull/4485#discussion_r1200811548


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java:
##########
@@ -109,41 +113,46 @@ private void scheduleConnect(int delay) {
       }, delay, TimeUnit.SECONDS);
    }
 
-   private void connect() throws Exception {
-      try {
-         if (clientConsumer == null) {
-            synchronized (this) {
-               this.clientSessionFactory = (ClientSessionFactoryInternal) upstream.getConnection().clientSessionFactory();
-               this.clientSession = clientSessionFactory.createSession(upstream.getUser(), upstream.getPassword(), false, true, true, clientSessionFactory.getServerLocator().isPreAcknowledge(), clientSessionFactory.getServerLocator().getAckBatchSize());
-               this.clientSession.addFailureListener(this);
-               this.clientSession.addMetaData(FEDERATION_NAME, federation.getName().toString());
-               this.clientSession.addMetaData(FEDERATION_UPSTREAM_NAME, upstream.getName().toString());
-               this.clientSession.start();
-               if (clientSessionCallback != null) {
-                  clientSessionCallback.callback(clientSession);
-               }
-               if (clientSession.queueQuery(key.getQueueName()).isExists()) {
-                  this.clientConsumer = clientSession.createConsumer(key.getQueueName(), key.getFilterString(), key.getPriority(), false);
-                  this.clientConsumer.setMessageHandler(this);
-               } else {
-                  throw new ActiveMQNonExistentQueueException("Queue " + key.getQueueName() + " does not exist on remote");
+   private synchronized void connect() throws Exception {
+      if (started) {
+         connectionAttemptTimestamp.set(System.currentTimeMillis());
+         try {
+            if (clientConsumer == null) {
+               synchronized (this) {
+                  this.clientSessionFactory = (ClientSessionFactoryInternal) upstream.getConnection().clientSessionFactory();
+                  this.clientSession = clientSessionFactory.createSession(upstream.getUser(), upstream.getPassword(), false, true, true, clientSessionFactory.getServerLocator().isPreAcknowledge(), clientSessionFactory.getServerLocator().getAckBatchSize());
+                  this.clientSession.addFailureListener(this);
+                  this.clientSession.addMetaData(FEDERATION_NAME, federation.getName().toString());
+                  this.clientSession.addMetaData(FEDERATION_UPSTREAM_NAME, upstream.getName().toString());
+                  this.clientSession.start();
+                  if (clientSessionCallback != null) {
+                     clientSessionCallback.callback(clientSession);
+                  }
+                  if (clientSession.queueQuery(key.getQueueName()).isExists()) {
+                     this.clientConsumer = clientSession.createConsumer(key.getQueueName(), key.getFilterString(), key.getPriority(), false);
+                     this.clientConsumer.setMessageHandler(this);
+                  } else {
+                     throw new ActiveMQNonExistentQueueException("Queue " + key.getQueueName() + " does not exist on remote");
+                  }
                }
             }
-         }
-      } catch (Exception e) {
-         try {
-            if (clientSessionFactory != null) {
-               clientSessionFactory.cleanup();
+         } catch (Exception e) {
+            try {
+               if (clientSessionFactory != null) {
+                  clientSessionFactory.cleanup();
+               }
+               disconnect();
+            } catch (ActiveMQException ignored) {
             }
-            disconnect();
-         } catch (ActiveMQException ignored) {
+            throw e;
          }
-         throw e;
       }
    }
 
    @Override
-   public void close() {
+   public synchronized void close() {
+      started = false;

Review Comment:
   I added checks to both `start` & `close`.



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueConsumerTest.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.federation;
+
+import org.apache.activemq.artemis.core.config.FederationConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumerImpl;
+import org.apache.activemq.artemis.core.server.federation.Federation;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Test;
+
+public class FederatedQueueConsumerTest extends ActiveMQTestBase {
+
+   @Test
+   public void testClose() throws Exception {
+      ActiveMQServer server = createServer(false, createDefaultInVMConfig());
+      server.start();
+      Federation federation = new Federation(server, new FederationConfiguration().setName(RandomUtil.randomString()));
+      federation.start();
+      FederatedQueueConsumerImpl consumer = new FederatedQueueConsumerImpl(federation, server, null, null, null, null);
+      consumer.start();
+      Wait.waitFor(() -> consumer.getConnectionAttemptTimestamp() > 0);

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] jbertram commented on a diff in pull request #4485: ARTEMIS-4286 sometimes federated consumer won't stop

Posted by "jbertram (via GitHub)" <gi...@apache.org>.
jbertram commented on code in PR #4485:
URL: https://github.com/apache/activemq-artemis/pull/4485#discussion_r1202750244


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java:
##########
@@ -129,22 +144,26 @@ private void connect() throws Exception {
                   throw new ActiveMQNonExistentQueueException("Queue " + key.getQueueName() + " does not exist on remote");
                }
             }
-         }
-      } catch (Exception e) {
-         try {
-            if (clientSessionFactory != null) {
-               clientSessionFactory.cleanup();
+         } catch (Exception e) {
+            try {
+               if (clientSessionFactory != null) {
+                  clientSessionFactory.cleanup();
+               }
+               disconnect();

Review Comment:
   There's certainly no _checked_ exceptions. I don't see anything that would throw an unchecked exception either.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] gemmellr commented on a diff in pull request #4485: ARTEMIS-4286 sometimes federated consumer won't stop

Posted by "gemmellr (via GitHub)" <gi...@apache.org>.
gemmellr commented on code in PR #4485:
URL: https://github.com/apache/activemq-artemis/pull/4485#discussion_r1202778227


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java:
##########
@@ -95,24 +103,31 @@ public int decrementCount() {
    }
 
    @Override
-   public void start() {
-      scheduleConnect(0);
+   public synchronized void start() {
+      if (!started) {
+         started = true;
+         scheduleConnect(0);
+      }
    }
 
    private void scheduleConnect(int delay) {
-      scheduledExecutorService.schedule(() -> {
+      currentConnectTask = scheduledExecutorService.schedule(() -> {
          try {
             connect();
          } catch (Exception e) {
-            scheduleConnect(FederatedQueueConsumer.getNextDelay(delay, intialConnectDelayMultiplier, intialConnectDelayMax));
+            int nextDelay = FederatedQueueConsumer.getNextDelay(delay, intialConnectDelayMultiplier, intialConnectDelayMax);
+            if (logger.isTraceEnabled()) {
+               logger.trace(this + " failed to connect. Scheduling reconnect in " + nextDelay + " seconds.", e);

Review Comment:
   Its worked that way for as long as I've used SLF4J really. https://www.slf4j.org/faq.html#paramException
   
   I'd agree the javadoc isnt clear on this, which I'd never noticed as...I knew it worked that way and so have never really needed to look :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] jbertram commented on a diff in pull request #4485: ARTEMIS-4286 sometimes federated consumer won't stop

Posted by "jbertram (via GitHub)" <gi...@apache.org>.
jbertram commented on code in PR #4485:
URL: https://github.com/apache/activemq-artemis/pull/4485#discussion_r1202789691


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java:
##########
@@ -49,6 +55,8 @@ public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, Sessi
    private final int intialConnectDelayMultiplier = 2;
    private final int intialConnectDelayMax = 30;
    private final ClientSessionCallback clientSessionCallback;
+   private boolean started = false;
+   private ScheduledFuture currentConnectTask;

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] jbertram commented on a diff in pull request #4485: ARTEMIS-4286 sometimes federated consumer won't stop

Posted by "jbertram (via GitHub)" <gi...@apache.org>.
jbertram commented on code in PR #4485:
URL: https://github.com/apache/activemq-artemis/pull/4485#discussion_r1202765509


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java:
##########
@@ -129,22 +144,26 @@ private void connect() throws Exception {
                   throw new ActiveMQNonExistentQueueException("Queue " + key.getQueueName() + " does not exist on remote");
                }
             }
-         }
-      } catch (Exception e) {
-         try {
-            if (clientSessionFactory != null) {
-               clientSessionFactory.cleanup();
+         } catch (Exception e) {
+            try {
+               if (clientSessionFactory != null) {
+                  clientSessionFactory.cleanup();
+               }
+               disconnect();
+            } catch (ActiveMQException ignored) {
             }
-            disconnect();
-         } catch (ActiveMQException ignored) {
+            throw e;
          }
-         throw e;
       }
    }
 
    @Override
-   public void close() {
-      scheduleDisconnect(0);
+   public synchronized void close() {
+      if (started) {
+         started = false;
+         currentConnectTask.cancel(true);

Review Comment:
   Fair enough.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] gemmellr commented on a diff in pull request #4485: ARTEMIS-4286 sometimes federated consumer won't stop

Posted by "gemmellr (via GitHub)" <gi...@apache.org>.
gemmellr commented on code in PR #4485:
URL: https://github.com/apache/activemq-artemis/pull/4485#discussion_r1202398532


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java:
##########
@@ -95,24 +103,31 @@ public int decrementCount() {
    }
 
    @Override
-   public void start() {
-      scheduleConnect(0);
+   public synchronized void start() {
+      if (!started) {
+         started = true;
+         scheduleConnect(0);
+      }
    }
 
    private void scheduleConnect(int delay) {
-      scheduledExecutorService.schedule(() -> {
+      currentConnectTask = scheduledExecutorService.schedule(() -> {
          try {
             connect();
          } catch (Exception e) {
-            scheduleConnect(FederatedQueueConsumer.getNextDelay(delay, intialConnectDelayMultiplier, intialConnectDelayMax));
+            int nextDelay = FederatedQueueConsumer.getNextDelay(delay, intialConnectDelayMultiplier, intialConnectDelayMax);
+            if (logger.isTraceEnabled()) {
+               logger.trace(this + " failed to connect. Scheduling reconnect in " + nextDelay + " seconds.", e);

Review Comment:
   Should use placeholders to be consistent with the rest of the codebase.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] jbertram commented on a diff in pull request #4485: ARTEMIS-4286 sometimes federated consumer won't stop

Posted by "jbertram (via GitHub)" <gi...@apache.org>.
jbertram commented on code in PR #4485:
URL: https://github.com/apache/activemq-artemis/pull/4485#discussion_r1202784015


##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueConsumerTest.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.federation;
+
+import org.apache.activemq.artemis.core.config.FederationConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumerImpl;
+import org.apache.activemq.artemis.core.server.federation.Federation;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Test;
+
+public class FederatedQueueConsumerTest extends ActiveMQTestBase {
+
+   @Test
+   public void testClose() throws Exception {
+      ActiveMQServer server = createServer(false, createDefaultInVMConfig());
+      server.start();
+      Federation federation = new Federation(server, new FederationConfiguration().setName(RandomUtil.randomString()));
+      federation.start();
+      FederatedQueueConsumerImpl consumer = new FederatedQueueConsumerImpl(federation, server, null, null, null, null);
+      consumer.start();
+      Wait.assertTrue(() -> consumer.getCurrentConnectTask() != null, 2000, 20);
+      consumer.close();
+      Wait.assertTrue(() -> consumer.getCurrentConnectTask().isDone() || consumer.getCurrentConnectTask().isCancelled(), 2000, 50);

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] jbertram commented on a diff in pull request #4485: ARTEMIS-4286 sometimes federated consumer won't stop

Posted by "jbertram (via GitHub)" <gi...@apache.org>.
jbertram commented on code in PR #4485:
URL: https://github.com/apache/activemq-artemis/pull/4485#discussion_r1202787893


##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueConsumerTest.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.federation;
+
+import org.apache.activemq.artemis.core.config.FederationConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumerImpl;
+import org.apache.activemq.artemis.core.server.federation.Federation;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Test;
+
+public class FederatedQueueConsumerTest extends ActiveMQTestBase {
+
+   @Test
+   public void testClose() throws Exception {
+      ActiveMQServer server = createServer(false, createDefaultInVMConfig());
+      server.start();
+      Federation federation = new Federation(server, new FederationConfiguration().setName(RandomUtil.randomString()));
+      federation.start();
+      FederatedQueueConsumerImpl consumer = new FederatedQueueConsumerImpl(federation, server, null, null, null, null);
+      consumer.start();
+      Wait.assertTrue(() -> consumer.getCurrentConnectTask() != null, 2000, 20);

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] gemmellr commented on a diff in pull request #4485: ARTEMIS-4286 sometimes federated consumer won't stop

Posted by "gemmellr (via GitHub)" <gi...@apache.org>.
gemmellr commented on code in PR #4485:
URL: https://github.com/apache/activemq-artemis/pull/4485#discussion_r1200731272


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java:
##########
@@ -129,21 +134,23 @@ private void connect() throws Exception {
                   throw new ActiveMQNonExistentQueueException("Queue " + key.getQueueName() + " does not exist on remote");
                }
             }
-         }
-      } catch (Exception e) {
-         try {
-            if (clientSessionFactory != null) {
-               clientSessionFactory.cleanup();
+         } catch (Exception e) {
+            try {
+               if (clientSessionFactory != null) {
+                  clientSessionFactory.cleanup();
+               }
+               disconnect();
+            } catch (ActiveMQException ignored) {
             }
-            disconnect();
-         } catch (ActiveMQException ignored) {
+            throw e;

Review Comment:
   Not necessarily the spot to do it (perhaps in the task where it retries), but the easiest to comment in this PR. It seems like it might be good to have at least a trace log to indicate it has failed and is going to try connecting again.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] jbertram commented on a diff in pull request #4485: ARTEMIS-4286 sometimes federated consumer won't stop

Posted by "jbertram (via GitHub)" <gi...@apache.org>.
jbertram commented on code in PR #4485:
URL: https://github.com/apache/activemq-artemis/pull/4485#discussion_r1200563672


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java:
##########
@@ -144,6 +152,8 @@ private void connect() throws Exception {
 
    @Override
    public void close() {
+      started.set(false);

Review Comment:
   Good point! I refactored things a bit to deal with this.



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueConsumerTest.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.federation;
+
+import org.apache.activemq.artemis.core.config.FederationConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumerImpl;
+import org.apache.activemq.artemis.core.server.federation.Federation;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Test;
+
+public class FederatedQueueConsumerTest extends ActiveMQTestBase {
+
+   @Test
+   public void testClose() throws Exception {
+      ActiveMQServer server = createServer(false, createDefaultInVMConfig());
+      server.start();
+      Federation federation = new Federation(server, new FederationConfiguration().setName(RandomUtil.randomString()));
+      federation.start();
+      FederatedQueueConsumerImpl consumer = new FederatedQueueConsumerImpl(federation, server, null, null, null, null);
+      consumer.start();
+      Wait.waitFor(() -> consumer.getConnectionAttemptTimestamp() > 0);
+      consumer.close();

Review Comment:
   Yes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] jbertram commented on a diff in pull request #4485: ARTEMIS-4286 sometimes federated consumer won't stop

Posted by "jbertram (via GitHub)" <gi...@apache.org>.
jbertram commented on code in PR #4485:
URL: https://github.com/apache/activemq-artemis/pull/4485#discussion_r1200813211


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java:
##########
@@ -129,21 +134,23 @@ private void connect() throws Exception {
                   throw new ActiveMQNonExistentQueueException("Queue " + key.getQueueName() + " does not exist on remote");
                }
             }
-         }
-      } catch (Exception e) {
-         try {
-            if (clientSessionFactory != null) {
-               clientSessionFactory.cleanup();
+         } catch (Exception e) {
+            try {
+               if (clientSessionFactory != null) {
+                  clientSessionFactory.cleanup();
+               }
+               disconnect();
+            } catch (ActiveMQException ignored) {
             }
-            disconnect();
-         } catch (ActiveMQException ignored) {
+            throw e;

Review Comment:
   Logging added in `scheduleConnect`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] gemmellr commented on a diff in pull request #4485: ARTEMIS-4286 sometimes federated consumer won't stop

Posted by "gemmellr (via GitHub)" <gi...@apache.org>.
gemmellr commented on code in PR #4485:
URL: https://github.com/apache/activemq-artemis/pull/4485#discussion_r1201934685


##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueConsumerTest.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.federation;
+
+import org.apache.activemq.artemis.core.config.FederationConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumerImpl;
+import org.apache.activemq.artemis.core.server.federation.Federation;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Test;
+
+public class FederatedQueueConsumerTest extends ActiveMQTestBase {
+
+   @Test
+   public void testClose() throws Exception {
+      ActiveMQServer server = createServer(false, createDefaultInVMConfig());
+      server.start();
+      Federation federation = new Federation(server, new FederationConfiguration().setName(RandomUtil.randomString()));
+      federation.start();
+      FederatedQueueConsumerImpl consumer = new FederatedQueueConsumerImpl(federation, server, null, null, null, null);
+      consumer.start();
+      Wait.waitFor(() -> consumer.getConnectionAttemptTimestamp() > 0);
+      consumer.close();
+      long closed = System.currentTimeMillis();
+      assertFalse(Wait.waitFor(() -> consumer.getConnectionAttemptTimestamp() > closed, 5000, 100));

Review Comment:
   My basic suggestion yesterday was just inc/dec a counter, e.g instead of setting/resetting a timestamp as you did, keep a simple count of outstanding connect tasks. If there are none outstanding there will be no connect without calling start. Since the initial scheduling is all synchronized now, and the connect task reschedules itself on failure, its either going to be 0 (nothing scheduled, no connect will happen), or 1 (connect scheduled, or task still actually in progress), or 2 (a task in progress and failing, plus 1 retry it has just scheduled). You could then just wait for it to be 0 again.
   
   A better alternative would probably be keeping the currently-thrown-away ScheduledFuture for the latest scheduled connect task. You could then tell whether there is another attempt scheduled. You could also use it in close() to cancel any future connect attempt that hasnt been started yet, rather than just relying on it to no-op potentially much later. This could also make the test even faster than the ~1sec typical of the first retry period, since then it wouldnt need to wait for the retry to happen, it could then tell it wont happen.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] jbertram commented on a diff in pull request #4485: ARTEMIS-4286 sometimes federated consumer won't stop

Posted by "jbertram (via GitHub)" <gi...@apache.org>.
jbertram commented on code in PR #4485:
URL: https://github.com/apache/activemq-artemis/pull/4485#discussion_r1202364556


##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueConsumerTest.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.federation;
+
+import org.apache.activemq.artemis.core.config.FederationConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumerImpl;
+import org.apache.activemq.artemis.core.server.federation.Federation;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Test;
+
+public class FederatedQueueConsumerTest extends ActiveMQTestBase {
+
+   @Test
+   public void testClose() throws Exception {
+      ActiveMQServer server = createServer(false, createDefaultInVMConfig());
+      server.start();
+      Federation federation = new Federation(server, new FederationConfiguration().setName(RandomUtil.randomString()));
+      federation.start();
+      FederatedQueueConsumerImpl consumer = new FederatedQueueConsumerImpl(federation, server, null, null, null, null);
+      consumer.start();
+      Wait.waitFor(() -> consumer.getConnectionAttemptTimestamp() > 0);
+      consumer.close();
+      long closed = System.currentTimeMillis();
+      assertFalse(Wait.waitFor(() -> consumer.getConnectionAttemptTimestamp() > closed, 5000, 100));

Review Comment:
   I just did a `push -f`. Let me know what you think.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] gemmellr commented on a diff in pull request #4485: ARTEMIS-4286 sometimes federated consumer won't stop

Posted by "gemmellr (via GitHub)" <gi...@apache.org>.
gemmellr commented on code in PR #4485:
URL: https://github.com/apache/activemq-artemis/pull/4485#discussion_r1202782118


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java:
##########
@@ -95,24 +103,31 @@ public int decrementCount() {
    }
 
    @Override
-   public void start() {
-      scheduleConnect(0);
+   public synchronized void start() {
+      if (!started) {
+         started = true;
+         scheduleConnect(0);
+      }
    }
 
    private void scheduleConnect(int delay) {
-      scheduledExecutorService.schedule(() -> {
+      currentConnectTask = scheduledExecutorService.schedule(() -> {
          try {
             connect();
          } catch (Exception e) {
-            scheduleConnect(FederatedQueueConsumer.getNextDelay(delay, intialConnectDelayMultiplier, intialConnectDelayMax));
+            int nextDelay = FederatedQueueConsumer.getNextDelay(delay, intialConnectDelayMultiplier, intialConnectDelayMax);
+            if (logger.isTraceEnabled()) {
+               logger.trace(this + " failed to connect. Scheduling reconnect in " + nextDelay + " seconds.", e);

Review Comment:
   That is linked from the main Logger doc also: https://www.slf4j.org/apidocs/org/slf4j/Logger.html



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] gemmellr commented on a diff in pull request #4485: ARTEMIS-4286 sometimes federated consumer won't stop

Posted by "gemmellr (via GitHub)" <gi...@apache.org>.
gemmellr commented on code in PR #4485:
URL: https://github.com/apache/activemq-artemis/pull/4485#discussion_r1203935504


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java:
##########
@@ -49,6 +55,8 @@ public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, Sessi
    private final int intialConnectDelayMultiplier = 2;
    private final int intialConnectDelayMax = 30;
    private final ClientSessionCallback clientSessionCallback;
+   private volatile boolean started = false;

Review Comment:
   The use of _started_ all looks synchronized, if so it doesnt need to be volatile (the earlier comment was only about _currentConnectTask_ which isnt all synced)



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueConsumerTest.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.federation;
+
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.activemq.artemis.core.config.FederationConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumerImpl;
+import org.apache.activemq.artemis.core.server.federation.Federation;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Test;
+
+public class FederatedQueueConsumerTest extends ActiveMQTestBase {
+
+   @Test
+   public void testClose() throws Exception {
+      ActiveMQServer server = createServer(false, createDefaultInVMConfig());
+      server.start();
+      Federation federation = new Federation(server, new FederationConfiguration().setName(RandomUtil.randomString()));
+      federation.start();
+      FederatedQueueConsumerImpl consumer = new FederatedQueueConsumerImpl(federation, server, null, null, null, null);
+      assertNull(consumer.getCurrentConnectTask());
+      consumer.start();
+      assertNotNull(consumer.getCurrentConnectTask());
+      consumer.close();
+      AtomicReference<ScheduledFuture> task = new AtomicReference<>();
+      Wait.assertTrue(() -> {
+         task.set(consumer.getCurrentConnectTask());
+         return task.get().isDone() || task.get().isCancelled();
+      }, 2000, 50);
+      assertTrue(task.get() == consumer.getCurrentConnectTask());

Review Comment:
   The check needs to be inside the Wait condition, to check the finished task is still the current one, i.e there will be no more rather than that complete task having scheduled another after the future was retrieved. (It could still assert on it after the wait too though).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org