You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/10/14 19:59:41 UTC
[activemq-artemis] branch master updated: ARTEMIS-2513 Large
message's copy may be interfered by other threads
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 6177d32 ARTEMIS-2513 Large message's copy may be interfered by other threads
new 3687ae4 This closes #2859
6177d32 is described below
commit 6177d32774a4f899bea50b0bc7711643b403074a
Author: Howard Gao <ho...@gmail.com>
AuthorDate: Wed Oct 9 23:57:57 2019 +0800
ARTEMIS-2513 Large message's copy may be interfered by other threads
LargeServerMessageImpl.copy(long) needs to open the underlying
file in order to read and copy its bytes into the newly copied message.
However, there is a chance that another thread can come in and close
the file in the middle of the copy, making the copy fail
with a "channel is null" error.
This happens when a large message is sent to a JMS
topic (multicast address). While it is being delivered to multiple
subscribers, one consumer finishes its delivery and closes the
underlying file afterwards, while another consumer rolls back
the message and eventually moves it to the DLQ (which calls
the above copy method). So there is a chance of hitting this bug.
---
.../impl/journal/LargeServerMessageImpl.java | 89 ++++++++--------
.../largemessage/ServerLargeMessageTest.java | 113 +++++++++++++++++++++
2 files changed, 159 insertions(+), 43 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
index a55af10..85cb24c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
@@ -368,52 +368,51 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
try {
LargeServerMessage newMessage = storageManager.createLargeMessage(newID, this);
- boolean originallyOpen = file != null && file.isOpen();
+ //clone a SequentialFile to avoid concurrent access
+ ensureFileExists(false);
+ SequentialFile cloneFile = file.cloneFile();
- validateFile();
-
- byte[] bufferBytes = new byte[100 * 1024];
-
- ByteBuffer buffer = ByteBuffer.wrap(bufferBytes);
+ try {
+ byte[] bufferBytes = new byte[100 * 1024];
- long oldPosition = file.position();
+ ByteBuffer buffer = ByteBuffer.wrap(bufferBytes);
- if (!file.isOpen()) {
- file.open();
- }
- file.position(0);
-
- for (;;) {
- // The buffer is reused...
- // We need to make sure we clear the limits and the buffer before reusing it
- buffer.clear();
- int bytesRead = file.read(buffer);
-
- byte[] bufferToWrite;
- if (bytesRead <= 0) {
- break;
- } else if (bytesRead == bufferBytes.length && !this.storageManager.isReplicated()) {
- // ARTEMIS-1220: We cannot reuse the same buffer if it's replicated
- // otherwise there could be another thread still using the buffer on a
- // replication.
- bufferToWrite = bufferBytes;
- } else {
- bufferToWrite = new byte[bytesRead];
- System.arraycopy(bufferBytes, 0, bufferToWrite, 0, bytesRead);
+ if (!cloneFile.isOpen()) {
+ cloneFile.open();
}
- newMessage.addBytes(bufferToWrite);
-
- if (bytesRead < bufferBytes.length) {
- break;
+ cloneFile.position(0);
+
+ for (;;) {
+ // The buffer is reused...
+ // We need to make sure we clear the limits and the buffer before reusing it
+ buffer.clear();
+ int bytesRead = cloneFile.read(buffer);
+
+ byte[] bufferToWrite;
+ if (bytesRead <= 0) {
+ break;
+ } else if (bytesRead == bufferBytes.length && !this.storageManager.isReplicated()) {
+ // ARTEMIS-1220: We cannot reuse the same buffer if it's replicated
+ // otherwise there could be another thread still using the buffer on a
+ // replication.
+ bufferToWrite = bufferBytes;
+ } else {
+ bufferToWrite = new byte[bytesRead];
+ System.arraycopy(bufferBytes, 0, bufferToWrite, 0, bytesRead);
+ }
+
+ newMessage.addBytes(bufferToWrite);
+
+ if (bytesRead < bufferBytes.length) {
+ break;
+ }
}
- }
-
- file.position(oldPosition);
-
- if (!originallyOpen) {
- file.close(false);
- newMessage.getFile().close();
+ } finally {
+ if (!file.isOpen()) {
+ newMessage.getFile().close();
+ }
+ cloneFile.close();
}
return newMessage;
@@ -469,9 +468,11 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
}
}
- // Private -------------------------------------------------------
-
public synchronized void validateFile() throws ActiveMQException {
+ this.ensureFileExists(true);
+ }
+
+ public synchronized void ensureFileExists(boolean toOpen) throws ActiveMQException {
try {
if (file == null) {
if (messageID <= 0) {
@@ -480,7 +481,9 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
file = createFile();
- openFile();
+ if (toOpen) {
+ openFile();
+ }
bodySize = file.size();
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/ServerLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/ServerLargeMessageTest.java
index 786b6ec..fcf30b3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/ServerLargeMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/ServerLargeMessageTest.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.largemessage;
import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
+import java.lang.reflect.Field;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.HashSet;
@@ -36,6 +37,7 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.io.AbstractSequentialFile;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
@@ -47,6 +49,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.artemis.tests.integration.security.SecurityTest;
+import org.apache.activemq.artemis.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
import org.junit.After;
@@ -335,6 +338,40 @@ public class ServerLargeMessageTest extends ActiveMQTestBase {
assertTrue(sync.get());
}
+ @Test
+ public void testLargeServerMessageCopyIsolation() throws Exception {
+ ActiveMQServer server = createServer(true);
+ server.start();
+
+ try {
+ LargeServerMessageImpl largeMessage = new LargeServerMessageImpl((JournalStorageManager)server.getStorageManager());
+ largeMessage.setMessageID(23456);
+
+ for (int i = 0; i < 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE; i++) {
+ largeMessage.addBytes(new byte[]{ActiveMQTestBase.getSamplebyte(i)});
+ }
+
+ //now replace the underlying file with a fake
+ replaceFile(largeMessage);
+
+ Message copied = largeMessage.copy(99999);
+ assertEquals(99999, copied.getMessageID());
+
+ } finally {
+ server.stop();
+ }
+ }
+
+ private void replaceFile(LargeServerMessageImpl largeMessage) throws Exception {
+ SequentialFile originalFile = largeMessage.getFile();
+ MockSequentialFile mockFile = new MockSequentialFile(originalFile);
+
+ Field fileField = LargeServerMessageImpl.class.getDeclaredField("file");
+ fileField.setAccessible(true);
+ fileField.set(largeMessage, mockFile);
+ mockFile.close();
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -342,5 +379,81 @@ public class ServerLargeMessageTest extends ActiveMQTestBase {
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
+ private class MockSequentialFile extends AbstractSequentialFile {
+
+ private SequentialFile originalFile;
+
+ MockSequentialFile(SequentialFile originalFile) throws Exception {
+ super(originalFile.getJavaFile().getParentFile(), originalFile.getFileName(), new FakeSequentialFileFactory(), null);
+ this.originalFile = originalFile;
+ this.originalFile.close();
+ }
+
+ @Override
+ public void open() throws Exception {
+ //open and close it right away to simulate failure condition
+ originalFile.open();
+ originalFile.close();
+ }
+
+ @Override
+ public void open(int maxIO, boolean useExecutor) throws Exception {
+ }
+
+ @Override
+ public boolean isOpen() {
+ return originalFile.isOpen();
+ }
+
+ @Override
+ public int calculateBlockStart(int position) throws Exception {
+ return originalFile.calculateBlockStart(position);
+ }
+
+ @Override
+ public void fill(int size) throws Exception {
+ originalFile.fill(size);
+ }
+
+ @Override
+ public void writeDirect(ByteBuffer bytes, boolean sync, IOCallback callback) {
+ originalFile.writeDirect(bytes, sync, callback);
+ }
+
+ @Override
+ public void writeDirect(ByteBuffer bytes, boolean sync) throws Exception {
+ originalFile.writeDirect(bytes, sync);
+ }
+
+ @Override
+ public void blockingWriteDirect(ByteBuffer bytes, boolean sync, boolean releaseBuffer) throws Exception {
+ originalFile.blockingWriteDirect(bytes, sync, releaseBuffer);
+ }
+
+ @Override
+ public int read(ByteBuffer bytes, IOCallback callback) throws Exception {
+ return originalFile.read(bytes, callback);
+ }
+
+ @Override
+ public int read(ByteBuffer bytes) throws Exception {
+ return originalFile.read(bytes);
+ }
+
+ @Override
+ public void sync() throws IOException {
+ originalFile.sync();
+ }
+
+ @Override
+ public long size() throws Exception {
+ return originalFile.size();
+ }
+
+ @Override
+ public SequentialFile cloneFile() {
+ return originalFile.cloneFile();
+ }
+ }
}