You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/08/15 15:00:16 UTC
qpid-broker-j git commit: QPID-7887: [Java Broker] Add
MessageConversionExceptionHandlingPolicy.REJECT and make it default
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 0406c8734 -> e91c35925
QPID-7887: [Java Broker] Add MessageConversionExceptionHandlingPolicy.REJECT and make it default
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/e91c3592
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/e91c3592
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/e91c3592
Branch: refs/heads/master
Commit: e91c3592574f12ca5fe4c9cfeff9f83fd3407b1b
Parents: 0406c87
Author: Lorenz Quack <lq...@apache.org>
Authored: Mon Aug 14 17:02:42 2017 +0100
Committer: Keith Wall <kw...@apache.org>
Committed: Tue Aug 15 15:59:55 2017 +0100
----------------------------------------------------------------------
.../server/consumer/AbstractConsumerTarget.java | 6 +++
.../qpid/server/message/MessageSource.java | 3 +-
.../org/apache/qpid/server/model/Queue.java | 2 +-
.../consumer/AbstractConsumerTargetTest.java | 44 +++++++++++++++-----
4 files changed, 43 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e91c3592/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java b/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
index aacf022..2ef82cf 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
@@ -316,6 +316,12 @@ public abstract class AbstractConsumerTarget<T extends AbstractConsumerTarget<T>
+ " Message skipped.", entry.getMessage(), mce.getMessage());
}
break;
+ case REJECT:
+ entry.reject(consumer);
+ entry.release(consumer);
+ LOGGER.info("Failed to convert message {} for this consumer because '{}'."
+ + " Message skipped.", entry.getMessage(), mce.getMessage());
+ break;
default:
throw new ServerScopedRuntimeException("Unrecognised policy " + handlingPolicy);
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e91c3592/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java b/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
index 9446be3..2a59526 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
@@ -93,7 +93,8 @@ public interface MessageSource extends TransactionLogResource, MessageNode
enum MessageConversionExceptionHandlingPolicy
{
CLOSE,
- ROUTE_TO_ALTERNATE
+ ROUTE_TO_ALTERNATE,
+ REJECT
}
MessageConversionExceptionHandlingPolicy getMessageConversionExceptionHandlingPolicy();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e91c3592/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index 3e88324..2561e74 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -125,7 +125,7 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>,
@ManagedContextDefault( name = MESSAGE_CONVERSION_EXCEPTION_HANDLING_POLICY,
description = "The behaviour of consumer if it tries to consumer a messages that it cannot convert.")
- MessageConversionExceptionHandlingPolicy DEFAULT_MESSAGE_CONVERSION_EXCEPTION_HANDLING_POLICY = MessageConversionExceptionHandlingPolicy.CLOSE;
+ MessageConversionExceptionHandlingPolicy DEFAULT_MESSAGE_CONVERSION_EXCEPTION_HANDLING_POLICY = MessageConversionExceptionHandlingPolicy.REJECT;
@SuppressWarnings("unused")
@ManagedAttribute( defaultValue = "${queue.defaultEnsureNonDestructiveConsumers}" )
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e91c3592/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java b/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java
index 202d5cc..e29458a 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java
@@ -46,10 +46,10 @@ import org.apache.qpid.test.utils.QpidTestCase;
public class AbstractConsumerTargetTest extends QpidTestCase
{
- TestAbstractConsumerTarget _consumerTarget;
+ private TestAbstractConsumerTarget _consumerTarget;
private Consumer _consumer;
private MessageSource _messageSource;
- AMQPConnection _connection = mock(AMQPConnection.class);
+ private AMQPConnection _connection = mock(AMQPConnection.class);
private MessageInstance _messageInstance;
@Override
@@ -73,8 +73,7 @@ public class AbstractConsumerTargetTest extends QpidTestCase
public void testConversionExceptionPolicyClose() throws Exception
{
- when(_consumer.acquires()).thenReturn(true);
- when(_messageSource.getMessageConversionExceptionHandlingPolicy()).thenReturn(MessageSource.MessageConversionExceptionHandlingPolicy.CLOSE);
+ configureBehaviour(true, MessageSource.MessageConversionExceptionHandlingPolicy.CLOSE);
try
{
@@ -93,8 +92,7 @@ public class AbstractConsumerTargetTest extends QpidTestCase
public void testConversionExceptionPolicyCloseForNonAcquiringConsumer() throws Exception
{
- when(_consumer.acquires()).thenReturn(false);
- when(_messageSource.getMessageConversionExceptionHandlingPolicy()).thenReturn(MessageSource.MessageConversionExceptionHandlingPolicy.CLOSE);
+ configureBehaviour(false, MessageSource.MessageConversionExceptionHandlingPolicy.CLOSE);
try
{
@@ -113,8 +111,7 @@ public class AbstractConsumerTargetTest extends QpidTestCase
public void testConversionExceptionPolicyReroute() throws Exception
{
- when(_consumer.acquires()).thenReturn(true);
- when(_messageSource.getMessageConversionExceptionHandlingPolicy()).thenReturn(MessageSource.MessageConversionExceptionHandlingPolicy.ROUTE_TO_ALTERNATE);
+ configureBehaviour(true, MessageSource.MessageConversionExceptionHandlingPolicy.ROUTE_TO_ALTERNATE);
_consumerTarget.sendNextMessage();
assertTrue("message credit was not restored", _consumerTarget.isCreditRestored());
@@ -123,14 +120,34 @@ public class AbstractConsumerTargetTest extends QpidTestCase
public void testConversionExceptionPolicyRerouteForNonAcquiringConsumer() throws Exception
{
- when(_consumer.acquires()).thenReturn(false);
- when(_messageSource.getMessageConversionExceptionHandlingPolicy()).thenReturn(MessageSource.MessageConversionExceptionHandlingPolicy.ROUTE_TO_ALTERNATE);
+ configureBehaviour(false, MessageSource.MessageConversionExceptionHandlingPolicy.ROUTE_TO_ALTERNATE);
_consumerTarget.sendNextMessage();
assertTrue("message credit was not restored", _consumerTarget.isCreditRestored());
verify(_messageInstance, never()).routeToAlternate(any(Action.class), any(ServerTransaction.class));
}
+ public void testConversionExceptionPolicyReject() throws Exception
+ {
+ configureBehaviour(true, MessageSource.MessageConversionExceptionHandlingPolicy.REJECT);
+
+ _consumerTarget.sendNextMessage();
+
+ assertTrue("message credit was not restored", _consumerTarget.isCreditRestored());
+ verify(_messageInstance).reject(_consumer);
+ verify(_messageInstance).release(_consumer);
+ }
+
+ public void testConversionExceptionPolicyRejectForNonAcquiringConsumer() throws Exception
+ {
+ configureBehaviour(false, MessageSource.MessageConversionExceptionHandlingPolicy.REJECT);
+
+ _consumerTarget.sendNextMessage();
+ assertTrue("message credit was not restored", _consumerTarget.isCreditRestored());
+ verify(_messageInstance).reject(_consumer);
+ verify(_messageInstance).release(_consumer);
+ }
+
public void testConversionExceptionPolicyWhenOwningResourceIsNotMessageSource() throws Exception
{
final TransactionLogResource owningResource = mock(TransactionLogResource.class);
@@ -151,6 +168,13 @@ public class AbstractConsumerTargetTest extends QpidTestCase
verify(_messageInstance, never()).routeToAlternate(any(Action.class), any(ServerTransaction.class));
}
+ private void configureBehaviour(final boolean acquires,
+ final MessageSource.MessageConversionExceptionHandlingPolicy exceptionHandlingPolicy)
+ {
+ when(_consumer.acquires()).thenReturn(acquires);
+ when(_messageSource.getMessageConversionExceptionHandlingPolicy()).thenReturn(exceptionHandlingPolicy);
+ }
+
private class TestAbstractConsumerTarget extends AbstractConsumerTarget<TestAbstractConsumerTarget>
{
private boolean _creditRestored;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org