You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by an...@apache.org on 2018/01/29 07:47:38 UTC

[1/5] activemq-artemis git commit: ARTEMIS-1638 Fixing Purge rollback behaviour (fix)

Repository: activemq-artemis
Updated Branches:
  refs/heads/master c671fa078 -> b66d0f7ac


ARTEMIS-1638 Fixing Purge rollback behaviour (fix)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/adb466b2
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/adb466b2
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/adb466b2

Branch: refs/heads/master
Commit: adb466b2f8d7e4fb2045804fe48cfd82d710f0d1
Parents: 69429e4
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Jan 25 21:17:38 2018 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Jan 26 23:24:55 2018 -0500

----------------------------------------------------------------------
 .../activemq/artemis/core/server/impl/QueueImpl.java | 15 +++++++++++++++
 1 file changed, 15 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/adb466b2/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index f6b6e00..77adc85 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -2896,11 +2896,26 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    void postRollback(final LinkedList<MessageReference> refs) {
       //if we have purged then ignore adding the messages back
       if (purgeOnNoConsumers && getConsumerCount() == 0) {
+         purgeAfterRollback(refs);
+
          return;
       }
       addHead(refs, false);
    }
 
+   private void purgeAfterRollback(LinkedList<MessageReference> refs) {
+      try {
+         Transaction transaction = new TransactionImpl(storageManager);
+         for (MessageReference reference : refs) {
+            incDelivering(); // post ack will decrement this, so need to inc
+            acknowledge(transaction, reference, AckReason.KILLED);
+         }
+         transaction.commit();
+      } catch (Exception e) {
+         logger.warn(e.getMessage(), e);
+      }
+   }
+
    private long calculateRedeliveryDelay(final AddressSettings addressSettings, final int deliveryCount) {
       long redeliveryDelay = addressSettings.getRedeliveryDelay();
       long maxRedeliveryDelay = addressSettings.getMaxRedeliveryDelay();


[2/5] activemq-artemis git commit: ARTEMIS-1638 Fixing Purge rollback behaviour (test only)

Posted by an...@apache.org.
ARTEMIS-1638 Fixing Purge rollback behaviour (test only)

Having a commit with just a test will make it easy for developers to checkout just this branch
and validate the issue.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/69429e4e
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/69429e4e
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/69429e4e

Branch: refs/heads/master
Commit: 69429e4e230960c2bd29e031fc270a4a72dac148
Parents: c671fa0
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Jan 25 21:17:25 2018 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Jan 26 23:24:55 2018 -0500

----------------------------------------------------------------------
 .../integration/amqp/AmqpClientTestSupport.java |   5 +
 .../amqp/AmqpPurgeOnNoConsumersTest.java        | 127 ++++++++++++++-----
 2 files changed, 103 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/69429e4e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
index 044183f..a01c975 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
@@ -304,6 +304,10 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
    }
 
    protected void sendMessages(String destinationName, int count, RoutingType routingType) throws Exception {
+      sendMessages(destinationName, count, routingType, false);
+   }
+
+   protected void sendMessages(String destinationName, int count, RoutingType routingType, boolean durable) throws Exception {
       AmqpClient client = createAmqpClient();
       AmqpConnection connection = addConnection(client.connect());
       try {
@@ -313,6 +317,7 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
          for (int i = 0; i < count; ++i) {
             AmqpMessage message = new AmqpMessage();
             message.setMessageId("MessageID:" + i);
+            message.setDurable(true);
             if (routingType != null) {
                message.setMessageAnnotation(AMQPMessageSupport.ROUTING_TYPE.toString(), routingType.getType());
             }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/69429e4e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPurgeOnNoConsumersTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPurgeOnNoConsumersTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPurgeOnNoConsumersTest.java
index 9ded61c..ae4849b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPurgeOnNoConsumersTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPurgeOnNoConsumersTest.java
@@ -16,10 +16,20 @@
  */
 package org.apache.activemq.artemis.tests.integration.amqp;
 
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.server.impl.QueueImpl;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
 import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
@@ -28,57 +38,116 @@ import org.apache.activemq.transport.amqp.client.AmqpReceiver;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
 import org.junit.Test;
 
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
 public class AmqpPurgeOnNoConsumersTest extends AmqpClientTestSupport {
 
+   @Override
+   protected String getConfiguredProtocols() {
+      return "AMQP,OPENWIRE,CORE";
+   }
+
    @Test(timeout = 60000)
    public void testQueueReceiverReadMessage() throws Exception {
+      AmqpConnection connection = null;
       String queue = "purgeQueue";
       SimpleString ssQueue = new SimpleString(queue);
+
       server.addAddressInfo(new AddressInfo(ssQueue, RoutingType.ANYCAST));
       server.createQueue(ssQueue, RoutingType.ANYCAST, ssQueue, null, true, false, 1, true, false);
 
       AmqpClient client = createAmqpClient();
-      AmqpConnection connection = addConnection(client.connect());
+      connection = addConnection(client.connect());
       AmqpSession session = connection.createSession();
 
       final AmqpReceiver receiver = session.createReceiver(queue);
 
-      Queue queueView = getProxyToQueue(queue);
+      QueueImpl queueView = (QueueImpl)getProxyToQueue(queue);
+      assertEquals(0L, queueView.getPageSubscription().getPagingStore().getAddressSize());
       assertEquals(0, queueView.getMessageCount());
 
-      Thread t = new Thread(new Runnable() {
-         @Override
-         public void run() {
-            for (int i = 0; i < 4; i++) {
-               try {
-                  AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
-                  receive.accept();
-                  assertNotNull(receive);
-               } catch (Exception e) {
-                  e.printStackTrace();
-               }
-            }
-            try {
-               receiver.close();
-            } catch (IOException e) {
-               e.printStackTrace();
-            }
-         }
-      });
+      sendMessages(queue, 5, null, true);
 
-      t.start();
+      Wait.assertEquals(5, queueView::getMessageCount);
 
       receiver.flow(5);
 
-      sendMessages(queue, 5);
-
-      t.join(5000);
+      for (int i = 0; i < 4; i++) {
+         try {
+            AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
+            receive.accept();
+            assertNotNull(receive);
+         } catch (Exception e) {
+            e.printStackTrace();
+         }
+      }
+      try {
+         receiver.close();
+      } catch (IOException e) {
+         e.printStackTrace();
+      }
 
       Wait.assertEquals(0, queueView::getMessageCount);
+      assertEquals(0L, queueView.getPageSubscription().getPagingStore().getAddressSize());
 
       connection.close();
+
+      server.stop();
+
+      server.start();
+
+      queueView = (QueueImpl)getProxyToQueue(queue);
+
+      assertEquals(0, queueView.getMessageCount());
+      assertEquals(0L, queueView.getPageSubscription().getPagingStore().getAddressSize());
+   }
+
+
+   // I'm adding the core test here to compare semantics between AMQP and core on this test.
+   @Test(timeout = 60000)
+   public void testPurgeQueueCoreRollback() throws Exception {
+      String queue = "purgeQueue";
+      SimpleString ssQueue = new SimpleString(queue);
+      server.addAddressInfo(new AddressInfo(ssQueue, RoutingType.ANYCAST));
+      server.createQueue(ssQueue, RoutingType.ANYCAST, ssQueue, null, true, false, 1, true, false);
+
+      ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:5672");
+      Connection connection = cf.createConnection();
+      Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+      MessageProducer producer = session.createProducer(session.createQueue("purgeQueue"));
+
+      javax.jms.Queue jmsQueue = session.createQueue(queue);
+      MessageConsumer consumer = session.createConsumer(jmsQueue);
+
+      for (int i = 0; i < 10; i++) {
+         Message message = session.createTextMessage("hello " + i);
+         producer.send(message);
+      }
+      session.commit();
+
+      QueueImpl queueView = (QueueImpl)getProxyToQueue(queue);
+
+      Wait.assertEquals(10, queueView::getMessageCount);
+
+      connection.start();
+
+
+      for (int i = 0; i < 10; i++) {
+         TextMessage txt = (TextMessage)consumer.receive(1000);
+         assertNotNull(txt);
+         assertEquals("hello " + i, txt.getText());
+      }
+      consumer.close();
+      session.rollback();
+      connection.close();
+
+      Wait.assertEquals(0, queueView::getMessageCount);
+
+      server.stop();
+
+      server.start();
+
+      queueView = (QueueImpl)getProxyToQueue(queue);
+
+      assertEquals(0, queueView.getMessageCount());
+      assertEquals(0L, queueView.getPageSubscription().getPagingStore().getAddressSize());
    }
 }


[4/5] activemq-artemis git commit: ARTEMIS-1638 & ARTEMIS-1641 Making sure Paging survives Purge on a test & cleanup PgTX (fix)

Posted by an...@apache.org.
ARTEMIS-1638 & ARTEMIS-1641 Making sure Paging survives Purge on a test & cleanup PgTX (fix)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c10b7441
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c10b7441
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c10b7441

Branch: refs/heads/master
Commit: c10b74412a335f6c930c3dddace11ef300215d4f
Parents: 59d2ac5
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Jan 26 23:05:52 2018 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Jan 26 23:24:56 2018 -0500

----------------------------------------------------------------------
 .../core/paging/PageTransactionInfo.java        |  4 ++-
 .../paging/impl/PageTransactionInfoImpl.java    | 32 ++++++++++++++------
 .../journal/AbstractJournalStorageManager.java  | 30 +++++++++++++++++-
 .../core/postoffice/impl/PostOfficeImpl.java    |  2 +-
 .../artemis/core/server/ActiveMQServer.java     | 28 +++++++++++++++++
 .../activemq/artemis/core/server/Queue.java     | 13 +++++++-
 .../core/server/impl/ActiveMQServerImpl.java    | 24 ++-------------
 .../artemis/core/server/impl/QueueImpl.java     | 27 ++++++++++-------
 .../core/server/impl/QueueManagerImpl.java      |  2 +-
 .../core/server/impl/RoutingContextImpl.java    |  2 +-
 .../core/server/impl/ServerConsumerImpl.java    |  2 +-
 .../management/impl/ManagementServiceImpl.java  |  2 +-
 .../impl/ScheduledDeliveryHandlerTest.java      |  7 ++++-
 .../unit/core/postoffice/impl/FakeQueue.java    |  8 ++++-
 14 files changed, 131 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c10b7441/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PageTransactionInfo.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PageTransactionInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PageTransactionInfo.java
index 4fb49ea..90aaade 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PageTransactionInfo.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PageTransactionInfo.java
@@ -51,7 +51,9 @@ public interface PageTransactionInfo extends EncodingSupport {
                      int increment) throws Exception;
 
    // To be used after the update was stored or reload
-   void onUpdate(int update, StorageManager storageManager, PagingManager pagingManager);
+   boolean onUpdate(int update, StorageManager storageManager, PagingManager pagingManager);
+
+   boolean checkSize(StorageManager storageManager, PagingManager pagingManager);
 
    void increment(int durableSize, int nonDurableSize);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c10b7441/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java
index b793aec..70594ca 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java
@@ -89,16 +89,30 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo {
    }
 
    @Override
-   public void onUpdate(final int update, final StorageManager storageManager, PagingManager pagingManager) {
-      int sizeAfterUpdate = numberOfMessages.addAndGet(-update);
-      if (sizeAfterUpdate == 0 && storageManager != null) {
-         try {
-            storageManager.deletePageTransactional(this.recordID);
-         } catch (Exception e) {
-            ActiveMQServerLogger.LOGGER.pageTxDeleteError(e, recordID);
-         }
+   public boolean onUpdate(final int update, final StorageManager storageManager, PagingManager pagingManager) {
+      int afterUpdate = numberOfMessages.addAndGet(-update);
+      return internalCheckSize(storageManager, pagingManager, afterUpdate);
+   }
+
+   @Override
+   public boolean checkSize(StorageManager storageManager, PagingManager pagingManager) {
+      return internalCheckSize(storageManager, pagingManager, numberOfMessages.get());
+   }
 
-         pagingManager.removeTransaction(this.transactionID);
+   public boolean internalCheckSize(StorageManager storageManager, PagingManager pagingManager, int size) {
+      if (size <= 0) {
+         if (storageManager != null) {
+            try {
+               storageManager.deletePageTransactional(this.recordID);
+            } catch (Exception e) {
+               ActiveMQServerLogger.LOGGER.pageTxDeleteError(e, recordID);
+            }
+
+            pagingManager.removeTransaction(this.transactionID);
+         }
+         return false;
+      } else {
+         return true;
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c10b7441/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 7aa4096..5441368 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -839,6 +839,8 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
 
       List<PreparedTransactionInfo> preparedTransactions = new ArrayList<>();
 
+      Set<PageTransactionInfo> invalidPageTransactions = null;
+
       Map<Long, Message> messages = new HashMap<>();
       readLock();
       try {
@@ -971,6 +973,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
                   break;
                }
                case JournalRecordIds.PAGE_TRANSACTION: {
+                  PageTransactionInfo invalidPGTx = null;
                   if (record.isUpdate) {
                      PageUpdateTXEncoding pageUpdate = new PageUpdateTXEncoding();
 
@@ -981,7 +984,9 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
                      if (pageTX == null) {
                         ActiveMQServerLogger.LOGGER.journalCannotFindPageTX(pageUpdate.pageTX);
                      } else {
-                        pageTX.onUpdate(pageUpdate.recods, null, null);
+                        if (!pageTX.onUpdate(pageUpdate.recods, null, null)) {
+                           invalidPGTx = pageTX;
+                        }
                      }
                   } else {
                      PageTransactionInfoImpl pageTransactionInfo = new PageTransactionInfoImpl();
@@ -991,6 +996,17 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
                      pageTransactionInfo.setRecordID(record.id);
 
                      pagingManager.addTransaction(pageTransactionInfo);
+
+                     if (!pageTransactionInfo.checkSize(null, null)) {
+                        invalidPGTx = pageTransactionInfo;
+                     }
+                  }
+
+                  if (invalidPGTx != null) {
+                     if (invalidPageTransactions == null) {
+                        invalidPageTransactions = new HashSet<>();
+                     }
+                     invalidPageTransactions.add(invalidPGTx);
                   }
 
                   break;
@@ -1170,6 +1186,9 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
          }
 
          journalLoader.postLoad(messageJournal, resourceManager, duplicateIDMap);
+
+         checkInvalidPageTransactions(pagingManager, invalidPageTransactions);
+
          journalLoaded = true;
          return info;
       } finally {
@@ -1177,6 +1196,15 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
       }
    }
 
+   public void checkInvalidPageTransactions(PagingManager pagingManager,
+                                            Set<PageTransactionInfo> invalidPageTransactions) {
+      if (invalidPageTransactions != null) {
+         for (PageTransactionInfo pginfo : invalidPageTransactions) {
+            pginfo.checkSize(this, pagingManager);
+         }
+      }
+   }
+
    /**
     * @param queueID
     * @param pageSubscriptions

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c10b7441/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index bc14f79..356674b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -1464,7 +1464,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
          for (MessageReference ref : refs) {
             Message message = ref.getMessage();
 
-            if (message.isDurable() && ref.getQueue().isDurable()) {
+            if (message.isDurable() && ref.getQueue().isDurableMessage()) {
                message.decrementDurableRefCount();
             }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c10b7441/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index d1d3029..6af94ff 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -73,6 +73,34 @@ import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
  */
 public interface ActiveMQServer extends ServiceComponent {
 
+
+   enum SERVER_STATE {
+      /**
+       * start() has been called but components are not initialized. The whole point of this state,
+       * is to be in a state which is different from {@link SERVER_STATE#STARTED} and
+       * {@link SERVER_STATE#STOPPED}, so that methods testing for these two values such as
+       * {@link #stop(boolean)} worked as intended.
+       */
+      STARTING, /**
+       * server is started. {@code server.isStarted()} returns {@code true}, and all assumptions
+       * about it hold.
+       */
+      STARTED, /**
+       * stop() was called but has not finished yet. Meant to avoids starting components while
+       * stop() is executing.
+       */
+      STOPPING, /**
+       * Stopped: either stop() has been called and has finished running, or start() has never been
+       * called.
+       */
+      STOPPED
+   }
+
+
+   void setState(SERVER_STATE state);
+
+   SERVER_STATE getState();
+
    /**
     * Sets the server identity.
     * <p>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c10b7441/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index d4ec406..b39f4da 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -53,6 +53,13 @@ public interface Queue extends Bindable,CriticalComponent {
 
    boolean isDurable();
 
+   /**
+    * The queue definition could be durable, but the messages could eventually be considered non durable.
+    * (e.g. purgeOnNoConsumers)
+    * @return
+    */
+   boolean isDurableMessage();
+
    boolean isTemporary();
 
    boolean isAutoCreated();
@@ -161,7 +168,11 @@ public interface Queue extends Bindable,CriticalComponent {
 
    int deleteMatchingReferences(Filter filter) throws Exception;
 
-   int deleteMatchingReferences(int flushLImit, Filter filter) throws Exception;
+   default int deleteMatchingReferences(int flushLImit, Filter filter) throws Exception {
+      return deleteMatchingReferences(flushLImit, filter, AckReason.NORMAL);
+   }
+
+   int deleteMatchingReferences(int flushLImit, Filter filter, AckReason ackReason) throws Exception;
 
    boolean expireReference(long messageID) throws Exception;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c10b7441/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 047e839..6ce681e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -207,28 +207,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
    private HAPolicy haPolicy;
 
-   enum SERVER_STATE {
-      /**
-       * start() has been called but components are not initialized. The whole point of this state,
-       * is to be in a state which is different from {@link SERVER_STATE#STARTED} and
-       * {@link SERVER_STATE#STOPPED}, so that methods testing for these two values such as
-       * {@link #stop(boolean)} worked as intended.
-       */
-      STARTING, /**
-       * server is started. {@code server.isStarted()} returns {@code true}, and all assumptions
-       * about it hold.
-       */
-      STARTED, /**
-       * stop() was called but has not finished yet. Meant to avoids starting components while
-       * stop() is executing.
-       */
-      STOPPING, /**
-       * Stopped: either stop() has been called and has finished running, or start() has never been
-       * called.
-       */
-      STOPPED
-   }
-
    private volatile SERVER_STATE state = SERVER_STATE.STOPPED;
 
    private final Version version;
@@ -712,10 +690,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       super.finalize();
    }
 
+   @Override
    public void setState(SERVER_STATE state) {
       this.state = state;
    }
 
+   @Override
    public SERVER_STATE getState() {
       return state;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c10b7441/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 77adc85..fd13652 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -139,7 +139,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    private volatile Filter filter;
 
-   private final boolean durable;
+   private final boolean propertyDurable;
 
    private final boolean temporary;
 
@@ -405,7 +405,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
       this.pageSubscription = pageSubscription;
 
-      this.durable = durable;
+      this.propertyDurable = durable;
 
       this.temporary = temporary;
 
@@ -495,7 +495,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    @Override
    public boolean isDurable() {
-      return durable;
+      return propertyDurable;
+   }
+
+   @Override
+   public boolean isDurableMessage() {
+      return propertyDurable && !purgeOnNoConsumers;
    }
 
    @Override
@@ -1126,7 +1131,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       } else {
          Message message = ref.getMessage();
 
-         boolean durableRef = message.isDurable() && durable;
+         boolean durableRef = message.isDurable() && isDurableMessage();
 
          if (durableRef) {
             storageManager.storeAcknowledge(id, message.getMessageID());
@@ -1161,7 +1166,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       } else {
          Message message = ref.getMessage();
 
-         boolean durableRef = message.isDurable() && durable;
+         boolean durableRef = message.isDurable() && isDurableMessage();
 
          if (durableRef) {
             storageManager.storeAcknowledgeTransactional(tx.getID(), id, message.getMessageID());
@@ -1189,7 +1194,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    public void reacknowledge(final Transaction tx, final MessageReference ref) throws Exception {
       Message message = ref.getMessage();
 
-      if (message.isDurable() && durable) {
+      if (message.isDurable() && isDurableMessage()) {
          tx.setContainsPersistent();
       }
 
@@ -1372,12 +1377,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    }
 
    @Override
-   public synchronized int deleteMatchingReferences(final int flushLimit, final Filter filter1) throws Exception {
+   public synchronized int deleteMatchingReferences(final int flushLimit, final Filter filter1, AckReason ackReason) throws Exception {
       return iterQueue(flushLimit, filter1, new QueueIterateAction() {
          @Override
          public void actMessage(Transaction tx, MessageReference ref) throws Exception {
             incDelivering();
-            acknowledge(tx, ref);
+            acknowledge(tx, ref, ackReason);
             refRemoved(ref);
          }
       });
@@ -2385,7 +2390,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          return true;
       }
 
-      if (!internalQueue && message.isDurable() && durable && !reference.isPaged()) {
+      if (!internalQueue && message.isDurable() && isDurableMessage() && !reference.isPaged()) {
          storageManager.updateDeliveryCount(reference);
       }
 
@@ -2414,7 +2419,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
             reference.setScheduledDeliveryTime(timeBase + redeliveryDelay);
 
-            if (!reference.isPaged() && message.isDurable() && durable) {
+            if (!reference.isPaged() && message.isDurable() && isDurableMessage()) {
                storageManager.updateScheduledDeliveryTime(reference);
             }
          }
@@ -2858,7 +2863,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       if (message == null)
          return;
 
-      boolean durableRef = message.isDurable() && queue.durable;
+      boolean durableRef = message.isDurable() && queue.isDurableMessage();
 
       try {
          message.decrementRefCount();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c10b7441/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java
index be83aca..23b0e5d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java
@@ -59,7 +59,7 @@ public class QueueManagerImpl extends ReferenceCounterUtil implements QueueManag
             ActiveMQServerLogger.LOGGER.debug("purging queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount);
          }
          try {
-            queue.deleteAllReferences();
+            queue.deleteMatchingReferences(QueueImpl.DEFAULT_FLUSH_LIMIT, null, AckReason.KILLED);
          } catch (Exception e) {
             ActiveMQServerLogger.LOGGER.failedToPurgeQueue(e, queueName);
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c10b7441/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
index feb12f9..29a70e4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
@@ -60,7 +60,7 @@ public final class RoutingContextImpl implements RoutingContext {
 
       RouteContextList listing = getContextListing(address);
 
-      if (queue.isDurable()) {
+      if (queue.isDurableMessage()) {
          listing.getDurableQueues().add(queue);
       } else {
          listing.getNonDurableQueues().add(queue);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c10b7441/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 15b1465..e1c6c8d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -409,7 +409,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
             // If updateDeliveries = false (set by strict-update),
             // the updateDeliveryCountAfterCancel would still be updated after c
             if (strictUpdateDeliveryCount && !ref.isPaged()) {
-               if (ref.getMessage().isDurable() && ref.getQueue().isDurable() &&
+               if (ref.getMessage().isDurable() && ref.getQueue().isDurableMessage() &&
                   !ref.getQueue().isInternalQueue() &&
                   !ref.isPaged()) {
                   storageManager.updateDeliveryCount(ref);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c10b7441/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
index 8a7e009..9b9830a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
@@ -247,7 +247,7 @@ public class ManagementServiceImpl implements ManagementService {
 
       QueueControlImpl queueControl = new QueueControlImpl(queue, addressInfo.getName().toString(), postOffice, storageManager, securityStore, addressSettingsRepository);
       if (messageCounterManager != null) {
-         MessageCounter counter = new MessageCounter(queue.getName().toString(), null, queue, false, queue.isDurable(), messageCounterManager.getMaxDayCount());
+         MessageCounter counter = new MessageCounter(queue.getName().toString(), null, queue, false, queue.isDurableMessage(), messageCounterManager.getMaxDayCount());
          queueControl.setMessageCounter(counter);
          messageCounterManager.registerMessageCounter(queue.getName().toString(), counter);
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c10b7441/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index a492efd..256a670 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -860,6 +860,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
+      public boolean isDurableMessage() {
+         return false;
+      }
+
+      @Override
       public boolean isTemporary() {
          return false;
       }
@@ -1087,7 +1092,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
-      public int deleteMatchingReferences(int flushLImit, Filter filter) throws Exception {
+      public int deleteMatchingReferences(int flushLImit, Filter filter, AckReason reason) throws Exception {
          return 0;
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c10b7441/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 7b37879..f0afd9e 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -442,6 +442,12 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
    }
 
    @Override
+   public boolean isDurableMessage() {
+      // no-op
+      return false;
+   }
+
+   @Override
    public boolean isDurable() {
       // no-op
       return false;
@@ -601,7 +607,7 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
    }
 
    @Override
-   public int deleteMatchingReferences(int flushLImit, Filter filter) throws Exception {
+   public int deleteMatchingReferences(int flushLImit, Filter filter, AckReason reason) throws Exception {
       return 0;
    }
 


[3/5] activemq-artemis git commit: ARTEMIS-1638 & ARTEMIS-1641 Making sure Paging survives Purge on a test & cleanup PgTX (test)

Posted by an...@apache.org.
ARTEMIS-1638 & ARTEMIS-1641 Making sure Paging survives Purge on a test & cleanup PgTX (test)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/59d2ac53
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/59d2ac53
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/59d2ac53

Branch: refs/heads/master
Commit: 59d2ac53ff8958daaab42af49ea34965aff47aea
Parents: adb466b
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Jan 26 23:05:43 2018 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Jan 26 23:24:56 2018 -0500

----------------------------------------------------------------------
 .../tests/integration/paging/PagingTest.java    | 110 ++++++++++++++++++-
 1 file changed, 108 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/59d2ac53/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
index fc48182..3de9203 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
@@ -16,6 +16,10 @@
  */
 package org.apache.activemq.artemis.tests.integration.paging;
 
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 import java.io.File;
@@ -65,6 +69,7 @@ import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
 import org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl;
 import org.apache.activemq.artemis.core.paging.cursor.impl.PagePositionImpl;
+import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl;
 import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
@@ -78,8 +83,10 @@ import org.apache.activemq.artemis.core.server.JournalType;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.server.impl.QueueImpl;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
 import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
 import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManagerImpl;
 import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
@@ -382,6 +389,105 @@ public class PagingTest extends ActiveMQTestBase {
       System.out.println("pgComplete = " + pgComplete);
    }
 
+
+   @Test
+   public void testPurge() throws Exception {
+      clearDataRecreateServerDirs();
+
+      Configuration config = createDefaultNettyConfig().setJournalSyncNonTransactional(false);
+
+      server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
+
+      server.start();
+
+      String queue = "purgeQueue";
+      SimpleString ssQueue = new SimpleString(queue);
+      server.addAddressInfo(new AddressInfo(ssQueue, RoutingType.ANYCAST));
+      QueueImpl purgeQueue = (QueueImpl)server.createQueue(ssQueue, RoutingType.ANYCAST, ssQueue, null, true, false, 1, true, false);
+
+      ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
+      Connection connection = cf.createConnection();
+      Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+      javax.jms.Queue jmsQueue = session.createQueue(queue);
+
+      MessageProducer producer = session.createProducer(jmsQueue);
+
+      for (int i = 0; i < 100; i++) {
+         producer.send(session.createTextMessage("hello" + i));
+      }
+      session.commit();
+
+      Wait.assertEquals(0, purgeQueue::getMessageCount);
+
+      Assert.assertEquals(0, purgeQueue.getPageSubscription().getPagingStore().getAddressSize());
+
+      MessageConsumer consumer = session.createConsumer(jmsQueue);
+
+      for (int i = 0; i < 100; i++) {
+         producer.send(session.createTextMessage("hello" + i));
+         if (i == 10) {
+            purgeQueue.getPageSubscription().getPagingStore().startPaging();
+         }
+      }
+      session.commit();
+
+      consumer.close();
+
+      Wait.assertEquals(0, purgeQueue::getMessageCount);
+
+      Wait.assertFalse(purgeQueue.getPageSubscription()::isPaging);
+
+      Wait.assertEquals(0, purgeQueue.getPageSubscription().getPagingStore()::getAddressSize);
+
+      purgeQueue.getPageSubscription().getPagingStore().startPaging();
+
+      Wait.assertTrue(purgeQueue.getPageSubscription()::isPaging);
+
+      consumer = session.createConsumer(jmsQueue);
+
+      for (int i = 0; i < 100; i++) {
+         purgeQueue.getPageSubscription().getPagingStore().startPaging();
+         Assert.assertTrue(purgeQueue.getPageSubscription().isPaging());
+         producer.send(session.createTextMessage("hello" + i));
+         if (i % 2 == 0) {
+            session.commit();
+         }
+      }
+
+      session.commit();
+
+      connection.start();
+
+
+      server.getStorageManager().getMessageJournal().scheduleCompactAndBlock(50000);
+      Assert.assertNotNull(consumer.receive(5000));
+      session.commit();
+
+      consumer.close();
+
+      Wait.assertEquals(0, purgeQueue::getMessageCount);
+      Wait.assertEquals(0, purgeQueue.getPageSubscription().getPagingStore()::getAddressSize);
+      Wait.assertFalse(purgeQueue.getPageSubscription()::isPaging);
+
+      StorageManager sm = server.getStorageManager();
+
+
+      for (int i = 0; i < 1000; i++) {
+         long tx = sm.generateID();
+         PageTransactionInfoImpl txinfo = new PageTransactionInfoImpl(tx);
+         sm.storePageTransaction(tx, txinfo);
+         sm.commit(tx);
+         tx = sm.generateID();
+         sm.updatePageTransaction(tx, txinfo, 1);
+         sm.commit(tx);
+      }
+
+      server.stop();
+      server.start();
+      Assert.assertEquals(0, server.getPagingManager().getTransactions().size());
+   }
+
+
    // First page is complete but it wasn't deleted
    @Test
    public void testFirstPageCompleteNotDeleted() throws Exception {
@@ -5680,8 +5786,8 @@ public class PagingTest extends ActiveMQTestBase {
    }
 
    @Override
-   protected Configuration createDefaultInVMConfig() throws Exception {
-      Configuration configuration = super.createDefaultInVMConfig().setJournalSyncNonTransactional(false);
+   protected Configuration createDefaultConfig(final int serverID, final boolean netty) throws Exception {
+      Configuration configuration = super.createDefaultConfig(serverID, netty);
       if (storeType == StoreConfiguration.StoreType.DATABASE) {
          setDBStoreType(configuration);
       } else if (mapped) {


[5/5] activemq-artemis git commit: This closes #1818 ARTEMIS-1638 Fixing Purge on rollback & paging

Posted by an...@apache.org.
This closes #1818 ARTEMIS-1638 Fixing Purge on rollback & paging


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/b66d0f7a
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/b66d0f7a
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/b66d0f7a

Branch: refs/heads/master
Commit: b66d0f7ac40001cce14ca7146e74720504ff9eb1
Parents: c671fa0 c10b744
Author: Andy Taylor <an...@gmail.com>
Authored: Mon Jan 29 07:46:39 2018 +0000
Committer: Andy Taylor <an...@gmail.com>
Committed: Mon Jan 29 07:46:39 2018 +0000

----------------------------------------------------------------------
 .../core/paging/PageTransactionInfo.java        |   4 +-
 .../paging/impl/PageTransactionInfoImpl.java    |  32 +++--
 .../journal/AbstractJournalStorageManager.java  |  30 ++++-
 .../core/postoffice/impl/PostOfficeImpl.java    |   2 +-
 .../artemis/core/server/ActiveMQServer.java     |  28 ++++
 .../activemq/artemis/core/server/Queue.java     |  13 +-
 .../core/server/impl/ActiveMQServerImpl.java    |  24 +---
 .../artemis/core/server/impl/QueueImpl.java     |  42 ++++--
 .../core/server/impl/QueueManagerImpl.java      |   2 +-
 .../core/server/impl/RoutingContextImpl.java    |   2 +-
 .../core/server/impl/ServerConsumerImpl.java    |   2 +-
 .../management/impl/ManagementServiceImpl.java  |   2 +-
 .../impl/ScheduledDeliveryHandlerTest.java      |   7 +-
 .../integration/amqp/AmqpClientTestSupport.java |   5 +
 .../amqp/AmqpPurgeOnNoConsumersTest.java        | 127 ++++++++++++++-----
 .../tests/integration/paging/PagingTest.java    | 110 +++++++++++++++-
 .../unit/core/postoffice/impl/FakeQueue.java    |   8 +-
 17 files changed, 357 insertions(+), 83 deletions(-)
----------------------------------------------------------------------