You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/08/14 13:46:14 UTC
qpid-broker-j git commit: QPID-7887: [Java Broker] Message conversion error handling
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 54e9e100a -> 10699183d
QPID-7887: [Java Broker] Message conversion error handling
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/10699183
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/10699183
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/10699183
Branch: refs/heads/master
Commit: 10699183d3874db216f01a8f1d9a5cd07f21c680
Parents: 54e9e10
Author: Lorenz Quack <lq...@apache.org>
Authored: Mon Aug 14 12:06:00 2017 +0100
Committer: Keith Wall <kw...@apache.org>
Committed: Mon Aug 14 14:45:53 2017 +0100
----------------------------------------------------------------------
.../server/consumer/AbstractConsumerTarget.java | 80 ++++++-
.../qpid/server/message/MessageSource.java | 6 +
.../org/apache/qpid/server/model/Queue.java | 9 +
.../apache/qpid/server/queue/AbstractQueue.java | 10 +
.../security/TrustStoreMessageSource.java | 3 +-
.../AbstractSystemMessageSource.java | 6 +
.../consumer/AbstractConsumerTargetTest.java | 216 +++++++++++++++++++
.../protocol/v0_10/ConsumerTarget_0_10.java | 20 --
.../qpid/server/protocol/v0_8/AMQChannel.java | 14 +-
.../protocol/v0_8/AMQPConnection_0_8Impl.java | 3 +-
.../protocol/v0_8/ClientDeliveryMethod.java | 3 +-
.../protocol/v0_8/ConsumerTarget_0_8.java | 136 ++++++------
.../protocol/v0_8/ProtocolOutputConverter.java | 5 +-
.../v0_8/ProtocolOutputConverterImpl.java | 47 +---
.../protocol/v1_0/ConsumerTarget_1_0.java | 24 ---
.../server/management/amqp/ManagementNode.java | 6 +
.../management/amqp/ProxyMessageSource.java | 6 +
17 files changed, 416 insertions(+), 178 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/10699183/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java b/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
index 09768b3..aacf022 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
@@ -25,22 +25,32 @@ import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.SubscriptionMessages;
+import org.apache.qpid.server.message.MessageContainer;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageInstanceConsumer;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.Consumer;
-import org.apache.qpid.server.message.MessageContainer;
+import org.apache.qpid.server.protocol.converter.MessageConversionException;
import org.apache.qpid.server.queue.SuspendedConsumerLoggingTicker;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.transport.AMQPConnection;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
public abstract class AbstractConsumerTarget<T extends AbstractConsumerTarget<T>> implements ConsumerTarget<T>
{
+ private static final Logger LOGGER = LoggerFactory.getLogger(AbstractConsumerTarget.class);
+
private static final LogSubject MULTI_QUEUE_LOG_SUBJECT = new LogSubject()
{
@Override
@@ -49,6 +59,8 @@ public abstract class AbstractConsumerTarget<T extends AbstractConsumerTarget<T>
return "[(** Multi-Queue **)] ";
}
};
+ protected final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
+ protected final AtomicLong _unacknowledgedCount = new AtomicLong(0);
private final AtomicReference<State> _state = new AtomicReference<>(State.OPEN);
private final boolean _isMultiQueue;
@@ -87,6 +99,12 @@ public abstract class AbstractConsumerTarget<T extends AbstractConsumerTarget<T>
}
@Override
+ public void acquisitionRemoved(final MessageInstance node)
+ {
+
+ }
+
+ @Override
public boolean isMultiQueue()
{
return _isMultiQueue;
@@ -214,6 +232,18 @@ public abstract class AbstractConsumerTarget<T extends AbstractConsumerTarget<T>
}
}
+ @Override
+ public long getUnacknowledgedMessages()
+ {
+ return _unacknowledgedCount.longValue();
+ }
+
+ @Override
+ public long getUnacknowledgedBytes()
+ {
+ return _unacknowledgedBytes.longValue();
+ }
+
protected abstract void doSend(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch);
@@ -249,6 +279,54 @@ public abstract class AbstractConsumerTarget<T extends AbstractConsumerTarget<T>
{
send(consumer, entry, false);
}
+ catch (MessageConversionException mce)
+ {
+ restoreCredit(entry.getMessage());
+ final TransactionLogResource owningResource = entry.getOwningResource();
+ if (owningResource instanceof MessageSource)
+ {
+ final MessageSource.MessageConversionExceptionHandlingPolicy handlingPolicy =
+ ((MessageSource) owningResource).getMessageConversionExceptionHandlingPolicy();
+ switch(handlingPolicy)
+ {
+ case CLOSE:
+ entry.release(consumer);
+ throw new ConnectionScopedRuntimeException(String.format(
+ "Unable to convert message %s for this consumer",
+ entry.getMessage()), mce);
+ case ROUTE_TO_ALTERNATE:
+ if (consumer.acquires())
+ {
+ int enqueues = entry.routeToAlternate(null, null);
+ if (enqueues == 0)
+ {
+ LOGGER.info("Failed to convert message {} for this consumer because '{}'."
+ + " Message discarded.", entry.getMessage(), mce.getMessage());
+
+ }
+ else
+ {
+ LOGGER.info("Failed to convert message {} for this consumer because '{}'."
+ + " Message routed to alternate.", entry.getMessage(), mce.getMessage());
+ }
+ }
+ else
+ {
+ LOGGER.info("Failed to convert message {} for this browser because '{}'."
+ + " Message skipped.", entry.getMessage(), mce.getMessage());
+ }
+ break;
+ default:
+ throw new ServerScopedRuntimeException("Unrecognised policy " + handlingPolicy);
+ }
+ }
+ else
+ {
+ throw new ConnectionScopedRuntimeException(String.format(
+ "Unable to convert message %s for this consumer",
+ entry.getMessage()), mce);
+ }
+ }
finally
{
if (messageContainer.getMessageReference() != null)
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/10699183/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java b/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
index e3d380d..9446be3 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
@@ -90,5 +90,11 @@ public interface MessageSource extends TransactionLogResource, MessageNode
}
}
+ enum MessageConversionExceptionHandlingPolicy
+ {
+ CLOSE,
+ ROUTE_TO_ALTERNATE
+ }
+ MessageConversionExceptionHandlingPolicy getMessageConversionExceptionHandlingPolicy();
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/10699183/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index cd84c3f..3e88324 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -121,6 +121,11 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>,
+ "none is explicitly set")
String DEFAULT_ENSURE_NON_DESTRUCTIVE_CONSUMERS = "false";
+ String MESSAGE_CONVERSION_EXCEPTION_HANDLING_POLICY = "qpid.queue.messageConversion.exceptionHandlingPolicy";
+
+ @ManagedContextDefault( name = MESSAGE_CONVERSION_EXCEPTION_HANDLING_POLICY,
+ description = "The behaviour of consumer if it tries to consumer a messages that it cannot convert.")
+ MessageConversionExceptionHandlingPolicy DEFAULT_MESSAGE_CONVERSION_EXCEPTION_HANDLING_POLICY = MessageConversionExceptionHandlingPolicy.CLOSE;
@SuppressWarnings("unused")
@ManagedAttribute( defaultValue = "${queue.defaultEnsureNonDestructiveConsumers}" )
@@ -175,6 +180,10 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>,
@DerivedAttribute
boolean isQueueFlowStopped();
+ @Override
+ @DerivedAttribute
+ MessageConversionExceptionHandlingPolicy getMessageConversionExceptionHandlingPolicy();
+
@SuppressWarnings("unused")
@ManagedContextDefault( name = "queue.alertThresholdMessageAge")
long DEFAULT_ALERT_THRESHOLD_MESSAGE_AGE = 0L;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/10699183/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 0e72802..a6e7386 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -92,6 +92,7 @@ import org.apache.qpid.server.message.MessageInfoImpl;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.MessageSender;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.RejectType;
import org.apache.qpid.server.message.RoutingResult;
import org.apache.qpid.server.message.ServerMessage;
@@ -269,6 +270,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
private volatile OverflowPolicyHandler _postEnqueueOverflowPolicyHandler;
private long _flowToDiskThreshold;
private volatile MessageDestination _alternateBindingDestination;
+ private volatile MessageConversionExceptionHandlingPolicy _messageConversionExceptionHandlingPolicy;
private interface HoldMethod
{
@@ -503,6 +505,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
_mimeTypeToFileExtension = getContextValue(Map.class, MAP_OF_STRING_STRING, MIME_TYPE_TO_FILE_EXTENSION);
+ _messageConversionExceptionHandlingPolicy = getContextValue(MessageConversionExceptionHandlingPolicy.class, MESSAGE_CONVERSION_EXCEPTION_HANDLING_POLICY);
+
_flowToDiskThreshold = getAncestor(Broker.class).getFlowToDiskThreshold();
if(_defaultFilters != null)
@@ -3542,6 +3546,12 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
}
+ @Override
+ public MessageConversionExceptionHandlingPolicy getMessageConversionExceptionHandlingPolicy()
+ {
+ return _messageConversionExceptionHandlingPolicy;
+ }
+
private void validateOrCreateAlternateBinding(final Queue<?> queue, final boolean mayCreate)
{
Object value = queue.getAttribute(ALTERNATE_BINDING);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/10699183/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java b/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java
index 6e7cb94..4643eb5 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java
@@ -38,7 +38,6 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.server.consumer.ConsumerOption;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.filter.FilterManager;
-import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.internal.InternalMessage;
import org.apache.qpid.server.message.internal.InternalMessageHeader;
@@ -49,7 +48,7 @@ import org.apache.qpid.server.model.TrustStore;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.virtualhost.AbstractSystemMessageSource;
-public class TrustStoreMessageSource extends AbstractSystemMessageSource implements MessageSource
+public class TrustStoreMessageSource extends AbstractSystemMessageSource
{
private static final Logger LOGGER = LoggerFactory.getLogger(TrustStoreMessageSource.class);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/10699183/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
index df9f0fe..b77e2a4 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
@@ -109,6 +109,12 @@ public abstract class AbstractSystemMessageSource implements MessageSource
return true;
}
+ @Override
+ public MessageConversionExceptionHandlingPolicy getMessageConversionExceptionHandlingPolicy()
+ {
+ return MessageConversionExceptionHandlingPolicy.CLOSE;
+ }
+
protected class Consumer<T extends ConsumerTarget> implements MessageInstanceConsumer<T>
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/10699183/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java b/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java
new file mode 100644
index 0000000..202d5cc
--- /dev/null
+++ b/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.consumer;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.qpid.server.message.MessageContainer;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
+import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Consumer;
+import org.apache.qpid.server.protocol.converter.MessageConversionException;
+import org.apache.qpid.server.session.AMQPSession;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.transport.AMQPConnection;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class AbstractConsumerTargetTest extends QpidTestCase
+{
+
+ TestAbstractConsumerTarget _consumerTarget;
+ private Consumer _consumer;
+ private MessageSource _messageSource;
+ AMQPConnection _connection = mock(AMQPConnection.class);
+ private MessageInstance _messageInstance;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ when(_connection.getContextValue(eq(Long.class),
+ eq(Consumer.SUSPEND_NOTIFICATION_PERIOD))).thenReturn(1000000L);
+
+ _consumer = mock(Consumer.class);
+ _messageSource = mock(MessageSource.class);
+ when(_messageSource.getMessageConversionExceptionHandlingPolicy()).thenReturn(MessageSource.MessageConversionExceptionHandlingPolicy.CLOSE);
+ _messageInstance = mock(MessageInstance.class);
+ when(_messageInstance.getOwningResource()).thenReturn(_messageSource);
+ final MessageContainer messageContainer =
+ new MessageContainer(_messageInstance, mock(MessageReference.class), false);
+ when(_consumer.pullMessage()).thenReturn(messageContainer);
+ _consumerTarget = new TestAbstractConsumerTarget();
+ _consumerTarget.consumerAdded(_consumer);
+ }
+
+ public void testConversionExceptionPolicyClose() throws Exception
+ {
+ when(_consumer.acquires()).thenReturn(true);
+ when(_messageSource.getMessageConversionExceptionHandlingPolicy()).thenReturn(MessageSource.MessageConversionExceptionHandlingPolicy.CLOSE);
+
+ try
+ {
+ _consumerTarget.sendNextMessage();
+ fail("exception not thrown");
+ }
+ catch (ConnectionScopedRuntimeException e)
+ {
+ assertTrue(String.format("ConnectionScopedRuntimeException has unexpected cause '%s'",
+ e.getCause().getClass().getSimpleName()),
+ e.getCause() instanceof MessageConversionException);
+ }
+ assertTrue("message credit was not restored", _consumerTarget.isCreditRestored());
+ verify(_messageInstance, never()).routeToAlternate(any(Action.class), any(ServerTransaction.class));
+ }
+
+ public void testConversionExceptionPolicyCloseForNonAcquiringConsumer() throws Exception
+ {
+ when(_consumer.acquires()).thenReturn(false);
+ when(_messageSource.getMessageConversionExceptionHandlingPolicy()).thenReturn(MessageSource.MessageConversionExceptionHandlingPolicy.CLOSE);
+
+ try
+ {
+ _consumerTarget.sendNextMessage();
+ fail("exception not thrown");
+ }
+ catch (ConnectionScopedRuntimeException e)
+ {
+ assertTrue(String.format("ConnectionScopedRuntimeException has unexpected cause '%s'",
+ e.getCause().getClass().getSimpleName()),
+ e.getCause() instanceof MessageConversionException);
+ }
+ assertTrue("message credit was not restored", _consumerTarget.isCreditRestored());
+ verify(_messageInstance, never()).routeToAlternate(any(Action.class), any(ServerTransaction.class));
+ }
+
+ public void testConversionExceptionPolicyReroute() throws Exception
+ {
+ when(_consumer.acquires()).thenReturn(true);
+ when(_messageSource.getMessageConversionExceptionHandlingPolicy()).thenReturn(MessageSource.MessageConversionExceptionHandlingPolicy.ROUTE_TO_ALTERNATE);
+
+ _consumerTarget.sendNextMessage();
+ assertTrue("message credit was not restored", _consumerTarget.isCreditRestored());
+ verify(_messageInstance).routeToAlternate(any(Action.class), any(ServerTransaction.class));
+ }
+
+ public void testConversionExceptionPolicyRerouteForNonAcquiringConsumer() throws Exception
+ {
+ when(_consumer.acquires()).thenReturn(false);
+ when(_messageSource.getMessageConversionExceptionHandlingPolicy()).thenReturn(MessageSource.MessageConversionExceptionHandlingPolicy.ROUTE_TO_ALTERNATE);
+
+ _consumerTarget.sendNextMessage();
+ assertTrue("message credit was not restored", _consumerTarget.isCreditRestored());
+ verify(_messageInstance, never()).routeToAlternate(any(Action.class), any(ServerTransaction.class));
+ }
+
+ public void testConversionExceptionPolicyWhenOwningResourceIsNotMessageSource() throws Exception
+ {
+ final TransactionLogResource owningResource = mock(TransactionLogResource.class);
+ when(_messageInstance.getOwningResource()).thenReturn(owningResource);
+
+ try
+ {
+ _consumerTarget.sendNextMessage();
+ fail("exception not thrown");
+ }
+ catch (ConnectionScopedRuntimeException e)
+ {
+ assertTrue(String.format("ConnectionScopedRuntimeException has unexpected cause '%s'",
+ e.getCause().getClass().getSimpleName()),
+ e.getCause() instanceof MessageConversionException);
+ }
+ assertTrue("message credit was not restored", _consumerTarget.isCreditRestored());
+ verify(_messageInstance, never()).routeToAlternate(any(Action.class), any(ServerTransaction.class));
+ }
+
+ private class TestAbstractConsumerTarget extends AbstractConsumerTarget<TestAbstractConsumerTarget>
+ {
+ private boolean _creditRestored;
+
+ TestAbstractConsumerTarget()
+ {
+ super(false, _connection);
+ }
+
+ @Override
+ protected void doSend(final MessageInstanceConsumer consumer, final MessageInstance entry, final boolean batch)
+ {
+ throw new MessageConversionException("testException");
+ }
+
+ @Override
+ public String getTargetAddress()
+ {
+ return null;
+ }
+
+ @Override
+ public void updateNotifyWorkDesired()
+ {
+
+ }
+
+ @Override
+ public AMQPSession<?, TestAbstractConsumerTarget> getSession()
+ {
+ return null;
+ }
+
+ @Override
+ public void flushBatched()
+ {
+
+ }
+
+ @Override
+ public void queueEmpty()
+ {
+
+ }
+
+ @Override
+ public boolean allocateCredit(final ServerMessage msg)
+ {
+ return false;
+ }
+
+ @Override
+ public void restoreCredit(final ServerMessage queueEntry)
+ {
+ _creditRestored = true;
+ }
+
+ public boolean isCreditRestored()
+ {
+ return _creditRestored;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/10699183/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index ebdd4cc..e342db7 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -80,9 +80,6 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget<ConsumerTarget_0
private final ServerSession _session;
private final AtomicBoolean _stopped = new AtomicBoolean(true);
- private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
- private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
-
private int _deferredMessageCredit;
private long _deferredSizeCredit;
@@ -368,11 +365,6 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget<ConsumerTarget_0
_unacknowledgedCount.decrementAndGet();
}
- @Override
- public void acquisitionRemoved(final MessageInstance entry)
- {
- }
-
private void deferredAddCredit(final int deferredMessageCredit, final long deferredSizeCredit)
{
_deferredMessageCredit += deferredMessageCredit;
@@ -614,18 +606,6 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget<ConsumerTarget_0
}
@Override
- public long getUnacknowledgedBytes()
- {
- return _unacknowledgedBytes.longValue();
- }
-
- @Override
- public long getUnacknowledgedMessages()
- {
- return _unacknowledgedCount.longValue();
- }
-
- @Override
public String toString()
{
return "ConsumerTarget_0_10[name=" + _name + ", session=" + _session.toLogString() + "]";
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/10699183/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index b3a3266..922040e 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -265,8 +265,7 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
MessageSource.ExistingExclusiveConsumer, MessageSource.ConsumerAccessRefused,
MessageSource.QueueDeleted
{
- final GetDeliveryMethod getDeliveryMethod =
- new GetDeliveryMethod(queue);
+ final GetDeliveryMethod getDeliveryMethod = new GetDeliveryMethod(queue);
ConsumerTarget_0_8 target;
EnumSet<ConsumerOption> options = EnumSet.of(ConsumerOption.TRANSIENT, ConsumerOption.ACQUIRES,
@@ -285,9 +284,14 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
INFINITE_CREDIT_CREDIT_MANAGER, getDeliveryMethod);
}
- MessageInstanceConsumer<ConsumerTarget_0_8> sub = queue.addConsumer(target, null, AMQMessage.class, "", options, null);
+ queue.addConsumer(target, null, AMQMessage.class, "", options, null);
target.updateNotifyWorkDesired();
- target.sendNextMessage();
+ boolean canCallSendNextMessageAgain;
+ do
+ {
+ canCallSendNextMessageAgain = target.sendNextMessage();
+ }
+ while (canCallSendNextMessageAgain && !getDeliveryMethod.hasDeliveredMessage());
target.close();
return getDeliveryMethod.hasDeliveredMessage();
}
@@ -1291,7 +1295,7 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
}
@Override
- public long deliverToClient(final ConsumerTarget_0_8 target, final ServerMessage message,
+ public long deliverToClient(final ConsumerTarget_0_8 target, final AMQMessage message,
final InstanceProperties props, final long deliveryTag)
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/10699183/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
index 2f55bb5..b7f2bb5 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
@@ -54,7 +54,6 @@ import org.apache.qpid.server.properties.ConnectionStartProperties;
import org.apache.qpid.server.protocol.ErrorCodes;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.message.InstanceProperties;
-import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.NamedAddressSpace;
@@ -1250,7 +1249,7 @@ public class AMQPConnection_0_8Impl
}
@Override
- public long deliverToClient(final ConsumerTarget_0_8 target, final ServerMessage message,
+ public long deliverToClient(final ConsumerTarget_0_8 target, final AMQMessage message,
final InstanceProperties props, final long deliveryTag)
{
long size = _protocolOutputConverter.writeDeliver(message,
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/10699183/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java
index bcb3294..95ada00 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java
@@ -21,10 +21,9 @@
package org.apache.qpid.server.protocol.v0_8;
import org.apache.qpid.server.message.InstanceProperties;
-import org.apache.qpid.server.message.ServerMessage;
public interface ClientDeliveryMethod
{
- long deliverToClient(final ConsumerTarget_0_8 target, final ServerMessage message, final InstanceProperties props,
+ long deliverToClient(final ConsumerTarget_0_8 target, final AMQMessage message, final InstanceProperties props,
final long deliveryTag);
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/10699183/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
index 57b9bdd..23d8c0e 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
@@ -20,9 +20,6 @@
*/
package org.apache.qpid.server.protocol.v0_8;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.qpid.server.QpidException;
import org.apache.qpid.server.filter.AMQPFilterTypes;
import org.apache.qpid.server.consumer.AbstractConsumerTarget;
import org.apache.qpid.server.flow.FlowCreditManager;
@@ -32,6 +29,8 @@ import org.apache.qpid.server.message.MessageInstance.EntryState;
import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.plugin.MessageConverter;
+import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.StateChangeListener;
@@ -47,8 +46,6 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget<Consumer
private final ClientDeliveryMethod _deliveryMethod;
- private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
- private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
private final String _targetAddress;
@@ -82,28 +79,18 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget<Consumer
filters, creditManager, deliveryMethod, multiQueue);
}
- /**
- * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
- * thread safe.
- *
- *
- *
- * @param consumer
- * @param entry
- * @param batch
- * @throws QpidException
- */
@Override
- public void doSend(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch)
+ protected void doSendInternal(final MessageInstanceConsumer consumer,
+ final MessageInstance entry,
+ final AMQMessage message,
+ final boolean batch)
{
// We don't decrement the reference here as we don't want to consume the message
// but we do want to send it to the client.
long deliveryTag = getChannel().getNextDeliveryTag();
- sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
-
+ sendToClient(consumer, message, entry.getInstanceProperties(), deliveryTag);
}
-
}
public static ConsumerTarget_0_8 createNoAckTarget(AMQChannel channel,
@@ -131,16 +118,11 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget<Consumer
_txn = new AutoCommitTransaction(channel.getAddressSpace().getMessageStore());
}
- /**
- * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
- * thread safe.
- *
- * @param consumer
- * @param entry The message to send
- * @param batch
- */
@Override
- public void doSend(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch)
+ protected void doSendInternal(final MessageInstanceConsumer consumer,
+ final MessageInstance entry,
+ final AMQMessage message,
+ final boolean batch)
{
// if we do not need to wait for client acknowledgements
// we can decrement the reference count immediately.
@@ -153,17 +135,15 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget<Consumer
// the message is unacked, it will be lost.
_txn.dequeue(entry.getEnqueueRecord(), NOOP);
- ServerMessage message = entry.getMessage();
- MessageReference ref = message.newReference();
- InstanceProperties props = entry.getInstanceProperties();
- entry.delete();
- getChannel().getConnection().setDeferFlush(batch);
- long deliveryTag = getChannel().getNextDeliveryTag();
-
- sendToClient(consumer, message, props, deliveryTag);
-
- ref.release();
+ try( MessageReference ref = entry.getMessage().newReference())
+ {
+ InstanceProperties props = entry.getInstanceProperties();
+ entry.delete();
+ getChannel().getConnection().setDeferFlush(batch);
+ long deliveryTag = getChannel().getNextDeliveryTag();
+ sendToClient(consumer, message, props, deliveryTag);
+ }
}
private static final ServerTransaction.Action NOOP =
@@ -234,20 +214,13 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget<Consumer
_usesCredit = usesCredit;
}
- /**
- * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
- * thread safe.
- *
- * @param consumer
- * @param entry The message to send
- * @param batch
- */
@Override
- public void doSend(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch)
+ protected void doSendInternal(final MessageInstanceConsumer consumer,
+ final MessageInstance entry,
+ final AMQMessage message,
+ final boolean batch)
{
-
// put queue entry on a list and then notify the connection to read list.
-
synchronized (getChannel())
{
getChannel().getConnection().setDeferFlush(batch);
@@ -255,17 +228,10 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget<Consumer
addUnacknowledgedMessage(entry);
getChannel().addUnacknowledgedMessage(entry, deliveryTag, consumer, _usesCredit);
- long size = sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
+ sendToClient(consumer, message, entry.getInstanceProperties(), deliveryTag);
entry.incrementDeliveryCount();
}
-
-
}
-
-
-
-
-
}
private final AMQChannel _channel;
@@ -399,7 +365,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget<Consumer
updateNotifyWorkDesired();
}
- protected long sendToClient(final MessageInstanceConsumer consumer, final ServerMessage message,
+ protected long sendToClient(final MessageInstanceConsumer consumer, final AMQMessage message,
final InstanceProperties props,
final long deliveryTag)
{
@@ -425,6 +391,41 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget<Consumer
}
@Override
+ final protected void doSend(final MessageInstanceConsumer consumer, final MessageInstance entry, final boolean batch)
+ {
+ ServerMessage serverMessage = entry.getMessage();
+ MessageConverter<ServerMessage<?>, AMQMessage> messageConverter = null;
+ final AMQMessage msg;
+ if(serverMessage instanceof AMQMessage)
+ {
+ msg = (AMQMessage) serverMessage;
+ }
+ else
+ {
+ messageConverter = MessageConverterRegistry.getConverter((Class<ServerMessage<?>>) serverMessage.getClass(), AMQMessage.class);
+ msg = messageConverter.convert(serverMessage, getConnection().getAddressSpace());
+ }
+
+ try
+ {
+ doSendInternal(consumer, entry, msg, batch);
+ }
+ finally
+ {
+ if(messageConverter != null)
+ {
+ messageConverter.dispose(msg);
+ }
+ }
+ }
+
+ protected abstract void doSendInternal(final MessageInstanceConsumer consumer,
+ final MessageInstance entry,
+ final AMQMessage message,
+ final boolean batch);
+
+
+ @Override
public void flushBatched()
{
_channel.getConnection().setDeferFlush(false);
@@ -443,23 +444,6 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget<Consumer
_unacknowledgedCount.decrementAndGet();
}
- @Override
- public void acquisitionRemoved(final MessageInstance node)
- {
- }
-
- @Override
- public long getUnacknowledgedBytes()
- {
- return _unacknowledgedBytes.longValue();
- }
-
- @Override
- public long getUnacknowledgedMessages()
- {
- return _unacknowledgedCount.longValue();
- }
-
private final StateChangeListener<MessageInstance, EntryState> _unacknowledgedMessageListener = new StateChangeListener<MessageInstance, EntryState>()
{
@Override
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/10699183/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverter.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverter.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverter.java
index 39b4f6e..36085e4 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverter.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverter.java
@@ -31,18 +31,17 @@ import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody;
import org.apache.qpid.server.protocol.v0_8.transport.MessagePublishInfo;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageContentSource;
-import org.apache.qpid.server.message.ServerMessage;
public interface ProtocolOutputConverter
{
void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag);
- long writeDeliver(final ServerMessage msg,
+ long writeDeliver(final AMQMessage msg,
final InstanceProperties props, int channelId,
long deliveryTag,
AMQShortString consumerTag);
- long writeGetOk(final ServerMessage msg,
+ long writeGetOk(final AMQMessage msg,
final InstanceProperties props,
int channelId,
long deliveryTag,
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/10699183/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
index e1f283a..c099c0d 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
@@ -42,7 +42,6 @@ import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageContentSource;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.MessageConverter;
-import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.transport.ByteBufferSender;
import org.apache.qpid.server.util.GZIPUtils;
@@ -61,36 +60,14 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
@Override
- public long writeDeliver(final ServerMessage m,
+ public long writeDeliver(final AMQMessage msg,
final InstanceProperties props, int channelId,
long deliveryTag,
AMQShortString consumerTag)
{
- MessageConverter<ServerMessage, AMQMessage> messageConverter = null;
- final AMQMessage msg;
- if(m instanceof AMQMessage)
- {
- msg = (AMQMessage) m;
- }
- else
- {
- messageConverter = getMessageConverter(m);
- msg = messageConverter.convert(m, _connection.getAddressSpace());
- }
final boolean isRedelivered = Boolean.TRUE.equals(props.getProperty(InstanceProperties.Property.REDELIVERED));
AMQBody deliverBody = createEncodedDeliverBody(msg, isRedelivered, deliveryTag, consumerTag);
- final long result = writeMessageDelivery(msg, channelId, deliverBody);
- if(messageConverter != null)
- {
- messageConverter.dispose(msg);
- }
- return result;
- }
-
- private <M extends ServerMessage> MessageConverter<M, AMQMessage> getMessageConverter(M message)
- {
- Class<M> clazz = (Class<M>) message.getClass();
- return MessageConverterRegistry.getConverter(clazz, AMQMessage.class);
+ return writeMessageDelivery(msg, channelId, deliverBody);
}
private long writeMessageDelivery(AMQMessage message, int channelId, AMQBody deliverBody)
@@ -304,30 +281,14 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
}
@Override
- public long writeGetOk(final ServerMessage msg,
+ public long writeGetOk(final AMQMessage amqMessage,
final InstanceProperties props,
int channelId,
long deliveryTag,
int queueSize)
{
- final AMQMessage amqMessage;
- MessageConverter<ServerMessage, AMQMessage> messageConverter = null;
- if(msg instanceof AMQMessage)
- {
- amqMessage = (AMQMessage) msg;
- }
- else
- {
- messageConverter = getMessageConverter(msg);
- amqMessage = messageConverter.convert(msg, _connection.getAddressSpace());
- }
AMQBody deliver = createEncodedGetOkBody(amqMessage, props, deliveryTag, queueSize);
- final long result = writeMessageDelivery(amqMessage, channelId, deliver);
- if(messageConverter != null)
- {
- messageConverter.dispose(amqMessage);
- }
- return result;
+ return writeMessageDelivery(amqMessage, channelId, deliver);
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/10699183/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index af25173..fc7d334 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -72,8 +72,6 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
{
private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerTarget_1_0.class);
private final boolean _acquires;
- private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
- private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
private long _deliveryTag = 0L;
@@ -373,11 +371,6 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
return _linkEndpoint.getSession();
}
- public void flush()
- {
- while(sendNextMessage());
- }
-
private class DispositionAction implements UnsettledAction
{
@@ -655,29 +648,12 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
}
@Override
- public void acquisitionRemoved(final MessageInstance node)
- {
- }
-
- @Override
public String getTargetAddress()
{
return _linkEndpoint.getTarget().getAddress();
}
@Override
- public long getUnacknowledgedBytes()
- {
- return _unacknowledgedBytes.get();
- }
-
- @Override
- public long getUnacknowledgedMessages()
- {
- return _unacknowledgedCount.get();
- }
-
- @Override
public String toString()
{
return "ConsumerTarget_1_0[linkSession=" + _linkEndpoint.getSession().toLogString() + "]";
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/10699183/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
----------------------------------------------------------------------
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
index e72ca8b..6af70b0 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
@@ -1581,6 +1581,12 @@ class ManagementNode implements MessageSource, MessageDestination, BaseQueue
_consumers.remove(managementNodeConsumer);
}
+ @Override
+ public MessageConversionExceptionHandlingPolicy getMessageConversionExceptionHandlingPolicy()
+ {
+ return MessageConversionExceptionHandlingPolicy.CLOSE;
+ }
+
private AmqpConnectionMetaData getCallerConnectionMetaData()
{
Subject currentSubject = Subject.getSubject(AccessController.getContext());
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/10699183/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java
----------------------------------------------------------------------
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java
index 6be1fb3..0e546f1 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java
@@ -194,6 +194,12 @@ public class ProxyMessageSource implements MessageSource, MessageDestination
return session.getConnectionReference() == _connectionReference;
}
+ @Override
+ public MessageConversionExceptionHandlingPolicy getMessageConversionExceptionHandlingPolicy()
+ {
+ return MessageConversionExceptionHandlingPolicy.CLOSE;
+ }
+
private class WrappingTarget<T extends ConsumerTarget<T>> implements ConsumerTarget<WrappingTarget<T>>
{
private final T _underlying;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org