You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/07/01 14:39:00 UTC
[1/2] activemq-artemis git commit: ARTEMIS-1269 replication won't
finish synchronization
Repository: activemq-artemis
Updated Branches:
refs/heads/master c7af95447 -> 89a53ad01
ARTEMIS-1269 replication won't finish synchronization
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/387fca58
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/387fca58
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/387fca58
Branch: refs/heads/master
Commit: 387fca584e7cb66656f23b29a24169bb2fb8fc67
Parents: c7af954
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Jun 30 20:19:43 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Sat Jul 1 00:48:33 2017 -0400
----------------------------------------------------------------------
.../core/client/ActiveMQClientLogger.java | 2 +-
.../core/protocol/core/impl/ChannelImpl.java | 10 +++
.../core/impl/RemotingConnectionImpl.java | 2 +-
.../ReplicationResponseMessageV2.java | 3 +-
.../core/replication/ReplicationEndpoint.java | 74 +++++++++++++++++---
.../core/replication/ReplicationManager.java | 7 +-
.../core/server/impl/ReplicationError.java | 12 ++--
.../impl/SharedNothingBackupActivation.java | 34 ++++++---
.../artemis/tests/util/ActiveMQTestBase.java | 6 +-
.../failover/LargeMessageFailoverTest.java | 5 +-
.../ReplicatedMultipleServerFailoverTest.java | 2 +-
.../tests/util/TransportConfigurationUtils.java | 4 +-
12 files changed, 127 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/387fca58/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java
index bdb4bd1..405ed07 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java
@@ -374,7 +374,7 @@ public interface ActiveMQClientLogger extends BasicLogger {
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 214013, value = "Failed to decode packet", format = Message.Format.MESSAGE_FORMAT)
- void errorDecodingPacket(@Cause Exception e);
+ void errorDecodingPacket(@Cause Throwable e);
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 214014, value = "Failed to execute failure listener", format = Message.Format.MESSAGE_FORMAT)
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/387fca58/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
index 75c23de..39bddf5 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
@@ -462,6 +462,10 @@ public final class ChannelImpl implements Channel {
@Override
public void setHandler(final ChannelHandler handler) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Setting handler on " + this + " as " + handler);
+ }
+
this.handler = handler;
}
@@ -521,6 +525,9 @@ public final class ChannelImpl implements Channel {
@Override
public void lock() {
+ if (logger.isTraceEnabled()) {
+ logger.trace("lock channel " + this);
+ }
lock.lock();
reconnectID.incrementAndGet();
@@ -532,6 +539,9 @@ public final class ChannelImpl implements Channel {
@Override
public void unlock() {
+ if (logger.isTraceEnabled()) {
+ logger.trace("unlock channel " + this);
+ }
lock.lock();
failingOver = false;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/387fca58/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
index cc1d685..e0837e9 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
@@ -363,7 +363,7 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
doBufferReceived(packet);
super.bufferReceived(connectionID, buffer);
- } catch (Exception e) {
+ } catch (Throwable e) {
ActiveMQClientLogger.LOGGER.errorDecodingPacket(e);
throw new IllegalStateException(e);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/387fca58/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java
index b26084b..c01dd4f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java
@@ -38,8 +38,9 @@ public final class ReplicationResponseMessageV2 extends ReplicationResponseMessa
return synchronizationIsFinishedAcknowledgement;
}
- public void setSynchronizationIsFinishedAcknowledgement(boolean synchronizationIsFinishedAcknowledgement) {
+ public ReplicationResponseMessageV2 setSynchronizationIsFinishedAcknowledgement(boolean synchronizationIsFinishedAcknowledgement) {
this.synchronizationIsFinishedAcknowledgement = synchronizationIsFinishedAcknowledgement;
+ return this;
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/387fca58/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
index f879aeb..a68c3f9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
@@ -210,7 +210,16 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
ActiveMQServerLogger.LOGGER.errorHandlingReplicationPacket(e, packet);
response = new ActiveMQExceptionMessage(ActiveMQMessageBundle.BUNDLE.replicationUnhandledError(e));
}
- channel.send(response);
+
+ if (response != null) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Returning " + response);
+ }
+
+ channel.send(response);
+ } else {
+ logger.trace("Response is null, ignoring response");
+ }
}
/**
@@ -332,34 +341,68 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
private void finishSynchronization(String liveID) throws Exception {
if (logger.isTraceEnabled()) {
- logger.trace("finishSynchronization::" + liveID);
+ logger.trace("BACKUP-SYNC-START: finishSynchronization::" + liveID);
}
for (JournalContent jc : EnumSet.allOf(JournalContent.class)) {
Journal journal = journalsHolder.remove(jc);
+ if (logger.isTraceEnabled()) {
+ logger.trace("getting lock on " + jc + ", journal = " + journal);
+ }
+ registerJournal(jc.typeByte, journal);
journal.synchronizationLock();
try {
+ if (logger.isTraceEnabled()) {
+ logger.trace("lock acquired on " + jc);
+ }
// files should be already in place.
filesReservedForSync.remove(jc);
- registerJournal(jc.typeByte, journal);
+ if (logger.isTraceEnabled()) {
+ logger.trace("stopping journal for " + jc);
+ }
journal.stop();
+ if (logger.isTraceEnabled()) {
+ logger.trace("starting journal for " + jc);
+ }
journal.start();
+ if (logger.isTraceEnabled()) {
+ logger.trace("loadAndSync " + jc);
+ }
journal.loadSyncOnly(JournalState.SYNCING_UP_TO_DATE);
} finally {
+ if (logger.isTraceEnabled()) {
+ logger.trace("unlocking " + jc);
+ }
journal.synchronizationUnlock();
}
}
+
+ if (logger.isTraceEnabled()) {
+ logger.trace("Sync on large messages...");
+ }
ByteBuffer buffer = ByteBuffer.allocate(4 * 1024);
for (Entry<Long, ReplicatedLargeMessage> entry : largeMessages.entrySet()) {
ReplicatedLargeMessage lm = entry.getValue();
if (lm instanceof LargeServerMessageInSync) {
LargeServerMessageInSync lmSync = (LargeServerMessageInSync) lm;
+ if (logger.isTraceEnabled()) {
+ logger.trace("lmSync on " + lmSync.toString());
+ }
lmSync.joinSyncedData(buffer);
}
}
+ if (logger.isTraceEnabled()) {
+ logger.trace("setRemoteBackupUpToDate and liveIDSet for " + liveID);
+ }
+
journalsHolder = null;
backupQuorum.liveIDSet(liveID);
activation.setRemoteBackupUpToDate();
+
+ if (logger.isTraceEnabled()) {
+ logger.trace("Backup is synchronized / BACKUP-SYNC-DONE");
+ }
+
ActiveMQServerLogger.LOGGER.backupServerSynched(server);
return;
}
@@ -428,13 +471,28 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
if (logger.isTraceEnabled()) {
logger.trace("handleStartReplicationSynchronization:: nodeID = " + packet);
}
- ReplicationResponseMessageV2 replicationResponseMessage = new ReplicationResponseMessageV2();
- if (!started)
- return replicationResponseMessage;
if (packet.isSynchronizationFinished()) {
- finishSynchronization(packet.getNodeID());
- replicationResponseMessage.setSynchronizationIsFinishedAcknowledgement(true);
+ executor.execute(() -> {
+ try {
+ // this is a long running process, we cannot block the reading thread from netty
+ finishSynchronization(packet.getNodeID());
+ if (logger.isTraceEnabled()) {
+ logger.trace("returning completion on synchronization catchup");
+ }
+ channel.send(new ReplicationResponseMessageV2().setSynchronizationIsFinishedAcknowledgement(true));
+ } catch (Exception e) {
+ logger.warn(e.getMessage());
+ channel.send(new ActiveMQExceptionMessage(ActiveMQMessageBundle.BUNDLE.replicationUnhandledError(e)));
+ }
+
+ });
+ // the write will happen through an executor
+ return null;
+ }
+
+ ReplicationResponseMessageV2 replicationResponseMessage = new ReplicationResponseMessageV2();
+ if (!started) {
return replicationResponseMessage;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/387fca58/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
index 398f452..3b6f9d6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
@@ -356,15 +356,16 @@ public final class ReplicationManager implements ActiveMQComponent {
}
if (enabled) {
- pendingTokens.add(repliToken);
if (useExecutor) {
replicationStream.execute(() -> {
if (enabled) {
+ pendingTokens.add(repliToken);
flowControl(packet.expectedEncodeSize());
replicatingChannel.send(packet);
}
});
} else {
+ pendingTokens.add(repliToken);
flowControl(packet.expectedEncodeSize());
replicatingChannel.send(packet);
}
@@ -411,9 +412,9 @@ public final class ReplicationManager implements ActiveMQComponent {
OperationContext ctx = pendingTokens.poll();
if (ctx == null) {
- throw new IllegalStateException("Missing replication token on the queue.");
+ logger.warn("Missing replication token on queue");
+ return;
}
-
ctx.replicationDone();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/387fca58/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationError.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationError.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationError.java
index 7c333a5..83b49c9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationError.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationError.java
@@ -22,10 +22,10 @@ import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.BackupReplicationStartFailedMessage;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.LiveNodeLocator;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.jboss.logging.Logger;
/**
* Stops the backup in case of an error at the start of Replication.
@@ -36,11 +36,11 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
*/
final class ReplicationError implements Interceptor {
- private final ActiveMQServer server;
+ private static final Logger logger = Logger.getLogger(ReplicationError.class);
+
private LiveNodeLocator nodeLocator;
- ReplicationError(ActiveMQServer server, LiveNodeLocator nodeLocator) {
- this.server = server;
+ ReplicationError(LiveNodeLocator nodeLocator) {
this.nodeLocator = nodeLocator;
}
@@ -48,6 +48,10 @@ final class ReplicationError implements Interceptor {
public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException {
if (packet.getType() != PacketImpl.BACKUP_REGISTRATION_FAILED)
return true;
+
+ if (logger.isTraceEnabled()) {
+ logger.trace("Received ReplicationError::" + packet);
+ }
BackupReplicationStartFailedMessage message = (BackupReplicationStartFailedMessage) packet;
switch (message.getRegistrationProblem()) {
case ALREADY_REPLICATING:
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/387fca58/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
index d45abe3..fcba00c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
@@ -101,6 +101,8 @@ public final class SharedNothingBackupActivation extends Activation {
@Override
public void run() {
try {
+
+ logger.trace("SharedNothingBackupActivation..start");
synchronized (activeMQServer) {
activeMQServer.setState(ActiveMQServerImpl.SERVER_STATE.STARTED);
}
@@ -109,16 +111,24 @@ public final class SharedNothingBackupActivation extends Activation {
activeMQServer.moveServerData(replicaPolicy.getMaxSavedReplicatedJournalsSize());
activeMQServer.getNodeManager().start();
synchronized (this) {
- if (closed)
+ if (closed) {
+ logger.trace("SharedNothingBackupActivation is closed, ignoring activation!");
return;
+ }
}
boolean scalingDown = replicaPolicy.getScaleDownPolicy() != null && replicaPolicy.getScaleDownPolicy().isEnabled();
- if (!activeMQServer.initialisePart1(scalingDown))
+ if (!activeMQServer.initialisePart1(scalingDown)) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("could not initialize part1 " + scalingDown);
+ }
return;
+ }
+ logger.trace("Waiting for a synchronize now...");
synchronized (this) {
+ logger.trace("Entered a synchronized");
if (closed)
return;
backupQuorum = new SharedNothingBackupQuorum(activeMQServer.getStorageManager(), activeMQServer.getNodeManager(), activeMQServer.getScheduledPool(), networkHealthCheck, replicaPolicy.getQuorumSize());
@@ -136,16 +146,12 @@ public final class SharedNothingBackupActivation extends Activation {
ClusterController clusterController = activeMQServer.getClusterManager().getClusterController();
clusterController.addClusterTopologyListenerForReplication(nodeLocator);
- if (logger.isTraceEnabled()) {
- logger.trace("Waiting on cluster connection");
- }
- //todo do we actually need to wait?
+ logger.trace("Waiting on cluster connection");
clusterController.awaitConnectionToReplicationCluster();
- if (logger.isTraceEnabled()) {
- logger.trace("Cluster Connected");
- }
- clusterController.addIncomingInterceptorForReplication(new ReplicationError(activeMQServer, nodeLocator));
+ logger.trace("Cluster Connected");
+
+ clusterController.addIncomingInterceptorForReplication(new ReplicationError(nodeLocator));
// nodeManager.startBackup();
if (logger.isTraceEnabled()) {
@@ -320,13 +326,19 @@ public final class SharedNothingBackupActivation extends Activation {
return;
}
ActiveMQServerLogger.LOGGER.becomingLive(activeMQServer);
+ logger.trace("stop backup");
activeMQServer.getNodeManager().stopBackup();
+ logger.trace("start store manager");
activeMQServer.getStorageManager().start();
+ logger.trace("activated");
activeMQServer.getBackupManager().activated();
if (scalingDown) {
+ logger.trace("Scalling down...");
activeMQServer.initialisePart2(true);
} else {
+ logger.trace("Setting up new activation");
activeMQServer.setActivation(new SharedNothingLiveActivation(activeMQServer, replicaPolicy.getReplicatedPolicy()));
+ logger.trace("initialize part 2");
activeMQServer.initialisePart2(false);
if (activeMQServer.getIdentity() != null) {
@@ -337,6 +349,8 @@ public final class SharedNothingBackupActivation extends Activation {
}
+ logger.trace("completeActivation at the end");
+
activeMQServer.completeActivation();
}
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/387fca58/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index 1b35393..a95f77a 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -534,7 +534,11 @@ public abstract class ActiveMQTestBase extends Assert {
for (String c : connectors) {
connectors0.add(c);
}
- ClusterConnectionConfiguration clusterConnectionConfiguration = new ClusterConnectionConfiguration().setName("cluster1").setAddress("jms").setConnectorName(connectorName).setRetryInterval(1000).setDuplicateDetection(false).setMaxHops(1).setConfirmationWindowSize(1).setMessageLoadBalancingType(MessageLoadBalancingType.STRICT).setStaticConnectors(connectors0);
+ ClusterConnectionConfiguration clusterConnectionConfiguration = new ClusterConnectionConfiguration().
+ setName("cluster1").setAddress("jms").setConnectorName(connectorName).
+ setRetryInterval(1000).setDuplicateDetection(false).setMaxHops(1).
+ setConfirmationWindowSize(1).setMessageLoadBalancingType(MessageLoadBalancingType.STRICT).
+ setStaticConnectors(connectors0);
return clusterConnectionConfiguration;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/387fca58/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LargeMessageFailoverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LargeMessageFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LargeMessageFailoverTest.java
index f192506..8889ec5 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LargeMessageFailoverTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LargeMessageFailoverTest.java
@@ -18,22 +18,23 @@ package org.apache.activemq.artemis.tests.integration.cluster.failover;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
+import org.junit.Ignore;
import org.junit.Test;
public class LargeMessageFailoverTest extends FailoverTest {
@Override
@Test
+ @Ignore
public void testLiveAndBackupLiveComesBackNewFactory() throws Exception {
// skip test because it triggers OutOfMemoryError.
- Thread.sleep(1000);
}
@Override
@Test
+ @Ignore
public void testLiveAndBackupBackupComesBackNewFactory() throws Exception {
// skip test because it triggers OutOfMemoryError.
- Thread.sleep(1000);
}
/**
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/387fca58/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedMultipleServerFailoverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedMultipleServerFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedMultipleServerFailoverTest.java
index 38bf424..383b371 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedMultipleServerFailoverTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedMultipleServerFailoverTest.java
@@ -136,7 +136,7 @@ public class ReplicatedMultipleServerFailoverTest extends MultipleServerFailover
@Override
public boolean isNetty() {
- return false;
+ return true;
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/387fca58/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/TransportConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/TransportConfigurationUtils.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/TransportConfigurationUtils.java
index 472d327..abd08b8 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/TransportConfigurationUtils.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/TransportConfigurationUtils.java
@@ -86,7 +86,7 @@ public final class TransportConfigurationUtils {
private static TransportConfiguration transportConfiguration(String classname, boolean live, int server) {
if (classname.contains("netty")) {
Map<String, Object> serverParams = new HashMap<>();
- Integer port = live ? 61616 : 5545;
+ Integer port = live ? 61616 + server : 5545 + server;
serverParams.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, port);
return new TransportConfiguration(classname, serverParams);
}
@@ -102,7 +102,7 @@ public final class TransportConfigurationUtils {
String name) {
if (classname.contains("netty")) {
Map<String, Object> serverParams = new HashMap<>();
- Integer port = live ? 61616 : 5545;
+ Integer port = live ? 61616 + server : 5545 + server;
serverParams.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, port);
return new TransportConfiguration(classname, serverParams, name);
}
[2/2] activemq-artemis git commit: This closes #1382
Posted by cl...@apache.org.
This closes #1382
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/89a53ad0
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/89a53ad0
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/89a53ad0
Branch: refs/heads/master
Commit: 89a53ad015f03931d71d52bd9c0b99e501884fcf
Parents: c7af954 387fca5
Author: Clebert Suconic <cl...@apache.org>
Authored: Sat Jul 1 10:38:49 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Sat Jul 1 10:38:49 2017 -0400
----------------------------------------------------------------------
.../core/client/ActiveMQClientLogger.java | 2 +-
.../core/protocol/core/impl/ChannelImpl.java | 10 +++
.../core/impl/RemotingConnectionImpl.java | 2 +-
.../ReplicationResponseMessageV2.java | 3 +-
.../core/replication/ReplicationEndpoint.java | 74 +++++++++++++++++---
.../core/replication/ReplicationManager.java | 7 +-
.../core/server/impl/ReplicationError.java | 12 ++--
.../impl/SharedNothingBackupActivation.java | 34 ++++++---
.../artemis/tests/util/ActiveMQTestBase.java | 6 +-
.../failover/LargeMessageFailoverTest.java | 5 +-
.../ReplicatedMultipleServerFailoverTest.java | 2 +-
.../tests/util/TransportConfigurationUtils.java | 4 +-
12 files changed, 127 insertions(+), 34 deletions(-)
----------------------------------------------------------------------