You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/05/24 20:29:30 UTC
[15/33] activemq-artemis git commit: ARTEMIS-1873 STOMP heartbeater left alive on cxn destroy
ARTEMIS-1873 STOMP heartbeater left alive on cxn destroy
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/8d64f741
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/8d64f741
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/8d64f741
Branch: refs/heads/2.6.x
Commit: 8d64f741a17846e814e9795ab101572e4acbb9a7
Parents: 4cd7b1c
Author: Justin Bertram <jb...@apache.org>
Authored: Tue May 22 15:43:09 2018 -0500
Committer: Justin Bertram <jb...@apache.org>
Committed: Tue May 22 17:07:31 2018 -0500
----------------------------------------------------------------------
.../core/protocol/stomp/StompConnection.java | 22 ++++---
.../integration/stomp/v11/StompV11Test.java | 61 ++++++++++++++++++++
2 files changed, 71 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8d64f741/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
index fbd0107..171d7be 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
@@ -335,15 +335,11 @@ public final class StompConnection implements RemotingConnection {
if (destroyed) {
return;
}
- }
- destroyed = true;
+ destroyed = true;
+ }
internalClose();
-
- synchronized (sendLock) {
- callClosingListeners();
- }
}
public Acceptor getAcceptorUsed() {
@@ -351,9 +347,17 @@ public final class StompConnection implements RemotingConnection {
}
private void internalClose() {
+ if (frameHandler != null) {
+ frameHandler.disconnect();
+ }
+
transportConnection.close();
manager.cleanup(this);
+
+ synchronized (sendLock) {
+ callClosingListeners();
+ }
}
@Override
@@ -372,15 +376,9 @@ public final class StompConnection implements RemotingConnection {
ActiveMQServerLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType());
- if (frameHandler != null) {
- frameHandler.disconnect();
- }
-
// Then call the listeners
callFailureListeners(me);
- callClosingListeners();
-
internalClose();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8d64f741/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
index 99ad1fb..6a3fae6 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
@@ -28,6 +28,7 @@ import java.nio.channels.ClosedChannelException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -37,6 +38,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
import org.apache.activemq.artemis.core.protocol.stomp.StompConnection;
import org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameHandlerV11;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.integration.stomp.StompTestBase;
import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
@@ -2181,6 +2183,65 @@ public class StompV11Test extends StompTestBase {
Assert.assertFalse(stompFrameHandler.getHeartBeater().isStarted());
}
+ @Test
+ public void testHeartBeat4() throws Exception {
+ connection.close();
+ ClientStompFrame frame = conn.createFrame("CONNECT");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+ frame.addHeader("heart-beat", "500,500");
+ frame.addHeader("accept-version", "1.1,1.2");
+
+ ClientStompFrame reply = conn.sendFrame(frame);
+
+ System.out.println("Reply: " + reply.toString());
+
+ assertEquals("CONNECTED", reply.getCommand());
+
+ // Obtain a reference to the server StompConnection object
+ RemotingConnection remotingConnection = null;
+ StompConnection stompConnection = null;
+ Iterator<RemotingConnection> iterator = server.getActiveMQServer().getRemotingService().getConnections().iterator();
+ while (iterator.hasNext()) {
+ remotingConnection = iterator.next();
+ if (remotingConnection instanceof StompConnection) {
+ stompConnection = (StompConnection)remotingConnection;
+ }
+ }
+
+ StompFrameHandlerV11 stompFrameHandler = (StompFrameHandlerV11) stompConnection.getStompVersionHandler();
+
+ System.out.println("========== start pinger!");
+
+ conn.startPinger(100);
+
+ ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
+ subFrame.addHeader("destination", getTopicPrefix() + getTopicName());
+ subFrame.addHeader("id", "0");
+
+ ClientStompFrame f = conn.sendFrame(subFrame);
+ f = conn.sendFrame(subFrame);
+
+ // Send subscription with a duplicate ID, triggering a server error and closing of the session.
+ f = conn.sendFrame(subFrame);
+
+ f = conn.receiveFrame(1000);
+ System.out.println("Received " + f.toString());
+ Assert.assertTrue(f.getCommand().equals("ERROR"));
+
+ conn.stopPinger();
+
+ // give it some time to detect and close connections
+ Thread.sleep(2000);
+
+ Wait.waitFor(() -> {
+ return server.getActiveMQServer().getRemotingService().getConnections().size() == 0;
+ });
+
+ Assert.assertFalse("HeartBeater is still running!!", stompFrameHandler.getHeartBeater().isStarted());
+ }
+
protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean sendDisconnect) throws Exception {
conn.connect(defUser, defPass);