You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2017/04/07 14:10:59 UTC

[1/2] activemq-artemis git commit: ARTEMIS-1089 Improving flow control on replication

Repository: activemq-artemis
Updated Branches:
  refs/heads/master d29afd1c8 -> 7304416d4


ARTEMIS-1089 Improving flow control on replication


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/911888e8
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/911888e8
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/911888e8

Branch: refs/heads/master
Commit: 911888e8d19c98400a68f969b1c8f86bff51fa17
Parents: d29afd1
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Apr 6 11:47:31 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Apr 6 21:45:31 2017 -0400

----------------------------------------------------------------------
 .../protocol/core/CoreRemotingConnection.java   |   8 ++
 .../artemis/core/protocol/core/Packet.java      |  10 ++
 .../core/protocol/core/impl/PacketImpl.java     |   9 +-
 .../core/impl/RemotingConnectionImpl.java       |   6 +
 .../core/impl/wireformat/MessagePacket.java     |  12 --
 .../wireformat/SessionContinuationMessage.java  |  16 +--
 .../SessionReceiveContinuationMessage.java      |   6 +-
 .../impl/wireformat/SessionReceiveMessage.java  |   6 +-
 .../SessionSendContinuationMessage.java         |   6 +-
 .../impl/wireformat/SessionSendMessage.java     |   5 +-
 .../impl/wireformat/ReplicationAddMessage.java  |  13 +-
 .../wireformat/ReplicationAddTXMessage.java     |  13 ++
 .../wireformat/ReplicationCommitMessage.java    |   9 ++
 .../wireformat/ReplicationDeleteMessage.java    |   9 ++
 .../wireformat/ReplicationDeleteTXMessage.java  |  11 ++
 .../ReplicationLargeMessageBeginMessage.java    |   9 ++
 .../ReplicationLargeMessageEndMessage.java      |   8 ++
 .../ReplicationLargeMessageWriteMessage.java    |  10 ++
 .../ReplicationLiveIsStoppingMessage.java       |   7 ++
 .../wireformat/ReplicationPageEventMessage.java |   9 ++
 .../wireformat/ReplicationPageWriteMessage.java |   8 ++
 .../wireformat/ReplicationPrepareMessage.java   |  10 ++
 .../wireformat/ReplicationResponseMessage.java  |   7 ++
 .../ReplicationResponseMessageV2.java           |   7 ++
 .../wireformat/ReplicationStartSyncMessage.java |  21 ++++
 .../wireformat/ReplicationSyncFileMessage.java  |  38 +++++-
 .../impl/netty/NettyServerConnection.java       |   6 -
 .../core/replication/ReplicationManager.java    | 120 +++++++------------
 28 files changed, 267 insertions(+), 132 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java
index f2aa5b4..45d9229 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java
@@ -111,4 +111,12 @@ public interface CoreRemotingConnection extends RemotingConnection {
     * @return the principal
     */
    ActiveMQPrincipal getDefaultActiveMQPrincipal();
+
+   /**
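+    * Waits for the connection to become writable for a write of the given size, for at most the given timeout.
+    * @param size size we are trying to write
+    * @param timeout maximum time to wait, in milliseconds
+    * @return the outcome of the wait; presumably {@code true} if the connection became writable in time (TODO confirm)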
+    */
+   boolean blockUntilWritable(int size, long timeout);
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
index ddb734e..a86c5c1 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
@@ -24,6 +24,8 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
  */
 public interface Packet {
 
+   int INITIAL_PACKET_SIZE = 1500;
+
    /**
     * Sets the channel id that should be used once the packet has been successfully decoded it is
     * sent to the correct channel.
@@ -33,6 +35,14 @@ public interface Packet {
    void setChannelID(long channelID);
 
    /**
+    * Returns the size this packet is expected to occupy once encoded.
+    * @return the expected encoded size; defaults to {@link #INITIAL_PACKET_SIZE}
+    */
+   default int expectedEncodeSize() {
+      return INITIAL_PACKET_SIZE;
+   }
+
+   /**
     * Returns the channel id of the channel that should handle this packet.
     *
     * @return the id of the channel

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
index 75f5086..97a7973 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
@@ -39,8 +39,6 @@ public class PacketImpl implements Packet {
    public static final int PACKET_HEADERS_SIZE = DataConstants.SIZE_INT + DataConstants.SIZE_BYTE +
       DataConstants.SIZE_LONG;
 
-   private static final int INITIAL_PACKET_SIZE = 1500;
-
    protected long channelID;
 
    private final byte type;
@@ -329,10 +327,13 @@ public class PacketImpl implements Packet {
    }
 
    protected ActiveMQBuffer createPacket(RemotingConnection connection) {
+
+      int size = expectedEncodeSize();
+
       if (connection == null) {
-         return new ChannelBufferWrapper(Unpooled.buffer(INITIAL_PACKET_SIZE));
+         return new ChannelBufferWrapper(Unpooled.buffer(size));
       } else {
-         return connection.createTransportBuffer(PacketImpl.INITIAL_PACKET_SIZE);
+         return connection.createTransportBuffer(size);
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
index f4efcb2..cc1d685 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -231,6 +232,11 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
    }
 
    @Override
+   public boolean blockUntilWritable(int size, long timeout) {
+      return transportConnection.blockUntilWritable(size, timeout, TimeUnit.MILLISECONDS);
+   }
+
+   @Override
    public void disconnect(final boolean criticalError) {
       disconnect(null, criticalError);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java
index 0c32007..97b476d 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java
@@ -16,12 +16,8 @@
  */
 package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
 
-import io.netty.buffer.Unpooled;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
-import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
-import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 
 public abstract class MessagePacket extends PacketImpl implements MessagePacketI {
 
@@ -43,12 +39,4 @@ public abstract class MessagePacket extends PacketImpl implements MessagePacketI
       return super.getParentString() + ", message=" + message;
    }
 
-   protected ActiveMQBuffer internalCreatePacket(int size, RemotingConnection connection) {
-      if (connection == null) {
-         return new ChannelBufferWrapper(Unpooled.buffer(size));
-      } else {
-         return connection.createTransportBuffer(size);
-      }
-   }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionContinuationMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionContinuationMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionContinuationMessage.java
index d2a4266..a894594 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionContinuationMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionContinuationMessage.java
@@ -18,11 +18,8 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
 
 import java.util.Arrays;
 
-import io.netty.buffer.Unpooled;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
-import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.utils.DataConstants;
 
 public abstract class SessionContinuationMessage extends PacketImpl {
@@ -74,18 +71,9 @@ public abstract class SessionContinuationMessage extends PacketImpl {
     *
     * @return the size in bytes of the expected encoded packet
     */
-   protected int expectedEncodedSize() {
-      return SESSION_CONTINUATION_BASE_SIZE + (body == null ? 0 : body.length);
-   }
-
    @Override
-   protected final ActiveMQBuffer createPacket(RemotingConnection connection) {
-      final int expectedEncodedSize = expectedEncodedSize();
-      if (connection == null) {
-         return new ChannelBufferWrapper(Unpooled.buffer(expectedEncodedSize));
-      } else {
-         return connection.createTransportBuffer(expectedEncodedSize);
-      }
+   public int expectedEncodeSize() {
+      return SESSION_CONTINUATION_BASE_SIZE + (body == null ? 0 : body.length);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java
index 44ad1bb..41e786b 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java
@@ -70,8 +70,8 @@ public class SessionReceiveContinuationMessage extends SessionContinuationMessag
    // Protected -----------------------------------------------------
 
    @Override
-   protected final int expectedEncodedSize() {
-      return super.expectedEncodedSize() + DataConstants.SIZE_LONG;
+   public int expectedEncodeSize() {
+      return super.expectedEncodeSize() + DataConstants.SIZE_LONG;
    }
 
    // Public --------------------------------------------------------
@@ -128,4 +128,4 @@ public class SessionReceiveContinuationMessage extends SessionContinuationMessag
       return true;
    }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
index d89e394..4fbd48f 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
@@ -19,7 +19,6 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
-import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.utils.DataConstants;
 
 public class SessionReceiveMessage extends MessagePacket {
@@ -53,9 +52,10 @@ public class SessionReceiveMessage extends MessagePacket {
       return deliveryCount;
    }
 
+
    @Override
-   protected ActiveMQBuffer createPacket(RemotingConnection connection) {
-      return internalCreatePacket(message.getEncodeSize() + PACKET_HEADERS_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_INT, connection);
+   public int expectedEncodeSize() {
+      return message.getEncodeSize() + PACKET_HEADERS_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_INT;
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
index 1c600e9..26eedd7 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
@@ -93,8 +93,8 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage {
    }
 
    @Override
-   protected final int expectedEncodedSize() {
-      return super.expectedEncodedSize() + (!continues ? DataConstants.SIZE_LONG : 0) + DataConstants.SIZE_BOOLEAN;
+   public int expectedEncodeSize() {
+      return super.expectedEncodeSize() + (!continues ? DataConstants.SIZE_LONG : 0) + DataConstants.SIZE_BOOLEAN;
    }
 
    @Override
@@ -160,4 +160,4 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage {
    public SendAcknowledgementHandler getHandler() {
       return handler;
    }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
index 79cb4cb..9f76c2d 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
@@ -21,7 +21,6 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
-import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 
 public class SessionSendMessage extends MessagePacket {
 
@@ -62,8 +61,8 @@ public class SessionSendMessage extends MessagePacket {
    }
 
    @Override
-   protected ActiveMQBuffer createPacket(RemotingConnection connection) {
-      return internalCreatePacket(message.getEncodeSize() + PACKET_HEADERS_SIZE + 1, connection);
+   public int expectedEncodeSize() {
+      return message.getEncodeSize() + PACKET_HEADERS_SIZE + 1;
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddMessage.java
index 8d22fab..4a5a8b5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddMessage.java
@@ -22,6 +22,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
 import org.apache.activemq.artemis.core.replication.ReplicationManager.ADD_OPERATION_TYPE;
+import org.apache.activemq.artemis.utils.DataConstants;
 
 public final class ReplicationAddMessage extends PacketImpl {
 
@@ -64,9 +65,19 @@ public final class ReplicationAddMessage extends PacketImpl {
    // Public --------------------------------------------------------
 
    @Override
+   public int expectedEncodeSize() {
+      return  PACKET_HEADERS_SIZE +
+         DataConstants.SIZE_BYTE + // buffer.writeByte(journalID);
+         DataConstants.SIZE_BOOLEAN + // buffer.writeBoolean(operation.toBoolean());
+         DataConstants.SIZE_LONG + // buffer.writeLong(id);
+         DataConstants.SIZE_BYTE + // buffer.writeByte(journalRecordType);
+         DataConstants.SIZE_INT + // buffer.writeInt(persister.getEncodeSize(encodingData));
+         persister.getEncodeSize(encodingData);// persister.encode(buffer, encodingData);
+   }
+
+   @Override
    public void encodeRest(final ActiveMQBuffer buffer) {
       buffer.writeByte(journalID);
-
       buffer.writeBoolean(operation.toBoolean());
       buffer.writeLong(id);
       buffer.writeByte(journalRecordType);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddTXMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddTXMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddTXMessage.java
index a6fd02b..fd7946a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddTXMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddTXMessage.java
@@ -22,6 +22,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
 import org.apache.activemq.artemis.core.replication.ReplicationManager.ADD_OPERATION_TYPE;
+import org.apache.activemq.artemis.utils.DataConstants;
 
 public class ReplicationAddTXMessage extends PacketImpl {
 
@@ -68,6 +69,18 @@ public class ReplicationAddTXMessage extends PacketImpl {
    // Public --------------------------------------------------------
 
    @Override
+   public int expectedEncodeSize() {
+      return PACKET_HEADERS_SIZE +
+            DataConstants.SIZE_BYTE + // buffer.writeByte(journalID);
+            DataConstants.SIZE_BOOLEAN + // buffer.writeBoolean(operation.toBoolean());
+            DataConstants.SIZE_LONG + // buffer.writeLong(txId);
+            DataConstants.SIZE_LONG + // buffer.writeLong(id);
+            DataConstants.SIZE_BYTE + // buffer.writeByte(recordType);
+            DataConstants.SIZE_INT + // buffer.writeInt(persister.getEncodeSize(encodingData));
+            persister.getEncodeSize(encodingData); // persister.encode(buffer, encodingData);
+   }
+
+   @Override
    public void encodeRest(final ActiveMQBuffer buffer) {
       buffer.writeByte(journalID);
       buffer.writeBoolean(operation.toBoolean());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationCommitMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationCommitMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationCommitMessage.java
index 1987b3d..245ec18 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationCommitMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationCommitMessage.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.utils.DataConstants;
 
 public final class ReplicationCommitMessage extends PacketImpl {
 
@@ -42,6 +43,14 @@ public final class ReplicationCommitMessage extends PacketImpl {
    }
 
    @Override
+   public int expectedEncodeSize() {
+      return PACKET_HEADERS_SIZE +
+             DataConstants.SIZE_BYTE + // buffer.writeByte(journalID);
+             DataConstants.SIZE_BOOLEAN + // buffer.writeBoolean(rollback);
+             DataConstants.SIZE_LONG; // buffer.writeLong(txId);
+   }
+
+   @Override
    public void encodeRest(final ActiveMQBuffer buffer) {
       buffer.writeByte(journalID);
       buffer.writeBoolean(rollback);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationDeleteMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationDeleteMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationDeleteMessage.java
index ab97b18..fdbfb9b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationDeleteMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationDeleteMessage.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.utils.DataConstants;
 
 public final class ReplicationDeleteMessage extends PacketImpl {
 
@@ -39,6 +40,14 @@ public final class ReplicationDeleteMessage extends PacketImpl {
    }
 
    @Override
+   public int expectedEncodeSize() {
+      return PACKET_HEADERS_SIZE +
+         DataConstants.SIZE_BYTE + // buffer.writeByte(journalID);
+         DataConstants.SIZE_LONG; // buffer.writeLong(id);
+   }
+
+
+   @Override
    public void encodeRest(final ActiveMQBuffer buffer) {
       buffer.writeByte(journalID);
       buffer.writeLong(id);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationDeleteTXMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationDeleteTXMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationDeleteTXMessage.java
index 0d75523..4e942bd 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationDeleteTXMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationDeleteTXMessage.java
@@ -21,6 +21,7 @@ import java.util.Arrays;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.core.journal.EncodingSupport;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.utils.DataConstants;
 
 public class ReplicationDeleteTXMessage extends PacketImpl {
 
@@ -53,6 +54,16 @@ public class ReplicationDeleteTXMessage extends PacketImpl {
    }
 
    @Override
+   public int expectedEncodeSize() {
+      return PACKET_HEADERS_SIZE +
+         DataConstants.SIZE_BYTE + // buffer.writeByte(journalID);
+         DataConstants.SIZE_LONG + // buffer.writeLong(txId);
+         DataConstants.SIZE_LONG +  // buffer.writeLong(id);
+         DataConstants.SIZE_INT +  // buffer.writeInt(encodingData.getEncodeSize());
+         encodingData.getEncodeSize(); // encodingData.encode(buffer);
+   }
+
+   @Override
    public void encodeRest(final ActiveMQBuffer buffer) {
       buffer.writeByte(journalID);
       buffer.writeLong(txId);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageBeginMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageBeginMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageBeginMessage.java
index 20af68c..1ecaa68 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageBeginMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageBeginMessage.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.utils.DataConstants;
 
 public class ReplicationLargeMessageBeginMessage extends PacketImpl {
 
@@ -32,6 +33,14 @@ public class ReplicationLargeMessageBeginMessage extends PacketImpl {
       super(PacketImpl.REPLICATION_LARGE_MESSAGE_BEGIN);
    }
 
+
+   @Override
+   public int expectedEncodeSize() {
+      return PACKET_HEADERS_SIZE +
+         DataConstants.SIZE_LONG; // buffer.writeLong(messageId);
+   }
+
+
    @Override
    public void encodeRest(final ActiveMQBuffer buffer) {
       buffer.writeLong(messageId);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java
index bb77929..4a09cc0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.utils.DataConstants;
 
 public class ReplicationLargeMessageEndMessage extends PacketImpl {
 
@@ -32,6 +33,13 @@ public class ReplicationLargeMessageEndMessage extends PacketImpl {
       this.messageId = messageId;
    }
 
+
+   @Override
+   public int expectedEncodeSize() {
+      return PACKET_HEADERS_SIZE +
+         DataConstants.SIZE_LONG; // buffer.writeLong(messageId);
+   }
+
    @Override
    public void encodeRest(final ActiveMQBuffer buffer) {
       buffer.writeLong(messageId);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageWriteMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageWriteMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageWriteMessage.java
index f60c629..ffee14c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageWriteMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageWriteMessage.java
@@ -20,6 +20,7 @@ import java.util.Arrays;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.utils.DataConstants;
 
 public final class ReplicationLargeMessageWriteMessage extends PacketImpl {
 
@@ -43,6 +44,15 @@ public final class ReplicationLargeMessageWriteMessage extends PacketImpl {
    }
 
    @Override
+   public int expectedEncodeSize() {
+      // Mirrors the writes in encodeRest; messageId is a single long, so it is counted once.
+      return PACKET_HEADERS_SIZE +
+         DataConstants.SIZE_LONG +  // buffer.writeLong(messageId);
+         DataConstants.SIZE_INT +   // buffer.writeInt(body.length);
+         body.length;               // buffer.writeBytes(body);
+   }
+
+   @Override
    public void encodeRest(final ActiveMQBuffer buffer) {
       buffer.writeLong(messageId);
       buffer.writeInt(body.length);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLiveIsStoppingMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLiveIsStoppingMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLiveIsStoppingMessage.java
index 339cfa8..456a46b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLiveIsStoppingMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLiveIsStoppingMessage.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.utils.DataConstants;
 
 /**
  * Message indicating that the live is stopping (a scheduled stop).
@@ -60,6 +61,12 @@ public final class ReplicationLiveIsStoppingMessage extends PacketImpl {
    }
 
    @Override
+   public int expectedEncodeSize() {
+      // Payload written by encodeRest: buffer.writeInt(liveStopping.code)
+      return DataConstants.SIZE_INT + PACKET_HEADERS_SIZE;
+   }
+
+   @Override
    public void encodeRest(final ActiveMQBuffer buffer) {
       buffer.writeInt(liveStopping.code);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageEventMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageEventMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageEventMessage.java
index 31680da..ea929e4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageEventMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageEventMessage.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.utils.DataConstants;
 
 public class ReplicationPageEventMessage extends PacketImpl {
 
@@ -43,6 +44,14 @@ public class ReplicationPageEventMessage extends PacketImpl {
    }
 
    @Override
+   public int expectedEncodeSize() {
+      // fixed-width fields: buffer.writeInt(pageNumber) and buffer.writeBoolean(isDelete)
+      final int fixedFields = DataConstants.SIZE_INT + DataConstants.SIZE_BOOLEAN;
+      // variable-width field: buffer.writeSimpleString(storeName)
+      return PACKET_HEADERS_SIZE + SimpleString.sizeofString(storeName) + fixedFields;
+   }
+
+   @Override
    public void encodeRest(final ActiveMQBuffer buffer) {
       buffer.writeSimpleString(storeName);
       buffer.writeInt(pageNumber);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java
index b88e0fe..4f5079d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java
@@ -20,6 +20,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.paging.impl.PagedMessageImpl;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.utils.DataConstants;
 
 public class ReplicationPageWriteMessage extends PacketImpl {
 
@@ -40,6 +41,13 @@ public class ReplicationPageWriteMessage extends PacketImpl {
    // Public --------------------------------------------------------
 
    @Override
+   public int expectedEncodeSize() {
+      // encodeRest writes the page number as an int, then lets pagedMessage encode itself
+      final int pagedMessageSize = pagedMessage.getEncodeSize();
+      return PACKET_HEADERS_SIZE + DataConstants.SIZE_INT + pagedMessageSize;
+   }
+
+   @Override
    public void encodeRest(final ActiveMQBuffer buffer) {
       buffer.writeInt(pageNumber);
       pagedMessage.encode(buffer);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPrepareMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPrepareMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPrepareMessage.java
index ebdc1c4..cf993a3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPrepareMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPrepareMessage.java
@@ -21,6 +21,7 @@ import java.util.Arrays;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.core.journal.EncodingSupport;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.utils.DataConstants;
 
 public final class ReplicationPrepareMessage extends PacketImpl {
 
@@ -49,6 +50,15 @@ public final class ReplicationPrepareMessage extends PacketImpl {
    // Public --------------------------------------------------------
 
    @Override
+   public int expectedEncodeSize() {
+      // journalID (byte) + txId (long) + length prefix (int) precede the encoded data
+      final int prefix = DataConstants.SIZE_BYTE + DataConstants.SIZE_LONG + DataConstants.SIZE_INT;
+      // encodingData.encode(buffer) is sized by encodingData.getEncodeSize()
+      final int payload = encodingData.getEncodeSize();
+      return PACKET_HEADERS_SIZE + prefix + payload;
+   }
+
+   @Override
    public void encodeRest(final ActiveMQBuffer buffer) {
       buffer.writeByte(journalID);
       buffer.writeLong(txId);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessage.java
index c7eff85..d9d8b1a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessage.java
@@ -27,4 +27,11 @@ public class ReplicationResponseMessage extends PacketImpl {
    public ReplicationResponseMessage(byte replicationResponseV2) {
       super(replicationResponseV2);
    }
+
+
+   @Override
+   public int expectedEncodeSize() {
+      return PACKET_HEADERS_SIZE; // nothing beyond the packet headers is counted for this packet
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java
index f9001c6..b26084b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.utils.DataConstants;
 
 public final class ReplicationResponseMessageV2 extends ReplicationResponseMessage {
 
@@ -42,6 +43,12 @@ public final class ReplicationResponseMessageV2 extends ReplicationResponseMessa
    }
 
    @Override
+   public int expectedEncodeSize() {
+      // Counts buffer.writeBoolean(synchronizationIsFinishedAcknowledgement) on top of the headers
+      return DataConstants.SIZE_BOOLEAN + PACKET_HEADERS_SIZE;
+   }
+
+   @Override
    public void encodeRest(final ActiveMQBuffer buffer) {
       super.encodeRest(buffer);
       buffer.writeBoolean(synchronizationIsFinishedAcknowledgement);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java
index a44707f..018535f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java
@@ -24,6 +24,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.core.journal.impl.JournalFile;
 import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.utils.DataConstants;
 
 /**
  * This message may signal start or end of the replication synchronization.
@@ -109,6 +110,26 @@ public class ReplicationStartSyncMessage extends PacketImpl {
       }
    }
 
+
+   @Override
+   public int expectedEncodeSize() {
+      // Always present: buffer.writeBoolean(synchronizationIsFinished),
+      // buffer.writeBoolean(allowsAutoFailBack) and buffer.writeString(nodeID).
+      // The string is only estimated, at three bytes per character.
+      final int common = PACKET_HEADERS_SIZE + 2 * DataConstants.SIZE_BOOLEAN + nodeID.length() * 3;
+
+      if (!synchronizationIsFinished) {
+         // While synchronization is not finished the packet also carries
+         // buffer.writeByte(dataType.code), buffer.writeInt(ids.length) and one long per id.
+         final int idsSize = DataConstants.SIZE_LONG * ids.length;
+         return common + DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + idsSize;
+      }
+
+      // When synchronization is finished only the common fields are counted.
+      return common;
+   }
+
+
    @Override
    public void encodeRest(final ActiveMQBuffer buffer) {
       buffer.writeBoolean(synchronizationIsFinished);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
index 90d2ca0..4d3c32f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
@@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.utils.DataConstants;
 
 /**
  * Message is used to sync {@link org.apache.activemq.artemis.core.io.SequentialFile}s to a backup server. The {@link FileType} controls
@@ -99,6 +100,38 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
    }
 
    @Override
+   public int expectedEncodeSize() {
+      // buffer.writeLong(fileId) is always written
+      final int base = PACKET_HEADERS_SIZE + DataConstants.SIZE_LONG;
+      if (fileId == -1) {
+         // nothing else is counted for the fileId == -1 form of this packet
+         return base;
+      }
+
+      // buffer.writeByte(fileType.code), followed by a type-specific field
+      int typeSpecific = DataConstants.SIZE_BYTE;
+      switch (fileType) {
+         case JOURNAL:
+            // buffer.writeByte(journalType.typeByte)
+            typeSpecific += DataConstants.SIZE_BYTE;
+            break;
+         case PAGE:
+            // the page store name, sized as a SimpleString
+            typeSpecific += SimpleString.sizeofString(pageStoreName);
+            break;
+         case LARGE_MESSAGE:
+         default:
+            // no type-specific field
+            break;
+      }
+
+      // buffer.writeInt(dataSize), then the data bytes only when dataSize > 0:
+      // buffer.writeBytes(byteBuffer, 0, byteBuffer.writerIndex())
+      final int data = DataConstants.SIZE_INT + (dataSize > 0 ? byteBuffer.writerIndex() : 0);
+      return base + typeSpecific + data;
+   }
+
+   @Override
    public void encodeRest(final ActiveMQBuffer buffer) {
       buffer.writeLong(fileId);
       if (fileId == -1)
@@ -126,11 +159,6 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
       if (dataSize > 0) {
          buffer.writeBytes(byteBuffer, 0, byteBuffer.writerIndex());
       }
-
-      if (byteBuffer != null) {
-         byteBuffer.release();
-         byteBuffer = null;
-      }
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyServerConnection.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyServerConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyServerConnection.java
index 29e39d5..f9e1b3d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyServerConnection.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyServerConnection.java
@@ -19,8 +19,6 @@ package org.apache.activemq.artemis.core.remoting.impl.netty;
 import java.util.Map;
 
 import io.netty.channel.Channel;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
 import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener;
 
 public class NettyServerConnection extends NettyConnection {
@@ -33,8 +31,4 @@ public class NettyServerConnection extends NettyConnection {
       super(configuration, channel, listener, batchingEnabled, directDeliver);
    }
 
-   @Override
-   public ActiveMQBuffer createTransportBuffer(int size) {
-      return new ChannelBufferWrapper(channel.alloc().directBuffer(size), true);
-   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
index 8b91c02..91dfa98 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
@@ -33,16 +33,15 @@ import io.netty.buffer.PooledByteBufAllocator;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
-import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.journal.EncodingSupport;
-import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.journal.impl.JournalFile;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
+import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager;
 import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
 import org.apache.activemq.artemis.core.protocol.core.Channel;
@@ -71,7 +70,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
-import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
 import org.apache.activemq.artemis.utils.ReusableLatch;
 import org.jboss.logging.Logger;
@@ -84,7 +82,7 @@ import org.jboss.logging.Logger;
  *
  * @see ReplicationEndpoint
  */
-public final class ReplicationManager implements ActiveMQComponent, ReadyListener {
+public final class ReplicationManager implements ActiveMQComponent {
 
    private static final Logger logger = Logger.getLogger(ReplicationManager.class);
 
@@ -119,8 +117,6 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
 
    private final AtomicBoolean writable = new AtomicBoolean(true);
 
-   private final Object replicationLock = new Object();
-
    private final Queue<OperationContext> pendingTokens = new ConcurrentLinkedQueue<>();
 
    private final ExecutorFactory executorFactory;
@@ -292,12 +288,9 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
          replicatingChannel.getConnection().getTransportConnection().fireReady(true);
       }
 
-      synchronized (replicationLock) {
-         enabled = false;
-         writable.set(true);
-         replicationLock.notifyAll();
-         clearReplicationTokens();
-      }
+      enabled = false;
+      writable.set(true);
+      clearReplicationTokens();
 
       RemotingConnection toStop = remotingConnection;
       if (toStop != null) {
@@ -315,16 +308,13 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
     */
    public void clearReplicationTokens() {
       logger.trace("clearReplicationTokens initiating");
-      synchronized (replicationLock) {
-         logger.trace("clearReplicationTokens entered the lock");
-         while (!pendingTokens.isEmpty()) {
-            OperationContext ctx = pendingTokens.poll();
-            logger.trace("Calling ctx.replicationDone()");
-            try {
-               ctx.replicationDone();
-            } catch (Throwable e) {
-               ActiveMQServerLogger.LOGGER.errorCompletingCallbackOnReplicationManager(e);
-            }
+      while (!pendingTokens.isEmpty()) {
+         OperationContext ctx = pendingTokens.poll();
+         logger.trace("Calling ctx.replicationDone()");
+         try {
+            ctx.replicationDone();
+         } catch (Throwable e) {
+            ActiveMQServerLogger.LOGGER.errorCompletingCallbackOnReplicationManager(e);
          }
       }
       logger.trace("clearReplicationTokens finished");
@@ -362,24 +352,22 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
          repliToken.replicationLineUp();
       }
 
-      synchronized (replicationLock) {
-         if (enabled) {
-            pendingTokens.add(repliToken);
-            if (useExecutor) {
-               replicationStream.execute(() -> {
-                  if (enabled) {
-                     flowControl();
-                     replicatingChannel.send(packet);
-                  }
-               });
-            } else {
-               flowControl();
-               replicatingChannel.send(packet);
-            }
+      if (enabled) {
+         pendingTokens.add(repliToken);
+         if (useExecutor) {
+            replicationStream.execute(() -> {
+               if (enabled) {
+                  flowControl(packet.expectedEncodeSize());
+                  replicatingChannel.send(packet);
+               }
+            });
          } else {
-            // Already replicating channel failed, so just play the action now
-            runItNow = true;
+            flowControl(packet.expectedEncodeSize());
+            replicatingChannel.send(packet);
          }
+      } else {
+         // Already replicating channel failed, so just play the action now
+         runItNow = true;
       }
 
       // Execute outside lock
@@ -395,47 +383,20 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
     * This was written as a refactoring of sendReplicatePacket.
     * In case you refactor this in any way, this method must hold a lock on replication lock. .
     */
-   private boolean flowControl() {
-      synchronized (replicationLock) {
-         // synchronized (replicationLock) { -- I'm not adding this because the caller already has it
-         // future maintainers of this code please be aware that the intention here is hold the lock on replication lock
-         if (!replicatingChannel.getConnection().isWritable(this)) {
-            try {
-               logger.trace("flowControl waiting on writable replication");
-               writable.set(false);
-               //don't wait for ever as this may hang tests etc, we've probably been closed anyway
-               long now = System.currentTimeMillis();
-               long deadline = now + timeout;
-               while (!writable.get() && now < deadline) {
-                  replicationLock.wait(deadline - now);
-                  now = System.currentTimeMillis();
-               }
-               logger.trace("flow control done on replication");
-
-               if (!writable.get()) {
-                  ActiveMQServerLogger.LOGGER.slowReplicationResponse();
-                  logger.tracef("There was no response from replication backup after %s seconds, server being stopped now", System.currentTimeMillis() - now);
-                  try {
-                     stop();
-                  } catch (Exception e) {
-                     logger.warn(e.getMessage(), e);
-                  }
-                  return false;
-               }
-            } catch (InterruptedException e) {
-               throw new ActiveMQInterruptedException(e);
-            }
+   private boolean flowControl(int size) {
+      boolean flowWorked = replicatingChannel.getConnection().blockUntilWritable(size, timeout);
+
+      if (!flowWorked) {
+         try {
+            ActiveMQServerLogger.LOGGER.slowReplicationResponse();
+            stop();
+         } catch (Exception e) {
+            logger.warn(e.getMessage(), e);
          }
       }
-      return true;
-   }
 
-   @Override
-   public void readyForWriting() {
-      synchronized (replicationLock) {
-         writable.set(true);
-         replicationLock.notifyAll();
-      }
+
+      return flowWorked;
    }
 
    /**
@@ -569,15 +530,17 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
       if (!file.isOpen()) {
          file.open();
       }
+      int size = 32 * 1024;
+      final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size);
+
       try {
          try (final FileInputStream fis = new FileInputStream(file.getJavaFile());
               final FileChannel channel = fis.getChannel()) {
             // We can afford having a single buffer here for this entire loop
             // because sendReplicatePacket will encode the packet as a NettyBuffer
             // through ActiveMQBuffer class leaving this buffer free to be reused on the next copy
-            int size = 1 << 17;
             while (true) {
-               final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size);
+               buffer.clear();
                ByteBuffer byteBuffer = buffer.writerIndex(size).readerIndex(0).nioBuffer();
                final int bytesRead = channel.read(byteBuffer);
                int toSend = bytesRead;
@@ -600,6 +563,7 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
             }
          }
       } finally {
+         buffer.release();
          if (file.isOpen())
             file.close();
       }


[2/2] activemq-artemis git commit: This closes #1183

Posted by jb...@apache.org.
This closes #1183


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7304416d
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7304416d
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7304416d

Branch: refs/heads/master
Commit: 7304416d424dfe15918b8141794ea0d10c94e149
Parents: d29afd1 911888e
Author: Justin Bertram <jb...@apache.org>
Authored: Fri Apr 7 09:10:44 2017 -0500
Committer: Justin Bertram <jb...@apache.org>
Committed: Fri Apr 7 09:10:44 2017 -0500

----------------------------------------------------------------------
 .../protocol/core/CoreRemotingConnection.java   |   8 ++
 .../artemis/core/protocol/core/Packet.java      |  10 ++
 .../core/protocol/core/impl/PacketImpl.java     |   9 +-
 .../core/impl/RemotingConnectionImpl.java       |   6 +
 .../core/impl/wireformat/MessagePacket.java     |  12 --
 .../wireformat/SessionContinuationMessage.java  |  16 +--
 .../SessionReceiveContinuationMessage.java      |   6 +-
 .../impl/wireformat/SessionReceiveMessage.java  |   6 +-
 .../SessionSendContinuationMessage.java         |   6 +-
 .../impl/wireformat/SessionSendMessage.java     |   5 +-
 .../impl/wireformat/ReplicationAddMessage.java  |  13 +-
 .../wireformat/ReplicationAddTXMessage.java     |  13 ++
 .../wireformat/ReplicationCommitMessage.java    |   9 ++
 .../wireformat/ReplicationDeleteMessage.java    |   9 ++
 .../wireformat/ReplicationDeleteTXMessage.java  |  11 ++
 .../ReplicationLargeMessageBeginMessage.java    |   9 ++
 .../ReplicationLargeMessageEndMessage.java      |   8 ++
 .../ReplicationLargeMessageWriteMessage.java    |  10 ++
 .../ReplicationLiveIsStoppingMessage.java       |   7 ++
 .../wireformat/ReplicationPageEventMessage.java |   9 ++
 .../wireformat/ReplicationPageWriteMessage.java |   8 ++
 .../wireformat/ReplicationPrepareMessage.java   |  10 ++
 .../wireformat/ReplicationResponseMessage.java  |   7 ++
 .../ReplicationResponseMessageV2.java           |   7 ++
 .../wireformat/ReplicationStartSyncMessage.java |  21 ++++
 .../wireformat/ReplicationSyncFileMessage.java  |  38 +++++-
 .../impl/netty/NettyServerConnection.java       |   6 -
 .../core/replication/ReplicationManager.java    | 120 +++++++------------
 28 files changed, 267 insertions(+), 132 deletions(-)
----------------------------------------------------------------------