You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2017/02/03 16:26:31 UTC

qpid-jms git commit: QPIDJMS-256 QPIDJMS-231 Better deal with deffered consumer close

Repository: qpid-jms
Updated Branches:
  refs/heads/master 98c362188 -> d22a22f64


QPIDJMS-256 QPIDJMS-231 Better deal with deffered consumer close

Handle consumer close in TX better by moving the deferment handling to
the AmqpConsumer where close decide if it should hold until the TX has
completed and ensure that on deferred close that the consumer is drained
and prefetched messages are released back to the remote for redispatch.
Ensure that when a consumer is closed but deferred due to being in a
transaction or having pending delivered messages that are not yet ack'd
the client does not dispatch any more inbound messages to the consumer.


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/d22a22f6
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/d22a22f6
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/d22a22f6

Branch: refs/heads/master
Commit: d22a22f64df447152b78a575e5b2ce44c0b89087
Parents: 98c3621
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Feb 3 11:06:48 2017 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Feb 3 11:26:03 2017 -0500

----------------------------------------------------------------------
 .../qpid/jms/JmsLocalTransactionContext.java    |  82 +------
 .../org/apache/qpid/jms/JmsMessageConsumer.java |  41 +---
 .../qpid/jms/JmsNoTxTransactionContext.java     |  10 -
 .../apache/qpid/jms/JmsTransactionContext.java  |  13 --
 .../qpid/jms/JmsTransactionSynchronization.java |  65 ------
 .../jms/message/JmsInboundMessageDispatch.java  |   9 +
 .../qpid/jms/provider/amqp/AmqpConsumer.java    | 219 ++++++++++++-------
 .../integration/ConsumerIntegrationTest.java    |  46 +++-
 .../TransactionsIntegrationTest.java            |  70 +++---
 9 files changed, 242 insertions(+), 313 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d22a22f6/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
index b80e1bd..1e38d07 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
@@ -16,16 +16,13 @@
  */
 package org.apache.qpid.jms;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.jms.JMSException;
 import javax.jms.TransactionRolledBackException;
 
-import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
 import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
 import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
 import org.apache.qpid.jms.meta.JmsResourceId;
@@ -45,7 +42,6 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
 
     private static final Logger LOG = LoggerFactory.getLogger(JmsLocalTransactionContext.class);
 
-    private final List<JmsTransactionSynchronization> synchronizations = new ArrayList<JmsTransactionSynchronization>();
     private final Map<JmsResourceId, JmsResourceId> participants = new HashMap<JmsResourceId, JmsResourceId>();
     private final JmsSession session;
     private final JmsConnection connection;
@@ -116,20 +112,6 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
     }
 
     @Override
-    public void addSynchronization(JmsTransactionSynchronization sync) throws JMSException {
-        lock.writeLock().lock();
-        try {
-            if (sync.validate(this)) {
-                synchronizations.add(sync);
-            }
-        } catch (Exception e) {
-            throw JmsExceptionSupport.create(e);
-        } finally {
-            lock.writeLock().unlock();
-        }
-    }
-
-    @Override
     public boolean isInDoubt() {
         return transactionInfo != null ? transactionInfo.isInDoubt() : false;
     }
@@ -180,8 +162,7 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
                 }
                 throw new TransactionRolledBackException("Transaction failed and has been rolled back.");
             } else {
-                LOG.debug("Commit: {} syncCount: {}", transactionInfo.getId(),
-                          (synchronizations != null ? synchronizations.size() : 0));
+                LOG.debug("Commit: {}", transactionInfo.getId());
 
                 JmsTransactionId oldTransactionId = transactionInfo.getId();
                 try {
@@ -205,7 +186,6 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
                             LOG.trace("Local TX listener error ignored: {}", error);
                         }
                     }
-                    afterCommit();
                 } catch (JMSException cause) {
                     LOG.info("Commit failed for transaction: {}", oldTransactionId);
                     if (listener != null) {
@@ -215,7 +195,6 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
                             LOG.trace("Local TX listener error ignored: {}", error);
                         }
                     }
-                    afterRollback();
                     throw cause;
                 } finally {
                     LOG.trace("Commit starting new TX after commit completed.");
@@ -235,8 +214,7 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
     private void doRollback(boolean startNewTx) throws JMSException {
         lock.writeLock().lock();
         try {
-            LOG.debug("Rollback: {} syncCount: {}", transactionInfo.getId(),
-                      (synchronizations != null ? synchronizations.size() : 0));
+            LOG.debug("Rollback: {}", transactionInfo.getId());
             try {
                 connection.rollback(transactionInfo, new ProviderSynchronization() {
 
@@ -258,8 +236,6 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
                         LOG.trace("Local TX listener error ignored: {}", error);
                     }
                 }
-
-                afterRollback();
             } finally {
                 if (startNewTx) {
                     LOG.trace("Rollback starting new TX after rollback completed.");
@@ -375,58 +351,4 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
         JmsTransactionId transactionId = connection.getNextTransactionId();
         return new JmsTransactionInfo(session.getSessionId(), transactionId);
     }
-
-    /*
-     * Must be called with the write lock held to ensure the synchronizations list
-     * can be safely cleared.
-     */
-    private void afterRollback() throws JMSException {
-        if (synchronizations.isEmpty()) {
-            return;
-        }
-
-        Throwable firstException = null;
-        for (JmsTransactionSynchronization sync : synchronizations) {
-            try {
-                sync.afterRollback();
-            } catch (Throwable thrown) {
-                LOG.debug("Exception from afterRollback on " + sync, thrown);
-                if (firstException == null) {
-                    firstException = thrown;
-                }
-            }
-        }
-
-        synchronizations.clear();
-        if (firstException != null) {
-            throw JmsExceptionSupport.create(firstException);
-        }
-    }
-
-    /*
-     * Must be called with the write lock held to ensure the synchronizations list
-     * can be safely cleared.
-     */
-    private void afterCommit() throws JMSException {
-        if (synchronizations.isEmpty()) {
-            return;
-        }
-
-        Throwable firstException = null;
-        for (JmsTransactionSynchronization sync : synchronizations) {
-            try {
-                sync.afterCommit();
-            } catch (Throwable thrown) {
-                LOG.debug("Exception from afterCommit on " + sync, thrown);
-                if (firstException == null) {
-                    firstException = thrown;
-                }
-            }
-        }
-
-        synchronizations.clear();
-        if (firstException != null) {
-            throw JmsExceptionSupport.create(firstException);
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d22a22f6/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index 0cca6ff..29446af 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -16,7 +16,6 @@
  */
 package org.apache.qpid.jms;
 
-import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
@@ -138,28 +137,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
     @Override
     public void close() throws JMSException {
         if (!closed.get()) {
-            session.getTransactionContext().addSynchronization(new JmsTransactionSynchronization() {
-
-                @Override
-                public boolean validate(JmsTransactionContext context) throws Exception {
-                    if (isBrowser() || !context.isActiveInThisContext(getConsumerId())) {
-                        doClose();
-                        return false;
-                    }
-
-                    return true;
-                }
-
-                @Override
-                public void afterCommit() throws Exception {
-                    doClose();
-                }
-
-                @Override
-                public void afterRollback() throws Exception {
-                    doClose();
-                }
-            });
+            doClose();
         }
     }
 
@@ -549,20 +527,17 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
     void suspendForRollback() throws JMSException {
         stop();
 
-        session.getConnection().stopResource(consumerInfo);
-    }
-
-    void resumeAfterRollback() throws JMSException {
-        if (!this.messageQueue.isEmpty()) {
-            List<JmsInboundMessageDispatch> drain = this.messageQueue.removeAll();
-            for (JmsInboundMessageDispatch envelope : drain) {
-                doAckReleased(envelope);
+        try {
+            session.getConnection().stopResource(consumerInfo);
+        } finally {
+            if (session.getTransactionContext().isActiveInThisContext(getConsumerId())) {
+                messageQueue.clear();
             }
-            drain.clear();
         }
+    }
 
+    void resumeAfterRollback() throws JMSException {
         start();
-
         startConsumerResource();
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d22a22f6/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsNoTxTransactionContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsNoTxTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsNoTxTransactionContext.java
index 86b6935..bb3e421 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsNoTxTransactionContext.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsNoTxTransactionContext.java
@@ -18,7 +18,6 @@ package org.apache.qpid.jms;
 
 import javax.jms.JMSException;
 
-import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
 import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
 import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
 import org.apache.qpid.jms.meta.JmsResourceId;
@@ -43,15 +42,6 @@ public class JmsNoTxTransactionContext implements JmsTransactionContext {
     }
 
     @Override
-    public void addSynchronization(JmsTransactionSynchronization sync) throws JMSException {
-        try {
-            sync.validate(this);
-        } catch (Exception e) {
-            throw JmsExceptionSupport.create(e);
-        }
-    }
-
-    @Override
     public boolean isInDoubt() {
         return false;
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d22a22f6/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionContext.java
index d8e6259..d076000 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionContext.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionContext.java
@@ -61,19 +61,6 @@ public interface JmsTransactionContext {
     void send(JmsConnection connection, JmsOutboundMessageDispatch envelope) throws JMSException;
 
     /**
-     * Adds the given Transaction synchronization to the current list. The
-     * registered synchronization will be notified of various event points
-     * in the lifetime of a transaction such as before and after commit or
-     * rollback.
-     *
-     * @param sync
-     *        the transaction synchronization to add.
-     *
-     * @throws JMSException if an error occurs during the send.
-     */
-    void addSynchronization(JmsTransactionSynchronization sync) throws JMSException;
-
-    /**
      * @return if the currently transaction has been marked as being in an unknown state.
      */
     boolean isInDoubt();

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d22a22f6/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionSynchronization.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionSynchronization.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionSynchronization.java
deleted file mode 100644
index 782e655..0000000
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionSynchronization.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms;
-
-/**
- * Interface for JmsResources that are part of a running transaction to use
- * to register for notifications of transaction commit and rollback in order
- * to execute specific actions.
- *
- * One such use of this might be for a consumer to register a synchronization
- * when it is closed while it's parent session is still operating inside a
- * transaction.  The Consumer can close itself following the commit or rollback
- * of the running Transaction.
- */
-public abstract class JmsTransactionSynchronization {
-
-    /**
-     * Called once before the synchronization is added to the set
-     * of synchronizations held for a pending TX.  The caller can
-     * check TX state and react accordingly.  If the resource finds
-     * that is does not need to be added to the TX it can return false
-     * to indicate such.
-     *
-     * @param context
-     *        reference to the transaction context.
-     *
-     * @return true if the synchronization should be added to the TX.
-     *
-     * @throws Exception if an error occurs during the event.
-     */
-    public boolean validate(JmsTransactionContext context) throws Exception {
-        return true;
-    }
-
-    /**
-     * Called after a successful commit of the current Transaction.
-     *
-     * @throws Exception if an error occurs during the event.
-     */
-    public void afterCommit() throws Exception {
-    }
-
-    /**
-     * Called after the current transaction has been rolled back either
-     * by a call to rollback or by a failure to complete a commit operation.
-     *
-     * @throws Exception if an error occurs during the event.
-     */
-    public void afterRollback() throws Exception {
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d22a22f6/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java
index c038519..fbb5a1d 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java
@@ -29,6 +29,7 @@ public class JmsInboundMessageDispatch extends JmsAbstractResourceId {
     private final long sequence;
     private JmsMessage message;
     private boolean enqueueFirst;
+    private boolean delivered;
 
     private transient String stringView;
 
@@ -64,6 +65,14 @@ public class JmsInboundMessageDispatch extends JmsAbstractResourceId {
         return enqueueFirst;
     }
 
+    public boolean isDelivered() {
+        return delivered;
+    }
+
+    public void setDelivered(boolean delivered) {
+        this.delivered = delivered;
+    }
+
     public int getRedeliveryCount() {
         int redeliveryCount = 0;
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d22a22f6/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index b88bc18..28f2ba5 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -22,12 +22,8 @@ import static org.apache.qpid.jms.provider.amqp.AmqpSupport.REJECTED;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.LinkedHashMap;
 import java.util.ListIterator;
-import java.util.Map;
 import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.atomic.AtomicLong;
 
 import javax.jms.JMSException;
 
@@ -41,6 +37,7 @@ import org.apache.qpid.jms.meta.JmsConsumerInfo;
 import org.apache.qpid.jms.provider.AsyncResult;
 import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
 import org.apache.qpid.jms.provider.ProviderListener;
+import org.apache.qpid.jms.provider.WrappedAsyncResult;
 import org.apache.qpid.jms.provider.amqp.message.AmqpCodec;
 import org.apache.qpid.jms.util.IOExceptionSupport;
 import org.apache.qpid.proton.amqp.Binary;
@@ -65,13 +62,11 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
     private static final int INITIAL_BUFFER_CAPACITY = 1024 * 128;
 
     protected final AmqpSession session;
-    protected final Map<JmsInboundMessageDispatch, Delivery> delivered = new LinkedHashMap<JmsInboundMessageDispatch, Delivery>();
-    protected boolean presettle;
     protected AsyncResult stopRequest;
     protected AsyncResult pullRequest;
     protected final ByteBuf incomingBuffer = Unpooled.buffer(INITIAL_BUFFER_CAPACITY);
-    protected final AtomicLong incomingSequence = new AtomicLong(0);
-
+    protected long incomingSequence;
+    protected long deliveredCount;
     protected boolean deferredClose;
 
     public AmqpConsumer(AmqpSession session, JmsConsumerInfo info, Receiver receiver) {
@@ -85,8 +80,8 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
         // If we have pending deliveries we remain open to allow for ACK or for a
         // pending transaction that this consumer is active in to complete.
         if (shouldDeferClose()) {
-            request.onSuccess();
             deferredClose = true;
+            stop(new StopAndReleaseRequest(request));
         } else {
             super.close(request);
         }
@@ -213,31 +208,42 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
      */
     public void acknowledge(ACK_TYPE ackType) {
         LOG.trace("Session Acknowledge for consumer {} with ack type {}", getResourceInfo().getId(), ackType);
-        for (Delivery delivery : delivered.values()) {
-            switch (ackType) {
-                case ACCEPTED:
-                    delivery.disposition(Accepted.getInstance());
-                    break;
-                case RELEASED:
-                    delivery.disposition(Released.getInstance());
-                    break;
-                case REJECTED:
-                    delivery.disposition(REJECTED);
-                    break;
-                case MODIFIED_FAILED:
-                    delivery.disposition(MODIFIED_FAILED);
-                    break;
-                case MODIFIED_FAILED_UNDELIVERABLE:
-                    delivery.disposition(MODIFIED_FAILED_UNDELIVERABLE);
-                    break;
-                default:
-                    throw new IllegalArgumentException("Invalid acknowledgement type specified: " + ackType);
+        Delivery delivery = getEndpoint().head();
+        while (delivery != null) {
+            Delivery current = delivery;
+            delivery = delivery.next();
+
+            if (!(current.getContext() instanceof JmsInboundMessageDispatch)) {
+                LOG.debug("{} Found incomplete delivery with no context during recover processing", AmqpConsumer.this);
+                continue;
             }
 
-            delivery.settle();
-        }
+            JmsInboundMessageDispatch envelope = (JmsInboundMessageDispatch) current.getContext();
+            if (envelope.isDelivered()) {
+                switch (ackType) {
+                    case ACCEPTED:
+                        current.disposition(Accepted.getInstance());
+                        break;
+                    case RELEASED:
+                        current.disposition(Released.getInstance());
+                        break;
+                    case REJECTED:
+                        current.disposition(REJECTED);
+                        break;
+                    case MODIFIED_FAILED:
+                        current.disposition(MODIFIED_FAILED);
+                        break;
+                    case MODIFIED_FAILED_UNDELIVERABLE:
+                        current.disposition(MODIFIED_FAILED_UNDELIVERABLE);
+                        break;
+                    default:
+                        throw new IllegalArgumentException("Invalid acknowledgement type specified: " + ackType);
+                }
 
-        delivered.clear();
+                current.settle();
+                deliveredCount--;
+            }
+        }
 
         tryCompleteDeferredClose();
     }
@@ -260,28 +266,25 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
         if (envelope.getProviderHint() instanceof Delivery) {
             delivery = (Delivery) envelope.getProviderHint();
         } else {
-            delivery = delivered.get(envelope);
-            if (delivery == null) {
-                LOG.warn("Received Ack for unknown message: {}", envelope);
-                return;
-            }
+            LOG.warn("Received Ack for unknown message: {}", envelope);
+            return;
         }
 
         if (ackType.equals(ACK_TYPE.DELIVERED)) {
             LOG.debug("Delivered Ack of message: {}", envelope);
-            if (!delivery.isSettled()) {
-                delivered.put(envelope, delivery);
-                delivery.setDefaultDeliveryState(MODIFIED_FAILED);
-            }
+            deliveredCount++;
+            envelope.setDelivered(true);
+            delivery.setDefaultDeliveryState(MODIFIED_FAILED);
             sendFlowIfNeeded();
+            return;
         } else if (ackType.equals(ACK_TYPE.ACCEPTED)) {
             // A Consumer may not always send a DELIVERED ack so we need to
             // check to ensure we don't add too much credit to the link.
-            if (delivery.isSettled() || delivered.remove(envelope) == null) {
+            if (!envelope.isDelivered()) {
                 sendFlowIfNeeded();
             }
             LOG.debug("Accepted Ack of message: {}", envelope);
-            if (!delivery.isSettled()) {
+            if (!delivery.remotelySettled()) {
                 if (session.isTransacted() && !getResourceInfo().isBrowser()) {
 
                     if (session.isTransactionFailed()) {
@@ -302,6 +305,8 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
                     delivery.disposition(Accepted.getInstance());
                     delivery.settle();
                 }
+            } else {
+                delivery.settle();
             }
         } else if (ackType.equals(ACK_TYPE.MODIFIED_FAILED_UNDELIVERABLE)) {
             deliveryFailedUndeliverable(delivery);
@@ -312,6 +317,11 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
             delivery.settle();
         } else {
             LOG.warn("Unsupported Ack Type for message: {}", envelope);
+            return;
+        }
+
+        if (envelope.isDelivered()) {
+            deliveredCount--;
         }
 
         tryCompleteDeferredClose();
@@ -355,19 +365,34 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
      */
     public void recover() throws Exception {
         LOG.debug("Session Recover for consumer: {}", getResourceInfo().getId());
-        Collection<JmsInboundMessageDispatch> values = delivered.keySet();
-        ArrayList<JmsInboundMessageDispatch> envelopes = new ArrayList<JmsInboundMessageDispatch>(values);
-        ListIterator<JmsInboundMessageDispatch> reverseIterator = envelopes.listIterator(values.size());
 
-        while (reverseIterator.hasPrevious()) {
-            JmsInboundMessageDispatch envelope = reverseIterator.previous();
-            envelope.getMessage().getFacade().setRedeliveryCount(
-                envelope.getMessage().getFacade().getRedeliveryCount() + 1);
-            envelope.setEnqueueFirst(true);
-            deliver(envelope);
+        ArrayList<JmsInboundMessageDispatch> redispatchList = new ArrayList<JmsInboundMessageDispatch>();
+
+        Delivery delivery = getEndpoint().head();
+        while (delivery != null) {
+            Delivery current = delivery;
+            delivery = delivery.next();
+
+            if (!(current.getContext() instanceof JmsInboundMessageDispatch)) {
+                LOG.debug("{} Found incomplete delivery with no context during recover processing", AmqpConsumer.this);
+                continue;
+            }
+
+            JmsInboundMessageDispatch envelope = (JmsInboundMessageDispatch) current.getContext();
+            if (envelope.isDelivered()) {
+                envelope.getMessage().getFacade().setRedeliveryCount(
+                    envelope.getMessage().getFacade().getRedeliveryCount() + 1);
+                envelope.setEnqueueFirst(true);
+                envelope.setDelivered(false);
+
+                redispatchList.add(envelope);
+            }
         }
 
-        delivered.clear();
+        ListIterator<JmsInboundMessageDispatch> reverseIterator = redispatchList.listIterator(redispatchList.size());
+        while (reverseIterator.hasPrevious()) {
+            deliver(reverseIterator.previous());
+        }
     }
 
     /**
@@ -458,13 +483,6 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
     private boolean processDelivery(Delivery incoming) throws Exception {
         incoming.setDefaultDeliveryState(Released.getInstance());
 
-        // If we are awaiting to close for some conditions to be met then we don't
-        // need to decode or dispatch the message.
-        if (deferredClose) {
-            getEndpoint().advance();
-            return true;
-        }
-
         JmsMessage message = null;
         try {
             message = AmqpCodec.decodeMessage(this, unwrapIncomingMessage(incoming)).asJmsMessage();
@@ -504,7 +522,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
     }
 
     protected long getNextIncomingSequenceNumber() {
-        return incomingSequence.incrementAndGet();
+        return ++incomingSequence;
     }
 
     @Override
@@ -532,18 +550,10 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
         return this.getResourceInfo().getDestination();
     }
 
-    public boolean isPresettle() {
-        return presettle || getResourceInfo().isBrowser();
-    }
-
     public boolean isStopping() {
         return stopRequest != null;
     }
 
-    public void setPresettle(boolean presettle) {
-        this.presettle = presettle;
-    }
-
     public int getDrainTimeout() {
         return session.getProvider().getDrainTimeout();
     }
@@ -562,12 +572,14 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
     }
 
     protected void deliver(JmsInboundMessageDispatch envelope) throws Exception {
-        ProviderListener listener = session.getProvider().getProviderListener();
-        if (listener != null) {
-            LOG.debug("Dispatching received message: {}", envelope);
-            listener.onInboundMessage(envelope);
-        } else {
-            LOG.error("Provider listener is not set, message will be dropped: {}", envelope);
+        if (!deferredClose) {
+            ProviderListener listener = session.getProvider().getProviderListener();
+            if (listener != null) {
+                LOG.debug("Dispatching received message: {}", envelope);
+                listener.onInboundMessage(envelope);
+            } else {
+                LOG.error("Provider listener is not set, message will be dropped: {}", envelope);
+            }
         }
     }
 
@@ -595,9 +607,12 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
     }
 
     public void postCommit() {
+        tryCompleteDeferredClose();
     }
 
     public void postRollback() {
+        releasePrefetch();
+        tryCompleteDeferredClose();
     }
 
     @Override
@@ -630,18 +645,64 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
     }
 
     private boolean shouldDeferClose() {
-        return !delivered.isEmpty();
+        if (getSession().isTransacted() && getSession().getTransactionContext().isInTransaction(this)) {
+            return true;
+        }
+
+        if (deliveredCount > 0) {
+            return true;
+        }
+
+        return false;
     }
 
     private void tryCompleteDeferredClose() {
-        if (deferredClose && delivered.isEmpty()) {
-            close(new DeferredCloseRequest());
+        if (deferredClose && deliveredCount == 0) {
+            super.close(new DeferredCloseRequest());
+        }
+    }
+
+    private void releasePrefetch() {
+        Delivery delivery = getEndpoint().head();
+
+        while (delivery != null) {
+            Delivery current = delivery;
+            delivery = delivery.next();
+
+            if (!(current.getContext() instanceof JmsInboundMessageDispatch)) {
+                LOG.debug("{} Found incomplete delivery with no context during release processing", AmqpConsumer.this);
+                continue;
+            }
+
+            JmsInboundMessageDispatch envelope = (JmsInboundMessageDispatch) current.getContext();
+            if (!envelope.isDelivered()) {
+                current.disposition(Released.getInstance());
+                current.settle();
+            }
+        }
+
+    }
+
+    //----- Inner class used to report on deferred close ---------------------//
+
+    private final class StopAndReleaseRequest extends WrappedAsyncResult {
+
+        public StopAndReleaseRequest(AsyncResult closeRequest) {
+            super(closeRequest);
+        }
+
+        @Override
+        public void onSuccess() {
+            // Now that the link is drained we can release all the prefetched
+            // messages so that the remote can send them elsewhere.
+            releasePrefetch();
+            super.onSuccess();
         }
     }
 
     //----- Inner class used to report on deferred close ---------------------//
 
-    protected final class DeferredCloseRequest implements AsyncResult {
+    private final class DeferredCloseRequest implements AsyncResult {
 
         @Override
         public void onFailure(Throwable result) {
@@ -662,7 +723,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
 
     //----- Inner class used in message pull operations ----------------------//
 
-    protected static final class ScheduledRequest implements AsyncResult {
+    private static final class ScheduledRequest implements AsyncResult {
 
         private final ScheduledFuture<?> sheduledTask;
         private final AsyncResult origRequest;

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d22a22f6/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
index 00af430..bc1b9a2 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
@@ -1313,7 +1313,10 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
     @Test(timeout=20000)
     public void testCloseClientAckAsyncConsumerCanStillAckMessages() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
-            Connection connection = testFixture.establishConnecton(testPeer);
+            final int DEFAULT_PREFETCH = 100;
+
+            // Set to fixed known value to reduce breakage if defaults are changed.
+            Connection connection = testFixture.establishConnecton(testPeer, "jms.prefetchPolicy.all=" + DEFAULT_PREFETCH);
             connection.start();
 
             testPeer.expectBegin();
@@ -1341,6 +1344,9 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
 
             assertTrue("Did not consume all messages", consumedLatch.await(10, TimeUnit.SECONDS));
 
+            // Expect the client to then drain off all credit from the link.
+            testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(DEFAULT_PREFETCH - messageCount)));
+
             // Close should be deferred as these messages were delivered but not acknowledged.
             consumer.close();
 
@@ -1366,7 +1372,10 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
     @Test(timeout=20000)
     public void testCloseClientAckSyncConsumerCanStillAckMessages() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
-            Connection connection = testFixture.establishConnecton(testPeer);
+            final int DEFAULT_PREFETCH = 100;
+
+            // Set to fixed known value to reduce breakage if defaults are changed.
+            Connection connection = testFixture.establishConnecton(testPeer, "jms.prefetchPolicy.all=" + DEFAULT_PREFETCH);
             connection.start();
 
             testPeer.expectBegin();
@@ -1375,7 +1384,7 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
             Queue queue = session.createQueue(getTestName());
 
             int messageCount = 5;
-            int consumeCount = 4;
+            int consumeCount = 3;
 
             testPeer.expectReceiverAttach();
             testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), messageCount);
@@ -1390,6 +1399,14 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
                 assertTrue(receivedMessage instanceof TextMessage);
             }
 
+            // Expect the client to then drain off all credit from the link.
+            testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(DEFAULT_PREFETCH - messageCount)));
+
+            // Expect the prefetched messages to be released for dispatch elsewhere.
+            for (int i = 1; i <= messageCount - consumeCount; i++) {
+                testPeer.expectDisposition(true, new ReleasedMatcher());
+            }
+
             // Close should be deferred as these messages were delivered but not acknowledged.
             consumer.close();
 
@@ -1415,7 +1432,10 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
     @Test(timeout=20000)
     public void testConsumerWithDeferredCloseActsAsClosed() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
-            Connection connection = testFixture.establishConnecton(testPeer);
+            final int DEFAULT_PREFETCH = 100;
+
+            // Set to fixed known value to reduce breakage if defaults are changed.
+            Connection connection = testFixture.establishConnecton(testPeer, "jms.prefetchPolicy.all=" + DEFAULT_PREFETCH);
             connection.start();
 
             testPeer.expectBegin();
@@ -1440,6 +1460,14 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
                 assertTrue(receivedMessage instanceof TextMessage);
             }
 
+            // Expect the client to then drain off all credit from the link.
+            testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(DEFAULT_PREFETCH - messageCount)));
+
+            // Expect the prefetched messages to be released for dispatch elsewhere.
+            for (int i = 1; i <= messageCount - consumeCount; i++) {
+                testPeer.expectDisposition(true, new ReleasedMatcher());
+            }
+
             // Close should be deferred as these messages were delivered but not acknowledged.
             consumer.close();
 
@@ -1483,8 +1511,11 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
             final CountDownLatch errorLatch = new CountDownLatch(1);
 
-            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
-            connection.setCloseTimeout(500);
+            final int DEFAULT_PREFETCH = 100;
+
+            // Set to fixed known value to reduce breakage if defaults are changed.
+            Connection connection = testFixture.establishConnecton(testPeer,
+                "jms.closeTimeout=500&jms.prefetchPolicy.all=" + DEFAULT_PREFETCH);
             connection.setExceptionListener(new ExceptionListener() {
 
                 @Override
@@ -1510,6 +1541,9 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
             assertNotNull(receivedMessage);
             assertTrue(receivedMessage instanceof TextMessage);
 
+            // Expect the client to then drain off all credit from the link.
+            testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(DEFAULT_PREFETCH - 1)));
+
             // Close should be deferred as these messages were delivered but not acknowledged.
             consumer.close();
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d22a22f6/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
index f9f9beb..bd8d997 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
@@ -352,7 +352,10 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
 
     private void doCommitTransactedSessionWithConsumerTestImpl(int transferCount, int consumeCount, boolean closeConsumer, boolean closeBeforeCommit) throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
-            Connection connection = testFixture.establishConnecton(testPeer);
+            final int DEFAULT_PREFETCH = 100;
+
+            // Set to fixed known value to reduce breakage if defaults are changed.
+            Connection connection = testFixture.establishConnecton(testPeer, "jms.prefetchPolicy.all=" + DEFAULT_PREFETCH);
             connection.start();
 
             testPeer.expectBegin();
@@ -387,13 +390,11 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
                 assertTrue(receivedMessage instanceof TextMessage);
             }
 
-            // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
-            // and reply with accepted and settled disposition to indicate the commit succeeded
-            testPeer.expectDischarge(txnId, false);
-
             // Expect the consumer to close now
             if (closeConsumer && closeBeforeCommit) {
-                testPeer.expectDetach(true, true, true);
+
+                // Expect the client to then drain off all credit from the link.
+                testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(DEFAULT_PREFETCH - transferCount)));
 
                 // Expect the messages that were not consumed to be released
                 int unconsumed = transferCount - consumeCount;
@@ -401,7 +402,18 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
                     testPeer.expectDispositionThatIsReleasedAndSettled();
                 }
 
+                // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
+                // and reply with accepted and settled disposition to indicate the commit succeeded
+                testPeer.expectDischarge(txnId, false);
+
+                // Now the deferred close should be performed.
+                testPeer.expectDetach(true, true, true);
+
                 messageConsumer.close();
+            } else {
+                // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
+                // and reply with accepted and settled disposition to indicate the commit succeeded
+                testPeer.expectDischarge(txnId, false);
             }
 
             // Then expect an unsettled 'declare' transfer to the txn coordinator, and
@@ -653,6 +665,7 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
 
     private void doRollbackTransactedSessionWithConsumerTestImpl(int transferCount, int consumeCount, boolean closeConsumer) throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            // Set to fixed known value to reduce breakage if defaults are changed.
             Connection connection = testFixture.establishConnecton(testPeer);
             connection.start();
 
@@ -691,25 +704,28 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
             // Expect the consumer to be 'stopped' prior to rollback by issuing a 'drain'
             testPeer.expectLinkFlow(true, true, greaterThan(UnsignedInteger.ZERO));
 
-            // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
-            // and reply with accepted and settled disposition to indicate the rollback succeeded
-            testPeer.expectDischarge(txnId, true);
-
             if (closeConsumer) {
-                testPeer.expectDetach(true, true, true);
+
+                // Expect the messages that were not consumed to be released
                 int unconsumed = transferCount - consumeCount;
                 for (int i = 1; i <= unconsumed; i++) {
-                    testPeer.expectDisposition(true, new ReleasedMatcher());
+                    testPeer.expectDispositionThatIsReleasedAndSettled();
                 }
 
+                // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
+                // and reply with accepted and settled disposition to indicate the rollback succeeded
+                testPeer.expectDischarge(txnId, true);
+
+                // Now the deferred close should be performed.
+                testPeer.expectDetach(true, true, true);
+
                 messageConsumer.close();
+            } else {
+                // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
+                // and reply with accepted and settled disposition to indicate the rollback succeeded
+                testPeer.expectDischarge(txnId, true);
             }
 
-            // Then expect an unsettled 'declare' transfer to the txn coordinator, and
-            // reply with a declared disposition state containing the txnId.
-            txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
-            testPeer.expectDeclare(txnId);
-
             if (!closeConsumer) {
                 // Expect the messages that were not consumed to be released
                 int unconsumed = transferCount - consumeCount;
@@ -717,8 +733,18 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
                     testPeer.expectDisposition(true, new ReleasedMatcher());
                 }
 
+                // Then expect an unsettled 'declare' transfer to the txn coordinator, and
+                // reply with a declared disposition state containing the txnId.
+                txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+                testPeer.expectDeclare(txnId);
+
                 // Expect the consumer to be 'started' again as rollback completes
                 testPeer.expectLinkFlow(false, false, greaterThan(UnsignedInteger.ZERO));
+            } else {
+                // Then expect an unsettled 'declare' transfer to the txn coordinator, and
+                // reply with a declared disposition state containing the txnId.
+                txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+                testPeer.expectDeclare(txnId);
             }
 
             testPeer.expectDischarge(txnId, true);
@@ -788,11 +814,6 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
             txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
             testPeer.expectDeclare(txnId);
 
-            // Expect the messages that were not consumed to be released
-            for (int i = 1; i <= messageCount; i++) {
-                testPeer.expectDisposition(true, new ReleasedMatcher());
-            }
-
             // Expect the consumer to be 'started' again as rollback completes
             testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(messageCount)));
 
@@ -868,11 +889,6 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
             txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
             testPeer.expectDeclare(txnId);
 
-            // Expect the messages that were not consumed to be released
-            for (int i = 1; i <= messageCount; i++) {
-                testPeer.expectDisposition(true, new ReleasedMatcher());
-            }
-
             // Expect the consumer to be 'started' again as rollback completes
             testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(messageCount)));
             testPeer.expectDischarge(txnId, true);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org