You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by mi...@apache.org on 2019/07/15 22:52:24 UTC

[activemq-artemis] branch master updated: ARTEMIS-2336 Use zero copy to replicate journal/page/large message file

This is an automated email from the ASF dual-hosted git repository.

michaelpearce pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 85b93f0  ARTEMIS-2336 Use zero copy to replicate journal/page/large message file
     new 1eaf12b  This closes #2666
85b93f0 is described below

commit 85b93f0883bc06a2dfe2de9d560805a59d626d38
Author: yang wei <wy...@gmail.com>
AuthorDate: Mon May 13 12:32:58 2019 +0800

    ARTEMIS-2336 Use zero copy to replicate journal/page/large message file
---
 .../artemis/core/protocol/core/Channel.java        |  20 +++
 .../core/protocol/core/impl/ChannelImpl.java       | 118 +++++++++-----
 .../core/protocol/core/impl/PacketImpl.java        |  11 +-
 .../core/remoting/impl/netty/NettyConnection.java  |  41 +++++
 .../impl/netty/NonClosingDefaultFileRegion.java    |  38 +++++
 .../artemis/spi/core/remoting/Connection.java      |   4 +
 .../core/protocol/core/impl/ChannelImplTest.java   |  11 ++
 .../wireformat/ReplicationSyncFileMessage.java     | 170 ++++++++++++---------
 .../core/remoting/impl/invm/InVMConnection.java    |  24 +++
 .../core/replication/ReplicationManager.java       | 105 ++++++++-----
 .../wireformat/ReplicationSyncFileMessageTest.java |  85 +++++++++++
 .../integration/cluster/util/BackupSyncDelay.java  |   7 +
 .../remoting/impl/netty/NettyConnectionTest.java   |  28 ++++
 13 files changed, 513 insertions(+), 149 deletions(-)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java
index 56f8259..e541dad 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.artemis.core.protocol.core;
 
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
 import java.util.concurrent.locks.Lock;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -67,6 +69,20 @@ public interface Channel {
    boolean send(Packet packet);
 
    /**
+    * Sends a packet and file on this channel.
+    *
+    * @param packet the packet to send
+    * @param raf the file to send
+    * @param fileChannel the file channel retrieved from raf
+    * @param offset the position of the raf
+    * @param dataSize the data size to send
+    * @param callback callback after send
+    * @return false if the packet was rejected by an outgoing interceptor; true if the send was
+    * successful
+    */
+   boolean send(Packet packet, RandomAccessFile raf, FileChannel fileChannel, long offset, int dataSize, Callback callback);
+
+   /**
     * Sends a packet on this channel.
     *
     * @param packet the packet to send
@@ -247,4 +263,8 @@ public interface Channel {
     * @param transferring whether the channel is transferring
     */
    void setTransferring(boolean transferring);
+
+   interface Callback {
+      void done(boolean success);
+   }
 }
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
index fe876ed..9f36d81 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.artemis.core.protocol.core.impl;
 
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -25,6 +27,7 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import io.netty.channel.ChannelFutureListener;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
@@ -274,67 +277,104 @@ public final class ChannelImpl implements Channel {
       }
    }
 
-   // This must never called by more than one thread concurrently
-   private boolean send(final Packet packet, final int reconnectID, final boolean flush, final boolean batch) {
-      if (invokeInterceptors(packet, interceptors, connection) != null) {
-         return false;
+   private ActiveMQBuffer beforeSend(final Packet packet, final int reconnectID) {
+      packet.setChannelID(id);
+
+      if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
+         packet.setCorrelationID(responseAsyncCache.nextCorrelationID());
       }
 
-      synchronized (sendLock) {
-         packet.setChannelID(id);
+      if (logger.isTraceEnabled()) {
+         logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Sending packet nonblocking " + packet + " on channelID=" + id);
+      }
 
-         if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
-            packet.setCorrelationID(responseAsyncCache.nextCorrelationID());
+      ActiveMQBuffer buffer = packet.encode(connection);
+
+      lock.lock();
+
+      try {
+         if (failingOver) {
+            waitForFailOver("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " timed-out waiting for fail-over condition on non-blocking send");
          }
 
-         if (logger.isTraceEnabled()) {
-            logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Sending packet nonblocking " + packet + " on channelID=" + id);
+         // Sanity check
+         if (transferring) {
+            throw ActiveMQClientMessageBundle.BUNDLE.cannotSendPacketDuringFailover();
          }
 
-         ActiveMQBuffer buffer = packet.encode(connection);
+         if (resendCache != null && packet.isRequiresConfirmations()) {
+            addResendPacket(packet);
+         }
 
-         lock.lock();
+      } finally {
+         lock.unlock();
+      }
 
-         try {
-            if (failingOver) {
-               waitForFailOver("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " timed-out waiting for fail-over condition on non-blocking send");
-            }
+      if (logger.isTraceEnabled()) {
+         logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Writing buffer for channelID=" + id);
+      }
 
-            // Sanity check
-            if (transferring) {
-               throw ActiveMQClientMessageBundle.BUNDLE.cannotSendPacketDuringFailover();
-            }
+      checkReconnectID(reconnectID);
 
-            if (resendCache != null && packet.isRequiresConfirmations()) {
-               addResendPacket(packet);
+      //We do this outside the lock as ResponseCache is threadsafe and allows responses to come in,
+      //As the send could block if the response cache cannot add, preventing responses to be handled.
+      if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
+         while (!responseAsyncCache.add(packet)) {
+            try {
+               Thread.sleep(1);
+            } catch (Exception e) {
+               // Ignore
             }
-
-         } finally {
-            lock.unlock();
          }
+      }
 
-         if (logger.isTraceEnabled()) {
-            logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Writing buffer for channelID=" + id);
-         }
+      return buffer;
+   }
 
-         checkReconnectID(reconnectID);
+   // This must never called by more than one thread concurrently
+   private boolean send(final Packet packet, final int reconnectID, final boolean flush, final boolean batch) {
+      if (invokeInterceptors(packet, interceptors, connection) != null) {
+         return false;
+      }
 
-         //We do this outside the lock as ResponseCache is threadsafe and allows responses to come in,
-         //As the send could block if the response cache cannot add, preventing responses to be handled.
-         if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
-            while (!responseAsyncCache.add(packet)) {
-               try {
-                  Thread.sleep(1);
-               } catch (Exception e) {
-                  // Ignore
-               }
+      synchronized (sendLock) {
+         ActiveMQBuffer buffer = beforeSend(packet, reconnectID);
+
+         // The actual send must be outside the lock, or with OIO transport, the write can block if the tcp
+         // buffer is full, preventing any incoming buffers being handled and blocking failover
+         try {
+            connection.getTransportConnection().write(buffer, flush, batch);
+         } catch (Throwable t) {
+            //If runtime exception, we must remove from the cache to avoid filling up the cache causing it to be full.
+            //The client would get still know about this as the exception bubbles up the call stack instead.
+            if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
+               responseAsyncCache.remove(packet.getCorrelationID());
             }
+            throw t;
          }
+         return true;
+      }
+   }
+
+   @Override
+   public boolean send(Packet packet,
+                       RandomAccessFile raf,
+                       FileChannel fileChannel,
+                       long offset,
+                       int dataSize,
+                       Callback callback) {
+      if (invokeInterceptors(packet, interceptors, connection) != null) {
+         return false;
+      }
+
+      synchronized (sendLock) {
+         ActiveMQBuffer buffer = beforeSend(packet, -1);
 
          // The actual send must be outside the lock, or with OIO transport, the write can block if the tcp
          // buffer is full, preventing any incoming buffers being handled and blocking failover
          try {
-            connection.getTransportConnection().write(buffer, flush, batch);
+            connection.getTransportConnection().write(buffer);
+            connection.getTransportConnection().write(raf, fileChannel, offset, dataSize, callback == null ? null : (ChannelFutureListener) future -> callback.done(future == null || future.isSuccess()));
          } catch (Throwable t) {
             //If runtime exception, we must remove from the cache to avoid filling up the cache causing it to be full.
             //The client would get still know about this as the exception bubbles up the call stack instead.
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
index f8f85e8..a7a3253 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
@@ -336,7 +336,11 @@ public class PacketImpl implements Packet {
    }
 
    protected void encodeSize(ActiveMQBuffer buffer) {
-      size = buffer.writerIndex();
+      encodeSize(buffer, buffer.writerIndex());
+   }
+
+   protected void encodeSize(ActiveMQBuffer buffer, int size) {
+      this.size = size;
 
       // The length doesn't include the actual length byte
       int len = size - DataConstants.SIZE_INT;
@@ -345,9 +349,10 @@ public class PacketImpl implements Packet {
    }
 
    protected ActiveMQBuffer createPacket(CoreRemotingConnection connection) {
+      return createPacket(connection, expectedEncodeSize());
+   }
 
-      int size = expectedEncodeSize();
-
+   protected ActiveMQBuffer createPacket(CoreRemotingConnection connection, int size) {
       if (connection == null) {
          return new ChannelBufferWrapper(Unpooled.buffer(size));
       } else {
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
index 51330c7..497448e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
@@ -16,7 +16,10 @@
  */
 package org.apache.activemq.artemis.core.remoting.impl.netty;
 
+import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.net.SocketAddress;
+import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -29,6 +32,8 @@ import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelPromise;
 import io.netty.channel.EventLoop;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.stream.ChunkedFile;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
@@ -350,6 +355,18 @@ public class NettyConnection implements Connection {
       return canWrite;
    }
 
+   private Object getFileObject(RandomAccessFile raf, FileChannel fileChannel, long offset, int dataSize) {
+      if (channel.pipeline().get(SslHandler.class) == null) {
+         return new NonClosingDefaultFileRegion(fileChannel, offset, dataSize);
+      } else {
+         try {
+            return new ChunkedFile(raf, offset, dataSize, 8192);
+         } catch (IOException e) {
+            throw new RuntimeException(e);
+         }
+      }
+   }
+
    @Override
    public final void write(ActiveMQBuffer buffer,
                            final boolean flush,
@@ -390,6 +407,30 @@ public class NettyConnection implements Connection {
       }
    }
 
+   @Override
+   public void write(RandomAccessFile raf,
+                     FileChannel fileChannel,
+                     long offset,
+                     int dataSize,
+                     final ChannelFutureListener futureListener) {
+      final int readableBytes = dataSize;
+      if (logger.isDebugEnabled()) {
+         final int remainingBytes = this.writeBufferHighWaterMark - readableBytes;
+         if (remainingBytes < 0) {
+            logger.debug("a write request is exceeding by " + (-remainingBytes) + " bytes the writeBufferHighWaterMark size [ " + this.writeBufferHighWaterMark + " ] : consider to set it at least of " + readableBytes + " bytes");
+         }
+      }
+
+      //no need to lock because the Netty's channel is thread-safe
+      //and the order of write is ensured by the order of the write calls
+      final Channel channel = this.channel;
+      assert readableBytes >= 0;
+      ChannelFuture channelFuture = channel.writeAndFlush(getFileObject(raf, fileChannel, offset, dataSize));
+      if (futureListener != null) {
+         channelFuture.addListener(futureListener);
+      }
+   }
+
    private static void flushAndWait(final Channel channel, final ChannelPromise promise) {
       if (!channel.eventLoop().inEventLoop()) {
          waitFor(promise, DEFAULT_WAIT_MILLIS);
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NonClosingDefaultFileRegion.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NonClosingDefaultFileRegion.java
new file mode 100644
index 0000000..4fc367f
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NonClosingDefaultFileRegion.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.remoting.impl.netty;
+
+import java.io.File;
+import java.nio.channels.FileChannel;
+
+import io.netty.channel.DefaultFileRegion;
+
+public class NonClosingDefaultFileRegion extends DefaultFileRegion {
+
+   public NonClosingDefaultFileRegion(FileChannel file, long position, long count) {
+      super(file, position, count);
+   }
+
+   public NonClosingDefaultFileRegion(File f, long position, long count) {
+      super(f, position, count);
+   }
+
+   @Override
+   protected void deallocate() {
+      // Overridden to avoid closing the file
+   }
+}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
index ebde456..fe5d395 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.artemis.spi.core.remoting;
 
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
 import java.util.concurrent.TimeUnit;
 
 import io.netty.channel.ChannelFutureListener;
@@ -101,6 +103,8 @@ public interface Connection {
     */
    void write(ActiveMQBuffer buffer);
 
+   void write(RandomAccessFile raf, FileChannel fileChannel, long offset, int dataSize, ChannelFutureListener futureListener);
+
    /**
     * This should close the internal channel without calling any listeners.
     * This is to avoid a situation where the broker is busy writing on an internal thread.
diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
index 9908d97..e9181f8 100644
--- a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
+++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
@@ -17,6 +17,8 @@
 package org.apache.activemq.artemis.core.protocol.core.impl;
 
 import javax.security.auth.Subject;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -387,6 +389,15 @@ public class ChannelImplTest {
             }
 
             @Override
+            public void write(RandomAccessFile raf,
+                              FileChannel fileChannel,
+                              long offset,
+                              int dataSize,
+                              ChannelFutureListener channelFutureListener) {
+
+            }
+
+            @Override
             public void forceClose() {
 
             }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
index b81782b..5a30c64 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
@@ -16,22 +16,30 @@
  */
 package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
 
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.util.Arrays;
 import java.util.EnumSet;
+import java.util.Objects;
 import java.util.Set;
 
-import io.netty.buffer.ByteBuf;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager;
+import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
 import org.apache.activemq.artemis.utils.DataConstants;
+import org.jboss.logging.Logger;
 
 /**
  * Message is used to sync {@link org.apache.activemq.artemis.core.io.SequentialFile}s to a backup server. The {@link FileType} controls
  * which extra information is sent.
  */
 public final class ReplicationSyncFileMessage extends PacketImpl {
+   private static final Logger logger = Logger.getLogger(ReplicationSyncFileMessage.class);
 
    /**
     * The JournalType or {@code null} if sync'ing large-messages.
@@ -43,10 +51,12 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
     */
    private long fileId;
    private int dataSize;
-   private ByteBuf byteBuffer;
    private byte[] byteArray;
    private SimpleString pageStoreName;
    private FileType fileType;
+   private RandomAccessFile raf;
+   private FileChannel fileChannel;
+   private long offset;
 
    public enum FileType {
       JOURNAL(0), PAGE(1), LARGE_MESSAGE(2);
@@ -78,14 +88,18 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
    public ReplicationSyncFileMessage(AbstractJournalStorageManager.JournalContent content,
                                      SimpleString storeName,
                                      long id,
-                                     int size,
-                                     ByteBuf buffer) {
+                                     RandomAccessFile raf,
+                                     FileChannel fileChannel,
+                                     long offset,
+                                     int size) {
       this();
-      this.byteBuffer = buffer;
       this.pageStoreName = storeName;
       this.dataSize = size;
       this.fileId = id;
+      this.raf = raf;
+      this.fileChannel = fileChannel;
       this.journalType = content;
+      this.offset = offset;
       determineType();
    }
 
@@ -99,10 +113,30 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
       }
    }
 
+   public long getFileId() {
+      return fileId;
+   }
+
+   public int getDataSize() {
+      return dataSize;
+   }
+
+   public RandomAccessFile getRaf() {
+      return raf;
+   }
+
+   public FileChannel getFileChannel() {
+      return fileChannel;
+   }
+
+   public long getOffset() {
+      return offset;
+   }
+
    @Override
    public int expectedEncodeSize() {
       int size = PACKET_HEADERS_SIZE +
-                 DataConstants.SIZE_LONG; // buffer.writeLong(fileId);
+         DataConstants.SIZE_LONG; // buffer.writeLong(fileId);
 
       if (fileId == -1)
          return size;
@@ -125,7 +159,7 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
       size += DataConstants.SIZE_INT; // buffer.writeInt(dataSize);
 
       if (dataSize > 0) {
-         size += byteBuffer.writerIndex(); // buffer.writeBytes(byteBuffer, 0, byteBuffer.writerIndex());
+         size += dataSize;
       }
 
       return size;
@@ -150,30 +184,55 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
          default:
             // no-op
       }
-
       buffer.writeInt(dataSize);
-      /*
-       * sending -1 will close the file in case of a journal, but not in case of a largeMessage
-       * (which might receive appends)
-       */
-      if (dataSize > 0) {
-         buffer.writeBytes(byteBuffer, 0, byteBuffer.writerIndex());
-      }
+   }
 
-      release();
+   @Override
+   public ActiveMQBuffer encode(CoreRemotingConnection connection) {
+      if (fileId != -1 && dataSize > 0) {
+         ActiveMQBuffer buffer;
+         int bufferSize = expectedEncodeSize();
+         int encodedSize = bufferSize;
+         boolean isNetty = false;
+         if (connection != null && connection.getTransportConnection() instanceof NettyConnection) {
+            bufferSize -= dataSize;
+            isNetty = true;
+         }
+         buffer = createPacket(connection, bufferSize);
+         encodeHeader(buffer);
+         encodeRest(buffer, connection);
+         if (!isNetty) {
+            ByteBuffer byteBuffer;
+            if (buffer.byteBuf() != null && buffer.byteBuf().nioBufferCount() == 1) {
+               byteBuffer = buffer.byteBuf().internalNioBuffer(buffer.writerIndex(), buffer.writableBytes());
+            } else {
+               byteBuffer = buffer.toByteBuffer(buffer.writerIndex(), buffer.writableBytes());
+            }
+            readFile(byteBuffer);
+            buffer.writerIndex(buffer.capacity());
+         }
+         encodeSize(buffer, encodedSize);
+         return buffer;
+      } else {
+         return super.encode(connection);
+      }
    }
 
    @Override
    public void release() {
-      if (byteBuffer != null) {
-         byteBuffer.release();
-         byteBuffer = null;
+      if (raf != null) {
+         try {
+            raf.close();
+         } catch (IOException e) {
+            logger.error("Close file " + this + " failed", e);
+         }
       }
    }
 
    @Override
    public void decodeRest(final ActiveMQBuffer buffer) {
       fileId = buffer.readLong();
+      if (fileId == -1) return;
       switch (FileType.getFileType(buffer.readByte())) {
          case JOURNAL: {
             journalType = AbstractJournalStorageManager.JournalContent.getType(buffer.readByte());
@@ -197,6 +256,14 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
       }
    }
 
+   private void readFile(ByteBuffer buffer) {
+      try {
+         fileChannel.read(buffer, offset);
+      } catch (IOException e) {
+         throw new RuntimeException(e);
+      }
+   }
+
    public long getId() {
       return fileId;
    }
@@ -218,61 +285,22 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
    }
 
    @Override
-   public int hashCode() {
-      final int prime = 31;
-      int result = super.hashCode();
-      result = prime * result + Arrays.hashCode(byteArray);
-      result = prime * result + ((byteBuffer == null) ? 0 : byteBuffer.hashCode());
-      result = prime * result + dataSize;
-      result = prime * result + (int) (fileId ^ (fileId >>> 32));
-      result = prime * result + ((fileType == null) ? 0 : fileType.hashCode());
-      result = prime * result + ((journalType == null) ? 0 : journalType.hashCode());
-      result = prime * result + ((pageStoreName == null) ? 0 : pageStoreName.hashCode());
-      return result;
-   }
-
-   @Override
-   public boolean equals(Object obj) {
-      if (this == obj) {
+   public boolean equals(Object o) {
+      if (this == o)
          return true;
-      }
-      if (!super.equals(obj)) {
-         return false;
-      }
-      if (!(obj instanceof ReplicationSyncFileMessage)) {
-         return false;
-      }
-      ReplicationSyncFileMessage other = (ReplicationSyncFileMessage) obj;
-      if (!Arrays.equals(byteArray, other.byteArray)) {
-         return false;
-      }
-      if (byteBuffer == null) {
-         if (other.byteBuffer != null) {
-            return false;
-         }
-      } else if (!byteBuffer.equals(other.byteBuffer)) {
+      if (o == null || getClass() != o.getClass())
          return false;
-      }
-      if (dataSize != other.dataSize) {
-         return false;
-      }
-      if (fileId != other.fileId) {
+      if (!super.equals(o))
          return false;
-      }
-      if (fileType != other.fileType) {
-         return false;
-      }
-      if (journalType != other.journalType) {
-         return false;
-      }
-      if (pageStoreName == null) {
-         if (other.pageStoreName != null) {
-            return false;
-         }
-      } else if (!pageStoreName.equals(other.pageStoreName)) {
-         return false;
-      }
-      return true;
+      ReplicationSyncFileMessage that = (ReplicationSyncFileMessage) o;
+      return fileId == that.fileId && dataSize == that.dataSize && offset == that.offset && journalType == that.journalType && Arrays.equals(byteArray, that.byteArray) && Objects.equals(pageStoreName, that.pageStoreName) && fileType == that.fileType && Objects.equals(raf, that.raf) && Objects.equals(fileChannel, that.fileChannel);
+   }
+
+   @Override
+   public int hashCode() {
+      int result = Objects.hash(super.hashCode(), journalType, fileId, dataSize, pageStoreName, fileType, raf, fileChannel, offset);
+      result = 31 * result + Arrays.hashCode(byteArray);
+      return result;
    }
 
    @Override
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
index b2fc576..02f1c84 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.artemis.core.remoting.impl.invm;
 
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
@@ -243,6 +245,28 @@ public class InVMConnection implements Connection {
    }
 
    @Override
+   public void write(RandomAccessFile raf,
+                     FileChannel fileChannel,
+                     long offset,
+                     int dataSize,
+                     final ChannelFutureListener futureListener) {
+      if (futureListener == null) {
+         return;
+      }
+      try {
+         executor.execute(() -> {
+            try {
+               futureListener.operationComplete(null);
+            } catch (Exception e) {
+               throw new IllegalStateException(e);
+            }
+         });
+      } catch (RejectedExecutionException e) {
+
+      }
+   }
+
+   @Override
    public String getRemoteAddress() {
       return "invm:" + serverID;
    }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
index 1d1217d..d48a5a0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
@@ -16,8 +16,7 @@
  */
 package org.apache.activemq.artemis.core.replication;
 
-import java.io.FileInputStream;
-import java.nio.ByteBuffer;
+import java.io.RandomAccessFile;
 import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.LinkedHashSet;
@@ -28,8 +27,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@@ -392,6 +389,39 @@ public final class ReplicationManager implements ActiveMQComponent {
       return repliToken;
    }
 
+   private OperationContext sendSyncFileMessage(final ReplicationSyncFileMessage syncFileMessage, boolean lastChunk) {
+      if (!enabled) {
+         syncFileMessage.release();
+         return null;
+      }
+
+      final OperationContext repliToken = OperationContextImpl.getContext(ioExecutorFactory);
+      repliToken.replicationLineUp();
+
+      replicationStream.execute(() -> {
+         if (enabled) {
+            try {
+               pendingTokens.add(repliToken);
+               flowControl(syncFileMessage.expectedEncodeSize());
+               if (syncFileMessage.getFileId() != -1 && syncFileMessage.getDataSize() > 0) {
+                  replicatingChannel.send(syncFileMessage, syncFileMessage.getRaf(), syncFileMessage.getFileChannel(),
+                                          syncFileMessage.getOffset(), syncFileMessage.getDataSize(),
+                                          lastChunk ? (Channel.Callback) success -> syncFileMessage.release() : null);
+               } else {
+                  replicatingChannel.send(syncFileMessage);
+               }
+            } catch (Exception e) {
+               syncFileMessage.release();
+            }
+         } else {
+            syncFileMessage.release();
+            repliToken.replicationDone();
+         }
+      });
+
+      return repliToken;
+   }
+
    /**
     * This was written as a refactoring of sendReplicatePacket.
     * In case you refactor this in any way, this method must hold a lock on replication lock. .
@@ -560,49 +590,52 @@ public final class ReplicationManager implements ActiveMQComponent {
       if (!file.isOpen()) {
          file.open();
       }
-      int size = 32 * 1024;
+      final int size = 1024 * 1024;
+      long fileSize = file.size();
 
       int flowControlSize = 10;
 
       int packetsSent = 0;
       FlushAction action = new FlushAction();
 
+      long offset = 0;
+      RandomAccessFile raf = null;
+      FileChannel fileChannel = null;
       try {
-         try (FileInputStream fis = new FileInputStream(file.getJavaFile()); FileChannel channel = fis.getChannel()) {
-
-            // We can afford having a single buffer here for this entire loop
-            // because sendReplicatePacket will encode the packet as a NettyBuffer
-            // through ActiveMQBuffer class leaving this buffer free to be reused on the next copy
-            while (true) {
-               final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size);
-               buffer.clear();
-               ByteBuffer byteBuffer = buffer.writerIndex(size).readerIndex(0).nioBuffer();
-               final int bytesRead = channel.read(byteBuffer);
-               int toSend = bytesRead;
-               if (bytesRead > 0) {
-                  if (bytesRead >= maxBytesToSend) {
-                     toSend = (int) maxBytesToSend;
-                     maxBytesToSend = 0;
-                  } else {
-                     maxBytesToSend = maxBytesToSend - bytesRead;
-                  }
-               }
-               logger.debug("sending " + buffer.writerIndex() + " bytes on file " + file.getFileName());
-               // sending -1 or 0 bytes will close the file at the backup
-               // We cannot simply send everything of a file through the executor,
-               // otherwise we would run out of memory.
-               // so we don't use the executor here
-               sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true);
-               packetsSent++;
-
-               if (packetsSent % flowControlSize == 0) {
-                  flushReplicationStream(action);
+         raf = new RandomAccessFile(file.getJavaFile(), "r");
+         fileChannel = raf.getChannel();
+         while (true) {
+            long chunkSize = Math.min(size, fileSize - offset);
+            int toSend = (int) chunkSize;
+            if (chunkSize > 0) {
+               if (chunkSize >= maxBytesToSend) {
+                  toSend = (int) maxBytesToSend;
+                  maxBytesToSend = 0;
+               } else {
+                  maxBytesToSend = maxBytesToSend - chunkSize;
                }
-               if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0)
-                  break;
             }
+            logger.debug("sending " + toSend + " bytes on file " + file.getFileName());
+            // sending -1 or 0 bytes will close the file at the backup
+            // We cannot simply send everything of a file through the executor,
+            // otherwise we would run out of memory.
+            // so we don't use the executor here
+            sendSyncFileMessage(new ReplicationSyncFileMessage(content, pageStore, id, raf, fileChannel, offset, toSend), offset + toSend == fileSize);
+            packetsSent++;
+            offset += toSend;
+
+            if (packetsSent % flowControlSize == 0) {
+               flushReplicationStream(action);
+            }
+            if (toSend == 0 || maxBytesToSend == 0)
+               break;
          }
          flushReplicationStream(action);
+
+      } catch (Exception e) {
+         if (raf != null)
+            raf.close();
+         throw e;
       } finally {
          if (file.isOpen())
             file.close();
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessageTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessageTest.java
new file mode 100644
index 0000000..812de2c
--- /dev/null
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessageTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
+
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.util.HashMap;
+
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
+import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl;
+import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnection;
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.DataConstants;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager.JournalContent.MESSAGES;
+
+public class ReplicationSyncFileMessageTest extends ActiveMQTestBase {
+   @Test
+   public void testNettyConnectionEncodeMessage() throws Exception {
+      int dataSize = 10;
+      NettyConnection conn = new NettyConnection(new HashMap<>(), new EmbeddedChannel(), null, false, false);
+
+      SequentialFileFactory factory = new NIOSequentialFileFactory(temporaryFolder.getRoot(), 100);
+      SequentialFile file = factory.createSequentialFile("file1.bin");
+      file.open();
+      RandomAccessFile raf = new RandomAccessFile(file.getJavaFile(), "r");
+      FileChannel fileChannel = raf.getChannel();
+      ReplicationSyncFileMessage replicationSyncFileMessage = new ReplicationSyncFileMessage(MESSAGES,
+                                                                                             null, 10, raf, fileChannel, 0, dataSize);
+      RemotingConnectionImpl remotingConnection = new RemotingConnectionImpl(null, conn, 10, 10, null, null);
+      ActiveMQBuffer buffer = replicationSyncFileMessage.encode(remotingConnection);
+      Assert.assertEquals(buffer.getInt(0), replicationSyncFileMessage.expectedEncodeSize() - DataConstants.SIZE_INT);
+      Assert.assertEquals(buffer.capacity(), replicationSyncFileMessage.expectedEncodeSize() - dataSize);
+      file.close();
+   }
+
+
+   @Test
+   public void testInVMConnectionEncodeMessage() throws Exception {
+      int fileId = 10;
+      InVMConnection conn = new InVMConnection(0, null, null, null);
+
+      SequentialFileFactory factory = new NIOSequentialFileFactory(temporaryFolder.getRoot(), 100);
+      SequentialFile file = factory.createSequentialFile("file1.bin");
+      file.open();
+      RandomAccessFile raf = new RandomAccessFile(file.getJavaFile(), "r");
+      FileChannel fileChannel = raf.getChannel();
+      ReplicationSyncFileMessage replicationSyncFileMessage = new ReplicationSyncFileMessage(MESSAGES,
+                                                                                             null, fileId, raf, fileChannel, 0, 0);
+      RemotingConnectionImpl remotingConnection = new RemotingConnectionImpl(null, conn, 10, 10, null, null);
+      ActiveMQBuffer buffer = replicationSyncFileMessage.encode(remotingConnection);
+      Assert.assertEquals(buffer.readInt(), replicationSyncFileMessage.expectedEncodeSize() - DataConstants.SIZE_INT);
+      Assert.assertEquals(buffer.capacity(), replicationSyncFileMessage.expectedEncodeSize());
+
+      Assert.assertEquals(buffer.readByte(), PacketImpl.REPLICATION_SYNC_FILE);
+
+      ReplicationSyncFileMessage decodedReplicationSyncFileMessage = new ReplicationSyncFileMessage();
+      decodedReplicationSyncFileMessage.decode(buffer);
+      Assert.assertEquals(decodedReplicationSyncFileMessage.getJournalContent(), MESSAGES);
+      Assert.assertNull(decodedReplicationSyncFileMessage.getData());
+      file.close();
+   }
+}
\ No newline at end of file
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java
index c7ed869..c55764a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.artemis.tests.integration.cluster.util;
 
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
 import java.util.concurrent.locks.Lock;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -212,6 +214,11 @@ public class BackupSyncDelay implements Interceptor {
       }
 
       @Override
+      public boolean send(Packet packet, RandomAccessFile raf, FileChannel fileChannel, long offset, int dataSize, Callback callback) {
+         return true;
+      }
+
+      @Override
       public boolean sendBatched(Packet packet) {
          throw new UnsupportedOperationException();
 
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
index c9c975c..23ae5f9 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
@@ -16,7 +16,9 @@
  */
 package org.apache.activemq.artemis.tests.unit.core.remoting.impl.netty;
 
+import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -29,6 +31,9 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
@@ -77,6 +82,29 @@ public class NettyConnectionTest extends ActiveMQTestBase {
 
    }
 
+   @Test
+   public void testWritePacketAndFile() throws Exception {
+      EmbeddedChannel channel = createChannel();
+      NettyConnection conn = new NettyConnection(emptyMap, channel, new MyListener(), false, false);
+
+      final int size = 1234;
+
+      ActiveMQBuffer buff = conn.createTransportBuffer(size);
+      buff.writeByte((byte) 0x00); // Netty buffer does lazy initialization.
+      SequentialFileFactory factory = new NIOSequentialFileFactory(temporaryFolder.getRoot(), 100);
+      SequentialFile file = factory.createSequentialFile("file1.bin");
+      file.open();
+      RandomAccessFile raf = new RandomAccessFile(file.getJavaFile(), "r");
+      FileChannel fileChannel = raf.getChannel();
+
+      conn.write(buff);
+      conn.write(raf, fileChannel, 0, size, future -> raf.close());
+      channel.runPendingTasks();
+      Assert.assertEquals(2, channel.outboundMessages().size());
+      Assert.assertFalse(fileChannel.isOpen());
+      file.close();
+   }
+
    @Test(expected = IllegalStateException.class)
    public void throwsExceptionOnBlockUntilWritableIfClosed() {
       EmbeddedChannel channel = createChannel();