Posted to gitbox@activemq.apache.org by "clebertsuconic (via GitHub)" <gi...@apache.org> on 2024/01/16 22:18:47 UTC

[PR] ARTEMIS-4569 Blocked Producers will hold runnables until messages are… [activemq-artemis]

clebertsuconic opened a new pull request, #4743:
URL: https://github.com/apache/activemq-artemis/pull/4743

   … consumed.
   
   When this was initially developed, the expectation was that no more producers would keep connecting. In a scenario like this, however, the consumers could actually give up, and the runnables will just accumulate on the server.
   
   We should clean these up upon disconnect.
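
A minimal sketch of the cleanup idea described above, not the code from this PR. `PendingCallback` and `ConnectionCallbacks` are hypothetical names introduced only for illustration. The sketch assumes each connection tracks the callbacks it parked while the address was blocked and cancels them when the connection goes away, so they cannot accumulate on the server.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a callback parked while an address is blocked.
final class PendingCallback implements Runnable {
   private final AtomicBoolean done = new AtomicBoolean(false);
   private final Runnable onRun;
   private final Runnable onCancel;

   PendingCallback(Runnable onRun, Runnable onCancel) {
      this.onRun = onRun;
      this.onCancel = onCancel;
   }

   @Override
   public void run() {
      // compareAndSet ensures only one of run() or cancel() ever takes effect
      if (done.compareAndSet(false, true)) {
         onRun.run();
      }
   }

   void cancel() {
      if (done.compareAndSet(false, true)) {
         onCancel.run();
      }
   }
}

// Hypothetical per-connection registry of parked callbacks.
final class ConnectionCallbacks {
   private final Set<PendingCallback> pending = new HashSet<>();

   synchronized void register(PendingCallback callback) {
      pending.add(callback);
   }

   synchronized void completed(PendingCallback callback) {
      pending.remove(callback);
   }

   synchronized int size() {
      return pending.size();
   }

   // Called when the connection is closed: cancel and release everything parked.
   synchronized void onDisconnect() {
      pending.forEach(PendingCallback::cancel);
      pending.clear();
   }
}
```

With this shape, a callback that is cancelled on disconnect becomes a no-op if the address is unblocked later, and the registry no longer holds a reference to it.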


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] ARTEMIS-4569 Blocked Producers will hold runnables until messages are… [activemq-artemis]

Posted by "tabish121 (via GitHub)" <gi...@apache.org>.
tabish121 commented on code in PR #4743:
URL: https://github.com/apache/activemq-artemis/pull/4743#discussion_r1456114078


##########
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/RunnableList.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils.runnables;
+
+import java.util.HashSet;
+import java.util.function.Consumer;
+
+public class RunnableList {
+
+   private final HashSet<AtomicRunnable> list = new HashSet<>();
+
+   public RunnableList() {
+   }
+
+   public synchronized void add(AtomicRunnable runnable) {
+      runnable.setAcceptedList(this);
+      list.add(runnable);
+   }
+
+   public int size() {
+      return list.size();
+   }
+
+   public synchronized void remove(AtomicRunnable runnable) {
+      list.remove(runnable);
+   }
+
+   public void afterRun(AtomicRunnable runnable) {

Review Comment:
   What is the purpose of this method, as opposed to just calling the remove method directly, which is all this method does? It would seem more descriptive for the callers to invoke remove. In addition, remove is directly declared as synchronized, which seems to be the point of the atomic types being implemented.
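
A minimal sketch of what the suggestion above could look like, assuming a simplified runnable that removes itself from its list by calling the synchronized `remove` directly once it has executed. `SketchRunnable` and `SketchRunnableList` are hypothetical, cut-down stand-ins for illustration and are not the Artemis `AtomicRunnable` or `RunnableList` classes. Unlike the diff above, `size()` is also synchronized here, which is an assumption of this sketch.

```java
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified list of runnables guarded by its own monitor.
final class SketchRunnableList {
   private final HashSet<SketchRunnable> list = new HashSet<>();

   synchronized void add(SketchRunnable runnable) {
      runnable.setAcceptedList(this);
      list.add(runnable);
   }

   synchronized void remove(SketchRunnable runnable) {
      list.remove(runnable);
   }

   synchronized int size() {
      return list.size();
   }
}

// Hypothetical, simplified runnable that executes its body at most once.
abstract class SketchRunnable implements Runnable {
   private final AtomicBoolean ran = new AtomicBoolean(false);
   private volatile SketchRunnableList acceptedList;

   void setAcceptedList(SketchRunnableList acceptedList) {
      this.acceptedList = acceptedList;
   }

   @Override
   public final void run() {
      if (ran.compareAndSet(false, true)) {
         try {
            atomicRun();
         } finally {
            SketchRunnableList owner = acceptedList;
            if (owner != null) {
               // direct call to the synchronized remove, no afterRun indirection
               owner.remove(this);
            }
         }
      }
   }

   protected abstract void atomicRun();
}
```

Calling `remove` from the runnable keeps the locking visible at the call site: the only code touching the set is synchronized on the list itself.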





Re: [PR] ARTEMIS-4569 Blocked Producers will hold runnables until messages are… [activemq-artemis]

Posted by "clebertsuconic (via GitHub)" <gi...@apache.org>.
clebertsuconic commented on code in PR #4743:
URL: https://github.com/apache/activemq-artemis/pull/4743#discussion_r1456409451


##########
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/RunnableList.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils.runnables;
+
+import java.util.HashSet;
+import java.util.function.Consumer;
+
+public class RunnableList {
+
+   private final HashSet<AtomicRunnable> list = new HashSet<>();
+
+   public RunnableList() {
+   }
+
+   public synchronized void add(AtomicRunnable runnable) {
+      runnable.setAcceptedList(this);
+      list.add(runnable);
+   }
+
+   public int size() {
+      return list.size();
+   }
+
+   public synchronized void remove(AtomicRunnable runnable) {
+      list.remove(runnable);
+   }
+
+   public synchronized void cancel() {
+      list.forEach(this::cancel);
+      list.clear();
+   }
+
+   private void cancel(AtomicRunnable atomicRunnable) {
+      atomicRunnable.cancel();
+   }
+
+   public void forEach(Consumer<AtomicRunnable> runnable) {

Review Comment:
   A review note to myself: runnable is the wrong name for this parameter. Perhaps consumerRunnable.





Re: [PR] ARTEMIS-4569 Blocked Producers will hold runnables until messages are… [activemq-artemis]

Posted by "clebertsuconic (via GitHub)" <gi...@apache.org>.
clebertsuconic commented on code in PR #4743:
URL: https://github.com/apache/activemq-artemis/pull/4743#discussion_r1456415601


##########
tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/ProducerBlockedLeakTest.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.leak;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import io.github.checkleak.core.CheckLeak;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.SpawnedVMSupport;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ProducerBlockedLeakTest extends ActiveMQTestBase {
+
+   private static final int OK = 100;
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+   private static final String QUEUE_NAME = "TEST_BLOCKED_QUEUE";
+
+   ActiveMQServer server;
+
+   @BeforeClass
+   public static void beforeClass() throws Exception {
+      Assume.assumeTrue(CheckLeak.isLoaded());
+   }
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      server = createServer(true, createDefaultConfig(1, true));
+      server.getConfiguration().getAddressSettings().clear();
+      server.getConfiguration().getAddressSettings().put("#", new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK).setMaxSizeMessages(10));
+      server.start();
+   }
+
+   @Test
+   public void testOPENWIRE() throws Exception {
+      testBlocked("OPENWIRE");
+   }
+
+   @Test
+   public void testCORE() throws Exception {
+      testBlocked("CORE");
+   }
+
+   @Test
+   public void testAMQP() throws Exception {
+      testBlocked("AMQP");
+   }
+
+   private void testBlocked(String protocol) throws Exception {
+      testBody(protocol);
+      MemoryAssertions.basicMemoryAsserts(false);
+      Queue queue = server.locateQueue(QUEUE_NAME);
+      queue.deleteAllReferences();
+      MemoryAssertions.basicMemoryAsserts(true);
+      server.stop();
+   }
+
+   // separating the test into a sub-method just to allow removing local references
+   // so they would be gone when basicMemoryAsserts is called
+   private void testBody(String protocol) throws Exception {
+      try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
+         AtomicInteger messagesSent = new AtomicInteger(0);
+
+         server.addAddressInfo(new AddressInfo(QUEUE_NAME).addRoutingType(RoutingType.ANYCAST));
+         server.createQueue(new QueueConfiguration(QUEUE_NAME).setAddress(QUEUE_NAME).setRoutingType(RoutingType.ANYCAST).setDurable(true));
+
+         // clients need to be disconnected while blocked. For that reason a new VM is being spawned
+         Process process = SpawnedVMSupport.spawnVM(ProducerBlockedLeakTest.class.getName(), protocol, "10");
+
+         Wait.assertTrue(() -> loggerHandler.findText("AMQ222183"), 5000, 10); //unblock
+
+         process.destroyForcibly();
+         Assert.assertTrue(process.waitFor(10, TimeUnit.SECONDS));

Review Comment:
   The //unblock comment here is wrong. It's actually blocked.



##########
tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/ProducerBlockedLeakTest.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.leak;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import io.github.checkleak.core.CheckLeak;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.SpawnedVMSupport;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ProducerBlockedLeakTest extends ActiveMQTestBase {
+
+   private static final int OK = 100;
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+   private static final String QUEUE_NAME = "TEST_BLOCKED_QUEUE";
+
+   ActiveMQServer server;
+
+   @BeforeClass
+   public static void beforeClass() throws Exception {
+      Assume.assumeTrue(CheckLeak.isLoaded());
+   }
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      server = createServer(true, createDefaultConfig(1, true));
+      server.getConfiguration().getAddressSettings().clear();
+      server.getConfiguration().getAddressSettings().put("#", new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK).setMaxSizeMessages(10));
+      server.start();
+   }
+
+   @Test
+   public void testOPENWIRE() throws Exception {
+      testBlocked("OPENWIRE");
+   }
+
+   @Test
+   public void testCORE() throws Exception {
+      testBlocked("CORE");
+   }
+
+   @Test
+   public void testAMQP() throws Exception {
+      testBlocked("AMQP");
+   }
+
+   private void testBlocked(String protocol) throws Exception {
+      testBody(protocol);
+      MemoryAssertions.basicMemoryAsserts(false);
+      Queue queue = server.locateQueue(QUEUE_NAME);
+      queue.deleteAllReferences();
+      MemoryAssertions.basicMemoryAsserts(true);
+      server.stop();
+   }
+
+   // separating the test into a sub-method just to allow removing local references
+   // so they would be gone when basicMemoryAsserts is called
+   private void testBody(String protocol) throws Exception {
+      try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
+         AtomicInteger messagesSent = new AtomicInteger(0);
+
+         server.addAddressInfo(new AddressInfo(QUEUE_NAME).addRoutingType(RoutingType.ANYCAST));
+         server.createQueue(new QueueConfiguration(QUEUE_NAME).setAddress(QUEUE_NAME).setRoutingType(RoutingType.ANYCAST).setDurable(true));
+
+         // clients need to be disconnected while blocked. For that reason a new VM is being spawned
+         Process process = SpawnedVMSupport.spawnVM(ProducerBlockedLeakTest.class.getName(), protocol, "10");
+
+         Wait.assertTrue(() -> loggerHandler.findText("AMQ222183"), 5000, 10); //unblock

Review Comment:
   The //unblock comment here is wrong. It's blocked.





Re: [PR] ARTEMIS-4569 Blocked Producers will hold runnables until messages are… [activemq-artemis]

Posted by "clebertsuconic (via GitHub)" <gi...@apache.org>.
clebertsuconic commented on code in PR #4743:
URL: https://github.com/apache/activemq-artemis/pull/4743#discussion_r1456406319


##########
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/RunnableList.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils.runnables;
+
+import java.util.HashSet;
+import java.util.function.Consumer;
+
+public class RunnableList {
+
+   private final HashSet<AtomicRunnable> list = new HashSet<>();
+
+   public RunnableList() {
+   }
+
+   public synchronized void add(AtomicRunnable runnable) {
+      runnable.setAcceptedList(this);
+      list.add(runnable);
+   }
+
+   public int size() {
+      return list.size();
+   }
+
+   public synchronized void remove(AtomicRunnable runnable) {
+      list.remove(runnable);
+   }
+
+   public void afterRun(AtomicRunnable runnable) {

Review Comment:
   @tabish121 I just thought it would be more descriptive to have a dedicated operation to be called after the execution, leaving room for future extension points, etc.
   
   
   I will change it to call remove directly, though. I will mark this as resolved when I push the commit.
   
   
   Thanks a lot





Re: [PR] ARTEMIS-4569 Blocked Producers will hold runnables until messages are… [activemq-artemis]

Posted by "clebertsuconic (via GitHub)" <gi...@apache.org>.
clebertsuconic merged PR #4743:
URL: https://github.com/apache/activemq-artemis/pull/4743



