You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/08/14 13:46:14 UTC

qpid-broker-j git commit: QPID-7887: [Java Broker] Message conversion error handling

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 54e9e100a -> 10699183d


QPID-7887: [Java Broker] Message conversion error handling


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/10699183
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/10699183
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/10699183

Branch: refs/heads/master
Commit: 10699183d3874db216f01a8f1d9a5cd07f21c680
Parents: 54e9e10
Author: Lorenz Quack <lq...@apache.org>
Authored: Mon Aug 14 12:06:00 2017 +0100
Committer: Keith Wall <kw...@apache.org>
Committed: Mon Aug 14 14:45:53 2017 +0100

----------------------------------------------------------------------
 .../server/consumer/AbstractConsumerTarget.java |  80 ++++++-
 .../qpid/server/message/MessageSource.java      |   6 +
 .../org/apache/qpid/server/model/Queue.java     |   9 +
 .../apache/qpid/server/queue/AbstractQueue.java |  10 +
 .../security/TrustStoreMessageSource.java       |   3 +-
 .../AbstractSystemMessageSource.java            |   6 +
 .../consumer/AbstractConsumerTargetTest.java    | 216 +++++++++++++++++++
 .../protocol/v0_10/ConsumerTarget_0_10.java     |  20 --
 .../qpid/server/protocol/v0_8/AMQChannel.java   |  14 +-
 .../protocol/v0_8/AMQPConnection_0_8Impl.java   |   3 +-
 .../protocol/v0_8/ClientDeliveryMethod.java     |   3 +-
 .../protocol/v0_8/ConsumerTarget_0_8.java       | 136 ++++++------
 .../protocol/v0_8/ProtocolOutputConverter.java  |   5 +-
 .../v0_8/ProtocolOutputConverterImpl.java       |  47 +---
 .../protocol/v1_0/ConsumerTarget_1_0.java       |  24 ---
 .../server/management/amqp/ManagementNode.java  |   6 +
 .../management/amqp/ProxyMessageSource.java     |   6 +
 17 files changed, 416 insertions(+), 178 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/10699183/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java b/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
index 09768b3..aacf022 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
@@ -25,22 +25,32 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.messages.SubscriptionMessages;
+import org.apache.qpid.server.message.MessageContainer;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageInstanceConsumer;
+import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.model.Consumer;
-import org.apache.qpid.server.message.MessageContainer;
+import org.apache.qpid.server.protocol.converter.MessageConversionException;
 import org.apache.qpid.server.queue.SuspendedConsumerLoggingTicker;
+import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.transport.AMQPConnection;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
 
 public abstract class AbstractConsumerTarget<T extends AbstractConsumerTarget<T>> implements ConsumerTarget<T>
 {
+    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractConsumerTarget.class);
+
     private static final LogSubject MULTI_QUEUE_LOG_SUBJECT = new LogSubject()
     {
         @Override
@@ -49,6 +59,8 @@ public abstract class AbstractConsumerTarget<T extends AbstractConsumerTarget<T>
             return "[(** Multi-Queue **)] ";
         }
     };
+    protected final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
+    protected final AtomicLong _unacknowledgedCount = new AtomicLong(0);
     private final AtomicReference<State> _state = new AtomicReference<>(State.OPEN);
 
     private final boolean _isMultiQueue;
@@ -87,6 +99,12 @@ public abstract class AbstractConsumerTarget<T extends AbstractConsumerTarget<T>
     }
 
     @Override
+    public void acquisitionRemoved(final MessageInstance node)
+    {
+
+    }
+
+    @Override
     public boolean isMultiQueue()
     {
         return _isMultiQueue;
@@ -214,6 +232,18 @@ public abstract class AbstractConsumerTarget<T extends AbstractConsumerTarget<T>
         }
     }
 
+    @Override
+    public long getUnacknowledgedMessages()
+    {
+        return _unacknowledgedCount.longValue();
+    }
+
+    @Override
+    public long getUnacknowledgedBytes()
+    {
+        return _unacknowledgedBytes.longValue();
+    }
+
     protected abstract void doSend(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch);
 
 
@@ -249,6 +279,54 @@ public abstract class AbstractConsumerTarget<T extends AbstractConsumerTarget<T>
             {
                 send(consumer, entry, false);
             }
+            catch (MessageConversionException mce)
+            {
+                restoreCredit(entry.getMessage());
+                final TransactionLogResource owningResource = entry.getOwningResource();
+                if (owningResource instanceof MessageSource)
+                {
+                    final MessageSource.MessageConversionExceptionHandlingPolicy handlingPolicy =
+                            ((MessageSource) owningResource).getMessageConversionExceptionHandlingPolicy();
+                    switch(handlingPolicy)
+                    {
+                        case CLOSE:
+                            entry.release(consumer);
+                            throw new ConnectionScopedRuntimeException(String.format(
+                                    "Unable to convert message %s for this consumer",
+                                    entry.getMessage()), mce);
+                        case ROUTE_TO_ALTERNATE:
+                            if (consumer.acquires())
+                            {
+                                int enqueues = entry.routeToAlternate(null, null);
+                                if (enqueues == 0)
+                                {
+                                    LOGGER.info("Failed to convert message {} for this consumer because '{}'."
+                                                + "  Message discarded.", entry.getMessage(), mce.getMessage());
+
+                                }
+                                else
+                                {
+                                    LOGGER.info("Failed to convert message {} for this consumer because '{}'."
+                                                + "  Message routed to alternate.", entry.getMessage(), mce.getMessage());
+                                }
+                            }
+                            else
+                            {
+                                LOGGER.info("Failed to convert message {} for this browser because '{}'."
+                                            + "  Message skipped.", entry.getMessage(), mce.getMessage());
+                            }
+                            break;
+                        default:
+                            throw new ServerScopedRuntimeException("Unrecognised policy " + handlingPolicy);
+                    }
+                }
+                else
+                {
+                    throw new ConnectionScopedRuntimeException(String.format(
+                            "Unable to convert message %s for this consumer",
+                            entry.getMessage()), mce);
+                }
+            }
             finally
             {
                 if (messageContainer.getMessageReference() != null)

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/10699183/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java b/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
index e3d380d..9446be3 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
@@ -90,5 +90,11 @@ public interface MessageSource extends TransactionLogResource, MessageNode
         }
     }
 
+    enum MessageConversionExceptionHandlingPolicy
+    {
+        CLOSE,
+        ROUTE_TO_ALTERNATE
+    }
 
+    MessageConversionExceptionHandlingPolicy getMessageConversionExceptionHandlingPolicy();
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/10699183/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index cd84c3f..3e88324 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -121,6 +121,11 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>,
                           + "none is explicitly set")
     String DEFAULT_ENSURE_NON_DESTRUCTIVE_CONSUMERS = "false";
 
+    String MESSAGE_CONVERSION_EXCEPTION_HANDLING_POLICY = "qpid.queue.messageConversion.exceptionHandlingPolicy";
+
+    @ManagedContextDefault( name = MESSAGE_CONVERSION_EXCEPTION_HANDLING_POLICY,
+            description = "The behaviour of consumer if it tries to consumer a messages that it cannot convert.")
+    MessageConversionExceptionHandlingPolicy DEFAULT_MESSAGE_CONVERSION_EXCEPTION_HANDLING_POLICY = MessageConversionExceptionHandlingPolicy.CLOSE;
 
     @SuppressWarnings("unused")
     @ManagedAttribute( defaultValue = "${queue.defaultEnsureNonDestructiveConsumers}" )
@@ -175,6 +180,10 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>,
     @DerivedAttribute
     boolean isQueueFlowStopped();
 
+    @Override
+    @DerivedAttribute
+    MessageConversionExceptionHandlingPolicy getMessageConversionExceptionHandlingPolicy();
+
     @SuppressWarnings("unused")
     @ManagedContextDefault( name = "queue.alertThresholdMessageAge")
     long DEFAULT_ALERT_THRESHOLD_MESSAGE_AGE = 0L;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/10699183/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 0e72802..a6e7386 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -92,6 +92,7 @@ import org.apache.qpid.server.message.MessageInfoImpl;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.MessageSender;
+import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.message.RejectType;
 import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.message.ServerMessage;
@@ -269,6 +270,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
     private volatile OverflowPolicyHandler _postEnqueueOverflowPolicyHandler;
     private long _flowToDiskThreshold;
     private volatile MessageDestination _alternateBindingDestination;
+    private volatile MessageConversionExceptionHandlingPolicy _messageConversionExceptionHandlingPolicy;
 
     private interface HoldMethod
     {
@@ -503,6 +505,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
         }
 
         _mimeTypeToFileExtension = getContextValue(Map.class, MAP_OF_STRING_STRING, MIME_TYPE_TO_FILE_EXTENSION);
+        _messageConversionExceptionHandlingPolicy = getContextValue(MessageConversionExceptionHandlingPolicy.class, MESSAGE_CONVERSION_EXCEPTION_HANDLING_POLICY);
+
         _flowToDiskThreshold = getAncestor(Broker.class).getFlowToDiskThreshold();
 
         if(_defaultFilters != null)
@@ -3542,6 +3546,12 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
         }
     }
 
+    @Override
+    public MessageConversionExceptionHandlingPolicy getMessageConversionExceptionHandlingPolicy()
+    {
+        return _messageConversionExceptionHandlingPolicy;
+    }
+
     private void validateOrCreateAlternateBinding(final Queue<?> queue, final boolean mayCreate)
     {
         Object value = queue.getAttribute(ALTERNATE_BINDING);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/10699183/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java b/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java
index 6e7cb94..4643eb5 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java
@@ -38,7 +38,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.qpid.server.consumer.ConsumerOption;
 import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.filter.FilterManager;
-import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.message.internal.InternalMessage;
 import org.apache.qpid.server.message.internal.InternalMessageHeader;
@@ -49,7 +48,7 @@ import org.apache.qpid.server.model.TrustStore;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.virtualhost.AbstractSystemMessageSource;
 
-public class TrustStoreMessageSource extends AbstractSystemMessageSource implements MessageSource
+public class TrustStoreMessageSource extends AbstractSystemMessageSource
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(TrustStoreMessageSource.class);
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/10699183/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
index df9f0fe..b77e2a4 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
@@ -109,6 +109,12 @@ public abstract class AbstractSystemMessageSource implements MessageSource
         return true;
     }
 
+    @Override
+    public MessageConversionExceptionHandlingPolicy getMessageConversionExceptionHandlingPolicy()
+    {
+        return MessageConversionExceptionHandlingPolicy.CLOSE;
+    }
+
     protected class Consumer<T extends ConsumerTarget> implements MessageInstanceConsumer<T>
     {
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/10699183/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java b/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java
new file mode 100644
index 0000000..202d5cc
--- /dev/null
+++ b/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.consumer;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.qpid.server.message.MessageContainer;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
+import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Consumer;
+import org.apache.qpid.server.protocol.converter.MessageConversionException;
+import org.apache.qpid.server.session.AMQPSession;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.transport.AMQPConnection;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class AbstractConsumerTargetTest extends QpidTestCase
+{
+
+    TestAbstractConsumerTarget _consumerTarget;
+    private Consumer _consumer;
+    private MessageSource _messageSource;
+    AMQPConnection _connection = mock(AMQPConnection.class);
+    private MessageInstance _messageInstance;
+
+    @Override
+    public void setUp() throws Exception
+    {
+        super.setUp();
+        when(_connection.getContextValue(eq(Long.class),
+                                         eq(Consumer.SUSPEND_NOTIFICATION_PERIOD))).thenReturn(1000000L);
+
+        _consumer = mock(Consumer.class);
+        _messageSource = mock(MessageSource.class);
+        when(_messageSource.getMessageConversionExceptionHandlingPolicy()).thenReturn(MessageSource.MessageConversionExceptionHandlingPolicy.CLOSE);
+        _messageInstance = mock(MessageInstance.class);
+        when(_messageInstance.getOwningResource()).thenReturn(_messageSource);
+        final MessageContainer messageContainer =
+                new MessageContainer(_messageInstance, mock(MessageReference.class), false);
+        when(_consumer.pullMessage()).thenReturn(messageContainer);
+        _consumerTarget = new TestAbstractConsumerTarget();
+        _consumerTarget.consumerAdded(_consumer);
+    }
+
+    public void testConversionExceptionPolicyClose() throws Exception
+    {
+        when(_consumer.acquires()).thenReturn(true);
+        when(_messageSource.getMessageConversionExceptionHandlingPolicy()).thenReturn(MessageSource.MessageConversionExceptionHandlingPolicy.CLOSE);
+
+        try
+        {
+            _consumerTarget.sendNextMessage();
+            fail("exception not thrown");
+        }
+        catch (ConnectionScopedRuntimeException e)
+        {
+            assertTrue(String.format("ConnectionScopedRuntimeException has unexpected cause '%s'",
+                                     e.getCause().getClass().getSimpleName()),
+                       e.getCause() instanceof MessageConversionException);
+        }
+        assertTrue("message credit was not restored", _consumerTarget.isCreditRestored());
+        verify(_messageInstance, never()).routeToAlternate(any(Action.class), any(ServerTransaction.class));
+    }
+
+    public void testConversionExceptionPolicyCloseForNonAcquiringConsumer() throws Exception
+    {
+        when(_consumer.acquires()).thenReturn(false);
+        when(_messageSource.getMessageConversionExceptionHandlingPolicy()).thenReturn(MessageSource.MessageConversionExceptionHandlingPolicy.CLOSE);
+
+        try
+        {
+            _consumerTarget.sendNextMessage();
+            fail("exception not thrown");
+        }
+        catch (ConnectionScopedRuntimeException e)
+        {
+            assertTrue(String.format("ConnectionScopedRuntimeException has unexpected cause '%s'",
+                                     e.getCause().getClass().getSimpleName()),
+                       e.getCause() instanceof MessageConversionException);
+        }
+        assertTrue("message credit was not restored", _consumerTarget.isCreditRestored());
+        verify(_messageInstance, never()).routeToAlternate(any(Action.class), any(ServerTransaction.class));
+    }
+
+    public void testConversionExceptionPolicyReroute() throws Exception
+    {
+        when(_consumer.acquires()).thenReturn(true);
+        when(_messageSource.getMessageConversionExceptionHandlingPolicy()).thenReturn(MessageSource.MessageConversionExceptionHandlingPolicy.ROUTE_TO_ALTERNATE);
+
+        _consumerTarget.sendNextMessage();
+        assertTrue("message credit was not restored", _consumerTarget.isCreditRestored());
+        verify(_messageInstance).routeToAlternate(any(Action.class), any(ServerTransaction.class));
+    }
+
+    public void testConversionExceptionPolicyRerouteForNonAcquiringConsumer() throws Exception
+    {
+        when(_consumer.acquires()).thenReturn(false);
+        when(_messageSource.getMessageConversionExceptionHandlingPolicy()).thenReturn(MessageSource.MessageConversionExceptionHandlingPolicy.ROUTE_TO_ALTERNATE);
+
+        _consumerTarget.sendNextMessage();
+        assertTrue("message credit was not restored", _consumerTarget.isCreditRestored());
+        verify(_messageInstance, never()).routeToAlternate(any(Action.class), any(ServerTransaction.class));
+    }
+
+    public void testConversionExceptionPolicyWhenOwningResourceIsNotMessageSource() throws Exception
+    {
+        final TransactionLogResource owningResource = mock(TransactionLogResource.class);
+        when(_messageInstance.getOwningResource()).thenReturn(owningResource);
+
+        try
+        {
+            _consumerTarget.sendNextMessage();
+            fail("exception not thrown");
+        }
+        catch (ConnectionScopedRuntimeException e)
+        {
+            assertTrue(String.format("ConnectionScopedRuntimeException has unexpected cause '%s'",
+                                     e.getCause().getClass().getSimpleName()),
+                       e.getCause() instanceof MessageConversionException);
+        }
+        assertTrue("message credit was not restored", _consumerTarget.isCreditRestored());
+        verify(_messageInstance, never()).routeToAlternate(any(Action.class), any(ServerTransaction.class));
+    }
+
+    private class TestAbstractConsumerTarget extends AbstractConsumerTarget<TestAbstractConsumerTarget>
+    {
+        private boolean _creditRestored;
+
+        TestAbstractConsumerTarget()
+        {
+            super(false, _connection);
+        }
+
+        @Override
+        protected void doSend(final MessageInstanceConsumer consumer, final MessageInstance entry, final boolean batch)
+        {
+            throw new MessageConversionException("testException");
+        }
+
+        @Override
+        public String getTargetAddress()
+        {
+            return null;
+        }
+
+        @Override
+        public void updateNotifyWorkDesired()
+        {
+
+        }
+
+        @Override
+        public AMQPSession<?, TestAbstractConsumerTarget> getSession()
+        {
+            return null;
+        }
+
+        @Override
+        public void flushBatched()
+        {
+
+        }
+
+        @Override
+        public void queueEmpty()
+        {
+
+        }
+
+        @Override
+        public boolean allocateCredit(final ServerMessage msg)
+        {
+            return false;
+        }
+
+        @Override
+        public void restoreCredit(final ServerMessage queueEntry)
+        {
+            _creditRestored = true;
+        }
+
+        public boolean isCreditRestored()
+        {
+            return _creditRestored;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/10699183/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index ebdd4cc..e342db7 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -80,9 +80,6 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget<ConsumerTarget_0
     private final ServerSession _session;
     private final AtomicBoolean _stopped = new AtomicBoolean(true);
 
-    private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
-    private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
-
     private int _deferredMessageCredit;
     private long _deferredSizeCredit;
 
@@ -368,11 +365,6 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget<ConsumerTarget_0
         _unacknowledgedCount.decrementAndGet();
     }
 
-    @Override
-    public void acquisitionRemoved(final MessageInstance entry)
-    {
-    }
-
     private void deferredAddCredit(final int deferredMessageCredit, final long deferredSizeCredit)
     {
         _deferredMessageCredit += deferredMessageCredit;
@@ -614,18 +606,6 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget<ConsumerTarget_0
     }
 
     @Override
-    public long getUnacknowledgedBytes()
-    {
-        return _unacknowledgedBytes.longValue();
-    }
-
-    @Override
-    public long getUnacknowledgedMessages()
-    {
-        return _unacknowledgedCount.longValue();
-    }
-
-    @Override
     public String toString()
     {
         return "ConsumerTarget_0_10[name=" + _name + ", session=" + _session.toLogString() + "]";

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/10699183/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index b3a3266..922040e 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -265,8 +265,7 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
                    MessageSource.ExistingExclusiveConsumer, MessageSource.ConsumerAccessRefused,
                    MessageSource.QueueDeleted
     {
-        final GetDeliveryMethod getDeliveryMethod =
-                new GetDeliveryMethod(queue);
+        final GetDeliveryMethod getDeliveryMethod = new GetDeliveryMethod(queue);
 
         ConsumerTarget_0_8 target;
         EnumSet<ConsumerOption> options = EnumSet.of(ConsumerOption.TRANSIENT, ConsumerOption.ACQUIRES,
@@ -285,9 +284,14 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
                                                              INFINITE_CREDIT_CREDIT_MANAGER, getDeliveryMethod);
         }
 
-        MessageInstanceConsumer<ConsumerTarget_0_8> sub = queue.addConsumer(target, null, AMQMessage.class, "", options, null);
+        queue.addConsumer(target, null, AMQMessage.class, "", options, null);
         target.updateNotifyWorkDesired();
-        target.sendNextMessage();
+        boolean canCallSendNextMessageAgain;
+        do
+        {
+            canCallSendNextMessageAgain = target.sendNextMessage();
+        }
+        while (canCallSendNextMessageAgain && !getDeliveryMethod.hasDeliveredMessage());
         target.close();
         return getDeliveryMethod.hasDeliveredMessage();
     }
@@ -1291,7 +1295,7 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
         }
 
         @Override
-        public long deliverToClient(final ConsumerTarget_0_8 target, final ServerMessage message,
+        public long deliverToClient(final ConsumerTarget_0_8 target, final AMQMessage message,
                                     final InstanceProperties props, final long deliveryTag)
         {
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/10699183/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
index 2f55bb5..b7f2bb5 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
@@ -54,7 +54,6 @@ import org.apache.qpid.server.properties.ConnectionStartProperties;
 import org.apache.qpid.server.protocol.ErrorCodes;
 import org.apache.qpid.server.logging.messages.ConnectionMessages;
 import org.apache.qpid.server.message.InstanceProperties;
-import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.Connection;
 import org.apache.qpid.server.model.NamedAddressSpace;
@@ -1250,7 +1249,7 @@ public class AMQPConnection_0_8Impl
         }
 
         @Override
-        public long deliverToClient(final ConsumerTarget_0_8 target, final ServerMessage message,
+        public long deliverToClient(final ConsumerTarget_0_8 target, final AMQMessage message,
                                     final InstanceProperties props, final long deliveryTag)
         {
             long size = _protocolOutputConverter.writeDeliver(message,

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/10699183/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java
index bcb3294..95ada00 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java
@@ -21,10 +21,9 @@
 package org.apache.qpid.server.protocol.v0_8;
 
 import org.apache.qpid.server.message.InstanceProperties;
-import org.apache.qpid.server.message.ServerMessage;
 
 public interface ClientDeliveryMethod
 {
-    long deliverToClient(final ConsumerTarget_0_8 target, final ServerMessage message, final InstanceProperties props,
+    long deliverToClient(final ConsumerTarget_0_8 target, final AMQMessage message, final InstanceProperties props,
                          final long deliveryTag);
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/10699183/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
index 57b9bdd..23d8c0e 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
@@ -20,9 +20,6 @@
  */
 package org.apache.qpid.server.protocol.v0_8;
 
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.qpid.server.QpidException;
 import org.apache.qpid.server.filter.AMQPFilterTypes;
 import org.apache.qpid.server.consumer.AbstractConsumerTarget;
 import org.apache.qpid.server.flow.FlowCreditManager;
@@ -32,6 +29,8 @@ import org.apache.qpid.server.message.MessageInstance.EntryState;
 import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.plugin.MessageConverter;
+import org.apache.qpid.server.protocol.MessageConverterRegistry;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.StateChangeListener;
@@ -47,8 +46,6 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget<Consumer
 
     private final ClientDeliveryMethod _deliveryMethod;
 
-    private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
-    private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
     private final String _targetAddress;
 
 
@@ -82,28 +79,18 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget<Consumer
                   filters, creditManager, deliveryMethod, multiQueue);
         }
 
-        /**
-         * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
-         * thread safe.
-         *
-         *
-         *
-         * @param consumer
-         * @param entry
-         * @param batch
-         * @throws QpidException
-         */
         @Override
-        public void doSend(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch)
+        protected void doSendInternal(final MessageInstanceConsumer consumer,
+                                      final MessageInstance entry,
+                                      final AMQMessage message,
+                                      final boolean batch)
         {
             // We don't decrement the reference here as we don't want to consume the message
             // but we do want to send it to the client.
 
             long deliveryTag = getChannel().getNextDeliveryTag();
-            sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
-
+            sendToClient(consumer, message, entry.getInstanceProperties(), deliveryTag);
         }
-
     }
 
     public static ConsumerTarget_0_8 createNoAckTarget(AMQChannel channel,
@@ -131,16 +118,11 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget<Consumer
             _txn = new AutoCommitTransaction(channel.getAddressSpace().getMessageStore());
         }
 
-        /**
-         * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
-         * thread safe.
-         *
-         * @param consumer
-         * @param entry   The message to send
-         * @param batch
-         */
         @Override
-        public void doSend(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch)
+        protected void doSendInternal(final MessageInstanceConsumer consumer,
+                                      final MessageInstance entry,
+                                      final AMQMessage message,
+                                      final boolean batch)
         {
             // if we do not need to wait for client acknowledgements
             // we can decrement the reference count immediately.
@@ -153,17 +135,15 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget<Consumer
             // the message is unacked, it will be lost.
             _txn.dequeue(entry.getEnqueueRecord(), NOOP);
 
-            ServerMessage message = entry.getMessage();
-            MessageReference ref = message.newReference();
-            InstanceProperties props = entry.getInstanceProperties();
-            entry.delete();
-            getChannel().getConnection().setDeferFlush(batch);
-            long deliveryTag = getChannel().getNextDeliveryTag();
-
-            sendToClient(consumer, message, props, deliveryTag);
-
-            ref.release();
+            try( MessageReference ref = entry.getMessage().newReference())
+            {
+                InstanceProperties props = entry.getInstanceProperties();
+                entry.delete();
+                getChannel().getConnection().setDeferFlush(batch);
+                long deliveryTag = getChannel().getNextDeliveryTag();
 
+                sendToClient(consumer, message, props, deliveryTag);
+            }
         }
 
         private static final ServerTransaction.Action NOOP =
@@ -234,20 +214,13 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget<Consumer
             _usesCredit = usesCredit;
         }
 
-        /**
-         * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
-         * thread safe.
-         *
-         * @param consumer
-         * @param entry   The message to send
-         * @param batch
-         */
         @Override
-        public void doSend(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch)
+        protected void doSendInternal(final MessageInstanceConsumer consumer,
+                                      final MessageInstance entry,
+                                      final AMQMessage message,
+                                      final boolean batch)
         {
-
             // put queue entry on a list and then notify the connection to read list.
-
             synchronized (getChannel())
             {
                 getChannel().getConnection().setDeferFlush(batch);
@@ -255,17 +228,10 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget<Consumer
 
                 addUnacknowledgedMessage(entry);
                 getChannel().addUnacknowledgedMessage(entry, deliveryTag, consumer, _usesCredit);
-                long size = sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
+                sendToClient(consumer, message, entry.getInstanceProperties(), deliveryTag);
                 entry.incrementDeliveryCount();
             }
-
-
         }
-
-
-
-
-
     }
 
     private final AMQChannel _channel;
@@ -399,7 +365,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget<Consumer
         updateNotifyWorkDesired();
     }
 
-    protected long sendToClient(final MessageInstanceConsumer consumer, final ServerMessage message,
+    protected long sendToClient(final MessageInstanceConsumer consumer, final AMQMessage message,
                                 final InstanceProperties props,
                                 final long deliveryTag)
     {
@@ -425,6 +391,41 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget<Consumer
     }
 
     @Override
+    final protected void doSend(final MessageInstanceConsumer consumer, final MessageInstance entry, final boolean batch)
+    {
+        ServerMessage serverMessage = entry.getMessage();
+        MessageConverter<ServerMessage<?>, AMQMessage> messageConverter = null;
+        final AMQMessage msg;
+        if(serverMessage instanceof AMQMessage)
+        {
+            msg = (AMQMessage) serverMessage;
+        }
+        else
+        {
+            messageConverter = MessageConverterRegistry.getConverter((Class<ServerMessage<?>>) serverMessage.getClass(), AMQMessage.class);
+            msg = messageConverter.convert(serverMessage, getConnection().getAddressSpace());
+        }
+
+        try
+        {
+            doSendInternal(consumer, entry, msg, batch);
+        }
+        finally
+        {
+            if(messageConverter != null)
+            {
+                messageConverter.dispose(msg);
+            }
+        }
+    }
+
+    protected abstract void doSendInternal(final MessageInstanceConsumer consumer,
+                                           final MessageInstance entry,
+                                           final AMQMessage message,
+                                           final boolean batch);
+
+
+    @Override
     public void flushBatched()
     {
         _channel.getConnection().setDeferFlush(false);
@@ -443,23 +444,6 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget<Consumer
         _unacknowledgedCount.decrementAndGet();
     }
 
-    @Override
-    public void acquisitionRemoved(final MessageInstance node)
-    {
-    }
-
-    @Override
-    public long getUnacknowledgedBytes()
-    {
-        return _unacknowledgedBytes.longValue();
-    }
-
-    @Override
-    public long getUnacknowledgedMessages()
-    {
-        return _unacknowledgedCount.longValue();
-    }
-
     private final StateChangeListener<MessageInstance, EntryState> _unacknowledgedMessageListener = new StateChangeListener<MessageInstance, EntryState>()
     {
         @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/10699183/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverter.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverter.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverter.java
index 39b4f6e..36085e4 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverter.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverter.java
@@ -31,18 +31,17 @@ import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody;
 import org.apache.qpid.server.protocol.v0_8.transport.MessagePublishInfo;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageContentSource;
-import org.apache.qpid.server.message.ServerMessage;
 
 public interface ProtocolOutputConverter
 {
     void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag);
 
-    long writeDeliver(final ServerMessage msg,
+    long writeDeliver(final AMQMessage msg,
                       final InstanceProperties props, int channelId,
                       long deliveryTag,
                       AMQShortString consumerTag);
 
-    long writeGetOk(final ServerMessage msg,
+    long writeGetOk(final AMQMessage msg,
                     final InstanceProperties props,
                     int channelId,
                     long deliveryTag,

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/10699183/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
index e1f283a..c099c0d 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
@@ -42,7 +42,6 @@ import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageContentSource;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.plugin.MessageConverter;
-import org.apache.qpid.server.protocol.MessageConverterRegistry;
 import org.apache.qpid.server.transport.ByteBufferSender;
 import org.apache.qpid.server.util.GZIPUtils;
 
@@ -61,36 +60,14 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
 
 
     @Override
-    public long writeDeliver(final ServerMessage m,
+    public long writeDeliver(final AMQMessage msg,
                              final InstanceProperties props, int channelId,
                              long deliveryTag,
                              AMQShortString consumerTag)
     {
-        MessageConverter<ServerMessage, AMQMessage> messageConverter = null;
-        final AMQMessage msg;
-        if(m instanceof AMQMessage)
-        {
-            msg = (AMQMessage) m;
-        }
-        else
-        {
-            messageConverter = getMessageConverter(m);
-            msg = messageConverter.convert(m, _connection.getAddressSpace());
-        }
         final boolean isRedelivered = Boolean.TRUE.equals(props.getProperty(InstanceProperties.Property.REDELIVERED));
         AMQBody deliverBody = createEncodedDeliverBody(msg, isRedelivered, deliveryTag, consumerTag);
-        final long result = writeMessageDelivery(msg, channelId, deliverBody);
-        if(messageConverter != null)
-        {
-            messageConverter.dispose(msg);
-        }
-        return result;
-    }
-
-    private <M extends ServerMessage> MessageConverter<M, AMQMessage> getMessageConverter(M message)
-    {
-        Class<M> clazz = (Class<M>) message.getClass();
-        return MessageConverterRegistry.getConverter(clazz, AMQMessage.class);
+        return writeMessageDelivery(msg, channelId, deliverBody);
     }
 
     private long writeMessageDelivery(AMQMessage message, int channelId, AMQBody deliverBody)
@@ -304,30 +281,14 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
     }
 
     @Override
-    public long writeGetOk(final ServerMessage msg,
+    public long writeGetOk(final AMQMessage amqMessage,
                            final InstanceProperties props,
                            int channelId,
                            long deliveryTag,
                            int queueSize)
     {
-        final AMQMessage amqMessage;
-        MessageConverter<ServerMessage, AMQMessage> messageConverter = null;
-        if(msg instanceof AMQMessage)
-        {
-            amqMessage = (AMQMessage) msg;
-        }
-        else
-        {
-            messageConverter = getMessageConverter(msg);
-            amqMessage = messageConverter.convert(msg, _connection.getAddressSpace());
-        }
         AMQBody deliver = createEncodedGetOkBody(amqMessage, props, deliveryTag, queueSize);
-        final long result = writeMessageDelivery(amqMessage, channelId, deliver);
-        if(messageConverter != null)
-        {
-            messageConverter.dispose(amqMessage);
-        }
-        return result;
+        return writeMessageDelivery(amqMessage, channelId, deliver);
     }
 
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/10699183/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index af25173..fc7d334 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -72,8 +72,6 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerTarget_1_0.class);
     private final boolean _acquires;
-    private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
-    private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
 
     private long _deliveryTag = 0L;
 
@@ -373,11 +371,6 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
         return _linkEndpoint.getSession();
     }
 
-    public void flush()
-    {
-        while(sendNextMessage());
-    }
-
     private class DispositionAction implements UnsettledAction
     {
 
@@ -655,29 +648,12 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
     }
 
     @Override
-    public void acquisitionRemoved(final MessageInstance node)
-    {
-    }
-
-    @Override
     public String getTargetAddress()
     {
         return _linkEndpoint.getTarget().getAddress();
     }
 
     @Override
-    public long getUnacknowledgedBytes()
-    {
-        return _unacknowledgedBytes.get();
-    }
-
-    @Override
-    public long getUnacknowledgedMessages()
-    {
-        return _unacknowledgedCount.get();
-    }
-
-    @Override
     public String toString()
     {
         return "ConsumerTarget_1_0[linkSession=" + _linkEndpoint.getSession().toLogString() + "]";

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/10699183/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
----------------------------------------------------------------------
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
index e72ca8b..6af70b0 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
@@ -1581,6 +1581,12 @@ class ManagementNode implements MessageSource, MessageDestination, BaseQueue
         _consumers.remove(managementNodeConsumer);
     }
 
+    @Override
+    public MessageConversionExceptionHandlingPolicy getMessageConversionExceptionHandlingPolicy()
+    {
+        return MessageConversionExceptionHandlingPolicy.CLOSE;
+    }
+
     private AmqpConnectionMetaData getCallerConnectionMetaData()
     {
         Subject currentSubject = Subject.getSubject(AccessController.getContext());

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/10699183/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java
----------------------------------------------------------------------
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java
index 6be1fb3..0e546f1 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java
@@ -194,6 +194,12 @@ public class ProxyMessageSource implements MessageSource, MessageDestination
         return session.getConnectionReference() == _connectionReference;
     }
 
+    @Override
+    public MessageConversionExceptionHandlingPolicy getMessageConversionExceptionHandlingPolicy()
+    {
+        return MessageConversionExceptionHandlingPolicy.CLOSE;
+    }
+
     private class WrappingTarget<T extends ConsumerTarget<T>> implements ConsumerTarget<WrappingTarget<T>>
     {
         private final T _underlying;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org