You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/08/26 03:56:44 UTC

[activemq-artemis] branch master updated: ARTEMIS-2392 Enable remove on cancel policy for scheduled pool

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new ee31a92  ARTEMIS-2392 Enable remove on cancel policy for scheduled pool
     new d8ed425  This closes #2726
ee31a92 is described below

commit ee31a92d23b3d8bf07fafcf76a54a9b55fc02548
Author: brusdev <br...@gmail.com>
AuthorDate: Fri Jul 26 16:22:10 2019 +0200

    ARTEMIS-2392 Enable remove on cancel policy for scheduled pool
    
    By default, a task that is cancelled before it runs is not automatically
    removed from the scheduled pool's work queue until its delay elapses.
    This may cause unbounded retention of cancelled tasks. To avoid this, set
    the remove-on-cancel policy to true.
---
 .../artemis/core/server/ActiveMQServerLogger.java  |  4 ++
 .../core/server/impl/ActiveMQServerImpl.java       | 10 +++-
 .../core/server/impl/ActiveMQServerImplTest.java   | 63 ++++++++++++++++++++++
 3 files changed, 76 insertions(+), 1 deletion(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index e37e2bf..8954839 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -2021,4 +2021,8 @@ public interface ActiveMQServerLogger extends BasicLogger {
    @Message(id = 224100, value = "Timed out waiting for large messages deletion with IDs {0}, might not be deleted if broker crashes atm",
       format = Message.Format.MESSAGE_FORMAT)
    void timedOutWaitingForLargeMessagesDeletion(List<Long> largeMessageIds);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 224101, value = "Apache ActiveMQ Artemis is using a scheduled pool without remove on cancel policy, so a cancelled task could be not automatically removed from the work queue, it may also cause unbounded retention of cancelled tasks.", format = Message.Format.MESSAGE_FORMAT)
+   void scheduledPoolWithNoRemoveOnCancelPolicy();
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index b5c7054..9b5c3cb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -2707,10 +2707,18 @@ public class ActiveMQServerImpl implements ActiveMQServer {
                return new ActiveMQThreadFactory("ActiveMQ-scheduled-threads", false, ClientSessionFactoryImpl.class.getClassLoader());
             }
          });
-         scheduledPool = new ScheduledThreadPoolExecutor(configuration.getScheduledThreadPoolMaxSize(), tFactory);
+
+         ScheduledThreadPoolExecutor scheduledPoolExecutor = new ScheduledThreadPoolExecutor(configuration.getScheduledThreadPoolMaxSize(), tFactory);
+         scheduledPoolExecutor.setRemoveOnCancelPolicy(true);
+         scheduledPool = scheduledPoolExecutor;
       } else {
          this.scheduledPoolSupplied = true;
          this.scheduledPool = serviceRegistry.getScheduledExecutorService();
+
+         if (!(scheduledPool instanceof ScheduledThreadPoolExecutor) ||
+            !((ScheduledThreadPoolExecutor)scheduledPool).getRemoveOnCancelPolicy()) {
+            ActiveMQServerLogger.LOGGER.scheduledPoolWithNoRemoveOnCancelPolicy();
+         }
       }
    }
 
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImplTest.java
new file mode 100644
index 0000000..e443478
--- /dev/null
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImplTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import java.lang.ref.WeakReference;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ActiveMQServerImplTest extends ActiveMQTestBase {
+
+   @Test
+   public void testScheduledPoolGC() throws Exception {
+      ActiveMQServer server = createServer(false);
+
+      server.start();
+
+      Runnable scheduledRunnable = new Runnable() {
+         @Override
+         public void run() {
+            Assert.fail();
+         }
+      };
+      WeakReference<Runnable> scheduledRunnableRef = new WeakReference<>(scheduledRunnable);
+
+      ScheduledExecutorService scheduledPool = server.getScheduledPool();
+      ScheduledFuture scheduledFuture = scheduledPool.schedule(scheduledRunnable, 5000, TimeUnit.MILLISECONDS);
+
+      Assert.assertFalse(scheduledFuture.isCancelled());
+      Assert.assertTrue(scheduledFuture.cancel(true));
+      Assert.assertTrue(scheduledFuture.isCancelled());
+
+      Assert.assertNotEquals(null, scheduledRunnableRef.get());
+
+      scheduledRunnable = null;
+
+      forceGC();
+
+      Assert.assertEquals(null, scheduledRunnableRef.get());
+
+      server.stop();
+   }
+
+}