You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2022/07/18 20:34:37 UTC
[activemq-artemis] branch main updated: ARTEMIS-3767 Incompatibility on replication between 2.17 and current version
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 8e54a65227 ARTEMIS-3767 Incompatibility on replication between 2.17 and current version
8e54a65227 is described below
commit 8e54a6522757901b43001a901bae92f165a9f557
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Fri Jul 15 16:00:41 2022 -0400
ARTEMIS-3767 Incompatibility on replication between 2.17 and current version
---
.../core/protocol/core/CoreRemotingConnection.java | 5 ++
.../artemis/core/protocol/ServerPacketDecoder.java | 6 +--
.../impl/wireformat/ReplicationAddMessage.java | 28 ++++++++--
.../impl/wireformat/ReplicationAddTXMessage.java | 28 ++++++++--
.../wireformat/ReplicationStartSyncMessage.java | 39 +++++++++-----
.../core/replication/ReplicationManager.java | 10 ++--
tests/compatibility-tests/pom.xml | 60 ++++++++++++++++++++++
.../artemis/tests/compatibility/GroovyRun.java | 2 +
.../multiVersionReplica/backupServer.groovy | 7 ++-
.../multiVersionReplica/mainServer.groovy | 8 ++-
.../compatibility/MultiVersionReplicaTest.java | 53 +++++++++++++------
11 files changed, 196 insertions(+), 50 deletions(-)
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java
index 78f28f2449..43035b0520 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java
@@ -71,6 +71,11 @@ public interface CoreRemotingConnection extends RemotingConnection {
return version >= PacketImpl.ARTEMIS_2_24_0_VERSION;
}
+ default boolean isBeforeTwoEighteen() {
+ int version = getChannelVersion();
+ return version < PacketImpl.ARTEMIS_2_18_0_VERSION;
+ }
+
/**
* Sets the client protocol used on the communication. This will determine if the client has
* support for certain packet types
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
index fa3d30532b..53b784da66 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
@@ -162,11 +162,11 @@ public class ServerPacketDecoder extends ClientPacketDecoder {
break;
}
case REPLICATION_APPEND: {
- packet = new ReplicationAddMessage();
+ packet = new ReplicationAddMessage(connection.isBeforeTwoEighteen());
break;
}
case REPLICATION_APPEND_TX: {
- packet = new ReplicationAddTXMessage();
+ packet = new ReplicationAddTXMessage(connection.isBeforeTwoEighteen());
break;
}
case REPLICATION_DELETE: {
@@ -222,7 +222,7 @@ public class ServerPacketDecoder extends ClientPacketDecoder {
break;
}
case PacketImpl.REPLICATION_START_FINISH_SYNC: {
- packet = new ReplicationStartSyncMessage();
+ packet = new ReplicationStartSyncMessage(connection.isBeforeTwoEighteen());
break;
}
case PacketImpl.REPLICATION_SYNC_FILE: {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddMessage.java
index c85cc84fd5..12eef32ec0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddMessage.java
@@ -43,17 +43,22 @@ public final class ReplicationAddMessage extends PacketImpl {
private byte[] recordData;
- public ReplicationAddMessage() {
+ // this is for version compatibility
+ private final boolean beforeTwoEighteen;
+
+ public ReplicationAddMessage(final boolean beforeTwoEighteen) {
super(PacketImpl.REPLICATION_APPEND);
+ this.beforeTwoEighteen = beforeTwoEighteen;
}
- public ReplicationAddMessage(final byte journalID,
+ public ReplicationAddMessage(final boolean beforeTwoEighteen,
+ final byte journalID,
final ADD_OPERATION_TYPE operation,
final long id,
final byte journalRecordType,
final Persister persister,
final Object encodingData) {
- this();
+ this(beforeTwoEighteen);
this.journalID = journalID;
this.operation = operation;
this.id = id;
@@ -77,7 +82,11 @@ public final class ReplicationAddMessage extends PacketImpl {
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeByte(journalID);
- buffer.writeByte(operation.toRecord());
+ if (beforeTwoEighteen) {
+ buffer.writeBoolean(operation == ADD_OPERATION_TYPE.UPDATE);
+ } else {
+ buffer.writeByte(operation.toRecord());
+ }
buffer.writeLong(id);
buffer.writeByte(journalRecordType);
buffer.writeInt(persister.getEncodeSize(encodingData));
@@ -87,7 +96,16 @@ public final class ReplicationAddMessage extends PacketImpl {
@Override
public void decodeRest(final ActiveMQBuffer buffer) {
journalID = buffer.readByte();
- operation = ADD_OPERATION_TYPE.toOperation(buffer.readByte());
+ if (beforeTwoEighteen) {
+ boolean isUpdate = buffer.readBoolean();
+ if (isUpdate) {
+ operation = ADD_OPERATION_TYPE.UPDATE;
+ } else {
+ operation = ADD_OPERATION_TYPE.ADD;
+ }
+ } else {
+ operation = ADD_OPERATION_TYPE.toOperation(buffer.readByte());
+ }
id = buffer.readLong();
journalRecordType = buffer.readByte();
final int recordDataSize = buffer.readInt();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddTXMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddTXMessage.java
index 73a35d4db4..5fabfef210 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddTXMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddTXMessage.java
@@ -45,18 +45,23 @@ public class ReplicationAddTXMessage extends PacketImpl {
private ADD_OPERATION_TYPE operation;
- public ReplicationAddTXMessage() {
+ // this is for version compatibility
+ private final boolean beforeTwoEighteen;
+
+ public ReplicationAddTXMessage(final boolean beforeTwoEighteen) {
super(PacketImpl.REPLICATION_APPEND_TX);
+ this.beforeTwoEighteen = beforeTwoEighteen;
}
- public ReplicationAddTXMessage(final byte journalID,
+ public ReplicationAddTXMessage(final boolean beforeTwoEighteen,
+ final byte journalID,
final ADD_OPERATION_TYPE operation,
final long txId,
final long id,
final byte recordType,
final Persister persister,
final Object encodingData) {
- this();
+ this(beforeTwoEighteen);
this.journalID = journalID;
this.operation = operation;
this.txId = txId;
@@ -82,7 +87,11 @@ public class ReplicationAddTXMessage extends PacketImpl {
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeByte(journalID);
- buffer.writeByte(operation.toRecord());
+ if (beforeTwoEighteen) {
+ buffer.writeBoolean(operation == ADD_OPERATION_TYPE.UPDATE);
+ } else {
+ buffer.writeByte(operation.toRecord());
+ }
buffer.writeLong(txId);
buffer.writeLong(id);
buffer.writeByte(recordType);
@@ -93,7 +102,16 @@ public class ReplicationAddTXMessage extends PacketImpl {
@Override
public void decodeRest(final ActiveMQBuffer buffer) {
journalID = buffer.readByte();
- operation = ADD_OPERATION_TYPE.toOperation(buffer.readByte());
+ if (beforeTwoEighteen) {
+ boolean isUpdate = buffer.readBoolean();
+ if (isUpdate) {
+ operation = ADD_OPERATION_TYPE.UPDATE;
+ } else {
+ operation = ADD_OPERATION_TYPE.ADD;
+ }
+ } else {
+ operation = ADD_OPERATION_TYPE.toOperation(buffer.readByte());
+ }
txId = buffer.readLong();
id = buffer.readLong();
recordType = buffer.readByte();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java
index e2e3fa463b..31003ab73d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java
@@ -16,16 +16,16 @@
*/
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
-import java.security.InvalidParameterException;
-import java.util.Arrays;
-import java.util.List;
-
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.utils.DataConstants;
+import java.security.InvalidParameterException;
+import java.util.Arrays;
+import java.util.List;
+
/**
* This message may signal start or end of the replication synchronization.
* <p>
@@ -40,6 +40,10 @@ public class ReplicationStartSyncMessage extends PacketImpl {
private String nodeID;
private boolean allowsAutoFailBack;
+ // this is for version compatibility
+ // when the peer is older than 2.18.0, encoding must stop right after nodeID if synchronizationIsFinished is true, as such peers expect no further data
+ private final boolean beforeTwoEighteen;
+
public enum SyncDataType {
JournalBindings(AbstractJournalStorageManager.JournalContent.BINDINGS.typeByte),
JournalMessages(AbstractJournalStorageManager.JournalContent.MESSAGES.typeByte),
@@ -70,12 +74,13 @@ public class ReplicationStartSyncMessage extends PacketImpl {
}
}
- public ReplicationStartSyncMessage() {
+ public ReplicationStartSyncMessage(boolean beforeTwoEighteen) {
super(REPLICATION_START_FINISH_SYNC);
+ this.beforeTwoEighteen = beforeTwoEighteen; // keep the caller-supplied peer-version flag (not the still-default synchronizationIsFinished field)
}
- public ReplicationStartSyncMessage(List<Long> filenames) {
- this();
+ public ReplicationStartSyncMessage(boolean beforeTwoEighteen, List<Long> filenames) {
+ this(beforeTwoEighteen);
ids = new long[filenames.size()];
for (int i = 0; i < filenames.size(); i++) {
ids[i] = filenames.get(i);
@@ -85,24 +90,24 @@ public class ReplicationStartSyncMessage extends PacketImpl {
}
- public ReplicationStartSyncMessage(String nodeID, long nodeDataVersion) {
- this(nodeID);
+ public ReplicationStartSyncMessage(boolean beforeTwoEighteen, String nodeID, long nodeDataVersion) {
+ this(beforeTwoEighteen, nodeID);
ids = new long[1];
ids[0] = nodeDataVersion;
dataType = SyncDataType.ActivationSequence;
}
- public ReplicationStartSyncMessage(String nodeID) {
- this();
+ public ReplicationStartSyncMessage(boolean beforeTwoEighteen, String nodeID) {
+ this(beforeTwoEighteen);
synchronizationIsFinished = true;
this.nodeID = nodeID;
}
- public ReplicationStartSyncMessage(JournalFile[] datafiles,
+ public ReplicationStartSyncMessage(boolean beforeTwoEighteen, JournalFile[] datafiles,
AbstractJournalStorageManager.JournalContent contentType,
String nodeID,
boolean allowsAutoFailBack) {
- this();
+ this(beforeTwoEighteen);
this.nodeID = nodeID;
this.allowsAutoFailBack = allowsAutoFailBack;
synchronizationIsFinished = false;
@@ -143,6 +148,10 @@ public class ReplicationStartSyncMessage extends PacketImpl {
buffer.writeBoolean(synchronizationIsFinished);
buffer.writeBoolean(allowsAutoFailBack);
buffer.writeString(nodeID);
+ if (beforeTwoEighteen && synchronizationIsFinished) {
+ // At this point, pre 2.18.0 servers don't expect any more data to come.
+ return;
+ }
buffer.writeByte(dataType.code);
buffer.writeInt(ids.length);
for (long id : ids) {
@@ -155,6 +164,10 @@ public class ReplicationStartSyncMessage extends PacketImpl {
synchronizationIsFinished = buffer.readBoolean();
allowsAutoFailBack = buffer.readBoolean();
nodeID = buffer.readString();
+ if (buffer.readableBytes() == 0) {
+ // Pre-2.18.0 server wouldn't send anything more than this.
+ return;
+ }
dataType = SyncDataType.getDataType(buffer.readByte());
int length = buffer.readInt();
ids = new long[length];
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
index 6f98470028..1755f11b88 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
@@ -224,7 +224,7 @@ public final class ReplicationManager implements ActiveMQComponent {
final Persister persister,
final Object record) throws Exception {
if (enabled) {
- sendReplicatePacket(new ReplicationAddMessage(journalID, operation, id, recordType, persister, record));
+ sendReplicatePacket(new ReplicationAddMessage(remotingConnection.isBeforeTwoEighteen(), journalID, operation, id, recordType, persister, record));
}
}
@@ -242,7 +242,7 @@ public final class ReplicationManager implements ActiveMQComponent {
final Persister persister,
final Object record) throws Exception {
if (enabled) {
- sendReplicatePacket(new ReplicationAddTXMessage(journalID, operation, txID, id, recordType, persister, record));
+ sendReplicatePacket(new ReplicationAddTXMessage(remotingConnection.isBeforeTwoEighteen(), journalID, operation, txID, id, recordType, persister, record));
}
}
@@ -801,7 +801,7 @@ public final class ReplicationManager implements ActiveMQComponent {
String nodeID,
boolean allowsAutoFailBack) throws ActiveMQException {
if (enabled)
- sendReplicatePacket(new ReplicationStartSyncMessage(datafiles, contentType, nodeID, allowsAutoFailBack));
+ sendReplicatePacket(new ReplicationStartSyncMessage(remotingConnection.isBeforeTwoEighteen(), datafiles, contentType, nodeID, allowsAutoFailBack));
}
/**
@@ -820,7 +820,7 @@ public final class ReplicationManager implements ActiveMQComponent {
}
synchronizationIsFinishedAcknowledgement.countUp();
- sendReplicatePacket(new ReplicationStartSyncMessage(nodeID, server.getNodeManager().getNodeActivationSequence()));
+ sendReplicatePacket(new ReplicationStartSyncMessage(remotingConnection.isBeforeTwoEighteen(), nodeID, server.getNodeManager().getNodeActivationSequence()));
try {
if (!synchronizationIsFinishedAcknowledgement.await(initialReplicationSyncTimeout)) {
ActiveMQReplicationTimeooutException exception = ActiveMQMessageBundle.BUNDLE.replicationSynchronizationTimeout(initialReplicationSyncTimeout);
@@ -865,7 +865,7 @@ public final class ReplicationManager implements ActiveMQComponent {
idsToSend = new ArrayList<>(largeMessages.keySet());
if (enabled)
- sendReplicatePacket(new ReplicationStartSyncMessage(idsToSend));
+ sendReplicatePacket(new ReplicationStartSyncMessage(remotingConnection.isBeforeTwoEighteen(), idsToSend));
}
/**
diff --git a/tests/compatibility-tests/pom.xml b/tests/compatibility-tests/pom.xml
index a355ddcd1a..28f9390df4 100644
--- a/tests/compatibility-tests/pom.xml
+++ b/tests/compatibility-tests/pom.xml
@@ -412,6 +412,58 @@
<variableName>ARTEMIS-2_10_0</variableName>
</configuration>
</execution>
+ <execution>
+ <phase>compile</phase>
+ <goals>
+ <goal>dependency-scan</goal>
+ </goals>
+ <id>2_17_0-check</id>
+ <configuration>
+ <optional>true</optional>
+ <libListWithDeps>
+ <arg>org.apache.activemq:artemis-jms-server:2.17.0</arg>
+ <arg>org.apache.activemq:artemis-jms-client:2.17.0</arg>
+ <arg>org.apache.activemq:artemis-cli:2.17.0</arg>
+ <arg>org.apache.activemq:artemis-hornetq-protocol:2.17.0</arg>
+ <arg>org.apache.activemq:artemis-amqp-protocol:2.17.0</arg>
+ <arg>org.apache.activemq:artemis-hornetq-protocol:2.17.0</arg>
+ <arg>org.apache.groovy:groovy-all:pom:${groovy.version}</arg>
+ <arg>org.jboss.marshalling:jboss-marshalling-river:2.0.9.Final</arg>
+ </libListWithDeps>
+ <libList>
+ <arg>org.apache.activemq.tests:compatibility-tests:${project.version}</arg>
+ </libList>
+ <!-- for future maintainers, notice that if you add new variables you also need to add the system property
+ otherwise this is not captured, search for the word @@@@ on this pom where I left another comment -->
+ <variableName>ARTEMIS-2_17_0</variableName>
+ </configuration>
+ </execution> <execution>
+ <phase>compile</phase>
+ <goals>
+ <goal>dependency-scan</goal>
+ </goals>
+ <id>2_18_0-check</id>
+ <configuration>
+ <optional>true</optional>
+ <libListWithDeps>
+ <arg>org.apache.activemq:artemis-jms-server:2.18.0</arg>
+ <arg>org.apache.activemq:artemis-jms-client:2.18.0</arg>
+ <arg>org.apache.activemq:artemis-cli:2.18.0</arg>
+ <arg>org.apache.activemq:artemis-hornetq-protocol:2.18.0</arg>
+ <arg>org.apache.activemq:artemis-amqp-protocol:2.18.0</arg>
+ <arg>org.apache.activemq:artemis-hornetq-protocol:2.18.0</arg>
+ <arg>org.apache.groovy:groovy-all:pom:${groovy.version}</arg>
+ <arg>org.jboss.marshalling:jboss-marshalling-river:2.0.9.Final</arg>
+ </libListWithDeps>
+ <libList>
+ <arg>org.apache.activemq.tests:compatibility-tests:${project.version}</arg>
+ </libList>
+ <!-- for future maintainers, notice that if you add new variables you also need to add the system property
+ otherwise this is not captured, search for the word @@@@ on this pom where I left another comment -->
+ <variableName>ARTEMIS-2_18_0</variableName>
+ </configuration>
+ </execution>
+
<execution>
<phase>compile</phase>
<goals>
@@ -661,6 +713,14 @@
<name>ARTEMIS-2_10_0</name> <!-- 2.10.0 -->
<value>${ARTEMIS-2_10_0}</value>
</property>
+ <property>
+ <name>ARTEMIS-2_17_0</name>
+ <value>${ARTEMIS-2_17_0}</value>
+ </property>
+ <property>
+ <name>ARTEMIS-2_18_0</name>
+ <value>${ARTEMIS-2_18_0}</value>
+ </property>
<property>
<name>ARTEMIS-2_22_0</name>
<value>${ARTEMIS-2_22_0}</value>
diff --git a/tests/compatibility-tests/src/main/java/org/apache/activemq/artemis/tests/compatibility/GroovyRun.java b/tests/compatibility-tests/src/main/java/org/apache/activemq/artemis/tests/compatibility/GroovyRun.java
index 2ac0edb628..7ff09f64af 100644
--- a/tests/compatibility-tests/src/main/java/org/apache/activemq/artemis/tests/compatibility/GroovyRun.java
+++ b/tests/compatibility-tests/src/main/java/org/apache/activemq/artemis/tests/compatibility/GroovyRun.java
@@ -39,6 +39,8 @@ public class GroovyRun {
public static final String TWO_SIX_THREE = "ARTEMIS-263";
public static final String TWO_SEVEN_ZERO = "ARTEMIS-270";
public static final String TWO_TEN_ZERO = "ARTEMIS-2_10_0";
+ public static final String TWO_SEVENTEEN_ZERO = "ARTEMIS-2_17_0";
+ public static final String TWO_EIGHTEEN_ZERO = "ARTEMIS-2_18_0";
public static final String TWO_TWENTYTWO_ZERO = "ARTEMIS-2_22_0";
public static final String HORNETQ_235 = "HORNETQ-235";
public static final String HORNETQ_247 = "HORNETQ-247";
diff --git a/tests/compatibility-tests/src/main/resources/multiVersionReplica/backupServer.groovy b/tests/compatibility-tests/src/main/resources/multiVersionReplica/backupServer.groovy
index c98cd43673..9f5921394e 100644
--- a/tests/compatibility-tests/src/main/resources/multiVersionReplica/backupServer.groovy
+++ b/tests/compatibility-tests/src/main/resources/multiVersionReplica/backupServer.groovy
@@ -39,7 +39,12 @@ configuration.addAcceptorConfiguration("artemis", "tcp://localhost:" + port);
configuration.addConnectorConfiguration("local", "tcp://localhost:" + port);
configuration.setSecurityEnabled(false);
configuration.setPersistenceEnabled(true);
-configuration.setGlobalMaxMessages(100);
+
+if (configuration.metaClass.hasMetaProperty("globalMaxMessages")) {
+ configuration.globalMaxMessages = 10
+} else {
+ configuration.globalMaxSize = 10 * 1024
+}
configuration.setHAPolicyConfiguration(new ReplicaPolicyConfiguration().setClusterName("main"))
configuration.addAddressesSetting("#", new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE));
diff --git a/tests/compatibility-tests/src/main/resources/multiVersionReplica/mainServer.groovy b/tests/compatibility-tests/src/main/resources/multiVersionReplica/mainServer.groovy
index 48179dac9d..68e4622718 100644
--- a/tests/compatibility-tests/src/main/resources/multiVersionReplica/mainServer.groovy
+++ b/tests/compatibility-tests/src/main/resources/multiVersionReplica/mainServer.groovy
@@ -41,7 +41,13 @@ configuration.addAcceptorConfiguration("artemis", "tcp://localhost:" + port);
configuration.addConnectorConfiguration("local", "tcp://localhost:" + port);
configuration.setSecurityEnabled(false);
configuration.setPersistenceEnabled(true);
-configuration.setGlobalMaxMessages(10);
+
+if (configuration.metaClass.hasMetaProperty("globalMaxMessages")) {
+ configuration.globalMaxMessages = 10
+} else {
+ configuration.globalMaxSize = 10 * 1024
+}
+
configuration.setHAPolicyConfiguration(new ReplicatedPolicyConfiguration().setClusterName("main"))
configuration.addAddressesSetting("#", new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE));
diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/MultiVersionReplicaTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/MultiVersionReplicaTest.java
index 542022d4a5..fa455d5d98 100644
--- a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/MultiVersionReplicaTest.java
+++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/MultiVersionReplicaTest.java
@@ -38,6 +38,8 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
+import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_EIGHTEEN_ZERO;
+import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_SEVENTEEN_ZERO;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.TWO_TWENTYTWO_ZERO;
@RunWith(Parameterized.class)
@@ -58,7 +60,11 @@ public class MultiVersionReplicaTest extends ClasspathBase {
List<Object[]> combinations = new ArrayList<>();
combinations.add(new Object[]{TWO_TWENTYTWO_ZERO, SNAPSHOT});
combinations.add(new Object[]{SNAPSHOT, TWO_TWENTYTWO_ZERO});
- // The SNAPSHOT/SNAPSHOT is here as a teest validation only, like in other cases where SNAPSHOT/SNAPSHOT is used.
+ combinations.add(new Object[]{TWO_SEVENTEEN_ZERO, SNAPSHOT});
+ combinations.add(new Object[]{SNAPSHOT, TWO_SEVENTEEN_ZERO});
+ combinations.add(new Object[]{TWO_EIGHTEEN_ZERO, SNAPSHOT});
+ combinations.add(new Object[]{SNAPSHOT, TWO_EIGHTEEN_ZERO});
+ // The SNAPSHOT/SNAPSHOT is here as a test validation only, like in other cases where SNAPSHOT/SNAPSHOT is used.
combinations.add(new Object[]{SNAPSHOT, SNAPSHOT});
return combinations;
}
@@ -93,35 +99,48 @@ public class MultiVersionReplicaTest extends ClasspathBase {
evaluate(mainClassloader, "multiVersionReplica/mainServerIsReplicated.groovy");
- send(new ActiveMQConnectionFactory("tcp://localhost:61000"), 2000);
- send(new JmsConnectionFactory("amqp://localhost:61000"), 2000);
+ send(new ActiveMQConnectionFactory("tcp://localhost:61000"), 2000, 10);
+ send(new JmsConnectionFactory("amqp://localhost:61000"), 2000, 10);
evaluate(mainClassloader, "multiVersionReplica/mainServerStop.groovy");
evaluate(backupClassLoader, "multiVersionReplica/backupServerIsActive.groovy");
- receive(new ActiveMQConnectionFactory("tcp://localhost:61001"), 2000);
- receive(new JmsConnectionFactory("amqp://localhost:61001"), 2000);
+ receive(new ActiveMQConnectionFactory("tcp://localhost:61001"), 2010);
+ receive(new JmsConnectionFactory("amqp://localhost:61001"), 2010);
evaluate(backupClassLoader, "multiVersionReplica/backupServerStop.groovy");
}
- private void send(ConnectionFactory factory, int numberOfMessages) throws Throwable {
+ private void send(ConnectionFactory factory, int numberOfMessagesTx, int numberOfMessagesNonTx) throws Throwable {
try (Connection connection = factory.createConnection()) {
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- Queue queue = session.createQueue(QUEUE_NAME);
- MessageProducer producer = session.createProducer(queue);
- boolean pending = false;
- for (int i = 0; i < numberOfMessages; i++) {
- producer.send(session.createTextMessage("Hello world!!!!!"));
- pending = true;
- if (i > 0 && i % 100 == 0) {
+ Queue queue;
+
+ {
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ queue = session.createQueue(QUEUE_NAME);
+ MessageProducer producer = session.createProducer(queue);
+ boolean pending = false;
+ for (int i = 0; i < numberOfMessagesTx; i++) {
+ producer.send(session.createTextMessage("Hello world!!!!!"));
+ pending = true;
+ if (i > 0 && i % 100 == 0) {
+ session.commit();
+ pending = false;
+ }
+ }
+ if (pending) {
session.commit();
- pending = false;
}
+ session.close();
}
- if (pending) {
- session.commit();
+
+ {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(queue);
+ for (int i = 0; i < numberOfMessagesNonTx; i++) {
+ producer.send(session.createTextMessage("Hello world!!!!!"));
+ }
}
}
}