You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/10/14 19:59:41 UTC

[activemq-artemis] branch master updated: ARTEMIS-2513 Large message's copy may be interfered by other threads

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 6177d32  ARTEMIS-2513 Large message's copy may be interfered by other threads
     new 3687ae4  This closes #2859
6177d32 is described below

commit 6177d32774a4f899bea50b0bc7711643b403074a
Author: Howard Gao <ho...@gmail.com>
AuthorDate: Wed Oct 9 23:57:57 2019 +0800

    ARTEMIS-2513 Large message's copy may be interfered by other threads
    
    LargeServerMessageImpl.copy(long) needs to open the underlying
    file in order to read and copy bytes into the newly copied message.
    However, there is a chance that another thread can come in and close
    the file in the middle, making the copy fail
    with a "channel is null" error.
    
    This happens when a large message is sent to a JMS
    topic (multicast address). While it is being delivered to multiple
    subscribers, one consumer may finish its delivery and close the
    underlying file afterwards, while another consumer is rolling back
    the message and eventually moving it to the DLQ (which calls
    the above copy method). So there is a chance of hitting this bug.
---
 .../impl/journal/LargeServerMessageImpl.java       |  89 ++++++++--------
 .../largemessage/ServerLargeMessageTest.java       | 113 +++++++++++++++++++++
 2 files changed, 159 insertions(+), 43 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
index a55af10..85cb24c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
@@ -368,52 +368,51 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
       try {
          LargeServerMessage newMessage = storageManager.createLargeMessage(newID, this);
 
-         boolean originallyOpen = file != null && file.isOpen();
+         //clone a SequentialFile to avoid concurrent access
+         ensureFileExists(false);
+         SequentialFile cloneFile = file.cloneFile();
 
-         validateFile();
-
-         byte[] bufferBytes = new byte[100 * 1024];
-
-         ByteBuffer buffer = ByteBuffer.wrap(bufferBytes);
+         try {
+            byte[] bufferBytes = new byte[100 * 1024];
 
-         long oldPosition = file.position();
+            ByteBuffer buffer = ByteBuffer.wrap(bufferBytes);
 
-         if (!file.isOpen()) {
-            file.open();
-         }
-         file.position(0);
-
-         for (;;) {
-            // The buffer is reused...
-            // We need to make sure we clear the limits and the buffer before reusing it
-            buffer.clear();
-            int bytesRead = file.read(buffer);
-
-            byte[] bufferToWrite;
-            if (bytesRead <= 0) {
-               break;
-            } else if (bytesRead == bufferBytes.length && !this.storageManager.isReplicated()) {
-               // ARTEMIS-1220: We cannot reuse the same buffer if it's replicated
-               // otherwise there could be another thread still using the buffer on a
-               // replication.
-               bufferToWrite = bufferBytes;
-            } else {
-               bufferToWrite = new byte[bytesRead];
-               System.arraycopy(bufferBytes, 0, bufferToWrite, 0, bytesRead);
+            if (!cloneFile.isOpen()) {
+               cloneFile.open();
             }
 
-            newMessage.addBytes(bufferToWrite);
-
-            if (bytesRead < bufferBytes.length) {
-               break;
+            cloneFile.position(0);
+
+            for (;;) {
+               // The buffer is reused...
+               // We need to make sure we clear the limits and the buffer before reusing it
+               buffer.clear();
+               int bytesRead = cloneFile.read(buffer);
+
+               byte[] bufferToWrite;
+               if (bytesRead <= 0) {
+                  break;
+               } else if (bytesRead == bufferBytes.length && !this.storageManager.isReplicated()) {
+                  // ARTEMIS-1220: We cannot reuse the same buffer if it's replicated
+                  // otherwise there could be another thread still using the buffer on a
+                  // replication.
+                  bufferToWrite = bufferBytes;
+               } else {
+                  bufferToWrite = new byte[bytesRead];
+                  System.arraycopy(bufferBytes, 0, bufferToWrite, 0, bytesRead);
+               }
+
+               newMessage.addBytes(bufferToWrite);
+
+               if (bytesRead < bufferBytes.length) {
+                  break;
+               }
             }
-         }
-
-         file.position(oldPosition);
-
-         if (!originallyOpen) {
-            file.close(false);
-            newMessage.getFile().close();
+         } finally {
+            if (!file.isOpen()) {
+               newMessage.getFile().close();
+            }
+            cloneFile.close();
          }
 
          return newMessage;
@@ -469,9 +468,11 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
       }
    }
 
-   // Private -------------------------------------------------------
-
    public synchronized void validateFile() throws ActiveMQException {
+      this.ensureFileExists(true);
+   }
+
+   public synchronized void ensureFileExists(boolean toOpen) throws ActiveMQException {
       try {
          if (file == null) {
             if (messageID <= 0) {
@@ -480,7 +481,9 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
 
             file = createFile();
 
-            openFile();
+            if (toOpen) {
+               openFile();
+            }
 
             bodySize = file.size();
          }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/ServerLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/ServerLargeMessageTest.java
index 786b6ec..fcf30b3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/ServerLargeMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/ServerLargeMessageTest.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.largemessage;
 import java.io.File;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
+import java.lang.reflect.Field;
 import java.net.URL;
 import java.nio.ByteBuffer;
 import java.util.HashSet;
@@ -36,6 +37,7 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.io.AbstractSequentialFile;
 import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
@@ -47,6 +49,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServers;
 import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
 import org.apache.activemq.artemis.tests.integration.security.SecurityTest;
+import org.apache.activemq.artemis.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
 import org.junit.After;
@@ -335,6 +338,40 @@ public class ServerLargeMessageTest extends ActiveMQTestBase {
       assertTrue(sync.get());
    }
 
+   @Test
+   public void testLargeServerMessageCopyIsolation() throws Exception {
+      ActiveMQServer server = createServer(true);
+      server.start();
+
+      try {
+         LargeServerMessageImpl largeMessage = new LargeServerMessageImpl((JournalStorageManager)server.getStorageManager());
+         largeMessage.setMessageID(23456);
+
+         for (int i = 0; i < 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE; i++) {
+            largeMessage.addBytes(new byte[]{ActiveMQTestBase.getSamplebyte(i)});
+         }
+
+         //now replace the underlying file with a fake
+         replaceFile(largeMessage);
+
+         Message copied = largeMessage.copy(99999);
+         assertEquals(99999, copied.getMessageID());
+
+      } finally {
+         server.stop();
+      }
+   }
+
+   private void replaceFile(LargeServerMessageImpl largeMessage) throws Exception {
+      SequentialFile originalFile = largeMessage.getFile();
+      MockSequentialFile mockFile = new MockSequentialFile(originalFile);
+
+      Field fileField = LargeServerMessageImpl.class.getDeclaredField("file");
+      fileField.setAccessible(true);
+      fileField.set(largeMessage, mockFile);
+      mockFile.close();
+   }
+
       // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -342,5 +379,81 @@ public class ServerLargeMessageTest extends ActiveMQTestBase {
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------
+   private class MockSequentialFile extends AbstractSequentialFile {
+
+      private SequentialFile originalFile;
+
+      MockSequentialFile(SequentialFile originalFile) throws Exception {
+         super(originalFile.getJavaFile().getParentFile(), originalFile.getFileName(), new FakeSequentialFileFactory(), null);
+         this.originalFile = originalFile;
+         this.originalFile.close();
+      }
+
+      @Override
+      public void open() throws Exception {
+         //open and close it right away to simulate failure condition
+         originalFile.open();
+         originalFile.close();
+      }
+
+      @Override
+      public void open(int maxIO, boolean useExecutor) throws Exception {
+      }
+
+      @Override
+      public boolean isOpen() {
+         return originalFile.isOpen();
+      }
+
+      @Override
+      public int calculateBlockStart(int position) throws Exception {
+         return originalFile.calculateBlockStart(position);
+      }
+
+      @Override
+      public void fill(int size) throws Exception {
+         originalFile.fill(size);
+      }
+
+      @Override
+      public void writeDirect(ByteBuffer bytes, boolean sync, IOCallback callback) {
+         originalFile.writeDirect(bytes, sync, callback);
+      }
+
+      @Override
+      public void writeDirect(ByteBuffer bytes, boolean sync) throws Exception {
+         originalFile.writeDirect(bytes, sync);
+      }
+
+      @Override
+      public void blockingWriteDirect(ByteBuffer bytes, boolean sync, boolean releaseBuffer) throws Exception {
+         originalFile.blockingWriteDirect(bytes, sync, releaseBuffer);
+      }
+
+      @Override
+      public int read(ByteBuffer bytes, IOCallback callback) throws Exception {
+         return originalFile.read(bytes, callback);
+      }
+
+      @Override
+      public int read(ByteBuffer bytes) throws Exception {
+         return originalFile.read(bytes);
+      }
+
+      @Override
+      public void sync() throws IOException {
+         originalFile.sync();
+      }
+
+      @Override
+      public long size() throws Exception {
+         return originalFile.size();
+      }
+
+      @Override
+      public SequentialFile cloneFile() {
+         return originalFile.cloneFile();
+      }
+   }
 
 }