You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2015/08/28 21:26:25 UTC

qpid-jms git commit: QPIDJMS-51 Refactor QueueBroser to use mechanisms based on the recent pull consumer updates.

Repository: qpid-jms
Updated Branches:
  refs/heads/master 4dd2fbd70 -> 7e4e131fc


QPIDJMS-51 Refactor QueueBroser to use mechanisms based on the recent
pull consumer updates.  

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/7e4e131f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/7e4e131f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/7e4e131f

Branch: refs/heads/master
Commit: 7e4e131fc6635c8f65a549f2b5e6c2c56bd2bf35
Parents: 4dd2fbd
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Aug 28 15:26:16 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Aug 28 15:26:16 2015 -0400

----------------------------------------------------------------------
 .../org/apache/qpid/jms/JmsMessageConsumer.java |  2 +-
 .../org/apache/qpid/jms/JmsQueueBrowser.java    | 41 +++------
 .../qpid/jms/provider/amqp/AmqpConsumer.java    | 22 ++---
 .../jms/provider/amqp/AmqpQueueBrowser.java     | 96 +++++++-------------
 .../qpid/jms/consumer/JmsQueueBrowserTest.java  |  1 +
 5 files changed, 62 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7e4e131f/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index 7823499..8a1edf7 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -666,7 +666,7 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC
      *        The amount of time the pull request should remain valid.
      */
     protected void sendPullCommand(long timeout) throws JMSException {
-        if (!messageQueue.isClosed() && messageQueue.isEmpty() && (getPrefetchSize() == 0 || isBrowser())) {
+        if (!messageQueue.isClosed() && messageQueue.isEmpty() && isPullConsumer()) {
             connection.pull(getConsumerId(), timeout);
 
             // Once a new pull has gone out check to see if the queue was stopped due to failover

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7e4e131f/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueBrowser.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueBrowser.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueBrowser.java
index 2fea6a1..d329c39 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueBrowser.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueBrowser.java
@@ -67,7 +67,6 @@ public class JmsQueueBrowser implements QueueBrowser, Enumeration<Message> {
 
     private Message next;
     private final AtomicBoolean closed = new AtomicBoolean();
-    private final Object semaphore = new Object();
 
     /**
      * Constructor for an JmsQueueBrowser - used internally
@@ -133,7 +132,7 @@ public class JmsQueueBrowser implements QueueBrowser, Enumeration<Message> {
 
             if (next == null) {
                 try {
-                    next = consumer.receiveNoWait();
+                    next = consumer.receive(2000);
                 } catch (JMSException e) {
                     LOG.warn("Error while receive the next message: {}", e.getMessage());
                 }
@@ -141,6 +140,9 @@ public class JmsQueueBrowser implements QueueBrowser, Enumeration<Message> {
                 if (next != null) {
                     return true;
                 }
+
+                LOG.info("Browser {} read message", next);
+
             } else {
                 return true;
             }
@@ -149,8 +151,6 @@ public class JmsQueueBrowser implements QueueBrowser, Enumeration<Message> {
                 destroyConsumer();
                 return false;
             }
-
-            waitForMessage();
         }
     }
 
@@ -208,25 +208,6 @@ public class JmsQueueBrowser implements QueueBrowser, Enumeration<Message> {
         return selector;
     }
 
-    /**
-     * Wait on a semaphore for a fixed amount of time for a message to come in.
-     */
-    protected void waitForMessage() {
-        try {
-            synchronized (semaphore) {
-                semaphore.wait(2000);
-            }
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-        }
-    }
-
-    protected void notifyMessageAvailable() {
-        synchronized (semaphore) {
-            semaphore.notifyAll();
-        }
-    }
-
     @Override
     public String toString() {
         JmsMessageConsumer consumer = this.consumer;
@@ -243,13 +224,21 @@ public class JmsQueueBrowser implements QueueBrowser, Enumeration<Message> {
             }
 
             @Override
+            public boolean isPullConsumer() {
+                return true;
+            }
+
+            @Override
             public void onInboundMessage(JmsInboundMessageDispatch envelope) {
                 if (envelope.getMessage() == null) {
+
+                    // TODO - Remove
+                    LOG.info("Browser {} read browse done.", getConsumerId());
+
                     browseDone.set(true);
-                } else {
-                    super.onInboundMessage(envelope);
                 }
-                notifyMessageAvailable();
+
+                super.onInboundMessage(envelope);
             }
         };
         rc.init();

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7e4e131f/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index ab8c2f3..9ba2730 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -91,13 +91,10 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
     protected final AmqpSession session;
     protected final Map<JmsInboundMessageDispatch, Delivery> delivered = new LinkedHashMap<JmsInboundMessageDispatch, Delivery>();
     protected boolean presettle;
-
-    private final ByteBuf incomingBuffer = Unpooled.buffer(INITIAL_BUFFER_CAPACITY);
-
-    private final AtomicLong incomingSequence = new AtomicLong(0);
-
-    private AsyncResult stopRequest;
-    private PullRequest pullRequest;
+    protected AsyncResult stopRequest;
+    protected PullRequest pullRequest;
+    protected final ByteBuf incomingBuffer = Unpooled.buffer(INITIAL_BUFFER_CAPACITY);
+    protected final AtomicLong incomingSequence = new AtomicLong(0);
 
     public AmqpConsumer(AmqpSession session, JmsConsumerInfo info) {
         super(info);
@@ -385,7 +382,6 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
 
         while (reverseIterator.hasPrevious()) {
             JmsInboundMessageDispatch envelope = reverseIterator.previous();
-            // TODO: apply connection redelivery policy to those messages that are past max redelivery.
             envelope.getMessage().getFacade().setRedeliveryCount(
                 envelope.getMessage().getFacade().getRedeliveryCount() + 1);
             envelope.setEnqueueFirst(true);
@@ -408,8 +404,8 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
      *        the amount of time to tell the remote peer to keep this pull request valid.
      */
     public void pull(final long timeout) {
-        LOG.trace("Pull called on consumer {} with timeout = {}", getConsumerId(), timeout);
-        if (resource.getPrefetchSize() == 0 && getEndpoint().getCredit() == 0) {
+        LOG.trace("Pull on consumer {} with timeout = {}", getConsumerId(), timeout);
+        if (getEndpoint().getCredit() == 0 && getEndpoint().getQueued() == 0) {
             if (timeout < 0) {
                 getEndpoint().flow(1);
             } else if (timeout == 0) {
@@ -636,7 +632,9 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
     public void postRollback() throws Exception {
     }
 
-    private class PullRequest implements AsyncResult {
+    //----- Inner classes used in message pull operations --------------------//
+
+    protected class PullRequest implements AsyncResult {
 
         @Override
         public void onFailure(Throwable result) {
@@ -661,7 +659,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
         }
     }
 
-    private class TimedPullRequest extends PullRequest {
+    protected class TimedPullRequest extends PullRequest {
 
         private final ScheduledFuture<?> completionTask;
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7e4e131f/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java
index 245f938..3a28e39 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java
@@ -16,11 +16,9 @@
  */
 package org.apache.qpid.jms.provider.amqp;
 
-import java.io.IOException;
+import java.util.concurrent.ScheduledFuture;
 
-import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
 import org.apache.qpid.jms.meta.JmsConsumerInfo;
-import org.apache.qpid.jms.util.IOExceptionSupport;
 import org.apache.qpid.proton.amqp.messaging.Source;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,72 +30,34 @@ public class AmqpQueueBrowser extends AmqpConsumer {
 
     private static final Logger LOG = LoggerFactory.getLogger(AmqpQueueBrowser.class);
 
-    /**
-     * @param session
-     * @param info
-     */
     public AmqpQueueBrowser(AmqpSession session, JmsConsumerInfo info) {
         super(session, info);
     }
 
-    /**
-     * QueueBrowser will attempt to initiate a pull whenever there are no pending Messages.
-     *
-     * We need to initiate a drain to see if there are any messages and if the remote sender
-     * indicates it is drained then we can send end of browse.  We only do this when there
-     * are no pending incoming deliveries and all delivered messages have become settled
-     * in order to give the remote a chance to dispatch more messages once all deliveries
-     * have been settled.
-     *
-     * @param timeout
-     *        ignored in this context.
-     */
     @Override
-    public void pull(long timeout) {
-        if (!getEndpoint().getDrain() && getEndpoint().current() == null && getEndpoint().getUnsettled() == 0) {
-            LOG.trace("QueueBrowser {} will try to drain remote.", getConsumerId());
-            getEndpoint().drain(resource.getPrefetchSize());
-        } else {
-            getEndpoint().setDrain(false);
-        }
-    }
-
-    @Override
-    public void processFlowUpdates(AmqpProvider provider) throws IOException {
-        if (getEndpoint().getDrain() && getEndpoint().getCredit() == getEndpoint().getRemoteCredit()) {
-            JmsInboundMessageDispatch browseDone = new JmsInboundMessageDispatch(getNextIncomingSequenceNumber());
-            browseDone.setConsumerId(getConsumerId());
-            try {
-                deliver(browseDone);
-            } catch (Exception e) {
-                throw IOExceptionSupport.create(e);
-            }
-        } else {
-            getEndpoint().setDrain(false);
-        }
+    public void pull(final long timeout) {
+        LOG.trace("Pull on browser {} with timeout = {}", getConsumerId(), timeout);
 
-        super.processFlowUpdates(provider);
-    }
+        // Pull for browser is called when there are no available messages buffered.
+        // If we still have some to dispatch then no pull is needed otherwise we might
+        // need to attempt try and drain to end the browse.
+        if (getEndpoint().getQueued() == 0) {
+            final ScheduledFuture<?> future = getSession().schedule(new Runnable() {
 
-    @Override
-    public void processDeliveryUpdates(AmqpProvider provider) throws IOException {
-        if (getEndpoint().getDrain() && getEndpoint().current() != null) {
-            LOG.trace("{} incoming delivery, cancel drain.", getConsumerId());
-            getEndpoint().setDrain(false);
-        }
+                @Override
+                public void run() {
+                    // Try for one last time to pull a message down, if this
+                    // fails then we can end the browse otherwise the link credit
+                    // will get updated on the next sent disposition and we will
+                    // end up back here if no more messages arrive.
+                    LOG.trace("Browser {} attemptig to force a message dispatch");
+                    getEndpoint().drain(1);
+                    pullRequest = new PullRequest();
+                    session.getProvider().pumpToProtonTransport();
+                }
+            }, timeout);
 
-        super.processDeliveryUpdates(provider);
-
-        if (getEndpoint().getDrain() && getEndpoint().getCredit() == getEndpoint().getRemoteCredit()) {
-            JmsInboundMessageDispatch browseDone = new JmsInboundMessageDispatch(getNextIncomingSequenceNumber());
-            browseDone.setConsumerId(getConsumerId());
-            try {
-                deliver(browseDone);
-            } catch (Exception e) {
-                throw IOExceptionSupport.create(e);
-            }
-        } else {
-            getEndpoint().setDrain(false);
+            pullRequest = new BrowseEndPullRequest(future);
         }
     }
 
@@ -114,4 +74,18 @@ public class AmqpQueueBrowser extends AmqpConsumer {
     public boolean isBrowser() {
         return true;
     }
+
+    //----- Inner classes used in message pull operations --------------------//
+
+    protected class BrowseEndPullRequest extends TimedPullRequest {
+
+        public BrowseEndPullRequest(ScheduledFuture<?> completionTask) {
+            super(completionTask);
+        }
+
+        @Override
+        public void onFailure(Throwable result) {
+            // Nothing to do, the timer will take care of the end of browse signal.
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7e4e131f/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsQueueBrowserTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsQueueBrowserTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsQueueBrowserTest.java
index 2076e61..1423c81 100644
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsQueueBrowserTest.java
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsQueueBrowserTest.java
@@ -99,6 +99,7 @@ public class JmsQueueBrowserTest extends AmqpTestSupport {
         while (enumeration.hasMoreElements()) {
             Message m = (Message) enumeration.nextElement();
             assertTrue(m instanceof TextMessage);
+            LOG.debug("Browsed message {} from Queue {}", m, queue);
         }
 
         browser.close();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org