You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2014/12/15 17:37:15 UTC

[1/2] activemq git commit: fix failing DbRestartJDBCQueueTest

Repository: activemq
Updated Branches:
  refs/heads/trunk 411c7547a -> 13c207292


fix failing DbRestartJDBCQueueTest


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/13c20729
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/13c20729
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/13c20729

Branch: refs/heads/trunk
Commit: 13c207292f9df7e730b778ac00e4fe30394ff687
Parents: 2d9959a
Author: gtully <ga...@gmail.com>
Authored: Mon Dec 15 14:17:21 2014 +0000
Committer: gtully <ga...@gmail.com>
Committed: Mon Dec 15 14:21:47 2014 +0000

----------------------------------------------------------------------
 .../java/org/apache/activemq/broker/region/Queue.java  | 13 +++----------
 1 file changed, 3 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/13c20729/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index a6515c4..f5f2efe 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -937,15 +937,8 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
 
     @Override
     public String toString() {
-        int size = 0;
-        messagesLock.readLock().lock();
-        try {
-            size = messages.size();
-        } finally {
-            messagesLock.readLock().unlock();
-        }
         return destination.getQualifiedName() + ", subscriptions=" + consumers.size()
-                + ", memory=" + memoryUsage.getPercentUsage() + "%, size=" + size + ", pending="
+                + ", memory=" + memoryUsage.getPercentUsage() + "%, size=" + destinationStatistics.getMessages().getCount() + ", pending="
                 + indexOrderedCursorUpdates.size();
     }
 
@@ -1720,7 +1713,6 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
     protected void removeMessage(ConnectionContext context, Subscription sub, final QueueMessageReference reference,
             MessageAck ack) throws IOException {
         LOG.trace("ack of {} with {}", reference.getMessageId(), ack);
-        reference.setAcked(true);
         // This sends the ack the the journal..
         if (!ack.isInTransaction()) {
             acknowledge(context, sub, ack, reference);
@@ -1759,7 +1751,8 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
                 getDestinationStatistics().getForwards().increment();
             }
         }
-
+        // after successful store update
+        reference.setAcked(true);
     }
 
     private void dropMessage(QueueMessageReference reference) {


[2/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5476 - ZeroPrefetchConsumerTest regression - fix default in connection factory and refactor prefetchExtension support - https://issues.apache.org/activemq/browse/AMQ-2560

Posted by gt...@apache.org.
https://issues.apache.org/jira/browse/AMQ-5476 - ZeroPrefetchConsumerTest regression - fix default in connection factory and refactor prefetchExtension support - https://issues.apache.org/activemq/browse/AMQ-2560


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/2d9959a6
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/2d9959a6
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/2d9959a6

Branch: refs/heads/trunk
Commit: 2d9959a6f6f33f7138606073e425a74261ec3125
Parents: 411c754
Author: gtully <ga...@gmail.com>
Authored: Mon Dec 15 14:12:08 2014 +0000
Committer: gtully <ga...@gmail.com>
Committed: Mon Dec 15 14:21:47 2014 +0000

----------------------------------------------------------------------
 .../broker/region/PrefetchSubscription.java     | 60 +++++++++-----------
 .../activemq/ActiveMQConnectionFactory.java     |  2 +-
 .../activemq/ZeroPrefetchConsumerTest.java      | 14 +++--
 3 files changed, 38 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/2d9959a6/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
index 8b8a788..b101d72 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
@@ -234,26 +234,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
                         index++;
                         acknowledge(context, ack, node);
                         if (ack.getLastMessageId().equals(messageId)) {
-                            // contract prefetch if dispatch required a pull
-                            if (getPrefetchSize() == 0) {
-                                // Protect extension update against parallel updates.
-                                while (true) {
-                                    int currentExtension = prefetchExtension.get();
-                                    int newExtension = Math.max(0, currentExtension - index);
-                                    if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
-                                        break;
-                                    }
-                                }
-                            } else if (usePrefetchExtension && context.isInTransaction()) {
-                                // extend prefetch window only if not a pulling consumer
-                                while (true) {
-                                    int currentExtension = prefetchExtension.get();
-                                    int newExtension = Math.max(currentExtension, index);
-                                    if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
-                                        break;
-                                    }
-                                }
-                            }
                             destination = (Destination) node.getRegionDestination();
                             callDispatchMatched = true;
                             break;
@@ -283,14 +263,17 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
                             registerRemoveSync(context, node);
                         }
 
-                        // Protect extension update against parallel updates.
-                        while (true) {
-                            int currentExtension = prefetchExtension.get();
-                            int newExtension = Math.max(0, currentExtension - 1);
-                            if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
-                                break;
+                        if (usePrefetchExtension && getPrefetchSize() != 0 && ack.isInTransaction()) {
+                            // allow transaction batch to exceed prefetch
+                            while (true) {
+                                int currentExtension = prefetchExtension.get();
+                                int newExtension = Math.max(currentExtension, currentExtension + 1);
+                                if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
+                                    break;
+                                }
                             }
                         }
+
                         acknowledge(context, ack, node);
                         destination = (Destination) node.getRegionDestination();
                         callDispatchMatched = true;
@@ -313,7 +296,8 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
                         nodeDest.getDestinationStatistics().getInflight().decrement();
                     }
                     if (ack.getLastMessageId().equals(node.getMessageId())) {
-                        if (usePrefetchExtension) {
+                        if (usePrefetchExtension && getPrefetchSize() != 0) {
+                            // allow  batch to exceed prefetch
                             while (true) {
                                 int currentExtension = prefetchExtension.get();
                                 int newExtension = Math.max(currentExtension, index + 1);
@@ -426,6 +410,19 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
                 new Synchronization() {
 
                     @Override
+                    public void beforeEnd() {
+                        if (usePrefetchExtension && getPrefetchSize() != 0) {
+                            while (true) {
+                                int currentExtension = prefetchExtension.get();
+                                int newExtension = Math.max(0, currentExtension - 1);
+                                if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
+                                    break;
+                                }
+                            }
+                        }
+                    }
+
+                    @Override
                     public void afterCommit()
                             throws Exception {
                         Destination nodeDest = (Destination) node.getRegionDestination();
@@ -516,7 +513,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
      */
     @Override
     public boolean isFull() {
-        return dispatched.size() - prefetchExtension.get() >= info.getPrefetchSize();
+        return getPrefetchSize() == 0 ? prefetchExtension.get() == 0 : dispatched.size() - prefetchExtension.get() >= info.getPrefetchSize();
     }
 
     /**
@@ -537,7 +534,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
 
     @Override
     public int countBeforeFull() {
-        return info.getPrefetchSize() + prefetchExtension.get() - dispatched.size();
+        return getPrefetchSize() == 0 ? prefetchExtension.get() : info.getPrefetchSize() + prefetchExtension.get() - dispatched.size();
     }
 
     @Override
@@ -696,13 +693,12 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
 
         okForAckAsDispatchDone.countDown();
 
-        // No reentrant lock - Patch needed to IndirectMessageReference on method lock
         MessageDispatch md = createMessageDispatch(node, message);
-        // NULL messages don't count... they don't get Acked.
         if (node != QueueMessageReference.NULL_MESSAGE) {
             dispatchCounter++;
             dispatched.add(node);
-        } else {
+        }
+        if (getPrefetchSize() == 0) {
             while (true) {
                 int currentExtension = prefetchExtension.get();
                 int newExtension = Math.max(0, currentExtension - 1);

http://git-wip-us.apache.org/repos/asf/activemq/blob/2d9959a6/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
index 3354ab3..1fbf604 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
@@ -173,7 +173,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
     private long consumerFailoverRedeliveryWaitPeriod = 0;
     private boolean checkForDuplicates = true;
     private ClientInternalExceptionListener clientInternalExceptionListener;
-    private boolean messagePrioritySupported = true;
+    private boolean messagePrioritySupported = false;
     private boolean transactedIndividualAck = false;
     private boolean nonBlockingRedelivery = false;
     private int maxThreadPoolSize = ActiveMQConnection.DEFAULT_THREAD_POOL_SIZE;

http://git-wip-us.apache.org/repos/asf/activemq/blob/2d9959a6/activemq-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
index acf9c03..d4cecab 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
@@ -174,7 +174,7 @@ public class ZeroPrefetchConsumerTest extends EmbeddedBrokerTestSupport {
     }
 
     private void doTestManyMessageConsumer(boolean transacted) throws Exception {
-        Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+        Session session = connection.createSession(transacted, transacted ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
 
         MessageProducer producer = session.createProducer(queue);
         producer.send(session.createTextMessage("Msg1"));
@@ -221,12 +221,11 @@ public class ZeroPrefetchConsumerTest extends EmbeddedBrokerTestSupport {
             session.commit();
         }
         // Now using other consumer
-        // this call should return the next message (Msg5) still left on the queue
+        // this call should return the next message still left on the queue
         answer = (TextMessage)consumer.receive(5000);
         assertEquals("Should have received a message!", answer.getText(), "Msg6");
         // read one more message without commit
-        // Now using other consumer
-        // this call should return the next message (Msg5) still left on the queue
+        // this call should return the next message still left on the queue
         answer = (TextMessage)consumer.receive(5000);
         assertEquals("Should have received a message!", answer.getText(), "Msg7");
         if (transacted) {
@@ -247,12 +246,17 @@ public class ZeroPrefetchConsumerTest extends EmbeddedBrokerTestSupport {
         doTestManyMessageConsumerWithSend(true);
     }
 
+    public void testManyMessageConsumerWithTxSendPrioritySupport() throws Exception {
+        ((ActiveMQConnection)connection).setMessagePrioritySupported(true);
+        doTestManyMessageConsumerWithSend(true);
+    }
+
     public void testManyMessageConsumerWithSendNoTransaction() throws Exception {
         doTestManyMessageConsumerWithSend(false);
     }
 
     private void doTestManyMessageConsumerWithSend(boolean transacted) throws Exception {
-        Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+        Session session = connection.createSession(transacted, transacted ? Session.SESSION_TRANSACTED :Session.AUTO_ACKNOWLEDGE);
 
         MessageProducer producer = session.createProducer(queue);
         producer.send(session.createTextMessage("Msg1"));