You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/08/02 20:37:20 UTC

[activemq-artemis] branch master updated: ARTEMIS-2440 Connection.fail on sendBlock should be asynchronous

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new cd723aa  ARTEMIS-2440 Connection.fail on sendBlock should be asynchronous
     new dd176ee  This closes #2781
cd723aa is described below

commit cd723aa5282ff50e7e9b3a3a81b5b1b5476bb3a8
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Fri Aug 2 12:51:13 2019 -0400

    ARTEMIS-2440 Connection.fail on sendBlock should be asynchronous
    
    This is following up on ARTEMIS-2327.
---
 .../core/protocol/core/impl/ChannelImpl.java       |  2 +-
 .../core/protocol/AbstractRemotingConnection.java  | 19 +++++++++++++
 .../spi/core/protocol/RemotingConnection.java      |  7 +++++
 .../core/protocol/core/impl/ChannelImplTest.java   |  6 ++++
 .../artemis/core/protocol/mqtt/MQTTConnection.java | 18 ++++++++++++
 .../core/protocol/stomp/StompConnection.java       | 32 ++++++++++++++++++----
 .../impl/ManagementRemotingConnection.java         |  6 ++++
 ...ListenerForConnectionTimedOutExceptionTest.java |  2 ++
 8 files changed, 85 insertions(+), 7 deletions(-)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
index 9f36d81..d69b1e1 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
@@ -492,7 +492,7 @@ public final class ChannelImpl implements Channel {
 
             if (response == null) {
                ActiveMQException e = ActiveMQClientMessageBundle.BUNDLE.timedOutSendingPacket(connection.getBlockingCallTimeout(), packet.getType());
-               connection.fail(e);
+               connection.asyncFail(e);
                throw e;
             }
 
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
index 960cadd..5cbcbbf 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
@@ -20,6 +20,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -219,6 +221,23 @@ public abstract class AbstractRemotingConnection implements RemotingConnection {
    }
 
    @Override
+   public Future asyncFail(final ActiveMQException me) {
+
+      FutureTask<Void> task = new FutureTask(() -> {
+         fail(me);
+         return null;
+      });
+
+      if (executor == null) {
+         // only test cases can do this
+         task.run();
+      } else {
+         executor.execute(task);
+      }
+      return task;
+   }
+
+   @Override
    public void bufferReceived(final Object connectionID, final ActiveMQBuffer buffer) {
       dataReceived = true;
    }
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java
index f7ed73a..41cd050 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.spi.core.protocol;
 
 import java.util.List;
+import java.util.concurrent.Future;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -132,6 +133,12 @@ public interface RemotingConnection extends BufferHandler {
     */
    void fail(ActiveMQException me);
 
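+   /** Same thing as fail, but intended to run through an executor.
+    *  The semantics of this call are asynchronous: it may return before the failure is processed.
+    * @param me the exception describing why the connection is being failed
+    */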
+   Future asyncFail(ActiveMQException me);
+
    /**
     * called when the underlying connection fails.
     *
diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
index e9181f8..4a4ca39 100644
--- a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
+++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
@@ -20,6 +20,7 @@ import javax.security.auth.Subject;
 import java.io.RandomAccessFile;
 import java.nio.channels.FileChannel;
 import java.util.List;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import io.netty.buffer.Unpooled;
@@ -318,6 +319,11 @@ public class ChannelImplTest {
       }
 
       @Override
+      public Future asyncFail(ActiveMQException me) {
+         return null;
+      }
+
+      @Override
       public void fail(ActiveMQException me, String scaleDownTargetNodeID) {
 
       }
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
index 1f31692..9a607f8 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
@@ -20,6 +20,8 @@ package org.apache.activemq.artemis.core.protocol.mqtt;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
@@ -167,6 +169,22 @@ public class MQTTConnection implements RemotingConnection {
    }
 
    @Override
+   public Future asyncFail(ActiveMQException me) {
+      FutureTask<Void> task = new FutureTask(() -> {
+         fail(me);
+         return null;
+      });
+
+
+      // asyncFail is not expected to happen on MQTT; in case it does, running fail on a new thread is still semantically correct
+      Thread t = new Thread(task);
+
+      t.start();
+
+      return task;
+   }
+
+   @Override
    public void destroy() {
       //TODO(mtaylor) ensure this properly destroys this connection.
       destroyed = true;
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
index 8c32281..60c2d56 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
@@ -25,6 +25,8 @@ import java.util.List;
 import java.util.Set;
 import java.util.StringTokenizer;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
@@ -110,7 +112,7 @@ public final class StompConnection implements RemotingConnection {
 
    private final ScheduledExecutorService scheduledExecutorService;
 
-   private final ExecutorFactory factory;
+   private final ExecutorFactory executorFactory;
 
    @Override
    public boolean isSupportReconnect() {
@@ -136,7 +138,7 @@ public final class StompConnection implements RemotingConnection {
             case ActiveMQStompException.INVALID_EOL_V10:
                if (version != null)
                   throw e;
-               frameHandler = new StompFrameHandlerV12(this, scheduledExecutorService, factory);
+               frameHandler = new StompFrameHandlerV12(this, scheduledExecutorService, executorFactory);
                buffer.resetReaderIndex();
                frame = decode(buffer);
                break;
@@ -164,16 +166,16 @@ public final class StompConnection implements RemotingConnection {
                    final Connection transportConnection,
                    final StompProtocolManager manager,
                    final ScheduledExecutorService scheduledExecutorService,
-                   final ExecutorFactory factory) {
+                   final ExecutorFactory executorFactory) {
       this.scheduledExecutorService = scheduledExecutorService;
 
-      this.factory = factory;
+      this.executorFactory = executorFactory;
 
       this.transportConnection = transportConnection;
 
       this.manager = manager;
 
-      this.frameHandler = new StompFrameHandlerV10(this, scheduledExecutorService, factory);
+      this.frameHandler = new StompFrameHandlerV10(this, scheduledExecutorService, executorFactory);
 
       this.creationTime = System.currentTimeMillis();
 
@@ -380,6 +382,24 @@ public final class StompConnection implements RemotingConnection {
    }
 
    @Override
+   public Future asyncFail(ActiveMQException me) {
+
+      FutureTask<Void> task = new FutureTask(() -> {
+         fail(me);
+         return null;
+      });
+
+      if (this.executorFactory == null) {
+         // only test cases can do this
+         task.run();
+      } else {
+         executorFactory.getExecutor().execute(task);
+      }
+
+      return task;
+   }
+
+   @Override
    public void fail(final ActiveMQException me, String scaleDownTargetNodeID) {
       fail(me);
    }
@@ -528,7 +548,7 @@ public final class StompConnection implements RemotingConnection {
       }
 
       if (this.version != (StompVersions.V1_0)) {
-         VersionedStompFrameHandler newHandler = VersionedStompFrameHandler.getHandler(this, this.version, scheduledExecutorService, factory);
+         VersionedStompFrameHandler newHandler = VersionedStompFrameHandler.getHandler(this, this.version, scheduledExecutorService, executorFactory);
          newHandler.initDecoder(this.frameHandler);
          this.frameHandler = newHandler;
       }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java
index 7e760c1..72c52d4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.management.impl;
 
 import javax.security.auth.Subject;
 import java.util.List;
+import java.util.concurrent.Future;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -111,6 +112,11 @@ public class ManagementRemotingConnection implements RemotingConnection {
    }
 
    @Override
+   public Future asyncFail(ActiveMQException me) {
+      return null;
+   }
+
+   @Override
    public void fail(ActiveMQException me, String scaleDownTargetNodeID) {
 
    }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/connection/ExceptionListenerForConnectionTimedOutExceptionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/connection/ExceptionListenerForConnectionTimedOutExceptionTest.java
index ccfc42c..6579e0b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/connection/ExceptionListenerForConnectionTimedOutExceptionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/connection/ExceptionListenerForConnectionTimedOutExceptionTest.java
@@ -58,6 +58,7 @@ public class ExceptionListenerForConnectionTimedOutExceptionTest extends JMSTest
          ((ActiveMQConnectionFactory) cf).setOutgoingInterceptorList(OutBoundPacketCapture.class.getName());
          ((ActiveMQConnectionFactory) cf).setIncomingInterceptorList(SessAcknowledgeCauseResponseTimeout.class.getName());
          ((ActiveMQConnectionFactory) cf).setBlockOnAcknowledge(true);
+         ((ActiveMQConnectionFactory) cf).setCallTimeout(500);
 
          sendConnection = cf.createConnection();
 
@@ -108,6 +109,7 @@ public class ExceptionListenerForConnectionTimedOutExceptionTest extends JMSTest
       try {
          ((ActiveMQConnectionFactory) cf).setOutgoingInterceptorList(OutBoundPacketCapture.class.getName());
          ((ActiveMQConnectionFactory) cf).setIncomingInterceptorList(SessSendCauseResponseTimeout.class.getName());
+         ((ActiveMQConnectionFactory) cf).setCallTimeout(500);
 
          sendConnection = cf.createConnection();
          sendConnection.setExceptionListener(exceptionOnConnection::set);