You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/10/22 01:00:15 UTC
[1/2] activemq-artemis git commit: ARTEMIS-753 Queue Pause and
Resumed persisted
Repository: activemq-artemis
Updated Branches:
refs/heads/master 0f9efa922 -> 8d4f507cb
ARTEMIS-753 Queue Pause and Resumed persisted
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/a074f9f1
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/a074f9f1
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/a074f9f1
Branch: refs/heads/master
Commit: a074f9f1a5f0541a83dd2500427022eb6e63c8f9
Parents: 0f9efa9
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Oct 21 14:59:52 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Oct 21 20:26:23 2016 -0400
----------------------------------------------------------------------
.../api/core/management/QueueControl.java | 8 ++-
.../api/jms/management/JMSQueueControl.java | 6 ++
.../management/impl/JMSQueueControlImpl.java | 5 ++
.../core/management/impl/QueueControlImpl.java | 12 ++++
.../core/persistence/QueueBindingInfo.java | 7 ++
.../artemis/core/persistence/QueueStatus.java | 46 ++++++++++++
.../core/persistence/StorageManager.java | 11 +++
.../journal/AbstractJournalStorageManager.java | 66 ++++++++++++++++-
.../impl/journal/DescribeJournal.java | 4 ++
.../impl/journal/JournalRecordIds.java | 2 +
.../codec/PersistentQueueBindingEncoding.java | 18 +++++
.../impl/journal/codec/QueueEncoding.java | 3 +-
.../impl/journal/codec/QueueStatusEncoding.java | 75 ++++++++++++++++++++
.../impl/nullpm/NullStorageManager.java | 11 +++
.../activemq/artemis/core/server/Queue.java | 15 ++++
.../server/impl/PostOfficeJournalLoader.java | 13 +++-
.../artemis/core/server/impl/QueueImpl.java | 41 +++++++++++
.../impl/ScheduledDeliveryHandlerTest.java | 13 ++++
.../transaction/impl/TransactionImplTest.java | 11 +++
.../management/JMSQueueControlUsingJMSTest.java | 5 ++
.../management/QueueControlUsingCoreTest.java | 5 ++
.../server/QueuePeristPauseTest.java | 60 ++++++++++++++++
.../unit/core/postoffice/impl/FakeQueue.java | 15 ++++
23 files changed, 448 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a074f9f1/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
index 0a74d1c..3336aae 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
@@ -411,9 +411,15 @@ public interface QueueControl {
void pause() throws Exception;
/**
+ * Pauses the queue. Messages are no longer delivered to its consumers.
+ */
+ @Operation(desc = "Pauses the Queue", impact = MBeanOperationInfo.ACTION)
+ void pause(@Parameter(name = "persist", desc = "if true, the pause state will be persisted.") boolean persist) throws Exception;
+
+ /**
* Resumes the queue. Messages are again delivered to its consumers.
*/
- @Operation(desc = "Resumes delivery of queued messages and gets the queue out of paused state.", impact = MBeanOperationInfo.ACTION)
+ @Operation(desc = "Resumes delivery of queued messages and gets the queue out of paused state. It will also affected the state of a persisted pause.", impact = MBeanOperationInfo.ACTION)
void resume() throws Exception;
@Operation(desc = "List all the existent consumers on the Queue")
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a074f9f1/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java
index 56a127c..3a4101a 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java
@@ -370,6 +370,12 @@ public interface JMSQueueControl extends DestinationControl {
void pause() throws Exception;
/**
+ * Pauses the queue. Messages are no longer delivered to its consumers.
+ */
+ @Operation(desc = "Pauses the Queue", impact = MBeanOperationInfo.ACTION)
+ void pause(@Parameter(name = "persist", desc = "if true, the pause state will be persisted.") boolean persist) throws Exception;
+
+ /**
* Returns whether the queue is paused.
*/
@Attribute(desc = "Returns true if the queue is paused.")
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a074f9f1/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java
index a836146..36cba96 100644
--- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java
+++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java
@@ -474,6 +474,11 @@ public class JMSQueueControlImpl extends StandardMBean implements JMSQueueContro
}
@Override
+ public void pause(boolean persist) throws Exception {
+ // Delegates to the core queue control, forwarding the persist flag unchanged.
+ coreQueueControl.pause(persist);
+ }
+
+ @Override
public void resume() throws Exception {
coreQueueControl.resume();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a074f9f1/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index 7275ea4..cfa8aa5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -848,6 +848,18 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
}
}
+
+ @Override
+ public void pause(boolean persist) {
+ // Pauses the underlying queue, passing the persist flag straight through to Queue.pause(boolean).
+ checkStarted();
+
+ // NOTE(review): clearIO()/blockOnIO() are defined outside this hunk; presumably they bracket
+ // the storage operation so the call returns only after pending IO completes -- confirm.
+ clearIO();
+ try {
+ queue.pause(persist);
+ } finally {
+ // runs even when pause(persist) throws
+ blockOnIO();
+ }
+ }
@Override
public void resume() {
checkStarted();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a074f9f1/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
index c05b86c..8c80a8a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
@@ -16,7 +16,10 @@
*/
package org.apache.activemq.artemis.core.persistence;
+import java.util.List;
+
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.QueueStatusEncoding;
public interface QueueBindingInfo {
@@ -39,4 +42,8 @@ public interface QueueBindingInfo {
SimpleString getUser();
+ void addQueueStatusEncoding(QueueStatusEncoding status);
+
+ List<QueueStatusEncoding> getQueueStatusEncodings();
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a074f9f1/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueStatus.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueStatus.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueStatus.java
new file mode 100644
index 0000000..18fa9b9
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueStatus.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.persistence;
+
+/**
+ * Status of a queue as it is written to storage. Each constant carries a short {@link #id},
+ * which is the value that gets encoded; {@link #fromID(short)} maps it back to the constant.
+ */
+public enum QueueStatus {
+   PAUSED((short) 0), RUNNING((short) 1);
+
+   /** Numeric identifier of this status; also its index in {@link #values}. */
+   public final short id;
+
+   QueueStatus(short id) {
+      this.id = id;
+   }
+
+   /**
+    * Lookup table indexed by {@link #id}. Kept public and non-final for compatibility with
+    * existing callers; it is not meant to be modified.
+    */
+   public static QueueStatus[] values;
+
+   static {
+      // Index by the declared id rather than by ordinal, so the lookup does not depend on
+      // declaration order.
+      QueueStatus[] allValues = QueueStatus.values();
+      values = new QueueStatus[allValues.length];
+      for (QueueStatus v : allValues) {
+         values[v.id] = v;
+      }
+   }
+
+   /**
+    * Resolves a status from its numeric id.
+    *
+    * @param id the numeric id, as exposed by {@link #id}
+    * @return the matching status, or {@code null} when the id is negative or not a known id
+    */
+   public static QueueStatus fromID(short id) {
+      // Valid indexes are 0 .. length - 1; the upper bound must be exclusive, otherwise
+      // id == values.length would throw ArrayIndexOutOfBoundsException instead of returning null.
+      if (id < 0 || id >= values.length) {
+         return null;
+      } else {
+         return values[id];
+      }
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a074f9f1/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
index e820664..bbfec14 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
@@ -287,6 +287,17 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
void deleteQueueBinding(long tx, long queueBindingID) throws Exception;
+ /**
+ *
+ * @param queueID The id of the queue
+ * @param status The current status of the queue. (Reserved for future use, ATM we only use this record for PAUSED)
+ * @return the id of the journal record that was stored
+ * @throws Exception
+ */
+ long storeQueueStatus(long queueID, QueueStatus status) throws Exception;
+
+ void deleteQueueStatus(long recordID) throws Exception;
+
JournalLoadInformation loadBindingJournal(List<QueueBindingInfo> queueBindingInfos,
List<GroupingInfo> groupingInfos) throws Exception;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a074f9f1/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index cd86191..a6938d6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -63,6 +63,7 @@ import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl;
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
+import org.apache.activemq.artemis.core.persistence.QueueStatus;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedRoles;
@@ -81,6 +82,7 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCount
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageUpdateTXEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PendingLargeMessageEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PersistentQueueBindingEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.QueueStatusEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.RefEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.ScheduledDeliveryEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.XidEncoding;
@@ -105,6 +107,7 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.Base64;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.IDGenerator;
+import org.jboss.logging.Logger;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ACKNOWLEDGE_CURSOR;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE_PENDING;
@@ -122,6 +125,8 @@ import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalR
*/
public abstract class AbstractJournalStorageManager implements StorageManager {
+ private static final Logger logger = Logger.getLogger(AbstractJournalStorageManager.class);
+
public enum JournalContent {
BINDINGS((byte) 0), MESSAGES((byte) 1);
@@ -1237,6 +1242,32 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
}
@Override
+ public long storeQueueStatus(long queueID, QueueStatus status) throws Exception {
+ // Appends a QUEUE_STATUS_RECORD for the given queue to the bindings journal and returns
+ // the id of that record, which is the value deleteQueueStatus(long) expects.
+ long recordID = idGenerator.generateID();
+
+ readLock();
+ try {
+ // last argument is the sync flag passed to the journal -- NOTE(review): presumably forces a synced write; confirm
+ bindingsJournal.appendAddRecord(recordID, JournalRecordIds.QUEUE_STATUS_RECORD, new QueueStatusEncoding(queueID, status), true);
+ } finally {
+ readUnLock();
+ }
+
+
+ return recordID;
+ }
+
+ @Override
+ public void deleteQueueStatus(long recordID) throws Exception {
+ // Appends a delete for the given record id to the bindings journal, under the read lock.
+ readLock();
+ try {
+ bindingsJournal.appendDeleteRecord(recordID, true);
+ } finally {
+ readUnLock();
+ }
+
+ }
+
+ @Override
public long storePageCounterInc(long txID, long queueID, int value) throws Exception {
readLock();
try {
@@ -1326,6 +1357,8 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
JournalLoadInformation bindingsInfo = bindingsJournal.load(records, preparedTransactions, null);
+ HashMap<Long, PersistentQueueBindingEncoding> mapBindings = new HashMap<>();
+
for (RecordInfo record : records) {
long id = record.id;
@@ -1337,6 +1370,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
PersistentQueueBindingEncoding bindingEncoding = newBindingEncoding(id, buffer);
queueBindingInfos.add(bindingEncoding);
+ mapBindings.put(bindingEncoding.getId(), bindingEncoding);
} else if (rec == JournalRecordIds.ID_COUNTER_RECORD) {
idGenerator.loadState(record.id, buffer);
} else if (rec == JournalRecordIds.GROUP_RECORD) {
@@ -1348,11 +1382,24 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
} else if (rec == JournalRecordIds.SECURITY_RECORD) {
PersistedRoles roles = newSecurityRecord(id, buffer);
mapPersistedRoles.put(roles.getAddressMatch(), roles);
+ } else if (rec == JournalRecordIds.QUEUE_STATUS_RECORD) {
+ QueueStatusEncoding statusEncoding = newQueueStatusEncoding(id, buffer);
+ PersistentQueueBindingEncoding queueBindingEncoding = mapBindings.get(statusEncoding.queueID);
+ if (queueBindingEncoding != null) {
+ queueBindingEncoding.addQueueStatusEncoding(statusEncoding);
+ } else {
+ // unlikely to happen, so I didn't bother about the Logger method
+ logger.info("There is no queue with ID " + statusEncoding.queueID + ", deleting record " + statusEncoding.getId());
+ this.deleteQueueStatus(statusEncoding.getId());
+ }
} else {
- throw new IllegalStateException("Invalid record type " + rec);
+ // unlikely to happen
+ logger.warn("Invalid record type " + rec, new Exception("invalid record type " + rec));
}
}
+ mapBindings.clear(); // just to give a hand to GC
+
// This will instruct the IDGenerator to beforeStop old records
idGenerator.cleanup();
@@ -1821,6 +1868,23 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
return bindingEncoding;
}
+ /**
+ * Builds a {@link QueueStatusEncoding} from a journal record.
+ *
+ * @param id the id of the journal record; stored on the encoding via {@code setId}
+ * @param buffer the record payload, consumed by {@code decode}
+ * @return the decoded encoding with its record id set
+ */
+ protected static QueueStatusEncoding newQueueStatusEncoding(long id, ActiveMQBuffer buffer) {
+ QueueStatusEncoding statusEncoding = new QueueStatusEncoding();
+
+ statusEncoding.decode(buffer);
+ statusEncoding.setId(id);
+
+ return statusEncoding;
+ }
+
+
+
+
@Override
public boolean addToPage(PagingStore store,
ServerMessage msg,
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a074f9f1/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java
index 34a47bf..58723c6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java
@@ -74,6 +74,7 @@ import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalR
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_PENDING_COUNTER;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_TRANSACTION;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.QUEUE_BINDING_RECORD;
+import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.QUEUE_STATUS_RECORD;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.SECURITY_RECORD;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.UPDATE_DELIVERY_COUNT;
@@ -550,6 +551,9 @@ public final class DescribeJournal {
return encoding;
}
+ case QUEUE_STATUS_RECORD:
+ return AbstractJournalStorageManager.newQueueStatusEncoding(id, buffer);
+
case QUEUE_BINDING_RECORD:
return AbstractJournalStorageManager.newBindingEncoding(id, buffer);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a074f9f1/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java
index 4aa470b..0169f38 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java
@@ -33,6 +33,8 @@ public final class JournalRecordIds {
public static final byte QUEUE_BINDING_RECORD = 21;
+ public static final byte QUEUE_STATUS_RECORD = 22;
+
/**
* Records storing the current recordID number.
*
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a074f9f1/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
index 4efe292..039460c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
@@ -16,6 +16,9 @@
*/
package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
+import java.util.LinkedList;
+import java.util.List;
+
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
@@ -36,6 +39,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
public SimpleString user;
+ public List<QueueStatusEncoding> queueStatusEncodings;
+
public PersistentQueueBindingEncoding() {
}
@@ -107,6 +112,19 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
}
@Override
+ public void addQueueStatusEncoding(QueueStatusEncoding status) {
+ // The list is created lazily on the first add; until then queueStatusEncodings stays null.
+ if (queueStatusEncodings == null) {
+ queueStatusEncodings = new LinkedList<>();
+ }
+ queueStatusEncodings.add(status);
+ }
+
+ @Override
+ public List<QueueStatusEncoding> getQueueStatusEncodings() {
+ // Returns the internal list itself (no copy); null when no status encoding was ever added.
+ return queueStatusEncodings;
+ }
+
+ @Override
public void decode(final ActiveMQBuffer buffer) {
name = buffer.readSimpleString();
address = buffer.readSimpleString();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a074f9f1/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/QueueEncoding.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/QueueEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/QueueEncoding.java
index 1e05195..9220509 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/QueueEncoding.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/QueueEncoding.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.utils.DataConstants;
public class QueueEncoding implements EncodingSupport {
@@ -44,7 +45,7 @@ public class QueueEncoding implements EncodingSupport {
@Override
public int getEncodeSize() {
- return 8;
+ return DataConstants.SIZE_LONG;
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a074f9f1/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/QueueStatusEncoding.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/QueueStatusEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/QueueStatusEncoding.java
new file mode 100644
index 0000000..fe2b1f5
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/QueueStatusEncoding.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.core.persistence.QueueStatus;
+import org.apache.activemq.artemis.utils.DataConstants;
+
+/**
+ * Journal encoding of a queue status: the payload written by {@link QueueEncoding}
+ * followed by the status id as a short.
+ */
+public class QueueStatusEncoding extends QueueEncoding {
+
+ // null after the no-arg constructor, and after decode() when the stored id is not a known status
+ private QueueStatus status;
+
+ // Not part of the encoded payload; assigned through setId(long).
+ private long id;
+
+ public QueueStatusEncoding(long queueID, QueueStatus status) {
+ super(queueID);
+ this.status = status;
+ }
+
+ // Empty instance, to be filled in by decode(ActiveMQBuffer).
+ public QueueStatusEncoding() {
+ super();
+ }
+
+ @Override
+ public void decode(final ActiveMQBuffer buffer) {
+ // Read order must mirror encode(): superclass payload first, then the status short.
+ super.decode(buffer);
+ short shortStatus = buffer.readShort();
+ this.status = QueueStatus.fromID(shortStatus);
+ }
+
+ @Override
+ public void encode(final ActiveMQBuffer buffer) {
+ super.encode(buffer);
+ // status must be non-null here; status.id is dereferenced without a check
+ buffer.writeShort(status.id);
+ }
+
+ public QueueStatus getStatus() {
+ return status;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ // Returns this instance so the call can be chained.
+ public QueueStatusEncoding setId(long id) {
+ this.id = id;
+ return this;
+ }
+
+ @Override
+ public int getEncodeSize() {
+ // superclass payload plus the short written by encode()
+ return super.getEncodeSize() + DataConstants.SIZE_SHORT;
+ }
+
+ @Override
+ public String toString() {
+ return "QueueStatusEncoding [id=" + id + ", queueID=" + queueID + ", status=" + status + "]";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a074f9f1/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
index f13d2fa..3a2999e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
@@ -41,6 +41,7 @@ import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
+import org.apache.activemq.artemis.core.persistence.QueueStatus;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedRoles;
@@ -85,6 +86,16 @@ public class NullStorageManager implements StorageManager {
}
@Override
+ public long storeQueueStatus(long queueID, QueueStatus status) throws Exception {
+ // Nothing is stored by this implementation; always reports record id 0.
+ return 0;
+ }
+
+ @Override
+ public void deleteQueueStatus(long recordID) throws Exception {
+ // Intentionally a no-op: this implementation stores no queue status records.
+
+ }
+
+ @Override
public void injectMonitor(FileStoreMonitor monitor) throws Exception {
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a074f9f1/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index a9a87c3..0dcef3d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -207,6 +207,15 @@ public interface Queue extends Bindable {
void pause();
/**
+ * Pauses the queue. It will receive messages but won't give them to the consumers until resumed.
+ * If a queue is paused, pausing it again will only throw a warning.
+ * To check if a queue is paused, invoke <i>isPaused()</i>
+ */
+ void pause(boolean persist);
+
+ void reloadPause(long recordID);
+
+ /**
* Resumes the delivery of message for the queue.
* If a queue is resumed, resuming it again will only throw a warning.
* To check if a queue is resumed, invoke <i>isPaused()</i>
@@ -218,6 +227,12 @@ public interface Queue extends Bindable {
*/
boolean isPaused();
+ /**
+ * Tells whether the pause of this queue was persisted.
+ * @return true if the pause was persisted
+ */
+ boolean isPersistedPause();
+
Executor getExecutor();
void resetAllIterators();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a074f9f1/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
index ccb00cb..9a8ae74 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
@@ -39,9 +39,11 @@ import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
+import org.apache.activemq.artemis.core.persistence.QueueStatus;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.PageCountPending;
import org.apache.activemq.artemis.core.persistence.impl.journal.AddMessageRecord;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.QueueStatusEncoding;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
@@ -146,6 +148,13 @@ public class PostOfficeJournalLoader implements JournalLoader {
queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(((PostOfficeImpl) postOffice).getServer().getJMSQueueDeleter(), queueBindingInfo.getQueueName()));
}
+ if (queueBindingInfo.getQueueStatusEncodings() != null) {
+ for (QueueStatusEncoding encoding : queueBindingInfo.getQueueStatusEncodings()) {
+ if (encoding.getStatus() == QueueStatus.PAUSED)
+ queue.reloadPause(encoding.getId());
+ }
+ }
+
final Binding binding = new LocalQueueBinding(queue.getAddress(), queue, nodeManager.getNodeId());
queues.put(queue.getID(), queue);
postOffice.addBinding(binding);
@@ -245,7 +254,9 @@ public class PostOfficeJournalLoader implements JournalLoader {
ResourceManager resourceManager,
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception {
for (Queue queue : queues.values()) {
- queue.resume();
+ if (!queue.isPersistedPause()) {
+ queue.resume();
+ }
}
if (System.getProperty("org.apache.activemq.opt.directblast") != null) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a074f9f1/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 7c8ad0a..d30544f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -51,6 +51,7 @@ import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.message.impl.MessageImpl;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
+import org.apache.activemq.artemis.core.persistence.QueueStatus;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
@@ -176,6 +177,8 @@ public class QueueImpl implements Queue {
private boolean paused;
+ private long pauseStatusRecord = -1;
+
private static final int MAX_SCHEDULED_RUNNERS = 2;
// We don't ever need more than two DeliverRunner on the executor's list
@@ -1718,8 +1721,32 @@ public class QueueImpl implements Queue {
@Override
public synchronized void pause() {
+ pause(false);
+ }
+
+ @Override
+ public synchronized void reloadPause(long recordID) {
+ this.paused = true;
+ if (pauseStatusRecord >= 0) {
+ try {
+ storageManager.deleteQueueStatus(pauseStatusRecord);
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+ this.pauseStatusRecord = recordID;
+ }
+
+ @Override
+ public synchronized void pause(boolean persist) {
try {
this.flushDeliveriesInTransit();
+ if (persist && isDurable()) {
+ if (pauseStatusRecord >= 0) {
+ storageManager.deleteQueueStatus(pauseStatusRecord);
+ }
+ pauseStatusRecord = storageManager.storeQueueStatus(this.id, QueueStatus.PAUSED);
+ }
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
}
@@ -1730,6 +1757,15 @@ public class QueueImpl implements Queue {
public synchronized void resume() {
paused = false;
+ if (pauseStatusRecord >= 0) {
+ try {
+ storageManager.deleteQueueStatus(pauseStatusRecord);
+ } catch (Exception e) {
+ ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+ }
+ pauseStatusRecord = -1;
+ }
+
deliverAsync();
}
@@ -1739,6 +1775,11 @@ public class QueueImpl implements Queue {
}
@Override
+ public synchronized boolean isPersistedPause() {
+ return this.pauseStatusRecord >= 0;
+ }
+
+ @Override
public boolean isDirectDeliver() {
return directDeliver;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a074f9f1/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 91e5e7a..d82f7d3 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -845,6 +845,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
this.expectedElements = new CountDownLatch(expectedElements);
}
+ @Override
+ public boolean isPersistedPause() {
+ return false;
+ }
+
public boolean waitCompletion(long timeout, TimeUnit timeUnit) throws Exception {
return expectedElements.await(timeout, timeUnit);
}
@@ -863,6 +868,14 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
+ public void pause(boolean persist) {
+ }
+
+ @Override
+ public void reloadPause(long recordID) {
+ }
+
+ @Override
public Filter getFilter() {
return null;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a074f9f1/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
index 0c2fec5..93c5c9d 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
@@ -41,6 +41,7 @@ import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
+import org.apache.activemq.artemis.core.persistence.QueueStatus;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedRoles;
@@ -243,6 +244,16 @@ public class TransactionImplTest extends ActiveMQTestBase {
}
@Override
+ public long storeQueueStatus(long queueID, QueueStatus status) throws Exception {
+ return 0;
+ }
+
+ @Override
+ public void deleteQueueStatus(long recordID) throws Exception {
+
+ }
+
+ @Override
public void pageWrite(PagedMessage message, int pageNumber) {
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a074f9f1/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
index f6c4bde..7a07439 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
@@ -351,6 +351,11 @@ public class JMSQueueControlUsingJMSTest extends JMSQueueControlTest {
}
@Override
+ public void pause(boolean persist) throws Exception {
+ proxy.invokeOperation("pause", persist);
+ }
+
+ @Override
public void resume() throws Exception {
proxy.invokeOperation("resume");
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a074f9f1/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
index e8e04f2..9b901fc 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
@@ -337,6 +337,11 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
}
@Override
+ public void pause(boolean persist) throws Exception {
+ proxy.invokeOperation("pause", persist);
+ }
+
+ @Override
public void resume() throws Exception {
proxy.invokeOperation("resume");
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a074f9f1/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/QueuePeristPauseTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/QueuePeristPauseTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/QueuePeristPauseTest.java
new file mode 100644
index 0000000..e383c85
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/QueuePeristPauseTest.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.server;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class QueuePeristPauseTest extends ActiveMQTestBase {
+
+ @Test
+ public void testPauseQueue() throws Exception {
+ ActiveMQServer server = createServer(true, false);
+ server.start();
+
+ Queue queue = server.createQueue(SimpleString.toSimpleString("q1"),
+ SimpleString.toSimpleString("q1"),
+ null, true, false);
+
+ queue.pause(true);
+
+ server.stop();
+ server.start();
+
+ for (int i = 0; i < 4; i++) {
+ server.stop();
+ server.start();
+ queue = server.locateQueue(SimpleString.toSimpleString("q1"));
+ Assert.assertTrue(queue.isPaused());
+ }
+
+ queue.resume();
+
+ for (int i = 0; i < 4; i++) {
+ server.stop();
+ server.start();
+ queue = server.locateQueue(SimpleString.toSimpleString("q1"));
+ Assert.assertFalse(queue.isPaused());
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a074f9f1/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 8eae7d6..bba5dc1 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -57,6 +57,16 @@ public class FakeQueue implements Queue {
}
@Override
+ public void reloadPause(long recordID) {
+
+ }
+
+ @Override
+ public boolean isPersistedPause() {
+ return false;
+ }
+
+ @Override
public int retryMessages(Filter filter) throws Exception {
return 0;
}
@@ -103,6 +113,11 @@ public class FakeQueue implements Queue {
}
@Override
+ public void pause(boolean persist) {
+
+ }
+
+ @Override
public boolean flushExecutor() {
return true;
}
[2/2] activemq-artemis git commit: This closes #856
Posted by jb...@apache.org.
This closes #856
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/8d4f507c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/8d4f507c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/8d4f507c
Branch: refs/heads/master
Commit: 8d4f507cb83dd91b8ba6acc35a895367fa305ee1
Parents: 0f9efa9 a074f9f
Author: jbertram <jb...@apache.com>
Authored: Fri Oct 21 19:58:16 2016 -0500
Committer: jbertram <jb...@apache.com>
Committed: Fri Oct 21 19:58:16 2016 -0500
----------------------------------------------------------------------
.../api/core/management/QueueControl.java | 8 ++-
.../api/jms/management/JMSQueueControl.java | 6 ++
.../management/impl/JMSQueueControlImpl.java | 5 ++
.../core/management/impl/QueueControlImpl.java | 12 ++++
.../core/persistence/QueueBindingInfo.java | 7 ++
.../artemis/core/persistence/QueueStatus.java | 46 ++++++++++++
.../core/persistence/StorageManager.java | 11 +++
.../journal/AbstractJournalStorageManager.java | 66 ++++++++++++++++-
.../impl/journal/DescribeJournal.java | 4 ++
.../impl/journal/JournalRecordIds.java | 2 +
.../codec/PersistentQueueBindingEncoding.java | 18 +++++
.../impl/journal/codec/QueueEncoding.java | 3 +-
.../impl/journal/codec/QueueStatusEncoding.java | 75 ++++++++++++++++++++
.../impl/nullpm/NullStorageManager.java | 11 +++
.../activemq/artemis/core/server/Queue.java | 15 ++++
.../server/impl/PostOfficeJournalLoader.java | 13 +++-
.../artemis/core/server/impl/QueueImpl.java | 41 +++++++++++
.../impl/ScheduledDeliveryHandlerTest.java | 13 ++++
.../transaction/impl/TransactionImplTest.java | 11 +++
.../management/JMSQueueControlUsingJMSTest.java | 5 ++
.../management/QueueControlUsingCoreTest.java | 5 ++
.../server/QueuePeristPauseTest.java | 60 ++++++++++++++++
.../unit/core/postoffice/impl/FakeQueue.java | 15 ++++
23 files changed, 448 insertions(+), 4 deletions(-)
----------------------------------------------------------------------