You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2015/08/21 16:43:46 UTC

qpid-jms git commit: https://issues.apache.org/jira/browse/QPIDJMS-92

Repository: qpid-jms
Updated Branches:
  refs/heads/master 6573d8df8 -> 9f6ecd3b4


https://issues.apache.org/jira/browse/QPIDJMS-92

Makes the pull consumer actually pull from the remote peer and clears
any granted credit if the pull did not succeed in getting any messages.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/9f6ecd3b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/9f6ecd3b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/9f6ecd3b

Branch: refs/heads/master
Commit: 9f6ecd3b407880e4283e403ebbe6aff6665c3813
Parents: 6573d8d
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Aug 21 10:43:31 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Aug 21 10:43:31 2015 -0400

----------------------------------------------------------------------
 .../org/apache/qpid/jms/JmsMessageConsumer.java | 118 +++++++++++++--
 .../org/apache/qpid/jms/provider/Provider.java  |   6 +
 .../qpid/jms/provider/amqp/AmqpConnection.java  |  21 +++
 .../qpid/jms/provider/amqp/AmqpConsumer.java    | 127 ++++++++++++++--
 .../jms/provider/amqp/AmqpFixedProducer.java    |  10 +-
 .../qpid/jms/provider/amqp/AmqpProvider.java    |  10 +-
 .../qpid/jms/provider/amqp/AmqpSession.java     |  21 +++
 .../provider/amqp/AmqpTemporaryDestination.java |   4 +-
 .../qpid/jms/consumer/JmsZeroPrefetchTest.java  | 145 ++++++++++++++++++-
 .../src/test/resources/log4j.properties         |   2 +-
 10 files changed, 414 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9f6ecd3b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index dff2c83..e8a7066 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -40,12 +40,16 @@ import org.apache.qpid.jms.provider.ProviderFuture;
 import org.apache.qpid.jms.util.FifoMessageQueue;
 import org.apache.qpid.jms.util.MessageQueue;
 import org.apache.qpid.jms.util.PriorityMessageQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * implementation of a JMS Message Consumer
  */
 public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableConsumer, JmsMessageDispatcher {
 
+    private static final Logger LOG = LoggerFactory.getLogger(JmsMessageConsumer.class);
+
     protected final JmsSession session;
     protected final JmsConnection connection;
     protected JmsConsumerInfo consumerInfo;
@@ -215,18 +219,22 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC
     public Message receive(long timeout) throws JMSException {
         checkClosed();
         checkMessageListener();
-        sendPullCommand(timeout);
 
-        long wait = timeout;
+        // Configure for infinite wait when timeout is zero (JMS Spec)
         if (timeout == 0) {
-            wait = -1;
+            timeout = -1;
         }
 
-        try {
-            return copy(ackFromReceive(this.messageQueue.dequeue(wait)));
-        } catch (InterruptedException e) {
-            throw JmsExceptionSupport.create(e);
+        sendPullCommand(timeout);
+
+        JmsInboundMessageDispatch envelope = null;
+        if (isPullConsumer()) {
+            envelope = dequeue(-1); // Let server tell us if empty.
+        } else {
+            envelope = dequeue(timeout); // Check local prefetch only.
         }
+
+        return copy(ackFromReceive(envelope));
     }
 
     /**
@@ -238,9 +246,82 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC
     public Message receiveNoWait() throws JMSException {
         checkClosed();
         checkMessageListener();
-        sendPullCommand(-1);
+        sendPullCommand(0);
+
+        JmsInboundMessageDispatch envelope = null;
+        if (isPullConsumer()) {
+            envelope = dequeue(-1); // Let server tell us if empty.
+        } else {
+            envelope = dequeue(0); // Check local prefetch only.
+        }
 
-        return copy(ackFromReceive(this.messageQueue.dequeueNoWait()));
+        return copy(ackFromReceive(envelope));
+    }
+
+    /**
+     * Used to get an enqueued message from this consumer's messageQueue. The
+     * amount of time this method blocks is based on the timeout value.
+     *
+     *   timeout < 0 then it blocks until a message is received.
+     *   timeout = 0 then it returns a message or null if none available
+     *   timeout > 0 then it blocks up to timeout amount of time.
+     *
+     * This method may consume messages that are expired or exceed a configured
+     * delivery count value but will continue to wait for the configured timeout.
+     *
+     * @throws JMSException
+     * @return null if we timeout or if the consumer is closed.
+     */
+    private JmsInboundMessageDispatch dequeue(long timeout) throws JMSException {
+
+        try {
+            long deadline = 0;
+            if (timeout > 0) {
+                deadline = System.currentTimeMillis() + timeout;
+            }
+
+            while (true) {
+                JmsInboundMessageDispatch envelope = messageQueue.dequeue(timeout);
+                if (envelope == null) {
+                    if (timeout > 0 && !messageQueue.isClosed()) {
+                        timeout = Math.max(deadline - System.currentTimeMillis(), 0);
+                    } else {
+                        if (failureCause != null && !messageQueue.isClosed()) {
+                            LOG.debug("{} receive failed: {}", getConsumerId(), failureCause.getMessage());
+                            throw JmsExceptionSupport.create(failureCause);
+                        } else {
+                            return null;
+                        }
+                    }
+                } else if (envelope.getMessage() == null) {
+                    LOG.trace("{} no message was available for this consumer: {}", getConsumerId());
+                    return null;
+                } else if (redeliveryExceeded(envelope)) {
+                    LOG.debug("{} received with excessive redelivered: {}", getConsumerId(), envelope);
+                    // TODO - Future
+                    // Reject this delivery as not deliverable here
+                    if (timeout > 0) {
+                        timeout = Math.max(deadline - System.currentTimeMillis(), 0);
+                    }
+                    sendPullCommand(timeout);
+                } else {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace(getConsumerId() + " received message: " + envelope);
+                    }
+                    return envelope;
+                }
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw JmsExceptionSupport.create(e);
+        }
+    }
+
+    protected boolean redeliveryExceeded(JmsInboundMessageDispatch envelope) {
+        // TODO - Future
+        // Check for message that have been redelivered to see if they exceed
+        // some set maximum redelivery count
+        return false;
     }
 
     protected void checkClosed() throws IllegalStateException {
@@ -500,6 +581,10 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC
         return false;
     }
 
+    public boolean isPullConsumer() {
+        return getPrefetchSize() == 0;
+    }
+
     @Override
     public void setAvailableListener(JmsMessageAvailableListener availableListener) {
         this.availableListener = availableListener;
@@ -511,6 +596,9 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC
     }
 
     protected void onConnectionInterrupted() {
+        // TODO - We might want to wake all blocking receive calls
+        //        to interrupt pull consumers, although that also
+        //        wakes infinite wait receivers so how to deal with that?
         messageQueue.clear();
     }
 
@@ -530,21 +618,19 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC
     }
 
     /**
-     * Triggers a pull request from the connected Provider.  An attempt is made to set
-     * a timeout on the pull request however some providers will not honor this value
-     * and the pull will remain active until a message is dispatched.
+     * Triggers a pull request from the connected Provider with the given timeout value.
      * <p>
      * The timeout value can be one of:
      * <br>
-     * {@literal < 0} to indicate that the request should expire immediately if no message.<br>
-     * {@literal = 0} to indicate that the request should never time out.<br>
-     * {@literal > 1} to indicate that the request should expire after the given time in milliseconds.
+     * {@literal < 0} to indicate that the request should never time out.<br>
+     * {@literal = 0} to indicate that the request should expire immediately if no message.<br>
+     * {@literal > 0} to indicate that the request should expire after the given time in milliseconds.
      *
      * @param timeout
      *        The amount of time the pull request should remain valid.
      */
     protected void sendPullCommand(long timeout) throws JMSException {
-        if (messageQueue.isEmpty() && (getPrefetchSize() == 0 || isBrowser())) {
+        if (!messageQueue.isClosed() && messageQueue.isEmpty() && (getPrefetchSize() == 0 || isBrowser())) {
             connection.pull(getConsumerId(), timeout);
         }
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9f6ecd3b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java
index d56e3b1..e2f4f56 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java
@@ -283,6 +283,12 @@ public interface Provider {
      * consumer.  If the consumer has a set prefetch that's greater than zero this method
      * should just return without performing and action.
      *
+     *   timeout < 0 then it should remain open until a message is received.
+     *   timeout = 0 then it returns a message or null if none available
+     *   timeout > 0 then it should remain open for timeout amount of time.
+     *
+     * The timeout value when positive is given in milliseconds.
+     *
      * @param timeout
      *        the amount of time to tell the remote peer to keep this pull request valid.
      * @param request

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9f6ecd3b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
index af2b431..ebec59a 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
@@ -22,6 +22,8 @@ import java.net.URI;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 import javax.jms.JMSException;
 import javax.jms.JMSSecurityException;
@@ -330,6 +332,25 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn
         return properties;
     }
 
+    /**
+     * Allows a connection resource to schedule a task for future execution.
+     *
+     * @param task
+     *      The Runnable task to be executed after the given delay.
+     * @param delay
+     *      The delay in milliseconds to schedule the given task for execution.
+     *
+     * @return a ScheduledFuture instance that can be used to cancel the task.
+     */
+    public ScheduledFuture<?> schedule(final Runnable task, long delay) {
+        if (task == null) {
+            LOG.trace("Resource attempted to schedule a null task.");
+            return null;
+        }
+
+        return getProvider().getScheduler().schedule(task, delay, TimeUnit.MILLISECONDS);
+    }
+
     @Override
     public String toString() {
         return "AmqpConnection { " + getConnectionInfo().getConnectionId() + " }";

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9f6ecd3b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index a5405ac..efbc9cb 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.ListIterator;
 import java.util.Map;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.jms.InvalidDestinationException;
@@ -79,8 +80,8 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
 
     private static final Modified MODIFIED_FAILED = new Modified();
     private static final Modified MODIFIED_UNDELIVERABLE = new Modified();
-    static
-    {
+
+    static {
         MODIFIED_FAILED.setDeliveryFailed(true);
 
         MODIFIED_UNDELIVERABLE.setDeliveryFailed(true);
@@ -96,6 +97,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
     private final AtomicLong incomingSequence = new AtomicLong(0);
 
     private AsyncResult stopRequest;
+    private AsyncResult pullRequest;
 
     public AmqpConsumer(AmqpSession session, JmsConsumerInfo info) {
         super(info);
@@ -127,7 +129,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
                 stopRequest = request;
             }
         } else {
-            // TODO: We dont actually want the additional messages that could be sent while
+            // TODO: We don't actually want the additional messages that could be sent while
             // draining. We could explicitly reduce credit first, or possibly use 'echo' instead
             // of drain if it was supported. We would first need to understand what happens
             // if we reduce credit below the number of messages already in-flight before
@@ -149,12 +151,26 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
             }
         }
 
+        if (pullRequest != null) {
+            Receiver receiver = getEndpoint();
+            if (receiver.getRemoteCredit() <= 0) {
+                if (receiver.getQueued() <= 0) {
+                    pullRequest.onFailure(null);
+                } else {
+                    pullRequest.onSuccess();
+                }
+                pullRequest = null;
+            }
+        }
+
+        LOG.trace("Consumer {} flow updated, remote credit = {}", getConsumerId(), getEndpoint().getRemoteCredit());
+
         super.processFlowUpdates(provider);
     }
 
     @Override
     protected void doOpen() {
-        JmsDestination destination  = resource.getDestination();
+        JmsDestination destination = resource.getDestination();
         String subscription = AmqpDestinationHelper.INSTANCE.getDestinationAddress(destination, session.getConnection());
 
         Source source = new Source();
@@ -188,8 +204,8 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
     @Override
     protected void doOpenCompletion() {
         // Verify the attach response contained a non-null Source
-        org.apache.qpid.proton.amqp.transport.Source s = getEndpoint().getRemoteSource();
-        if (s != null) {
+        org.apache.qpid.proton.amqp.transport.Source source = getEndpoint().getRemoteSource();
+        if (source != null) {
             super.doOpenCompletion();
         } else {
             // No link terminus was created, the peer will now detach/close us.
@@ -199,8 +215,8 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
     @Override
     protected Exception getOpenAbortException() {
         // Verify the attach response contained a non-null Source
-        org.apache.qpid.proton.amqp.transport.Source s = getEndpoint().getRemoteSource();
-        if (s != null) {
+        org.apache.qpid.proton.amqp.transport.Source source = getEndpoint().getRemoteSource();
+        if (source != null) {
             return super.getOpenAbortException();
         } else {
             // No link terminus was created, the peer has detach/closed us, create IDE.
@@ -377,15 +393,41 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
     }
 
     /**
-     * For a consumer whose prefetch value is set to zero this method will attempt to solicite
-     * a new message dispatch from the broker.
+     * Request a remote peer send a Message to this client.
+     *
+     *   timeout < 0 then it should remain open until a message is received.
+     *   timeout = 0 then it returns a message or null if none available
+     *   timeout > 0 then it should remain open for timeout amount of time.
+     *
+     * The timeout value when positive is given in milliseconds.
      *
      * @param timeout
+     *        the amount of time to tell the remote peer to keep this pull request valid.
      */
-    public void pull(long timeout) {
+    public void pull(final long timeout) {
+        LOG.trace("Pull called on consumer {} with timouet = {}", getConsumerId(), timeout);
         if (resource.getPrefetchSize() == 0 && getEndpoint().getCredit() == 0) {
-            // expand the credit window by one.
-            getEndpoint().flow(1);
+            if (timeout < 0) {
+                getEndpoint().flow(1);
+            } else if (timeout == 0) {
+                pullRequest = new DrainingPullRequest();
+                getEndpoint().drain(1);
+            } else if (timeout > 0) {
+                // We need to turn off the credit and signal the consumer
+                // that there was no message.
+                final ScheduledFuture<?> future = getSession().schedule(new Runnable() {
+
+                    @Override
+                    public void run() {
+                        if (getEndpoint().getRemoteCredit() != 0) {
+                            getEndpoint().drain(0);
+                        }
+                    }
+                }, timeout);
+
+                getEndpoint().flow(1);
+                pullRequest = new TimedPullRequest(future);
+            }
         }
     }
 
@@ -397,6 +439,12 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
             if (incoming != null) {
                 if (incoming.isReadable() && !incoming.isPartial()) {
                     LOG.trace("{} has incoming Message(s).", this);
+
+                    if (pullRequest != null) {
+                        pullRequest.onSuccess();
+                        pullRequest = null;
+                    }
+
                     try {
                         processDelivery(incoming);
                     } catch (Exception e) {
@@ -407,11 +455,20 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
                     incoming = null;
                 }
             } else {
+                LOG.info("Incoming null delivery");
+
                 // We have exhausted the locally queued messages on this link.
                 // Check if we tried to stop and have now run out of credit.
-                if (stopRequest != null && getEndpoint().getRemoteCredit() <= 0) {
-                    stopRequest.onSuccess();
-                    stopRequest = null;
+                if (getEndpoint().getRemoteCredit() <= 0) {
+                    if (stopRequest != null) {
+                        stopRequest.onSuccess();
+                        stopRequest = null;
+                    }
+
+                    if (pullRequest != null) {
+                        pullRequest.onFailure(null);
+                        pullRequest = null;
+                    }
                 }
             }
         } while (incoming != null);
@@ -583,4 +640,42 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
      */
     public void postRollback() throws Exception {
     }
+
+    private class DrainingPullRequest implements AsyncResult {
+
+        @Override
+        public void onFailure(Throwable result) {
+            JmsInboundMessageDispatch pullDone = new JmsInboundMessageDispatch(getNextIncomingSequenceNumber());
+            pullDone.setConsumerId(getConsumerId());
+            try {
+                deliver(pullDone);
+            } catch (Exception e) {
+                getSession().reportError(IOExceptionSupport.create(e));
+            }
+        }
+
+        @Override
+        public void onSuccess() {
+            // Nothing to do here.
+        }
+
+        @Override
+        public boolean isComplete() {
+            return false;
+        }
+    }
+
+    private class TimedPullRequest extends DrainingPullRequest {
+
+        private final ScheduledFuture<?> completionTask;
+
+        public TimedPullRequest(ScheduledFuture<?> completionTask) {
+            this.completionTask = completionTask;
+        }
+
+        @Override
+        public void onSuccess() {
+            completionTask.cancel(false);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9f6ecd3b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index 6c17587..8625e26 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -258,7 +258,7 @@ public class AmqpFixedProducer extends AmqpProducer {
         Target target = new Target();
         target.setAddress(targetAddress);
         Symbol typeCapability =  AmqpDestinationHelper.INSTANCE.toTypeCapability(destination);
-        if(typeCapability != null) {
+        if (typeCapability != null) {
             target.setCapabilities(typeCapability);
         }
 
@@ -282,8 +282,8 @@ public class AmqpFixedProducer extends AmqpProducer {
     @Override
     protected void doOpenCompletion() {
         // Verify the attach response contained a non-null target
-        org.apache.qpid.proton.amqp.transport.Target t = getEndpoint().getRemoteTarget();
-        if (t != null) {
+        org.apache.qpid.proton.amqp.transport.Target target = getEndpoint().getRemoteTarget();
+        if (target != null) {
             super.doOpenCompletion();
         } else {
             // No link terminus was created, the peer will now detach/close us.
@@ -293,8 +293,8 @@ public class AmqpFixedProducer extends AmqpProducer {
     @Override
     protected Exception getOpenAbortException() {
         // Verify the attach response contained a non-null target
-        org.apache.qpid.proton.amqp.transport.Target t = getEndpoint().getRemoteTarget();
-        if (t != null) {
+        org.apache.qpid.proton.amqp.transport.Target target = getEndpoint().getRemoteTarget();
+        if (target != null) {
             return super.getOpenAbortException();
         } else {
             // No link terminus was created, the peer has detach/closed us, create IDE.

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9f6ecd3b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index 5db3472..15c3cc2 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -1061,6 +1061,10 @@ public class AmqpProvider implements Provider, TransportListener {
         return remoteURI;
     }
 
+    ScheduledExecutorService getScheduler() {
+        return this.serializer;
+    }
+
     private final class IdleTimeoutCheck implements Runnable {
         @Override
         public void run() {
@@ -1089,7 +1093,7 @@ public class AmqpProvider implements Provider, TransportListener {
                 LOG.trace("IdleTimeoutCheck skipping check, connection is not active.");
             }
 
-            if(!checkScheduled) {
+            if (!checkScheduled) {
                 nextIdleTimeoutCheck = null;
                 LOG.trace("IdleTimeoutCheck exiting");
             }
@@ -1097,7 +1101,7 @@ public class AmqpProvider implements Provider, TransportListener {
     }
 
     Principal getLocalPrincipal() {
-        if(transport instanceof SSLTransport) {
+        if (transport instanceof SSLTransport) {
             return ((SSLTransport) transport).getLocalPrincipal();
         }
 
@@ -1105,7 +1109,7 @@ public class AmqpProvider implements Provider, TransportListener {
     }
 
     private static void setHostname(Sasl sasl, String hostname) {
-        //TODO: this is a hack until Proton 0.10+ is available with sasl#setHostname method.
+        // TODO: this is a hack until Proton 0.10+ is available with sasl#setHostname method.
         try {
             Field field = sasl.getClass().getDeclaredField("_hostname");
             field.setAccessible(true);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9f6ecd3b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
index 694ce2c..2dd72fb 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
@@ -18,6 +18,8 @@ package org.apache.qpid.jms.provider.amqp;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 import javax.jms.IllegalStateException;
 
@@ -213,6 +215,25 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> {
         getTransactionContext().rollback(request);
     }
 
+    /**
+     * Allows a session resource to schedule a task for future execution.
+     *
+     * @param task
+     *      The Runnable task to be executed after the given delay.
+     * @param delay
+     *      The delay in milliseconds to schedule the given task for execution.
+     *
+     * @return a ScheduledFuture instance that can be used to cancel the task.
+     */
+    public ScheduledFuture<?> schedule(final Runnable task, long delay) {
+        if (task == null) {
+            LOG.trace("Resource attempted to schedule a null task.");
+            return null;
+        }
+
+        return getProvider().getScheduler().schedule(task, delay, TimeUnit.MILLISECONDS);
+    }
+
     void addResource(AmqpConsumer consumer) {
         consumers.put(consumer.getConsumerId(), consumer);
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9f6ecd3b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
index 131e656..2759e7a 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
@@ -125,8 +125,8 @@ public class AmqpTemporaryDestination extends AmqpAbstractResource<JmsTemporaryD
     @Override
     protected void doOpenCompletion() {
         // Verify the attach response contained a non-null target
-        org.apache.qpid.proton.amqp.transport.Target t = getEndpoint().getRemoteTarget();
-        if (t != null) {
+        org.apache.qpid.proton.amqp.transport.Target target = getEndpoint().getRemoteTarget();
+        if (target != null) {
             super.doOpenCompletion();
         } else {
             // No link terminus was created, the peer will now detach/close us.

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9f6ecd3b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java
index 084ae13..bfcdf87 100644
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java
@@ -37,7 +37,7 @@ import org.apache.qpid.jms.support.Wait;
 import org.junit.Test;
 
 /**
- *
+ * Test for MessageConsumer that has a prefetch value of zero.
  */
 public class JmsZeroPrefetchTest extends AmqpTestSupport {
 
@@ -47,7 +47,7 @@ public class JmsZeroPrefetchTest extends AmqpTestSupport {
         ((JmsConnection)connection).getPrefetchPolicy().setAll(0);
         connection.start();
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Queue queue = session.createQueue(name.getMethodName());
+        Queue queue = session.createQueue(getDestinationName());
         MessageConsumer consumer = session.createConsumer(queue);
 
         MessageListener listener = new MessageListener() {
@@ -61,13 +61,117 @@ public class JmsZeroPrefetchTest extends AmqpTestSupport {
     }
 
     @Test(timeout = 60000)
-    public void testPullConsumerWorks() throws Exception {
+    public void testBlockingReceivesUnBlocksOnMessageSend() throws Exception {
+        connection = createAmqpConnection();
+        ((JmsConnection)connection).getPrefetchPolicy().setAll(0);
+        connection.start();
+
+        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getDestinationName());
+
+        final MessageProducer producer = session.createProducer(queue);
+
+        Thread producerThread = new Thread(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    Thread.sleep(1500);
+                    producer.send(session.createTextMessage("Hello World! 1"));
+                } catch (Exception e) {
+                }
+            }
+        });
+        producerThread.start();
+
+        MessageConsumer consumer = session.createConsumer(queue);
+        Message answer = consumer.receive();
+        assertNotNull("Should have received a message!", answer);
+
+        final QueueViewMBean queueView = getProxyToQueue(getDestinationName());
+
+        // Assert that the single message sent by the producer thread was
+        // consumed by the blocking receive and that the Queue is now empty.
+        assertTrue(Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return queueView.getQueueSize() == 0;
+            }
+        }));
+    }
+
+    @Test(timeout = 60000)
+    public void testReceiveTimesOutAndRemovesCredit() throws Exception {
+        connection = createAmqpConnection();
+        ((JmsConnection)connection).getPrefetchPolicy().setAll(0);
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getDestinationName());
+        MessageConsumer consumer = session.createConsumer(queue);
+        Message answer = consumer.receive(100);
+        assertNull("Should have not received a message!", answer);
+
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("Hello World! 1"));
+
+        final QueueViewMBean queueView = getProxyToQueue(getDestinationName());
+
+        // Assert that the message sent after the receive call returned null
+        // stays on the Queue and is not dispatched to the consumer.
+        assertTrue(Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return queueView.getQueueSize() == 1;
+            }
+        }));
+
+        assertEquals(0, queueView.getInFlightCount());
+    }
+
+    @Test(timeout = 60000)
+    public void testReceiveNoWaitWaitForSever() throws Exception {
         connection = createAmqpConnection();
         ((JmsConnection)connection).getPrefetchPolicy().setAll(0);
         connection.start();
 
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Queue queue = session.createQueue(name.getMethodName());
+        Queue queue = session.createQueue(getDestinationName());
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("Hello World! 1"));
+
+        MessageConsumer consumer = session.createConsumer(queue);
+        Message answer = consumer.receiveNoWait();
+        assertNotNull("Should have received a message!", answer);
+
+        // Send another, it should not get dispatched.
+        producer.send(session.createTextMessage("Hello World! 2"));
+
+        final QueueViewMBean queueView = getProxyToQueue(getDestinationName());
+
+        // Assert that we only pulled one message and that we didn't cause
+        // the other message to be dispatched.
+        assertTrue(Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return queueView.getQueueSize() == 1;
+            }
+        }));
+
+        assertEquals(0, queueView.getInFlightCount());
+    }
+
+    @Test(timeout = 60000)
+    public void testRepeatedPullAttempts() throws Exception {
+        connection = createAmqpConnection();
+        ((JmsConnection)connection).getPrefetchPolicy().setAll(0);
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getDestinationName());
         MessageProducer producer = session.createProducer(queue);
         producer.send(session.createTextMessage("Hello World!"));
 
@@ -75,6 +179,7 @@ public class JmsZeroPrefetchTest extends AmqpTestSupport {
         MessageConsumer consumer = session.createConsumer(queue);
         Message answer = consumer.receive(5000);
         assertNotNull("Should have received a message!", answer);
+
         // check if method will return at all and will return a null
         answer = consumer.receive(1);
         assertNull("Should have not received a message!", answer);
@@ -89,12 +194,12 @@ public class JmsZeroPrefetchTest extends AmqpTestSupport {
         connection.start();
 
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Queue queue = session.createQueue(name.getMethodName());
+        Queue queue = session.createQueue(getDestinationName());
         MessageProducer producer = session.createProducer(queue);
         producer.send(session.createTextMessage("Hello World! 1"));
         producer.send(session.createTextMessage("Hello World! 2"));
 
-        final QueueViewMBean queueView = getProxyToQueue(name.getMethodName());
+        final QueueViewMBean queueView = getProxyToQueue(getDestinationName());
 
         // Check initial Queue State
         assertEquals(2, queueView.getQueueSize());
@@ -125,7 +230,7 @@ public class JmsZeroPrefetchTest extends AmqpTestSupport {
         connection.start();
 
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Queue queue = session.createQueue(name.getMethodName());
+        Queue queue = session.createQueue(getDestinationName());
 
         MessageProducer producer = session.createProducer(queue);
         producer.send(session.createTextMessage("Msg1"));
@@ -144,4 +249,30 @@ public class JmsZeroPrefetchTest extends AmqpTestSupport {
         answer = (TextMessage)consumer2.receiveNoWait();
         assertNull("Should have not received a message!", answer);
     }
+
+    @Test(timeout = 60000)
+    public void testConsumerWithNoMessageDoesNotHogMessages() throws Exception {
+        connection = createAmqpConnection();
+        ((JmsConnection)connection).getPrefetchPolicy().setAll(0);
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getDestinationName());
+
+        // Try and receive one message which will fail
+        MessageConsumer consumer1 = session.createConsumer(queue);
+        assertNull(consumer1.receive(10));
+
+        // Now produce a message
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("Msg1"));
+
+        // Now let's receive it with the second consumer; the first should
+        // not be accepting messages now and the broker should give it to
+        // consumer 2.
+        MessageConsumer consumer2 = session.createConsumer(queue);
+        TextMessage answer = (TextMessage)consumer2.receive(3000);
+        assertNotNull(answer);
+        assertEquals("Should have received a message!", answer.getText(), "Msg1");
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9f6ecd3b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/resources/log4j.properties b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/resources/log4j.properties
index 2b107ef..ce9b95c 100644
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/resources/log4j.properties
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/resources/log4j.properties
@@ -21,7 +21,7 @@
 log4j.rootLogger=INFO, out, stdout
 
 log4j.logger.org.apache.qpid.jms=INFO
-log4j.logger.org.apache.qpid.jms.provider=DEBUG
+log4j.logger.org.apache.qpid.jms.provider=INFO
 
 # Tune the ActiveMQ and it's AMQP transport as needed for debugging.
 log4j.logger.org.apache.activemq=INFO


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org