You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2019/11/25 12:10:30 UTC
[qpid-jms] branch master updated: QPIDJMS-481: improve handling of
transport/decode errors and better signal failure cause
This is an automated email from the ASF dual-hosted git repository.
robbie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-jms.git
The following commit(s) were added to refs/heads/master by this push:
new c4ab27d QPIDJMS-481: improve handling of transport/decode errors and better signal failure cause
c4ab27d is described below
commit c4ab27d2b92c8d9156e138bd7486a9088165ddcb
Author: Robbie Gemmell <ro...@apache.org>
AuthorDate: Mon Nov 25 12:09:24 2019 +0000
QPIDJMS-481: improve handling of transport/decode errors and better signal failure cause
---
.../qpid/jms/provider/amqp/AmqpProvider.java | 35 ++++
.../integration/TransactionsIntegrationTest.java | 54 ++++++
.../jms/test/testpeer/matchers/ErrorMatcher.java | 192 +++++++++++++++++++++
3 files changed, 281 insertions(+)
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index 718019b..8880e3a 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -77,6 +77,8 @@ import org.apache.qpid.jms.transports.Transport;
import org.apache.qpid.jms.transports.TransportListener;
import org.apache.qpid.jms.util.PropertyUtil;
import org.apache.qpid.jms.util.QpidJMSThreadFactory;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Collector;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
@@ -153,6 +155,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
org.apache.qpid.proton.engine.Transport.Factory.create();
private final Collector protonCollector = new CollectorImpl();
private final Connection protonConnection = Connection.Factory.create();
+ private boolean protonTransportErrorHandled;
private final ProviderFutureFactory futureFactory;
private AsyncResult connectionRequest;
@@ -850,6 +853,11 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
TRACE_BYTES.info("Received: {}", ByteBufUtil.hexDump(input));
}
+ if(protonTransportErrorHandled) {
+ LOG.trace("Skipping data processing, proton transport previously errored.");
+ return;
+ }
+
do {
ByteBuffer buffer = protonTransport.tail();
int chunkSize = Math.min(buffer.remaining(), input.readableBytes());
@@ -1001,6 +1009,17 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
amqpEventSink.processDeliveryUpdates(this, (Delivery) protonEvent.getContext());
}
break;
+ case TRANSPORT_ERROR:
+ // We handle authentication failure elsewhere, but in doing so we close the transport
+ // head which would also get us here, so only action this if auth succeeded.
+ if(authenticator == null || (authenticator.isComplete() && authenticator.wasSuccessful())) {
+ protonTransportErrorHandled = true;
+ ErrorCondition transportCondition = protonTransport.getCondition();
+ String message = extractTransportErrorMessage(transportCondition);
+
+ throw new ProviderFailedException(message);
+ }
+ break;
default:
break;
}
@@ -1016,6 +1035,22 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
}
}
+    /**
+     * Builds the failure message used when the proton transport reports an error.
+     *
+     * @param errorCondition the condition read from the proton transport, may be null
+     * @return the condition's description prefixed with "Error in proton Transport: " when a
+     *         non-empty description is present, otherwise a generic fallback text; in both
+     *         cases the condition symbol is appended when the error condition carries one
+     */
+    private static String extractTransportErrorMessage(ErrorCondition errorCondition) {
+        // No condition at all: nothing to enrich the fallback text with.
+        if (errorCondition == null) {
+            return "Error without description from proton Transport";
+        }
+
+        StringBuilder text = new StringBuilder();
+
+        String description = errorCondition.getDescription();
+        if (description == null || description.isEmpty()) {
+            text.append("Error without description from proton Transport");
+        } else {
+            text.append("Error in proton Transport: ").append(description);
+        }
+
+        // The symbolic condition is appended regardless of whether a description was present.
+        Symbol symbol = errorCondition.getCondition();
+        if (symbol != null) {
+            text.append(" [condition = ").append(symbol).append("]");
+        }
+
+        return text.toString();
+    }
+
protected boolean pumpToProtonTransport() {
return pumpToProtonTransport(NOOP_REQUEST, true);
}
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
index 39d8f00..42dbada 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
@@ -34,6 +34,7 @@ import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Connection;
import javax.jms.JMSException;
@@ -48,6 +49,7 @@ import javax.jms.TransactionRolledBackException;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsDefaultConnectionListener;
import org.apache.qpid.jms.JmsOperationTimedOutException;
+import org.apache.qpid.jms.exceptions.JmsConnectionFailedException;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
import org.apache.qpid.jms.test.QpidJmsTestCase;
@@ -77,6 +79,7 @@ import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.hamcrest.MatcherAssert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
@@ -1683,4 +1686,55 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase {
testPeer.waitForAllHandlersToComplete(3000);
}
}
+
+    /**
+     * Verifies the client's reaction when the peer answers a transaction declare with a
+     * declared outcome that lacks the txn-id: creating the transacted session must throw,
+     * and the connection's ExceptionListener must be handed a JmsConnectionFailedException
+     * whose message carries the transport error details.
+     *
+     * @throws Exception if the test peer or the JMS client fails unexpectedly
+     */
+    @Test(timeout=20000)
+    public void testTransactionDeclaredDispositionWithoutTxnId() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            // Record only the first failure reported to the listener, and signal its arrival.
+            AtomicReference<JMSException> failure = new AtomicReference<>();
+            CountDownLatch exceptionListenerFired = new CountDownLatch(1);
+            connection.setExceptionListener(jmse -> {
+                failure.compareAndSet(null, jmse);
+                exceptionListenerFired.countDown();
+            });
+
+            testPeer.expectBegin();
+            testPeer.expectCoordinatorAttach();
+
+            // Expect declare, reply declared but without a txn-id, which is illegal.
+            testPeer.expectDeclare(null);
+
+            // TODO: swap this in for below after PROTON-2142 fix is available:
+            // ErrorMatcher errorMatcher = new ErrorMatcher()
+            //         .withCondition(equalTo(AmqpError.DECODE_ERROR))
+            //         .withDescription(equalTo("The txn-id field cannot be omitted"));
+            // testPeer.expectClose(errorMatcher, false);
+            testPeer.expectClose(false);
+            testPeer.setSuppressReadExceptionOnClose(true);
+
+            try {
+                connection.createSession(true, Session.SESSION_TRANSACTED);
+                fail("expected exception to be thrown");
+            } catch (JMSException e) {
+                // Expected
+            }
+
+            assertTrue("The ExceptionListener should have been alerted", exceptionListenerFired.await(3, TimeUnit.SECONDS));
+            JMSException ex = failure.get();
+            assertTrue("Unexpected exception type: " + ex, ex instanceof JmsConnectionFailedException);
+
+            // The reason text names the message, since this assertion checks the message
+            // content rather than the exception type verified just above.
+            // TODO: swap this in for below after PROTON-2142 fix is available:
+            // MatcherAssert.assertThat("Unexpected exception message: ", ex.getMessage(),
+            //         equalTo("The JMS connection has failed: Error in proton Transport: The txn-id field cannot be omitted [condition = amqp:decode-error]"));
+            MatcherAssert.assertThat("Unexpected exception message: ", ex.getMessage(),
+                    equalTo("The JMS connection has failed: Error in proton Transport: org.apache.qpid.proton.engine.TransportException: "
+                            + "org.apache.qpid.proton.codec.DecodeException: The txn-id field cannot be omitted [condition = amqp:connection:framing-error]"));
+
+            testPeer.waitForAllHandlersToComplete(1000);
+
+            connection.close(); // Already nuked under the covers due to txn-id being missing
+        }
+    }
}
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/ErrorMatcher.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/ErrorMatcher.java
new file mode 100644
index 0000000..4f90c72
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/ErrorMatcher.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.jms.test.testpeer.matchers;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+
+import java.util.List;
+
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedLong;
+import org.apache.qpid.jms.test.testpeer.AbstractFieldAndDescriptorMatcher;
+import org.hamcrest.Matcher;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+
+public class ErrorMatcher extends TypeSafeMatcher<Object>
+{
+ private ErrorMatcherCore coreMatcher = new ErrorMatcherCore();
+ private String mismatchTextAddition;
+ private Object described;
+ private Object descriptor;
+
+ public ErrorMatcher()
+ {
+ }
+
+ @Override
+ protected boolean matchesSafely(Object received)
+ {
+ try
+ {
+ assertThat(received, instanceOf(DescribedType.class));
+ descriptor = ((DescribedType)received).getDescriptor();
+ if(!coreMatcher.descriptorMatches(descriptor))
+ {
+ mismatchTextAddition = "Descriptor mismatch";
+ return false;
+ }
+
+ described = ((DescribedType)received).getDescribed();
+ assertThat(described, instanceOf(List.class));
+ @SuppressWarnings("unchecked")
+ List<Object> fields = (List<Object>) described;
+
+ coreMatcher.verifyFields(fields);
+ }
+ catch (AssertionError ae)
+ {
+ mismatchTextAddition = "AssertionFailure: " + ae.getMessage();
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ protected void describeMismatchSafely(Object item, Description mismatchDescription)
+ {
+ mismatchDescription.appendText("\nActual form: ").appendValue(item);
+
+ mismatchDescription.appendText("\nExpected descriptor: ")
+ .appendValue(coreMatcher.getSymbolicDescriptor())
+ .appendText(" / ")
+ .appendValue(coreMatcher.getNumericDescriptor());
+
+ if(mismatchTextAddition != null)
+ {
+ mismatchDescription.appendText("\nAdditional info: ").appendValue(mismatchTextAddition);
+ }
+ }
+
+ public void describeTo(Description description)
+ {
+ description
+ .appendText("Modified which matches: ")
+ .appendValue(coreMatcher.getMatchers());
+ }
+
+
+ public ErrorMatcher withCondition(Matcher<?> m)
+ {
+ coreMatcher.withCondition(m);
+ return this;
+ }
+
+ public ErrorMatcher withDescription(Matcher<?> m)
+ {
+ coreMatcher.withDescription(m);
+ return this;
+ }
+
+ public ErrorMatcher withInfo(Matcher<?> m)
+ {
+ coreMatcher.withInfo(m);
+ return this;
+ }
+
+ public Object getReceivedCondition()
+ {
+ return coreMatcher.getReceivedCondition();
+ }
+
+ public Object getReceivedDescription()
+ {
+ return coreMatcher.getReceivedDescription();
+ }
+
+ public Object getReceivedInfo()
+ {
+ return coreMatcher.getReceivedInfo();
+ }
+
+
+
+ //Inner core matching class
+ public static class ErrorMatcherCore extends AbstractFieldAndDescriptorMatcher
+ {
+ /** Note that the ordinals of the Field enums match the order specified in the AMQP spec */
+ public enum Field
+ {
+ CONDITION,
+ DESCRIPTION,
+ INFO
+ }
+
+ public ErrorMatcherCore()
+ {
+ super(UnsignedLong.valueOf(0x000000000000001DL),
+ Symbol.valueOf("amqp:error:list"));
+ }
+
+
+ public ErrorMatcherCore withCondition(Matcher<?> m)
+ {
+ getMatchers().put(Field.CONDITION, m);
+ return this;
+ }
+
+ public ErrorMatcherCore withDescription(Matcher<?> m)
+ {
+ getMatchers().put(Field.DESCRIPTION, m);
+ return this;
+ }
+
+ public ErrorMatcherCore withInfo(Matcher<?> m)
+ {
+ getMatchers().put(Field.INFO, m);
+ return this;
+ }
+
+ public Object getReceivedCondition()
+ {
+ return getReceivedFields().get(Field.CONDITION);
+ }
+
+ public Object getReceivedDescription()
+ {
+ return getReceivedFields().get(Field.DESCRIPTION);
+ }
+
+ public Object getReceivedInfo()
+ {
+ return getReceivedFields().get(Field.INFO);
+ }
+
+ @Override
+ protected Enum<?> getField(int fieldIndex)
+ {
+ return Field.values()[fieldIndex];
+ }
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org