You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/01/26 05:00:52 UTC
[1/2] activemq-artemis git commit: ARTEMIS-934 Stomp Heart beat not
being stopped in some cases
Repository: activemq-artemis
Updated Branches:
refs/heads/master 98f6fa760 -> fb4bc063f
ARTEMIS-934 Stomp Heart beat not being stopped in some cases
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f79b21e8
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f79b21e8
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f79b21e8
Branch: refs/heads/master
Commit: f79b21e866539ca196eea67adc700c424f61fbfc
Parents: 98f6fa7
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Jan 25 10:12:06 2017 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Jan 25 12:17:47 2017 -0500
----------------------------------------------------------------------
.../core/protocol/stomp/StompConnection.java | 9 ++++
.../stomp/VersionedStompFrameHandler.java | 3 ++
.../stomp/v11/StompFrameHandlerV11.java | 15 ++++++-
.../util/AbstractStompClientConnection.java | 8 +++-
.../integration/stomp/v11/StompV11Test.java | 47 ++++++++++++++++++++
5 files changed, 79 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f79b21e8/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
index 0eb81b9..f72a73e 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
@@ -110,6 +110,10 @@ public final class StompConnection implements RemotingConnection {
return false;
}
+ public VersionedStompFrameHandler getStompVersionHandler() {
+ return frameHandler;
+ }
+
public StompFrame decode(ActiveMQBuffer buffer) throws ActiveMQStompException {
StompFrame frame = null;
try {
@@ -343,6 +347,11 @@ public final class StompConnection implements RemotingConnection {
}
ActiveMQServerLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType());
+
+ if (frameHandler != null) {
+ frameHandler.disconnect();
+ }
+
// Then call the listeners
callFailureListeners(me);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f79b21e8/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
index 02facd6..673c86e 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
@@ -43,6 +43,9 @@ public abstract class VersionedStompFrameHandler {
protected final ScheduledExecutorService scheduledExecutorService;
protected final ExecutorFactory executorFactory;
+ protected void disconnect() {
+ }
+
public static VersionedStompFrameHandler getHandler(StompConnection connection,
StompVersions version,
ScheduledExecutorService scheduledExecutorService,
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f79b21e8/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java
index 867cdd8..c6831cd 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java
@@ -57,6 +57,10 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
decoder.init();
}
+ public ActiveMQScheduledComponent getHeartBeater() {
+ return heartBeater;
+ }
+
@Override
public StompFrame onConnect(StompFrame frame) {
StompFrame response = null;
@@ -131,15 +135,22 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
//client receive ping
long minAcceptInterval = Long.valueOf(params[1]);
- heartBeater = new HeartBeater(scheduledExecutorService, executorFactory.getExecutor(), minPingInterval, minAcceptInterval);
+ if (heartBeater == null) {
+ heartBeater = new HeartBeater(scheduledExecutorService, executorFactory.getExecutor(), minPingInterval, minAcceptInterval);
+ }
}
@Override
public StompFrame onDisconnect(StompFrame frame) {
+ disconnect();
+ return null;
+ }
+
+ @Override
+ protected void disconnect() {
if (this.heartBeater != null) {
heartBeater.shutdown();
}
- return null;
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f79b21e8/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java
index d8a487e..fa1ec73 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java
@@ -45,6 +45,7 @@ public abstract class AbstractStompClientConnection implements StompClientConnec
protected BlockingQueue<ClientStompFrame> frameQueue = new LinkedBlockingQueue<>();
protected boolean connected = false;
protected int serverPingCounter;
+ protected ReaderThread readerThread;
public AbstractStompClientConnection(String version, String host, int port) throws IOException {
this.version = version;
@@ -67,7 +68,12 @@ public abstract class AbstractStompClientConnection implements StompClientConnec
readBuffer = ByteBuffer.allocateDirect(10240);
receiveList = new ArrayList<>(10240);
- new ReaderThread().start();
+ readerThread = new ReaderThread();
+ readerThread.start();
+ }
+
+ public void killReaderThread() {
+ readerThread.stop();
}
private ClientStompFrame sendFrameInternal(ClientStompFrame frame, boolean wicked) throws IOException, InterruptedException {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f79b21e8/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
index 01f1cf8..eb055f1 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
@@ -31,13 +31,17 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
+import org.apache.activemq.artemis.core.protocol.stomp.StompConnection;
+import org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameHandlerV11;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.integration.stomp.StompTestBase;
+import org.apache.activemq.artemis.tests.integration.stomp.util.AbstractStompClientConnection;
import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionV11;
+import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -2113,6 +2117,49 @@ public class StompV11Test extends StompTestBase {
conn.disconnect();
}
+
+ @Test
+ public void testHeartBeat3() throws Exception {
+
+ connection.close();
+ ClientStompFrame frame = conn.createFrame("CONNECT");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+ frame.addHeader("heart-beat", "500,500");
+ frame.addHeader("accept-version", "1.0,1.1");
+
+ ClientStompFrame reply = conn.sendFrame(frame);
+
+ assertEquals("CONNECTED", reply.getCommand());
+
+ assertEquals("500,500", reply.getHeader("heart-beat"));
+
+
+ System.out.println("========== start pinger!");
+
+ conn.startPinger(100);
+
+
+ Assert.assertEquals(1, server.getActiveMQServer().getRemotingService().getConnections().size());
+ StompConnection stompConnection = (StompConnection)server.getActiveMQServer().getRemotingService().getConnections().iterator().next();
+ StompFrameHandlerV11 stompFrameHandler = (StompFrameHandlerV11)stompConnection.getStompVersionHandler();
+
+ Thread.sleep(1000);
+
+ //now check the frame size
+ int size = conn.getServerPingNumber();
+
+ conn.stopPinger();
+ ((AbstractStompClientConnection)conn).killReaderThread();
+ Wait.waitFor(() -> {
+ return server.getActiveMQServer().getRemotingService().getConnections().size() == 0;
+ });
+
+ Assert.assertFalse(stompFrameHandler.getHeartBeater().isStarted());
+ }
+
+
protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean sendDisconnect) throws Exception {
conn.connect(defUser, defPass);
[2/2] activemq-artemis git commit: This closes #979
Posted by cl...@apache.org.
This closes #979
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/fb4bc063
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/fb4bc063
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/fb4bc063
Branch: refs/heads/master
Commit: fb4bc063f1d8a641803da2294b73cbab305d0e59
Parents: 98f6fa7 f79b21e
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Jan 26 00:00:38 2017 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Jan 26 00:00:38 2017 -0500
----------------------------------------------------------------------
.../core/protocol/stomp/StompConnection.java | 9 ++++
.../stomp/VersionedStompFrameHandler.java | 3 ++
.../stomp/v11/StompFrameHandlerV11.java | 15 ++++++-
.../util/AbstractStompClientConnection.java | 8 +++-
.../integration/stomp/v11/StompV11Test.java | 47 ++++++++++++++++++++
5 files changed, 79 insertions(+), 3 deletions(-)
----------------------------------------------------------------------