You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2015/08/21 16:43:46 UTC
qpid-jms git commit: https://issues.apache.org/jira/browse/QPIDJMS-92
Repository: qpid-jms
Updated Branches:
refs/heads/master 6573d8df8 -> 9f6ecd3b4
https://issues.apache.org/jira/browse/QPIDJMS-92
Makes the pull consumer actually pull from the remote peer and clears
any granted credit if the pull did not succeed in getting any messages.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/9f6ecd3b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/9f6ecd3b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/9f6ecd3b
Branch: refs/heads/master
Commit: 9f6ecd3b407880e4283e403ebbe6aff6665c3813
Parents: 6573d8d
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Aug 21 10:43:31 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Aug 21 10:43:31 2015 -0400
----------------------------------------------------------------------
.../org/apache/qpid/jms/JmsMessageConsumer.java | 118 +++++++++++++--
.../org/apache/qpid/jms/provider/Provider.java | 6 +
.../qpid/jms/provider/amqp/AmqpConnection.java | 21 +++
.../qpid/jms/provider/amqp/AmqpConsumer.java | 127 ++++++++++++++--
.../jms/provider/amqp/AmqpFixedProducer.java | 10 +-
.../qpid/jms/provider/amqp/AmqpProvider.java | 10 +-
.../qpid/jms/provider/amqp/AmqpSession.java | 21 +++
.../provider/amqp/AmqpTemporaryDestination.java | 4 +-
.../qpid/jms/consumer/JmsZeroPrefetchTest.java | 145 ++++++++++++++++++-
.../src/test/resources/log4j.properties | 2 +-
10 files changed, 414 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9f6ecd3b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index dff2c83..e8a7066 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -40,12 +40,16 @@ import org.apache.qpid.jms.provider.ProviderFuture;
import org.apache.qpid.jms.util.FifoMessageQueue;
import org.apache.qpid.jms.util.MessageQueue;
import org.apache.qpid.jms.util.PriorityMessageQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* implementation of a JMS Message Consumer
*/
public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableConsumer, JmsMessageDispatcher {
+ private static final Logger LOG = LoggerFactory.getLogger(JmsMessageConsumer.class);
+
protected final JmsSession session;
protected final JmsConnection connection;
protected JmsConsumerInfo consumerInfo;
@@ -215,18 +219,22 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC
public Message receive(long timeout) throws JMSException {
checkClosed();
checkMessageListener();
- sendPullCommand(timeout);
- long wait = timeout;
+ // Configure for infinite wait when timeout is zero (JMS Spec)
if (timeout == 0) {
- wait = -1;
+ timeout = -1;
}
- try {
- return copy(ackFromReceive(this.messageQueue.dequeue(wait)));
- } catch (InterruptedException e) {
- throw JmsExceptionSupport.create(e);
+ sendPullCommand(timeout);
+
+ JmsInboundMessageDispatch envelope = null;
+ if (isPullConsumer()) {
+ envelope = dequeue(-1); // Let server tell us if empty.
+ } else {
+ envelope = dequeue(timeout); // Check local prefetch only.
}
+
+ return copy(ackFromReceive(envelope));
}
/**
@@ -238,9 +246,82 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC
public Message receiveNoWait() throws JMSException {
checkClosed();
checkMessageListener();
- sendPullCommand(-1);
+ sendPullCommand(0);
+
+ JmsInboundMessageDispatch envelope = null;
+ if (isPullConsumer()) {
+ envelope = dequeue(-1); // Let server tell us if empty.
+ } else {
+ envelope = dequeue(0); // Check local prefetch only.
+ }
- return copy(ackFromReceive(this.messageQueue.dequeueNoWait()));
+ return copy(ackFromReceive(envelope));
+ }
+
+ /**
+ * Used to get an enqueued message from this consumer's message queue. The
+ * amount of time this method blocks is based on the timeout value.
+ *
+ * {@literal timeout < 0} then it blocks until a message is received.
+ * {@literal timeout = 0} then it returns a message or null if none available
+ * {@literal timeout > 0} then it blocks up to timeout amount of time.
+ *
+ * This method may consume messages that are expired or exceed a configured
+ * delivery count value but will continue to wait for the configured timeout.
+ *
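+ * @throws JMSException if a failure was recorded for this consumer or the wait is interrupted.
+ * @return the dequeued envelope, or null on timeout, when an envelope carrying no message is dequeued, or when the message queue is closed.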
+ */
+ private JmsInboundMessageDispatch dequeue(long timeout) throws JMSException {
+
+ try {
+ long deadline = 0;
+ if (timeout > 0) {
+ deadline = System.currentTimeMillis() + timeout;
+ }
+
+ while (true) {
+ JmsInboundMessageDispatch envelope = messageQueue.dequeue(timeout);
+ if (envelope == null) {
+ if (timeout > 0 && !messageQueue.isClosed()) {
+ timeout = Math.max(deadline - System.currentTimeMillis(), 0);
+ } else {
+ if (failureCause != null && !messageQueue.isClosed()) {
+ LOG.debug("{} receive failed: {}", getConsumerId(), failureCause.getMessage());
+ throw JmsExceptionSupport.create(failureCause);
+ } else {
+ return null;
+ }
+ }
+ } else if (envelope.getMessage() == null) {
+ LOG.trace("{} no message was available for this consumer: {}", getConsumerId());
+ return null;
+ } else if (redeliveryExceeded(envelope)) {
+ LOG.debug("{} received with excessive redelivered: {}", getConsumerId(), envelope);
+ // TODO - Future
+ // Reject this delivery as not deliverable here
+ if (timeout > 0) {
+ timeout = Math.max(deadline - System.currentTimeMillis(), 0);
+ }
+ sendPullCommand(timeout);
+ } else {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(getConsumerId() + " received message: " + envelope);
+ }
+ return envelope;
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw JmsExceptionSupport.create(e);
+ }
+ }
+
+ protected boolean redeliveryExceeded(JmsInboundMessageDispatch envelope) {
+ // TODO - Future
+ // Check for message that have been redelivered to see if they exceed
+ // some set maximum redelivery count
+ return false;
}
protected void checkClosed() throws IllegalStateException {
@@ -500,6 +581,10 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC
return false;
}
+ public boolean isPullConsumer() {
+ return getPrefetchSize() == 0;
+ }
+
@Override
public void setAvailableListener(JmsMessageAvailableListener availableListener) {
this.availableListener = availableListener;
@@ -511,6 +596,9 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC
}
protected void onConnectionInterrupted() {
+ // TODO - We might want to wake all blocking receive calls
+ // to interrupt pull consumers, although that also
+ // wakes infinite wait receivers so how to deal with that?
messageQueue.clear();
}
@@ -530,21 +618,19 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC
}
/**
- * Triggers a pull request from the connected Provider. An attempt is made to set
- * a timeout on the pull request however some providers will not honor this value
- * and the pull will remain active until a message is dispatched.
+ * Triggers a pull request from the connected Provider with the given timeout value.
* <p>
* The timeout value can be one of:
* <br>
- * {@literal < 0} to indicate that the request should expire immediately if no message.<br>
- * {@literal = 0} to indicate that the request should never time out.<br>
- * {@literal > 1} to indicate that the request should expire after the given time in milliseconds.
+ * {@literal < 0} to indicate that the request should never time out.<br>
+ * {@literal = 0} to indicate that the request should expire immediately if no message.<br>
+ * {@literal > 0} to indicate that the request should expire after the given time in milliseconds.
*
* @param timeout
* The amount of time the pull request should remain valid.
*/
protected void sendPullCommand(long timeout) throws JMSException {
- if (messageQueue.isEmpty() && (getPrefetchSize() == 0 || isBrowser())) {
+ if (!messageQueue.isClosed() && messageQueue.isEmpty() && (getPrefetchSize() == 0 || isBrowser())) {
connection.pull(getConsumerId(), timeout);
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9f6ecd3b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java
index d56e3b1..e2f4f56 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java
@@ -283,6 +283,12 @@ public interface Provider {
* consumer. If the consumer has a set prefetch that's greater than zero this method
* should just return without performing and action.
*
+ * timeout < 0 then it should remain open until a message is received.
+ * timeout = 0 then it returns a message or null if none available
+ * timeout > 0 then it should remain open for timeout amount of time.
+ *
+ * The timeout value when positive is given in milliseconds.
+ *
* @param timeout
* the amount of time to tell the remote peer to keep this pull request valid.
* @param request
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9f6ecd3b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
index af2b431..ebec59a 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
@@ -22,6 +22,8 @@ import java.net.URI;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import javax.jms.JMSException;
import javax.jms.JMSSecurityException;
@@ -330,6 +332,25 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn
return properties;
}
+ /**
+ * Allows a connection resource to schedule a task for future execution.
+ *
+ * @param task
+ * The Runnable task to be executed after the given delay.
+ * @param delay
+ * The delay in milliseconds to schedule the given task for execution.
+ *
+ * @return a ScheduledFuture instance that can be used to cancel the task.
+ */
+ public ScheduledFuture<?> schedule(final Runnable task, long delay) {
+ if (task == null) {
+ LOG.trace("Resource attempted to schedule a null task.");
+ return null;
+ }
+
+ return getProvider().getScheduler().schedule(task, delay, TimeUnit.MILLISECONDS);
+ }
+
@Override
public String toString() {
return "AmqpConnection { " + getConnectionInfo().getConnectionId() + " }";
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9f6ecd3b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index a5405ac..efbc9cb 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.ListIterator;
import java.util.Map;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.InvalidDestinationException;
@@ -79,8 +80,8 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
private static final Modified MODIFIED_FAILED = new Modified();
private static final Modified MODIFIED_UNDELIVERABLE = new Modified();
- static
- {
+
+ static {
MODIFIED_FAILED.setDeliveryFailed(true);
MODIFIED_UNDELIVERABLE.setDeliveryFailed(true);
@@ -96,6 +97,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
private final AtomicLong incomingSequence = new AtomicLong(0);
private AsyncResult stopRequest;
+ private AsyncResult pullRequest;
public AmqpConsumer(AmqpSession session, JmsConsumerInfo info) {
super(info);
@@ -127,7 +129,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
stopRequest = request;
}
} else {
- // TODO: We dont actually want the additional messages that could be sent while
+ // TODO: We don't actually want the additional messages that could be sent while
// draining. We could explicitly reduce credit first, or possibly use 'echo' instead
// of drain if it was supported. We would first need to understand what happens
// if we reduce credit below the number of messages already in-flight before
@@ -149,12 +151,26 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
}
}
+ if (pullRequest != null) {
+ Receiver receiver = getEndpoint();
+ if (receiver.getRemoteCredit() <= 0) {
+ if (receiver.getQueued() <= 0) {
+ pullRequest.onFailure(null);
+ } else {
+ pullRequest.onSuccess();
+ }
+ pullRequest = null;
+ }
+ }
+
+ LOG.trace("Consumer {} flow updated, remote credit = {}", getConsumerId(), getEndpoint().getRemoteCredit());
+
super.processFlowUpdates(provider);
}
@Override
protected void doOpen() {
- JmsDestination destination = resource.getDestination();
+ JmsDestination destination = resource.getDestination();
String subscription = AmqpDestinationHelper.INSTANCE.getDestinationAddress(destination, session.getConnection());
Source source = new Source();
@@ -188,8 +204,8 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
@Override
protected void doOpenCompletion() {
// Verify the attach response contained a non-null Source
- org.apache.qpid.proton.amqp.transport.Source s = getEndpoint().getRemoteSource();
- if (s != null) {
+ org.apache.qpid.proton.amqp.transport.Source source = getEndpoint().getRemoteSource();
+ if (source != null) {
super.doOpenCompletion();
} else {
// No link terminus was created, the peer will now detach/close us.
@@ -199,8 +215,8 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
@Override
protected Exception getOpenAbortException() {
// Verify the attach response contained a non-null Source
- org.apache.qpid.proton.amqp.transport.Source s = getEndpoint().getRemoteSource();
- if (s != null) {
+ org.apache.qpid.proton.amqp.transport.Source source = getEndpoint().getRemoteSource();
+ if (source != null) {
return super.getOpenAbortException();
} else {
// No link terminus was created, the peer has detach/closed us, create IDE.
@@ -377,15 +393,41 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
}
/**
- * For a consumer whose prefetch value is set to zero this method will attempt to solicite
- * a new message dispatch from the broker.
+ * Request a remote peer send a Message to this client.
+ *
+ * timeout < 0 then it should remain open until a message is received.
+ * timeout = 0 then the request expires immediately if no message is available.
+ * timeout > 0 then it should remain open for timeout amount of time.
+ *
+ * The timeout value when positive is given in milliseconds.
*
* @param timeout
+ * the amount of time to tell the remote peer to keep this pull request valid.
*/
- public void pull(long timeout) {
+ public void pull(final long timeout) {
+ LOG.trace("Pull called on consumer {} with timouet = {}", getConsumerId(), timeout);
if (resource.getPrefetchSize() == 0 && getEndpoint().getCredit() == 0) {
- // expand the credit window by one.
- getEndpoint().flow(1);
+ if (timeout < 0) {
+ getEndpoint().flow(1);
+ } else if (timeout == 0) {
+ pullRequest = new DrainingPullRequest();
+ getEndpoint().drain(1);
+ } else if (timeout > 0) {
+ // We need to turn off the credit and signal the consumer
+ // that there was no message.
+ final ScheduledFuture<?> future = getSession().schedule(new Runnable() {
+
+ @Override
+ public void run() {
+ if (getEndpoint().getRemoteCredit() != 0) {
+ getEndpoint().drain(0);
+ }
+ }
+ }, timeout);
+
+ getEndpoint().flow(1);
+ pullRequest = new TimedPullRequest(future);
+ }
}
}
@@ -397,6 +439,12 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
if (incoming != null) {
if (incoming.isReadable() && !incoming.isPartial()) {
LOG.trace("{} has incoming Message(s).", this);
+
+ if (pullRequest != null) {
+ pullRequest.onSuccess();
+ pullRequest = null;
+ }
+
try {
processDelivery(incoming);
} catch (Exception e) {
@@ -407,11 +455,20 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
incoming = null;
}
} else {
+ LOG.info("Incoming null delivery");
+
// We have exhausted the locally queued messages on this link.
// Check if we tried to stop and have now run out of credit.
- if (stopRequest != null && getEndpoint().getRemoteCredit() <= 0) {
- stopRequest.onSuccess();
- stopRequest = null;
+ if (getEndpoint().getRemoteCredit() <= 0) {
+ if (stopRequest != null) {
+ stopRequest.onSuccess();
+ stopRequest = null;
+ }
+
+ if (pullRequest != null) {
+ pullRequest.onFailure(null);
+ pullRequest = null;
+ }
}
}
} while (incoming != null);
@@ -583,4 +640,42 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
*/
public void postRollback() throws Exception {
}
+
+ private class DrainingPullRequest implements AsyncResult {
+
+ @Override
+ public void onFailure(Throwable result) {
+ JmsInboundMessageDispatch pullDone = new JmsInboundMessageDispatch(getNextIncomingSequenceNumber());
+ pullDone.setConsumerId(getConsumerId());
+ try {
+ deliver(pullDone);
+ } catch (Exception e) {
+ getSession().reportError(IOExceptionSupport.create(e));
+ }
+ }
+
+ @Override
+ public void onSuccess() {
+ // Nothing to do here.
+ }
+
+ @Override
+ public boolean isComplete() {
+ return false;
+ }
+ }
+
+ private class TimedPullRequest extends DrainingPullRequest {
+
+ private final ScheduledFuture<?> completionTask;
+
+ public TimedPullRequest(ScheduledFuture<?> completionTask) {
+ this.completionTask = completionTask;
+ }
+
+ @Override
+ public void onSuccess() {
+ completionTask.cancel(false);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9f6ecd3b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index 6c17587..8625e26 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -258,7 +258,7 @@ public class AmqpFixedProducer extends AmqpProducer {
Target target = new Target();
target.setAddress(targetAddress);
Symbol typeCapability = AmqpDestinationHelper.INSTANCE.toTypeCapability(destination);
- if(typeCapability != null) {
+ if (typeCapability != null) {
target.setCapabilities(typeCapability);
}
@@ -282,8 +282,8 @@ public class AmqpFixedProducer extends AmqpProducer {
@Override
protected void doOpenCompletion() {
// Verify the attach response contained a non-null target
- org.apache.qpid.proton.amqp.transport.Target t = getEndpoint().getRemoteTarget();
- if (t != null) {
+ org.apache.qpid.proton.amqp.transport.Target target = getEndpoint().getRemoteTarget();
+ if (target != null) {
super.doOpenCompletion();
} else {
// No link terminus was created, the peer will now detach/close us.
@@ -293,8 +293,8 @@ public class AmqpFixedProducer extends AmqpProducer {
@Override
protected Exception getOpenAbortException() {
// Verify the attach response contained a non-null target
- org.apache.qpid.proton.amqp.transport.Target t = getEndpoint().getRemoteTarget();
- if (t != null) {
+ org.apache.qpid.proton.amqp.transport.Target target = getEndpoint().getRemoteTarget();
+ if (target != null) {
return super.getOpenAbortException();
} else {
// No link terminus was created, the peer has detach/closed us, create IDE.
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9f6ecd3b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index 5db3472..15c3cc2 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -1061,6 +1061,10 @@ public class AmqpProvider implements Provider, TransportListener {
return remoteURI;
}
+ ScheduledExecutorService getScheduler() {
+ return this.serializer;
+ }
+
private final class IdleTimeoutCheck implements Runnable {
@Override
public void run() {
@@ -1089,7 +1093,7 @@ public class AmqpProvider implements Provider, TransportListener {
LOG.trace("IdleTimeoutCheck skipping check, connection is not active.");
}
- if(!checkScheduled) {
+ if (!checkScheduled) {
nextIdleTimeoutCheck = null;
LOG.trace("IdleTimeoutCheck exiting");
}
@@ -1097,7 +1101,7 @@ public class AmqpProvider implements Provider, TransportListener {
}
Principal getLocalPrincipal() {
- if(transport instanceof SSLTransport) {
+ if (transport instanceof SSLTransport) {
return ((SSLTransport) transport).getLocalPrincipal();
}
@@ -1105,7 +1109,7 @@ public class AmqpProvider implements Provider, TransportListener {
}
private static void setHostname(Sasl sasl, String hostname) {
- //TODO: this is a hack until Proton 0.10+ is available with sasl#setHostname method.
+ // TODO: this is a hack until Proton 0.10+ is available with sasl#setHostname method.
try {
Field field = sasl.getClass().getDeclaredField("_hostname");
field.setAccessible(true);
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9f6ecd3b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
index 694ce2c..2dd72fb 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
@@ -18,6 +18,8 @@ package org.apache.qpid.jms.provider.amqp;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import javax.jms.IllegalStateException;
@@ -213,6 +215,25 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> {
getTransactionContext().rollback(request);
}
+ /**
+ * Allows a session resource to schedule a task for future execution.
+ *
+ * @param task
+ * The Runnable task to be executed after the given delay.
+ * @param delay
+ * The delay in milliseconds to schedule the given task for execution.
+ *
+ * @return a ScheduledFuture instance that can be used to cancel the task.
+ */
+ public ScheduledFuture<?> schedule(final Runnable task, long delay) {
+ if (task == null) {
+ LOG.trace("Resource attempted to schedule a null task.");
+ return null;
+ }
+
+ return getProvider().getScheduler().schedule(task, delay, TimeUnit.MILLISECONDS);
+ }
+
void addResource(AmqpConsumer consumer) {
consumers.put(consumer.getConsumerId(), consumer);
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9f6ecd3b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
index 131e656..2759e7a 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
@@ -125,8 +125,8 @@ public class AmqpTemporaryDestination extends AmqpAbstractResource<JmsTemporaryD
@Override
protected void doOpenCompletion() {
// Verify the attach response contained a non-null target
- org.apache.qpid.proton.amqp.transport.Target t = getEndpoint().getRemoteTarget();
- if (t != null) {
+ org.apache.qpid.proton.amqp.transport.Target target = getEndpoint().getRemoteTarget();
+ if (target != null) {
super.doOpenCompletion();
} else {
// No link terminus was created, the peer will now detach/close us.
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9f6ecd3b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java
index 084ae13..bfcdf87 100644
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java
@@ -37,7 +37,7 @@ import org.apache.qpid.jms.support.Wait;
import org.junit.Test;
/**
- *
+ * Test for MessageConsumer that has a prefetch value of zero.
*/
public class JmsZeroPrefetchTest extends AmqpTestSupport {
@@ -47,7 +47,7 @@ public class JmsZeroPrefetchTest extends AmqpTestSupport {
((JmsConnection)connection).getPrefetchPolicy().setAll(0);
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = session.createQueue(name.getMethodName());
+ Queue queue = session.createQueue(getDestinationName());
MessageConsumer consumer = session.createConsumer(queue);
MessageListener listener = new MessageListener() {
@@ -61,13 +61,117 @@ public class JmsZeroPrefetchTest extends AmqpTestSupport {
}
@Test(timeout = 60000)
- public void testPullConsumerWorks() throws Exception {
+ public void testBlockingReceivesUnBlocksOnMessageSend() throws Exception {
+ connection = createAmqpConnection();
+ ((JmsConnection)connection).getPrefetchPolicy().setAll(0);
+ connection.start();
+
+ final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(getDestinationName());
+
+ final MessageProducer producer = session.createProducer(queue);
+
+ Thread producerThread = new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(1500);
+ producer.send(session.createTextMessage("Hello World! 1"));
+ } catch (Exception e) {
+ }
+ }
+ });
+ producerThread.start();
+
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message answer = consumer.receive();
+ assertNotNull("Should have received a message!", answer);
+
+ final QueueViewMBean queueView = getProxyToQueue(getDestinationName());
+
+ // Assert that the one message that was sent has been consumed and
+ // that the queue is now empty.
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return queueView.getQueueSize() == 0;
+ }
+ }));
+ }
+
+ @Test(timeout = 60000)
+ public void testReceiveTimesOutAndRemovesCredit() throws Exception {
+ connection = createAmqpConnection();
+ ((JmsConnection)connection).getPrefetchPolicy().setAll(0);
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(getDestinationName());
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message answer = consumer.receive(100);
+ assertNull("Should have not received a message!", answer);
+
+ MessageProducer producer = session.createProducer(queue);
+ producer.send(session.createTextMessage("Hello World! 1"));
+
+ final QueueViewMBean queueView = getProxyToQueue(getDestinationName());
+
+ // Assert that the timed out receive did not pull the message sent
+ // afterwards, it must remain on the queue and not be dispatched.
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return queueView.getQueueSize() == 1;
+ }
+ }));
+
+ assertEquals(0, queueView.getInFlightCount());
+ }
+
+ @Test(timeout = 60000)
+ public void testReceiveNoWaitWaitForSever() throws Exception {
connection = createAmqpConnection();
((JmsConnection)connection).getPrefetchPolicy().setAll(0);
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = session.createQueue(name.getMethodName());
+ Queue queue = session.createQueue(getDestinationName());
+ MessageProducer producer = session.createProducer(queue);
+ producer.send(session.createTextMessage("Hello World! 1"));
+
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message answer = consumer.receiveNoWait();
+ assertNotNull("Should have received a message!", answer);
+
+ // Send another, it should not get dispatched.
+ producer.send(session.createTextMessage("Hello World! 2"));
+
+ final QueueViewMBean queueView = getProxyToQueue(getDestinationName());
+
+ // Assert that we only pulled one message and that we didn't cause
+ // the other message to be dispatched.
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return queueView.getQueueSize() == 1;
+ }
+ }));
+
+ assertEquals(0, queueView.getInFlightCount());
+ }
+
+ @Test(timeout = 60000)
+ public void testRepeatedPullAttempts() throws Exception {
+ connection = createAmqpConnection();
+ ((JmsConnection)connection).getPrefetchPolicy().setAll(0);
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(getDestinationName());
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("Hello World!"));
@@ -75,6 +179,7 @@ public class JmsZeroPrefetchTest extends AmqpTestSupport {
MessageConsumer consumer = session.createConsumer(queue);
Message answer = consumer.receive(5000);
assertNotNull("Should have received a message!", answer);
+
// check if method will return at all and will return a null
answer = consumer.receive(1);
assertNull("Should have not received a message!", answer);
@@ -89,12 +194,12 @@ public class JmsZeroPrefetchTest extends AmqpTestSupport {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = session.createQueue(name.getMethodName());
+ Queue queue = session.createQueue(getDestinationName());
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("Hello World! 1"));
producer.send(session.createTextMessage("Hello World! 2"));
- final QueueViewMBean queueView = getProxyToQueue(name.getMethodName());
+ final QueueViewMBean queueView = getProxyToQueue(getDestinationName());
// Check initial Queue State
assertEquals(2, queueView.getQueueSize());
@@ -125,7 +230,7 @@ public class JmsZeroPrefetchTest extends AmqpTestSupport {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = session.createQueue(name.getMethodName());
+ Queue queue = session.createQueue(getDestinationName());
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("Msg1"));
@@ -144,4 +249,30 @@ public class JmsZeroPrefetchTest extends AmqpTestSupport {
answer = (TextMessage)consumer2.receiveNoWait();
assertNull("Should have not received a message!", answer);
}
+
+ @Test(timeout = 60000)
+ public void testConsumerWithNoMessageDoesNotHogMessages() throws Exception {
+ connection = createAmqpConnection();
+ ((JmsConnection)connection).getPrefetchPolicy().setAll(0);
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(getDestinationName());
+
+ // Try and receive one message which will fail
+ MessageConsumer consumer1 = session.createConsumer(queue);
+ assertNull(consumer1.receive(10));
+
+ // Now produce a message
+ MessageProducer producer = session.createProducer(queue);
+ producer.send(session.createTextMessage("Msg1"));
+
+ // Now let's receive it with the second consumer, the first should
+ // not be accepting messages now and the broker should give it to
+ // consumer 2.
+ MessageConsumer consumer2 = session.createConsumer(queue);
+ TextMessage answer = (TextMessage)consumer2.receive(3000);
+ assertNotNull(answer);
+ assertEquals("Should have received a message!", answer.getText(), "Msg1");
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9f6ecd3b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/resources/log4j.properties b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/resources/log4j.properties
index 2b107ef..ce9b95c 100644
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/resources/log4j.properties
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/resources/log4j.properties
@@ -21,7 +21,7 @@
log4j.rootLogger=INFO, out, stdout
log4j.logger.org.apache.qpid.jms=INFO
-log4j.logger.org.apache.qpid.jms.provider=DEBUG
+log4j.logger.org.apache.qpid.jms.provider=INFO
# Tune the ActiveMQ and it's AMQP transport as needed for debugging.
log4j.logger.org.apache.activemq=INFO
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org