You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2019/11/22 17:44:55 UTC
[qpid-proton-j] branch master updated: PROTON-2142: use transport
error condition in close frame if connection error not set
This is an automated email from the ASF dual-hosted git repository.
robbie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton-j.git
The following commit(s) were added to refs/heads/master by this push:
new 3ac1200 PROTON-2142: use transport error condition in close frame if connection error not set
3ac1200 is described below
commit 3ac12001c67fd8746c0000d2b3b9153e35d8d24a
Author: Robbie Gemmell <ro...@apache.org>
AuthorDate: Fri Nov 22 17:40:53 2019 +0000
PROTON-2142: use transport error condition in close frame if connection error not set
---
.../proton/engine/TransportDecodeException.java | 36 +++++
.../qpid/proton/engine/impl/FrameParser.java | 5 +-
.../qpid/proton/engine/impl/SaslFrameParser.java | 5 +-
.../qpid/proton/engine/impl/TransportImpl.java | 28 +++-
.../qpid/proton/engine/impl/FrameParserTest.java | 1 +
.../qpid/proton/engine/impl/TransportImplTest.java | 177 ++++++++++++++++++++-
6 files changed, 239 insertions(+), 13 deletions(-)
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/TransportDecodeException.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/TransportDecodeException.java
new file mode 100644
index 0000000..47b7d8b
--- /dev/null
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/TransportDecodeException.java
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.engine;
+
+import java.util.Objects;
+
+import org.apache.qpid.proton.codec.DecodeException;
+import org.apache.qpid.proton.engine.TransportException;
+
+public class TransportDecodeException extends TransportException
+{
+ public TransportDecodeException(String message, DecodeException cause)
+ {
+ super(message, cause);
+ Objects.requireNonNull(message, "A message must be provided");
+ }
+}
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java
index ce4283e..d0df499 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java
@@ -34,6 +34,7 @@ import org.apache.qpid.proton.amqp.transport.FrameBody;
import org.apache.qpid.proton.codec.ByteBufferDecoder;
import org.apache.qpid.proton.codec.DecodeException;
import org.apache.qpid.proton.engine.Transport;
+import org.apache.qpid.proton.engine.TransportDecodeException;
import org.apache.qpid.proton.engine.TransportException;
import org.apache.qpid.proton.framing.TransportFrame;
@@ -445,7 +446,9 @@ class FrameParser implements TransportInput
catch (DecodeException ex)
{
state = State.ERROR;
- frameParsingError = new TransportException(ex);
+ String message = ex.getMessage() == null ? ex.toString(): ex.getMessage();
+
+ frameParsingError = new TransportDecodeException(message, ex);
}
break;
case ERROR:
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslFrameParser.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslFrameParser.java
index 141ec31..72e23c0 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslFrameParser.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslFrameParser.java
@@ -29,6 +29,7 @@ import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.security.SaslFrameBody;
import org.apache.qpid.proton.codec.ByteBufferDecoder;
import org.apache.qpid.proton.codec.DecodeException;
+import org.apache.qpid.proton.engine.TransportDecodeException;
import org.apache.qpid.proton.engine.TransportException;
class SaslFrameParser
@@ -390,7 +391,9 @@ class SaslFrameParser
catch (DecodeException ex)
{
state = State.ERROR;
- frameParsingError = new TransportException(ex);
+ String message = ex.getMessage() == null ? ex.toString(): ex.getMessage();
+
+ frameParsingError = new TransportDecodeException(message, ex);
}
break;
case ERROR:
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
index d8bac2d..7307aff 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
@@ -31,6 +31,7 @@ import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.UnsignedShort;
import org.apache.qpid.proton.amqp.security.SaslFrameBody;
+import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.Attach;
import org.apache.qpid.proton.amqp.transport.Begin;
import org.apache.qpid.proton.amqp.transport.Close;
@@ -57,6 +58,7 @@ import org.apache.qpid.proton.engine.Sasl;
import org.apache.qpid.proton.engine.Ssl;
import org.apache.qpid.proton.engine.SslDomain;
import org.apache.qpid.proton.engine.SslPeerDetails;
+import org.apache.qpid.proton.engine.TransportDecodeException;
import org.apache.qpid.proton.engine.TransportException;
import org.apache.qpid.proton.engine.TransportResult;
import org.apache.qpid.proton.engine.TransportResultFactory;
@@ -260,14 +262,18 @@ public class TransportImpl extends EndpointImpl
// condition had been set (by TransportImpl itself) rather than the 'empty' ErrorCondition
// object historically used in the other areas.
ErrorCondition errorCondition = super.getCondition();
- return errorCondition.getCondition() != null ? errorCondition : null;
+ return isConditionPopulated(errorCondition) ? errorCondition : null;
}
@Override
public void setCondition(ErrorCondition error)
{
super.setCondition(error);
- _conditionSet = error != null && error.getCondition() != null;
+ _conditionSet = isConditionPopulated(error);
+ }
+
+ private boolean isConditionPopulated(ErrorCondition error) {
+ return error != null && error.getCondition() != null;
}
@Override
@@ -1097,13 +1103,14 @@ public class TransportImpl extends EndpointImpl
ErrorCondition localError;
- if (_connectionEndpoint == null) {
- localError = getCondition();
- } else {
+ if (_connectionEndpoint != null && isConditionPopulated(_connectionEndpoint.getCondition())) {
localError = _connectionEndpoint.getCondition();
+ } else {
+ // Connection is null or didn't have a condition set, use transport condition.
+ localError = getCondition();
}
- if(localError != null && localError.getCondition() != null)
+ if(isConditionPopulated(localError))
{
close.setError(localError);
}
@@ -1466,8 +1473,13 @@ public class TransportImpl extends EndpointImpl
if (!_closeReceived || error != null) {
// Set an error condition, but only if one was not already set
if(!_conditionSet) {
- String description = error == null ? "connection aborted" : error.toString();
- setCondition(new ErrorCondition(ConnectionError.FRAMING_ERROR, description));
+ if(error instanceof TransportDecodeException) {
+ setCondition(new ErrorCondition(AmqpError.DECODE_ERROR, error.getMessage()));
+ } else {
+ String description = error == null ? "connection aborted" : error.toString();
+
+ setCondition(new ErrorCondition(ConnectionError.FRAMING_ERROR, description));
+ }
}
_head_closed = true;
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameParserTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameParserTest.java
index 3b0e36b..e03273f 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameParserTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameParserTest.java
@@ -70,6 +70,7 @@ public class FrameParserTest
ByteBuffer buffer = _frameParser.tail();
buffer.put("hello".getBytes());
_frameParser.process();
+ verify(_mockFrameHandler).closed(any(TransportException.class));
assertEquals(_frameParser.capacity(), Transport.END_OF_STREAM);
}
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
index 4584cdb..bd7e569 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
@@ -49,6 +49,7 @@ import org.apache.qpid.proton.amqp.UnsignedShort;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Released;
+import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.Attach;
import org.apache.qpid.proton.amqp.transport.Begin;
import org.apache.qpid.proton.amqp.transport.Close;
@@ -3612,6 +3613,8 @@ public class TransportImplTest
assertNotNull("Expected an ErrorCondition to be returned", transport.getCondition());
assertEquals("Unexpected ErrorCondition returned", origErrorCondition, transport.getCondition());
+ // ---------------------------------------------------------------- //
+
// Set an error condition, then call 'closed' without an error.
// Expect the original condition which was set to remain.
transport = new TransportImpl();
@@ -3622,15 +3625,18 @@ public class TransportImplTest
assertNotNull("Expected an ErrorCondition to be returned", transport.getCondition());
assertEquals("Unexpected ErrorCondition returned", origErrorCondition, transport.getCondition());
+ // ---------------------------------------------------------------- //
+
// Without having set an error condition, call 'closed' specifying an error.
// Expect a condition to be set.
- String errorDescription = "some-error-description";
transport = new TransportImpl();
- transport.closed(new TransportException(errorDescription));
+ transport.closed(new TransportException(description));
assertNotNull("Expected an ErrorCondition to be returned", transport.getCondition());
assertEquals("Unexpected condition returned", ConnectionError.FRAMING_ERROR, transport.getCondition().getCondition());
- assertEquals("Unexpected description returned", "org.apache.qpid.proton.engine.TransportException: " + errorDescription, transport.getCondition().getDescription());
+ assertEquals("Unexpected description returned", "org.apache.qpid.proton.engine.TransportException: " + description, transport.getCondition().getDescription());
+
+ // ---------------------------------------------------------------- //
// Without having set an error condition, call 'closed' without an error.
// Expect a condition to be set.
@@ -3644,6 +3650,171 @@ public class TransportImplTest
}
@Test
+ public void testCloseFrameErrorAfterTransportClosed() {
+ MockTransportImpl transport = new MockTransportImpl();
+ Connection connection = Proton.connection();
+ prepareAndOpenConnection(transport, connection);
+
+ Symbol condition = Symbol.getSymbol("some-error");
+ String description = "some-error-description";
+ ErrorCondition origErrorCondition = new ErrorCondition();
+ origErrorCondition.setCondition(condition);
+ origErrorCondition.setDescription(description);
+ assertNotNull("Expected a Condition", origErrorCondition.getCondition());
+
+ // Set an error condition, then call 'closed' specifying an error.
+ // Expect the original condition which was set to be emitted
+ // in the close frame generated.
+
+ transport.setCondition(origErrorCondition);
+ transport.closed(new TransportException("my-ignored-exception"));
+
+ pumpMockTransport(transport);
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 2, transport.writes.size());
+ FrameBody frameBody = transport.writes.get(1);
+ assertTrue("Unexpected frame type", frameBody instanceof Close);
+ assertEquals("Unexpected condition", origErrorCondition, ((Close) frameBody).getError());
+
+ // ---------------------------------------------------------------- //
+
+ // Set an error condition, then call 'closed' without an error.
+ // Expect the original condition which was set to be emitted
+ // in the close frame generated.
+
+ transport = new MockTransportImpl();
+ connection = Proton.connection();
+ prepareAndOpenConnection(transport, connection);
+
+ transport.setCondition(origErrorCondition);
+ transport.closed(null);
+
+ pumpMockTransport(transport);
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 2, transport.writes.size());
+ frameBody = transport.writes.get(1);
+ assertTrue("Unexpected frame type", frameBody instanceof Close);
+ assertEquals("Unexpected condition", origErrorCondition, ((Close) frameBody).getError());
+
+ // ---------------------------------------------------------------- //
+
+ // Without having set an error condition, call 'closed' specifying an error.
+ // Expect a condition to be emitted in the close frame generated.
+ transport = new MockTransportImpl();
+ connection = Proton.connection();
+ prepareAndOpenConnection(transport, connection);
+
+ transport.closed(new TransportException(description));
+
+ pumpMockTransport(transport);
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 2, transport.writes.size());
+ frameBody = transport.writes.get(1);
+ assertTrue("Unexpected frame type", frameBody instanceof Close);
+ ErrorCondition expectedCondition = new ErrorCondition(ConnectionError.FRAMING_ERROR, "org.apache.qpid.proton.engine.TransportException: " + description);
+ assertEquals("Unexpected condition", expectedCondition, ((Close) frameBody).getError());
+
+ // ---------------------------------------------------------------- //
+
+ // Without having set an error condition, call 'closed' without an error.
+ // Expect a condition to be emitted in the close frame generated.
+ transport = new MockTransportImpl();
+ connection = Proton.connection();
+ prepareAndOpenConnection(transport, connection);
+
+ transport.closed(null);
+
+ pumpMockTransport(transport);
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 2, transport.writes.size());
+ frameBody = transport.writes.get(1);
+ assertTrue("Unexpected frame type", frameBody instanceof Close);
+ expectedCondition = new ErrorCondition(ConnectionError.FRAMING_ERROR, "connection aborted");
+ assertEquals("Unexpected condition", expectedCondition, ((Close) frameBody).getError());
+
+ // ---------------------------------------------------------------- //
+
+ // Without having set an error condition on the transport, call 'closed' with an error,
+ // but then also set a condition on the connection, and expect the connection error
+ // condition to be emitted in the close frame generated.
+ transport = new MockTransportImpl();
+ connection = Proton.connection();
+ prepareAndOpenConnection(transport, connection);
+
+ transport.closed(new TransportException("some other transport exception"));
+ connection.setCondition(origErrorCondition);
+
+ pumpMockTransport(transport);
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 2, transport.writes.size());
+ frameBody = transport.writes.get(1);
+ assertTrue("Unexpected frame type", frameBody instanceof Close);
+ assertEquals("Unexpected condition", origErrorCondition, ((Close) frameBody).getError());
+ }
+
+ @Test
+ public void testCloseFrameErrorAfterDecodeError() {
+ MockTransportImpl transport = new MockTransportImpl();
+ Connection connection = Proton.connection();
+
+ Collector collector = Collector.Factory.create();
+ connection.collect(collector);
+
+ transport.bind(connection);
+ connection.open();
+
+ pumpMockTransport(transport);
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 1, transport.writes.size());
+ assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
+
+ assertEvents(collector, Event.Type.CONNECTION_INIT, Event.Type.CONNECTION_BOUND, Event.Type.CONNECTION_LOCAL_OPEN, Event.Type.TRANSPORT);
+
+ // Provide the response bytes for the header
+ transport.tail().put(AmqpHeader.HEADER);
+ transport.process();
+
+ // Provide the bytes for Open, but omit the mandatory container-id to provoke a decode error.
+ byte[] bytes = new byte[] { 0x00, 0x00, 0x00, 0x0C, // Frame size = 12 bytes.
+ 0x02, 0x00, 0x00, 0x00, // DOFF, TYPE, 2x CHANNEL
+ 0x00, 0x53, 0x10, 0x45};// Described-type, ulong type, open descriptor, list0.
+
+ int capacity = transport.capacity();
+ assertTrue("Unexpected transport capacity: " + capacity, capacity > bytes.length);
+
+ transport.tail().put(bytes);
+ transport.process();
+
+ assertEvents(collector, Event.Type.TRANSPORT_ERROR, Event.Type.TRANSPORT_TAIL_CLOSED);
+
+ pumpMockTransport(transport);
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 2, transport.writes.size());
+ FrameBody frameBody = transport.writes.get(1);
+ assertTrue("Unexpected frame type", frameBody instanceof Close);
+
+ // Expect the close frame generated to contain a decode error condition referencing the missing container-id.
+ ErrorCondition expectedCondition = new ErrorCondition();
+ expectedCondition.setCondition(AmqpError.DECODE_ERROR);
+ expectedCondition.setDescription("The container-id field cannot be omitted");
+
+ assertEquals("Unexpected condition", expectedCondition, ((Close) frameBody).getError());
+ }
+
+ private void prepareAndOpenConnection(MockTransportImpl transport, Connection connection) {
+ transport.bind(connection);
+ connection.open();
+
+ pumpMockTransport(transport);
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 1, transport.writes.size());
+ assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
+
+ // Give the necessary response to open
+ transport.handleFrame(new TransportFrame(0, new Open(), null));
+ }
+
+ @Test
public void testProtocolTracingLogsFrameToTracer()
{
Connection connection = new ConnectionImpl();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org