You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/06/21 14:25:48 UTC

[2/2] activemq-artemis git commit: ARTEMIS-1240 Disconnect at client side on decoding error

ARTEMIS-1240 Disconnect at client side on decoding error

When a broken packet arrives at client side it causes decoding error.
Currently artemis doesn't handle it properly. It should catch such
errors and disconnect the underlying connection, logging a proper
warning message


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3d0896f8
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3d0896f8
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3d0896f8

Branch: refs/heads/master
Commit: 3d0896f87c586f184e7bd9367e70ade259677783
Parents: 498a328
Author: Howard Gao <ho...@gmail.com>
Authored: Tue Jun 20 08:13:16 2017 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Jun 21 10:25:34 2017 -0400

----------------------------------------------------------------------
 .../core/client/ActiveMQClientLogger.java       |  3 +
 .../client/impl/ClientSessionFactoryImpl.java   | 12 +++-
 .../DisconnectOnCriticalFailureTest.java        | 64 ++++++++++++++++++++
 3 files changed, 78 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3d0896f8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java
index 6382147..bdb4bd1 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java
@@ -451,4 +451,7 @@ public interface ActiveMQClientLogger extends BasicLogger {
    @Message(id = 214030, value = "Failed to bind {0}={1}", format = Message.Format.MESSAGE_FORMAT)
    void failedToBind(String p1, String p2, @Cause Throwable cause);
 
+   @LogMessage(level = Logger.Level.ERROR)
+   @Message(id = 214031, value = "Failed to decode buffer, disconnect immediately.", format = Message.Format.MESSAGE_FORMAT)
+   void disconnectOnErrorDecoding(@Cause Throwable cause);
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3d0896f8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
index 8f6a5ea..38da780 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
@@ -1140,7 +1140,17 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
          RemotingConnection theConn = connection;
 
          if (theConn != null && connectionID.equals(theConn.getID())) {
-            theConn.bufferReceived(connectionID, buffer);
+            try {
+               theConn.bufferReceived(connectionID, buffer);
+            } catch (final RuntimeException e) {
+               ActiveMQClientLogger.LOGGER.disconnectOnErrorDecoding(e);
+               threadPool.execute(new Runnable() {
+                  @Override
+                  public void run() {
+                     theConn.fail(new ActiveMQException(e.getMessage()));
+                  }
+               });
+            }
          } else {
             logger.debug("TheConn == null on ClientSessionFactoryImpl::DelegatingBufferHandler, ignoring packet");
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3d0896f8/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/DisconnectOnCriticalFailureTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/DisconnectOnCriticalFailureTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/DisconnectOnCriticalFailureTest.java
index 5a0d514..fea3bf4 100644
--- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/DisconnectOnCriticalFailureTest.java
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/DisconnectOnCriticalFailureTest.java
@@ -17,17 +17,24 @@
 
 package org.apache.activemq.artemis.tests.extras.byteman;
 
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
 import org.apache.activemq.artemis.tests.util.JMSTestBase;
 import org.jboss.byteman.contrib.bmunit.BMRule;
 import org.jboss.byteman.contrib.bmunit.BMRules;
 import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.After;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
 import javax.jms.Connection;
 import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
 import javax.jms.Session;
+import javax.jms.TextMessage;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -37,6 +44,13 @@ public class DisconnectOnCriticalFailureTest extends JMSTestBase {
 
    private static AtomicBoolean corruptPacket = new AtomicBoolean(false);
 
+   @After
+   @Override
+   public void tearDown() throws Exception {
+      corruptPacket.set(false);
+      super.tearDown();
+   }
+
    @Test
    @BMRules(
       rules = {@BMRule(
@@ -71,6 +85,56 @@ public class DisconnectOnCriticalFailureTest extends JMSTestBase {
       }
    }
 
+   @Test
+   @BMRules(
+      rules = {@BMRule(
+         name = "Corrupt Decoding",
+         targetClass = "org.apache.activemq.artemis.core.protocol.ClientPacketDecoder",
+         targetMethod = "decode(org.apache.activemq.artemis.api.core.ActiveMQBuffer)",
+         targetLocation = "ENTRY",
+         action = "org.apache.activemq.artemis.tests.extras.byteman.DisconnectOnCriticalFailureTest.doThrow($1);")})
+   public void testClientDisconnect() throws Exception {
+      Queue q1 = createQueue("queue1");
+      final Connection connection = nettyCf.createConnection();
+      final CountDownLatch latch = new CountDownLatch(1);
+
+      try {
+         connection.setExceptionListener(new ExceptionListener() {
+            @Override
+            public void onException(JMSException e) {
+               latch.countDown();
+            }
+         });
+
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer producer = session.createProducer(q1);
+         TextMessage m = session.createTextMessage("hello");
+         producer.send(m);
+         connection.start();
+
+         corruptPacket.set(true);
+         MessageConsumer consumer = session.createConsumer(q1);
+         consumer.receive(2000);
+
+         assertTrue(latch.await(5, TimeUnit.SECONDS));
+      } finally {
+         corruptPacket.set(false);
+
+         if (connection != null) {
+            connection.close();
+         }
+      }
+   }
+
+   public static void doThrow(ActiveMQBuffer buff) {
+      byte type = buff.getByte(buff.readerIndex());
+      if (corruptPacket.get() && type == PacketImpl.SESS_RECEIVE_MSG) {
+         corruptPacket.set(false);
+         throw new IllegalArgumentException("Invalid type: -84");
+      }
+   }
+
    public static void doThrow() {
       if (corruptPacket.get()) {
          corruptPacket.set(false);