You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/11/09 16:58:55 UTC

[3/7] activemq-artemis git commit: ARTEMIS-1495 Removing flushes from codebase

ARTEMIS-1495 Removing flushes from codebase

Instead of flushing we just need to make sure there are no more calls into
page executors as we stop the PageManager.

This will avoid any possible starvations or deadlocks here.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2e6176a6
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2e6176a6
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2e6176a6

Branch: refs/heads/master
Commit: 2e6176a69333367c165ed463437d6302f3f8da9e
Parents: 8bf879f
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Nov 7 14:52:19 2017 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Nov 9 11:58:36 2017 -0500

----------------------------------------------------------------------
 .../artemis/utils/actors/ArtemisExecutor.java   | 23 +++----
 .../artemis/utils/actors/ProcessorBase.java     | 68 +++++++++-----------
 .../cursor/impl/PageCursorProviderImpl.java     |  2 +-
 .../core/paging/impl/PagingStoreImpl.java       |  2 +-
 .../core/ServerSessionPacketHandler.java        | 14 ++--
 .../protocol/core/impl/CoreSessionCallback.java |  2 +-
 .../management/impl/ManagementServiceImpl.java  |  4 +-
 .../jms/consumer/JmsConsumerTest.java           | 11 ++--
 8 files changed, 58 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e6176a6/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
index d3036ec..5e72ef2 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
@@ -25,7 +25,7 @@ public interface ArtemisExecutor extends Executor {
 
    /**
     * Artemis is supposed to implement this properly, however in tests or tools
-    * this can be used as a fake, doing a sipmle delegate and using the default methods implemented here.
+    * this can be used as a fake, doing a simple delegate and using the default methods implemented here.
     * @param executor
     * @return
     */
@@ -38,11 +38,16 @@ public interface ArtemisExecutor extends Executor {
       };
    }
 
-   default boolean flush() {
-      return flush(30, TimeUnit.SECONDS);
+   /** It will wait the current execution (if there is one) to finish
+    *  but will not complete any further executions */
+   default void shutdownNow() {
    }
 
-   default boolean flush(long timeout, TimeUnit unit) {
+   /**
+    * This will verify if the executor is flushed with no wait (or very minimal wait if not the {@link org.apache.activemq.artemis.utils.actors.OrderedExecutor}
+    * @return
+    */
+   default boolean isFlushed() {
       CountDownLatch latch = new CountDownLatch(1);
       Runnable runnable = new Runnable() {
          @Override
@@ -52,18 +57,10 @@ public interface ArtemisExecutor extends Executor {
       };
       execute(runnable);
       try {
-         return latch.await(timeout, unit);
+         return latch.await(100, TimeUnit.MILLISECONDS);
       } catch (InterruptedException e) {
          return false;
       }
    }
 
-   /**
-    * This will verify if the executor is flushed with no wait (or very minimal wait if not the {@link org.apache.activemq.artemis.utils.actors.OrderedExecutor}
-    * @return
-    */
-   default boolean isFlushed() {
-      return flush(100, TimeUnit.MILLISECONDS);
-   }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e6176a6/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
index dbc0776..44b2916 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
@@ -20,7 +20,6 @@ package org.apache.activemq.artemis.utils.actors;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 public abstract class ProcessorBase<T> {
@@ -34,6 +33,9 @@ public abstract class ProcessorBase<T> {
 
    private final ExecutorTask task = new ExecutorTask();
 
+   private final Object startedGuard = new Object();
+   private volatile boolean started = true;
+
    // used by stateUpdater
    @SuppressWarnings("unused")
    private volatile int state = 0;
@@ -49,8 +51,18 @@ public abstract class ProcessorBase<T> {
             if (stateUpdater.compareAndSet(ProcessorBase.this, STATE_NOT_RUNNING, STATE_RUNNING)) {
                T task = tasks.poll();
                //while the queue is not empty we process in order
-               while (task != null) {
-                  doTask(task);
+
+               // All we care on started, is that a current task is not running as we call shutdown.
+               // for that reason this first run doesn't need to be under any lock
+               while (task != null && started) {
+
+                  // Synchronized here is just to guarantee that a current task is finished before
+                  // the started update can be taken as false
+                  synchronized (startedGuard) {
+                     if (started) {
+                        doTask(task);
+                     }
+                  }
                   task = tasks.poll();
                }
                //set state back to not running.
@@ -66,52 +78,32 @@ public abstract class ProcessorBase<T> {
       }
    }
 
+   /** It will wait the current execution (if there is one) to finish
+    *  but will not complete any further executions */
+   public void shutdownNow() {
+      synchronized (startedGuard) {
+         started = false;
+      }
+      tasks.clear();
+   }
+
    protected abstract void doTask(T task);
 
    public ProcessorBase(Executor parent) {
       this.delegate = parent;
    }
 
-   public final boolean flush() {
-      return flush(30, TimeUnit.SECONDS);
-   }
-
-   /**
-    * WARNING: This will only flush when all the activity is suspended.
-    *          don't expect success on this call if another thread keeps feeding the queue
-    *          this is only valid on situations where you are not feeding the queue,
-    *          like in shutdown and failover situations.
-    * */
-   public final boolean flush(long timeout, TimeUnit unit) {
-      if (stateUpdater.get(this) == STATE_NOT_RUNNING) {
-         // quick test, most of the time it will be empty anyways
-         return true;
-      }
-
-      long timeLimit = System.currentTimeMillis() + unit.toMillis(timeout);
-      try {
-         while (stateUpdater.get(this) == STATE_RUNNING && timeLimit > System.currentTimeMillis()) {
-
-            if (tasks.isEmpty()) {
-               return true;
-            }
-
-            Thread.sleep(10);
-         }
-      } catch (InterruptedException e) {
-         // ignored
-      }
-
-      return stateUpdater.get(this) == STATE_NOT_RUNNING;
-   }
-
    public final boolean isFlushed() {
       return stateUpdater.get(this) == STATE_NOT_RUNNING;
    }
 
    protected void task(T command) {
-      tasks.add(command);
-      startPoller();
+      // There is no need to verify the lock here.
+      // you can only turn of running once
+      if (started) {
+         tasks.add(command);
+         startPoller();
+      }
    }
 
    protected void startPoller() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e6176a6/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
index 2030d25..45b2c1d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
@@ -240,7 +240,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
          cursor.stop();
       }
 
-      waitForFuture();
+      executor.shutdownNow();
    }
 
    private void waitForFuture() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e6176a6/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index 6f85aa2..f1beb31 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -352,7 +352,7 @@ public class PagingStoreImpl implements PagingStore {
 
          running = false;
 
-         flushExecutors();
+         executor.shutdownNow();
 
          if (currentPage != null) {
             currentPage.close(false);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e6176a6/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index 75ef071..f78f43f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -220,7 +220,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
    public void connectionFailed(final ActiveMQException exception, boolean failedOver) {
       ActiveMQServerLogger.LOGGER.clientConnectionFailed(session.getName());
 
-      flushExecutor();
+      closeExecutors();
 
       try {
          session.close(true);
@@ -248,15 +248,13 @@ public class ServerSessionPacketHandler implements ChannelHandler {
       inHandler.set(null);
    }
 
-   public void flushExecutor() {
-      if (!inHandler()) {
-         packetActor.flush();
-         callExecutor.flush();
-      }
+   public void closeExecutors() {
+      packetActor.shutdownNow();
+      callExecutor.shutdownNow();
    }
 
    public void close() {
-      flushExecutor();
+      closeExecutors();
 
       channel.flushConfirmations();
 
@@ -895,8 +893,6 @@ public class ServerSessionPacketHandler implements ChannelHandler {
             remotingConnection.removeFailureListener((FailureListener) closeListener);
          }
       }
-
-      flushExecutor();
    }
 
    public int transferConnection(final CoreRemotingConnection newConnection, final int lastReceivedCommandID) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e6176a6/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
index 8168e6e..92b3768 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
@@ -67,7 +67,7 @@ public final class CoreSessionCallback implements SessionCallback {
       ServerSessionPacketHandler localHandler = handler;
       if (localHandler != null) {
          // We wait any pending tasks before we make this as closed
-         localHandler.flushExecutor();
+         localHandler.closeExecutors();
       }
       this.handler = null;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e6176a6/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
index e94e40b..81a8e84 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
@@ -257,7 +257,9 @@ public class ManagementServiceImpl implements ManagementService {
       ObjectName objectName = objectNameBuilder.getQueueObjectName(address, name, routingType);
       unregisterFromJMX(objectName);
       unregisterFromRegistry(ResourceNames.QUEUE + name);
-      messageCounterManager.unregisterMessageCounter(name.toString());
+      if (messageCounterManager != null) {
+         messageCounterManager.unregisterMessageCounter(name.toString());
+      }
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e6176a6/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/consumer/JmsConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/consumer/JmsConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/consumer/JmsConsumerTest.java
index 5cefbd0..93229e1 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/consumer/JmsConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/consumer/JmsConsumerTest.java
@@ -41,6 +41,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
 import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
 import org.apache.activemq.artemis.tests.util.JMSTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.artemis.utils.ReusableLatch;
 import org.junit.Assert;
 import org.junit.Before;
@@ -303,8 +304,9 @@ public class JmsConsumerTest extends JMSTestBase {
       SimpleString queueName = new SimpleString(JmsConsumerTest.Q_NAME);
       conn.close();
 
-      Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
-      Assert.assertEquals(0, getMessageCount((Queue) server.getPostOffice().getBinding(queueName).getBindable()));
+      Queue queue = server.locateQueue(queueName);
+      Wait.assertEquals(0, queue::getDeliveringCount);
+      Wait.assertEquals(0, queue::getMessageCount);
    }
 
    @Test
@@ -329,8 +331,9 @@ public class JmsConsumerTest extends JMSTestBase {
 
       // Messages should all have been acked since we set pre ack on the cf
       SimpleString queueName = new SimpleString(JmsConsumerTest.Q_NAME);
-      Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
-      Assert.assertEquals(0, getMessageCount((Queue) server.getPostOffice().getBinding(queueName).getBindable()));
+      Queue queue = server.locateQueue(queueName);
+      Wait.assertEquals(0, queue::getDeliveringCount);
+      Wait.assertEquals(0, queue::getMessageCount);
    }
 
    @Test