You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/08/27 12:16:00 UTC

[1/2] activemq-artemis git commit: ARTEMIS-2055 Set Live LM to Null after route

Repository: activemq-artemis
Updated Branches:
  refs/heads/2.6.x 4a7b5252f -> dc2e4dd54


ARTEMIS-2055 Set Live LM to Null after route

The ServerSessionPacketHandler has a close() callback handler which will
delete any pending large messages.  However, there is a race where a
large message can be routed and the close() callback then deletes the
associated large message, resulting in data loss.

(cherry picked from commit 490ef71e1dccc88e51d862cc51af468d37d416ce)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f2d26dc1
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f2d26dc1
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f2d26dc1

Branch: refs/heads/2.6.x
Commit: f2d26dc1be26ea1d7c7ac505403d26a045ca5da9
Parents: 4a7b525
Author: Martyn Taylor <mt...@redhat.com>
Authored: Mon Aug 27 09:18:02 2018 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Aug 27 08:15:49 2018 -0400

----------------------------------------------------------------------
 .../core/ServerSessionPacketHandler.java        |   4 +-
 .../byteman/LargeMessageOnShutdownTest.java     | 137 +++++++++++++++++++
 2 files changed, 139 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f2d26dc1/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index 36273f8..3b0433e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -974,9 +974,9 @@ public class ServerSessionPacketHandler implements ChannelHandler {
             currentLargeMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, messageBodySize);
          }
 
-         session.doSend(session.getCurrentTransaction(), currentLargeMessage, null, false, false);
-
+         LargeServerMessage message = currentLargeMessage;
          currentLargeMessage = null;
+         session.doSend(session.getCurrentTransaction(), message, null, false, false);
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f2d26dc1/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/LargeMessageOnShutdownTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/LargeMessageOnShutdownTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/LargeMessageOnShutdownTest.java
new file mode 100644
index 0000000..ebbd7de
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/LargeMessageOnShutdownTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.extras.byteman;
+
+import java.io.ByteArrayInputStream;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(BMUnitRunner.class)
+public class LargeMessageOnShutdownTest extends ActiveMQTestBase {
+
+   private static final SimpleString queueName = new SimpleString("largeMessageShutdownQueue");
+   private static ActiveMQServer server;
+
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+
+      server = createServer(true, createDefaultNettyConfig());
+      startServer();
+      server.createQueue(queueName, RoutingType.MULTICAST, queueName, null, true, false);
+   }
+
+   @After
+   public void tearDown() throws Exception {
+      super.tearDown();
+      stopServer();
+   }
+
+   @Test
+   @BMRules(
+      rules = {
+         @BMRule(
+            name = "BlockOnFinalLargeMessagePacket",
+            targetClass = "org.apache.activemq.artemis.core.server.impl.ServerSessionImpl",
+            targetMethod = "doSend(Transaction,Message,SimpleString,boolean,boolean)",
+            targetLocation = "EXIT",
+            condition = "!flagged(\"testLargeMessageOnShutdown\")",
+            action =
+               "org.apache.activemq.artemis.tests.extras.byteman.LargeMessageOnShutdownTest.stopServer();" +
+               "waitFor(\"testLargeMessageOnShutdown\");" +
+               "flag(\"testLargeMessageOnShutdown\")"
+         ),
+         @BMRule(
+            name = "ReleaseBlockOnSessionCleanup",
+            targetClass = "org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler",
+            targetMethod = "clearLargeMessage()",
+            targetLocation = "EXIT",
+            action = "signalWake(\"testLargeMessageOnShutdown\")"
+         )
+      }
+   )
+   public void testLargeMessageOnShutdown() throws Exception {
+
+      byte[] payload = new byte[ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE * 2];
+
+      // Send Large Message
+      ClientSessionFactory csf1 = createSessionFactory(createNettyNonHALocator());
+      try {
+         ClientSession session1 = csf1.createSession();
+         ClientProducer producer1 = session1.createProducer(queueName);
+         ClientMessage message = session1.createMessage(true);
+
+         message.setBodyInputStream(new ByteArrayInputStream(payload));
+         producer1.send(message);
+      } catch (Exception e) {
+         // Expected due to shutdown.
+      }
+      finally {
+         csf1.close();
+      }
+
+      waitForStoppedServer();
+      startServer();
+
+      // Consume Large Message
+      ClientSessionFactory csf2 = createSessionFactory(createNettyNonHALocator());
+      try {
+         ClientSession session2 = csf2.createSession();
+         session2.start();
+         ClientConsumer consumer2 = session2.createConsumer(queueName);
+         ClientMessage rmessage = consumer2.receive(10000);
+
+         assertEquals(payload.length, rmessage.getBodyBuffer().readableBytes());
+         assertEqualsBuffers(payload.length, ActiveMQBuffers.wrappedBuffer(payload), rmessage.getBodyBuffer());
+      } finally {
+         csf2.close();
+      }
+   }
+
+   public static void stopServer() throws Exception {
+      server.stop();
+      waitForStoppedServer();
+   }
+
+   public static void startServer() throws Exception {
+      server.start();
+      server.waitForActivation(30, TimeUnit.SECONDS);
+   }
+
+   public static void waitForStoppedServer() throws Exception {
+      Wait.waitFor(() -> server.getState() == ActiveMQServer.SERVER_STATE.STOPPED);
+   }
+}


[2/2] activemq-artemis git commit: ARTEMIS-2056 Set write position on JDBCFile copy

Posted by cl...@apache.org.
ARTEMIS-2056 Set write position on JDBCFile copy

(cherry picked from commit b36a1058d4584adca399a591c4109866d4265e07)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/dc2e4dd5
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/dc2e4dd5
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/dc2e4dd5

Branch: refs/heads/2.6.x
Commit: dc2e4dd54695d7af349fa754f9212e48adebceca
Parents: f2d26dc
Author: Martyn Taylor <mt...@redhat.com>
Authored: Fri Aug 24 14:24:25 2018 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Aug 27 08:15:54 2018 -0400

----------------------------------------------------------------------
 .../activemq/artemis/jdbc/store/file/JDBCSequentialFile.java   | 6 ++++++
 .../artemis/jdbc/file/JDBCSequentialFileFactoryTest.java       | 3 +++
 2 files changed, 9 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc2e4dd5/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
index 843be54..fec8eaf 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
@@ -330,6 +330,7 @@ public class JDBCSequentialFile implements SequentialFile {
    public SequentialFile cloneFile() {
       try {
          JDBCSequentialFile clone = new JDBCSequentialFile(fileFactory, filename, executor, dbDriver, writeLock);
+         clone.setWritePosition(this.writePosition);
          return clone;
       } catch (Exception e) {
          fileFactory.onIOError(e, "Error cloning JDBC file.", this);
@@ -342,8 +343,13 @@ public class JDBCSequentialFile implements SequentialFile {
       JDBCSequentialFile clone = (JDBCSequentialFile) cloneFile;
       try {
          synchronized (writeLock) {
+            if (logger.isTraceEnabled()) {
+               logger.trace("JDBC Copying File.  From: " + this + " To: " + cloneFile);
+            }
+
             clone.open();
             dbDriver.copyFileData(this, clone);
+            clone.setWritePosition(writePosition);
          }
       } catch (Exception e) {
          fileFactory.onIOError(e, "Error copying JDBC file.", this);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc2e4dd5/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java b/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java
index d567f84..a45b9a8 100644
--- a/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java
+++ b/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java
@@ -221,6 +221,9 @@ public class JDBCSequentialFileFactoryTest {
 
       checkData(file, src);
       checkData(copy, src);
+
+      assertEquals(bufferSize, copy.size());
+      assertEquals(bufferSize, file.size());
    }
 
    @Test