You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2019/11/25 12:10:30 UTC

[qpid-jms] branch master updated: QPIDJMS-481: improve handling of transport/decode errors and better signal failure cause

This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-jms.git


The following commit(s) were added to refs/heads/master by this push:
     new c4ab27d  QPIDJMS-481: improve handling of transport/decode errors and better signal failure cause
c4ab27d is described below

commit c4ab27d2b92c8d9156e138bd7486a9088165ddcb
Author: Robbie Gemmell <ro...@apache.org>
AuthorDate: Mon Nov 25 12:09:24 2019 +0000

    QPIDJMS-481: improve handling of transport/decode errors and better signal failure cause
---
 .../qpid/jms/provider/amqp/AmqpProvider.java       |  35 ++++
 .../integration/TransactionsIntegrationTest.java   |  54 ++++++
 .../jms/test/testpeer/matchers/ErrorMatcher.java   | 192 +++++++++++++++++++++
 3 files changed, 281 insertions(+)

diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index 718019b..8880e3a 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -77,6 +77,8 @@ import org.apache.qpid.jms.transports.Transport;
 import org.apache.qpid.jms.transports.TransportListener;
 import org.apache.qpid.jms.util.PropertyUtil;
 import org.apache.qpid.jms.util.QpidJMSThreadFactory;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
 import org.apache.qpid.proton.engine.Collector;
 import org.apache.qpid.proton.engine.Connection;
 import org.apache.qpid.proton.engine.Delivery;
@@ -153,6 +155,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
         org.apache.qpid.proton.engine.Transport.Factory.create();
     private final Collector protonCollector = new CollectorImpl();
     private final Connection protonConnection = Connection.Factory.create();
+    private boolean protonTransportErrorHandled;
 
     private final ProviderFutureFactory futureFactory;
     private AsyncResult connectionRequest;
@@ -850,6 +853,11 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
                 TRACE_BYTES.info("Received: {}", ByteBufUtil.hexDump(input));
             }
 
+            if(protonTransportErrorHandled) {
+                LOG.trace("Skipping data processing, proton transport previously errored.");
+                return;
+            }
+
             do {
                 ByteBuffer buffer = protonTransport.tail();
                 int chunkSize = Math.min(buffer.remaining(), input.readableBytes());
@@ -1001,6 +1009,17 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
                             amqpEventSink.processDeliveryUpdates(this, (Delivery) protonEvent.getContext());
                         }
                         break;
+                    case TRANSPORT_ERROR:
+                        // We handle authentication failure elsewhere, but in doing so we close the transport
+                        // head which would also get us here, so only action this if auth succeeded.
+                        if(authenticator == null || (authenticator.isComplete() && authenticator.wasSuccessful())) {
+                            protonTransportErrorHandled = true;
+                            ErrorCondition transportCondition = protonTransport.getCondition();
+                            String message = extractTransportErrorMessage(transportCondition);
+
+                            throw new ProviderFailedException(message);
+                        }
+                        break;
                     default:
                         break;
                 }
@@ -1016,6 +1035,22 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
         }
     }
 
+    private static String extractTransportErrorMessage(ErrorCondition errorCondition) {
+        String message = "Error without description from proton Transport";
+        if (errorCondition != null) {
+            if (errorCondition.getDescription() != null && !errorCondition.getDescription().isEmpty()) {
+                message = "Error in proton Transport: " + errorCondition.getDescription();
+            }
+
+            Symbol condition = errorCondition.getCondition();
+            if (condition != null) {
+                message = message + " [condition = " + condition + "]";
+            }
+        }
+
+        return message;
+    }
+
     protected boolean pumpToProtonTransport() {
         return pumpToProtonTransport(NOOP_REQUEST, true);
     }
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
index 39d8f00..42dbada 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
@@ -34,6 +34,7 @@ import java.util.ArrayDeque;
 import java.util.Deque;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.jms.Connection;
 import javax.jms.JMSException;
@@ -48,6 +49,7 @@ import javax.jms.TransactionRolledBackException;
 import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.JmsDefaultConnectionListener;
 import org.apache.qpid.jms.JmsOperationTimedOutException;
+import org.apache.qpid.jms.exceptions.JmsConnectionFailedException;
 import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
 import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
@@ -77,6 +79,7 @@ import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.DescribedType;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.hamcrest.MatcherAssert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.slf4j.Logger;
@@ -1683,4 +1686,55 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
             testPeer.waitForAllHandlersToComplete(3000);
         }
     }
+
+    @Test(timeout=20000)
+    public void testTransactionDeclaredDispositionWithoutTxnId() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            AtomicReference<JMSException> failure = new AtomicReference<>();
+            CountDownLatch exceptionListenerFired = new CountDownLatch(1);
+            connection.setExceptionListener(jmse -> {
+                failure.compareAndSet(null, jmse);
+                exceptionListenerFired.countDown();
+            });
+
+            testPeer.expectBegin();
+            testPeer.expectCoordinatorAttach();
+
+            // Expect declare, reply declared but without a txn-id, which is illegal.
+            testPeer.expectDeclare(null);
+
+            // TODO: swap this in for below after PROTON-2142 fix is available:
+            // ErrorMatcher errorMatcher = new ErrorMatcher()
+            //         .withCondition(equalTo(AmqpError.DECODE_ERROR))
+            //         .withDescription(equalTo("The txn-id field cannot be omitted"));
+            // testPeer.expectClose(errorMatcher, false);
+            testPeer.expectClose(false);
+            testPeer.setSuppressReadExceptionOnClose(true);
+
+            try {
+                connection.createSession(true, Session.SESSION_TRANSACTED);
+                fail("expected exception to be thrown");
+            } catch (JMSException e) {
+                // Expected
+            }
+
+            assertTrue("The ExceptionListener should have been alerted", exceptionListenerFired.await(3, TimeUnit.SECONDS));
+            JMSException ex = failure.get();
+            assertTrue("Unexpected exception type: " + ex, ex instanceof JmsConnectionFailedException);
+
+            // TODO: swap this in for below after PROTON-2142 fix is available:
+            // MatcherAssert.assertThat("Unexpected exception type: ", ex.getMessage(),
+            //         equalTo("The JMS connection has failed: Error in proton Transport: The txn-id field cannot be omitted [condition = amqp:decode-error]"));
+            MatcherAssert.assertThat("Unexpected exception type: ", ex.getMessage(),
+                    equalTo("The JMS connection has failed: Error in proton Transport: org.apache.qpid.proton.engine.TransportException: "
+                            + "org.apache.qpid.proton.codec.DecodeException: The txn-id field cannot be omitted [condition = amqp:connection:framing-error]"));
+
+            testPeer.waitForAllHandlersToComplete(1000);
+
+            connection.close(); // Already nuked under the covers due to txn-id being missing
+        }
+    }
 }
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/ErrorMatcher.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/ErrorMatcher.java
new file mode 100644
index 0000000..4f90c72
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/ErrorMatcher.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.jms.test.testpeer.matchers;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+
+import java.util.List;
+
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedLong;
+import org.apache.qpid.jms.test.testpeer.AbstractFieldAndDescriptorMatcher;
+import org.hamcrest.Matcher;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+
+public class ErrorMatcher extends TypeSafeMatcher<Object>
+{
+    private ErrorMatcherCore coreMatcher = new ErrorMatcherCore();
+    private String mismatchTextAddition;
+    private Object described;
+    private Object descriptor;
+
+    public ErrorMatcher()
+    {
+    }
+
+    @Override
+    protected boolean matchesSafely(Object received)
+    {
+        try
+        {
+            assertThat(received, instanceOf(DescribedType.class));
+            descriptor = ((DescribedType)received).getDescriptor();
+            if(!coreMatcher.descriptorMatches(descriptor))
+            {
+                mismatchTextAddition = "Descriptor mismatch";
+                return false;
+            }
+
+            described = ((DescribedType)received).getDescribed();
+            assertThat(described, instanceOf(List.class));
+            @SuppressWarnings("unchecked")
+            List<Object> fields = (List<Object>) described;
+
+            coreMatcher.verifyFields(fields);
+        }
+        catch (AssertionError ae)
+        {
+            mismatchTextAddition = "AssertionFailure: " + ae.getMessage();
+            return false;
+        }
+
+        return true;
+    }
+
+    @Override
+    protected void describeMismatchSafely(Object item, Description mismatchDescription)
+    {
+        mismatchDescription.appendText("\nActual form: ").appendValue(item);
+
+        mismatchDescription.appendText("\nExpected descriptor: ")
+                .appendValue(coreMatcher.getSymbolicDescriptor())
+                .appendText(" / ")
+                .appendValue(coreMatcher.getNumericDescriptor());
+
+        if(mismatchTextAddition != null)
+        {
+            mismatchDescription.appendText("\nAdditional info: ").appendValue(mismatchTextAddition);
+        }
+    }
+
+    public void describeTo(Description description)
+    {
+        description
+            .appendText("Modified which matches: ")
+            .appendValue(coreMatcher.getMatchers());
+    }
+
+
+    public ErrorMatcher withCondition(Matcher<?> m)
+    {
+        coreMatcher.withCondition(m);
+        return this;
+    }
+
+    public ErrorMatcher withDescription(Matcher<?> m)
+    {
+        coreMatcher.withDescription(m);
+        return this;
+    }
+
+    public ErrorMatcher withInfo(Matcher<?> m)
+    {
+        coreMatcher.withInfo(m);
+        return this;
+    }
+
+    public Object getReceivedCondition()
+    {
+        return coreMatcher.getReceivedCondition();
+    }
+
+    public Object getReceivedDescription()
+    {
+        return coreMatcher.getReceivedDescription();
+    }
+
+    public Object getReceivedInfo()
+    {
+        return coreMatcher.getReceivedInfo();
+    }
+
+
+
+    //Inner core matching class
+    public static class ErrorMatcherCore extends AbstractFieldAndDescriptorMatcher
+    {
+        /** Note that the ordinals of the Field enums match the order specified in the AMQP spec */
+        public enum Field
+        {
+            CONDITION,
+            DESCRIPTION,
+            INFO
+        }
+
+        public ErrorMatcherCore()
+        {
+            super(UnsignedLong.valueOf(0x000000000000001DL),
+                  Symbol.valueOf("amqp:error:list"));
+        }
+
+
+        public ErrorMatcherCore withCondition(Matcher<?> m)
+        {
+            getMatchers().put(Field.CONDITION, m);
+            return this;
+        }
+
+        public ErrorMatcherCore withDescription(Matcher<?> m)
+        {
+            getMatchers().put(Field.DESCRIPTION, m);
+            return this;
+        }
+
+        public ErrorMatcherCore withInfo(Matcher<?> m)
+        {
+            getMatchers().put(Field.INFO, m);
+            return this;
+        }
+
+        public Object getReceivedCondition()
+        {
+            return getReceivedFields().get(Field.CONDITION);
+        }
+
+        public Object getReceivedDescription()
+        {
+            return getReceivedFields().get(Field.DESCRIPTION);
+        }
+
+        public Object getReceivedInfo()
+        {
+            return getReceivedFields().get(Field.INFO);
+        }
+
+        @Override
+        protected Enum<?> getField(int fieldIndex)
+        {
+            return Field.values()[fieldIndex];
+        }
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org