You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2021/02/07 16:15:37 UTC

[activemq-artemis] branch master updated: ARTEMIS-3105 large message file not closed on backup side

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c5ec1b  ARTEMIS-3105 large message file not closed on backup side
     new 8862c11  This closes #3435
9c5ec1b is described below

commit 9c5ec1b07ccf908adab1bec3c5ef7d3f778f94a4
Author: 岳豹 <17...@cn.suning.com>
AuthorDate: Sat Feb 6 18:42:20 2021 +0800

    ARTEMIS-3105 large message file not closed on backup side
---
 .../protocol/amqp/broker/AMQPLargeMessage.java     |  8 ++---
 .../artemis/core/protocol/stomp/StompSession.java  |  2 +-
 .../artemis/core/persistence/StorageManager.java   |  2 ++
 .../impl/journal/JournalStorageManager.java        | 12 +++++++
 .../core/persistence/impl/journal/LargeBody.java   | 11 ++++--
 .../impl/journal/LargeServerMessageImpl.java       |  8 ++---
 .../impl/journal/LargeServerMessageInSync.java     |  4 +--
 .../impl/nullpm/NullStorageLargeServerMessage.java |  2 +-
 .../impl/nullpm/NullStorageManager.java            |  5 +++
 .../protocol/core/ServerSessionPacketHandler.java  |  2 +-
 .../ReplicationLargeMessageEndMessage.java         | 26 ++++++++++++--
 .../core/replication/ReplicatedLargeMessage.java   |  4 +--
 .../core/replication/ReplicationEndpoint.java      | 40 +++++++++++++++-------
 .../core/replication/ReplicationManager.java       |  8 ++++-
 .../artemis/core/server/LargeServerMessage.java    |  2 +-
 .../core/server/impl/ServerConsumerImpl.java       |  2 +-
 .../core/transaction/impl/TransactionImplTest.java |  5 +++
 .../integration/amqp/paging/AmqpPageTest.java      |  2 +-
 .../tests/integration/client/LargeMessageTest.java |  6 ++--
 .../tests/integration/client/SendAckFailTest.java  |  5 +++
 .../largemessage/ServerLargeMessageTest.java       |  4 +--
 .../persistence/XmlImportExportTest.java           |  6 ++--
 .../integration/replication/ReplicationTest.java   | 25 ++++++++++++++
 .../integration/server/ScaleDown3NodeTest.java     |  2 +-
 .../tests/unit/core/paging/impl/PageTest.java      |  2 +-
 25 files changed, 148 insertions(+), 47 deletions(-)

diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
index 171a275..771e745 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
@@ -150,7 +150,7 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
    }
 
    public void closeLargeMessage() throws Exception {
-      largeBody.releaseResources(false);
+      largeBody.releaseResources(false, true);
       parsingData.freeDirectBuffer();
       parsingData = null;
    }
@@ -443,8 +443,8 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
    }
 
    @Override
-   public void releaseResources(boolean sync) {
-      largeBody.releaseResources(sync);
+   public void releaseResources(boolean sync, boolean sendEvent) {
+      largeBody.releaseResources(sync, sendEvent);
 
    }
 
@@ -526,7 +526,7 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
          }
 
          largeBody.copyInto(copy, bufferNewHeader, place.intValue());
-         copy.releaseResources(true);
+         copy.releaseResources(true, true);
          return copy;
 
       } catch (Exception e) {
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index 7aba86e..8cdc9de 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -380,7 +380,7 @@ public class StompSession implements SessionCallback {
 
       largeMessage.addBytes(bytes);
 
-      largeMessage.releaseResources(true);
+      largeMessage.releaseResources(true, true);
 
       largeMessage.toMessage().putLongProperty(Message.HDR_LARGE_BODY_SIZE, bytes.length);
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
index a35d2a9..48e4454 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
@@ -272,6 +272,8 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
     */
    SequentialFile createFileForLargeMessage(long messageID, LargeMessageExtension extension);
 
+   void largeMessageClosed(LargeServerMessage largeServerMessage) throws ActiveMQException;
+
    void deleteLargeMessageBody(LargeServerMessage largeServerMessage) throws ActiveMQException;
 
    default SequentialFile createFileForLargeMessage(long messageID, boolean durable) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index 735ffcc..b21e868 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -450,6 +450,18 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
    }
 
    @Override
+   public void largeMessageClosed(LargeServerMessage largeServerMessage) throws ActiveMQException {
+      readLock();
+      try {
+         if (isReplicated()) {
+            replicator.largeMessageClosed(largeServerMessage.toMessage().getMessageID(), JournalStorageManager.this);
+         }
+      } finally {
+         readUnLock();
+      }
+   }
+
+   @Override
    public void deleteLargeMessageBody(final LargeServerMessage largeServerMessage) throws ActiveMQException {
       synchronized (largeServerMessage) {
          if (!largeServerMessage.hasPendingRecord()) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java
index 4b1f020..a2e4273 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java
@@ -116,7 +116,7 @@ public class LargeBody {
    public synchronized void deleteFile() {
       try {
          validateFile();
-         releaseResources(false);
+         releaseResources(false, false);
          storageManager.deleteLargeMessageBody(message);
       } catch (Exception e) {
          storageManager.criticalError(e);
@@ -303,13 +303,20 @@ public class LargeBody {
       }
    }
 
-   public synchronized void releaseResources(boolean sync) {
+   /**
+    * sendEvent indicates that the close happens at the end of writing the large message.
+    * While reading the large message we don't need to (and shouldn't) inform the backup.
+    */
+   public synchronized void releaseResources(boolean sync, boolean sendEvent) {
       if (file != null && file.isOpen()) {
          try {
             if (sync) {
                file.sync();
             }
             file.close(false, false);
+            if (sendEvent) {
+               storageManager.largeMessageClosed(message);
+            }
          } catch (Exception e) {
             ActiveMQServerLogger.LOGGER.largeMessageErrorReleasingResources(e);
          }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
index 2ee6114..6809808 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
@@ -63,7 +63,7 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar
       ActiveMQBuffer buffer = coreMessage.getReadOnlyBodyBuffer();
       final int readableBytes = buffer.readableBytes();
       lsm.addBytes(buffer);
-      lsm.releaseResources(true);
+      lsm.releaseResources(true, true);
       lsm.toMessage().putLongProperty(Message.HDR_LARGE_BODY_SIZE, readableBytes);
       return lsm.toMessage();
    }
@@ -254,9 +254,9 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar
    }
 
    @Override
-   public void releaseResources(boolean sync) {
+   public void releaseResources(boolean sync, boolean sendEvent) {
       synchronized (largeBody) {
-         largeBody.releaseResources(sync);
+         largeBody.releaseResources(sync, sendEvent);
       }
    }
 
@@ -293,7 +293,7 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar
       try {
          LargeServerMessage newMessage = storageManager.createLargeMessage(newID, this);
          largeBody.copyInto(newMessage);
-         newMessage.releaseResources(true);
+         newMessage.releaseResources(true, true);
          return newMessage.toMessage();
 
       } catch (Exception e) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java
index 62d5365..dbf2480 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java
@@ -96,11 +96,11 @@ public final class LargeServerMessageInSync implements ReplicatedLargeMessage {
    }
 
    @Override
-   public synchronized void releaseResources(boolean sync) {
+   public synchronized void releaseResources(boolean sync, boolean sendEvent) {
       if (logger.isTraceEnabled()) {
          logger.trace("release resources called on " + mainLM, new Exception("trace"));
       }
-      mainLM.releaseResources(sync);
+      mainLM.releaseResources(sync, sendEvent);
       if (appendFile != null && appendFile.isOpen()) {
          try {
             appendFile.close();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
index 69ea514..68c7f88 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
@@ -38,7 +38,7 @@ class NullStorageLargeServerMessage extends CoreMessage implements CoreLargeServ
    }
 
    @Override
-   public void releaseResources(boolean sync) {
+   public void releaseResources(boolean sync, boolean sendEvent) {
    }
 
    @Override
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
index d1783b2..8375060 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
@@ -608,6 +608,11 @@ public class NullStorageManager implements StorageManager {
    }
 
    @Override
+   public void largeMessageClosed(LargeServerMessage largeServerMessage) throws ActiveMQException {
+
+   }
+
+   @Override
    public boolean addToPage(PagingStore store,
                             Message msg,
                             Transaction tx,
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index 97d2963..aea354a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -1035,7 +1035,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
          currentLargeMessage.addBytes(body);
 
          if (!continues) {
-            currentLargeMessage.releaseResources(true);
+            currentLargeMessage.releaseResources(true, true);
 
             if (messageBodySize >= 0) {
                currentLargeMessage.toMessage().putLongProperty(Message.HDR_LARGE_BODY_SIZE, messageBodySize);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java
index a9be86a..2c1d808 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java
@@ -24,31 +24,38 @@ public class ReplicationLargeMessageEndMessage extends PacketImpl {
 
    long messageId;
    long pendingRecordId;
+   /**
+    * True = delete file, False = close file
+    */
+   private boolean isDelete;
 
    public ReplicationLargeMessageEndMessage() {
       super(PacketImpl.REPLICATION_LARGE_MESSAGE_END);
    }
 
-   public ReplicationLargeMessageEndMessage(final long messageId, final long pendingRecordId) {
+   public ReplicationLargeMessageEndMessage(final long messageId, final long pendingRecordId, final boolean isDelete) {
       this();
       this.messageId = messageId;
       //we use negative value to indicate that this id is pre-generated by live node
       //so that it won't be generated at backup.
       //see https://issues.apache.org/jira/browse/ARTEMIS-1221
       this.pendingRecordId = -pendingRecordId;
+      this.isDelete = isDelete;
    }
 
    @Override
    public int expectedEncodeSize() {
       return PACKET_HEADERS_SIZE +
          DataConstants.SIZE_LONG + // buffer.writeLong(messageId)
-         DataConstants.SIZE_LONG; // buffer.writeLong(pendingRecordId);
+         DataConstants.SIZE_LONG + // buffer.writeLong(pendingRecordId);
+         DataConstants.SIZE_BOOLEAN; // buffer.writeBoolean(isDelete);
    }
 
    @Override
    public void encodeRest(final ActiveMQBuffer buffer) {
       buffer.writeLong(messageId);
       buffer.writeLong(pendingRecordId);
+      buffer.writeBoolean(isDelete);
    }
 
    @Override
@@ -57,6 +64,9 @@ public class ReplicationLargeMessageEndMessage extends PacketImpl {
       if (buffer.readableBytes() >= DataConstants.SIZE_LONG) {
          pendingRecordId = buffer.readLong();
       }
+      if (buffer.readableBytes() >= DataConstants.SIZE_BOOLEAN) {
+         isDelete = buffer.readBoolean();
+      }
    }
 
    /**
@@ -70,6 +80,7 @@ public class ReplicationLargeMessageEndMessage extends PacketImpl {
    public int hashCode() {
       final int prime = 31;
       int result = super.hashCode();
+      result = prime * result + (isDelete ? 1231 : 1237);
       result = prime * result + (int) (messageId ^ (messageId >>> 32));
       return result;
    }
@@ -77,7 +88,7 @@ public class ReplicationLargeMessageEndMessage extends PacketImpl {
    @Override
    public String toString() {
       return "ReplicationLargeMessageEndMessage{" +
-         "messageId=" + messageId +
+         "messageId=" + messageId + ", isDelete=" + isDelete +
          '}';
    }
 
@@ -92,10 +103,19 @@ public class ReplicationLargeMessageEndMessage extends PacketImpl {
       ReplicationLargeMessageEndMessage other = (ReplicationLargeMessageEndMessage) obj;
       if (messageId != other.messageId)
          return false;
+      if (isDelete != other.isDelete)
+         return false;
       return true;
    }
 
    public long getPendingRecordId() {
       return pendingRecordId;
    }
+
+   /**
+    * @return true if the file should be deleted, false if it should only be closed
+    */
+   public boolean isDelete() {
+      return isDelete;
+   }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedLargeMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedLargeMessage.java
index d63167f..fdcd808 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedLargeMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedLargeMessage.java
@@ -38,9 +38,9 @@ public interface ReplicatedLargeMessage {
    Message setMessageID(long id);
 
    /**
-    * @see org.apache.activemq.artemis.core.server.LargeServerMessage#releaseResources(boolean)
+    * @see org.apache.activemq.artemis.core.server.LargeServerMessage#releaseResources(boolean,boolean)
     */
-   void releaseResources(boolean sync);
+   void releaseResources(boolean sync, boolean sendEvent);
 
    /**
     * @see org.apache.activemq.artemis.core.server.LargeServerMessage#deleteFile()
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
index 1729476..debdfb1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
@@ -346,7 +346,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
       }
 
       for (ReplicatedLargeMessage largeMessage : largeMessages.values()) {
-         largeMessage.releaseResources(true);
+         largeMessage.releaseResources(true, false);
       }
       largeMessages.clear();
 
@@ -615,22 +615,29 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
       if (logger.isTraceEnabled()) {
          logger.trace("handleLargeMessageEnd on " + packet.getMessageId());
       }
-      final ReplicatedLargeMessage message = lookupLargeMessage(packet.getMessageId(), true, false);
+      final ReplicatedLargeMessage message = lookupLargeMessage(packet.getMessageId(), packet.isDelete(), false);
       if (message != null) {
          message.setPendingRecordID(packet.getPendingRecordId());
-         executor.execute(new Runnable() {
-            @Override
-            public void run() {
-               try {
-                  if (logger.isTraceEnabled()) {
-                     logger.trace("Deleting LargeMessage " + packet.getMessageId() + " on the executor @ handleLargeMessageEnd");
+         if (!packet.isDelete()) {
+            if (logger.isTraceEnabled()) {
+               logger.trace("Closing LargeMessage " + packet.getMessageId() + " on the executor @ handleLargeMessageEnd");
+            }
+            message.releaseResources(true, false);
+         } else {
+            executor.execute(new Runnable() {
+               @Override
+               public void run() {
+                  try {
+                     if (logger.isTraceEnabled()) {
+                        logger.trace("Deleting LargeMessage " + packet.getMessageId() + " on the executor @ handleLargeMessageEnd");
+                     }
+                     message.deleteFile();
+                  } catch (Exception e) {
+                     ActiveMQServerLogger.LOGGER.errorDeletingLargeMessage(e, packet.getMessageId());
                   }
-                  message.deleteFile();
-               } catch (Exception e) {
-                  ActiveMQServerLogger.LOGGER.errorDeletingLargeMessage(e, packet.getMessageId());
                }
-            }
-         });
+            });
+         }
       }
    }
 
@@ -903,4 +910,11 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
    public void setExecutor(Executor executor2) {
       this.executor = executor2;
    }
+
+   /**
+    * This is intended for tests only; do not rely on it, as this API is not guaranteed to remain available.
+    */
+   public ConcurrentMap<Long, ReplicatedLargeMessage> getLargeMessages() {
+      return largeMessages;
+   }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
index 6554cc8..9a2d629 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
@@ -256,7 +256,13 @@ public final class ReplicationManager implements ActiveMQComponent {
    public void largeMessageDelete(final Long messageId, JournalStorageManager storageManager) {
       if (enabled) {
          long pendingRecordID = storageManager.generateID();
-         sendReplicatePacket(new ReplicationLargeMessageEndMessage(messageId, pendingRecordID));
+         sendReplicatePacket(new ReplicationLargeMessageEndMessage(messageId, pendingRecordID, true));
+      }
+   }
+
+   public void largeMessageClosed(final Long messageId, JournalStorageManager storageManager) {
+      if (enabled) {
+         sendReplicatePacket(new ReplicationLargeMessageEndMessage(messageId, -1, false));
       }
    }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
index 6375f3b..b2d3aef 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
@@ -52,7 +52,7 @@ public interface LargeServerMessage extends ReplicatedLargeMessage {
     * Close the files if opened
     */
    @Override
-   void releaseResources(boolean sync);
+   void releaseResources(boolean sync, boolean sendEvent);
 
    @Override
    void deleteFile() throws Exception;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index a3b3ac7..15864db 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -1418,7 +1418,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
                context = null;
             }
 
-            largeMessage.releaseResources(false);
+            largeMessage.releaseResources(false, false);
 
             largeMessage.toMessage().usageDown();
 
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
index 1ff7892..2deefdf 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
@@ -252,6 +252,11 @@ public class TransactionImplTest extends ActiveMQTestBase {
       }
 
       @Override
+      public void largeMessageClosed(LargeServerMessage largeServerMessage) throws ActiveMQException {
+
+      }
+
+      @Override
       public void addBytesToLargeMessage(SequentialFile file, long messageId, ActiveMQBuffer bytes) throws Exception {
 
       }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpPageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpPageTest.java
index ff37554..4e78236 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpPageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpPageTest.java
@@ -71,7 +71,7 @@ public class AmqpPageTest extends PageTest {
       } else {
          final AMQPLargeMessage message = createLargeMessage(storageManager, address, msgID, content);
          page.write(new PagedMessageImpl(message, new long[0]));
-         message.releaseResources(false);
+         message.releaseResources(false, false);
       }
    }
 }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
index ad3a48b..d98876b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
@@ -2495,7 +2495,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
       // The server would be doing this
       fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, largeMessageSize);
 
-      fileMessage.releaseResources(false);
+      fileMessage.releaseResources(false, false);
 
       Assert.assertEquals(largeMessageSize, fileMessage.getBodyBufferSize());
    }
@@ -2522,7 +2522,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
       // The server would be doing this
       fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, largeMessageSize);
 
-      fileMessage.releaseResources(false);
+      fileMessage.releaseResources(false, false);
 
       session.createQueue(new QueueConfiguration(ADDRESS));
 
@@ -2687,7 +2687,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
          fileMessage.addBytes(new byte[]{ActiveMQTestBase.getSamplebyte(i)});
       }
 
-      fileMessage.releaseResources(false);
+      fileMessage.releaseResources(false, false);
 
       session.createQueue(new QueueConfiguration(ADDRESS));
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
index 786de34..d16067a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
@@ -863,6 +863,11 @@ public class SendAckFailTest extends SpawnedTestBase {
       }
 
       @Override
+      public void largeMessageClosed(LargeServerMessage largeServerMessage) throws ActiveMQException {
+         manager.largeMessageClosed(largeServerMessage);
+      }
+
+      @Override
       public void addBytesToLargeMessage(SequentialFile file, long messageId, ActiveMQBuffer bytes) throws Exception {
          manager.addBytesToLargeMessage(file, messageId, bytes);
       }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/ServerLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/ServerLargeMessageTest.java
index 35b960c..18be7cd 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/ServerLargeMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/ServerLargeMessageTest.java
@@ -116,7 +116,7 @@ public class ServerLargeMessageTest extends ActiveMQTestBase {
          // The server would be doing this
          fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
 
-         fileMessage.releaseResources(false);
+         fileMessage.releaseResources(false, true);
 
          session.createQueue(new QueueConfiguration("A").setRoutingType(RoutingType.ANYCAST));
 
@@ -339,7 +339,7 @@ public class ServerLargeMessageTest extends ActiveMQTestBase {
       largeServerMessage.setMessageID(1234);
       largeServerMessage.addBytes(new byte[0]);
       assertTrue(open.get());
-      largeServerMessage.releaseResources(true);
+      largeServerMessage.releaseResources(true, true);
       assertTrue(sync.get());
    }
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java
index f4ee96f..cc0ec1a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java
@@ -472,7 +472,7 @@ public class XmlImportExportTest extends ActiveMQTestBase {
 
       fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
 
-      fileMessage.releaseResources(false);
+      fileMessage.releaseResources(false, true);
 
       session.createQueue(new QueueConfiguration("A"));
 
@@ -544,7 +544,7 @@ public class XmlImportExportTest extends ActiveMQTestBase {
 
       fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
 
-      fileMessage.releaseResources(false);
+      fileMessage.releaseResources(false, true);
 
       session.createQueue(new QueueConfiguration("A"));
 
@@ -888,7 +888,7 @@ public class XmlImportExportTest extends ActiveMQTestBase {
 
       fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
 
-      fileMessage.releaseResources(false);
+      fileMessage.releaseResources(false, true);
 
       producer.send(fileMessage);
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
index da64521..b82e582 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
@@ -71,6 +71,7 @@ import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
+import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
 import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
 import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
 import org.apache.activemq.artemis.core.protocol.core.Packet;
@@ -78,6 +79,7 @@ import org.apache.activemq.artemis.core.replication.ReplicatedJournal;
 import org.apache.activemq.artemis.core.replication.ReplicationManager;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.core.server.cluster.ClusterController;
 import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
@@ -539,6 +541,29 @@ public final class ReplicationTest extends ActiveMQTestBase {
       Assert.assertEquals(0, manager.getActiveTokens().size());
    }
 
+   @Test
+   public void testReplicationLargeMessageFileClose() throws Exception {
+      setupServer(true);
+      // Goal: once a large message's resources are released on the live server, neither live nor backup may keep its append file open.
+      JournalStorageManager storage = getStorage();
+
+      manager = liveServer.getReplicationManager();
+      waitForComponent(manager);
+      // Create a large message under key 500 through the live server's storage manager and append 1024 bytes to it.
+      CoreMessage msg = new CoreMessage().initBuffer(1024).setMessageID(1);
+      LargeServerMessage largeMsg = liveServer.getStorageManager().createLargeMessage(500, msg);
+      largeMsg.addBytes(new byte[1024]);
+      largeMsg.releaseResources(true, true); // NOTE(review): the meaning of the two flags is defined by releaseResources, not visible here
+      // Presumably waits until pending replication has reached the backup - TODO confirm against blockOnReplication.
+      blockOnReplication(storage, manager);
+      // Fetch the backup's copy from its replication endpoint using the same key (500).
+      LargeServerMessageImpl message1 = (LargeServerMessageImpl) backupServer.getReplicationEndpoint().getLargeMessages().get(Long.valueOf(500));
+      // The backup copy must exist, and the append file must not be open on either server.
+      Assert.assertNotNull(message1);
+      Assert.assertFalse(largeMsg.getAppendFile().isOpen());
+      Assert.assertFalse(message1.getAppendFile().isOpen());
+   }
+
    class FakeData implements EncodingSupport {
 
       @Override
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDown3NodeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDown3NodeTest.java
index 4a6c78d..726b079 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDown3NodeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDown3NodeTest.java
@@ -144,7 +144,7 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
 
          fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
 
-         fileMessage.releaseResources(false);
+         fileMessage.releaseResources(false, false);
 
          message = fileMessage;
       } else {
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTest.java
index eaf0359..bcf8808 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTest.java
@@ -293,7 +293,7 @@ public class PageTest extends ActiveMQTestBase {
          msg.addBytes(content);
          msg.setAddress(address);
          page.write(new PagedMessageImpl(msg, new long[0]));
-         msg.releaseResources(false);
+         msg.releaseResources(false, false);
       } else {
          ICoreMessage msg = new CoreMessage().initBuffer(100);
          msg.setMessageID(msgID);