You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2016/07/21 15:57:07 UTC

[3/4] activemq-artemis git commit: ARTEMIS-611 send ERROR in case of TTL violation

ARTEMIS-611 send ERROR in case of TTL violation

I changed the message for TTL timeouts because the existing message was
a bit verbose and a bit speculative. Also, now that the broker supports
more protocols the bit about connection-ttl and
client-failure-check-period is less relevant.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7d8f9dce
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7d8f9dce
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7d8f9dce

Branch: refs/heads/master
Commit: 7d8f9dcec690870197f7e0a6eb5f4335ac49aca4
Parents: 4232ed7
Author: jbertram <jb...@apache.org>
Authored: Wed Jul 20 15:28:29 2016 -0500
Committer: jbertram <jb...@apache.org>
Committed: Wed Jul 20 15:54:29 2016 -0500

----------------------------------------------------------------------
 .../artemis/core/protocol/stomp/StompConnection.java   |  6 ++++++
 .../core/remoting/server/impl/RemotingServiceImpl.java | 13 +++++++------
 .../artemis/core/server/ActiveMQMessageBundle.java     |  8 ++------
 .../artemis/tests/integration/stomp/StompTest.java     |  2 ++
 .../tests/integration/stomp/v11/StompV11Test.java      |  4 ++++
 5 files changed, 21 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7d8f9dce/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
index 5396c5b..7ab2750 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
@@ -286,6 +286,12 @@ public final class StompConnection implements RemotingConnection {
             return;
          }
 
+         if (me != null) {
+            StompFrame frame = frameHandler.createStompFrame(Stomp.Responses.ERROR);
+            frame.addHeader(Stomp.Headers.Error.MESSAGE, me.getMessage());
+            sendFrame(frame);
+         }
+
          destroyed = true;
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7d8f9dce/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
index 49a81db..60ac9a0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
@@ -41,6 +41,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
 import org.apache.activemq.artemis.api.core.BaseInterceptor;
+import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
@@ -691,7 +692,7 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
             try {
                long now = System.currentTimeMillis();
 
-               Set<Object> idsToRemove = new HashSet<>();
+               Set<Pair<Object, Long>> toRemove = new HashSet<>();
 
                for (ConnectionEntry entry : connections.values()) {
                   final RemotingConnection conn = entry.connection;
@@ -701,7 +702,7 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
                   if (entry.ttl != -1) {
                      if (!conn.checkDataReceived()) {
                         if (now >= entry.lastCheck + entry.ttl) {
-                           idsToRemove.add(conn.getID());
+                           toRemove.add(new Pair<>(conn.getID(), entry.ttl));
 
                            flush = false;
                         }
@@ -730,8 +731,8 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
                   }
                }
 
-               for (Object id : idsToRemove) {
-                  final RemotingConnection conn = getConnection(id);
+               for (final Pair<Object, Long> pair : toRemove) {
+                  final RemotingConnection conn = getConnection(pair.getA());
                   if (conn != null) {
                      // In certain cases (replicationManager for instance) calling fail could take some time
                      // We can't pause the FailureCheckAndFlushThread as that would lead other clients to fail for
@@ -739,10 +740,10 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
                      flushExecutor.execute(new Runnable() {
                         @Override
                         public void run() {
-                           conn.fail(ActiveMQMessageBundle.BUNDLE.clientExited(conn.getRemoteAddress()));
+                           conn.fail(ActiveMQMessageBundle.BUNDLE.clientExited(conn.getRemoteAddress(), pair.getB()));
                         }
                      });
-                     removeConnection(id);
+                     removeConnection(pair.getA());
                   }
                }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7d8f9dce/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
index dfcfed2..d7b893f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
@@ -100,12 +100,8 @@ public interface ActiveMQMessageBundle {
    ActiveMQInternalErrorException bindingNotDivert(SimpleString name);
 
    @Message(id = 119014,
-      value = "Did not receive data from {0}. It is likely the client has exited or crashed without " +
-         "closing its connection, or the network between the server and client has failed. " +
-         "You also might have configured connection-ttl and client-failure-check-period incorrectly. " +
-         "Please check user manual for more information." +
-         " The connection will now be closed.", format = Message.Format.MESSAGE_FORMAT)
-   ActiveMQConnectionTimedOutException clientExited(String remoteAddress);
+      value = "Did not receive data from {0} within the {1}ms connection TTL. The connection will now be closed.", format = Message.Format.MESSAGE_FORMAT)
+   ActiveMQConnectionTimedOutException clientExited(String remoteAddress, long ttl);
 
    @Message(id = 119017, value = "Queue {0} does not exist", format = Message.Format.MESSAGE_FORMAT)
    ActiveMQNonExistentQueueException noSuchQueue(SimpleString queueName);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7d8f9dce/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
index 8ba369f..4a6324a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
@@ -66,6 +66,8 @@ public class StompTest extends StompTestBase {
 
       Thread.sleep(5000);
 
+      assertTrue(receiveFrame(index, 10000).indexOf(Stomp.Responses.ERROR) != -1);
+
       assertChannelClosed(index);
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7d8f9dce/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
index 3d7e5f5..2cba55e 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
@@ -761,6 +761,8 @@ public class StompV11Test extends StompV11TestBase {
 
       Thread.sleep(3000);
 
+      assertEquals(Stomp.Responses.ERROR, connection.receiveFrame(1000).getCommand());
+
       assertEquals(0, connection.getFrameQueueSize());
 
       try {
@@ -790,6 +792,8 @@ public class StompV11Test extends StompV11TestBase {
 
       Thread.sleep(3000);
 
+      assertEquals(Stomp.Responses.ERROR, connection.receiveFrame(1000).getCommand());
+
       assertEquals(0, connection.getFrameQueueSize());
 
       try {