You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2016/10/24 13:35:09 UTC

[03/17] activemq-artemis git commit: ARTEMIS-753 Queue Pause and Resumed persisted

ARTEMIS-753 Queue Pause and Resumed persisted


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/a074f9f1
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/a074f9f1
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/a074f9f1

Branch: refs/heads/ARTEMIS-780
Commit: a074f9f1a5f0541a83dd2500427022eb6e63c8f9
Parents: 0f9efa9
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Oct 21 14:59:52 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Oct 21 20:26:23 2016 -0400

----------------------------------------------------------------------
 .../api/core/management/QueueControl.java       |  8 ++-
 .../api/jms/management/JMSQueueControl.java     |  6 ++
 .../management/impl/JMSQueueControlImpl.java    |  5 ++
 .../core/management/impl/QueueControlImpl.java  | 12 ++++
 .../core/persistence/QueueBindingInfo.java      |  7 ++
 .../artemis/core/persistence/QueueStatus.java   | 46 ++++++++++++
 .../core/persistence/StorageManager.java        | 11 +++
 .../journal/AbstractJournalStorageManager.java  | 66 ++++++++++++++++-
 .../impl/journal/DescribeJournal.java           |  4 ++
 .../impl/journal/JournalRecordIds.java          |  2 +
 .../codec/PersistentQueueBindingEncoding.java   | 18 +++++
 .../impl/journal/codec/QueueEncoding.java       |  3 +-
 .../impl/journal/codec/QueueStatusEncoding.java | 75 ++++++++++++++++++++
 .../impl/nullpm/NullStorageManager.java         | 11 +++
 .../activemq/artemis/core/server/Queue.java     | 15 ++++
 .../server/impl/PostOfficeJournalLoader.java    | 13 +++-
 .../artemis/core/server/impl/QueueImpl.java     | 41 +++++++++++
 .../impl/ScheduledDeliveryHandlerTest.java      | 13 ++++
 .../transaction/impl/TransactionImplTest.java   | 11 +++
 .../management/JMSQueueControlUsingJMSTest.java |  5 ++
 .../management/QueueControlUsingCoreTest.java   |  5 ++
 .../server/QueuePeristPauseTest.java            | 60 ++++++++++++++++
 .../unit/core/postoffice/impl/FakeQueue.java    | 15 ++++
 23 files changed, 448 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a074f9f1/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
index 0a74d1c..3336aae 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
@@ -411,9 +411,15 @@ public interface QueueControl {
    void pause() throws Exception;
 
    /**
+    * Pauses the queue. Messages are no longer delivered to its consumers.
+    */
+   @Operation(desc = "Pauses the Queue", impact = MBeanOperationInfo.ACTION)
+   void pause(@Parameter(name = "persist", desc = "if true, the pause state will be persisted.") boolean persist) throws Exception;
+
+   /**
     * Resumes the queue. Messages are again delivered to its consumers.
     */
-   @Operation(desc = "Resumes delivery of queued messages and gets the queue out of paused state.", impact = MBeanOperationInfo.ACTION)
+   @Operation(desc = "Resumes delivery of queued messages and gets the queue out of paused state. It will also affect the state of a persisted pause.", impact = MBeanOperationInfo.ACTION)
    void resume() throws Exception;
 
    @Operation(desc = "List all the existent consumers on the Queue")

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a074f9f1/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java
index 56a127c..3a4101a 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java
@@ -370,6 +370,12 @@ public interface JMSQueueControl extends DestinationControl {
    void pause() throws Exception;
 
    /**
+    * Pauses the queue. Messages are no longer delivered to its consumers.
+    */
+   @Operation(desc = "Pauses the Queue", impact = MBeanOperationInfo.ACTION)
+   void pause(@Parameter(name = "persist", desc = "if true, the pause state will be persisted.") boolean persist) throws Exception;
+
+   /**
     * Returns whether the queue is paused.
     */
    @Attribute(desc = "Returns true if the queue is paused.")

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a074f9f1/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java
index a836146..36cba96 100644
--- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java
+++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java
@@ -474,6 +474,11 @@ public class JMSQueueControlImpl extends StandardMBean implements JMSQueueContro
    }
 
    @Override
+   public void pause(boolean persist) throws Exception {
+      coreQueueControl.pause(persist);
+   }
+
+   @Override
    public void resume() throws Exception {
       coreQueueControl.resume();
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a074f9f1/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index 7275ea4..cfa8aa5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -848,6 +848,18 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
       }
    }
 
+   @Override
+   public void pause(boolean persist) {
+      checkStarted(); // same guard resume() applies first
+
+      clearIO();
+      try {
+         queue.pause(persist);
+      } finally {
+         blockOnIO(); // runs even when queue.pause(persist) throws
+      }
+   }
+
    @Override
    public void resume() {
       checkStarted();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a074f9f1/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
index c05b86c..8c80a8a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
@@ -16,7 +16,10 @@
  */
 package org.apache.activemq.artemis.core.persistence;
 
+import java.util.List;
+
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.QueueStatusEncoding;
 
 public interface QueueBindingInfo {
 
@@ -39,4 +42,8 @@ public interface QueueBindingInfo {
 
    SimpleString getUser();
 
+   void addQueueStatusEncoding(QueueStatusEncoding status);
+
+   List<QueueStatusEncoding> getQueueStatusEncodings();
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a074f9f1/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueStatus.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueStatus.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueStatus.java
new file mode 100644
index 0000000..18fa9b9
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueStatus.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.persistence;
+
+/**
+ * Status of a queue as stored in a queue-status record. Each constant carries the short
+ * {@link #id} that QueueStatusEncoding writes on encode and passes to {@link #fromID(short)} on decode.
+ */
+public enum QueueStatus {
+   PAUSED((short) 0), RUNNING((short) 1);
+
+   /** Numeric identifier of this status; also its index in {@link #values}. */
+   public final short id;
+
+   QueueStatus(short id) {
+      this.id = id;
+   }
+
+   /** Lookup table indexed by {@link #id}, filled once by the static initializer below. */
+   public static QueueStatus[] values;
+
+   static {
+      QueueStatus[] allValues = QueueStatus.values();
+      values = new QueueStatus[allValues.length];
+      for (QueueStatus v : allValues) {
+         values[v.id] = v;
+      }
+   }
+
+   /**
+    * Resolves a numeric id back to its status.
+    *
+    * @param id the numeric id, e.g. as read from a persisted record
+    * @return the matching status, or {@code null} when {@code id} is outside the known range
+    */
+   public static QueueStatus fromID(short id) {
+      // values.length is itself out of bounds, so the upper check must be >= (not >)
+      if (id < 0 || id >= values.length) {
+         return null;
+      } else {
+         return values[id];
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a074f9f1/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
index e820664..bbfec14 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
@@ -287,6 +287,17 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
 
    void deleteQueueBinding(long tx, long queueBindingID) throws Exception;
 
+   /**
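+    * Stores a status record for a queue.
+    * @param queueID The id of the queue
+    * @param status The current status of the queue. (Reserved for future use; at the moment this record is only used for PAUSED)
+    * @return the id of the journal record that was stored (the id {@link #deleteQueueStatus(long)} takes)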
+    * @throws Exception
+    */
+   long storeQueueStatus(long queueID, QueueStatus status) throws Exception;
+
+   void deleteQueueStatus(long recordID) throws Exception;
+
    JournalLoadInformation loadBindingJournal(List<QueueBindingInfo> queueBindingInfos,
                                              List<GroupingInfo> groupingInfos) throws Exception;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a074f9f1/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index cd86191..a6938d6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -63,6 +63,7 @@ import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl;
 import org.apache.activemq.artemis.core.persistence.GroupingInfo;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
+import org.apache.activemq.artemis.core.persistence.QueueStatus;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
 import org.apache.activemq.artemis.core.persistence.config.PersistedRoles;
@@ -81,6 +82,7 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCount
 import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageUpdateTXEncoding;
 import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PendingLargeMessageEncoding;
 import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PersistentQueueBindingEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.QueueStatusEncoding;
 import org.apache.activemq.artemis.core.persistence.impl.journal.codec.RefEncoding;
 import org.apache.activemq.artemis.core.persistence.impl.journal.codec.ScheduledDeliveryEncoding;
 import org.apache.activemq.artemis.core.persistence.impl.journal.codec.XidEncoding;
@@ -105,6 +107,7 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
 import org.apache.activemq.artemis.utils.Base64;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
 import org.apache.activemq.artemis.utils.IDGenerator;
+import org.jboss.logging.Logger;
 
 import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ACKNOWLEDGE_CURSOR;
 import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE_PENDING;
@@ -122,6 +125,8 @@ import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalR
  */
 public abstract class AbstractJournalStorageManager implements StorageManager {
 
+   private static final Logger logger = Logger.getLogger(AbstractJournalStorageManager.class);
+
    public enum JournalContent {
       BINDINGS((byte) 0), MESSAGES((byte) 1);
 
@@ -1237,6 +1242,32 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
    }
 
    @Override
+   public long storeQueueStatus(long queueID, QueueStatus status) throws Exception {
+      long recordID = idGenerator.generateID();
+
+      readLock();
+      try {
+         bindingsJournal.appendAddRecord(recordID, JournalRecordIds.QUEUE_STATUS_RECORD, new QueueStatusEncoding(queueID, status), true);
+      } finally {
+         readUnLock();
+      }
+
+
+      return recordID;
+   }
+
+   @Override
+   public void deleteQueueStatus(long recordID) throws Exception {
+      readLock();
+      try {
+         bindingsJournal.appendDeleteRecord(recordID, true);
+      } finally {
+         readUnLock();
+      }
+
+   }
+
+   @Override
    public long storePageCounterInc(long txID, long queueID, int value) throws Exception {
       readLock();
       try {
@@ -1326,6 +1357,8 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
 
       JournalLoadInformation bindingsInfo = bindingsJournal.load(records, preparedTransactions, null);
 
+      HashMap<Long, PersistentQueueBindingEncoding> mapBindings = new HashMap<>();
+
       for (RecordInfo record : records) {
          long id = record.id;
 
@@ -1337,6 +1370,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
             PersistentQueueBindingEncoding bindingEncoding = newBindingEncoding(id, buffer);
 
             queueBindingInfos.add(bindingEncoding);
+            mapBindings.put(bindingEncoding.getId(), bindingEncoding);
          } else if (rec == JournalRecordIds.ID_COUNTER_RECORD) {
             idGenerator.loadState(record.id, buffer);
          } else if (rec == JournalRecordIds.GROUP_RECORD) {
@@ -1348,11 +1382,24 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
          } else if (rec == JournalRecordIds.SECURITY_RECORD) {
             PersistedRoles roles = newSecurityRecord(id, buffer);
             mapPersistedRoles.put(roles.getAddressMatch(), roles);
+         } else if (rec == JournalRecordIds.QUEUE_STATUS_RECORD) {
+            QueueStatusEncoding statusEncoding = newQueueStatusEncoding(id, buffer);
+            PersistentQueueBindingEncoding queueBindingEncoding = mapBindings.get(statusEncoding.queueID);
+            if (queueBindingEncoding != null) {
+               queueBindingEncoding.addQueueStatusEncoding(statusEncoding);
+            } else {
+               // unlikely to happen, so I didn't bother about the Logger method
+               logger.info("There is no queue with ID " + statusEncoding.queueID + ", deleting record " + statusEncoding.getId());
+               this.deleteQueueStatus(statusEncoding.getId());
+            }
          } else {
-            throw new IllegalStateException("Invalid record type " + rec);
+            // unlikely to happen
+            logger.warn("Invalid record type " + rec, new Exception("invalid record type " + rec));
          }
       }
 
+      mapBindings.clear(); // just to give a hand to GC
+
       // This will instruct the IDGenerator to beforeStop old records
       idGenerator.cleanup();
 
@@ -1821,6 +1868,23 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
       return bindingEncoding;
    }
 
+   /**
+    * @param id
+    * @param buffer
+    * @return
+    */
+   protected static QueueStatusEncoding newQueueStatusEncoding(long id, ActiveMQBuffer buffer) {
+      QueueStatusEncoding statusEncoding = new QueueStatusEncoding();
+
+      statusEncoding.decode(buffer);
+      statusEncoding.setId(id);
+
+      return statusEncoding;
+   }
+
+
+
+
    @Override
    public boolean addToPage(PagingStore store,
                             ServerMessage msg,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a074f9f1/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java
index 34a47bf..58723c6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java
@@ -74,6 +74,7 @@ import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalR
 import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_PENDING_COUNTER;
 import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_TRANSACTION;
 import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.QUEUE_BINDING_RECORD;
+import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.QUEUE_STATUS_RECORD;
 import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.SECURITY_RECORD;
 import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME;
 import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.UPDATE_DELIVERY_COUNT;
@@ -550,6 +551,9 @@ public final class DescribeJournal {
             return encoding;
          }
 
+         case QUEUE_STATUS_RECORD:
+            return AbstractJournalStorageManager.newQueueStatusEncoding(id, buffer);
+
          case QUEUE_BINDING_RECORD:
             return AbstractJournalStorageManager.newBindingEncoding(id, buffer);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a074f9f1/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java
index 4aa470b..0169f38 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java
@@ -33,6 +33,8 @@ public final class JournalRecordIds {
 
    public static final byte QUEUE_BINDING_RECORD = 21;
 
+   public static final byte QUEUE_STATUS_RECORD = 22;
+
    /**
     * Records storing the current recordID number.
     *

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a074f9f1/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
index 4efe292..039460c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
@@ -16,6 +16,9 @@
  */
 package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
 
+import java.util.LinkedList;
+import java.util.List;
+
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.journal.EncodingSupport;
@@ -36,6 +39,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
 
    public SimpleString user;
 
+   public List<QueueStatusEncoding> queueStatusEncodings;
+
    public PersistentQueueBindingEncoding() {
    }
 
@@ -107,6 +112,19 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
    }
 
    @Override
+   public void addQueueStatusEncoding(QueueStatusEncoding status) {
+      if (queueStatusEncodings == null) {
+         queueStatusEncodings = new LinkedList<>();
+      }
+      queueStatusEncodings.add(status);
+   }
+
+   @Override
+   public List<QueueStatusEncoding> getQueueStatusEncodings() {
+      return queueStatusEncodings;
+   }
+
+   @Override
    public void decode(final ActiveMQBuffer buffer) {
       name = buffer.readSimpleString();
       address = buffer.readSimpleString();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a074f9f1/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/QueueEncoding.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/QueueEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/QueueEncoding.java
index 1e05195..9220509 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/QueueEncoding.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/QueueEncoding.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.utils.DataConstants;
 
 public class QueueEncoding implements EncodingSupport {
 
@@ -44,7 +45,7 @@ public class QueueEncoding implements EncodingSupport {
 
    @Override
    public int getEncodeSize() {
-      return 8;
+      return DataConstants.SIZE_LONG;
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a074f9f1/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/QueueStatusEncoding.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/QueueStatusEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/QueueStatusEncoding.java
new file mode 100644
index 0000000..fe2b1f5
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/QueueStatusEncoding.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.core.persistence.QueueStatus;
+import org.apache.activemq.artemis.utils.DataConstants;
+
+public class QueueStatusEncoding extends QueueEncoding { // QueueEncoding plus one short carrying a QueueStatus id
+
+   private QueueStatus status; // set by the two-arg constructor or decode(); decode() stores whatever QueueStatus.fromID returns, which can be null
+
+   private long id; // not part of the encoded payload (see encode()/getEncodeSize()); assigned through setId()
+
+   public QueueStatusEncoding(long queueID, QueueStatus status) {
+      super(queueID);
+      this.status = status;
+   }
+
+   public QueueStatusEncoding() { // leaves status unset until decode() is called
+      super();
+   }
+
+   @Override
+   public void decode(final ActiveMQBuffer buffer) {
+      super.decode(buffer); // parent part first, then the status short: same order encode() writes
+      short shortStatus = buffer.readShort();
+      this.status = QueueStatus.fromID(shortStatus);
+   }
+
+   @Override
+   public void encode(final ActiveMQBuffer buffer) {
+      super.encode(buffer);
+      buffer.writeShort(status.id); // NOTE(review): status is dereferenced without a null check
+   }
+
+   public QueueStatus getStatus() {
+      return status;
+   }
+
+   public long getId() {
+      return id;
+   }
+
+   public QueueStatusEncoding setId(long id) {
+      this.id = id;
+      return this; // returned so calls can be chained
+   }
+
+   @Override
+   public int getEncodeSize() {
+      return super.getEncodeSize() + DataConstants.SIZE_SHORT; // parent's size plus the short written in encode()
+   }
+
+   @Override
+   public String toString() {
+      return "QueueStatusEncoding [id=" + id + ", queueID=" + queueID + ", status=" + status + "]";
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a074f9f1/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
index f13d2fa..3a2999e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
@@ -41,6 +41,7 @@ import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
 import org.apache.activemq.artemis.core.persistence.GroupingInfo;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
+import org.apache.activemq.artemis.core.persistence.QueueStatus;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
 import org.apache.activemq.artemis.core.persistence.config.PersistedRoles;
@@ -85,6 +86,16 @@ public class NullStorageManager implements StorageManager {
    }
 
    @Override
+   public long storeQueueStatus(long queueID, QueueStatus status) throws Exception {
+      return 0;
+   }
+
+   @Override
+   public void deleteQueueStatus(long recordID) throws Exception {
+
+   }
+
+   @Override
    public void injectMonitor(FileStoreMonitor monitor) throws Exception {
 
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a074f9f1/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index a9a87c3..0dcef3d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -207,6 +207,15 @@ public interface Queue extends Bindable {
    void pause();
 
    /**
+    * Pauses the queue. It will receive messages but won't give them to the consumers until resumed.
+    * If a queue is paused, pausing it again will only throw a warning.
+    * To check if a queue is paused, invoke <i>isPaused()</i>
+    */
+   void pause(boolean persist);
+
+   void reloadPause(long recordID);
+
+   /**
     * Resumes the delivery of message for the queue.
     * If a queue is resumed, resuming it again will only throw a warning.
     * To check if a queue is resumed, invoke <i>isPaused()</i>
@@ -218,6 +227,12 @@ public interface Queue extends Bindable {
     */
    boolean isPaused();
 
+   /**
+    * Tells whether the pause was persisted.
+    * @return true if the pause was persisted
+    */
+   boolean isPersistedPause();
+
    Executor getExecutor();
 
    void resetAllIterators();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a074f9f1/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
index ccb00cb..9a8ae74 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
@@ -39,9 +39,11 @@ import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
 import org.apache.activemq.artemis.core.paging.impl.Page;
 import org.apache.activemq.artemis.core.persistence.GroupingInfo;
 import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
+import org.apache.activemq.artemis.core.persistence.QueueStatus;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.persistence.impl.PageCountPending;
 import org.apache.activemq.artemis.core.persistence.impl.journal.AddMessageRecord;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.QueueStatusEncoding;
 import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
 import org.apache.activemq.artemis.core.postoffice.PostOffice;
@@ -146,6 +148,13 @@ public class PostOfficeJournalLoader implements JournalLoader {
             queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(((PostOfficeImpl) postOffice).getServer().getJMSQueueDeleter(), queueBindingInfo.getQueueName()));
          }
 
+         if (queueBindingInfo.getQueueStatusEncodings() != null) {
+            for (QueueStatusEncoding encoding : queueBindingInfo.getQueueStatusEncodings()) {
+               if (encoding.getStatus() == QueueStatus.PAUSED)
+               queue.reloadPause(encoding.getId());
+            }
+         }
+
          final Binding binding = new LocalQueueBinding(queue.getAddress(), queue, nodeManager.getNodeId());
          queues.put(queue.getID(), queue);
          postOffice.addBinding(binding);
@@ -245,7 +254,9 @@ public class PostOfficeJournalLoader implements JournalLoader {
                         ResourceManager resourceManager,
                         Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception {
       for (Queue queue : queues.values()) {
-         queue.resume();
+         if (!queue.isPersistedPause()) {
+            queue.resume();
+         }
       }
 
       if (System.getProperty("org.apache.activemq.opt.directblast") != null) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a074f9f1/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 7c8ad0a..d30544f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -51,6 +51,7 @@ import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.message.impl.MessageImpl;
 import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
 import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
+import org.apache.activemq.artemis.core.persistence.QueueStatus;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.Bindings;
@@ -176,6 +177,8 @@ public class QueueImpl implements Queue {
 
    private boolean paused;
 
+   private long pauseStatusRecord = -1;
+
    private static final int MAX_SCHEDULED_RUNNERS = 2;
 
    // We don't ever need more than two DeliverRunner on the executor's list
@@ -1718,8 +1721,32 @@ public class QueueImpl implements Queue {
 
    @Override
    public synchronized void pause() {
+      pause(false); // plain pause: no queue-status record is written
+   }
+
+   @Override
+   public synchronized void reloadPause(long recordID) {
+      this.paused = true; // restore the paused state announced by the loaded status record
+      if (pauseStatusRecord >= 0) { // an earlier record is superseded by recordID: drop it
+         try {
+            storageManager.deleteQueueStatus(pauseStatusRecord);
+         } catch (Exception e) {
+            ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); // best effort, logged like pause/resume
+         }
+      }
+      this.pauseStatusRecord = recordID;
+   }
+
+   @Override
+   public synchronized void pause(boolean persist)  {
       try {
          this.flushDeliveriesInTransit();
+         if (persist && isDurable()) {
+            if (pauseStatusRecord >= 0) {
+               storageManager.deleteQueueStatus(pauseStatusRecord);
+            }
+            pauseStatusRecord = storageManager.storeQueueStatus(this.id, QueueStatus.PAUSED);
+         }
       } catch (Exception e) {
          ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
       }
@@ -1730,6 +1757,15 @@ public class QueueImpl implements Queue {
    public synchronized void resume() {
       paused = false;
 
+      if (pauseStatusRecord >= 0) {
+         try {
+            storageManager.deleteQueueStatus(pauseStatusRecord);
+         } catch (Exception e) {
+            ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+         }
+         pauseStatusRecord = -1;
+      }
+
       deliverAsync();
    }
 
@@ -1739,6 +1775,11 @@ public class QueueImpl implements Queue {
    }
 
    @Override
+   public synchronized boolean isPersistedPause() {
+      return this.pauseStatusRecord >= 0; // -1 means no pause record is stored
+   }
+
+   @Override
    public boolean isDirectDeliver() {
       return directDeliver;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a074f9f1/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 91e5e7a..d82f7d3 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -845,6 +845,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
          this.expectedElements = new CountDownLatch(expectedElements);
       }
 
+      @Override
+      public boolean isPersistedPause() {
+         return false;
+      }
+
       public boolean waitCompletion(long timeout, TimeUnit timeUnit) throws Exception {
          return expectedElements.await(timeout, timeUnit);
       }
@@ -863,6 +868,14 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
+      public void pause(boolean persist) {
+      }
+
+      @Override
+      public void reloadPause(long recordID) {
+      }
+
+      @Override
       public Filter getFilter() {
          return null;
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a074f9f1/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
index 0c2fec5..93c5c9d 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
@@ -41,6 +41,7 @@ import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
 import org.apache.activemq.artemis.core.persistence.GroupingInfo;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
+import org.apache.activemq.artemis.core.persistence.QueueStatus;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
 import org.apache.activemq.artemis.core.persistence.config.PersistedRoles;
@@ -243,6 +244,16 @@ public class TransactionImplTest extends ActiveMQTestBase {
       }
 
       @Override
+      public long storeQueueStatus(long queueID, QueueStatus status) throws Exception {
+         return 0;
+      }
+
+      @Override
+      public void deleteQueueStatus(long recordID) throws Exception {
+
+      }
+
+      @Override
       public void pageWrite(PagedMessage message, int pageNumber) {
 
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a074f9f1/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
index f6c4bde..7a07439 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
@@ -351,6 +351,11 @@ public class JMSQueueControlUsingJMSTest extends JMSQueueControlTest {
          }
 
          @Override
+         public void pause(boolean persist) throws Exception {
+            proxy.invokeOperation("pause", persist);
+         }
+
+         @Override
          public void resume() throws Exception {
             proxy.invokeOperation("resume");
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a074f9f1/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
index e8e04f2..9b901fc 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
@@ -337,6 +337,11 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
          }
 
          @Override
+         public void pause(boolean persist) throws Exception {
+            proxy.invokeOperation("pause", persist);
+         }
+
+         @Override
          public void resume() throws Exception {
             proxy.invokeOperation("resume");
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a074f9f1/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/QueuePeristPauseTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/QueuePeristPauseTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/QueuePeristPauseTest.java
new file mode 100644
index 0000000..e383c85
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/QueuePeristPauseTest.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.server;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Verifies that a persisted queue pause survives server restarts and is cleared by resume(). */
+public class QueuePeristPauseTest extends ActiveMQTestBase {
+
+   @Test
+   public void testPauseQueue() throws Exception {
+      final SimpleString queueName = SimpleString.toSimpleString("q1");
+      ActiveMQServer server = createServer(true, false);
+      server.start();
+
+      Queue queue = server.createQueue(queueName, queueName, null, true, false);
+      queue.pause(true);
+
+      server.stop();
+      server.start();
+
+      // the persisted pause has to be reloaded on each of the following restarts
+      for (int i = 0; i < 4; i++) {
+         server.stop();
+         server.start();
+         queue = server.locateQueue(queueName);
+         Assert.assertTrue(queue.isPaused());
+      }
+
+      queue.resume();
+
+      // once resumed, no later restart may bring the pause back
+      for (int i = 0; i < 4; i++) {
+         server.stop();
+         server.start();
+         queue = server.locateQueue(queueName);
+         Assert.assertFalse(queue.isPaused());
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a074f9f1/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 8eae7d6..bba5dc1 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -57,6 +57,16 @@ public class FakeQueue implements Queue {
    }
 
    @Override
+   public void reloadPause(long recordID) {
+
+   }
+
+   @Override
+   public boolean isPersistedPause() {
+      return false;
+   }
+
+   @Override
    public int retryMessages(Filter filter) throws Exception {
       return 0;
    }
@@ -103,6 +113,11 @@ public class FakeQueue implements Queue {
    }
 
    @Override
+   public void pause(boolean persist) {
+
+   }
+
+   @Override
    public boolean flushExecutor() {
       return true;
    }