You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2017/04/07 14:10:59 UTC
[1/2] activemq-artemis git commit: ARTEMIS-1089 Improving flow
control on replication
Repository: activemq-artemis
Updated Branches:
refs/heads/master d29afd1c8 -> 7304416d4
ARTEMIS-1089 Improving flow control on replication
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/911888e8
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/911888e8
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/911888e8
Branch: refs/heads/master
Commit: 911888e8d19c98400a68f969b1c8f86bff51fa17
Parents: d29afd1
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Apr 6 11:47:31 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Apr 6 21:45:31 2017 -0400
----------------------------------------------------------------------
.../protocol/core/CoreRemotingConnection.java | 8 ++
.../artemis/core/protocol/core/Packet.java | 10 ++
.../core/protocol/core/impl/PacketImpl.java | 9 +-
.../core/impl/RemotingConnectionImpl.java | 6 +
.../core/impl/wireformat/MessagePacket.java | 12 --
.../wireformat/SessionContinuationMessage.java | 16 +--
.../SessionReceiveContinuationMessage.java | 6 +-
.../impl/wireformat/SessionReceiveMessage.java | 6 +-
.../SessionSendContinuationMessage.java | 6 +-
.../impl/wireformat/SessionSendMessage.java | 5 +-
.../impl/wireformat/ReplicationAddMessage.java | 13 +-
.../wireformat/ReplicationAddTXMessage.java | 13 ++
.../wireformat/ReplicationCommitMessage.java | 9 ++
.../wireformat/ReplicationDeleteMessage.java | 9 ++
.../wireformat/ReplicationDeleteTXMessage.java | 11 ++
.../ReplicationLargeMessageBeginMessage.java | 9 ++
.../ReplicationLargeMessageEndMessage.java | 8 ++
.../ReplicationLargeMessageWriteMessage.java | 10 ++
.../ReplicationLiveIsStoppingMessage.java | 7 ++
.../wireformat/ReplicationPageEventMessage.java | 9 ++
.../wireformat/ReplicationPageWriteMessage.java | 8 ++
.../wireformat/ReplicationPrepareMessage.java | 10 ++
.../wireformat/ReplicationResponseMessage.java | 7 ++
.../ReplicationResponseMessageV2.java | 7 ++
.../wireformat/ReplicationStartSyncMessage.java | 21 ++++
.../wireformat/ReplicationSyncFileMessage.java | 38 +++++-
.../impl/netty/NettyServerConnection.java | 6 -
.../core/replication/ReplicationManager.java | 120 +++++++------------
28 files changed, 267 insertions(+), 132 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java
index f2aa5b4..45d9229 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java
@@ -111,4 +111,12 @@ public interface CoreRemotingConnection extends RemotingConnection {
* @return the principal
*/
ActiveMQPrincipal getDefaultActiveMQPrincipal();
+
+ /**
+ *
+ * @param size size we are trying to write
+ * @param timeout
+ * @return
+ */
+ boolean blockUntilWritable(int size, long timeout);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
index ddb734e..a86c5c1 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
@@ -24,6 +24,8 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
*/
public interface Packet {
+ int INITIAL_PACKET_SIZE = 1500;
+
/**
* Sets the channel id that should be used once the packet has been successfully decoded it is
* sent to the correct channel.
@@ -33,6 +35,14 @@ public interface Packet {
void setChannelID(long channelID);
/**
+ * This will return the expected packet size for the encoding
+ * @return
+ */
+ default int expectedEncodeSize() {
+ return INITIAL_PACKET_SIZE;
+ }
+
+ /**
* Returns the channel id of the channel that should handle this packet.
*
* @return the id of the channel
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
index 75f5086..97a7973 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
@@ -39,8 +39,6 @@ public class PacketImpl implements Packet {
public static final int PACKET_HEADERS_SIZE = DataConstants.SIZE_INT + DataConstants.SIZE_BYTE +
DataConstants.SIZE_LONG;
- private static final int INITIAL_PACKET_SIZE = 1500;
-
protected long channelID;
private final byte type;
@@ -329,10 +327,13 @@ public class PacketImpl implements Packet {
}
protected ActiveMQBuffer createPacket(RemotingConnection connection) {
+
+ int size = expectedEncodeSize();
+
if (connection == null) {
- return new ChannelBufferWrapper(Unpooled.buffer(INITIAL_PACKET_SIZE));
+ return new ChannelBufferWrapper(Unpooled.buffer(size));
} else {
- return connection.createTransportBuffer(PacketImpl.INITIAL_PACKET_SIZE);
+ return connection.createTransportBuffer(size);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
index f4efcb2..cc1d685 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
@@ -22,6 +22,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -231,6 +232,11 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
}
@Override
+ public boolean blockUntilWritable(int size, long timeout) {
+ return transportConnection.blockUntilWritable(size, timeout, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
public void disconnect(final boolean criticalError) {
disconnect(null, criticalError);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java
index 0c32007..97b476d 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java
@@ -16,12 +16,8 @@
*/
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
-import io.netty.buffer.Unpooled;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ICoreMessage;
-import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
-import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
public abstract class MessagePacket extends PacketImpl implements MessagePacketI {
@@ -43,12 +39,4 @@ public abstract class MessagePacket extends PacketImpl implements MessagePacketI
return super.getParentString() + ", message=" + message;
}
- protected ActiveMQBuffer internalCreatePacket(int size, RemotingConnection connection) {
- if (connection == null) {
- return new ChannelBufferWrapper(Unpooled.buffer(size));
- } else {
- return connection.createTransportBuffer(size);
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionContinuationMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionContinuationMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionContinuationMessage.java
index d2a4266..a894594 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionContinuationMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionContinuationMessage.java
@@ -18,11 +18,8 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import java.util.Arrays;
-import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
-import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.DataConstants;
public abstract class SessionContinuationMessage extends PacketImpl {
@@ -74,18 +71,9 @@ public abstract class SessionContinuationMessage extends PacketImpl {
*
* @return the size in bytes of the expected encoded packet
*/
- protected int expectedEncodedSize() {
- return SESSION_CONTINUATION_BASE_SIZE + (body == null ? 0 : body.length);
- }
-
@Override
- protected final ActiveMQBuffer createPacket(RemotingConnection connection) {
- final int expectedEncodedSize = expectedEncodedSize();
- if (connection == null) {
- return new ChannelBufferWrapper(Unpooled.buffer(expectedEncodedSize));
- } else {
- return connection.createTransportBuffer(expectedEncodedSize);
- }
+ public int expectedEncodeSize() {
+ return SESSION_CONTINUATION_BASE_SIZE + (body == null ? 0 : body.length);
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java
index 44ad1bb..41e786b 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java
@@ -70,8 +70,8 @@ public class SessionReceiveContinuationMessage extends SessionContinuationMessag
// Protected -----------------------------------------------------
@Override
- protected final int expectedEncodedSize() {
- return super.expectedEncodedSize() + DataConstants.SIZE_LONG;
+ public int expectedEncodeSize() {
+ return super.expectedEncodeSize() + DataConstants.SIZE_LONG;
}
// Public --------------------------------------------------------
@@ -128,4 +128,4 @@ public class SessionReceiveContinuationMessage extends SessionContinuationMessag
return true;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
index d89e394..4fbd48f 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
@@ -19,7 +19,6 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
-import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.DataConstants;
public class SessionReceiveMessage extends MessagePacket {
@@ -53,9 +52,10 @@ public class SessionReceiveMessage extends MessagePacket {
return deliveryCount;
}
+
@Override
- protected ActiveMQBuffer createPacket(RemotingConnection connection) {
- return internalCreatePacket(message.getEncodeSize() + PACKET_HEADERS_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_INT, connection);
+ public int expectedEncodeSize() {
+ return message.getEncodeSize() + PACKET_HEADERS_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_INT;
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
index 1c600e9..26eedd7 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage.java
@@ -93,8 +93,8 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage {
}
@Override
- protected final int expectedEncodedSize() {
- return super.expectedEncodedSize() + (!continues ? DataConstants.SIZE_LONG : 0) + DataConstants.SIZE_BOOLEAN;
+ public int expectedEncodeSize() {
+ return super.expectedEncodeSize() + (!continues ? DataConstants.SIZE_LONG : 0) + DataConstants.SIZE_BOOLEAN;
}
@Override
@@ -160,4 +160,4 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage {
public SendAcknowledgementHandler getHandler() {
return handler;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
index 79cb4cb..9f76c2d 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
@@ -21,7 +21,6 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
-import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
public class SessionSendMessage extends MessagePacket {
@@ -62,8 +61,8 @@ public class SessionSendMessage extends MessagePacket {
}
@Override
- protected ActiveMQBuffer createPacket(RemotingConnection connection) {
- return internalCreatePacket(message.getEncodeSize() + PACKET_HEADERS_SIZE + 1, connection);
+ public int expectedEncodeSize() {
+ return message.getEncodeSize() + PACKET_HEADERS_SIZE + 1;
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddMessage.java
index 8d22fab..4a5a8b5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddMessage.java
@@ -22,6 +22,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.replication.ReplicationManager.ADD_OPERATION_TYPE;
+import org.apache.activemq.artemis.utils.DataConstants;
public final class ReplicationAddMessage extends PacketImpl {
@@ -64,9 +65,19 @@ public final class ReplicationAddMessage extends PacketImpl {
// Public --------------------------------------------------------
@Override
+ public int expectedEncodeSize() {
+ return PACKET_HEADERS_SIZE +
+ DataConstants.SIZE_BYTE + // buffer.writeByte(journalID);
+ DataConstants.SIZE_BOOLEAN + // buffer.writeBoolean(operation.toBoolean());
+ DataConstants.SIZE_LONG + // buffer.writeLong(id);
+ DataConstants.SIZE_BYTE + // buffer.writeByte(journalRecordType);
+ DataConstants.SIZE_INT + // buffer.writeInt(persister.getEncodeSize(encodingData));
+ persister.getEncodeSize(encodingData);// persister.encode(buffer, encodingData);
+ }
+
+ @Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeByte(journalID);
-
buffer.writeBoolean(operation.toBoolean());
buffer.writeLong(id);
buffer.writeByte(journalRecordType);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddTXMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddTXMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddTXMessage.java
index a6fd02b..fd7946a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddTXMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddTXMessage.java
@@ -22,6 +22,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.replication.ReplicationManager.ADD_OPERATION_TYPE;
+import org.apache.activemq.artemis.utils.DataConstants;
public class ReplicationAddTXMessage extends PacketImpl {
@@ -68,6 +69,18 @@ public class ReplicationAddTXMessage extends PacketImpl {
// Public --------------------------------------------------------
@Override
+ public int expectedEncodeSize() {
+ return PACKET_HEADERS_SIZE +
+ DataConstants.SIZE_BYTE + // buffer.writeByte(journalID);
+ DataConstants.SIZE_BOOLEAN + // buffer.writeBoolean(operation.toBoolean());
+ DataConstants.SIZE_LONG + // buffer.writeLong(txId);
+ DataConstants.SIZE_LONG + // buffer.writeLong(id);
+ DataConstants.SIZE_BYTE + // buffer.writeByte(recordType);
+ DataConstants.SIZE_INT + // buffer.writeInt(persister.getEncodeSize(encodingData));
+ persister.getEncodeSize(encodingData); // persister.encode(buffer, encodingData);
+ }
+
+ @Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeByte(journalID);
buffer.writeBoolean(operation.toBoolean());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationCommitMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationCommitMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationCommitMessage.java
index 1987b3d..245ec18 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationCommitMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationCommitMessage.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.utils.DataConstants;
public final class ReplicationCommitMessage extends PacketImpl {
@@ -42,6 +43,14 @@ public final class ReplicationCommitMessage extends PacketImpl {
}
@Override
+ public int expectedEncodeSize() {
+ return PACKET_HEADERS_SIZE +
+ DataConstants.SIZE_BYTE + // buffer.writeByte(journalID);
+ DataConstants.SIZE_BOOLEAN + // buffer.writeBoolean(rollback);
+ DataConstants.SIZE_LONG; // buffer.writeLong(txId);
+ }
+
+ @Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeByte(journalID);
buffer.writeBoolean(rollback);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationDeleteMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationDeleteMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationDeleteMessage.java
index ab97b18..fdbfb9b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationDeleteMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationDeleteMessage.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.utils.DataConstants;
public final class ReplicationDeleteMessage extends PacketImpl {
@@ -39,6 +40,14 @@ public final class ReplicationDeleteMessage extends PacketImpl {
}
@Override
+ public int expectedEncodeSize() {
+ return PACKET_HEADERS_SIZE +
+ DataConstants.SIZE_BYTE + // buffer.writeByte(journalID);
+ DataConstants.SIZE_LONG; // buffer.writeLong(id);
+ }
+
+
+ @Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeByte(journalID);
buffer.writeLong(id);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationDeleteTXMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationDeleteTXMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationDeleteTXMessage.java
index 0d75523..4e942bd 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationDeleteTXMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationDeleteTXMessage.java
@@ -21,6 +21,7 @@ import java.util.Arrays;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.utils.DataConstants;
public class ReplicationDeleteTXMessage extends PacketImpl {
@@ -53,6 +54,16 @@ public class ReplicationDeleteTXMessage extends PacketImpl {
}
@Override
+ public int expectedEncodeSize() {
+ return PACKET_HEADERS_SIZE +
+ DataConstants.SIZE_BYTE + // buffer.writeByte(journalID);
+ DataConstants.SIZE_LONG + // buffer.writeLong(txId);
+ DataConstants.SIZE_LONG + // buffer.writeLong(id);
+ DataConstants.SIZE_INT + // buffer.writeInt(encodingData.getEncodeSize());
+ encodingData.getEncodeSize(); // encodingData.encode(buffer);
+ }
+
+ @Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeByte(journalID);
buffer.writeLong(txId);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageBeginMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageBeginMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageBeginMessage.java
index 20af68c..1ecaa68 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageBeginMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageBeginMessage.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.utils.DataConstants;
public class ReplicationLargeMessageBeginMessage extends PacketImpl {
@@ -32,6 +33,14 @@ public class ReplicationLargeMessageBeginMessage extends PacketImpl {
super(PacketImpl.REPLICATION_LARGE_MESSAGE_BEGIN);
}
+
+ @Override
+ public int expectedEncodeSize() {
+ return PACKET_HEADERS_SIZE +
+ DataConstants.SIZE_LONG; // buffer.writeLong(messageId);
+ }
+
+
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeLong(messageId);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java
index bb77929..4a09cc0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.utils.DataConstants;
public class ReplicationLargeMessageEndMessage extends PacketImpl {
@@ -32,6 +33,13 @@ public class ReplicationLargeMessageEndMessage extends PacketImpl {
this.messageId = messageId;
}
+
+ @Override
+ public int expectedEncodeSize() {
+ return PACKET_HEADERS_SIZE +
+ DataConstants.SIZE_LONG; // buffer.writeLong(messageId);
+ }
+
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeLong(messageId);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageWriteMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageWriteMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageWriteMessage.java
index f60c629..ffee14c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageWriteMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageWriteMessage.java
@@ -20,6 +20,7 @@ import java.util.Arrays;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.utils.DataConstants;
public final class ReplicationLargeMessageWriteMessage extends PacketImpl {
@@ -43,6 +44,15 @@ public final class ReplicationLargeMessageWriteMessage extends PacketImpl {
}
@Override
+ public int expectedEncodeSize() {
+ return PACKET_HEADERS_SIZE +
+ DataConstants.SIZE_LONG + // buffer.writeLong(messageId);
+ DataConstants.SIZE_LONG + // buffer.writeLong(messageId);
+ DataConstants.SIZE_INT + // buffer.writeInt(body.length);
+ body.length; // buffer.writeBytes(body);
+ }
+
+ @Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeLong(messageId);
buffer.writeInt(body.length);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLiveIsStoppingMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLiveIsStoppingMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLiveIsStoppingMessage.java
index 339cfa8..456a46b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLiveIsStoppingMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLiveIsStoppingMessage.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.utils.DataConstants;
/**
* Message indicating that the live is stopping (a scheduled stop).
@@ -60,6 +61,12 @@ public final class ReplicationLiveIsStoppingMessage extends PacketImpl {
}
@Override
+ public int expectedEncodeSize() {
+ return PACKET_HEADERS_SIZE +
+ DataConstants.SIZE_INT; // buffer.writeInt(liveStopping.code);
+ }
+
+ @Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeInt(liveStopping.code);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageEventMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageEventMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageEventMessage.java
index 31680da..ea929e4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageEventMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageEventMessage.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.utils.DataConstants;
public class ReplicationPageEventMessage extends PacketImpl {
@@ -43,6 +44,14 @@ public class ReplicationPageEventMessage extends PacketImpl {
}
@Override
+ public int expectedEncodeSize() {
+ return PACKET_HEADERS_SIZE +
+ SimpleString.sizeofString(storeName) + // buffer.writeSimpleString(storeName);
+ DataConstants.SIZE_INT + // buffer.writeInt(pageNumber);
+ DataConstants.SIZE_BOOLEAN; // buffer.writeBoolean(isDelete);
+ }
+
+ @Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeSimpleString(storeName);
buffer.writeInt(pageNumber);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java
index b88e0fe..4f5079d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java
@@ -20,6 +20,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.impl.PagedMessageImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.utils.DataConstants;
public class ReplicationPageWriteMessage extends PacketImpl {
@@ -40,6 +41,13 @@ public class ReplicationPageWriteMessage extends PacketImpl {
// Public --------------------------------------------------------
@Override
+ public int expectedEncodeSize() {
+ return PACKET_HEADERS_SIZE +
+ DataConstants.SIZE_INT + // buffer.writeInt(pageNumber);
+ pagedMessage.getEncodeSize(); // pagedMessage.encode(buffer);
+ }
+
+ @Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeInt(pageNumber);
pagedMessage.encode(buffer);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPrepareMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPrepareMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPrepareMessage.java
index ebdc1c4..cf993a3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPrepareMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPrepareMessage.java
@@ -21,6 +21,7 @@ import java.util.Arrays;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.utils.DataConstants;
public final class ReplicationPrepareMessage extends PacketImpl {
@@ -49,6 +50,15 @@ public final class ReplicationPrepareMessage extends PacketImpl {
// Public --------------------------------------------------------
@Override
+ public int expectedEncodeSize() {
+ return PACKET_HEADERS_SIZE +
+ DataConstants.SIZE_BYTE + // buffer.writeByte(journalID);
+ DataConstants.SIZE_LONG + // buffer.writeLong(txId);
+ DataConstants.SIZE_INT + // buffer.writeInt(encodingData.getEncodeSize());
+ encodingData.getEncodeSize(); // encodingData.encode(buffer);
+ }
+
+ @Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeByte(journalID);
buffer.writeLong(txId);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessage.java
index c7eff85..d9d8b1a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessage.java
@@ -27,4 +27,11 @@ public class ReplicationResponseMessage extends PacketImpl {
public ReplicationResponseMessage(byte replicationResponseV2) {
super(replicationResponseV2);
}
+
+
+ @Override
+ public int expectedEncodeSize() {
+ return PACKET_HEADERS_SIZE;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java
index f9001c6..b26084b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.utils.DataConstants;
public final class ReplicationResponseMessageV2 extends ReplicationResponseMessage {
@@ -42,6 +43,12 @@ public final class ReplicationResponseMessageV2 extends ReplicationResponseMessa
}
@Override
+ public int expectedEncodeSize() {
+ return PACKET_HEADERS_SIZE +
+ DataConstants.SIZE_BOOLEAN; // buffer.writeBoolean(synchronizationIsFinishedAcknowledgement);
+ }
+
+ @Override
public void encodeRest(final ActiveMQBuffer buffer) {
super.encodeRest(buffer);
buffer.writeBoolean(synchronizationIsFinishedAcknowledgement);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java
index a44707f..018535f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java
@@ -24,6 +24,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.utils.DataConstants;
/**
* This message may signal start or end of the replication synchronization.
@@ -109,6 +110,26 @@ public class ReplicationStartSyncMessage extends PacketImpl {
}
}
+
+ @Override
+ public int expectedEncodeSize() {
+ int size = PACKET_HEADERS_SIZE +
+ DataConstants.SIZE_BOOLEAN + // buffer.writeBoolean(synchronizationIsFinished);
+ DataConstants.SIZE_BOOLEAN + // buffer.writeBoolean(allowsAutoFailBack);
+ nodeID.length() * 3; // buffer.writeString(nodeID); -- an estimate
+
+
+ if (synchronizationIsFinished) {
+ return size;
+ }
+ size += DataConstants.SIZE_BYTE + // buffer.writeByte(dataType.code);
+ DataConstants.SIZE_INT + // buffer.writeInt(ids.length);
+ DataConstants.SIZE_LONG * ids.length; // the write loop
+
+ return size;
+ }
+
+
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeBoolean(synchronizationIsFinished);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
index 90d2ca0..4d3c32f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
@@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.utils.DataConstants;
/**
* Message is used to sync {@link org.apache.activemq.artemis.core.io.SequentialFile}s to a backup server. The {@link FileType} controls
@@ -99,6 +100,38 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
}
@Override
+ public int expectedEncodeSize() {
+ int size = PACKET_HEADERS_SIZE +
+ DataConstants.SIZE_LONG; // buffer.writeLong(fileId);
+
+ if (fileId == -1)
+ return size;
+
+ size += DataConstants.SIZE_BYTE; // buffer.writeByte(fileType.code);
+ switch (fileType) {
+ case JOURNAL: {
+ size += DataConstants.SIZE_BYTE; // buffer.writeByte(journalType.typeByte);
+ break;
+ }
+ case PAGE: {
+ size += SimpleString.sizeofString(pageStoreName);
+ break;
+ }
+ case LARGE_MESSAGE:
+ default:
+ // no-op
+ }
+
+ size += DataConstants.SIZE_INT; // buffer.writeInt(dataSize);
+
+ if (dataSize > 0) {
+ size += byteBuffer.writerIndex(); // buffer.writeBytes(byteBuffer, 0, byteBuffer.writerIndex());
+ }
+
+ return size;
+ }
+
+ @Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeLong(fileId);
if (fileId == -1)
@@ -126,11 +159,6 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
if (dataSize > 0) {
buffer.writeBytes(byteBuffer, 0, byteBuffer.writerIndex());
}
-
- if (byteBuffer != null) {
- byteBuffer.release();
- byteBuffer = null;
- }
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyServerConnection.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyServerConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyServerConnection.java
index 29e39d5..f9e1b3d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyServerConnection.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyServerConnection.java
@@ -19,8 +19,6 @@ package org.apache.activemq.artemis.core.remoting.impl.netty;
import java.util.Map;
import io.netty.channel.Channel;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener;
public class NettyServerConnection extends NettyConnection {
@@ -33,8 +31,4 @@ public class NettyServerConnection extends NettyConnection {
super(configuration, channel, listener, batchingEnabled, directDeliver);
}
- @Override
- public ActiveMQBuffer createTransportBuffer(int size) {
- return new ChannelBufferWrapper(channel.alloc().directBuffer(size), true);
- }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/911888e8/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
index 8b91c02..91dfa98 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
@@ -33,16 +33,15 @@ import io.netty.buffer.PooledByteBufAllocator;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
-import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
-import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.persistence.OperationContext;
+import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.protocol.core.Channel;
@@ -71,7 +70,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
-import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.jboss.logging.Logger;
@@ -84,7 +82,7 @@ import org.jboss.logging.Logger;
*
* @see ReplicationEndpoint
*/
-public final class ReplicationManager implements ActiveMQComponent, ReadyListener {
+public final class ReplicationManager implements ActiveMQComponent {
private static final Logger logger = Logger.getLogger(ReplicationManager.class);
@@ -119,8 +117,6 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
private final AtomicBoolean writable = new AtomicBoolean(true);
- private final Object replicationLock = new Object();
-
private final Queue<OperationContext> pendingTokens = new ConcurrentLinkedQueue<>();
private final ExecutorFactory executorFactory;
@@ -292,12 +288,9 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
replicatingChannel.getConnection().getTransportConnection().fireReady(true);
}
- synchronized (replicationLock) {
- enabled = false;
- writable.set(true);
- replicationLock.notifyAll();
- clearReplicationTokens();
- }
+ enabled = false;
+ writable.set(true);
+ clearReplicationTokens();
RemotingConnection toStop = remotingConnection;
if (toStop != null) {
@@ -315,16 +308,13 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
*/
public void clearReplicationTokens() {
logger.trace("clearReplicationTokens initiating");
- synchronized (replicationLock) {
- logger.trace("clearReplicationTokens entered the lock");
- while (!pendingTokens.isEmpty()) {
- OperationContext ctx = pendingTokens.poll();
- logger.trace("Calling ctx.replicationDone()");
- try {
- ctx.replicationDone();
- } catch (Throwable e) {
- ActiveMQServerLogger.LOGGER.errorCompletingCallbackOnReplicationManager(e);
- }
+ while (!pendingTokens.isEmpty()) {
+ OperationContext ctx = pendingTokens.poll();
+ logger.trace("Calling ctx.replicationDone()");
+ try {
+ ctx.replicationDone();
+ } catch (Throwable e) {
+ ActiveMQServerLogger.LOGGER.errorCompletingCallbackOnReplicationManager(e);
}
}
logger.trace("clearReplicationTokens finished");
@@ -362,24 +352,22 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
repliToken.replicationLineUp();
}
- synchronized (replicationLock) {
- if (enabled) {
- pendingTokens.add(repliToken);
- if (useExecutor) {
- replicationStream.execute(() -> {
- if (enabled) {
- flowControl();
- replicatingChannel.send(packet);
- }
- });
- } else {
- flowControl();
- replicatingChannel.send(packet);
- }
+ if (enabled) {
+ pendingTokens.add(repliToken);
+ if (useExecutor) {
+ replicationStream.execute(() -> {
+ if (enabled) {
+ flowControl(packet.expectedEncodeSize());
+ replicatingChannel.send(packet);
+ }
+ });
} else {
- // Already replicating channel failed, so just play the action now
- runItNow = true;
+ flowControl(packet.expectedEncodeSize());
+ replicatingChannel.send(packet);
}
+ } else {
+ // Already replicating channel failed, so just play the action now
+ runItNow = true;
}
// Execute outside lock
@@ -395,47 +383,20 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
* This was written as a refactoring of sendReplicatePacket.
* In case you refactor this in any way, this method must hold a lock on replication lock. .
*/
- private boolean flowControl() {
- synchronized (replicationLock) {
- // synchronized (replicationLock) { -- I'm not adding this because the caller already has it
- // future maintainers of this code please be aware that the intention here is hold the lock on replication lock
- if (!replicatingChannel.getConnection().isWritable(this)) {
- try {
- logger.trace("flowControl waiting on writable replication");
- writable.set(false);
- //don't wait for ever as this may hang tests etc, we've probably been closed anyway
- long now = System.currentTimeMillis();
- long deadline = now + timeout;
- while (!writable.get() && now < deadline) {
- replicationLock.wait(deadline - now);
- now = System.currentTimeMillis();
- }
- logger.trace("flow control done on replication");
-
- if (!writable.get()) {
- ActiveMQServerLogger.LOGGER.slowReplicationResponse();
- logger.tracef("There was no response from replication backup after %s seconds, server being stopped now", System.currentTimeMillis() - now);
- try {
- stop();
- } catch (Exception e) {
- logger.warn(e.getMessage(), e);
- }
- return false;
- }
- } catch (InterruptedException e) {
- throw new ActiveMQInterruptedException(e);
- }
+ private boolean flowControl(int size) {
+ boolean flowWorked = replicatingChannel.getConnection().blockUntilWritable(size, timeout);
+
+ if (!flowWorked) {
+ try {
+ ActiveMQServerLogger.LOGGER.slowReplicationResponse();
+ stop();
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
}
}
- return true;
- }
- @Override
- public void readyForWriting() {
- synchronized (replicationLock) {
- writable.set(true);
- replicationLock.notifyAll();
- }
+
+ return flowWorked;
}
/**
@@ -569,15 +530,17 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
if (!file.isOpen()) {
file.open();
}
+ int size = 32 * 1024;
+ final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size);
+
try {
try (final FileInputStream fis = new FileInputStream(file.getJavaFile());
final FileChannel channel = fis.getChannel()) {
// We can afford having a single buffer here for this entire loop
// because sendReplicatePacket will encode the packet as a NettyBuffer
// through ActiveMQBuffer class leaving this buffer free to be reused on the next copy
- int size = 1 << 17;
while (true) {
- final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size);
+ buffer.clear();
ByteBuffer byteBuffer = buffer.writerIndex(size).readerIndex(0).nioBuffer();
final int bytesRead = channel.read(byteBuffer);
int toSend = bytesRead;
@@ -600,6 +563,7 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
}
}
} finally {
+ buffer.release();
if (file.isOpen())
file.close();
}
[2/2] activemq-artemis git commit: This closes #1183
Posted by jb...@apache.org.
This closes #1183
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7304416d
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7304416d
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7304416d
Branch: refs/heads/master
Commit: 7304416d424dfe15918b8141794ea0d10c94e149
Parents: d29afd1 911888e
Author: Justin Bertram <jb...@apache.org>
Authored: Fri Apr 7 09:10:44 2017 -0500
Committer: Justin Bertram <jb...@apache.org>
Committed: Fri Apr 7 09:10:44 2017 -0500
----------------------------------------------------------------------
.../protocol/core/CoreRemotingConnection.java | 8 ++
.../artemis/core/protocol/core/Packet.java | 10 ++
.../core/protocol/core/impl/PacketImpl.java | 9 +-
.../core/impl/RemotingConnectionImpl.java | 6 +
.../core/impl/wireformat/MessagePacket.java | 12 --
.../wireformat/SessionContinuationMessage.java | 16 +--
.../SessionReceiveContinuationMessage.java | 6 +-
.../impl/wireformat/SessionReceiveMessage.java | 6 +-
.../SessionSendContinuationMessage.java | 6 +-
.../impl/wireformat/SessionSendMessage.java | 5 +-
.../impl/wireformat/ReplicationAddMessage.java | 13 +-
.../wireformat/ReplicationAddTXMessage.java | 13 ++
.../wireformat/ReplicationCommitMessage.java | 9 ++
.../wireformat/ReplicationDeleteMessage.java | 9 ++
.../wireformat/ReplicationDeleteTXMessage.java | 11 ++
.../ReplicationLargeMessageBeginMessage.java | 9 ++
.../ReplicationLargeMessageEndMessage.java | 8 ++
.../ReplicationLargeMessageWriteMessage.java | 10 ++
.../ReplicationLiveIsStoppingMessage.java | 7 ++
.../wireformat/ReplicationPageEventMessage.java | 9 ++
.../wireformat/ReplicationPageWriteMessage.java | 8 ++
.../wireformat/ReplicationPrepareMessage.java | 10 ++
.../wireformat/ReplicationResponseMessage.java | 7 ++
.../ReplicationResponseMessageV2.java | 7 ++
.../wireformat/ReplicationStartSyncMessage.java | 21 ++++
.../wireformat/ReplicationSyncFileMessage.java | 38 +++++-
.../impl/netty/NettyServerConnection.java | 6 -
.../core/replication/ReplicationManager.java | 120 +++++++------------
28 files changed, 267 insertions(+), 132 deletions(-)
----------------------------------------------------------------------