You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/07/01 14:39:00 UTC

[1/2] activemq-artemis git commit: ARTEMIS-1269 replication won't finish synchronization

Repository: activemq-artemis
Updated Branches:
  refs/heads/master c7af95447 -> 89a53ad01


ARTEMIS-1269 replication won't finish synchronization


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/387fca58
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/387fca58
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/387fca58

Branch: refs/heads/master
Commit: 387fca584e7cb66656f23b29a24169bb2fb8fc67
Parents: c7af954
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Jun 30 20:19:43 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Sat Jul 1 00:48:33 2017 -0400

----------------------------------------------------------------------
 .../core/client/ActiveMQClientLogger.java       |  2 +-
 .../core/protocol/core/impl/ChannelImpl.java    | 10 +++
 .../core/impl/RemotingConnectionImpl.java       |  2 +-
 .../ReplicationResponseMessageV2.java           |  3 +-
 .../core/replication/ReplicationEndpoint.java   | 74 +++++++++++++++++---
 .../core/replication/ReplicationManager.java    |  7 +-
 .../core/server/impl/ReplicationError.java      | 12 ++--
 .../impl/SharedNothingBackupActivation.java     | 34 ++++++---
 .../artemis/tests/util/ActiveMQTestBase.java    |  6 +-
 .../failover/LargeMessageFailoverTest.java      |  5 +-
 .../ReplicatedMultipleServerFailoverTest.java   |  2 +-
 .../tests/util/TransportConfigurationUtils.java |  4 +-
 12 files changed, 127 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/387fca58/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java
index bdb4bd1..405ed07 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java
@@ -374,7 +374,7 @@ public interface ActiveMQClientLogger extends BasicLogger {
 
    @LogMessage(level = Logger.Level.ERROR)
    @Message(id = 214013, value = "Failed to decode packet", format = Message.Format.MESSAGE_FORMAT)
-   void errorDecodingPacket(@Cause Exception e);
+   void errorDecodingPacket(@Cause Throwable e);
 
    @LogMessage(level = Logger.Level.ERROR)
    @Message(id = 214014, value = "Failed to execute failure listener", format = Message.Format.MESSAGE_FORMAT)

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/387fca58/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
index 75c23de..39bddf5 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
@@ -462,6 +462,10 @@ public final class ChannelImpl implements Channel {
 
    @Override
    public void setHandler(final ChannelHandler handler) {
+      if (logger.isTraceEnabled()) {
+         logger.trace("Setting handler on " + this + " as " + handler);
+      }
+
       this.handler = handler;
    }
 
@@ -521,6 +525,9 @@ public final class ChannelImpl implements Channel {
 
    @Override
    public void lock() {
+      if (logger.isTraceEnabled()) {
+         logger.trace("lock channel " + this);
+      }
       lock.lock();
 
       reconnectID.incrementAndGet();
@@ -532,6 +539,9 @@ public final class ChannelImpl implements Channel {
 
    @Override
    public void unlock() {
+      if (logger.isTraceEnabled()) {
+         logger.trace("unlock channel " + this);
+      }
       lock.lock();
 
       failingOver = false;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/387fca58/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
index cc1d685..e0837e9 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
@@ -363,7 +363,7 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
          doBufferReceived(packet);
 
          super.bufferReceived(connectionID, buffer);
-      } catch (Exception e) {
+      } catch (Throwable e) {
          ActiveMQClientLogger.LOGGER.errorDecodingPacket(e);
          throw new IllegalStateException(e);
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/387fca58/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java
index b26084b..c01dd4f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java
@@ -38,8 +38,9 @@ public final class ReplicationResponseMessageV2 extends ReplicationResponseMessa
       return synchronizationIsFinishedAcknowledgement;
    }
 
-   public void setSynchronizationIsFinishedAcknowledgement(boolean synchronizationIsFinishedAcknowledgement) {
+   public ReplicationResponseMessageV2 setSynchronizationIsFinishedAcknowledgement(boolean synchronizationIsFinishedAcknowledgement) {
       this.synchronizationIsFinishedAcknowledgement = synchronizationIsFinishedAcknowledgement;
+      return this;
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/387fca58/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
index f879aeb..a68c3f9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
@@ -210,7 +210,16 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
          ActiveMQServerLogger.LOGGER.errorHandlingReplicationPacket(e, packet);
          response = new ActiveMQExceptionMessage(ActiveMQMessageBundle.BUNDLE.replicationUnhandledError(e));
       }
-      channel.send(response);
+
+      if (response != null) {
+         if (logger.isTraceEnabled()) {
+            logger.trace("Returning " + response);
+         }
+
+         channel.send(response);
+      } else {
+         logger.trace("Response is null, ignoring response");
+      }
    }
 
    /**
@@ -332,34 +341,68 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
 
    private void finishSynchronization(String liveID) throws Exception {
       if (logger.isTraceEnabled()) {
-         logger.trace("finishSynchronization::" + liveID);
+         logger.trace("BACKUP-SYNC-START: finishSynchronization::" + liveID);
       }
       for (JournalContent jc : EnumSet.allOf(JournalContent.class)) {
          Journal journal = journalsHolder.remove(jc);
+         if (logger.isTraceEnabled()) {
+            logger.trace("getting lock on " + jc + ", journal = " + journal);
+         }
+         registerJournal(jc.typeByte, journal);
          journal.synchronizationLock();
          try {
+            if (logger.isTraceEnabled()) {
+               logger.trace("lock acquired on " + jc);
+            }
             // files should be already in place.
             filesReservedForSync.remove(jc);
-            registerJournal(jc.typeByte, journal);
+            if (logger.isTraceEnabled()) {
+               logger.trace("stopping journal for " + jc);
+            }
             journal.stop();
+            if (logger.isTraceEnabled()) {
+               logger.trace("starting journal for " + jc);
+            }
             journal.start();
+            if (logger.isTraceEnabled()) {
+               logger.trace("loadAndSync " + jc);
+            }
             journal.loadSyncOnly(JournalState.SYNCING_UP_TO_DATE);
          } finally {
+            if (logger.isTraceEnabled()) {
+               logger.trace("unlocking " + jc);
+            }
             journal.synchronizationUnlock();
          }
       }
+
+      if (logger.isTraceEnabled()) {
+         logger.trace("Sync on large messages...");
+      }
       ByteBuffer buffer = ByteBuffer.allocate(4 * 1024);
       for (Entry<Long, ReplicatedLargeMessage> entry : largeMessages.entrySet()) {
          ReplicatedLargeMessage lm = entry.getValue();
          if (lm instanceof LargeServerMessageInSync) {
             LargeServerMessageInSync lmSync = (LargeServerMessageInSync) lm;
+            if (logger.isTraceEnabled()) {
+               logger.trace("lmSync on " + lmSync.toString());
+            }
             lmSync.joinSyncedData(buffer);
          }
       }
 
+      if (logger.isTraceEnabled()) {
+         logger.trace("setRemoteBackupUpToDate and liveIDSet for " + liveID);
+      }
+
       journalsHolder = null;
       backupQuorum.liveIDSet(liveID);
       activation.setRemoteBackupUpToDate();
+
+      if (logger.isTraceEnabled()) {
+         logger.trace("Backup is synchronized / BACKUP-SYNC-DONE");
+      }
+
       ActiveMQServerLogger.LOGGER.backupServerSynched(server);
       return;
    }
@@ -428,13 +471,28 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
       if (logger.isTraceEnabled()) {
          logger.trace("handleStartReplicationSynchronization:: nodeID = " + packet);
       }
-      ReplicationResponseMessageV2 replicationResponseMessage = new ReplicationResponseMessageV2();
-      if (!started)
-         return replicationResponseMessage;
 
       if (packet.isSynchronizationFinished()) {
-         finishSynchronization(packet.getNodeID());
-         replicationResponseMessage.setSynchronizationIsFinishedAcknowledgement(true);
+         executor.execute(() -> {
+            try {
+               // this is a long running process, we cannot block the reading thread from netty
+               finishSynchronization(packet.getNodeID());
+               if (logger.isTraceEnabled()) {
+                  logger.trace("returning completion on synchronization catchup");
+               }
+               channel.send(new ReplicationResponseMessageV2().setSynchronizationIsFinishedAcknowledgement(true));
+            } catch (Exception e) {
+               logger.warn(e.getMessage());
+               channel.send(new ActiveMQExceptionMessage(ActiveMQMessageBundle.BUNDLE.replicationUnhandledError(e)));
+            }
+
+         });
+         // the write will happen through an executor
+         return null;
+      }
+
+      ReplicationResponseMessageV2 replicationResponseMessage = new ReplicationResponseMessageV2();
+      if (!started) {
          return replicationResponseMessage;
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/387fca58/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
index 398f452..3b6f9d6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
@@ -356,15 +356,16 @@ public final class ReplicationManager implements ActiveMQComponent {
       }
 
       if (enabled) {
-         pendingTokens.add(repliToken);
          if (useExecutor) {
             replicationStream.execute(() -> {
                if (enabled) {
+                  pendingTokens.add(repliToken);
                   flowControl(packet.expectedEncodeSize());
                   replicatingChannel.send(packet);
                }
             });
          } else {
+            pendingTokens.add(repliToken);
             flowControl(packet.expectedEncodeSize());
             replicatingChannel.send(packet);
          }
@@ -411,9 +412,9 @@ public final class ReplicationManager implements ActiveMQComponent {
       OperationContext ctx = pendingTokens.poll();
 
       if (ctx == null) {
-         throw new IllegalStateException("Missing replication token on the queue.");
+         logger.warn("Missing replication token on queue");
+         return;
       }
-
       ctx.replicationDone();
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/387fca58/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationError.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationError.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationError.java
index 7c333a5..83b49c9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationError.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationError.java
@@ -22,10 +22,10 @@ import org.apache.activemq.artemis.api.core.Interceptor;
 import org.apache.activemq.artemis.core.protocol.core.Packet;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.BackupReplicationStartFailedMessage;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.LiveNodeLocator;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.jboss.logging.Logger;
 
 /**
  * Stops the backup in case of an error at the start of Replication.
@@ -36,11 +36,11 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
  */
 final class ReplicationError implements Interceptor {
 
-   private final ActiveMQServer server;
+   private static final Logger logger = Logger.getLogger(ReplicationError.class);
+
    private LiveNodeLocator nodeLocator;
 
-   ReplicationError(ActiveMQServer server, LiveNodeLocator nodeLocator) {
-      this.server = server;
+   ReplicationError(LiveNodeLocator nodeLocator) {
       this.nodeLocator = nodeLocator;
    }
 
@@ -48,6 +48,10 @@ final class ReplicationError implements Interceptor {
    public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException {
       if (packet.getType() != PacketImpl.BACKUP_REGISTRATION_FAILED)
          return true;
+
+      if (logger.isTraceEnabled()) {
+         logger.trace("Received ReplicationError::" + packet);
+      }
       BackupReplicationStartFailedMessage message = (BackupReplicationStartFailedMessage) packet;
       switch (message.getRegistrationProblem()) {
          case ALREADY_REPLICATING:

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/387fca58/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
index d45abe3..fcba00c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
@@ -101,6 +101,8 @@ public final class SharedNothingBackupActivation extends Activation {
    @Override
    public void run() {
       try {
+
+         logger.trace("SharedNothingBackupActivation..start");
          synchronized (activeMQServer) {
             activeMQServer.setState(ActiveMQServerImpl.SERVER_STATE.STARTED);
          }
@@ -109,16 +111,24 @@ public final class SharedNothingBackupActivation extends Activation {
          activeMQServer.moveServerData(replicaPolicy.getMaxSavedReplicatedJournalsSize());
          activeMQServer.getNodeManager().start();
          synchronized (this) {
-            if (closed)
+            if (closed) {
+               logger.trace("SharedNothingBackupActivation is closed, ignoring activation!");
                return;
+            }
          }
 
          boolean scalingDown = replicaPolicy.getScaleDownPolicy() != null && replicaPolicy.getScaleDownPolicy().isEnabled();
 
-         if (!activeMQServer.initialisePart1(scalingDown))
+         if (!activeMQServer.initialisePart1(scalingDown)) {
+            if (logger.isTraceEnabled()) {
+               logger.trace("could not initialize part1 " + scalingDown);
+            }
             return;
+         }
 
+         logger.trace("Waiting for a synchronize now...");
          synchronized (this) {
+            logger.trace("Entered a synchronized");
             if (closed)
                return;
             backupQuorum = new SharedNothingBackupQuorum(activeMQServer.getStorageManager(), activeMQServer.getNodeManager(), activeMQServer.getScheduledPool(), networkHealthCheck, replicaPolicy.getQuorumSize());
@@ -136,16 +146,12 @@ public final class SharedNothingBackupActivation extends Activation {
          ClusterController clusterController = activeMQServer.getClusterManager().getClusterController();
          clusterController.addClusterTopologyListenerForReplication(nodeLocator);
 
-         if (logger.isTraceEnabled()) {
-            logger.trace("Waiting on cluster connection");
-         }
-         //todo do we actually need to wait?
+         logger.trace("Waiting on cluster connection");
          clusterController.awaitConnectionToReplicationCluster();
 
-         if (logger.isTraceEnabled()) {
-            logger.trace("Cluster Connected");
-         }
-         clusterController.addIncomingInterceptorForReplication(new ReplicationError(activeMQServer, nodeLocator));
+         logger.trace("Cluster Connected");
+
+         clusterController.addIncomingInterceptorForReplication(new ReplicationError(nodeLocator));
 
          // nodeManager.startBackup();
          if (logger.isTraceEnabled()) {
@@ -320,13 +326,19 @@ public final class SharedNothingBackupActivation extends Activation {
                return;
             }
             ActiveMQServerLogger.LOGGER.becomingLive(activeMQServer);
+            logger.trace("stop backup");
             activeMQServer.getNodeManager().stopBackup();
+            logger.trace("start store manager");
             activeMQServer.getStorageManager().start();
+            logger.trace("activated");
             activeMQServer.getBackupManager().activated();
             if (scalingDown) {
+               logger.trace("Scalling down...");
                activeMQServer.initialisePart2(true);
             } else {
+               logger.trace("Setting up new activation");
                activeMQServer.setActivation(new SharedNothingLiveActivation(activeMQServer, replicaPolicy.getReplicatedPolicy()));
+               logger.trace("initialize part 2");
                activeMQServer.initialisePart2(false);
 
                if (activeMQServer.getIdentity() != null) {
@@ -337,6 +349,8 @@ public final class SharedNothingBackupActivation extends Activation {
 
             }
 
+            logger.trace("completeActivation at the end");
+
             activeMQServer.completeActivation();
          }
       } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/387fca58/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index 1b35393..a95f77a 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -534,7 +534,11 @@ public abstract class ActiveMQTestBase extends Assert {
       for (String c : connectors) {
          connectors0.add(c);
       }
-      ClusterConnectionConfiguration clusterConnectionConfiguration = new ClusterConnectionConfiguration().setName("cluster1").setAddress("jms").setConnectorName(connectorName).setRetryInterval(1000).setDuplicateDetection(false).setMaxHops(1).setConfirmationWindowSize(1).setMessageLoadBalancingType(MessageLoadBalancingType.STRICT).setStaticConnectors(connectors0);
+      ClusterConnectionConfiguration clusterConnectionConfiguration = new ClusterConnectionConfiguration().
+         setName("cluster1").setAddress("jms").setConnectorName(connectorName).
+         setRetryInterval(1000).setDuplicateDetection(false).setMaxHops(1).
+         setConfirmationWindowSize(1).setMessageLoadBalancingType(MessageLoadBalancingType.STRICT).
+         setStaticConnectors(connectors0);
 
       return clusterConnectionConfiguration;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/387fca58/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LargeMessageFailoverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LargeMessageFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LargeMessageFailoverTest.java
index f192506..8889ec5 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LargeMessageFailoverTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LargeMessageFailoverTest.java
@@ -18,22 +18,23 @@ package org.apache.activemq.artemis.tests.integration.cluster.failover;
 
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
+import org.junit.Ignore;
 import org.junit.Test;
 
 public class LargeMessageFailoverTest extends FailoverTest {
 
    @Override
    @Test
+   @Ignore
    public void testLiveAndBackupLiveComesBackNewFactory() throws Exception {
       // skip test because it triggers OutOfMemoryError.
-      Thread.sleep(1000);
    }
 
    @Override
    @Test
+   @Ignore
    public void testLiveAndBackupBackupComesBackNewFactory() throws Exception {
       // skip test because it triggers OutOfMemoryError.
-      Thread.sleep(1000);
    }
 
    /**

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/387fca58/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedMultipleServerFailoverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedMultipleServerFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedMultipleServerFailoverTest.java
index 38bf424..383b371 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedMultipleServerFailoverTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedMultipleServerFailoverTest.java
@@ -136,7 +136,7 @@ public class ReplicatedMultipleServerFailoverTest extends MultipleServerFailover
 
    @Override
    public boolean isNetty() {
-      return false;
+      return true;
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/387fca58/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/TransportConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/TransportConfigurationUtils.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/TransportConfigurationUtils.java
index 472d327..abd08b8 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/TransportConfigurationUtils.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/TransportConfigurationUtils.java
@@ -86,7 +86,7 @@ public final class TransportConfigurationUtils {
    private static TransportConfiguration transportConfiguration(String classname, boolean live, int server) {
       if (classname.contains("netty")) {
          Map<String, Object> serverParams = new HashMap<>();
-         Integer port = live ? 61616 : 5545;
+         Integer port = live ? 61616 + server : 5545 + server;
          serverParams.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, port);
          return new TransportConfiguration(classname, serverParams);
       }
@@ -102,7 +102,7 @@ public final class TransportConfigurationUtils {
                                                                 String name) {
       if (classname.contains("netty")) {
          Map<String, Object> serverParams = new HashMap<>();
-         Integer port = live ? 61616 : 5545;
+         Integer port = live ? 61616 + server : 5545 + server;
          serverParams.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, port);
          return new TransportConfiguration(classname, serverParams, name);
       }


[2/2] activemq-artemis git commit: This closes #1382

Posted by cl...@apache.org.
This closes #1382


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/89a53ad0
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/89a53ad0
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/89a53ad0

Branch: refs/heads/master
Commit: 89a53ad015f03931d71d52bd9c0b99e501884fcf
Parents: c7af954 387fca5
Author: Clebert Suconic <cl...@apache.org>
Authored: Sat Jul 1 10:38:49 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Sat Jul 1 10:38:49 2017 -0400

----------------------------------------------------------------------
 .../core/client/ActiveMQClientLogger.java       |  2 +-
 .../core/protocol/core/impl/ChannelImpl.java    | 10 +++
 .../core/impl/RemotingConnectionImpl.java       |  2 +-
 .../ReplicationResponseMessageV2.java           |  3 +-
 .../core/replication/ReplicationEndpoint.java   | 74 +++++++++++++++++---
 .../core/replication/ReplicationManager.java    |  7 +-
 .../core/server/impl/ReplicationError.java      | 12 ++--
 .../impl/SharedNothingBackupActivation.java     | 34 ++++++---
 .../artemis/tests/util/ActiveMQTestBase.java    |  6 +-
 .../failover/LargeMessageFailoverTest.java      |  5 +-
 .../ReplicatedMultipleServerFailoverTest.java   |  2 +-
 .../tests/util/TransportConfigurationUtils.java |  4 +-
 12 files changed, 127 insertions(+), 34 deletions(-)
----------------------------------------------------------------------