You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/08/15 15:00:16 UTC

qpid-broker-j git commit: QPID-7887: [Java Broker] Add MessageConversionExceptionHandlingPolicy.REJECT and make it default

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 0406c8734 -> e91c35925


QPID-7887: [Java Broker] Add MessageConversionExceptionHandlingPolicy.REJECT and make it default


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/e91c3592
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/e91c3592
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/e91c3592

Branch: refs/heads/master
Commit: e91c3592574f12ca5fe4c9cfeff9f83fd3407b1b
Parents: 0406c87
Author: Lorenz Quack <lq...@apache.org>
Authored: Mon Aug 14 17:02:42 2017 +0100
Committer: Keith Wall <kw...@apache.org>
Committed: Tue Aug 15 15:59:55 2017 +0100

----------------------------------------------------------------------
 .../server/consumer/AbstractConsumerTarget.java |  6 +++
 .../qpid/server/message/MessageSource.java      |  3 +-
 .../org/apache/qpid/server/model/Queue.java     |  2 +-
 .../consumer/AbstractConsumerTargetTest.java    | 44 +++++++++++++++-----
 4 files changed, 43 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e91c3592/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java b/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
index aacf022..2ef82cf 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
@@ -316,6 +316,12 @@ public abstract class AbstractConsumerTarget<T extends AbstractConsumerTarget<T>
                                             + "  Message skipped.", entry.getMessage(), mce.getMessage());
                             }
                             break;
+                        case REJECT:
+                            entry.reject(consumer);
+                            entry.release(consumer);
+                            LOGGER.info("Failed to convert message {} for this consumer because '{}'."
+                                        + "  Message skipped.", entry.getMessage(), mce.getMessage());
+                            break;
                         default:
                             throw new ServerScopedRuntimeException("Unrecognised policy " + handlingPolicy);
                     }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e91c3592/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java b/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
index 9446be3..2a59526 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
@@ -93,7 +93,8 @@ public interface MessageSource extends TransactionLogResource, MessageNode
     enum MessageConversionExceptionHandlingPolicy
     {
         CLOSE,
-        ROUTE_TO_ALTERNATE
+        ROUTE_TO_ALTERNATE,
+        REJECT
     }
 
     MessageConversionExceptionHandlingPolicy getMessageConversionExceptionHandlingPolicy();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e91c3592/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index 3e88324..2561e74 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -125,7 +125,7 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>,
 
     @ManagedContextDefault( name = MESSAGE_CONVERSION_EXCEPTION_HANDLING_POLICY,
             description = "The behaviour of consumer if it tries to consumer a messages that it cannot convert.")
-    MessageConversionExceptionHandlingPolicy DEFAULT_MESSAGE_CONVERSION_EXCEPTION_HANDLING_POLICY = MessageConversionExceptionHandlingPolicy.CLOSE;
+    MessageConversionExceptionHandlingPolicy DEFAULT_MESSAGE_CONVERSION_EXCEPTION_HANDLING_POLICY = MessageConversionExceptionHandlingPolicy.REJECT;
 
     @SuppressWarnings("unused")
     @ManagedAttribute( defaultValue = "${queue.defaultEnsureNonDestructiveConsumers}" )

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e91c3592/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java b/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java
index 202d5cc..e29458a 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java
@@ -46,10 +46,10 @@ import org.apache.qpid.test.utils.QpidTestCase;
 public class AbstractConsumerTargetTest extends QpidTestCase
 {
 
-    TestAbstractConsumerTarget _consumerTarget;
+    private TestAbstractConsumerTarget _consumerTarget;
     private Consumer _consumer;
     private MessageSource _messageSource;
-    AMQPConnection _connection = mock(AMQPConnection.class);
+    private AMQPConnection _connection = mock(AMQPConnection.class);
     private MessageInstance _messageInstance;
 
     @Override
@@ -73,8 +73,7 @@ public class AbstractConsumerTargetTest extends QpidTestCase
 
     public void testConversionExceptionPolicyClose() throws Exception
     {
-        when(_consumer.acquires()).thenReturn(true);
-        when(_messageSource.getMessageConversionExceptionHandlingPolicy()).thenReturn(MessageSource.MessageConversionExceptionHandlingPolicy.CLOSE);
+        configureBehaviour(true, MessageSource.MessageConversionExceptionHandlingPolicy.CLOSE);
 
         try
         {
@@ -93,8 +92,7 @@ public class AbstractConsumerTargetTest extends QpidTestCase
 
     public void testConversionExceptionPolicyCloseForNonAcquiringConsumer() throws Exception
     {
-        when(_consumer.acquires()).thenReturn(false);
-        when(_messageSource.getMessageConversionExceptionHandlingPolicy()).thenReturn(MessageSource.MessageConversionExceptionHandlingPolicy.CLOSE);
+        configureBehaviour(false, MessageSource.MessageConversionExceptionHandlingPolicy.CLOSE);
 
         try
         {
@@ -113,8 +111,7 @@ public class AbstractConsumerTargetTest extends QpidTestCase
 
     public void testConversionExceptionPolicyReroute() throws Exception
     {
-        when(_consumer.acquires()).thenReturn(true);
-        when(_messageSource.getMessageConversionExceptionHandlingPolicy()).thenReturn(MessageSource.MessageConversionExceptionHandlingPolicy.ROUTE_TO_ALTERNATE);
+        configureBehaviour(true, MessageSource.MessageConversionExceptionHandlingPolicy.ROUTE_TO_ALTERNATE);
 
         _consumerTarget.sendNextMessage();
         assertTrue("message credit was not restored", _consumerTarget.isCreditRestored());
@@ -123,14 +120,34 @@ public class AbstractConsumerTargetTest extends QpidTestCase
 
     public void testConversionExceptionPolicyRerouteForNonAcquiringConsumer() throws Exception
     {
-        when(_consumer.acquires()).thenReturn(false);
-        when(_messageSource.getMessageConversionExceptionHandlingPolicy()).thenReturn(MessageSource.MessageConversionExceptionHandlingPolicy.ROUTE_TO_ALTERNATE);
+        configureBehaviour(false, MessageSource.MessageConversionExceptionHandlingPolicy.ROUTE_TO_ALTERNATE);
 
         _consumerTarget.sendNextMessage();
         assertTrue("message credit was not restored", _consumerTarget.isCreditRestored());
         verify(_messageInstance, never()).routeToAlternate(any(Action.class), any(ServerTransaction.class));
     }
 
+    public void testConversionExceptionPolicyReject() throws Exception
+    {
+        configureBehaviour(true, MessageSource.MessageConversionExceptionHandlingPolicy.REJECT);
+
+        _consumerTarget.sendNextMessage();
+
+        assertTrue("message credit was not restored", _consumerTarget.isCreditRestored());
+        verify(_messageInstance).reject(_consumer);
+        verify(_messageInstance).release(_consumer);
+    }
+
+    public void testConversionExceptionPolicyRejectForNonAcquiringConsumer() throws Exception
+    {
+        configureBehaviour(false, MessageSource.MessageConversionExceptionHandlingPolicy.REJECT);
+
+        _consumerTarget.sendNextMessage();
+        assertTrue("message credit was not restored", _consumerTarget.isCreditRestored());
+        verify(_messageInstance).reject(_consumer);
+        verify(_messageInstance).release(_consumer);
+    }
+
     public void testConversionExceptionPolicyWhenOwningResourceIsNotMessageSource() throws Exception
     {
         final TransactionLogResource owningResource = mock(TransactionLogResource.class);
@@ -151,6 +168,13 @@ public class AbstractConsumerTargetTest extends QpidTestCase
         verify(_messageInstance, never()).routeToAlternate(any(Action.class), any(ServerTransaction.class));
     }
 
+    private void configureBehaviour(final boolean acquires,
+                                    final MessageSource.MessageConversionExceptionHandlingPolicy exceptionHandlingPolicy)
+    {
+        when(_consumer.acquires()).thenReturn(acquires);
+        when(_messageSource.getMessageConversionExceptionHandlingPolicy()).thenReturn(exceptionHandlingPolicy);
+    }
+
     private class TestAbstractConsumerTarget extends AbstractConsumerTarget<TestAbstractConsumerTarget>
     {
         private boolean _creditRestored;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org