You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/09/26 18:48:52 UTC

[04/17] activemq-artemis git commit: ARTEMIS-1221 Duplicated ID causes LargeMessage lost at backup

ARTEMIS-1221 Duplicated ID causes LargeMessage lost at backup

When a large message is replicated to backup, a pendingID is generated
when the large message is finished. This pendingID is generated by a
BatchingIDGenerator at backup.

It is possible that a pendingID generated at backup may be a duplicate
to an ID generated at live server.

This can cause a problem when a large message with a messageID that is
the same as another largemessage's pendingID is replicated and stored
in the backup's journal, and then a deleteRecord for the pendingID
is appended. If backup becomes live and loads the journal, it will
drop the large message add record because there is a deleteRecord of
the same ID (even though it is a pendingID of another message).
As a result the expecting client will never get this large message.

So in summary, the root cause is that the pendingIDs for large
messages are generated at backup while backup is not alive.

The solution to this is that instead of the backup generating
the pendingID, we make them all be generated in advance
at live server and let them replicated to backup whereever needed.
The ID generater at backup only works when backup becomes live
(when it is properly initialized from journal).

(cherry picked from commit d50f577cd50df37634f592db65200861fe3e13d3)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7b1a1b05
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7b1a1b05
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7b1a1b05

Branch: refs/heads/1.x
Commit: 7b1a1b058a5ec84267363ad1d12eacef5f3d8a1d
Parents: 5faf2cd
Author: Howard Gao <ho...@gmail.com>
Authored: Thu Jun 29 00:03:47 2017 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Sep 26 14:28:05 2017 -0400

----------------------------------------------------------------------
 .../journal/AbstractJournalStorageManager.java  |   2 +-
 .../impl/journal/JournalStorageManager.java     |  19 ++-
 .../impl/journal/LargeServerMessageImpl.java    |   2 +-
 .../impl/journal/LargeServerMessageInSync.java  |  10 ++
 .../ReplicationLargeMessageEndMessage.java      |  19 ++-
 .../replication/ReplicatedLargeMessage.java     |   4 +
 .../core/replication/ReplicationEndpoint.java   |   1 +
 .../core/replication/ReplicationManager.java    |   7 +-
 .../artemis/core/server/LargeServerMessage.java |   6 +-
 .../failover/FailoverTestWithDivert.java        | 148 +++++++++++++++++++
 .../replication/ReplicationTest.java            |   4 +-
 11 files changed, 203 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b1a1b05/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index bf682ff..5ea104b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -1546,7 +1546,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
       if (largeServerMessage.getPendingRecordID() >= 0) {
          try {
             confirmPendingLargeMessage(largeServerMessage.getPendingRecordID());
-            largeServerMessage.setPendingRecordID(-1);
+            largeServerMessage.setPendingRecordID(LargeServerMessage.NO_PENDING_ID);
          } catch (Exception e) {
             ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b1a1b05/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index e79a9cb..ca1b805 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -272,7 +272,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
             ActiveMQServerLogger.LOGGER.journalErrorDeletingMessage(e, largeMsgId);
          }
          if (replicator != null) {
-            replicator.largeMessageDelete(largeMsgId);
+            replicator.largeMessageDelete(largeMsgId, JournalStorageManager.this);
          }
       }
       largeMessagesToDelete.clear();
@@ -375,10 +375,17 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
       journalFF.releaseBuffer(buffer);
    }
 
-   public long storePendingLargeMessage(final long messageID) throws Exception {
+   public long storePendingLargeMessage(final long messageID, long recordID) throws Exception {
       readLock();
       try {
-         long recordID = generateID();
+         if (recordID == LargeServerMessage.NO_PENDING_ID) {
+            recordID = generateID();
+         } else {
+            //this means the large message doesn't
+            //have a pendingRecordID, but one has been
+            //generated (coming from live server) for use.
+            recordID = -recordID;
+         }
 
          messageJournal.appendAddRecord(recordID, JournalRecordIds.ADD_LARGE_MESSAGE_PENDING, new PendingLargeMessageEncoding(messageID), true, getContext(true));
 
@@ -396,7 +403,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
             // And the client won't be waiting for the actual file to be deleted.
             // We set a temporary record (short lived) on the journal
             // to avoid a situation where the server is restarted and pending large message stays on forever
-            largeServerMessage.setPendingRecordID(storePendingLargeMessage(largeServerMessage.getMessageID()));
+            largeServerMessage.setPendingRecordID(storePendingLargeMessage(largeServerMessage.getMessageID(), largeServerMessage.getPendingRecordID()));
          } catch (Exception e) {
             throw new ActiveMQInternalErrorException(e.getMessage(), e);
          }
@@ -427,7 +434,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
                readLock();
                try {
                   if (replicator != null) {
-                     replicator.largeMessageDelete(largeServerMessage.getMessageID());
+                     replicator.largeMessageDelete(largeServerMessage.getMessageID(), JournalStorageManager.this);
                   }
                   file.delete();
 
@@ -475,7 +482,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
 
          if (largeMessage.isDurable()) {
             // We store a marker on the journal that the large file is pending
-            long pendingRecordID = storePendingLargeMessage(id);
+            long pendingRecordID = storePendingLargeMessage(id, LargeServerMessage.NO_PENDING_ID);
 
             largeMessage.setPendingRecordID(pendingRecordID);
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b1a1b05/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
index 22929e7..22cfa0b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
@@ -44,7 +44,7 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L
 
    private final JournalStorageManager storageManager;
 
-   private long pendingRecordID = -1;
+   private long pendingRecordID = NO_PENDING_ID;
 
    private boolean paged;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b1a1b05/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java
index 42126d4..66ccd8c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java
@@ -158,4 +158,14 @@ public final class LargeServerMessageInSync implements ReplicatedLargeMessage {
       storageManager.addBytesToLargeMessage(appendFile, mainLM.getMessageID(), bytes);
    }
 
+   @Override
+   public void setPendingRecordID(long pendingRecordID) {
+      mainLM.setPendingRecordID(pendingRecordID);
+   }
+
+   @Override
+   public long getPendingRecordID() {
+      return mainLM.getPendingRecordID();
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b1a1b05/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java
index 4a09cc0..a9be86a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java
@@ -23,31 +23,40 @@ import org.apache.activemq.artemis.utils.DataConstants;
 public class ReplicationLargeMessageEndMessage extends PacketImpl {
 
    long messageId;
+   long pendingRecordId;
 
    public ReplicationLargeMessageEndMessage() {
       super(PacketImpl.REPLICATION_LARGE_MESSAGE_END);
    }
 
-   public ReplicationLargeMessageEndMessage(final long messageId) {
+   public ReplicationLargeMessageEndMessage(final long messageId, final long pendingRecordId) {
       this();
       this.messageId = messageId;
+      //we use negative value to indicate that this id is pre-generated by live node
+      //so that it won't be generated at backup.
+      //see https://issues.apache.org/jira/browse/ARTEMIS-1221
+      this.pendingRecordId = -pendingRecordId;
    }
 
-
    @Override
    public int expectedEncodeSize() {
       return PACKET_HEADERS_SIZE +
-         DataConstants.SIZE_LONG; // buffer.writeLong(messageId);
+         DataConstants.SIZE_LONG + // buffer.writeLong(messageId)
+         DataConstants.SIZE_LONG; // buffer.writeLong(pendingRecordId);
    }
 
    @Override
    public void encodeRest(final ActiveMQBuffer buffer) {
       buffer.writeLong(messageId);
+      buffer.writeLong(pendingRecordId);
    }
 
    @Override
    public void decodeRest(final ActiveMQBuffer buffer) {
       messageId = buffer.readLong();
+      if (buffer.readableBytes() >= DataConstants.SIZE_LONG) {
+         pendingRecordId = buffer.readLong();
+      }
    }
 
    /**
@@ -85,4 +94,8 @@ public class ReplicationLargeMessageEndMessage extends PacketImpl {
          return false;
       return true;
    }
+
+   public long getPendingRecordId() {
+      return pendingRecordId;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b1a1b05/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedLargeMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedLargeMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedLargeMessage.java
index 3b6327a..b744805 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedLargeMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedLargeMessage.java
@@ -52,4 +52,8 @@ public interface ReplicatedLargeMessage {
     */
    void addBytes(byte[] body) throws Exception;
 
+   void setPendingRecordID(long pendingRecordID);
+
+   long getPendingRecordID();
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b1a1b05/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
index 5a01bf7..d6f807c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
@@ -519,6 +519,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
       }
       final ReplicatedLargeMessage message = lookupLargeMessage(packet.getMessageId(), true, false);
       if (message != null) {
+         message.setPendingRecordID(packet.getPendingRecordId());
          executor.execute(new Runnable() {
             @Override
             public void run() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b1a1b05/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
index d8d70f0..e1027d4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
@@ -42,6 +42,7 @@ import org.apache.activemq.artemis.core.journal.impl.JournalFile;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager;
+import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
 import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
 import org.apache.activemq.artemis.core.protocol.core.Channel;
 import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
@@ -238,9 +239,11 @@ public final class ReplicationManager implements ActiveMQComponent {
       }
    }
 
-   public void largeMessageDelete(final Long messageId) {
+   //we pass in storageManager to generate ID only if enabled
+   public void largeMessageDelete(final Long messageId, JournalStorageManager storageManager) {
       if (enabled) {
-         sendReplicatePacket(new ReplicationLargeMessageEndMessage(messageId));
+         long pendingRecordID = storageManager.generateID();
+         sendReplicatePacket(new ReplicationLargeMessageEndMessage(messageId, pendingRecordID));
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b1a1b05/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
index 2a16ed2..38f36ad 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
@@ -22,13 +22,11 @@ import org.apache.activemq.artemis.core.replication.ReplicatedLargeMessage;
 
 public interface LargeServerMessage extends ServerMessage, ReplicatedLargeMessage {
 
+   long NO_PENDING_ID = -1;
+
    @Override
    void addBytes(byte[] bytes) throws Exception;
 
-   void setPendingRecordID(long pendingRecordID);
-
-   long getPendingRecordID();
-
    /**
     * We have to copy the large message content in case of DLQ and paged messages
     * For that we need to pre-mark the LargeMessage with a flag when it is paged

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b1a1b05/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestWithDivert.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestWithDivert.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestWithDivert.java
new file mode 100644
index 0000000..76efc22
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestWithDivert.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.cluster.failover;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
+import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
+import org.apache.activemq.artemis.core.config.DivertConfiguration;
+import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class FailoverTestWithDivert extends FailoverTestBase {
+
+   private static final String DIVERT_ADDRESS = "jms.queue.testQueue";
+   private static final String DIVERT_FORWARD_ADDRESS = "jms.queue.divertedQueue";
+   private ClientSessionFactoryInternal sf;
+
+   @Before
+   @Override
+   public void setUp() throws Exception {
+      super.setUp();
+   }
+
+   @Override
+   protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) {
+      return getNettyAcceptorTransportConfiguration(live);
+   }
+
+   @Override
+   protected TransportConfiguration getConnectorTransportConfiguration(final boolean live) {
+      return getNettyConnectorTransportConfiguration(live);
+   }
+
+   @Override
+   protected void createConfigs() throws Exception {
+      createReplicatedConfigs();
+
+      liveConfig.setJournalFileSize(10240000);
+      backupConfig.setJournalFileSize(10240000);
+      addQueue(liveConfig, DIVERT_ADDRESS, DIVERT_ADDRESS);
+      addQueue(liveConfig, DIVERT_FORWARD_ADDRESS, DIVERT_FORWARD_ADDRESS);
+      addDivert(liveConfig, DIVERT_ADDRESS, DIVERT_FORWARD_ADDRESS, false);
+      addDivert(backupConfig, DIVERT_ADDRESS, DIVERT_FORWARD_ADDRESS, false);
+   }
+
+   private void addQueue(Configuration serverConfig, String address, String name) {
+
+      List<CoreAddressConfiguration> addrConfigs = serverConfig.getAddressConfigurations();
+      CoreAddressConfiguration addrCfg = new CoreAddressConfiguration();
+      addrCfg.setName(address);
+      addrCfg.addRoutingType(RoutingType.ANYCAST);
+      CoreQueueConfiguration qConfig = new CoreQueueConfiguration();
+      qConfig.setName(name);
+      qConfig.setAddress(address);
+      addrCfg.addQueueConfiguration(qConfig);
+      addrConfigs.add(addrCfg);
+   }
+
+   private void addDivert(Configuration serverConfig, String source, String target, boolean exclusive) {
+      List<DivertConfiguration> divertConfigs = serverConfig.getDivertConfigurations();
+      DivertConfiguration newDivert = new DivertConfiguration();
+      newDivert.setName("myDivert");
+      newDivert.setAddress(source);
+      newDivert.setForwardingAddress(target);
+      newDivert.setExclusive(exclusive);
+      divertConfigs.add(newDivert);
+   }
+
+   @Test
+   public void testUniqueIDsWithDivert() throws Exception {
+      Map<String, Object> params = new HashMap<>();
+      params.put(TransportConstants.HOST_PROP_NAME, "localhost");
+      TransportConfiguration tc = createTransportConfiguration(true, false, params);
+      ServerLocator locator = addServerLocator(ActiveMQClient.createServerLocatorWithHA(tc)).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true).setReconnectAttempts(-1);
+      sf = createSessionFactoryAndWaitForTopology(locator, 2);
+      int minLarge = locator.getMinLargeMessageSize();
+
+      ClientSession session = sf.createSession(false, false);
+      addClientSession(session);
+      session.start();
+
+      final int num = 100;
+      ClientProducer producer = session.createProducer(DIVERT_ADDRESS);
+      for (int i = 0; i < num; i++) {
+         ClientMessage message = createLargeMessage(session, 2 * minLarge);
+         producer.send(message);
+      }
+      session.commit();
+
+      ClientConsumer consumer = session.createConsumer(DIVERT_ADDRESS);
+      for (int i = 0;  i < num; i++) {
+         ClientMessage receivedFromSourceQueue = consumer.receive(5000);
+         assertNotNull(receivedFromSourceQueue);
+         receivedFromSourceQueue.acknowledge();
+      }
+      session.commit();
+
+      crash(session);
+
+      ClientConsumer consumer1 = session.createConsumer(DIVERT_FORWARD_ADDRESS);
+      for (int i = 0; i < num; i++) {
+         ClientMessage receivedFromTargetQueue = consumer1.receive(5000);
+         assertNotNull(receivedFromTargetQueue);
+         receivedFromTargetQueue.acknowledge();
+      }
+      session.commit();
+   }
+
+   private ClientMessage createLargeMessage(ClientSession session, final int largeSize) {
+      ClientMessage message = session.createMessage(true);
+      ActiveMQBuffer bodyBuffer = message.getBodyBuffer();
+      final int propSize = 10240;
+      while (bodyBuffer.writerIndex() < largeSize) {
+         byte[] prop = new byte[propSize];
+         bodyBuffer.writeBytes(prop);
+      }
+      return message;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b1a1b05/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
index 1ae9527..398e895 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
@@ -204,7 +204,7 @@ public final class ReplicationTest extends ActiveMQTestBase {
    public void testSendPackets() throws Exception {
       setupServer(true);
 
-      StorageManager storage = getStorage();
+      JournalStorageManager storage = getStorage();
 
       manager = liveServer.getReplicationManager();
       waitForComponent(manager);
@@ -270,7 +270,7 @@ public final class ReplicationTest extends ActiveMQTestBase {
 
       manager.largeMessageWrite(500, new byte[1024]);
 
-      manager.largeMessageDelete(Long.valueOf(500));
+      manager.largeMessageDelete(Long.valueOf(500), storage);
 
       blockOnReplication(storage, manager);