You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2015/08/28 21:26:25 UTC
qpid-jms git commit: QPIDJMS-51 Refactor QueueBroser to use
mechanisms based on the recent pull consumer updates.
Repository: qpid-jms
Updated Branches:
refs/heads/master 4dd2fbd70 -> 7e4e131fc
QPIDJMS-51 Refactor QueueBroser to use mechanisms based on the recent
pull consumer updates.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/7e4e131f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/7e4e131f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/7e4e131f
Branch: refs/heads/master
Commit: 7e4e131fc6635c8f65a549f2b5e6c2c56bd2bf35
Parents: 4dd2fbd
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Aug 28 15:26:16 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Aug 28 15:26:16 2015 -0400
----------------------------------------------------------------------
.../org/apache/qpid/jms/JmsMessageConsumer.java | 2 +-
.../org/apache/qpid/jms/JmsQueueBrowser.java | 41 +++------
.../qpid/jms/provider/amqp/AmqpConsumer.java | 22 ++---
.../jms/provider/amqp/AmqpQueueBrowser.java | 96 +++++++-------------
.../qpid/jms/consumer/JmsQueueBrowserTest.java | 1 +
5 files changed, 62 insertions(+), 100 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7e4e131f/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index 7823499..8a1edf7 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -666,7 +666,7 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC
* The amount of time the pull request should remain valid.
*/
protected void sendPullCommand(long timeout) throws JMSException {
- if (!messageQueue.isClosed() && messageQueue.isEmpty() && (getPrefetchSize() == 0 || isBrowser())) {
+ if (!messageQueue.isClosed() && messageQueue.isEmpty() && isPullConsumer()) {
connection.pull(getConsumerId(), timeout);
// Once a new pull has gone out check to see if the queue was stopped due to failover
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7e4e131f/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueBrowser.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueBrowser.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueBrowser.java
index 2fea6a1..d329c39 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueBrowser.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueBrowser.java
@@ -67,7 +67,6 @@ public class JmsQueueBrowser implements QueueBrowser, Enumeration<Message> {
private Message next;
private final AtomicBoolean closed = new AtomicBoolean();
- private final Object semaphore = new Object();
/**
* Constructor for an JmsQueueBrowser - used internally
@@ -133,7 +132,7 @@ public class JmsQueueBrowser implements QueueBrowser, Enumeration<Message> {
if (next == null) {
try {
- next = consumer.receiveNoWait();
+ next = consumer.receive(2000);
} catch (JMSException e) {
LOG.warn("Error while receive the next message: {}", e.getMessage());
}
@@ -141,6 +140,9 @@ public class JmsQueueBrowser implements QueueBrowser, Enumeration<Message> {
if (next != null) {
return true;
}
+
+ LOG.info("Browser {} read message", next);
+
} else {
return true;
}
@@ -149,8 +151,6 @@ public class JmsQueueBrowser implements QueueBrowser, Enumeration<Message> {
destroyConsumer();
return false;
}
-
- waitForMessage();
}
}
@@ -208,25 +208,6 @@ public class JmsQueueBrowser implements QueueBrowser, Enumeration<Message> {
return selector;
}
- /**
- * Wait on a semaphore for a fixed amount of time for a message to come in.
- */
- protected void waitForMessage() {
- try {
- synchronized (semaphore) {
- semaphore.wait(2000);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
-
- protected void notifyMessageAvailable() {
- synchronized (semaphore) {
- semaphore.notifyAll();
- }
- }
-
@Override
public String toString() {
JmsMessageConsumer consumer = this.consumer;
@@ -243,13 +224,21 @@ public class JmsQueueBrowser implements QueueBrowser, Enumeration<Message> {
}
@Override
+ public boolean isPullConsumer() {
+ return true;
+ }
+
+ @Override
public void onInboundMessage(JmsInboundMessageDispatch envelope) {
if (envelope.getMessage() == null) {
+
+ // TODO - Remove
+ LOG.info("Browser {} read browse done.", getConsumerId());
+
browseDone.set(true);
- } else {
- super.onInboundMessage(envelope);
}
- notifyMessageAvailable();
+
+ super.onInboundMessage(envelope);
}
};
rc.init();
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7e4e131f/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index ab8c2f3..9ba2730 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -91,13 +91,10 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
protected final AmqpSession session;
protected final Map<JmsInboundMessageDispatch, Delivery> delivered = new LinkedHashMap<JmsInboundMessageDispatch, Delivery>();
protected boolean presettle;
-
- private final ByteBuf incomingBuffer = Unpooled.buffer(INITIAL_BUFFER_CAPACITY);
-
- private final AtomicLong incomingSequence = new AtomicLong(0);
-
- private AsyncResult stopRequest;
- private PullRequest pullRequest;
+ protected AsyncResult stopRequest;
+ protected PullRequest pullRequest;
+ protected final ByteBuf incomingBuffer = Unpooled.buffer(INITIAL_BUFFER_CAPACITY);
+ protected final AtomicLong incomingSequence = new AtomicLong(0);
public AmqpConsumer(AmqpSession session, JmsConsumerInfo info) {
super(info);
@@ -385,7 +382,6 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
while (reverseIterator.hasPrevious()) {
JmsInboundMessageDispatch envelope = reverseIterator.previous();
- // TODO: apply connection redelivery policy to those messages that are past max redelivery.
envelope.getMessage().getFacade().setRedeliveryCount(
envelope.getMessage().getFacade().getRedeliveryCount() + 1);
envelope.setEnqueueFirst(true);
@@ -408,8 +404,8 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
* the amount of time to tell the remote peer to keep this pull request valid.
*/
public void pull(final long timeout) {
- LOG.trace("Pull called on consumer {} with timeout = {}", getConsumerId(), timeout);
- if (resource.getPrefetchSize() == 0 && getEndpoint().getCredit() == 0) {
+ LOG.trace("Pull on consumer {} with timeout = {}", getConsumerId(), timeout);
+ if (getEndpoint().getCredit() == 0 && getEndpoint().getQueued() == 0) {
if (timeout < 0) {
getEndpoint().flow(1);
} else if (timeout == 0) {
@@ -636,7 +632,9 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
public void postRollback() throws Exception {
}
- private class PullRequest implements AsyncResult {
+ //----- Inner classes used in message pull operations --------------------//
+
+ protected class PullRequest implements AsyncResult {
@Override
public void onFailure(Throwable result) {
@@ -661,7 +659,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
}
}
- private class TimedPullRequest extends PullRequest {
+ protected class TimedPullRequest extends PullRequest {
private final ScheduledFuture<?> completionTask;
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7e4e131f/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java
index 245f938..3a28e39 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java
@@ -16,11 +16,9 @@
*/
package org.apache.qpid.jms.provider.amqp;
-import java.io.IOException;
+import java.util.concurrent.ScheduledFuture;
-import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.qpid.jms.meta.JmsConsumerInfo;
-import org.apache.qpid.jms.util.IOExceptionSupport;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,72 +30,34 @@ public class AmqpQueueBrowser extends AmqpConsumer {
private static final Logger LOG = LoggerFactory.getLogger(AmqpQueueBrowser.class);
- /**
- * @param session
- * @param info
- */
public AmqpQueueBrowser(AmqpSession session, JmsConsumerInfo info) {
super(session, info);
}
- /**
- * QueueBrowser will attempt to initiate a pull whenever there are no pending Messages.
- *
- * We need to initiate a drain to see if there are any messages and if the remote sender
- * indicates it is drained then we can send end of browse. We only do this when there
- * are no pending incoming deliveries and all delivered messages have become settled
- * in order to give the remote a chance to dispatch more messages once all deliveries
- * have been settled.
- *
- * @param timeout
- * ignored in this context.
- */
@Override
- public void pull(long timeout) {
- if (!getEndpoint().getDrain() && getEndpoint().current() == null && getEndpoint().getUnsettled() == 0) {
- LOG.trace("QueueBrowser {} will try to drain remote.", getConsumerId());
- getEndpoint().drain(resource.getPrefetchSize());
- } else {
- getEndpoint().setDrain(false);
- }
- }
-
- @Override
- public void processFlowUpdates(AmqpProvider provider) throws IOException {
- if (getEndpoint().getDrain() && getEndpoint().getCredit() == getEndpoint().getRemoteCredit()) {
- JmsInboundMessageDispatch browseDone = new JmsInboundMessageDispatch(getNextIncomingSequenceNumber());
- browseDone.setConsumerId(getConsumerId());
- try {
- deliver(browseDone);
- } catch (Exception e) {
- throw IOExceptionSupport.create(e);
- }
- } else {
- getEndpoint().setDrain(false);
- }
+ public void pull(final long timeout) {
+ LOG.trace("Pull on browser {} with timeout = {}", getConsumerId(), timeout);
- super.processFlowUpdates(provider);
- }
+ // Pull for browser is called when there are no available messages buffered.
+ // If we still have some to dispatch then no pull is needed otherwise we might
+ // need to attempt try and drain to end the browse.
+ if (getEndpoint().getQueued() == 0) {
+ final ScheduledFuture<?> future = getSession().schedule(new Runnable() {
- @Override
- public void processDeliveryUpdates(AmqpProvider provider) throws IOException {
- if (getEndpoint().getDrain() && getEndpoint().current() != null) {
- LOG.trace("{} incoming delivery, cancel drain.", getConsumerId());
- getEndpoint().setDrain(false);
- }
+ @Override
+ public void run() {
+ // Try for one last time to pull a message down, if this
+ // fails then we can end the browse otherwise the link credit
+ // will get updated on the next sent disposition and we will
+ // end up back here if no more messages arrive.
+ LOG.trace("Browser {} attemptig to force a message dispatch");
+ getEndpoint().drain(1);
+ pullRequest = new PullRequest();
+ session.getProvider().pumpToProtonTransport();
+ }
+ }, timeout);
- super.processDeliveryUpdates(provider);
-
- if (getEndpoint().getDrain() && getEndpoint().getCredit() == getEndpoint().getRemoteCredit()) {
- JmsInboundMessageDispatch browseDone = new JmsInboundMessageDispatch(getNextIncomingSequenceNumber());
- browseDone.setConsumerId(getConsumerId());
- try {
- deliver(browseDone);
- } catch (Exception e) {
- throw IOExceptionSupport.create(e);
- }
- } else {
- getEndpoint().setDrain(false);
+ pullRequest = new BrowseEndPullRequest(future);
}
}
@@ -114,4 +74,18 @@ public class AmqpQueueBrowser extends AmqpConsumer {
public boolean isBrowser() {
return true;
}
+
+ //----- Inner classes used in message pull operations --------------------//
+
+ protected class BrowseEndPullRequest extends TimedPullRequest {
+
+ public BrowseEndPullRequest(ScheduledFuture<?> completionTask) {
+ super(completionTask);
+ }
+
+ @Override
+ public void onFailure(Throwable result) {
+ // Nothing to do, the timer will take care of the end of browse signal.
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7e4e131f/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsQueueBrowserTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsQueueBrowserTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsQueueBrowserTest.java
index 2076e61..1423c81 100644
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsQueueBrowserTest.java
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsQueueBrowserTest.java
@@ -99,6 +99,7 @@ public class JmsQueueBrowserTest extends AmqpTestSupport {
while (enumeration.hasMoreElements()) {
Message m = (Message) enumeration.nextElement();
assertTrue(m instanceof TextMessage);
+ LOG.debug("Browsed message {} from Queue {}", m, queue);
}
browser.close();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org