You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/01/05 15:15:09 UTC

[1/4] activemq-artemis git commit: ignoring stuff from cmake / native build

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 2c430e597 -> 86934c91e


ignoring stuff from cmake / native build


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/34b66351
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/34b66351
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/34b66351

Branch: refs/heads/master
Commit: 34b66351fbe0cc2f1f5106181c73cefbca94f53d
Parents: 2c430e5
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Dec 29 00:30:37 2015 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Jan 4 11:21:10 2016 -0500

----------------------------------------------------------------------
 .gitignore                                           |  8 ++++----
 .../src/main/assembly/source-assembly.xml            | 15 +++++++++++++++
 pom.xml                                              |  8 ++++++++
 3 files changed, 27 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34b66351/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index ce57485..c4b0866 100644
--- a/.gitignore
+++ b/.gitignore
@@ -11,10 +11,10 @@ ratReport.txt
 .factorypath
 
 # for native build
-**/CMakeCache.txt
-**/CMakeFiles/
-**/Makefile
-**/cmake_install.cmake
+CMakeCache.txt
+CMakeFiles/
+Makefile
+cmake_install.cmake
 
 # this file is generated
 artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.h

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34b66351/artemis-distribution/src/main/assembly/source-assembly.xml
----------------------------------------------------------------------
diff --git a/artemis-distribution/src/main/assembly/source-assembly.xml b/artemis-distribution/src/main/assembly/source-assembly.xml
index acf6b11..7f0f00e 100644
--- a/artemis-distribution/src/main/assembly/source-assembly.xml
+++ b/artemis-distribution/src/main/assembly/source-assembly.xml
@@ -56,6 +56,21 @@
             <exclude>artemis_doap.rdf</exclude>
             <exclude>artemis-native/bin/</exclude>
 
+            <!-- Files generated by CMake during the native build -->
+            <exclude>CMakeCache.txt</exclude>
+            <exclude>CMakeFiles/</exclude>
+            <exclude>Makefile</exclude>
+            <exclude>artemis-native/CMakeCache.txt</exclude>
+            <exclude>artemis-native/CMakeFiles/</exclude>
+            <exclude>artemis-native/Makefile</exclude>
+            <exclude>artemis-native/cmake_install.cmake</exclude>
+            <exclude>artemis-native/src/main/c/CMakeFiles/</exclude>
+            <exclude>artemis-native/src/main/c/Makefile</exclude>
+            <exclude>artemis-native/src/main/c/cmake_install.cmake</exclude>
+            <exclude>artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.h</exclude>
+            <exclude>cmake_install.cmake</exclude>
+
+
             <!--  build output  -->
             <exclude>
                %regex[(?!((?!${project.build.directory}/)[^/]+/)*src/).*${project.build.directory}.*]

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/34b66351/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9a4a781..ce35540 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1096,6 +1096,14 @@
                   <exclude>**/node/**</exclude>
                   <exclude>**/node_modules/**</exclude>
                   <exclude>**/package.json</exclude>
+
+                  <!-- things from cmake on the native build -->
+                  <exclude>**/CMakeCache.txt</exclude>
+                  <exclude>**/CMakeFiles/</exclude>
+                  <exclude>**/Makefile</exclude>
+                  <exclude>**/cmake_install.cmake</exclude>
+                  <exclude>artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.h</exclude>
+
                </excludes>
             </configuration>
             <executions>


[4/4] activemq-artemis git commit: This closes #291

Posted by cl...@apache.org.
This closes #291


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/86934c91
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/86934c91
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/86934c91

Branch: refs/heads/master
Commit: 86934c91e65f0a2bf58a5286433da50e60ca9fdd
Parents: 2c430e5 96849a4
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Jan 5 09:14:57 2016 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Jan 5 09:14:57 2016 -0500

----------------------------------------------------------------------
 .gitignore                                      |   8 +-
 .../apache/activemq/artemis/utils/ByteUtil.java |  37 ++-
 .../activemq/artemis/utils/ByteUtilTest.java    |   8 +
 .../core/protocol/core/impl/ChannelImpl.java    |   4 +
 .../src/main/assembly/source-assembly.xml       |  15 ++
 .../core/io/IOCriticalErrorListener.java        |   2 +-
 .../openwire/amq/AMQServerConsumer.java         |  37 +--
 .../core/management/impl/QueueControlImpl.java  |   4 +-
 .../artemis/core/paging/PagingStore.java        |   2 +
 .../artemis/core/paging/PagingStoreFactory.java |   2 +
 .../artemis/core/paging/cursor/PageCache.java   |  11 -
 .../core/paging/cursor/PageCursorProvider.java  |   5 +-
 .../core/paging/cursor/PageSubscription.java    |   5 +-
 .../core/paging/cursor/PagedReference.java      |   3 +-
 .../core/paging/cursor/PagedReferenceImpl.java  |  36 ++-
 .../paging/cursor/impl/LivePageCacheImpl.java   |  10 -
 .../core/paging/cursor/impl/PageCacheImpl.java  |  37 +--
 .../cursor/impl/PageCursorProviderImpl.java     |  61 +++--
 .../cursor/impl/PageSubscriptionImpl.java       | 147 +++++++----
 .../core/paging/impl/PagingStoreFactoryNIO.java |   4 +
 .../core/paging/impl/PagingStoreImpl.java       |   6 +
 .../core/persistence/StorageManager.java        |   2 +
 .../impl/journal/JournalStorageManager.java     |  25 +-
 .../impl/nullpm/NullStorageManager.java         |  23 +-
 .../core/postoffice/DuplicateIDCache.java       |   2 +
 .../artemis/core/postoffice/PostOffice.java     |   2 +-
 .../postoffice/impl/DuplicateIDCacheImpl.java   |  93 ++++++-
 .../core/postoffice/impl/PostOfficeImpl.java    |  36 +--
 .../core/ServerSessionPacketHandler.java        |  90 ++++---
 .../artemis/core/server/MessageReference.java   |   4 +-
 .../activemq/artemis/core/server/Queue.java     |   7 +-
 .../core/server/ScheduledDeliveryHandler.java   |   5 +-
 .../artemis/core/server/ServerSession.java      |   2 +
 .../cluster/impl/ClusterConnectionImpl.java     |   2 +-
 .../core/server/impl/ActiveMQServerImpl.java    |  30 +--
 .../core/server/impl/LastValueQueue.java        |  79 ++++--
 .../core/server/impl/MessageReferenceImpl.java  |   2 +-
 .../artemis/core/server/impl/QueueImpl.java     |  87 ++++---
 .../artemis/core/server/impl/RefsOperation.java |  27 +-
 .../impl/ScheduledDeliveryHandlerImpl.java      |   5 +-
 .../core/server/impl/ServerConsumerImpl.java    |  62 ++++-
 .../core/server/impl/ServerSessionImpl.java     |  15 ++
 .../impl/ScheduledDeliveryHandlerTest.java      |   4 +-
 .../transaction/impl/TransactionImplTest.java   |   5 +
 pom.xml                                         |   8 +
 .../integration/DuplicateDetectionTest.java     | 249 ++++++-------------
 .../storage/PersistMultiThreadTest.java         |   4 +
 .../core/paging/impl/PagingStoreImplTest.java   |   4 +
 .../impl/DuplicateDetectionUnitTest.java        |   2 +-
 .../core/server/impl/fakes/FakeConsumer.java    |  60 +++--
 .../core/server/impl/fakes/FakePostOffice.java  |   3 +-
 51 files changed, 815 insertions(+), 568 deletions(-)
----------------------------------------------------------------------



[2/4] activemq-artemis git commit: ARTEMIS-332 - Duplicate delivery over Bridges under OME scenarios, paging and other failures

Posted by cl...@apache.org.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index b0a5a7e..ae93a97 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -415,7 +415,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
             backupActivationThread.start();
          }
          else {
-            ActiveMQServerLogger.LOGGER.serverStarted(getVersion().getFullVersion(), configuration.getName(), nodeManager.getNodeId(),  identity != null ? identity : "" );
+            ActiveMQServerLogger.LOGGER.serverStarted(getVersion().getFullVersion(), configuration.getName(), nodeManager.getNodeId(), identity != null ? identity : "");
          }
          // start connector service
          connectorsService = new ConnectorsService(configuration, storageManager, scheduledPool, postOffice, serviceRegistry);
@@ -508,18 +508,19 @@ public class ActiveMQServerImpl implements ActiveMQServer {
     * Stops the server in a different thread.
     */
    public final void stopTheServer(final boolean criticalIOError) {
-      ExecutorService executor = Executors.newSingleThreadExecutor();
-      executor.submit(new Runnable() {
+      Thread thread = new Thread() {
          @Override
          public void run() {
             try {
-               stop(false, criticalIOError, false);
+               ActiveMQServerImpl.this.stop(false, criticalIOError, false);
             }
             catch (Exception e) {
                ActiveMQServerLogger.LOGGER.errorStoppingServer(e);
             }
          }
-      });
+      };
+
+      thread.start();
    }
 
    @Override
@@ -722,7 +723,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
          }
       }
 
-
       pagingManager = null;
       securityStore = null;
       resourceManager = null;
@@ -1016,7 +1016,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       if (securityStore != null) {
          X509Certificate[] certificates = null;
          if (connection.getTransportConnection() instanceof NettyConnection) {
-            certificates = CertificateUtil.getCertsFromChannel(((NettyConnection)connection.getTransportConnection()).getChannel());
+            certificates = CertificateUtil.getCertsFromChannel(((NettyConnection) connection.getTransportConnection()).getChannel());
          }
          securityStore.authenticate(username, password, certificates);
       }
@@ -1428,7 +1428,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
          throw ActiveMQMessageBundle.BUNDLE.bindingNotDivert(name);
       }
 
-      postOffice.removeBinding(name, null);
+      postOffice.removeBinding(name, null, true);
    }
 
    @Override
@@ -1954,11 +1954,16 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       boolean failedAlready = false;
 
       @Override
-      public synchronized void onIOException(Exception cause, String message, SequentialFile file) {
+      public synchronized void onIOException(Throwable cause, String message, SequentialFile file) {
          if (!failedAlready) {
             failedAlready = true;
 
-            ActiveMQServerLogger.LOGGER.ioCriticalIOError(message, file.toString(), cause);
+            if (file == null) {
+               ActiveMQServerLogger.LOGGER.ioCriticalIOError(message, "NULL", cause);
+            }
+            else {
+               ActiveMQServerLogger.LOGGER.ioCriticalIOError(message, file.toString(), cause);
+            }
 
             stopTheServer(true);
          }
@@ -2021,10 +2026,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
     * move any older data away and log a warning about it.
     */
    void moveServerData() {
-      File[] dataDirs = new File[]{configuration.getBindingsLocation(),
-                                   configuration.getJournalLocation(),
-                                   configuration.getPagingLocation(),
-                                   configuration.getLargeMessagesLocation()};
+      File[] dataDirs = new File[]{configuration.getBindingsLocation(), configuration.getJournalLocation(), configuration.getPagingLocation(), configuration.getLargeMessagesLocation()};
 
       boolean allEmpty = true;
       int lowestSuffixForMovedData = 1;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index 5420688..c6d5aee 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -21,6 +21,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.filter.Filter;
@@ -66,7 +67,15 @@ public class LastValueQueue extends QueueImpl {
 
    @Override
    public synchronized void addTail(final MessageReference ref, final boolean direct) {
-      SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
+      SimpleString prop;
+
+      try {
+         prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
+      }
+      catch (ActiveMQException e) {
+         criticalError(e);
+         throw new IllegalStateException(e);
+      }
 
       if (prop != null) {
          HolderReference hr = map.get(prop);
@@ -103,45 +112,59 @@ public class LastValueQueue extends QueueImpl {
 
    @Override
    public synchronized void addHead(final MessageReference ref) {
-      SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
+      try {
+         SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
 
-      if (prop != null) {
-         HolderReference hr = map.get(prop);
+         if (prop != null) {
+            HolderReference hr = map.get(prop);
 
-         if (hr != null) {
-            // We keep the current ref and ack the one we are returning
+            if (hr != null) {
+               // We keep the current ref and ack the one we are returning
 
-            super.referenceHandled();
+               super.referenceHandled();
 
-            try {
-               super.acknowledge(ref);
+               try {
+                  super.acknowledge(ref);
+               }
+               catch (Exception e) {
+                  ActiveMQServerLogger.LOGGER.errorAckingOldReference(e);
+               }
             }
-            catch (Exception e) {
-               ActiveMQServerLogger.LOGGER.errorAckingOldReference(e);
+            else {
+               map.put(prop, (HolderReference) ref);
+
+               super.addHead(ref);
             }
          }
          else {
-            map.put(prop, (HolderReference) ref);
-
             super.addHead(ref);
          }
       }
-      else {
-         super.addHead(ref);
+      catch (ActiveMQException e) {
+         criticalError(e);
+         throw new IllegalStateException(e);
       }
    }
 
    @Override
    protected void refRemoved(MessageReference ref) {
-      synchronized (this) {
-         SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
+      try {
 
-         if (prop != null) {
-            map.remove(prop);
+         synchronized (this) {
+            SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
+
+            if (prop != null) {
+               map.remove(prop);
+            }
          }
+
+         super.refRemoved(ref);
+      }
+      catch (ActiveMQException e) {
+         criticalError(e);
+         throw new IllegalStateException(e);
       }
 
-      super.refRemoved(ref);
    }
 
    private class HolderReference implements MessageReference {
@@ -200,7 +223,13 @@ public class LastValueQueue extends QueueImpl {
 
       @Override
       public ServerMessage getMessage() {
-         return ref.getMessage();
+         try {
+            return ref.getMessage();
+         }
+         catch (ActiveMQException e) {
+            criticalError(e);
+            throw new IllegalStateException(e);
+         }
       }
 
       @Override
@@ -256,7 +285,13 @@ public class LastValueQueue extends QueueImpl {
        */
       @Override
       public int getMessageMemoryEstimate() {
-         return ref.getMessage().getMemoryEstimate();
+         try {
+            return ref.getMessage().getMemoryEstimate();
+         }
+         catch (ActiveMQException e) {
+            criticalError(e);
+            throw new IllegalStateException(e);
+         }
       }
 
       /* (non-Javadoc)

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
index 96413f7..fd04b6d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
@@ -208,7 +208,7 @@ public class MessageReferenceImpl implements MessageReference {
       }
 
       if (other instanceof MessageReferenceImpl) {
-         MessageReference reference = (MessageReferenceImpl) other;
+         MessageReferenceImpl reference = (MessageReferenceImpl) other;
 
          if (this.getMessage().equals(reference.getMessage()))
             return true;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 18eb0b8..c963e4d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -40,6 +40,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -893,7 +894,7 @@ public class QueueImpl implements Queue {
    }
 
    @Override
-   public synchronized MessageReference getReference(final long id1) {
+   public synchronized MessageReference getReference(final long id1) throws ActiveMQException {
       LinkedListIterator<MessageReference> iterator = iterator();
 
       try {
@@ -1053,7 +1054,13 @@ public class QueueImpl implements Queue {
 
    @Override
    public void cancel(final Transaction tx, final MessageReference reference, boolean ignoreRedeliveryCheck) {
-      getRefsOperation(tx, ignoreRedeliveryCheck).addAck(reference);
+      try {
+         getRefsOperation(tx, ignoreRedeliveryCheck).addAck(reference);
+      }
+      catch (ActiveMQException e) {
+         criticalError(e);
+         getPageSubscription().getPagingStore().criticalException(e);
+      }
    }
 
    @Override
@@ -1102,7 +1109,7 @@ public class QueueImpl implements Queue {
    }
 
    @Override
-   public void deliverScheduledMessages() {
+   public void deliverScheduledMessages() throws ActiveMQException {
       List<MessageReference> scheduledMessages = scheduledDeliveryHandler.cancel(null);
       if (scheduledMessages != null && scheduledMessages.size() > 0) {
          for (MessageReference ref : scheduledMessages) {
@@ -1311,7 +1318,7 @@ public class QueueImpl implements Queue {
       Transaction tx = new BindingsTransactionImpl(storageManager);
 
       try {
-         postOffice.removeBinding(name, tx);
+         postOffice.removeBinding(name, tx, true);
 
          deleteAllReferences();
 
@@ -1770,7 +1777,12 @@ public class QueueImpl implements Queue {
 
    private synchronized void internalAddTail(final MessageReference ref) {
       refAdded(ref);
-      messageReferences.addTail(ref, ref.getMessage().getPriority());
+      try {
+         messageReferences.addTail(ref, ref.getMessage().getPriority());
+      }
+      catch (ActiveMQException e) {
+         criticalError(e);
+      }
    }
 
    /**
@@ -1781,9 +1793,18 @@ public class QueueImpl implements Queue {
     * @param ref
     */
    private void internalAddHead(final MessageReference ref) {
-      queueMemorySize.addAndGet(ref.getMessageMemoryEstimate());
-      refAdded(ref);
-      messageReferences.addHead(ref, ref.getMessage().getPriority());
+      try {
+         queueMemorySize.addAndGet(ref.getMessageMemoryEstimate());
+         refAdded(ref);
+         messageReferences.addHead(ref, ref.getMessage().getPriority());
+      }
+      catch (ActiveMQException e) {
+         criticalError(e);
+      }
+   }
+
+   void criticalError(ActiveMQException e) {
+      storageManager.criticalError(e);
    }
 
    private synchronized void doInternalPoll() {
@@ -2011,14 +2032,17 @@ public class QueueImpl implements Queue {
          return null;
       }
       else {
-         // But we don't use the groupID on internal queues (clustered queues) otherwise the group map would leak forever
-         return ref.getMessage().getSimpleStringProperty(Message.HDR_GROUP_ID);
+         try {
+            // But we don't use the groupID on internal queues (clustered queues) otherwise the group map would leak forever
+            return ref.getMessage().getSimpleStringProperty(Message.HDR_GROUP_ID);
+         }
+         catch (ActiveMQException e) {
+            criticalError(e);
+            throw new IllegalStateException(e);
+         }
       }
    }
 
-   /**
-    * @param ref
-    */
    protected void refRemoved(MessageReference ref) {
       queueMemorySize.addAndGet(-ref.getMessageMemoryEstimate());
       if (ref.isPaged()) {
@@ -2026,9 +2050,6 @@ public class QueueImpl implements Queue {
       }
    }
 
-   /**
-    * @param ref
-    */
    protected void refAdded(final MessageReference ref) {
       if (ref.isPaged()) {
          pagedReferences.incrementAndGet();
@@ -2502,23 +2523,29 @@ public class QueueImpl implements Queue {
    }
 
    private boolean checkExpired(final MessageReference reference) {
-      if (reference.getMessage().isExpired()) {
-         if (isTrace) {
-            ActiveMQServerLogger.LOGGER.trace("Reference " + reference + " is expired");
-         }
-         reference.handled();
+      try {
+         if (reference.getMessage().isExpired()) {
+            if (isTrace) {
+               ActiveMQServerLogger.LOGGER.trace("Reference " + reference + " is expired");
+            }
+            reference.handled();
 
-         try {
-            expire(reference);
+            try {
+               expire(reference);
+            }
+            catch (Exception e) {
+               ActiveMQServerLogger.LOGGER.errorExpiringRef(e);
+            }
+
+            return true;
          }
-         catch (Exception e) {
-            ActiveMQServerLogger.LOGGER.errorExpiringRef(e);
+         else {
+            return false;
          }
-
-         return true;
       }
-      else {
-         return false;
+      catch (ActiveMQException e) {
+         criticalError(e);
+         throw new IllegalStateException(e);
       }
    }
 
@@ -2557,7 +2584,7 @@ public class QueueImpl implements Queue {
    }
 
    @Override
-   public void postAcknowledge(final MessageReference ref) {
+   public void postAcknowledge(final MessageReference ref) throws ActiveMQException {
       QueueImpl queue = (QueueImpl) ref.getQueue();
 
       queue.decDelivering();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
index d117186..92d1a61 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
@@ -16,6 +16,13 @@
  */
 package org.apache.activemq.artemis.core.server.impl;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.MessageReference;
@@ -25,12 +32,6 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
 import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
 public class RefsOperation extends TransactionOperationAbstract {
 
    private final StorageManager storageManager;
@@ -55,7 +56,7 @@ public class RefsOperation extends TransactionOperationAbstract {
       ignoreRedeliveryCheck = true;
    }
 
-   synchronized void addAck(final MessageReference ref) {
+   synchronized void addAck(final MessageReference ref) throws ActiveMQException {
       refsToAck.add(ref);
       if (ref.isPaged()) {
          if (pagedMessagesToPostACK == null) {
@@ -147,7 +148,17 @@ public class RefsOperation extends TransactionOperationAbstract {
    public void afterCommit(final Transaction tx) {
       for (MessageReference ref : refsToAck) {
          synchronized (ref.getQueue()) {
-            queue.postAcknowledge(ref);
+            try {
+               queue.postAcknowledge(ref);
+            }
+            catch (ActiveMQException e) {
+               if (queue instanceof QueueImpl) {
+                  ((QueueImpl) queue).criticalError(e);
+               }
+               else {
+                  ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+               }
+            }
          }
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java
index f9ee1ce..6b5e2e2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.MessageReference;
@@ -97,7 +98,7 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler {
    }
 
    @Override
-   public List<MessageReference> cancel(final Filter filter) {
+   public List<MessageReference> cancel(final Filter filter) throws ActiveMQException {
       List<MessageReference> refs = new ArrayList<>();
 
       synchronized (scheduledReferences) {
@@ -115,7 +116,7 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler {
    }
 
    @Override
-   public MessageReference removeReferenceWithID(final long id) {
+   public MessageReference removeReferenceWithID(final long id) throws ActiveMQException {
       synchronized (scheduledReferences) {
          Iterator<RefScheduled> iter = scheduledReferences.iterator();
          while (iter.hasNext()) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 4a8b16a..7d54d31 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -286,7 +286,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
          // should go back into the
          // queue for delivery later.
          // TCP-flow control has to be done first than everything else otherwise we may lose notifications
-         if (!callback.isWritable(this) || !started || transferring ) {
+         if (!callback.isWritable(this) || !started || transferring) {
             return HandleStatus.BUSY;
          }
 
@@ -733,25 +733,63 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
       }
    }
 
-   @Override
-   public void individualAcknowledge(final Transaction tx, final long messageID) throws Exception {
+   public void individualAcknowledge(Transaction tx,
+                                     final long messageID) throws Exception {
       if (browseOnly) {
          return;
       }
 
-      MessageReference ref = removeReferenceByID(messageID);
+      boolean startedTransaction = false;
 
-      if (ref == null) {
-         ActiveMQIllegalStateException ils = ActiveMQMessageBundle.BUNDLE.consumerNoReference(id, messageID, messageQueue.getName());
-         if (tx != null) {
-            tx.markAsRollbackOnly(ils);
-         }
-         throw ils;
+      if (tx == null) {
+         startedTransaction = true;
+         tx = new TransactionImpl(storageManager);
       }
 
-      ackReference(tx, ref);
+      try {
+
+         MessageReference ref;
+         ref = removeReferenceByID(messageID);
+
+         if (ActiveMQServerLogger.LOGGER.isTraceEnabled()) {
+            ActiveMQServerLogger.LOGGER.trace("ACKing ref " + ref + " on tx= " + tx + ", consumer=" + this);
+         }
+
+         if (ref == null) {
+            ActiveMQIllegalStateException ils = new ActiveMQIllegalStateException("Cannot find ref to ack " + messageID);
+            if (tx != null) {
+               tx.markAsRollbackOnly(ils);
+            }
+            throw ils;
+         }
+
+         ackReference(tx, ref);
+
+         if (startedTransaction) {
+            tx.commit();
+         }
+      }
+      catch (ActiveMQException e) {
+         if (startedTransaction) {
+            tx.rollback();
+         }
+         else {
+            tx.markAsRollbackOnly(e);
+         }
+         throw e;
+      }
+      catch (Throwable e) {
+         ActiveMQServerLogger.LOGGER.errorAckingMessage((Exception) e);
+         ActiveMQIllegalStateException hqex = new ActiveMQIllegalStateException(e.getMessage());
+         if (startedTransaction) {
+            tx.rollback();
+         }
+         else {
+            tx.markAsRollbackOnly(hqex);
+         }
+         throw hqex;
+      }
 
-      acks++;
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index e21102c..ebe2f8a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -316,6 +316,21 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
    }
 
    @Override
+   public void markTXFailed(Throwable e) {
+      Transaction currentTX = this.tx;
+      if (currentTX != null) {
+         if (e instanceof ActiveMQException) {
+            currentTX.markAsRollbackOnly((ActiveMQException) e);
+         }
+         else {
+            ActiveMQException exception = new ActiveMQException(e.getMessage());
+            exception.initCause(e);
+            currentTX.markAsRollbackOnly(exception);
+         }
+      }
+   }
+
+   @Override
    public boolean removeConsumer(final long consumerID) throws Exception {
       return consumers.remove(consumerID) != null;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 49dcbe8..3f726f0 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -229,7 +229,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
    }
 
-   private void validateSequence(ScheduledDeliveryHandlerImpl handler) {
+   private void validateSequence(ScheduledDeliveryHandlerImpl handler) throws Exception {
       long lastSequence = -1;
       for (MessageReference ref : handler.getScheduledReferences()) {
          assertEquals(lastSequence + 1, ref.getMessage().getMessageID());
@@ -256,7 +256,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       handler.checkAndSchedule(refImpl, tail);
    }
 
-   private void debugList(boolean fail, ScheduledDeliveryHandlerImpl handler, long numberOfExpectedMessages) {
+   private void debugList(boolean fail, ScheduledDeliveryHandlerImpl handler, long numberOfExpectedMessages) throws Exception {
       List<MessageReference> refs = handler.getScheduledReferences();
 
       HashSet<Long> messages = new HashSet<>();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
index 3909c3c..6c5cfe5 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
@@ -208,6 +208,11 @@ public class TransactionImplTest extends ActiveMQTestBase {
       }
 
       @Override
+      public void criticalError(Throwable error) {
+         error.printStackTrace();
+      }
+
+      @Override
       public OperationContext newContext(Executor executor) {
          return null;
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java
index 05a48e9..5fe8953 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java
@@ -16,10 +16,16 @@
  */
 package org.apache.activemq.artemis.tests.integration;
 
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
 import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.api.core.client.ClientConsumer;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.ClientProducer;
@@ -37,10 +43,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-
 public class DuplicateDetectionTest extends ActiveMQTestBase {
 
    private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
@@ -213,6 +215,75 @@ public class DuplicateDetectionTest extends ActiveMQTestBase {
       Assert.assertEquals(0, ((PostOfficeImpl) server.getPostOffice()).getDuplicateIDCaches().size());
    }
 
+   // It is important to test shrinking the duplicate-ID cache across a restart,
+   // because this situation can arise after a crash:
+   // the server may come back holding more cached IDs than the configured size, before it has had time to clear the older ones
+   @Test
+   public void testShrinkCache() throws Exception {
+      server.stop();
+      server.getConfiguration().setIDCacheSize(150);
+      server.start();
+
+      final int TEST_SIZE = 200;
+
+      ServerLocator locator = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+
+      locator.setBlockOnNonDurableSend(true);
+
+      ClientSessionFactory sf = createSessionFactory(locator);
+
+      ClientSession session = sf.createSession(false, true, true);
+
+      session.start();
+
+      final SimpleString queueName = new SimpleString("DuplicateDetectionTestQueue");
+      session.createQueue(queueName, queueName, null, true);
+
+      ClientProducer producer = session.createProducer(queueName);
+
+      for (int i = 0; i < TEST_SIZE; i++) {
+         ClientMessage message = session.createMessage(true);
+         message.putStringProperty(Message.HDR_DUPLICATE_DETECTION_ID, SimpleString.toSimpleString("DUPL-" + i));
+         producer.send(message);
+      }
+      session.commit();
+
+      sf.close();
+      session.close();
+      locator.close();
+
+      server.stop();
+
+      server.getConfiguration().setIDCacheSize(100);
+
+      server.start();
+
+      locator = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+
+      locator.setBlockOnNonDurableSend(true);
+      sf = createSessionFactory(locator);
+      session = sf.createSession(false, false, false);
+      session.start();
+
+      producer = session.createProducer(queueName);
+
+      // will send the last 50 again
+      for (int i = TEST_SIZE - 50; i < TEST_SIZE; i++) {
+         ClientMessage message = session.createMessage(true);
+         message.putStringProperty(Message.HDR_DUPLICATE_DETECTION_ID, SimpleString.toSimpleString("DUPL-" + i));
+         producer.send(message);
+      }
+
+      try {
+         session.commit();
+         Assert.fail("Exception expected");
+      }
+      catch (ActiveMQException expected) {
+
+      }
+
+   }
+
    @Test
    public void testSimpleDuplicateDetectionWithString() throws Exception {
       ClientSession session = sf.createSession(false, true, true);
@@ -1240,176 +1311,6 @@ public class DuplicateDetectionTest extends ActiveMQTestBase {
    }
 
    @Test
-   public void testDuplicateCachePersistedRestartWithSmallerCache() throws Exception {
-      server.stop();
-
-      final int initialCacheSize = 10;
-      final int subsequentCacheSize = 5;
-
-      config = createDefaultInVMConfig().setIDCacheSize(initialCacheSize);
-
-      server = createServer(config);
-
-      server.start();
-
-      sf = createSessionFactory(locator);
-
-      ClientSession session = sf.createSession(false, true, true);
-
-      session.start();
-
-      final SimpleString queueName = new SimpleString("DuplicateDetectionTestQueue");
-
-      session.createQueue(queueName, queueName, null, false);
-
-      ClientProducer producer = session.createProducer(queueName);
-
-      ClientConsumer consumer = session.createConsumer(queueName);
-
-      for (int i = 0; i < initialCacheSize; i++) {
-         ClientMessage message = createMessage(session, i);
-         SimpleString dupID = new SimpleString("abcdefg" + i);
-         message.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
-         producer.send(message);
-         ClientMessage message2 = consumer.receive(1000);
-         Assert.assertEquals(i, message2.getObjectProperty(propKey));
-      }
-
-      session.close();
-
-      sf.close();
-
-      server.stop();
-
-      waitForServerToStop(server);
-
-      config.setIDCacheSize(subsequentCacheSize);
-
-      server = createServer(config);
-
-      server.start();
-
-      sf = createSessionFactory(locator);
-
-      session = sf.createSession(false, true, true);
-
-      session.start();
-
-      session.createQueue(queueName, queueName, null, false);
-
-      producer = session.createProducer(queueName);
-
-      consumer = session.createConsumer(queueName);
-
-      for (int i = 0; i < initialCacheSize; i++) {
-         ClientMessage message = createMessage(session, i);
-         SimpleString dupID = new SimpleString("abcdefg" + i);
-         message.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
-         producer.send(message);
-         if (i >= subsequentCacheSize) {
-            // Message should get through
-            ClientMessage message2 = consumer.receive(1000);
-            Assert.assertEquals(i, message2.getObjectProperty(propKey));
-         }
-         else {
-            ClientMessage message2 = consumer.receiveImmediate();
-            Assert.assertNull(message2);
-         }
-      }
-   }
-
-   @Test
-   public void testDuplicateCachePersistedRestartWithSmallerCacheEnsureDeleted() throws Exception {
-      server.stop();
-
-      final int initialCacheSize = 10;
-      final int subsequentCacheSize = 5;
-
-      config = createDefaultInVMConfig().setIDCacheSize(initialCacheSize);
-
-      server = createServer(config);
-
-      server.start();
-
-      sf = createSessionFactory(locator);
-
-      ClientSession session = sf.createSession(false, true, true);
-
-      session.start();
-
-      final SimpleString queueName = new SimpleString("DuplicateDetectionTestQueue");
-
-      session.createQueue(queueName, queueName, null, false);
-
-      ClientProducer producer = session.createProducer(queueName);
-
-      ClientConsumer consumer = session.createConsumer(queueName);
-
-      for (int i = 0; i < initialCacheSize; i++) {
-         ClientMessage message = createMessage(session, i);
-         SimpleString dupID = new SimpleString("abcdefg" + i);
-         message.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
-         producer.send(message);
-         ClientMessage message2 = consumer.receive(1000);
-         Assert.assertEquals(i, message2.getObjectProperty(propKey));
-      }
-
-      session.close();
-
-      sf.close();
-
-      server.stop();
-
-      waitForServerToStop(server);
-
-      config.setIDCacheSize(subsequentCacheSize);
-
-      server = createServer(config);
-
-      server.start();
-
-      // Now stop and set back to original cache size and restart
-
-      server.stop();
-
-      waitForServerToStop(server);
-
-      config.setIDCacheSize(initialCacheSize);
-
-      server = createServer(config);
-
-      server.start();
-
-      sf = createSessionFactory(locator);
-
-      session = sf.createSession(false, true, true);
-
-      session.start();
-
-      session.createQueue(queueName, queueName, null, false);
-
-      producer = session.createProducer(queueName);
-
-      consumer = session.createConsumer(queueName);
-
-      for (int i = 0; i < initialCacheSize; i++) {
-         ClientMessage message = createMessage(session, i);
-         SimpleString dupID = new SimpleString("abcdefg" + i);
-         message.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
-         producer.send(message);
-         if (i >= subsequentCacheSize) {
-            // Message should get through
-            ClientMessage message2 = consumer.receive(1000);
-            Assert.assertEquals(i, message2.getObjectProperty(propKey));
-         }
-         else {
-            ClientMessage message2 = consumer.receiveImmediate();
-            Assert.assertNull(message2);
-         }
-      }
-   }
-
-   @Test
    public void testNoPersist() throws Exception {
       server.stop();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
----------------------------------------------------------------------
diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
index 6351357..6244330 100644
--- a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
+++ b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
@@ -257,6 +257,10 @@ public class PersistMultiThreadTest extends ActiveMQTestBase {
       }
 
       @Override
+      public void criticalException(Throwable e) {
+      }
+
+      @Override
       public int getNumberOfPages() {
          return 0;
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
index 59d2646..5f02cf9 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
@@ -782,6 +782,10 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
 
    static final class FakeStoreFactory implements PagingStoreFactory {
 
+      @Override
+      public void criticalException(Throwable e) {
+      }
+
       final SequentialFileFactory factory;
 
       public FakeStoreFactory() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
index 30e302e..c47041a 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
@@ -106,7 +106,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
          DuplicateIDCacheImpl cacheID = new DuplicateIDCacheImpl(ADDRESS, 10, journal, true);
 
          for (int i = 0; i < 100; i++) {
-            cacheID.addToCache(RandomUtil.randomBytes(), null);
+            cacheID.addToCache(RandomUtil.randomBytes());
          }
 
          journal.stop();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
index a7be2fa..81015e4 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
@@ -93,41 +93,47 @@ public class FakeConsumer implements Consumer {
 
    @Override
    public synchronized HandleStatus handle(final MessageReference reference) {
-      if (statusToReturn == HandleStatus.BUSY) {
-         return HandleStatus.BUSY;
-      }
-
-      if (filter != null) {
-         if (filter.match(reference.getMessage())) {
-            references.addLast(reference);
-            reference.getQueue().referenceHandled();
-            notify();
-
-            return HandleStatus.HANDLED;
+      try {
+         if (statusToReturn == HandleStatus.BUSY) {
+            return HandleStatus.BUSY;
          }
-         else {
-            return HandleStatus.NO_MATCH;
+
+         if (filter != null) {
+            if (filter.match(reference.getMessage())) {
+               references.addLast(reference);
+               reference.getQueue().referenceHandled();
+               notify();
+
+               return HandleStatus.HANDLED;
+            }
+            else {
+               return HandleStatus.NO_MATCH;
+            }
          }
-      }
 
-      if (newStatus != null) {
-         if (delayCountdown == 0) {
-            statusToReturn = newStatus;
+         if (newStatus != null) {
+            if (delayCountdown == 0) {
+               statusToReturn = newStatus;
 
-            newStatus = null;
+               newStatus = null;
+            }
+            else {
+               delayCountdown--;
+            }
          }
-         else {
-            delayCountdown--;
+
+         if (statusToReturn == HandleStatus.HANDLED) {
+            reference.getQueue().referenceHandled();
+            references.addLast(reference);
+            notify();
          }
-      }
 
-      if (statusToReturn == HandleStatus.HANDLED) {
-         reference.getQueue().referenceHandled();
-         references.addLast(reference);
-         notify();
+         return statusToReturn;
+      }
+      catch (Exception e) {
+         e.printStackTrace();
+         throw new IllegalStateException(e.getMessage(), e);
       }
-
-      return statusToReturn;
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
index 27d9c33..4f8a007 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
@@ -113,8 +113,7 @@ public class FakePostOffice implements PostOffice {
    }
 
    @Override
-   public Binding removeBinding(final SimpleString uniqueName, final Transaction tx) throws Exception {
-
+   public Binding removeBinding(SimpleString uniqueName, Transaction tx, boolean deleteData) throws Exception {
       return null;
    }
 


[3/4] activemq-artemis git commit: ARTEMIS-332 - Duplicate delivery over Bridges under OME scenarios, paging and other failures

Posted by cl...@apache.org.
ARTEMIS-332 - Duplicate delivery over Bridges under OME scenarios, paging and other failures

https://issues.apache.org/jira/browse/ARTEMIS-332


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/96849a42
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/96849a42
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/96849a42

Branch: refs/heads/master
Commit: 96849a42b756b3ba4f8ab65fff668893c59fc39f
Parents: 34b6635
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon Jan 4 19:49:27 2016 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Jan 4 20:49:28 2016 -0500

----------------------------------------------------------------------
 .../apache/activemq/artemis/utils/ByteUtil.java |  37 ++-
 .../activemq/artemis/utils/ByteUtilTest.java    |   8 +
 .../core/protocol/core/impl/ChannelImpl.java    |   4 +
 .../core/io/IOCriticalErrorListener.java        |   2 +-
 .../openwire/amq/AMQServerConsumer.java         |  37 +--
 .../core/management/impl/QueueControlImpl.java  |   4 +-
 .../artemis/core/paging/PagingStore.java        |   2 +
 .../artemis/core/paging/PagingStoreFactory.java |   2 +
 .../artemis/core/paging/cursor/PageCache.java   |  11 -
 .../core/paging/cursor/PageCursorProvider.java  |   5 +-
 .../core/paging/cursor/PageSubscription.java    |   5 +-
 .../core/paging/cursor/PagedReference.java      |   3 +-
 .../core/paging/cursor/PagedReferenceImpl.java  |  36 ++-
 .../paging/cursor/impl/LivePageCacheImpl.java   |  10 -
 .../core/paging/cursor/impl/PageCacheImpl.java  |  37 +--
 .../cursor/impl/PageCursorProviderImpl.java     |  61 +++--
 .../cursor/impl/PageSubscriptionImpl.java       | 147 +++++++----
 .../core/paging/impl/PagingStoreFactoryNIO.java |   4 +
 .../core/paging/impl/PagingStoreImpl.java       |   6 +
 .../core/persistence/StorageManager.java        |   2 +
 .../impl/journal/JournalStorageManager.java     |  25 +-
 .../impl/nullpm/NullStorageManager.java         |  23 +-
 .../core/postoffice/DuplicateIDCache.java       |   2 +
 .../artemis/core/postoffice/PostOffice.java     |   2 +-
 .../postoffice/impl/DuplicateIDCacheImpl.java   |  93 ++++++-
 .../core/postoffice/impl/PostOfficeImpl.java    |  36 +--
 .../core/ServerSessionPacketHandler.java        |  90 ++++---
 .../artemis/core/server/MessageReference.java   |   4 +-
 .../activemq/artemis/core/server/Queue.java     |   7 +-
 .../core/server/ScheduledDeliveryHandler.java   |   5 +-
 .../artemis/core/server/ServerSession.java      |   2 +
 .../cluster/impl/ClusterConnectionImpl.java     |   2 +-
 .../core/server/impl/ActiveMQServerImpl.java    |  30 +--
 .../core/server/impl/LastValueQueue.java        |  79 ++++--
 .../core/server/impl/MessageReferenceImpl.java  |   2 +-
 .../artemis/core/server/impl/QueueImpl.java     |  87 ++++---
 .../artemis/core/server/impl/RefsOperation.java |  27 +-
 .../impl/ScheduledDeliveryHandlerImpl.java      |   5 +-
 .../core/server/impl/ServerConsumerImpl.java    |  62 ++++-
 .../core/server/impl/ServerSessionImpl.java     |  15 ++
 .../impl/ScheduledDeliveryHandlerTest.java      |   4 +-
 .../transaction/impl/TransactionImplTest.java   |   5 +
 .../integration/DuplicateDetectionTest.java     | 249 ++++++-------------
 .../storage/PersistMultiThreadTest.java         |   4 +
 .../core/paging/impl/PagingStoreImplTest.java   |   4 +
 .../impl/DuplicateDetectionUnitTest.java        |   2 +-
 .../core/server/impl/fakes/FakeConsumer.java    |  60 +++--
 .../core/server/impl/fakes/FakePostOffice.java  |   3 +-
 48 files changed, 788 insertions(+), 564 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
index b7ff841..50ac3cc 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
@@ -19,9 +19,12 @@ package org.apache.activemq.artemis.utils;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.UnpooledByteBufAllocator;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.SimpleString;
 
 public class ByteUtil {
 
+   public static final String NON_ASCII_STRING = "@@@@@";
+
    private static final char[] hexArray = "0123456789ABCDEF".toCharArray();
 
    public static String maxString(String value, int size) {
@@ -34,22 +37,30 @@ public class ByteUtil {
    }
 
    public static String bytesToHex(byte[] bytes, int groupSize) {
-      if (bytes == null) {
-         return "null";
+      char[] hexChars = new char[bytes.length * 2 + numberOfGroups(bytes, groupSize)];
+      int outPos = 0;
+      for (int j = 0; j < bytes.length; j++) {
+         if (j > 0 && j % groupSize == 0) {
+            hexChars[outPos++] = ' ';
+         }
+         int v = bytes[j] & 0xFF;
+         hexChars[outPos++] = hexArray[v >>> 4];
+         hexChars[outPos++] = hexArray[v & 0x0F];
       }
-      else {
-         char[] hexChars = new char[bytes.length * 2 + numberOfGroups(bytes, groupSize)];
-         int outPos = 0;
-         for (int j = 0; j < bytes.length; j++) {
-            if (j > 0 && j % groupSize == 0) {
-               hexChars[outPos++] = ' ';
-            }
-            int v = bytes[j] & 0xFF;
-            hexChars[outPos++] = hexArray[v >>> 4];
-            hexChars[outPos++] = hexArray[v & 0x0F];
+      return new String(hexChars);
+   }
+
+   public static String toSimpleString(byte[] bytes) {
+      SimpleString simpleString = new SimpleString(bytes);
+      String value = simpleString.toString();
+
+      for (char c : value.toCharArray()) {
+         if (c < ' ' || c > 127) {
+            return NON_ASCII_STRING;
          }
-         return new String(hexChars);
       }
+
+      return value;
    }
 
    private static int numberOfGroups(byte[] bytes, int groupSize) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java
index ada02f4..feebae1 100644
--- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java
+++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java
@@ -31,6 +31,14 @@ public class ByteUtilTest {
    }
 
    @Test
+   public void testNonASCII() {
+      Assert.assertEquals("aA", ByteUtil.toSimpleString(new byte[]{97, 0, 65, 0}));
+      Assert.assertEquals(ByteUtil.NON_ASCII_STRING, ByteUtil.toSimpleString(new byte[]{0, 97, 0, 65}));
+
+      System.out.println(ByteUtil.toSimpleString(new byte[]{0, 97, 0, 65}));
+   }
+
+   @Test
    public void testMaxString() {
       byte[] byteArray = new byte[20 * 1024];
       System.out.println(ByteUtil.maxString(ByteUtil.bytesToHex(byteArray, 2), 150));

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
index 9e5ccb3..4ef6104 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
@@ -536,6 +536,10 @@ public final class ChannelImpl implements Channel {
       if (resendCache != null && packet.isRequiresConfirmations()) {
          lastConfirmedCommandID.incrementAndGet();
 
+         if (isTrace) {
+            ActiveMQClientLogger.LOGGER.trace("ChannelImpl::confirming packet " + packet + " last commandID=" + lastConfirmedCommandID);
+         }
+
          receivedBytes += packet.getPacketSize();
 
          if (receivedBytes >= confWindowSize) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOCriticalErrorListener.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOCriticalErrorListener.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOCriticalErrorListener.java
index 1a38462..c0e180e 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOCriticalErrorListener.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOCriticalErrorListener.java
@@ -21,5 +21,5 @@ package org.apache.activemq.artemis.core.io;
  */
 public interface IOCriticalErrorListener {
 
-   void onIOException(Exception code, String message, SequentialFile file);
+   void onIOException(Throwable code, String message, SequentialFile file);
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java
index b0ec7ed..625adcd 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.openwire.amq;
 
 import java.util.List;
 
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.postoffice.QueueBinding;
@@ -125,24 +126,30 @@ public class AMQServerConsumer extends ServerConsumerImpl {
    }
 
    public void amqPutBackToDeliveringList(final List<MessageReference> refs) {
-      synchronized (this.deliveringRefs) {
-         for (MessageReference ref : refs) {
-            ref.incrementDeliveryCount();
-            deliveringRefs.add(ref);
-         }
-         //adjust the order. Suppose deliveringRefs has 2 existing
-         //refs m1, m2, and refs has 3 m3, m4, m5
-         //new order must be m3, m4, m5, m1, m2
-         if (refs.size() > 0) {
-            long first = refs.get(0).getMessage().getMessageID();
-            MessageReference m = deliveringRefs.peek();
-            while (m.getMessage().getMessageID() != first) {
-               deliveringRefs.poll();
-               deliveringRefs.add(m);
-               m = deliveringRefs.peek();
+      try {
+         synchronized (this.deliveringRefs) {
+            for (MessageReference ref : refs) {
+               ref.incrementDeliveryCount();
+               deliveringRefs.add(ref);
+            }
+            //adjust the order. Suppose deliveringRefs has 2 existing
+            //refs m1, m2, and refs has 3 m3, m4, m5
+            //new order must be m3, m4, m5, m1, m2
+            if (refs.size() > 0) {
+               long first = refs.get(0).getMessage().getMessageID();
+               MessageReference m = deliveringRefs.peek();
+               while (m.getMessage().getMessageID() != first) {
+                  deliveringRefs.poll();
+                  deliveringRefs.add(m);
+                  m = deliveringRefs.peek();
+               }
             }
          }
       }
+      catch (ActiveMQException e) {
+         // TODO: what to do here?
+         throw new IllegalStateException(e.getMessage(), e);
+      }
    }
 
    public void moveToDeadLetterAddress(long mid, Throwable cause) throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index b61409e..26d22a0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -339,7 +339,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
     * @param refs
     * @return
     */
-   private Map<String, Object>[] convertMessagesToMaps(List<MessageReference> refs) {
+   private Map<String, Object>[] convertMessagesToMaps(List<MessageReference> refs) throws ActiveMQException {
       Map<String, Object>[] messages = new Map[refs.size()];
       int i = 0;
       for (MessageReference ref : refs) {
@@ -350,7 +350,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
    }
 
    @Override
-   public Map<String, Map<String, Object>[]> listDeliveringMessages() {
+   public Map<String, Map<String, Object>[]> listDeliveringMessages() throws ActiveMQException {
       checkStarted();
 
       clearIO();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
index e831966..c8808b3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
@@ -47,6 +47,8 @@ public interface PagingStore extends ActiveMQComponent {
 
    int getNumberOfPages();
 
+   void criticalException(Throwable e);
+
    /**
     * Returns the page id of the current page in which the system is writing files.
     */

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java
index 8c2d11a..91907ba 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java
@@ -38,4 +38,6 @@ public interface PagingStoreFactory {
 
    SequentialFileFactory newFileFactory(SimpleString address) throws Exception;
 
+   void criticalException(Throwable e);
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCache.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCache.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCache.java
index 2c82974..bb21b6b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCache.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCache.java
@@ -41,17 +41,6 @@ public interface PageCache extends SoftValueHashMap.ValueCache {
     */
    PagedMessage getMessage(int messageNumber);
 
-   /**
-    * When the cache is being created,
-    * We need to first read the files before other threads can get messages from this.
-    */
-   void lock();
-
-   /**
-    * You have to call this method within the same thread you called lock
-    */
-   void unlock();
-
    void close();
 
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java
index 951b83c..c8404ba 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.core.paging.cursor;
 
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 
@@ -24,7 +25,7 @@ import org.apache.activemq.artemis.core.paging.PagedMessage;
  */
 public interface PageCursorProvider {
 
-   PageCache getPageCache(long pageNr);
+   PageCache getPageCache(long pageNr) throws ActiveMQException;
 
    PagedReference newReference(final PagePosition pos, final PagedMessage msg, PageSubscription sub);
 
@@ -38,7 +39,7 @@ public interface PageCursorProvider {
 
    PageSubscription createSubscription(long queueId, Filter filter, boolean durable);
 
-   PagedMessage getMessage(PagePosition pos);
+   PagedMessage getMessage(PagePosition pos) throws ActiveMQException;
 
    void processReload() throws Exception;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
index df2ccc3..386f21f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.paging.cursor;
 
 import java.util.concurrent.Executor;
 
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.paging.impl.Page;
@@ -95,7 +96,7 @@ public interface PageSubscription {
 
    void reloadPageCompletion(PagePosition position);
 
-   void reloadPageInfo(long pageNr);
+   void reloadPageInfo(long pageNr) throws ActiveMQException;
 
    /**
     * To be called when the cursor decided to ignore a position.
@@ -147,7 +148,7 @@ public interface PageSubscription {
     * @param pos
     * @return
     */
-   PagedMessage queryMessage(PagePosition pos);
+   PagedMessage queryMessage(PagePosition pos) throws ActiveMQException;
 
    /**
     * @return executor used by the PageSubscription

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReference.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReference.java
index c1ff089..46041c5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReference.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReference.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.core.paging.cursor;
 
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.server.MessageReference;
 
@@ -23,5 +24,5 @@ public interface PagedReference extends MessageReference {
 
    PagePosition getPosition();
 
-   PagedMessage getPagedMessage();
+   PagedMessage getPagedMessage() throws ActiveMQException;
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
index 05b88d6..964737f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.paging.cursor;
 import java.lang.ref.WeakReference;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
@@ -49,12 +50,12 @@ public class PagedReferenceImpl implements PagedReference {
    private boolean alreadyAcked;
 
    @Override
-   public ServerMessage getMessage() {
+   public ServerMessage getMessage() throws ActiveMQException {
       return getPagedMessage().getMessage();
    }
 
    @Override
-   public synchronized PagedMessage getPagedMessage() {
+   public synchronized PagedMessage getPagedMessage() throws ActiveMQException {
       PagedMessage returnMessage = message != null ? message.get() : null;
 
       // We only keep a few references on the Queue from paging...
@@ -107,25 +108,42 @@ public class PagedReferenceImpl implements PagedReference {
    @Override
    public int getMessageMemoryEstimate() {
       if (messageEstimate < 0) {
-         messageEstimate = getMessage().getMemoryEstimate();
+         try {
+            messageEstimate = getMessage().getMemoryEstimate();
+         }
+         catch (ActiveMQException e) {
+            ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+         }
       }
       return messageEstimate;
    }
 
    @Override
    public MessageReference copy(final Queue queue) {
-      return new PagedReferenceImpl(this.position, this.getPagedMessage(), this.subscription);
+      try {
+         return new PagedReferenceImpl(this.position, this.getPagedMessage(), this.subscription);
+      }
+      catch (ActiveMQException e) {
+         ActiveMQServerLogger.LOGGER.warn(e);
+         return this;
+      }
    }
 
    @Override
    public long getScheduledDeliveryTime() {
       if (deliveryTime == null) {
-         ServerMessage msg = getMessage();
-         if (msg.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) {
-            deliveryTime = getMessage().getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
+         try {
+            ServerMessage msg = getMessage();
+            if (msg.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) {
+               deliveryTime = getMessage().getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
+            }
+            else {
+               deliveryTime = 0L;
+            }
          }
-         else {
-            deliveryTime = 0L;
+         catch (ActiveMQException e) {
+            ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+            return 0L;
          }
       }
       return deliveryTime;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
index fd88e7a..29d990a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
@@ -68,16 +68,6 @@ public class LivePageCacheImpl implements LivePageCache {
    }
 
    @Override
-   public void lock() {
-      // nothing to be done on live cache
-   }
-
-   @Override
-   public void unlock() {
-      // nothing to be done on live cache
-   }
-
-   @Override
    public synchronized boolean isLive() {
       return isLive;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCacheImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCacheImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCacheImpl.java
index 5efe6d6..8d9b872 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCacheImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCacheImpl.java
@@ -16,9 +16,6 @@
  */
 package org.apache.activemq.artemis.core.paging.cursor.impl;
 
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.paging.cursor.PageCache;
 import org.apache.activemq.artemis.core.paging.impl.Page;
@@ -32,8 +29,6 @@ class PageCacheImpl implements PageCache {
 
    // Attributes ----------------------------------------------------
 
-   private final ReadWriteLock lock = new ReentrantReadWriteLock();
-
    private PagedMessage[] messages;
 
    private final Page page;
@@ -50,17 +45,11 @@ class PageCacheImpl implements PageCache {
 
    @Override
    public PagedMessage getMessage(final int messageNumber) {
-      lock.readLock().lock();
-      try {
-         if (messageNumber < messages.length) {
-            return messages[messageNumber];
-         }
-         else {
-            return null;
-         }
+      if (messageNumber < messages.length) {
+         return messages[messageNumber];
       }
-      finally {
-         lock.readLock().unlock();
+      else {
+         return null;
       }
    }
 
@@ -70,29 +59,13 @@ class PageCacheImpl implements PageCache {
    }
 
    @Override
-   public void lock() {
-      lock.writeLock().lock();
-   }
-
-   @Override
-   public void unlock() {
-      lock.writeLock().unlock();
-   }
-
-   @Override
    public void setMessages(final PagedMessage[] messages) {
       this.messages = messages;
    }
 
    @Override
    public int getNumberOfMessages() {
-      lock.readLock().lock();
-      try {
-         return messages.length;
-      }
-      finally {
-         lock.readLock().unlock();
-      }
+      return messages.length;
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
index f9d2bb5..ef57e1c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
@@ -24,6 +24,8 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.paging.PagingStore;
@@ -109,7 +111,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
    }
 
    @Override
-   public PagedMessage getMessage(final PagePosition pos) {
+   public PagedMessage getMessage(final PagePosition pos) throws ActiveMQException {
       PageCache cache = getPageCache(pos.getPageNr());
 
       if (cache == null || pos.getMessageNr() >= cache.getNumberOfMessages()) {
@@ -128,10 +130,9 @@ public class PageCursorProviderImpl implements PageCursorProvider {
    }
 
    @Override
-   public PageCache getPageCache(final long pageId) {
+   public PageCache getPageCache(final long pageId) throws ActiveMQException {
       try {
-         boolean needToRead = false;
-         PageCache cache = null;
+         PageCache cache;
          synchronized (softCache) {
             if (pageId > pagingStore.getCurrentWritingPage()) {
                return null;
@@ -144,47 +145,43 @@ public class PageCursorProviderImpl implements PageCursorProvider {
                }
 
                cache = createPageCache(pageId);
-               needToRead = true;
                // anyone reading from this cache will have to wait reading to finish first
                // we also want only one thread reading this cache
-               cache.lock();
                if (isTrace) {
                   ActiveMQServerLogger.LOGGER.trace("adding " + pageId + " into cursor = " + this.pagingStore.getAddress());
                }
+               readPage((int) pageId, cache);
                softCache.put(pageId, cache);
             }
          }
 
-         // Reading is done outside of the synchronized block, however
-         // the page stays locked until the entire reading is finished
-         if (needToRead) {
-            Page page = null;
-            try {
-               page = pagingStore.createPage((int) pageId);
+         return cache;
+      }
+      catch (Throwable e) {
+         throw new ActiveMQIOErrorException("Couldn't complete paging due to an IO Exception on Paging - " + e.getMessage(), e);
+      }
+   }
 
-               storageManager.beforePageRead();
-               page.open();
+   private void readPage(int pageId, PageCache cache) throws Exception {
+      Page page = null;
+      try {
+         page = pagingStore.createPage(pageId);
 
-               List<PagedMessage> pgdMessages = page.read(storageManager);
-               cache.setMessages(pgdMessages.toArray(new PagedMessage[pgdMessages.size()]));
-            }
-            finally {
-               try {
-                  if (page != null) {
-                     page.close();
-                  }
-               }
-               catch (Throwable ignored) {
-               }
-               storageManager.afterPageRead();
-               cache.unlock();
-            }
-         }
+         storageManager.beforePageRead();
+         page.open();
 
-         return cache;
+         List<PagedMessage> pgdMessages = page.read(storageManager);
+         cache.setMessages(pgdMessages.toArray(new PagedMessage[pgdMessages.size()]));
       }
-      catch (Exception e) {
-         throw new RuntimeException("Couldn't complete paging due to an IO Exception on Paging - " + e.getMessage(), e);
+      finally {
+         try {
+            if (page != null) {
+               page.close();
+            }
+         }
+         catch (Throwable ignored) {
+         }
+         storageManager.afterPageRead();
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
index b800c4b..d7a6ded 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
@@ -32,8 +32,9 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.core.filter.Filter;
+import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.paging.PagingStore;
@@ -332,7 +333,7 @@ final class PageSubscriptionImpl implements PageSubscription {
       return "PageSubscriptionImpl [cursorId=" + cursorId + ", queue=" + queue + ", filter = " + filter + "]";
    }
 
-   private PagedReference getReference(PagePosition pos) {
+   private PagedReference getReference(PagePosition pos) throws ActiveMQException {
       return cursorProvider.newReference(pos, cursorProvider.getMessage(pos), this);
    }
 
@@ -341,7 +342,7 @@ final class PageSubscriptionImpl implements PageSubscription {
       return new CursorIterator();
    }
 
-   private PagedReference internalGetNext(final PagePosition pos) {
+   private PagedReference internalGetNext(final PagePosition pos) throws ActiveMQException {
       PagePosition retPos = pos.nextMessage();
 
       PageCache cache = cursorProvider.getPageCache(pos.getPageNr());
@@ -470,11 +471,17 @@ final class PageSubscriptionImpl implements PageSubscription {
          public void onError(final int errorCode, final String errorMessage) {
             error = " errorCode=" + errorCode + ", msg=" + errorMessage;
             ActiveMQServerLogger.LOGGER.pageSubscriptionError(this, error);
+            getPagingStore().criticalException(new ActiveMQException(errorMessage));
          }
 
          @Override
          public void done() {
-            processACK(position);
+            try {
+               processACK(position);
+            }
+            catch (ActiveMQException e) {
+               getPagingStore().criticalException(e);
+            }
          }
 
          @Override
@@ -504,7 +511,12 @@ final class PageSubscriptionImpl implements PageSubscription {
 
    @Override
    public void addPendingDelivery(final PagePosition position) {
-      getPageInfo(position).incrementPendingTX();
+      try {
+         getPageInfo(position).incrementPendingTX();
+      }
+      catch (Exception e) {
+         getPagingStore().criticalException(e);
+      }
    }
 
    @Override
@@ -523,13 +535,8 @@ final class PageSubscriptionImpl implements PageSubscription {
    }
 
    @Override
-   public PagedMessage queryMessage(PagePosition pos) {
-      try {
-         return cursorProvider.getMessage(pos);
-      }
-      catch (Exception e) {
-         throw new RuntimeException(e.getMessage(), e);
-      }
+   public PagedMessage queryMessage(PagePosition pos) throws ActiveMQException {
+      return cursorProvider.getMessage(pos);
    }
 
    /**
@@ -547,18 +554,32 @@ final class PageSubscriptionImpl implements PageSubscription {
    @Override
    public void reloadPreparedACK(final Transaction tx, final PagePosition position) {
       deliveredCount.incrementAndGet();
-      installTXCallback(tx, position);
+      try {
+         installTXCallback(tx, position);
+      }
+      catch (Exception e) {
+         getPagingStore().criticalException(e);
+      }
    }
 
    @Override
    public void positionIgnored(final PagePosition position) {
-      processACK(position);
+      try {
+         processACK(position);
+      }
+      catch (Exception e) {
+         getPagingStore().criticalException(e);
+      }
    }
 
-   @Override
    public void lateDeliveryRollback(PagePosition position) {
-      PageCursorInfo cursorInfo = processACK(position);
-      cursorInfo.decrementPendingTX();
+      try {
+         PageCursorInfo cursorInfo = processACK(position);
+         cursorInfo.decrementPendingTX();
+      }
+      catch (ActiveMQException e) {
+         getPagingStore().criticalException(e);
+      }
    }
 
    @Override
@@ -729,15 +750,15 @@ final class PageSubscriptionImpl implements PageSubscription {
    }
 
    @Override
-   public void reloadPageInfo(long pageNr) {
+   public void reloadPageInfo(long pageNr) throws ActiveMQException {
       getPageInfo(pageNr, true);
    }
 
-   private PageCursorInfo getPageInfo(final PagePosition pos) {
+   private PageCursorInfo getPageInfo(final PagePosition pos) throws ActiveMQException {
       return getPageInfo(pos.getPageNr(), true);
    }
 
-   private PageCursorInfo getPageInfo(final long pageNr, boolean create) {
+   private PageCursorInfo getPageInfo(final long pageNr, boolean create) throws ActiveMQException {
       synchronized (consumedPages) {
          PageCursorInfo pageInfo = consumedPages.get(pageNr);
 
@@ -771,7 +792,7 @@ final class PageSubscriptionImpl implements PageSubscription {
 
    // To be called only after the ACK has been processed and guaranteed to be on storage
    // The only exception is on non storage events such as not matching messages
-   private PageCursorInfo processACK(final PagePosition pos) {
+   private PageCursorInfo processACK(final PagePosition pos) throws ActiveMQException {
       if (lastAckedPosition == null || pos.compareTo(lastAckedPosition) > 0) {
          if (isTrace) {
             ActiveMQServerLogger.LOGGER.trace("a new position is being processed as ACK");
@@ -807,7 +828,7 @@ final class PageSubscriptionImpl implements PageSubscription {
     * @param tx
     * @param position
     */
-   private void installTXCallback(final Transaction tx, final PagePosition position) {
+   private void installTXCallback(final Transaction tx, final PagePosition position) throws ActiveMQException {
       if (position.getRecordID() >= 0) {
          // It needs to persist, otherwise the cursor will return to the fist page position
          tx.setContainsPersistent();
@@ -827,7 +848,7 @@ final class PageSubscriptionImpl implements PageSubscription {
 
    }
 
-   private PageTransactionInfo getPageTransaction(final PagedReference reference) {
+   private PageTransactionInfo getPageTransaction(final PagedReference reference) throws ActiveMQException {
       if (reference.getPagedMessage().getTransactionID() >= 0) {
          return pageStore.getPagingManager().getTransaction(reference.getPagedMessage().getTransactionID());
       }
@@ -895,13 +916,24 @@ final class PageSubscriptionImpl implements PageSubscription {
 
       @Override
       public String toString() {
-         return "PageCursorInfo::PageID=" + pageId +
-            " numberOfMessage = " +
-            numberOfMessages +
-            ", confirmed = " +
-            confirmed +
-            ", isDone=" +
-            this.isDone();
+         try {
+            return "PageCursorInfo::PageID=" + pageId +
+               " numberOfMessage = " +
+               numberOfMessages +
+               ", confirmed = " +
+               confirmed +
+               ", isDone=" +
+               this.isDone();
+         }
+         catch (Exception e) {
+            return "PageCursorInfo::PageID=" + pageId +
+               " numberOfMessage = " +
+               numberOfMessages +
+               ", confirmed = " +
+               confirmed +
+               ", isDone=" +
+               e.toString();
+         }
       }
 
       public PageCursorInfo(final long pageId, final int numberOfMessages, final PageCache cache) {
@@ -928,7 +960,13 @@ final class PageSubscriptionImpl implements PageSubscription {
       }
 
       public boolean isDone() {
-         return completePage != null || (getNumberOfMessages() == confirmed.get() && pendingTX.get() == 0);
+         try {
+            return completePage != null || (getNumberOfMessages() == confirmed.get() && pendingTX.get() == 0);
+         }
+         catch (ActiveMQException e) {
+            getPagingStore().criticalException(e);
+            throw new RuntimeException(e.getMessage(), e);
+         }
       }
 
       public boolean isPendingDelete() {
@@ -966,12 +1004,17 @@ final class PageSubscriptionImpl implements PageSubscription {
       public void addACK(final PagePosition posACK) {
 
          if (isTrace) {
-            ActiveMQServerLogger.LOGGER.trace("numberOfMessages =  " + getNumberOfMessages() +
-                                                 " confirmed =  " +
-                                                 (confirmed.get() + 1) +
-                                                 " pendingTX = " + pendingTX +
-                                                 ", page = " +
-                                                 pageId + " posACK = " + posACK);
+            try {
+               ActiveMQServerLogger.LOGGER.trace("numberOfMessages =  " + getNumberOfMessages() +
+                                                    " confirmed =  " +
+                                                    (confirmed.get() + 1) +
+                                                    " pendingTX = " + pendingTX +
+                                                    ", page = " +
+                                                    pageId + " posACK = " + posACK);
+            }
+            catch (Throwable ignored) {
+               ActiveMQServerLogger.LOGGER.debug(ignored.getMessage(), ignored);
+            }
          }
 
          boolean added = internalAddACK(posACK);
@@ -1004,7 +1047,7 @@ final class PageSubscriptionImpl implements PageSubscription {
          }
       }
 
-      private int getNumberOfMessages() {
+      private int getNumberOfMessages() throws ActiveMQException {
          if (wasLive) {
             // if the page was live at any point, we need to
             // get the number of messages from the page-cache
@@ -1023,7 +1066,7 @@ final class PageSubscriptionImpl implements PageSubscription {
 
    }
 
-   private static final class PageCursorTX extends TransactionOperationAbstract {
+   private final class PageCursorTX extends TransactionOperationAbstract {
 
       private final Map<PageSubscriptionImpl, List<PagePosition>> pendingPositions = new HashMap<>();
 
@@ -1046,7 +1089,12 @@ final class PageSubscriptionImpl implements PageSubscription {
             List<PagePosition> positions = entry.getValue();
 
             for (PagePosition confirmed : positions) {
-               cursor.processACK(confirmed);
+               try {
+                  cursor.processACK(confirmed);
+               }
+               catch (ActiveMQException e) {
+                  getPagingStore().criticalException(e);
+               }
                cursor.deliveredCount.decrementAndGet();
             }
 
@@ -1125,13 +1173,13 @@ final class PageSubscriptionImpl implements PageSubscription {
             currentDelivery = moveNext();
             return currentDelivery;
          }
-         catch (RuntimeException e) {
-            e.printStackTrace();
-            throw e;
+         catch (ActiveMQException e) {
+            getPagingStore().criticalException(e);
+            throw new IllegalStateException(e.getMessage(), e);
          }
       }
 
-      private PagedReference moveNext() {
+      private PagedReference moveNext() throws ActiveMQException {
          synchronized (PageSubscriptionImpl.this) {
             boolean match = false;
 
@@ -1261,9 +1309,14 @@ final class PageSubscriptionImpl implements PageSubscription {
          deliveredCount.incrementAndGet();
          PagedReference delivery = currentDelivery;
          if (delivery != null) {
-            PageCursorInfo info = PageSubscriptionImpl.this.getPageInfo(delivery.getPosition());
-            if (info != null) {
-               info.remove(delivery.getPosition());
+            try {
+               PageCursorInfo info = PageSubscriptionImpl.this.getPageInfo(currentDelivery.getPosition());
+               if (info != null) {
+                  info.remove(currentDelivery.getPosition());
+               }
+            }
+            catch (ActiveMQException e) {
+               getPagingStore().criticalException(e);
             }
          }
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
index 0b0d210..39cd956 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
@@ -87,6 +87,10 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
 
    // Public --------------------------------------------------------
 
+   public void criticalException(Throwable e) {
+      critialErrorListener.onIOException(e, e.getMessage(), null);
+   }
+
    @Override
    public void stop() {
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index 1463b3c..9136c17 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -176,6 +176,12 @@ public class PagingStoreImpl implements PagingStore {
 
    }
 
+   @Override
+   public void criticalException(Throwable e) {
+      ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+      storeFactory.criticalException(e);
+   }
+
    /**
     * @param addressSettings
     */

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
index 23dff8d..a0a5200 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
@@ -64,6 +64,8 @@ import org.apache.activemq.artemis.utils.IDGenerator;
  */
 public interface StorageManager extends IDGenerator, ActiveMQComponent {
 
+   void criticalError(Throwable error);
+
    /**
     * Get the context associated with the thread for later reuse
     */

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index 77cfd0d..390e742 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -54,22 +54,22 @@ import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.filter.Filter;
-import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
+import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
+import org.apache.activemq.artemis.core.journal.EncodingSupport;
 import org.apache.activemq.artemis.core.journal.Journal;
 import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
 import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
 import org.apache.activemq.artemis.core.journal.RecordInfo;
-import org.apache.activemq.artemis.core.io.SequentialFile;
-import org.apache.activemq.artemis.core.io.SequentialFileFactory;
 import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
-import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
 import org.apache.activemq.artemis.core.journal.impl.JournalFile;
 import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
-import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
 import org.apache.activemq.artemis.core.message.impl.MessageInternal;
 import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
@@ -111,11 +111,11 @@ import org.apache.activemq.artemis.core.transaction.TransactionOperation;
 import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
 import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
 import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
+import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
 import org.apache.activemq.artemis.utils.Base64;
 import org.apache.activemq.artemis.utils.ByteUtil;
 import org.apache.activemq.artemis.utils.DataConstants;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
-import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
 import org.apache.activemq.artemis.utils.IDGenerator;
 import org.apache.activemq.artemis.utils.UUID;
 import org.apache.activemq.artemis.utils.XidCodecSupport;
@@ -204,6 +204,8 @@ public class JournalStorageManager implements StorageManager {
 
    private boolean journalLoaded = false;
 
+   private final IOCriticalErrorListener ioCriticalErrorListener;
+
    private final Configuration config;
 
    // Persisted core configuration
@@ -222,6 +224,8 @@ public class JournalStorageManager implements StorageManager {
                                 final IOCriticalErrorListener criticalErrorListener) {
       this.executorFactory = executorFactory;
 
+      this.ioCriticalErrorListener = criticalErrorListener;
+
       this.config = config;
 
       executor = executorFactory.getExecutor();
@@ -276,6 +280,11 @@ public class JournalStorageManager implements StorageManager {
    }
 
    @Override
+   public void criticalError(Throwable error) {
+      ioCriticalErrorListener.onIOException(error, error.getMessage(), null);
+   }
+
+   @Override
    public void clearContext() {
       OperationContextImpl.clearContext();
    }
@@ -3031,7 +3040,7 @@ public class JournalStorageManager implements StorageManager {
                bridgeRepresentation + "]";
          }
          else {
-            return "DuplicateIDEncoding [address=" + address + ", duplID=" + ByteUtil.bytesToHex(duplID, 2) + "]";
+            return "DuplicateIDEncoding [address=" + address + ",str=" + ByteUtil.toSimpleString(duplID) + ", duplID=" + ByteUtil.bytesToHex(duplID, 2) + "]";
          }
       }
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
index 381222f..39c5de5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
@@ -28,9 +28,10 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.journal.Journal;
 import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
-import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.message.impl.MessageInternal;
 import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
@@ -62,6 +63,26 @@ public class NullStorageManager implements StorageManager {
 
    private volatile boolean started;
 
+   private final IOCriticalErrorListener ioCriticalErrorListener;
+
+   public NullStorageManager(IOCriticalErrorListener ioCriticalErrorListener) {
+      this.ioCriticalErrorListener = ioCriticalErrorListener;
+   }
+
+   public NullStorageManager() {
+      this(new IOCriticalErrorListener() {
+         @Override
+         public void onIOException(Throwable code, String message, SequentialFile file) {
+            code.printStackTrace();
+         }
+      });
+   }
+
+   @Override // hands the error to the listener given at construction; previously the error was silently dropped
+   public void criticalError(Throwable error) {
+      ioCriticalErrorListener.onIOException(error, error.getMessage(), null);
+   }
+
    private static final OperationContext dummyContext = new OperationContext() {
 
       @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java
index f316c56..26920f8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java
@@ -25,6 +25,8 @@ public interface DuplicateIDCache {
 
    boolean contains(byte[] duplicateID);
 
+   boolean atomicVerify(final byte[] duplID, final Transaction tx) throws Exception;
+
    void addToCache(byte[] duplicateID) throws Exception;
 
    void addToCache(byte[] duplicateID, Transaction tx) throws Exception;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
index e6bb837..d07ea5b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
@@ -44,7 +44,7 @@ public interface PostOffice extends ActiveMQComponent {
 
    void addBinding(Binding binding) throws Exception;
 
-   Binding removeBinding(SimpleString uniqueName, Transaction tx) throws Exception;
+   Binding removeBinding(SimpleString uniqueName, Transaction tx, boolean deleteData) throws Exception;
 
    /**
     * It will lookup the Binding without creating an item on the Queue if non-existent

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java
index 7059671..cfeeb7b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
@@ -29,6 +30,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
+import org.apache.activemq.artemis.utils.ByteUtil;
 
 /**
  * A DuplicateIDCacheImpl
@@ -37,6 +39,8 @@ import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract
  */
 public class DuplicateIDCacheImpl implements DuplicateIDCache {
 
+   private final boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled();
+
    // ByteHolder, position
    private final Map<ByteArrayHolder, Integer> cache = new ConcurrentHashMap<>();
 
@@ -71,12 +75,27 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
 
    @Override
    public void load(final List<Pair<byte[], Long>> theIds) throws Exception {
-      int count = 0;
-
       long txID = -1;
 
+      // If we have more IDs than cache size, we shrink the first ones
+      int deleteCount = theIds.size() - cacheSize;
+      if (deleteCount < 0) {
+         deleteCount = 0;
+      }
+
       for (Pair<byte[], Long> id : theIds) {
-         if (count < cacheSize) {
+         if (deleteCount > 0) {
+            if (txID == -1) {
+               txID = storageManager.generateID();
+            }
+            if (isTrace) {
+               ActiveMQServerLogger.LOGGER.trace("DuplicateIDCacheImpl::load deleting id=" + describeID(id.getA(), id.getB()));
+            }
+
+            storageManager.deleteDuplicateIDTransactional(txID, id.getB());
+            deleteCount--;
+         }
+         else {
             ByteArrayHolder bah = new ByteArrayHolder(id.getA());
 
             Pair<ByteArrayHolder, Long> pair = new Pair<>(bah, id.getB());
@@ -84,17 +103,11 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
             cache.put(bah, ids.size());
 
             ids.add(pair);
-         }
-         else {
-            // cache size has been reduced in config - delete the extra records
-            if (txID == -1) {
-               txID = storageManager.generateID();
+            if (isTrace) {
+               ActiveMQServerLogger.LOGGER.trace("DuplicateIDCacheImpl::load loading id=" + describeID(id.getA(), id.getB()));
             }
-
-            storageManager.deleteDuplicateIDTransactional(txID, id.getB());
          }
 
-         count++;
       }
 
       if (txID != -1) {
@@ -111,6 +124,10 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
 
    @Override
    public void deleteFromCache(byte[] duplicateID) throws Exception {
+      if (isTrace) {
+         ActiveMQServerLogger.LOGGER.trace("DuplicateIDCacheImpl::deleteFromCache deleting id=" + describeID(duplicateID, 0));
+      }
+
       ByteArrayHolder bah = new ByteArrayHolder(duplicateID);
 
       Integer posUsed = cache.remove(bah);
@@ -124,6 +141,9 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
             if (id.getA().equals(bah)) {
                id.setA(null);
                storageManager.deleteDuplicateID(id.getB());
+               if (isTrace) {
+                  ActiveMQServerLogger.LOGGER.trace("DuplicateIDCacheImpl(" + this.address + ")::deleteFromCache deleting id=" + describeID(duplicateID, id.getB()));
+               }
                id.setB(null);
             }
          }
@@ -131,9 +151,23 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
 
    }
 
+   // Trace helper: bytesToHex + SimpleString rendering of a duplicate ID; the record id is appended only when non-zero (call sites pass 0 when they have none).
+   private String describeID(byte[] duplicateID, long id) {
+      String description = ByteUtil.bytesToHex(duplicateID, 4) + ", simpleString=" + ByteUtil.toSimpleString(duplicateID);
+      if (id != 0) {
+         description += ", id=" + id;
+      }
+      return description;
+   }
+
    @Override
    public boolean contains(final byte[] duplID) {
-      return cache.get(new ByteArrayHolder(duplID)) != null;
+      boolean contains = cache.get(new ByteArrayHolder(duplID)) != null;
+      // Build the trace message only when tracing is on, as the other trace sites in this class do.
+      if (contains && isTrace) {
+         ActiveMQServerLogger.LOGGER.trace("DuplicateIDCacheImpl(" + this.address + ")::contains found a duplicate " + describeID(duplID, 0));
+      }
+      return contains;
    }
 
    @Override
@@ -147,6 +181,21 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
    }
 
    @Override
+   public synchronized boolean atomicVerify(final byte[] duplID, final Transaction tx) throws Exception {
+      // Check-then-add inside this synchronized method: a known ID marks tx (if any) rollback-only and yields false; otherwise the ID goes through addToCache(duplID, tx, true) and the result is true.
+      if (contains(duplID)) {
+         if (tx != null) {
+            tx.markAsRollbackOnly(new ActiveMQDuplicateIdException());
+         }
+         return false;
+      }
+      else {
+         addToCache(duplID, tx, true);
+         return true;
+      }
+
+   }
+
    public synchronized void addToCache(final byte[] duplID, final Transaction tx, boolean instantAdd) throws Exception {
       long recordID = -1;
 
@@ -170,6 +219,9 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
             addToCacheInMemory(duplID, recordID);
          }
          else {
+            if (isTrace) {
+               ActiveMQServerLogger.LOGGER.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCache Adding duplicateID TX operation for " + describeID(duplID, recordID));
+            }
             // For a tx, it's important that the entry is not added to the cache until commit
             // since if the client fails then resends them tx we don't want it to get rejected
             tx.addOperation(new AddDuplicateIDOperation(duplID, recordID));
@@ -183,6 +235,10 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
    }
 
    private synchronized void addToCacheInMemory(final byte[] duplID, final long recordID) {
+      if (isTrace) {
+         ActiveMQServerLogger.LOGGER.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory Adding " + describeID(duplID, recordID));
+      }
+
       ByteArrayHolder holder = new ByteArrayHolder(duplID);
 
       cache.put(holder, pos);
@@ -195,6 +251,10 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
 
          // The id here might be null if it was explicit deleted
          if (id.getA() != null) {
+            if (isTrace) {
+               ActiveMQServerLogger.LOGGER.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory removing excess duplicateDetection " + describeID(id.getA().bytes, id.getB()));
+            }
+
             cache.remove(id.getA());
 
             // Record already exists - we delete the old one and add the new one
@@ -217,11 +277,19 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
          // -1 would mean null on this case
          id.setB(recordID >= 0 ? recordID : null);
 
+         if (isTrace) {
+            ActiveMQServerLogger.LOGGER.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory replacing old duplicateID by " + describeID(id.getA().bytes, id.getB()));
+         }
+
          holder.pos = pos;
       }
       else {
          id = new Pair<>(holder, recordID >= 0 ? recordID : null);
 
+         if (isTrace) {
+            ActiveMQServerLogger.LOGGER.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory Adding new duplicateID " + describeID(id.getA().bytes, id.getB()));
+         }
+
          ids.add(id);
 
          holder.pos = pos;
@@ -234,6 +302,7 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
 
    @Override
    public void clear() throws Exception {
+      ActiveMQServerLogger.LOGGER.debug("DuplicateIDCacheImpl(" + this.address + ")::clear removing duplicate ID data");
       synchronized (this) {
          if (ids.size() > 0) {
             long tx = storageManager.generateID();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 4b25023..659c612 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -41,8 +41,8 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
 import org.apache.activemq.artemis.api.core.management.ManagementHelper;
 import org.apache.activemq.artemis.api.core.management.NotificationType;
-import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.filter.Filter;
+import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.message.impl.MessageImpl;
 import org.apache.activemq.artemis.core.paging.PagingManager;
 import org.apache.activemq.artemis.core.paging.PagingStore;
@@ -463,7 +463,9 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
    }
 
    @Override
-   public synchronized Binding removeBinding(final SimpleString uniqueName, Transaction tx) throws Exception {
+   public synchronized Binding removeBinding(final SimpleString uniqueName,
+                                             Transaction tx,
+                                             boolean deleteData) throws Exception {
 
       addressSettingsRepository.clearCache();
 
@@ -473,7 +475,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
          throw new ActiveMQNonExistentQueueException();
       }
 
-      if (addressManager.getBindingsForRoutingAddress(binding.getAddress()) == null) {
+      if (deleteData && addressManager.getBindingsForRoutingAddress(binding.getAddress()) == null) {
          pagingManager.deletePageStore(binding.getAddress());
 
          managementService.unregisterAddress(binding.getAddress());
@@ -1159,31 +1161,19 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
 
          DuplicateIDCache cacheBridge = getDuplicateIDCache(BRIDGE_CACHE_STR.concat(message.getAddress()));
 
-         if (cacheBridge.contains(bridgeDupBytes)) {
-            ActiveMQServerLogger.LOGGER.duplicateMessageDetectedThruBridge(message);
-
-            if (context.getTransaction() != null) {
-               context.getTransaction().markAsRollbackOnly(new ActiveMQDuplicateIdException());
-            }
+         if (context.getTransaction() == null) {
+            context.setTransaction(new TransactionImpl(storageManager));
+            startedTX.set(true);
+         }
 
+         if (!cacheBridge.atomicVerify(bridgeDupBytes, context.getTransaction())) {
+            context.getTransaction().rollback();
+            startedTX.set(false);
             message.decrementRefCount();
-
             return false;
          }
-         else {
-            if (context.getTransaction() == null) {
-               context.setTransaction(new TransactionImpl(storageManager));
-               startedTX.set(true);
-            }
-         }
-
-         // on the bridge case there is a case where the bridge reconnects
-         // and the send hasn't finished yet (think of CPU outages).
-         // for that reason we add the cache right away
-         cacheBridge.addToCache(bridgeDupBytes, context.getTransaction(), true);
 
          message.removeProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID);
-
       }
       else {
          // if used BridgeDuplicate, it's not going to use the regular duplicate
@@ -1222,7 +1212,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
                startedTX.set(true);
             }
 
-            cache.addToCache(duplicateIDBytes, context.getTransaction());
+            cache.addToCache(duplicateIDBytes, context.getTransaction(), false);
          }
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index 78fd83f..3285bc1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -16,55 +16,21 @@
  */
 package org.apache.activemq.artemis.core.protocol.core;
 
-import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_QUEUE;
-import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_SHARED_QUEUE;
-import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.DELETE_QUEUE;
-import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_ACKNOWLEDGE;
-import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY;
-import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_CLOSE;
-import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_COMMIT;
-import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_CONSUMER_CLOSE;
-import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_CREATECONSUMER;
-import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_EXPIRED;
-import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_FLOWTOKEN;
-import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_FORCE_CONSUMER_DELIVERY;
-import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_INDIVIDUAL_ACKNOWLEDGE;
-import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUERY;
-import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_ROLLBACK;
-import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND;
-import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND_CONTINUATION;
-import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND_LARGE;
-import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_START;
-import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_STOP;
-import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_COMMIT;
-import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_END;
-import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_FORGET;
-import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_GET_TIMEOUT;
-import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_INDOUBT_XIDS;
-import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_JOIN;
-import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_PREPARE;
-import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_RESUME;
-import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_ROLLBACK;
-import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_SET_TIMEOUT;
-import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_START;
-import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_FAILED;
-import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_SUSPEND;
-
-import java.util.List;
-
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
+import java.util.List;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
 import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException;
-import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.exception.ActiveMQXAException;
+import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage;
-import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.RollbackMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
@@ -105,14 +71,48 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAS
 import org.apache.activemq.artemis.core.remoting.CloseListener;
 import org.apache.activemq.artemis.core.remoting.FailureListener;
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
+import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.BindingQueryResult;
-import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_QUEUE;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_SHARED_QUEUE;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.DELETE_QUEUE;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_ACKNOWLEDGE;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_CLOSE;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_COMMIT;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_CONSUMER_CLOSE;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_CREATECONSUMER;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_EXPIRED;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_FLOWTOKEN;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_FORCE_CONSUMER_DELIVERY;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_INDIVIDUAL_ACKNOWLEDGE;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUERY;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_ROLLBACK;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND_CONTINUATION;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND_LARGE;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_START;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_STOP;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_COMMIT;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_END;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_FAILED;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_FORGET;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_GET_TIMEOUT;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_INDOUBT_XIDS;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_JOIN;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_PREPARE;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_RESUME;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_ROLLBACK;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_SET_TIMEOUT;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_START;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_SUSPEND;
+
 public class ServerSessionPacketHandler implements ChannelHandler {
 
    private final boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled();
@@ -483,6 +483,16 @@ public class ServerSessionPacketHandler implements ChannelHandler {
                }
             }
          }
+         catch (ActiveMQIOErrorException e) {
+            getSession().markTXFailed(e);
+            if (requiresResponse) {
+               ActiveMQServerLogger.LOGGER.debug("Sending exception to client", e);
+               response = new ActiveMQExceptionMessage(e);
+            }
+            else {
+               ActiveMQServerLogger.LOGGER.caughtException(e);
+            }
+         }
          catch (ActiveMQXAException e) {
             if (requiresResponse) {
                ActiveMQServerLogger.LOGGER.debug("Sending exception to client", e);
@@ -507,6 +517,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
             }
          }
          catch (Throwable t) {
+            getSession().markTXFailed(t);
             if (requiresResponse) {
                ActiveMQServerLogger.LOGGER.warn("Sending unexpected exception to the client", t);
                ActiveMQException activeMQInternalErrorException = new ActiveMQInternalErrorException();
@@ -611,7 +622,6 @@ public class ServerSessionPacketHandler implements ChannelHandler {
 
       newConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
 
-
       Connection oldTransportConnection = remotingConnection.getTransportConnection();
 
       remotingConnection = newConnection;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
index 0ff55ac..95b30b2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.artemis.core.server;
 
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+
 /**
  * A reference to a message.
  *
@@ -25,7 +27,7 @@ public interface MessageReference {
 
    boolean isPaged();
 
-   ServerMessage getMessage();
+   ServerMessage getMessage() throws ActiveMQException;
 
    /**
     * We define this method aggregation here because on paging we need to hold the original estimate,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index f5a19a8..9ea60cd 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executor;
 
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
@@ -122,7 +123,7 @@ public interface Queue extends Bindable {
 
    MessageReference removeReferenceWithID(long id) throws Exception;
 
-   MessageReference getReference(long id);
+   MessageReference getReference(long id) throws ActiveMQException;
 
    int deleteAllReferences() throws Exception;
 
@@ -236,9 +237,9 @@ public interface Queue extends Bindable {
    /**
     * cancels scheduled messages and send them to the head of the queue.
     */
-   void deliverScheduledMessages();
+   void deliverScheduledMessages() throws ActiveMQException;
 
-   void postAcknowledge(MessageReference ref);
+   void postAcknowledge(MessageReference ref) throws ActiveMQException;
 
    float getRate();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ScheduledDeliveryHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ScheduledDeliveryHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ScheduledDeliveryHandler.java
index 5afb052..8260507 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ScheduledDeliveryHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ScheduledDeliveryHandler.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.core.server;
 
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.core.filter.Filter;
 
 import java.util.List;
@@ -28,7 +29,7 @@ public interface ScheduledDeliveryHandler {
 
    List<MessageReference> getScheduledReferences();
 
-   List<MessageReference> cancel(Filter filter);
+   List<MessageReference> cancel(Filter filter) throws ActiveMQException;
 
-   MessageReference removeReferenceWithID(long id);
+   MessageReference removeReferenceWithID(long id) throws ActiveMQException;
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
index 6026887..62bb3b5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
@@ -73,6 +73,8 @@ public interface ServerSession extends SecurityAuth {
 
    void xaSuspend() throws Exception;
 
+   void markTXFailed(Throwable e);
+
    QueueCreator getQueueCreator();
 
    List<Xid> xaGetInDoubtXids();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
index 9624ab9..64f99eb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
@@ -1244,7 +1244,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
             throw new IllegalStateException("Cannot find binding for queue " + clusterName);
          }
 
-         postOffice.removeBinding(binding.getUniqueName(), null);
+         postOffice.removeBinding(binding.getUniqueName(), null, false);
       }
 
       private synchronized void resetBinding(final SimpleString clusterName) throws Exception {