You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/04/15 03:14:48 UTC

[2/6] activemq-artemis git commit: ARTEMIS-484 Large Message Loss on Initial replication

ARTEMIS-484 Large Message Loss on Initial replication

https://issues.apache.org/jira/browse/ARTEMIS-484

The file copy performed after the initial synchronization of large messages was
broken. This commit fixes how the buffer is cleared before each read, since a
previous partial body read would leave the reused buffer dirty.

I'm also keeping many of the trace statements I added while debugging this issue,
since they will be useful if anything like this happens again.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/d6c7e305
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/d6c7e305
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/d6c7e305

Branch: refs/heads/master
Commit: d6c7e30594cd6620ad05cbbd247421e07793ee77
Parents: 3ecd8b7
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Apr 14 13:24:46 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Apr 14 18:55:01 2016 -0400

----------------------------------------------------------------------
 .../core/client/impl/ClientMessageImpl.java     |   2 +-
 .../jms/client/ActiveMQJMSClientLogger.java     |   8 +-
 .../jms/client/ActiveMQMessageConsumer.java     |  14 +-
 .../jms/client/ActiveMQQueueBrowser.java        |   2 +-
 .../jms/client/JMSMessageListenerWrapper.java   |   2 +-
 .../artemis/core/io/aio/AIOSequentialFile.java  |  10 +-
 .../artemis/core/io/util/FileIOUtil.java        |  84 +++++++++
 .../artemis/core/io/aio/FileIOUtilTest.java     |  87 ++++++++++
 .../impl/journal/LargeServerMessageInSync.java  |  51 ++++--
 .../ReplicationLargeMessageBeginMessage.java    |   7 +
 .../ReplicationLargeMessageEndMessage.java      |   7 +
 .../ReplicationLargeMessageWriteMessage.java    |   8 +
 .../core/replication/ReplicationEndpoint.java   | 172 ++++++++-----------
 .../core/replication/ReplicationManager.java    |  12 +-
 14 files changed, 344 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d6c7e305/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
index 31d9aad..926ac1b 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.java
@@ -141,7 +141,7 @@ public class ClientMessageImpl extends MessageImpl implements ClientMessageInter
 
    @Override
    public String toString() {
-      return "ClientMessage[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress() + ",userID=" + (getUserID() != null ? getUserID() : "null") + ",properties=" + properties.toString() + "]";
+      return getClass().getSimpleName() + "[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress() + ",userID=" + (getUserID() != null ? getUserID() : "null") + ",properties=" + properties.toString() + "]";
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d6c7e305/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQJMSClientLogger.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQJMSClientLogger.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQJMSClientLogger.java
index 7eb56cd..3f11696 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQJMSClientLogger.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQJMSClientLogger.java
@@ -58,12 +58,12 @@ public interface ActiveMQJMSClientLogger extends BasicLogger {
    void errorCallingExcListener(@Cause Exception e);
 
    @LogMessage(level = Logger.Level.ERROR)
-   @Message(id = 124002, value = "Queue Browser failed to create message", format = Message.Format.MESSAGE_FORMAT)
-   void errorCreatingMessage(@Cause Throwable e);
+   @Message(id = 124002, value = "Queue Browser failed to create message {0}", format = Message.Format.MESSAGE_FORMAT)
+   void errorCreatingMessage(String messageToString, @Cause Throwable e);
 
    @LogMessage(level = Logger.Level.ERROR)
-   @Message(id = 124003, value = "Message Listener failed to prepare message for receipt", format = Message.Format.MESSAGE_FORMAT)
-   void errorPreparingMessageForReceipt(@Cause Throwable e);
+   @Message(id = 124003, value = "Message Listener failed to prepare message for receipt, message={0}", format = Message.Format.MESSAGE_FORMAT)
+   void errorPreparingMessageForReceipt(String messagetoString, @Cause Throwable e);
 
    @LogMessage(level = Logger.Level.ERROR)
    @Message(id = 124004, value = "Message Listener failed to process message", format = Message.Format.MESSAGE_FORMAT)

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d6c7e305/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
index 3f47209..04e4f41 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
@@ -32,6 +32,7 @@ import org.apache.activemq.artemis.api.core.client.ClientConsumer;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.MessageHandler;
 import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
+import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
 
 /**
  * ActiveMQ Artemis implementation of a JMS MessageConsumer.
@@ -211,7 +212,18 @@ public final class ActiveMQMessageConsumer implements QueueReceiver, TopicSubscr
             boolean needSession = ackMode == Session.CLIENT_ACKNOWLEDGE || ackMode == ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE;
             jmsMsg = ActiveMQMessage.createMessage(coreMessage, needSession ? session.getCoreSession() : null);
 
-            jmsMsg.doBeforeReceive();
+            try {
+               jmsMsg.doBeforeReceive();
+            }
+            catch (IndexOutOfBoundsException ioob) {
+               // In case this exception happen you will need to know where it happened.
+               // it has been a bug here in the past, and this was used to debug it.
+               // nothing better than keep it for future investigations in case it happened again
+               IndexOutOfBoundsException newIOOB = new IndexOutOfBoundsException(ioob.getMessage() + "@" + jmsMsg.getCoreMessage());
+               newIOOB.initCause(ioob);
+               ActiveMQClientLogger.LOGGER.warn(newIOOB.getMessage(), newIOOB);
+               throw ioob;
+            }
 
             // We Do the ack after doBeforeRecive, as in the case of large messages, this may fail so we don't want messages redelivered
             // https://issues.jboss.org/browse/JBPAPP-6110

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d6c7e305/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueueBrowser.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueueBrowser.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueueBrowser.java
index 5022fcd..4cf34ea 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueueBrowser.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQQueueBrowser.java
@@ -141,7 +141,7 @@ public final class ActiveMQQueueBrowser implements QueueBrowser {
                msg.doBeforeReceive();
             }
             catch (Exception e) {
-               ActiveMQJMSClientLogger.LOGGER.errorCreatingMessage(e);
+               ActiveMQJMSClientLogger.LOGGER.errorCreatingMessage(msg.getCoreMessage().toString(), e);
 
                return null;
             }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d6c7e305/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java
index 0d831f0..ab62dbc 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java
@@ -73,7 +73,7 @@ public class JMSMessageListenerWrapper implements MessageHandler {
          msg.doBeforeReceive();
       }
       catch (Exception e) {
-         ActiveMQJMSClientLogger.LOGGER.errorPreparingMessageForReceipt(e);
+         ActiveMQJMSClientLogger.LOGGER.errorPreparingMessageForReceipt(msg.getCoreMessage().toString(), e);
 
          return;
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d6c7e305/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java
index efeeb2e..1a109cb 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java
@@ -32,6 +32,7 @@ import org.apache.activemq.artemis.core.io.DummyCallback;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
 import org.apache.activemq.artemis.jlibaio.LibaioFile;
+import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
 import org.apache.activemq.artemis.utils.ReusableLatch;
 
 public class AIOSequentialFile extends AbstractSequentialFile {
@@ -202,7 +203,14 @@ public class AIOSequentialFile extends AbstractSequentialFile {
     */
    @Override
    public void writeDirect(final ByteBuffer bytes, final boolean sync, final IOCallback callback) {
-      checkOpened();
+      try {
+         checkOpened();
+      }
+      catch (Exception e) {
+         ActiveMQJournalLogger.LOGGER.warn(e.getMessage(), e);
+         callback.onError(-1, e.getMessage());
+         return;
+      }
 
       final int bytesToWrite = factory.calculateBlockSize(bytes.limit());
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d6c7e305/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/util/FileIOUtil.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/util/FileIOUtil.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/util/FileIOUtil.java
new file mode 100644
index 0000000..f9d3ab7
--- /dev/null
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/util/FileIOUtil.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.io.util;
+
+import java.nio.ByteBuffer;
+
+import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.jboss.logging.Logger;
+
+public class FileIOUtil {
+
+   private static final Logger logger = Logger.getLogger(Logger.class);
+   private static final boolean isTrace = logger.isTraceEnabled();
+
+   public static void copyData(SequentialFile from, SequentialFile to, ByteBuffer buffer) throws Exception {
+
+      boolean fromIsOpen = from.isOpen();
+      boolean toIsOpen = to.isOpen();
+
+      from.close();
+      from.open();
+
+      if (!toIsOpen) {
+         to.open();
+      }
+
+      to.position(to.size());
+
+      from.position(0);
+
+      try {
+         for (;;) {
+            // The buffer is reused...
+            // We need to make sure we clear the limits and the buffer before reusing it
+            buffer.clear();
+            int bytesRead = from.read(buffer);
+
+            if (isTrace) {
+               logger.trace("appending " + bytesRead + " bytes on " + to.getFileName());
+            }
+
+            if (bytesRead > 0) {
+               to.writeDirect(buffer, false);
+            }
+
+            if (bytesRead < buffer.capacity()) {
+               logger.trace("Interrupting reading as the whole thing was sent on " + to.getFileName());
+               break;
+            }
+         }
+      }
+      finally {
+         if (!fromIsOpen) {
+            from.close();
+         }
+         else {
+            from.position(from.size());
+         }
+         if (!toIsOpen) {
+            to.close();
+         }
+         else {
+            to.position(to.size());
+         }
+      }
+
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d6c7e305/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/aio/FileIOUtilTest.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/aio/FileIOUtilTest.java b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/aio/FileIOUtilTest.java
new file mode 100644
index 0000000..0e3d7d7
--- /dev/null
+++ b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/aio/FileIOUtilTest.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.io.aio;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+
+import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
+import org.apache.activemq.artemis.core.io.util.FileIOUtil;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class FileIOUtilTest {
+
+   @Rule
+   public TemporaryFolder temporaryFolder;
+
+   public FileIOUtilTest() {
+      File parent = new File("./target");
+      parent.mkdirs();
+      temporaryFolder = new TemporaryFolder(parent);
+   }
+
+   @Test
+   public void testCopy() throws Exception {
+      System.out.println("Data at " + temporaryFolder.getRoot());
+      SequentialFileFactory factory = new NIOSequentialFileFactory(temporaryFolder.getRoot(), 100);
+      SequentialFile file = factory.createSequentialFile("file1.bin");
+      file.open();
+
+      ByteBuffer buffer = ByteBuffer.allocate(204800);
+      buffer.put(new byte[204800]);
+      buffer.rewind();
+      file.writeDirect(buffer, true);
+
+      buffer = ByteBuffer.allocate(409605);
+      buffer.put(new byte[409605]);
+      buffer.rewind();
+
+      SequentialFile file2 = factory.createSequentialFile("file2.bin");
+
+      file2.open();
+      file2.writeDirect(buffer, true);
+
+
+      // This is allocating a reusable buffer to perform the copy, just like it's used within LargeMessageInSync
+      buffer = ByteBuffer.allocate(4 * 1024);
+
+      SequentialFile newFile = factory.createSequentialFile("file1.cop");
+      FileIOUtil.copyData(file, newFile, buffer);
+
+      SequentialFile newFile2 = factory.createSequentialFile("file2.cop");
+      FileIOUtil.copyData(file2, newFile2, buffer);
+
+      Assert.assertEquals(file.size(), newFile.size());
+      Assert.assertEquals(file2.size(), newFile2.size());
+
+      newFile.close();
+      newFile2.close();
+      file.close();
+      file2.close();
+
+      System.out.println("Test result::");
+
+   }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d6c7e305/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java
index f264415..274abeb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java
@@ -21,14 +21,19 @@ import java.nio.ByteBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.io.util.FileIOUtil;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.persistence.StorageManager.LargeMessageExtension;
 import org.apache.activemq.artemis.core.replication.ReplicatedLargeMessage;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
+import org.jboss.logging.Logger;
 
 public final class LargeServerMessageInSync implements ReplicatedLargeMessage {
 
+   private static final Logger logger = Logger.getLogger(LargeServerMessageInSync.class);
+   private static final boolean isTrace = logger.isTraceEnabled();
+
    private final LargeServerMessage mainLM;
    private final StorageManager storageManager;
    private SequentialFile appendFile;
@@ -50,20 +55,33 @@ public final class LargeServerMessageInSync implements ReplicatedLargeMessage {
       if (!mainSeqFile.isOpen()) {
          mainSeqFile.open();
       }
-      if (appendFile != null) {
-         appendFile.close();
-         appendFile.open();
-         for (;;) {
-            buffer.rewind();
-            int bytesRead = appendFile.read(buffer);
-            if (bytesRead > 0)
-               mainSeqFile.writeDirect(buffer, false);
-            if (bytesRead < buffer.capacity()) {
-               break;
+
+      try {
+         if (appendFile != null) {
+            if (isTrace) {
+               logger.trace("joinSyncedData on " + mainLM + ", currentSize on mainMessage=" + mainSeqFile.size() + ", appendFile size = " + appendFile.size());
+            }
+
+            FileIOUtil.copyData(appendFile, mainSeqFile, buffer);
+            deleteAppendFile();
+         }
+         else {
+            if (isTrace) {
+               logger.trace("joinSyncedData, appendFile is null, ignoring joinSyncedData on " + mainLM);
             }
          }
-         deleteAppendFile();
       }
+      catch (Throwable e) {
+         logger.warn("Error while sincing data on largeMessageInSync::" + mainLM);
+      }
+
+
+      if (isTrace) {
+         logger.trace("joinedSyncData on " + mainLM + " finished with " + mainSeqFile.size());
+      }
+
+
+
       syncDone = true;
    }
 
@@ -85,6 +103,9 @@ public final class LargeServerMessageInSync implements ReplicatedLargeMessage {
 
    @Override
    public synchronized void releaseResources() {
+      if (isTrace) {
+         logger.warn("release resources called on " + mainLM, new Exception("trace"));
+      }
       mainLM.releaseResources();
       if (appendFile != null && appendFile.isOpen()) {
          try {
@@ -122,11 +143,19 @@ public final class LargeServerMessageInSync implements ReplicatedLargeMessage {
    public synchronized void addBytes(byte[] bytes) throws Exception {
       if (deleted)
          return;
+
       if (syncDone) {
+         if (isTrace) {
+            logger.trace("Adding " + bytes.length + " towards sync message::" + mainLM);
+         }
          mainLM.addBytes(bytes);
          return;
       }
 
+      if (isTrace) {
+         logger.trace("addBytes(bytes.length=" + bytes.length + ") on message=" + mainLM);
+      }
+
       if (appendFile == null) {
          appendFile = storageManager.createFileForLargeMessage(mainLM.getMessageID(), LargeMessageExtension.SYNC);
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d6c7e305/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageBeginMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageBeginMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageBeginMessage.java
index 0a36a56..20af68c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageBeginMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageBeginMessage.java
@@ -58,6 +58,13 @@ public class ReplicationLargeMessageBeginMessage extends PacketImpl {
    }
 
    @Override
+   public String toString() {
+      return "ReplicationLargeMessageBeginMessage{" +
+         "messageId=" + messageId +
+         '}';
+   }
+
+   @Override
    public boolean equals(Object obj) {
       if (this == obj)
          return true;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d6c7e305/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java
index eea788a..bb77929 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java
@@ -58,6 +58,13 @@ public class ReplicationLargeMessageEndMessage extends PacketImpl {
    }
 
    @Override
+   public String toString() {
+      return "ReplicationLargeMessageEndMessage{" +
+         "messageId=" + messageId +
+         '}';
+   }
+
+   @Override
    public boolean equals(Object obj) {
       if (this == obj)
          return true;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d6c7e305/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageWriteMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageWriteMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageWriteMessage.java
index 0970f05..f60c629 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageWriteMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageWriteMessage.java
@@ -81,6 +81,14 @@ public final class ReplicationLargeMessageWriteMessage extends PacketImpl {
    }
 
    @Override
+   public String toString() {
+      return "ReplicationLargeMessageWriteMessage{" +
+         "messageId=" + messageId +
+         ", body.size=" + body.length +
+         '}';
+   }
+
+   @Override
    public boolean equals(Object obj) {
       if (this == obj)
          return true;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d6c7e305/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
index 7cf5450..3cd5bfd 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
@@ -80,6 +80,7 @@ import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBackupQuorum;
 import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation;
+import org.jboss.logging.Logger;
 
 /**
  * Handles all the synchronization necessary for replication on the backup side (that is the
@@ -87,7 +88,8 @@ import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivatio
  */
 public final class ReplicationEndpoint implements ChannelHandler, ActiveMQComponent {
 
-   private static final boolean trace = ActiveMQServerLogger.LOGGER.isTraceEnabled();
+   private static final Logger logger = Logger.getLogger(ReplicationEndpoint.class);
+   private static final boolean isTrace = logger.isTraceEnabled();
 
    private final IOCriticalErrorListener criticalErrorListener;
    private final ActiveMQServerImpl server;
@@ -153,11 +155,18 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
 
    @Override
    public void handlePacket(final Packet packet) {
+      if (isTrace) {
+         logger.trace("handlePacket::handling " + packet);
+      }
       PacketImpl response = new ReplicationResponseMessage();
       final byte type = packet.getType();
 
       try {
          if (!started) {
+            if (isTrace) {
+               logger.trace("handlePacket::ignoring " + packet);
+            }
+
             return;
          }
 
@@ -340,56 +349,10 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
       this.channel = channel;
    }
 
-   public void compareJournalInformation(final JournalLoadInformation[] journalInformation) throws ActiveMQException {
-      if (!activation.isRemoteBackupUpToDate()) {
-         throw ActiveMQMessageBundle.BUNDLE.journalsNotInSync();
-      }
-
-      if (journalLoadInformation == null || journalLoadInformation.length != journalInformation.length) {
-         throw ActiveMQMessageBundle.BUNDLE.replicationTooManyJournals();
-      }
-
-      for (int i = 0; i < journalInformation.length; i++) {
-         if (!journalInformation[i].equals(journalLoadInformation[i])) {
-            ActiveMQServerLogger.LOGGER.journalcomparisonMismatch(journalParametersToString(journalInformation));
-            throw ActiveMQMessageBundle.BUNDLE.replicationTooManyJournals();
-         }
-      }
-
-   }
-
-   /**
-    * Used on tests only. To simulate missing page deletes
-    */
-   public void setDeletePages(final boolean deletePages) {
-      this.deletePages = deletePages;
-   }
-
-   /**
-    * @param journalInformation
-    */
-   private String journalParametersToString(final JournalLoadInformation[] journalInformation) {
-      return "**********************************************************\n" + "parameters:\n" +
-         "BindingsImpl = " +
-         journalInformation[0] +
-         "\n" +
-         "Messaging = " +
-         journalInformation[1] +
-         "\n" +
-         "**********************************************************" +
-         "\n" +
-         "Expected:" +
-         "\n" +
-         "BindingsImpl = " +
-         journalLoadInformation[0] +
-         "\n" +
-         "Messaging = " +
-         journalLoadInformation[1] +
-         "\n" +
-         "**********************************************************";
-   }
-
    private void finishSynchronization(String liveID) throws Exception {
+      if (isTrace) {
+         logger.trace("finishSynchronization::" + liveID);
+      }
       for (JournalContent jc : EnumSet.allOf(JournalContent.class)) {
          Journal journal = journalsHolder.remove(jc);
          journal.synchronizationLock();
@@ -427,7 +390,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
     * @param msg
     * @throws Exception
     */
-   private synchronized void handleReplicationSynchronization(ReplicationSyncFileMessage msg) throws Exception {
+   private void handleReplicationSynchronization(ReplicationSyncFileMessage msg) throws Exception {
       Long id = Long.valueOf(msg.getId());
       byte[] data = msg.getData();
       SequentialFile channel1;
@@ -462,7 +425,6 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
       }
 
       if (data == null) {
-         channel1.close();
          return;
       }
 
@@ -477,69 +439,73 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
     * {@link FileWrapperJournal} in place to store messages while synchronization is going on.
     *
     * @param packet
-    * @throws Exception
     * @return if the incoming packet indicates the synchronization is finished then return an acknowledgement otherwise
-    *         return an empty response
+    * return an empty response
+    * @throws Exception
     */
    private ReplicationResponseMessageV2 handleStartReplicationSynchronization(final ReplicationStartSyncMessage packet) throws Exception {
-      ReplicationResponseMessageV2 replicationResponseMessage = new ReplicationResponseMessageV2();
-      if (activation.isRemoteBackupUpToDate()) {
-         throw ActiveMQMessageBundle.BUNDLE.replicationBackupUpToDate();
-      }
 
-      synchronized (this) {
-         if (!started)
-            return replicationResponseMessage;
+      if (isTrace) {
+         logger.trace("handleStartReplicationSynchronization:: nodeID = " + packet);
+      }
+      ReplicationResponseMessageV2 replicationResponseMessage = new ReplicationResponseMessageV2();
+      if (!started)
+         return replicationResponseMessage;
 
-         if (packet.isSynchronizationFinished()) {
-            finishSynchronization(packet.getNodeID());
-            replicationResponseMessage.setSynchronizationIsFinishedAcknowledgement(true);
-            return replicationResponseMessage;
-         }
+      if (packet.isSynchronizationFinished()) {
+         finishSynchronization(packet.getNodeID());
+         replicationResponseMessage.setSynchronizationIsFinishedAcknowledgement(true);
+         return replicationResponseMessage;
+      }
 
-         switch (packet.getDataType()) {
-            case LargeMessages:
-               for (long msgID : packet.getFileIds()) {
-                  createLargeMessage(msgID, true);
-               }
-               break;
-            case JournalBindings:
-            case JournalMessages:
-               if (wantedFailBack && !packet.isServerToFailBack()) {
-                  ActiveMQServerLogger.LOGGER.autoFailBackDenied();
-               }
+      switch (packet.getDataType()) {
+         case LargeMessages:
+            for (long msgID : packet.getFileIds()) {
+               createLargeMessage(msgID, true);
+            }
+            break;
+         case JournalBindings:
+         case JournalMessages:
+            if (wantedFailBack && !packet.isServerToFailBack()) {
+               ActiveMQServerLogger.LOGGER.autoFailBackDenied();
+            }
 
-               final JournalContent journalContent = SyncDataType.getJournalContentType(packet.getDataType());
-               final Journal journal = journalsHolder.get(journalContent);
+            final JournalContent journalContent = SyncDataType.getJournalContentType(packet.getDataType());
+            final Journal journal = journalsHolder.get(journalContent);
 
-               if (packet.getNodeID() != null) {
-                  // At the start of replication, we still do not know which is the nodeID that the live uses.
-                  // This is the point where the backup gets this information.
-                  backupQuorum.liveIDSet(packet.getNodeID());
-               }
-               Map<Long, JournalSyncFile> mapToFill = filesReservedForSync.get(journalContent);
+            if (packet.getNodeID() != null) {
+               // At the start of replication, we still do not know which is the nodeID that the live uses.
+               // This is the point where the backup gets this information.
+               backupQuorum.liveIDSet(packet.getNodeID());
+            }
+            Map<Long, JournalSyncFile> mapToFill = filesReservedForSync.get(journalContent);
 
-               for (Entry<Long, JournalFile> entry : journal.createFilesForBackupSync(packet.getFileIds()).entrySet()) {
-                  mapToFill.put(entry.getKey(), new JournalSyncFile(entry.getValue()));
-               }
-               FileWrapperJournal syncJournal = new FileWrapperJournal(journal);
-               registerJournal(journalContent.typeByte, syncJournal);
-               break;
-            default:
-               throw ActiveMQMessageBundle.BUNDLE.replicationUnhandledDataType();
-         }
+            for (Entry<Long, JournalFile> entry : journal.createFilesForBackupSync(packet.getFileIds()).entrySet()) {
+               mapToFill.put(entry.getKey(), new JournalSyncFile(entry.getValue()));
+            }
+            FileWrapperJournal syncJournal = new FileWrapperJournal(journal);
+            registerJournal(journalContent.typeByte, syncJournal);
+            break;
+         default:
+            throw ActiveMQMessageBundle.BUNDLE.replicationUnhandledDataType();
       }
 
       return replicationResponseMessage;
    }
 
    private void handleLargeMessageEnd(final ReplicationLargeMessageEndMessage packet) {
+      if (isTrace) {
+         logger.trace("handleLargeMessageEnd on " + packet.getMessageId());
+      }
       final ReplicatedLargeMessage message = lookupLargeMessage(packet.getMessageId(), true, false);
       if (message != null) {
          executor.execute(new Runnable() {
             @Override
             public void run() {
                try {
+                  if (isTrace) {
+                     logger.trace("Deleting LargeMessage " + packet.getMessageId() + " on the executor @ handleLargeMessageEnd");
+                  }
                   message.deleteFile();
                }
                catch (Exception e) {
@@ -560,7 +526,9 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
       }
    }
 
-   private ReplicatedLargeMessage lookupLargeMessage(final long messageId, final boolean delete, final boolean createIfNotExists) {
+   private ReplicatedLargeMessage lookupLargeMessage(final long messageId,
+                                                     final boolean delete,
+                                                     final boolean createIfNotExists) {
       ReplicatedLargeMessage message;
 
       if (delete) {
@@ -590,7 +558,9 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
    private void handleLargeMessageBegin(final ReplicationLargeMessageBeginMessage packet) {
       final long id = packet.getMessageId();
       createLargeMessage(id, false);
-      ActiveMQServerLogger.LOGGER.trace("Receiving Large Message " + id + " on backup");
+      if (isTrace) {
+         logger.trace("Receiving Large Message Begin " + id + " on backup");
+      }
    }
 
    private void createLargeMessage(final long id, boolean liveToBackupSync) {
@@ -666,14 +636,14 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
    private void handleAppendAddRecord(final ReplicationAddMessage packet) throws Exception {
       Journal journalToUse = getJournal(packet.getJournalID());
       if (packet.getRecord() == ADD_OPERATION_TYPE.UPDATE) {
-         if (ReplicationEndpoint.trace) {
-            ActiveMQServerLogger.LOGGER.trace("Endpoint appendUpdate id = " + packet.getId());
+         if (isTrace) {
+           logger.trace("Endpoint appendUpdate id = " + packet.getId());
          }
          journalToUse.appendUpdateRecord(packet.getId(), packet.getJournalRecordType(), packet.getRecordData(), noSync);
       }
       else {
-         if (ReplicationEndpoint.trace) {
-            ActiveMQServerLogger.LOGGER.trace("Endpoint append id = " + packet.getId());
+         if (isTrace) {
+            logger.trace("Endpoint append id = " + packet.getId());
          }
          journalToUse.appendAddRecord(packet.getId(), packet.getJournalRecordType(), packet.getRecordData(), noSync);
       }
@@ -807,7 +777,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
     *
     * @param backupQuorum
     */
-   public synchronized void setBackupQuorum(SharedNothingBackupQuorum backupQuorum) {
+   public void setBackupQuorum(SharedNothingBackupQuorum backupQuorum) {
       this.backupQuorum = backupQuorum;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d6c7e305/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
index 4aabbea..4081dd9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
@@ -70,6 +70,7 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
 import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.jboss.logging.Logger;
 
 /**
  * Manages replication tasks on the live server (that is the live server side of a "remote backup"
@@ -81,6 +82,10 @@ import org.apache.activemq.artemis.utils.ReusableLatch;
  */
 public final class ReplicationManager implements ActiveMQComponent, ReadyListener {
 
+
+   Logger logger = Logger.getLogger(ReplicationManager.class);
+   final boolean isTrace = logger.isTraceEnabled();
+
    public enum ADD_OPERATION_TYPE {
       UPDATE {
          @Override
@@ -330,7 +335,7 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
       return sendReplicatePacket(packet, true);
    }
 
-   private synchronized OperationContext sendReplicatePacket(final Packet packet, boolean lineUp) {
+   private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp) {
       if (!enabled)
          return null;
       boolean runItNow = false;
@@ -578,6 +583,11 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
     */
    public void sendSynchronizationDone(String nodeID, long initialReplicationSyncTimeout) {
       if (enabled) {
+
+         if (isTrace) {
+            logger.trace("sendSynchronizationDone ::" + nodeID + ", " + initialReplicationSyncTimeout);
+         }
+
          synchronizationIsFinishedAcknowledgement.countUp();
          sendReplicatePacket(new ReplicationStartSyncMessage(nodeID));
          try {