You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by an...@apache.org on 2018/01/29 07:47:38 UTC
[1/5] activemq-artemis git commit: ARTEMIS-1638 Fixing Purge rollback
behaviour (fix)
Repository: activemq-artemis
Updated Branches:
refs/heads/master c671fa078 -> b66d0f7ac
ARTEMIS-1638 Fixing Purge rollback behaviour (fix)
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/adb466b2
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/adb466b2
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/adb466b2
Branch: refs/heads/master
Commit: adb466b2f8d7e4fb2045804fe48cfd82d710f0d1
Parents: 69429e4
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Jan 25 21:17:38 2018 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Jan 26 23:24:55 2018 -0500
----------------------------------------------------------------------
.../activemq/artemis/core/server/impl/QueueImpl.java | 15 +++++++++++++++
1 file changed, 15 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/adb466b2/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index f6b6e00..77adc85 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -2896,11 +2896,26 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
void postRollback(final LinkedList<MessageReference> refs) {
//if we have purged then ignore adding the messages back
if (purgeOnNoConsumers && getConsumerCount() == 0) {
+ purgeAfterRollback(refs);
+
return;
}
addHead(refs, false);
}
+ private void purgeAfterRollback(LinkedList<MessageReference> refs) {
+ try {
+ Transaction transaction = new TransactionImpl(storageManager);
+ for (MessageReference reference : refs) {
+ incDelivering(); // post ack will decrement this, so need to inc
+ acknowledge(transaction, reference, AckReason.KILLED);
+ }
+ transaction.commit();
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+
private long calculateRedeliveryDelay(final AddressSettings addressSettings, final int deliveryCount) {
long redeliveryDelay = addressSettings.getRedeliveryDelay();
long maxRedeliveryDelay = addressSettings.getMaxRedeliveryDelay();
[2/5] activemq-artemis git commit: ARTEMIS-1638 Fixing Purge rollback
behaviour (test only)
Posted by an...@apache.org.
ARTEMIS-1638 Fixing Purge rollback behaviour (test only)
Having a commit with just the test makes it easy for developers to check out just this branch
and validate the issue.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/69429e4e
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/69429e4e
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/69429e4e
Branch: refs/heads/master
Commit: 69429e4e230960c2bd29e031fc270a4a72dac148
Parents: c671fa0
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Jan 25 21:17:25 2018 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Jan 26 23:24:55 2018 -0500
----------------------------------------------------------------------
.../integration/amqp/AmqpClientTestSupport.java | 5 +
.../amqp/AmqpPurgeOnNoConsumersTest.java | 127 ++++++++++++++-----
2 files changed, 103 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/69429e4e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
index 044183f..a01c975 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
@@ -304,6 +304,10 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
}
protected void sendMessages(String destinationName, int count, RoutingType routingType) throws Exception {
+ sendMessages(destinationName, count, routingType, false);
+ }
+
+ protected void sendMessages(String destinationName, int count, RoutingType routingType, boolean durable) throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
try {
@@ -313,6 +317,7 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
for (int i = 0; i < count; ++i) {
AmqpMessage message = new AmqpMessage();
message.setMessageId("MessageID:" + i);
+ message.setDurable(true);
if (routingType != null) {
message.setMessageAnnotation(AMQPMessageSupport.ROUTING_TYPE.toString(), routingType.getType());
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/69429e4e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPurgeOnNoConsumersTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPurgeOnNoConsumersTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPurgeOnNoConsumersTest.java
index 9ded61c..ae4849b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPurgeOnNoConsumersTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPurgeOnNoConsumersTest.java
@@ -16,10 +16,20 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.server.impl.QueueImpl;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
@@ -28,57 +38,116 @@ import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.Test;
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
public class AmqpPurgeOnNoConsumersTest extends AmqpClientTestSupport {
+ @Override
+ protected String getConfiguredProtocols() {
+ return "AMQP,OPENWIRE,CORE";
+ }
+
@Test(timeout = 60000)
public void testQueueReceiverReadMessage() throws Exception {
+ AmqpConnection connection = null;
String queue = "purgeQueue";
SimpleString ssQueue = new SimpleString(queue);
+
server.addAddressInfo(new AddressInfo(ssQueue, RoutingType.ANYCAST));
server.createQueue(ssQueue, RoutingType.ANYCAST, ssQueue, null, true, false, 1, true, false);
AmqpClient client = createAmqpClient();
- AmqpConnection connection = addConnection(client.connect());
+ connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
final AmqpReceiver receiver = session.createReceiver(queue);
- Queue queueView = getProxyToQueue(queue);
+ QueueImpl queueView = (QueueImpl)getProxyToQueue(queue);
+ assertEquals(0L, queueView.getPageSubscription().getPagingStore().getAddressSize());
assertEquals(0, queueView.getMessageCount());
- Thread t = new Thread(new Runnable() {
- @Override
- public void run() {
- for (int i = 0; i < 4; i++) {
- try {
- AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
- receive.accept();
- assertNotNull(receive);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- try {
- receiver.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- });
+ sendMessages(queue, 5, null, true);
- t.start();
+ Wait.assertEquals(5, queueView::getMessageCount);
receiver.flow(5);
- sendMessages(queue, 5);
-
- t.join(5000);
+ for (int i = 0; i < 4; i++) {
+ try {
+ AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
+ receive.accept();
+ assertNotNull(receive);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ try {
+ receiver.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
Wait.assertEquals(0, queueView::getMessageCount);
+ assertEquals(0L, queueView.getPageSubscription().getPagingStore().getAddressSize());
connection.close();
+
+ server.stop();
+
+ server.start();
+
+ queueView = (QueueImpl)getProxyToQueue(queue);
+
+ assertEquals(0, queueView.getMessageCount());
+ assertEquals(0L, queueView.getPageSubscription().getPagingStore().getAddressSize());
+ }
+
+
+ // I'm adding the core test here to compare semantics between AMQP and core on this test.
+ @Test(timeout = 60000)
+ public void testPurgeQueueCoreRollback() throws Exception {
+ String queue = "purgeQueue";
+ SimpleString ssQueue = new SimpleString(queue);
+ server.addAddressInfo(new AddressInfo(ssQueue, RoutingType.ANYCAST));
+ server.createQueue(ssQueue, RoutingType.ANYCAST, ssQueue, null, true, false, 1, true, false);
+
+ ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:5672");
+ Connection connection = cf.createConnection();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ MessageProducer producer = session.createProducer(session.createQueue("purgeQueue"));
+
+ javax.jms.Queue jmsQueue = session.createQueue(queue);
+ MessageConsumer consumer = session.createConsumer(jmsQueue);
+
+ for (int i = 0; i < 10; i++) {
+ Message message = session.createTextMessage("hello " + i);
+ producer.send(message);
+ }
+ session.commit();
+
+ QueueImpl queueView = (QueueImpl)getProxyToQueue(queue);
+
+ Wait.assertEquals(10, queueView::getMessageCount);
+
+ connection.start();
+
+
+ for (int i = 0; i < 10; i++) {
+ TextMessage txt = (TextMessage)consumer.receive(1000);
+ assertNotNull(txt);
+ assertEquals("hello " + i, txt.getText());
+ }
+ consumer.close();
+ session.rollback();
+ connection.close();
+
+ Wait.assertEquals(0, queueView::getMessageCount);
+
+ server.stop();
+
+ server.start();
+
+ queueView = (QueueImpl)getProxyToQueue(queue);
+
+ assertEquals(0, queueView.getMessageCount());
+ assertEquals(0L, queueView.getPageSubscription().getPagingStore().getAddressSize());
}
}
[4/5] activemq-artemis git commit: ARTEMIS-1638 & ARTEMIS-1641 Making
sure Paging survives Purge on a test & cleanup PgTX (fix)
Posted by an...@apache.org.
ARTEMIS-1638 & ARTEMIS-1641 Making sure Paging survives Purge on a test & cleanup PgTX (fix)
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c10b7441
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c10b7441
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c10b7441
Branch: refs/heads/master
Commit: c10b74412a335f6c930c3dddace11ef300215d4f
Parents: 59d2ac5
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Jan 26 23:05:52 2018 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Jan 26 23:24:56 2018 -0500
----------------------------------------------------------------------
.../core/paging/PageTransactionInfo.java | 4 ++-
.../paging/impl/PageTransactionInfoImpl.java | 32 ++++++++++++++------
.../journal/AbstractJournalStorageManager.java | 30 +++++++++++++++++-
.../core/postoffice/impl/PostOfficeImpl.java | 2 +-
.../artemis/core/server/ActiveMQServer.java | 28 +++++++++++++++++
.../activemq/artemis/core/server/Queue.java | 13 +++++++-
.../core/server/impl/ActiveMQServerImpl.java | 24 ++-------------
.../artemis/core/server/impl/QueueImpl.java | 27 ++++++++++-------
.../core/server/impl/QueueManagerImpl.java | 2 +-
.../core/server/impl/RoutingContextImpl.java | 2 +-
.../core/server/impl/ServerConsumerImpl.java | 2 +-
.../management/impl/ManagementServiceImpl.java | 2 +-
.../impl/ScheduledDeliveryHandlerTest.java | 7 ++++-
.../unit/core/postoffice/impl/FakeQueue.java | 8 ++++-
14 files changed, 131 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c10b7441/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PageTransactionInfo.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PageTransactionInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PageTransactionInfo.java
index 4fb49ea..90aaade 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PageTransactionInfo.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PageTransactionInfo.java
@@ -51,7 +51,9 @@ public interface PageTransactionInfo extends EncodingSupport {
int increment) throws Exception;
// To be used after the update was stored or reload
- void onUpdate(int update, StorageManager storageManager, PagingManager pagingManager);
+ boolean onUpdate(int update, StorageManager storageManager, PagingManager pagingManager);
+
+ boolean checkSize(StorageManager storageManager, PagingManager pagingManager);
void increment(int durableSize, int nonDurableSize);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c10b7441/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java
index b793aec..70594ca 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java
@@ -89,16 +89,30 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo {
}
@Override
- public void onUpdate(final int update, final StorageManager storageManager, PagingManager pagingManager) {
- int sizeAfterUpdate = numberOfMessages.addAndGet(-update);
- if (sizeAfterUpdate == 0 && storageManager != null) {
- try {
- storageManager.deletePageTransactional(this.recordID);
- } catch (Exception e) {
- ActiveMQServerLogger.LOGGER.pageTxDeleteError(e, recordID);
- }
+ public boolean onUpdate(final int update, final StorageManager storageManager, PagingManager pagingManager) {
+ int afterUpdate = numberOfMessages.addAndGet(-update);
+ return internalCheckSize(storageManager, pagingManager, afterUpdate);
+ }
+
+ @Override
+ public boolean checkSize(StorageManager storageManager, PagingManager pagingManager) {
+ return internalCheckSize(storageManager, pagingManager, numberOfMessages.get());
+ }
- pagingManager.removeTransaction(this.transactionID);
+ public boolean internalCheckSize(StorageManager storageManager, PagingManager pagingManager, int size) {
+ if (size <= 0) {
+ if (storageManager != null) {
+ try {
+ storageManager.deletePageTransactional(this.recordID);
+ } catch (Exception e) {
+ ActiveMQServerLogger.LOGGER.pageTxDeleteError(e, recordID);
+ }
+
+ pagingManager.removeTransaction(this.transactionID);
+ }
+ return false;
+ } else {
+ return true;
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c10b7441/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 7aa4096..5441368 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -839,6 +839,8 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
List<PreparedTransactionInfo> preparedTransactions = new ArrayList<>();
+ Set<PageTransactionInfo> invalidPageTransactions = null;
+
Map<Long, Message> messages = new HashMap<>();
readLock();
try {
@@ -971,6 +973,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
break;
}
case JournalRecordIds.PAGE_TRANSACTION: {
+ PageTransactionInfo invalidPGTx = null;
if (record.isUpdate) {
PageUpdateTXEncoding pageUpdate = new PageUpdateTXEncoding();
@@ -981,7 +984,9 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
if (pageTX == null) {
ActiveMQServerLogger.LOGGER.journalCannotFindPageTX(pageUpdate.pageTX);
} else {
- pageTX.onUpdate(pageUpdate.recods, null, null);
+ if (!pageTX.onUpdate(pageUpdate.recods, null, null)) {
+ invalidPGTx = pageTX;
+ }
}
} else {
PageTransactionInfoImpl pageTransactionInfo = new PageTransactionInfoImpl();
@@ -991,6 +996,17 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
pageTransactionInfo.setRecordID(record.id);
pagingManager.addTransaction(pageTransactionInfo);
+
+ if (!pageTransactionInfo.checkSize(null, null)) {
+ invalidPGTx = pageTransactionInfo;
+ }
+ }
+
+ if (invalidPGTx != null) {
+ if (invalidPageTransactions == null) {
+ invalidPageTransactions = new HashSet<>();
+ }
+ invalidPageTransactions.add(invalidPGTx);
}
break;
@@ -1170,6 +1186,9 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
}
journalLoader.postLoad(messageJournal, resourceManager, duplicateIDMap);
+
+ checkInvalidPageTransactions(pagingManager, invalidPageTransactions);
+
journalLoaded = true;
return info;
} finally {
@@ -1177,6 +1196,15 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
}
}
+ public void checkInvalidPageTransactions(PagingManager pagingManager,
+ Set<PageTransactionInfo> invalidPageTransactions) {
+ if (invalidPageTransactions != null) {
+ for (PageTransactionInfo pginfo : invalidPageTransactions) {
+ pginfo.checkSize(this, pagingManager);
+ }
+ }
+ }
+
/**
* @param queueID
* @param pageSubscriptions
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c10b7441/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index bc14f79..356674b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -1464,7 +1464,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
for (MessageReference ref : refs) {
Message message = ref.getMessage();
- if (message.isDurable() && ref.getQueue().isDurable()) {
+ if (message.isDurable() && ref.getQueue().isDurableMessage()) {
message.decrementDurableRefCount();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c10b7441/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index d1d3029..6af94ff 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -73,6 +73,34 @@ import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
*/
public interface ActiveMQServer extends ServiceComponent {
+
+ enum SERVER_STATE {
+ /**
+ * start() has been called but components are not initialized. The whole point of this state
+ * is to be in a state which is different from {@link SERVER_STATE#STARTED} and
+ * {@link SERVER_STATE#STOPPED}, so that methods testing for these two values, such as
+ * {@link #stop(boolean)}, work as intended.
+ */
+ STARTING, /**
+ * server is started. {@code server.isStarted()} returns {@code true}, and all assumptions
+ * about it hold.
+ */
+ STARTED, /**
+ * stop() was called but has not finished yet. Meant to avoid starting components while
+ * stop() is executing.
+ */
+ STOPPING, /**
+ * Stopped: either stop() has been called and has finished running, or start() has never been
+ * called.
+ */
+ STOPPED
+ }
+
+
+ void setState(SERVER_STATE state);
+
+ SERVER_STATE getState();
+
/**
* Sets the server identity.
* <p>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c10b7441/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index d4ec406..b39f4da 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -53,6 +53,13 @@ public interface Queue extends Bindable,CriticalComponent {
boolean isDurable();
+ /**
+ * The queue definition could be durable, but its messages could still be considered non-durable
+ * (e.g. when purgeOnNoConsumers is set).
+ * @return whether messages on this queue are to be treated as durable
+ */
+ boolean isDurableMessage();
+
boolean isTemporary();
boolean isAutoCreated();
@@ -161,7 +168,11 @@ public interface Queue extends Bindable,CriticalComponent {
int deleteMatchingReferences(Filter filter) throws Exception;
- int deleteMatchingReferences(int flushLImit, Filter filter) throws Exception;
+ default int deleteMatchingReferences(int flushLImit, Filter filter) throws Exception {
+ return deleteMatchingReferences(flushLImit, filter, AckReason.NORMAL);
+ }
+
+ int deleteMatchingReferences(int flushLImit, Filter filter, AckReason ackReason) throws Exception;
boolean expireReference(long messageID) throws Exception;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c10b7441/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 047e839..6ce681e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -207,28 +207,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private HAPolicy haPolicy;
- enum SERVER_STATE {
- /**
- * start() has been called but components are not initialized. The whole point of this state,
- * is to be in a state which is different from {@link SERVER_STATE#STARTED} and
- * {@link SERVER_STATE#STOPPED}, so that methods testing for these two values such as
- * {@link #stop(boolean)} worked as intended.
- */
- STARTING, /**
- * server is started. {@code server.isStarted()} returns {@code true}, and all assumptions
- * about it hold.
- */
- STARTED, /**
- * stop() was called but has not finished yet. Meant to avoids starting components while
- * stop() is executing.
- */
- STOPPING, /**
- * Stopped: either stop() has been called and has finished running, or start() has never been
- * called.
- */
- STOPPED
- }
-
private volatile SERVER_STATE state = SERVER_STATE.STOPPED;
private final Version version;
@@ -712,10 +690,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
super.finalize();
}
+ @Override
public void setState(SERVER_STATE state) {
this.state = state;
}
+ @Override
public SERVER_STATE getState() {
return state;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c10b7441/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 77adc85..fd13652 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -139,7 +139,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private volatile Filter filter;
- private final boolean durable;
+ private final boolean propertyDurable;
private final boolean temporary;
@@ -405,7 +405,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
this.pageSubscription = pageSubscription;
- this.durable = durable;
+ this.propertyDurable = durable;
this.temporary = temporary;
@@ -495,7 +495,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public boolean isDurable() {
- return durable;
+ return propertyDurable;
+ }
+
+ @Override
+ public boolean isDurableMessage() {
+ return propertyDurable && !purgeOnNoConsumers;
}
@Override
@@ -1126,7 +1131,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
} else {
Message message = ref.getMessage();
- boolean durableRef = message.isDurable() && durable;
+ boolean durableRef = message.isDurable() && isDurableMessage();
if (durableRef) {
storageManager.storeAcknowledge(id, message.getMessageID());
@@ -1161,7 +1166,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
} else {
Message message = ref.getMessage();
- boolean durableRef = message.isDurable() && durable;
+ boolean durableRef = message.isDurable() && isDurableMessage();
if (durableRef) {
storageManager.storeAcknowledgeTransactional(tx.getID(), id, message.getMessageID());
@@ -1189,7 +1194,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
public void reacknowledge(final Transaction tx, final MessageReference ref) throws Exception {
Message message = ref.getMessage();
- if (message.isDurable() && durable) {
+ if (message.isDurable() && isDurableMessage()) {
tx.setContainsPersistent();
}
@@ -1372,12 +1377,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
@Override
- public synchronized int deleteMatchingReferences(final int flushLimit, final Filter filter1) throws Exception {
+ public synchronized int deleteMatchingReferences(final int flushLimit, final Filter filter1, AckReason ackReason) throws Exception {
return iterQueue(flushLimit, filter1, new QueueIterateAction() {
@Override
public void actMessage(Transaction tx, MessageReference ref) throws Exception {
incDelivering();
- acknowledge(tx, ref);
+ acknowledge(tx, ref, ackReason);
refRemoved(ref);
}
});
@@ -2385,7 +2390,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return true;
}
- if (!internalQueue && message.isDurable() && durable && !reference.isPaged()) {
+ if (!internalQueue && message.isDurable() && isDurableMessage() && !reference.isPaged()) {
storageManager.updateDeliveryCount(reference);
}
@@ -2414,7 +2419,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
reference.setScheduledDeliveryTime(timeBase + redeliveryDelay);
- if (!reference.isPaged() && message.isDurable() && durable) {
+ if (!reference.isPaged() && message.isDurable() && isDurableMessage()) {
storageManager.updateScheduledDeliveryTime(reference);
}
}
@@ -2858,7 +2863,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (message == null)
return;
- boolean durableRef = message.isDurable() && queue.durable;
+ boolean durableRef = message.isDurable() && queue.isDurableMessage();
try {
message.decrementRefCount();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c10b7441/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java
index be83aca..23b0e5d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java
@@ -59,7 +59,7 @@ public class QueueManagerImpl extends ReferenceCounterUtil implements QueueManag
ActiveMQServerLogger.LOGGER.debug("purging queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount);
}
try {
- queue.deleteAllReferences();
+ queue.deleteMatchingReferences(QueueImpl.DEFAULT_FLUSH_LIMIT, null, AckReason.KILLED);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.failedToPurgeQueue(e, queueName);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c10b7441/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
index feb12f9..29a70e4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
@@ -60,7 +60,7 @@ public final class RoutingContextImpl implements RoutingContext {
RouteContextList listing = getContextListing(address);
- if (queue.isDurable()) {
+ if (queue.isDurableMessage()) {
listing.getDurableQueues().add(queue);
} else {
listing.getNonDurableQueues().add(queue);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c10b7441/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 15b1465..e1c6c8d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -409,7 +409,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
// If updateDeliveries = false (set by strict-update),
// the updateDeliveryCountAfterCancel would still be updated after c
if (strictUpdateDeliveryCount && !ref.isPaged()) {
- if (ref.getMessage().isDurable() && ref.getQueue().isDurable() &&
+ if (ref.getMessage().isDurable() && ref.getQueue().isDurableMessage() &&
!ref.getQueue().isInternalQueue() &&
!ref.isPaged()) {
storageManager.updateDeliveryCount(ref);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c10b7441/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
index 8a7e009..9b9830a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
@@ -247,7 +247,7 @@ public class ManagementServiceImpl implements ManagementService {
QueueControlImpl queueControl = new QueueControlImpl(queue, addressInfo.getName().toString(), postOffice, storageManager, securityStore, addressSettingsRepository);
if (messageCounterManager != null) {
- MessageCounter counter = new MessageCounter(queue.getName().toString(), null, queue, false, queue.isDurable(), messageCounterManager.getMaxDayCount());
+ MessageCounter counter = new MessageCounter(queue.getName().toString(), null, queue, false, queue.isDurableMessage(), messageCounterManager.getMaxDayCount());
queueControl.setMessageCounter(counter);
messageCounterManager.registerMessageCounter(queue.getName().toString(), counter);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c10b7441/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index a492efd..256a670 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -860,6 +860,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
+ public boolean isDurableMessage() {
+ return false;
+ }
+
+ @Override
public boolean isTemporary() {
return false;
}
@@ -1087,7 +1092,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
- public int deleteMatchingReferences(int flushLImit, Filter filter) throws Exception {
+ public int deleteMatchingReferences(int flushLImit, Filter filter, AckReason reason) throws Exception {
return 0;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c10b7441/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 7b37879..f0afd9e 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -442,6 +442,12 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
}
@Override
+ public boolean isDurableMessage() {
+ // no-op
+ return false;
+ }
+
+ @Override
public boolean isDurable() {
// no-op
return false;
@@ -601,7 +607,7 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
}
@Override
- public int deleteMatchingReferences(int flushLImit, Filter filter) throws Exception {
+ public int deleteMatchingReferences(int flushLImit, Filter filter, AckReason reason) throws Exception {
return 0;
}
[3/5] activemq-artemis git commit: ARTEMIS-1638 & ARTEMIS-1641 Making sure Paging survives Purge on a test & cleanup PgTX (test)
Posted by an...@apache.org.
ARTEMIS-1638 & ARTEMIS-1641 Making sure Paging survives Purge on a test & cleanup PgTX (test)
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/59d2ac53
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/59d2ac53
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/59d2ac53
Branch: refs/heads/master
Commit: 59d2ac53ff8958daaab42af49ea34965aff47aea
Parents: adb466b
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Jan 26 23:05:43 2018 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Jan 26 23:24:56 2018 -0500
----------------------------------------------------------------------
.../tests/integration/paging/PagingTest.java | 110 ++++++++++++++++++-
1 file changed, 108 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/59d2ac53/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
index fc48182..3de9203 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
@@ -16,6 +16,10 @@
*/
package org.apache.activemq.artemis.tests.integration.paging;
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.io.File;
@@ -65,6 +69,7 @@ import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
import org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl;
import org.apache.activemq.artemis.core.paging.cursor.impl.PagePositionImpl;
+import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl;
import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.StorageManager;
@@ -78,8 +83,10 @@ import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManagerImpl;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
@@ -382,6 +389,105 @@ public class PagingTest extends ActiveMQTestBase {
System.out.println("pgComplete = " + pgComplete);
}
+
+ @Test
+ public void testPurge() throws Exception {
+ clearDataRecreateServerDirs();
+
+ Configuration config = createDefaultNettyConfig().setJournalSyncNonTransactional(false);
+
+ server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
+
+ server.start();
+
+ String queue = "purgeQueue";
+ SimpleString ssQueue = new SimpleString(queue);
+ server.addAddressInfo(new AddressInfo(ssQueue, RoutingType.ANYCAST));
+ QueueImpl purgeQueue = (QueueImpl)server.createQueue(ssQueue, RoutingType.ANYCAST, ssQueue, null, true, false, 1, true, false);
+
+ ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
+ Connection connection = cf.createConnection();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ javax.jms.Queue jmsQueue = session.createQueue(queue);
+
+ MessageProducer producer = session.createProducer(jmsQueue);
+
+ for (int i = 0; i < 100; i++) {
+ producer.send(session.createTextMessage("hello" + i));
+ }
+ session.commit();
+
+ Wait.assertEquals(0, purgeQueue::getMessageCount);
+
+ Assert.assertEquals(0, purgeQueue.getPageSubscription().getPagingStore().getAddressSize());
+
+ MessageConsumer consumer = session.createConsumer(jmsQueue);
+
+ for (int i = 0; i < 100; i++) {
+ producer.send(session.createTextMessage("hello" + i));
+ if (i == 10) {
+ purgeQueue.getPageSubscription().getPagingStore().startPaging();
+ }
+ }
+ session.commit();
+
+ consumer.close();
+
+ Wait.assertEquals(0, purgeQueue::getMessageCount);
+
+ Wait.assertFalse(purgeQueue.getPageSubscription()::isPaging);
+
+ Wait.assertEquals(0, purgeQueue.getPageSubscription().getPagingStore()::getAddressSize);
+
+ purgeQueue.getPageSubscription().getPagingStore().startPaging();
+
+ Wait.assertTrue(purgeQueue.getPageSubscription()::isPaging);
+
+ consumer = session.createConsumer(jmsQueue);
+
+ for (int i = 0; i < 100; i++) {
+ purgeQueue.getPageSubscription().getPagingStore().startPaging();
+ Assert.assertTrue(purgeQueue.getPageSubscription().isPaging());
+ producer.send(session.createTextMessage("hello" + i));
+ if (i % 2 == 0) {
+ session.commit();
+ }
+ }
+
+ session.commit();
+
+ connection.start();
+
+
+ server.getStorageManager().getMessageJournal().scheduleCompactAndBlock(50000);
+ Assert.assertNotNull(consumer.receive(5000));
+ session.commit();
+
+ consumer.close();
+
+ Wait.assertEquals(0, purgeQueue::getMessageCount);
+ Wait.assertEquals(0, purgeQueue.getPageSubscription().getPagingStore()::getAddressSize);
+ Wait.assertFalse(purgeQueue.getPageSubscription()::isPaging);
+
+ StorageManager sm = server.getStorageManager();
+
+
+ for (int i = 0; i < 1000; i++) {
+ long tx = sm.generateID();
+ PageTransactionInfoImpl txinfo = new PageTransactionInfoImpl(tx);
+ sm.storePageTransaction(tx, txinfo);
+ sm.commit(tx);
+ tx = sm.generateID();
+ sm.updatePageTransaction(tx, txinfo, 1);
+ sm.commit(tx);
+ }
+
+ server.stop();
+ server.start();
+ Assert.assertEquals(0, server.getPagingManager().getTransactions().size());
+ }
+
+
// First page is complete but it wasn't deleted
@Test
public void testFirstPageCompleteNotDeleted() throws Exception {
@@ -5680,8 +5786,8 @@ public class PagingTest extends ActiveMQTestBase {
}
@Override
- protected Configuration createDefaultInVMConfig() throws Exception {
- Configuration configuration = super.createDefaultInVMConfig().setJournalSyncNonTransactional(false);
+ protected Configuration createDefaultConfig(final int serverID, final boolean netty) throws Exception {
+ Configuration configuration = super.createDefaultConfig(serverID, netty);
if (storeType == StoreConfiguration.StoreType.DATABASE) {
setDBStoreType(configuration);
} else if (mapped) {
[5/5] activemq-artemis git commit: This closes #1818 ARTEMIS-1638 Fixing Purge on rollback & paging
Posted by an...@apache.org.
This closes #1818 ARTEMIS-1638 Fixing Purge on rollback & paging
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/b66d0f7a
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/b66d0f7a
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/b66d0f7a
Branch: refs/heads/master
Commit: b66d0f7ac40001cce14ca7146e74720504ff9eb1
Parents: c671fa0 c10b744
Author: Andy Taylor <an...@gmail.com>
Authored: Mon Jan 29 07:46:39 2018 +0000
Committer: Andy Taylor <an...@gmail.com>
Committed: Mon Jan 29 07:46:39 2018 +0000
----------------------------------------------------------------------
.../core/paging/PageTransactionInfo.java | 4 +-
.../paging/impl/PageTransactionInfoImpl.java | 32 +++--
.../journal/AbstractJournalStorageManager.java | 30 ++++-
.../core/postoffice/impl/PostOfficeImpl.java | 2 +-
.../artemis/core/server/ActiveMQServer.java | 28 ++++
.../activemq/artemis/core/server/Queue.java | 13 +-
.../core/server/impl/ActiveMQServerImpl.java | 24 +---
.../artemis/core/server/impl/QueueImpl.java | 42 ++++--
.../core/server/impl/QueueManagerImpl.java | 2 +-
.../core/server/impl/RoutingContextImpl.java | 2 +-
.../core/server/impl/ServerConsumerImpl.java | 2 +-
.../management/impl/ManagementServiceImpl.java | 2 +-
.../impl/ScheduledDeliveryHandlerTest.java | 7 +-
.../integration/amqp/AmqpClientTestSupport.java | 5 +
.../amqp/AmqpPurgeOnNoConsumersTest.java | 127 ++++++++++++++-----
.../tests/integration/paging/PagingTest.java | 110 +++++++++++++++-
.../unit/core/postoffice/impl/FakeQueue.java | 8 +-
17 files changed, 357 insertions(+), 83 deletions(-)
----------------------------------------------------------------------