You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/08/02 20:37:20 UTC
[activemq-artemis] branch master updated: ARTEMIS-2440
Connection.fail on sendBlock should be asynchronous
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new cd723aa ARTEMIS-2440 Connection.fail on sendBlock should be asynchronous
new dd176ee This closes #2781
cd723aa is described below
commit cd723aa5282ff50e7e9b3a3a81b5b1b5476bb3a8
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Fri Aug 2 12:51:13 2019 -0400
ARTEMIS-2440 Connection.fail on sendBlock should be asynchronous
This is following up on ARTEMIS-2327.
---
.../core/protocol/core/impl/ChannelImpl.java | 2 +-
.../core/protocol/AbstractRemotingConnection.java | 19 +++++++++++++
.../spi/core/protocol/RemotingConnection.java | 7 +++++
.../core/protocol/core/impl/ChannelImplTest.java | 6 ++++
.../artemis/core/protocol/mqtt/MQTTConnection.java | 18 ++++++++++++
.../core/protocol/stomp/StompConnection.java | 32 ++++++++++++++++++----
.../impl/ManagementRemotingConnection.java | 6 ++++
...ListenerForConnectionTimedOutExceptionTest.java | 2 ++
8 files changed, 85 insertions(+), 7 deletions(-)
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
index 9f36d81..d69b1e1 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
@@ -492,7 +492,7 @@ public final class ChannelImpl implements Channel {
if (response == null) {
ActiveMQException e = ActiveMQClientMessageBundle.BUNDLE.timedOutSendingPacket(connection.getBlockingCallTimeout(), packet.getType());
- connection.fail(e);
+ connection.asyncFail(e);
throw e;
}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
index 960cadd..5cbcbbf 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
@@ -20,6 +20,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -219,6 +221,23 @@ public abstract class AbstractRemotingConnection implements RemotingConnection {
}
@Override
+ public Future asyncFail(final ActiveMQException me) {
+
+ FutureTask<Void> task = new FutureTask(() -> {
+ fail(me);
+ return null;
+ });
+
+ if (executor == null) {
+ // only test cases can do this
+ task.run();
+ } else {
+ executor.execute(task);
+ }
+ return task;
+ }
+
+ @Override
public void bufferReceived(final Object connectionID, final ActiveMQBuffer buffer) {
dataReceived = true;
}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java
index f7ed73a..41cd050 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java
@@ -17,6 +17,7 @@
package org.apache.activemq.artemis.spi.core.protocol;
import java.util.List;
+import java.util.concurrent.Future;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -132,6 +133,12 @@ public interface RemotingConnection extends BufferHandler {
*/
void fail(ActiveMQException me);
+ /** Same thing as fail, but using an executor.
+ * The semantic of the send here is asynchronous.
+ * @param me
+ */
+ Future asyncFail(ActiveMQException me);
+
/**
* called when the underlying connection fails.
*
diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
index e9181f8..4a4ca39 100644
--- a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
+++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
@@ -20,6 +20,7 @@ import javax.security.auth.Subject;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.util.List;
+import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import io.netty.buffer.Unpooled;
@@ -318,6 +319,11 @@ public class ChannelImplTest {
}
@Override
+ public Future asyncFail(ActiveMQException me) {
+ return null;
+ }
+
+ @Override
public void fail(ActiveMQException me, String scaleDownTargetNodeID) {
}
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
index 1f31692..9a607f8 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
@@ -20,6 +20,8 @@ package org.apache.activemq.artemis.core.protocol.mqtt;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
@@ -167,6 +169,22 @@ public class MQTTConnection implements RemotingConnection {
}
@Override
+ public Future asyncFail(ActiveMQException me) {
+ FutureTask<Void> task = new FutureTask(() -> {
+ fail(me);
+ return null;
+ });
+
+
+ // I don't expect asyncFail to happen on MQTT; in case it does, this is semantically correct
+ Thread t = new Thread(task);
+
+ t.start();
+
+ return task;
+ }
+
+ @Override
public void destroy() {
//TODO(mtaylor) ensure this properly destroys this connection.
destroyed = true;
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
index 8c32281..60c2d56 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
@@ -25,6 +25,8 @@ import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
@@ -110,7 +112,7 @@ public final class StompConnection implements RemotingConnection {
private final ScheduledExecutorService scheduledExecutorService;
- private final ExecutorFactory factory;
+ private final ExecutorFactory executorFactory;
@Override
public boolean isSupportReconnect() {
@@ -136,7 +138,7 @@ public final class StompConnection implements RemotingConnection {
case ActiveMQStompException.INVALID_EOL_V10:
if (version != null)
throw e;
- frameHandler = new StompFrameHandlerV12(this, scheduledExecutorService, factory);
+ frameHandler = new StompFrameHandlerV12(this, scheduledExecutorService, executorFactory);
buffer.resetReaderIndex();
frame = decode(buffer);
break;
@@ -164,16 +166,16 @@ public final class StompConnection implements RemotingConnection {
final Connection transportConnection,
final StompProtocolManager manager,
final ScheduledExecutorService scheduledExecutorService,
- final ExecutorFactory factory) {
+ final ExecutorFactory executorFactory) {
this.scheduledExecutorService = scheduledExecutorService;
- this.factory = factory;
+ this.executorFactory = executorFactory;
this.transportConnection = transportConnection;
this.manager = manager;
- this.frameHandler = new StompFrameHandlerV10(this, scheduledExecutorService, factory);
+ this.frameHandler = new StompFrameHandlerV10(this, scheduledExecutorService, executorFactory);
this.creationTime = System.currentTimeMillis();
@@ -380,6 +382,24 @@ public final class StompConnection implements RemotingConnection {
}
@Override
+ public Future asyncFail(ActiveMQException me) {
+
+ FutureTask<Void> task = new FutureTask(() -> {
+ fail(me);
+ return null;
+ });
+
+ if (this.executorFactory == null) {
+ // only test cases can do this
+ task.run();
+ } else {
+ executorFactory.getExecutor().execute(task);
+ }
+
+ return task;
+ }
+
+ @Override
public void fail(final ActiveMQException me, String scaleDownTargetNodeID) {
fail(me);
}
@@ -528,7 +548,7 @@ public final class StompConnection implements RemotingConnection {
}
if (this.version != (StompVersions.V1_0)) {
- VersionedStompFrameHandler newHandler = VersionedStompFrameHandler.getHandler(this, this.version, scheduledExecutorService, factory);
+ VersionedStompFrameHandler newHandler = VersionedStompFrameHandler.getHandler(this, this.version, scheduledExecutorService, executorFactory);
newHandler.initDecoder(this.frameHandler);
this.frameHandler = newHandler;
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java
index 7e760c1..72c52d4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.management.impl;
import javax.security.auth.Subject;
import java.util.List;
+import java.util.concurrent.Future;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -111,6 +112,11 @@ public class ManagementRemotingConnection implements RemotingConnection {
}
@Override
+ public Future asyncFail(ActiveMQException me) {
+ return null;
+ }
+
+ @Override
public void fail(ActiveMQException me, String scaleDownTargetNodeID) {
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/connection/ExceptionListenerForConnectionTimedOutExceptionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/connection/ExceptionListenerForConnectionTimedOutExceptionTest.java
index ccfc42c..6579e0b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/connection/ExceptionListenerForConnectionTimedOutExceptionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/connection/ExceptionListenerForConnectionTimedOutExceptionTest.java
@@ -58,6 +58,7 @@ public class ExceptionListenerForConnectionTimedOutExceptionTest extends JMSTest
((ActiveMQConnectionFactory) cf).setOutgoingInterceptorList(OutBoundPacketCapture.class.getName());
((ActiveMQConnectionFactory) cf).setIncomingInterceptorList(SessAcknowledgeCauseResponseTimeout.class.getName());
((ActiveMQConnectionFactory) cf).setBlockOnAcknowledge(true);
+ ((ActiveMQConnectionFactory) cf).setCallTimeout(500);
sendConnection = cf.createConnection();
@@ -108,6 +109,7 @@ public class ExceptionListenerForConnectionTimedOutExceptionTest extends JMSTest
try {
((ActiveMQConnectionFactory) cf).setOutgoingInterceptorList(OutBoundPacketCapture.class.getName());
((ActiveMQConnectionFactory) cf).setIncomingInterceptorList(SessSendCauseResponseTimeout.class.getName());
+ ((ActiveMQConnectionFactory) cf).setCallTimeout(500);
sendConnection = cf.createConnection();
sendConnection.setExceptionListener(exceptionOnConnection::set);