You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/08/29 17:41:08 UTC

[2/2] activemq-artemis git commit: ARTEMIS-2055 Lock LM on PacketHandler on clear

ARTEMIS-2055 Lock LM on PacketHandler on clear


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/9a855e18
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/9a855e18
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/9a855e18

Branch: refs/heads/master
Commit: 9a855e18e1e73fb36486a06b53ee61b9fbf8f7db
Parents: ff6a690
Author: Martyn Taylor <mt...@redhat.com>
Authored: Mon Aug 27 15:05:20 2018 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Aug 29 13:40:19 2018 -0400

----------------------------------------------------------------------
 .../core/ServerSessionPacketHandler.java        | 47 +++++++++++---------
 .../byteman/LargeMessageOnShutdownTest.java     |  2 +-
 2 files changed, 28 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9a855e18/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index 3b0433e..37564b5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -160,6 +160,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
 
    private final boolean direct;
 
+   private final Object largeMessageLock = new Object();
 
    public ServerSessionPacketHandler(final ActiveMQServer server,
                                      final CoreProtocolManager manager,
@@ -196,11 +197,15 @@ public class ServerSessionPacketHandler implements ChannelHandler {
    }
 
    private void clearLargeMessage() {
-      if (currentLargeMessage != null) {
-         try {
-            currentLargeMessage.deleteFile();
-         } catch (Throwable error) {
-            ActiveMQServerLogger.LOGGER.errorDeletingLargeMessageFile(error);
+      synchronized (largeMessageLock) {
+         if (currentLargeMessage != null) {
+            try {
+               currentLargeMessage.deleteFile();
+            } catch (Throwable error) {
+               ActiveMQServerLogger.LOGGER.errorDeletingLargeMessageFile(error);
+            } finally {
+               currentLargeMessage = null;
+            }
          }
       }
    }
@@ -958,26 +963,28 @@ public class ServerSessionPacketHandler implements ChannelHandler {
                                   final long messageBodySize,
                                   final byte[] body,
                                   final boolean continues) throws Exception {
-      if (currentLargeMessage == null) {
-         throw ActiveMQMessageBundle.BUNDLE.largeMessageNotInitialised();
-      }
 
-      // Immediately release the credits for the continuations- these don't contribute to the in-memory size
-      // of the message
+      synchronized (largeMessageLock) {
+         if (currentLargeMessage == null) {
+            throw ActiveMQMessageBundle.BUNDLE.largeMessageNotInitialised();
+         }
+
+         // Immediately release the credits for the continuations- these don't contribute to the in-memory size
+         // of the message
 
-      currentLargeMessage.addBytes(body);
+         currentLargeMessage.addBytes(body);
 
-      if (!continues) {
-         currentLargeMessage.releaseResources();
+         if (!continues) {
+            currentLargeMessage.releaseResources();
 
-         if (messageBodySize >= 0) {
-            currentLargeMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, messageBodySize);
-         }
+            if (messageBodySize >= 0) {
+               currentLargeMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, messageBodySize);
+            }
 
-         LargeServerMessage message = currentLargeMessage;
-         currentLargeMessage = null;
-         session.doSend(session.getCurrentTransaction(), message, null, false, false);
+            LargeServerMessage message = currentLargeMessage;
+            currentLargeMessage = null;
+            session.doSend(session.getCurrentTransaction(), message, null, false, false);
+         }
       }
    }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9a855e18/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/LargeMessageOnShutdownTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/LargeMessageOnShutdownTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/LargeMessageOnShutdownTest.java
index e9be73b..9b01223 100644
--- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/LargeMessageOnShutdownTest.java
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/LargeMessageOnShutdownTest.java
@@ -73,7 +73,7 @@ public class LargeMessageOnShutdownTest extends ActiveMQTestBase {
             condition = "!flagged(\"testLargeMessageOnShutdown\")",
             action =
                "org.apache.activemq.artemis.tests.extras.byteman.LargeMessageOnShutdownTest.stopServer();" +
-               "waitFor(\"testLargeMessageOnShutdown\");" +
+               "waitFor(\"testLargeMessageOnShutdown\", 5000);" +
                "flag(\"testLargeMessageOnShutdown\")"
          ),
          @BMRule(