You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2021/02/07 16:15:37 UTC
[activemq-artemis] branch master updated: ARTEMIS-3105 large message file not closed on backup side
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 9c5ec1b ARTEMIS-3105 large message file not closed on backup side
new 8862c11 This closes #3435
9c5ec1b is described below
commit 9c5ec1b07ccf908adab1bec3c5ef7d3f778f94a4
Author: 岳豹 <17...@cn.suning.com>
AuthorDate: Sat Feb 6 18:42:20 2021 +0800
ARTEMIS-3105 large message file not closed on backup side
---
.../protocol/amqp/broker/AMQPLargeMessage.java | 8 ++---
.../artemis/core/protocol/stomp/StompSession.java | 2 +-
.../artemis/core/persistence/StorageManager.java | 2 ++
.../impl/journal/JournalStorageManager.java | 12 +++++++
.../core/persistence/impl/journal/LargeBody.java | 11 ++++--
.../impl/journal/LargeServerMessageImpl.java | 8 ++---
.../impl/journal/LargeServerMessageInSync.java | 4 +--
.../impl/nullpm/NullStorageLargeServerMessage.java | 2 +-
.../impl/nullpm/NullStorageManager.java | 5 +++
.../protocol/core/ServerSessionPacketHandler.java | 2 +-
.../ReplicationLargeMessageEndMessage.java | 26 ++++++++++++--
.../core/replication/ReplicatedLargeMessage.java | 4 +--
.../core/replication/ReplicationEndpoint.java | 40 +++++++++++++++-------
.../core/replication/ReplicationManager.java | 8 ++++-
.../artemis/core/server/LargeServerMessage.java | 2 +-
.../core/server/impl/ServerConsumerImpl.java | 2 +-
.../core/transaction/impl/TransactionImplTest.java | 5 +++
.../integration/amqp/paging/AmqpPageTest.java | 2 +-
.../tests/integration/client/LargeMessageTest.java | 6 ++--
.../tests/integration/client/SendAckFailTest.java | 5 +++
.../largemessage/ServerLargeMessageTest.java | 4 +--
.../persistence/XmlImportExportTest.java | 6 ++--
.../integration/replication/ReplicationTest.java | 25 ++++++++++++++
.../integration/server/ScaleDown3NodeTest.java | 2 +-
.../tests/unit/core/paging/impl/PageTest.java | 2 +-
25 files changed, 148 insertions(+), 47 deletions(-)
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
index 171a275..771e745 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
@@ -150,7 +150,7 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
}
public void closeLargeMessage() throws Exception {
- largeBody.releaseResources(false);
+ largeBody.releaseResources(false, true);
parsingData.freeDirectBuffer();
parsingData = null;
}
@@ -443,8 +443,8 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
}
@Override
- public void releaseResources(boolean sync) {
- largeBody.releaseResources(sync);
+ public void releaseResources(boolean sync, boolean sendEvent) {
+ largeBody.releaseResources(sync, sendEvent);
}
@@ -526,7 +526,7 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
}
largeBody.copyInto(copy, bufferNewHeader, place.intValue());
- copy.releaseResources(true);
+ copy.releaseResources(true, true);
return copy;
} catch (Exception e) {
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index 7aba86e..8cdc9de 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -380,7 +380,7 @@ public class StompSession implements SessionCallback {
largeMessage.addBytes(bytes);
- largeMessage.releaseResources(true);
+ largeMessage.releaseResources(true, true);
largeMessage.toMessage().putLongProperty(Message.HDR_LARGE_BODY_SIZE, bytes.length);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
index a35d2a9..48e4454 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
@@ -272,6 +272,8 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
*/
SequentialFile createFileForLargeMessage(long messageID, LargeMessageExtension extension);
+ void largeMessageClosed(LargeServerMessage largeServerMessage) throws ActiveMQException;
+
void deleteLargeMessageBody(LargeServerMessage largeServerMessage) throws ActiveMQException;
default SequentialFile createFileForLargeMessage(long messageID, boolean durable) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index 735ffcc..b21e868 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -450,6 +450,18 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
}
@Override
+ public void largeMessageClosed(LargeServerMessage largeServerMessage) throws ActiveMQException {
+ readLock();
+ try {
+ if (isReplicated()) {
+ replicator.largeMessageClosed(largeServerMessage.toMessage().getMessageID(), JournalStorageManager.this);
+ }
+ } finally {
+ readUnLock();
+ }
+ }
+
+ @Override
public void deleteLargeMessageBody(final LargeServerMessage largeServerMessage) throws ActiveMQException {
synchronized (largeServerMessage) {
if (!largeServerMessage.hasPendingRecord()) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java
index 4b1f020..a2e4273 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java
@@ -116,7 +116,7 @@ public class LargeBody {
public synchronized void deleteFile() {
try {
validateFile();
- releaseResources(false);
+ releaseResources(false, false);
storageManager.deleteLargeMessageBody(message);
} catch (Exception e) {
storageManager.criticalError(e);
@@ -303,13 +303,20 @@ public class LargeBody {
}
}
- public synchronized void releaseResources(boolean sync) {
+ /**
+ * sendEvent means the close is happening at the end of writing the large message.
+ * While reading the large message we don't need to (and shouldn't) inform the backup.
+ */
+ public synchronized void releaseResources(boolean sync, boolean sendEvent) {
if (file != null && file.isOpen()) {
try {
if (sync) {
file.sync();
}
file.close(false, false);
+ if (sendEvent) {
+ storageManager.largeMessageClosed(message);
+ }
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.largeMessageErrorReleasingResources(e);
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
index 2ee6114..6809808 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
@@ -63,7 +63,7 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar
ActiveMQBuffer buffer = coreMessage.getReadOnlyBodyBuffer();
final int readableBytes = buffer.readableBytes();
lsm.addBytes(buffer);
- lsm.releaseResources(true);
+ lsm.releaseResources(true, true);
lsm.toMessage().putLongProperty(Message.HDR_LARGE_BODY_SIZE, readableBytes);
return lsm.toMessage();
}
@@ -254,9 +254,9 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar
}
@Override
- public void releaseResources(boolean sync) {
+ public void releaseResources(boolean sync, boolean sendEvent) {
synchronized (largeBody) {
- largeBody.releaseResources(sync);
+ largeBody.releaseResources(sync, sendEvent);
}
}
@@ -293,7 +293,7 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar
try {
LargeServerMessage newMessage = storageManager.createLargeMessage(newID, this);
largeBody.copyInto(newMessage);
- newMessage.releaseResources(true);
+ newMessage.releaseResources(true, true);
return newMessage.toMessage();
} catch (Exception e) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java
index 62d5365..dbf2480 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java
@@ -96,11 +96,11 @@ public final class LargeServerMessageInSync implements ReplicatedLargeMessage {
}
@Override
- public synchronized void releaseResources(boolean sync) {
+ public synchronized void releaseResources(boolean sync, boolean sendEvent) {
if (logger.isTraceEnabled()) {
logger.trace("release resources called on " + mainLM, new Exception("trace"));
}
- mainLM.releaseResources(sync);
+ mainLM.releaseResources(sync, sendEvent);
if (appendFile != null && appendFile.isOpen()) {
try {
appendFile.close();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
index 69ea514..68c7f88 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
@@ -38,7 +38,7 @@ class NullStorageLargeServerMessage extends CoreMessage implements CoreLargeServ
}
@Override
- public void releaseResources(boolean sync) {
+ public void releaseResources(boolean sync, boolean sendEvent) {
}
@Override
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
index d1783b2..8375060 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
@@ -608,6 +608,11 @@ public class NullStorageManager implements StorageManager {
}
@Override
+ public void largeMessageClosed(LargeServerMessage largeServerMessage) throws ActiveMQException {
+
+ }
+
+ @Override
public boolean addToPage(PagingStore store,
Message msg,
Transaction tx,
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index 97d2963..aea354a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -1035,7 +1035,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
currentLargeMessage.addBytes(body);
if (!continues) {
- currentLargeMessage.releaseResources(true);
+ currentLargeMessage.releaseResources(true, true);
if (messageBodySize >= 0) {
currentLargeMessage.toMessage().putLongProperty(Message.HDR_LARGE_BODY_SIZE, messageBodySize);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java
index a9be86a..2c1d808 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java
@@ -24,31 +24,38 @@ public class ReplicationLargeMessageEndMessage extends PacketImpl {
long messageId;
long pendingRecordId;
+ /**
+ * True = delete file, False = close file
+ */
+ private boolean isDelete;
public ReplicationLargeMessageEndMessage() {
super(PacketImpl.REPLICATION_LARGE_MESSAGE_END);
}
- public ReplicationLargeMessageEndMessage(final long messageId, final long pendingRecordId) {
+ public ReplicationLargeMessageEndMessage(final long messageId, final long pendingRecordId, final boolean isDelete) {
this();
this.messageId = messageId;
//we use negative value to indicate that this id is pre-generated by live node
//so that it won't be generated at backup.
//see https://issues.apache.org/jira/browse/ARTEMIS-1221
this.pendingRecordId = -pendingRecordId;
+ this.isDelete = isDelete;
}
@Override
public int expectedEncodeSize() {
return PACKET_HEADERS_SIZE +
DataConstants.SIZE_LONG + // buffer.writeLong(messageId)
- DataConstants.SIZE_LONG; // buffer.writeLong(pendingRecordId);
+ DataConstants.SIZE_LONG + // buffer.writeLong(pendingRecordId);
+ DataConstants.SIZE_BOOLEAN; // buffer.writeBoolean(isDelete);
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeLong(messageId);
buffer.writeLong(pendingRecordId);
+ buffer.writeBoolean(isDelete);
}
@Override
@@ -57,6 +64,9 @@ public class ReplicationLargeMessageEndMessage extends PacketImpl {
if (buffer.readableBytes() >= DataConstants.SIZE_LONG) {
pendingRecordId = buffer.readLong();
}
+ if (buffer.readableBytes() >= DataConstants.SIZE_BOOLEAN) {
+ isDelete = buffer.readBoolean();
+ }
}
/**
@@ -70,6 +80,7 @@ public class ReplicationLargeMessageEndMessage extends PacketImpl {
public int hashCode() {
final int prime = 31;
int result = super.hashCode();
+ result = prime * result + (isDelete ? 1231 : 1237);
result = prime * result + (int) (messageId ^ (messageId >>> 32));
return result;
}
@@ -77,7 +88,7 @@ public class ReplicationLargeMessageEndMessage extends PacketImpl {
@Override
public String toString() {
return "ReplicationLargeMessageEndMessage{" +
- "messageId=" + messageId +
+ "messageId=" + messageId + ", isDelete=" + isDelete +
'}';
}
@@ -92,10 +103,19 @@ public class ReplicationLargeMessageEndMessage extends PacketImpl {
ReplicationLargeMessageEndMessage other = (ReplicationLargeMessageEndMessage) obj;
if (messageId != other.messageId)
return false;
+ if (isDelete != other.isDelete)
+ return false;
return true;
}
public long getPendingRecordId() {
return pendingRecordId;
}
+
+ /**
+ * @return the isDelete
+ */
+ public boolean isDelete() {
+ return isDelete;
+ }
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedLargeMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedLargeMessage.java
index d63167f..fdcd808 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedLargeMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedLargeMessage.java
@@ -38,9 +38,9 @@ public interface ReplicatedLargeMessage {
Message setMessageID(long id);
/**
- * @see org.apache.activemq.artemis.core.server.LargeServerMessage#releaseResources(boolean)
+ * @see org.apache.activemq.artemis.core.server.LargeServerMessage#releaseResources(boolean,boolean)
*/
- void releaseResources(boolean sync);
+ void releaseResources(boolean sync, boolean sendEvent);
/**
* @see org.apache.activemq.artemis.core.server.LargeServerMessage#deleteFile()
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
index 1729476..debdfb1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
@@ -346,7 +346,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
}
for (ReplicatedLargeMessage largeMessage : largeMessages.values()) {
- largeMessage.releaseResources(true);
+ largeMessage.releaseResources(true, false);
}
largeMessages.clear();
@@ -615,22 +615,29 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
if (logger.isTraceEnabled()) {
logger.trace("handleLargeMessageEnd on " + packet.getMessageId());
}
- final ReplicatedLargeMessage message = lookupLargeMessage(packet.getMessageId(), true, false);
+ final ReplicatedLargeMessage message = lookupLargeMessage(packet.getMessageId(), packet.isDelete(), false);
if (message != null) {
message.setPendingRecordID(packet.getPendingRecordId());
- executor.execute(new Runnable() {
- @Override
- public void run() {
- try {
- if (logger.isTraceEnabled()) {
- logger.trace("Deleting LargeMessage " + packet.getMessageId() + " on the executor @ handleLargeMessageEnd");
+ if (!packet.isDelete()) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Closing LargeMessage " + packet.getMessageId() + " on the executor @ handleLargeMessageEnd");
+ }
+ message.releaseResources(true, false);
+ } else {
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Deleting LargeMessage " + packet.getMessageId() + " on the executor @ handleLargeMessageEnd");
+ }
+ message.deleteFile();
+ } catch (Exception e) {
+ ActiveMQServerLogger.LOGGER.errorDeletingLargeMessage(e, packet.getMessageId());
}
- message.deleteFile();
- } catch (Exception e) {
- ActiveMQServerLogger.LOGGER.errorDeletingLargeMessage(e, packet.getMessageId());
}
- }
- });
+ });
+ }
}
}
@@ -903,4 +910,11 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
public void setExecutor(Executor executor2) {
this.executor = executor2;
}
+
+ /**
+ * This is for tests basically, do not use it as its API is not guaranteed for future usage.
+ */
+ public ConcurrentMap<Long, ReplicatedLargeMessage> getLargeMessages() {
+ return largeMessages;
+ }
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
index 6554cc8..9a2d629 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
@@ -256,7 +256,13 @@ public final class ReplicationManager implements ActiveMQComponent {
public void largeMessageDelete(final Long messageId, JournalStorageManager storageManager) {
if (enabled) {
long pendingRecordID = storageManager.generateID();
- sendReplicatePacket(new ReplicationLargeMessageEndMessage(messageId, pendingRecordID));
+ sendReplicatePacket(new ReplicationLargeMessageEndMessage(messageId, pendingRecordID, true));
+ }
+ }
+
+ public void largeMessageClosed(final Long messageId, JournalStorageManager storageManager) {
+ if (enabled) {
+ sendReplicatePacket(new ReplicationLargeMessageEndMessage(messageId, -1, false));
}
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
index 6375f3b..b2d3aef 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
@@ -52,7 +52,7 @@ public interface LargeServerMessage extends ReplicatedLargeMessage {
* Close the files if opened
*/
@Override
- void releaseResources(boolean sync);
+ void releaseResources(boolean sync, boolean sendEvent);
@Override
void deleteFile() throws Exception;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index a3b3ac7..15864db 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -1418,7 +1418,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
context = null;
}
- largeMessage.releaseResources(false);
+ largeMessage.releaseResources(false, false);
largeMessage.toMessage().usageDown();
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
index 1ff7892..2deefdf 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
@@ -252,6 +252,11 @@ public class TransactionImplTest extends ActiveMQTestBase {
}
@Override
+ public void largeMessageClosed(LargeServerMessage largeServerMessage) throws ActiveMQException {
+
+ }
+
+ @Override
public void addBytesToLargeMessage(SequentialFile file, long messageId, ActiveMQBuffer bytes) throws Exception {
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpPageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpPageTest.java
index ff37554..4e78236 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpPageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpPageTest.java
@@ -71,7 +71,7 @@ public class AmqpPageTest extends PageTest {
} else {
final AMQPLargeMessage message = createLargeMessage(storageManager, address, msgID, content);
page.write(new PagedMessageImpl(message, new long[0]));
- message.releaseResources(false);
+ message.releaseResources(false, false);
}
}
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
index ad3a48b..d98876b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
@@ -2495,7 +2495,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
// The server would be doing this
fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, largeMessageSize);
- fileMessage.releaseResources(false);
+ fileMessage.releaseResources(false, false);
Assert.assertEquals(largeMessageSize, fileMessage.getBodyBufferSize());
}
@@ -2522,7 +2522,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
// The server would be doing this
fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, largeMessageSize);
- fileMessage.releaseResources(false);
+ fileMessage.releaseResources(false, false);
session.createQueue(new QueueConfiguration(ADDRESS));
@@ -2687,7 +2687,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
fileMessage.addBytes(new byte[]{ActiveMQTestBase.getSamplebyte(i)});
}
- fileMessage.releaseResources(false);
+ fileMessage.releaseResources(false, false);
session.createQueue(new QueueConfiguration(ADDRESS));
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
index 786de34..d16067a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
@@ -863,6 +863,11 @@ public class SendAckFailTest extends SpawnedTestBase {
}
@Override
+ public void largeMessageClosed(LargeServerMessage largeServerMessage) throws ActiveMQException {
+ manager.largeMessageClosed(largeServerMessage);
+ }
+
+ @Override
public void addBytesToLargeMessage(SequentialFile file, long messageId, ActiveMQBuffer bytes) throws Exception {
manager.addBytesToLargeMessage(file, messageId, bytes);
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/ServerLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/ServerLargeMessageTest.java
index 35b960c..18be7cd 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/ServerLargeMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/ServerLargeMessageTest.java
@@ -116,7 +116,7 @@ public class ServerLargeMessageTest extends ActiveMQTestBase {
// The server would be doing this
fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
- fileMessage.releaseResources(false);
+ fileMessage.releaseResources(false, true);
session.createQueue(new QueueConfiguration("A").setRoutingType(RoutingType.ANYCAST));
@@ -339,7 +339,7 @@ public class ServerLargeMessageTest extends ActiveMQTestBase {
largeServerMessage.setMessageID(1234);
largeServerMessage.addBytes(new byte[0]);
assertTrue(open.get());
- largeServerMessage.releaseResources(true);
+ largeServerMessage.releaseResources(true, true);
assertTrue(sync.get());
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java
index f4ee96f..cc0ec1a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java
@@ -472,7 +472,7 @@ public class XmlImportExportTest extends ActiveMQTestBase {
fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
- fileMessage.releaseResources(false);
+ fileMessage.releaseResources(false, true);
session.createQueue(new QueueConfiguration("A"));
@@ -544,7 +544,7 @@ public class XmlImportExportTest extends ActiveMQTestBase {
fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
- fileMessage.releaseResources(false);
+ fileMessage.releaseResources(false, true);
session.createQueue(new QueueConfiguration("A"));
@@ -888,7 +888,7 @@ public class XmlImportExportTest extends ActiveMQTestBase {
fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
- fileMessage.releaseResources(false);
+ fileMessage.releaseResources(false, true);
producer.send(fileMessage);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
index da64521..b82e582 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
@@ -71,6 +71,7 @@ import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
+import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.Packet;
@@ -78,6 +79,7 @@ import org.apache.activemq.artemis.core.replication.ReplicatedJournal;
import org.apache.activemq.artemis.core.replication.ReplicationManager;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.cluster.ClusterController;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
@@ -539,6 +541,29 @@ public final class ReplicationTest extends ActiveMQTestBase {
Assert.assertEquals(0, manager.getActiveTokens().size());
}
+ @Test
+ public void testReplicationLargeMessageFileClose() throws Exception {
+ setupServer(true);
+
+ JournalStorageManager storage = getStorage();
+
+ manager = liveServer.getReplicationManager();
+ waitForComponent(manager);
+
+ CoreMessage msg = new CoreMessage().initBuffer(1024).setMessageID(1);
+ LargeServerMessage largeMsg = liveServer.getStorageManager().createLargeMessage(500, msg);
+ largeMsg.addBytes(new byte[1024]);
+ largeMsg.releaseResources(true, true);
+
+ blockOnReplication(storage, manager);
+
+ LargeServerMessageImpl message1 = (LargeServerMessageImpl) backupServer.getReplicationEndpoint().getLargeMessages().get(Long.valueOf(500));
+
+ Assert.assertNotNull(message1);
+ Assert.assertFalse(largeMsg.getAppendFile().isOpen());
+ Assert.assertFalse(message1.getAppendFile().isOpen());
+ }
+
class FakeData implements EncodingSupport {
@Override
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDown3NodeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDown3NodeTest.java
index 4a6c78d..726b079 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDown3NodeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDown3NodeTest.java
@@ -144,7 +144,7 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
- fileMessage.releaseResources(false);
+ fileMessage.releaseResources(false, false);
message = fileMessage;
} else {
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTest.java
index eaf0359..bcf8808 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTest.java
@@ -293,7 +293,7 @@ public class PageTest extends ActiveMQTestBase {
msg.addBytes(content);
msg.setAddress(address);
page.write(new PagedMessageImpl(msg, new long[0]));
- msg.releaseResources(false);
+ msg.releaseResources(false, false);
} else {
ICoreMessage msg = new CoreMessage().initBuffer(100);
msg.setMessageID(msgID);