You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2019/11/22 17:44:55 UTC

[qpid-proton-j] branch master updated: PROTON-2142: use transport error condition in close frame if connection error not set

This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton-j.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ac1200  PROTON-2142: use transport error condition in close frame if connection error not set
3ac1200 is described below

commit 3ac12001c67fd8746c0000d2b3b9153e35d8d24a
Author: Robbie Gemmell <ro...@apache.org>
AuthorDate: Fri Nov 22 17:40:53 2019 +0000

    PROTON-2142: use transport error condition in close frame if connection error not set
---
 .../proton/engine/TransportDecodeException.java    |  36 +++++
 .../qpid/proton/engine/impl/FrameParser.java       |   5 +-
 .../qpid/proton/engine/impl/SaslFrameParser.java   |   5 +-
 .../qpid/proton/engine/impl/TransportImpl.java     |  28 +++-
 .../qpid/proton/engine/impl/FrameParserTest.java   |   1 +
 .../qpid/proton/engine/impl/TransportImplTest.java | 177 ++++++++++++++++++++-
 6 files changed, 239 insertions(+), 13 deletions(-)

diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/TransportDecodeException.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/TransportDecodeException.java
new file mode 100644
index 0000000..47b7d8b
--- /dev/null
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/TransportDecodeException.java
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.engine;
+
+import java.util.Objects;
+
+import org.apache.qpid.proton.codec.DecodeException;
+import org.apache.qpid.proton.engine.TransportException;
+
+public class TransportDecodeException extends TransportException
+{
+    public TransportDecodeException(String message, DecodeException cause)
+    {
+        super(message, cause);
+        Objects.requireNonNull(message, "A message must be provided");
+    }
+}
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java
index ce4283e..d0df499 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java
@@ -34,6 +34,7 @@ import org.apache.qpid.proton.amqp.transport.FrameBody;
 import org.apache.qpid.proton.codec.ByteBufferDecoder;
 import org.apache.qpid.proton.codec.DecodeException;
 import org.apache.qpid.proton.engine.Transport;
+import org.apache.qpid.proton.engine.TransportDecodeException;
 import org.apache.qpid.proton.engine.TransportException;
 import org.apache.qpid.proton.framing.TransportFrame;
 
@@ -445,7 +446,9 @@ class FrameParser implements TransportInput
                     catch (DecodeException ex)
                     {
                         state = State.ERROR;
-                        frameParsingError = new TransportException(ex);
+                        String message = ex.getMessage() == null ? ex.toString(): ex.getMessage();
+
+                        frameParsingError = new TransportDecodeException(message, ex);
                     }
                     break;
                 case ERROR:
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslFrameParser.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslFrameParser.java
index 141ec31..72e23c0 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslFrameParser.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslFrameParser.java
@@ -29,6 +29,7 @@ import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.security.SaslFrameBody;
 import org.apache.qpid.proton.codec.ByteBufferDecoder;
 import org.apache.qpid.proton.codec.DecodeException;
+import org.apache.qpid.proton.engine.TransportDecodeException;
 import org.apache.qpid.proton.engine.TransportException;
 
 class SaslFrameParser
@@ -390,7 +391,9 @@ class SaslFrameParser
                     catch (DecodeException ex)
                     {
                         state = State.ERROR;
-                        frameParsingError = new TransportException(ex);
+                        String message = ex.getMessage() == null ? ex.toString(): ex.getMessage();
+
+                        frameParsingError = new TransportDecodeException(message, ex);
                     }
                     break;
                 case ERROR:
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
index d8bac2d..7307aff 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
@@ -31,6 +31,7 @@ import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
 import org.apache.qpid.proton.amqp.UnsignedShort;
 import org.apache.qpid.proton.amqp.security.SaslFrameBody;
+import org.apache.qpid.proton.amqp.transport.AmqpError;
 import org.apache.qpid.proton.amqp.transport.Attach;
 import org.apache.qpid.proton.amqp.transport.Begin;
 import org.apache.qpid.proton.amqp.transport.Close;
@@ -57,6 +58,7 @@ import org.apache.qpid.proton.engine.Sasl;
 import org.apache.qpid.proton.engine.Ssl;
 import org.apache.qpid.proton.engine.SslDomain;
 import org.apache.qpid.proton.engine.SslPeerDetails;
+import org.apache.qpid.proton.engine.TransportDecodeException;
 import org.apache.qpid.proton.engine.TransportException;
 import org.apache.qpid.proton.engine.TransportResult;
 import org.apache.qpid.proton.engine.TransportResultFactory;
@@ -260,14 +262,18 @@ public class TransportImpl extends EndpointImpl
         // condition had been set (by TransportImpl itself) rather than the 'empty' ErrorCondition
         // object historically used in the other areas.
         ErrorCondition errorCondition = super.getCondition();
-        return errorCondition.getCondition() != null ? errorCondition : null;
+        return isConditionPopulated(errorCondition) ? errorCondition : null;
     }
 
     @Override
     public void setCondition(ErrorCondition error)
     {
         super.setCondition(error);
-        _conditionSet = error != null && error.getCondition() != null;
+        _conditionSet = isConditionPopulated(error);
+    }
+
+    private boolean isConditionPopulated(ErrorCondition error) {
+        return error != null && error.getCondition() != null;
     }
 
     @Override
@@ -1097,13 +1103,14 @@ public class TransportImpl extends EndpointImpl
 
                 ErrorCondition localError;
 
-                if (_connectionEndpoint == null) {
-                    localError = getCondition();
-                } else {
+                if (_connectionEndpoint != null && isConditionPopulated(_connectionEndpoint.getCondition())) {
                     localError =  _connectionEndpoint.getCondition();
+                } else {
+                    // Connection is null or didn't have a condition set, use transport condition.
+                    localError = getCondition();
                 }
 
-                if(localError != null && localError.getCondition() != null)
+                if(isConditionPopulated(localError))
                 {
                     close.setError(localError);
                 }
@@ -1466,8 +1473,13 @@ public class TransportImpl extends EndpointImpl
         if (!_closeReceived || error != null) {
             // Set an error condition, but only if one was not already set
             if(!_conditionSet) {
-                String description =  error == null ? "connection aborted" : error.toString();
-                setCondition(new ErrorCondition(ConnectionError.FRAMING_ERROR, description));
+                if(error instanceof TransportDecodeException) {
+                    setCondition(new ErrorCondition(AmqpError.DECODE_ERROR, error.getMessage()));
+                } else {
+                    String description =  error == null ? "connection aborted" : error.toString();
+
+                    setCondition(new ErrorCondition(ConnectionError.FRAMING_ERROR, description));
+                }
             }
 
             _head_closed = true;
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameParserTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameParserTest.java
index 3b0e36b..e03273f 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameParserTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameParserTest.java
@@ -70,6 +70,7 @@ public class FrameParserTest
         ByteBuffer buffer = _frameParser.tail();
         buffer.put("hello".getBytes());
         _frameParser.process();
+        verify(_mockFrameHandler).closed(any(TransportException.class));
         assertEquals(_frameParser.capacity(), Transport.END_OF_STREAM);
     }
 
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
index 4584cdb..bd7e569 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
@@ -49,6 +49,7 @@ import org.apache.qpid.proton.amqp.UnsignedShort;
 import org.apache.qpid.proton.amqp.messaging.Accepted;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.amqp.messaging.Released;
+import org.apache.qpid.proton.amqp.transport.AmqpError;
 import org.apache.qpid.proton.amqp.transport.Attach;
 import org.apache.qpid.proton.amqp.transport.Begin;
 import org.apache.qpid.proton.amqp.transport.Close;
@@ -3612,6 +3613,8 @@ public class TransportImplTest
         assertNotNull("Expected an ErrorCondition to be returned", transport.getCondition());
         assertEquals("Unexpected ErrorCondition returned", origErrorCondition, transport.getCondition());
 
+        // ---------------------------------------------------------------- //
+
         // Set an error condition, then call 'closed' without an error.
         // Expect the original condition which was set to remain.
         transport = new TransportImpl();
@@ -3622,15 +3625,18 @@ public class TransportImplTest
         assertNotNull("Expected an ErrorCondition to be returned", transport.getCondition());
         assertEquals("Unexpected ErrorCondition returned", origErrorCondition, transport.getCondition());
 
+        // ---------------------------------------------------------------- //
+
         // Without having set an error condition, call 'closed' specifying an error.
         // Expect a condition to be set.
-        String errorDescription = "some-error-description";
         transport = new TransportImpl();
-        transport.closed(new TransportException(errorDescription));
+        transport.closed(new TransportException(description));
 
         assertNotNull("Expected an ErrorCondition to be returned", transport.getCondition());
         assertEquals("Unexpected condition returned", ConnectionError.FRAMING_ERROR, transport.getCondition().getCondition());
-        assertEquals("Unexpected description returned", "org.apache.qpid.proton.engine.TransportException: " + errorDescription, transport.getCondition().getDescription());
+        assertEquals("Unexpected description returned", "org.apache.qpid.proton.engine.TransportException: " + description, transport.getCondition().getDescription());
+
+        // ---------------------------------------------------------------- //
 
         // Without having set an error condition, call 'closed' without an error.
         // Expect a condition to be set.
@@ -3644,6 +3650,171 @@ public class TransportImplTest
     }
 
     @Test
+    public void testCloseFrameErrorAfterTransportClosed() {
+        MockTransportImpl transport = new MockTransportImpl();
+        Connection connection = Proton.connection();
+        prepareAndOpenConnection(transport, connection);
+
+        Symbol condition = Symbol.getSymbol("some-error");
+        String description = "some-error-description";
+        ErrorCondition origErrorCondition = new ErrorCondition();
+        origErrorCondition.setCondition(condition);
+        origErrorCondition.setDescription(description);
+        assertNotNull("Expected a Condition", origErrorCondition.getCondition());
+
+        // Set an error condition, then call 'closed' specifying an error.
+        // Expect the original condition which was set to be emitted
+        // in the close frame generated.
+
+        transport.setCondition(origErrorCondition);
+        transport.closed(new TransportException("my-ignored-exception"));
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 2, transport.writes.size());
+        FrameBody frameBody = transport.writes.get(1);
+        assertTrue("Unexpected frame type", frameBody instanceof Close);
+        assertEquals("Unexpected condition", origErrorCondition, ((Close) frameBody).getError());
+
+        // ---------------------------------------------------------------- //
+
+        // Set an error condition, then call 'closed' without an error.
+        // Expect the original condition which was set to be emitted
+        // in the close frame generated.
+
+        transport = new MockTransportImpl();
+        connection = Proton.connection();
+        prepareAndOpenConnection(transport, connection);
+
+        transport.setCondition(origErrorCondition);
+        transport.closed(null);
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 2, transport.writes.size());
+        frameBody = transport.writes.get(1);
+        assertTrue("Unexpected frame type", frameBody instanceof Close);
+        assertEquals("Unexpected condition", origErrorCondition, ((Close) frameBody).getError());
+
+        // ---------------------------------------------------------------- //
+
+        // Without having set an error condition, call 'closed' specifying an error.
+        // Expect a condition to be emitted in the close frame generated.
+        transport = new MockTransportImpl();
+        connection = Proton.connection();
+        prepareAndOpenConnection(transport, connection);
+
+        transport.closed(new TransportException(description));
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 2, transport.writes.size());
+        frameBody = transport.writes.get(1);
+        assertTrue("Unexpected frame type", frameBody instanceof Close);
+        ErrorCondition expectedCondition = new ErrorCondition(ConnectionError.FRAMING_ERROR, "org.apache.qpid.proton.engine.TransportException: " + description);
+        assertEquals("Unexpected condition", expectedCondition, ((Close) frameBody).getError());
+
+        // ---------------------------------------------------------------- //
+
+        // Without having set an error condition, call 'closed' without an error.
+        // Expect a condition to be emitted in the close frame generated.
+        transport = new MockTransportImpl();
+        connection = Proton.connection();
+        prepareAndOpenConnection(transport, connection);
+
+        transport.closed(null);
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 2, transport.writes.size());
+        frameBody = transport.writes.get(1);
+        assertTrue("Unexpected frame type", frameBody instanceof Close);
+        expectedCondition = new ErrorCondition(ConnectionError.FRAMING_ERROR, "connection aborted");
+        assertEquals("Unexpected condition", expectedCondition, ((Close) frameBody).getError());
+
+        // ---------------------------------------------------------------- //
+
+        // Without having set an error condition on the transport, call 'closed' with an error,
+        // but then also set a condition on the connection, and expect the connection error
+        // condition to be emitted in the close frame generated.
+        transport = new MockTransportImpl();
+        connection = Proton.connection();
+        prepareAndOpenConnection(transport, connection);
+
+        transport.closed(new TransportException("some other transport exception"));
+        connection.setCondition(origErrorCondition);
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 2, transport.writes.size());
+        frameBody = transport.writes.get(1);
+        assertTrue("Unexpected frame type", frameBody instanceof Close);
+        assertEquals("Unexpected condition", origErrorCondition, ((Close) frameBody).getError());
+    }
+
+    @Test
+    public void testCloseFrameErrorAfterDecodeError() {
+        MockTransportImpl transport = new MockTransportImpl();
+        Connection connection = Proton.connection();
+
+        Collector collector = Collector.Factory.create();
+        connection.collect(collector);
+
+        transport.bind(connection);
+        connection.open();
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 1, transport.writes.size());
+        assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
+
+        assertEvents(collector, Event.Type.CONNECTION_INIT, Event.Type.CONNECTION_BOUND, Event.Type.CONNECTION_LOCAL_OPEN, Event.Type.TRANSPORT);
+
+        // Provide the response bytes for the header
+        transport.tail().put(AmqpHeader.HEADER);
+        transport.process();
+
+        // Provide the bytes for Open, but omit the mandatory container-id to provoke a decode error.
+        byte[] bytes = new byte[] {  0x00, 0x00, 0x00, 0x0C, // Frame size = 12 bytes.
+                                     0x02, 0x00, 0x00, 0x00, // DOFF, TYPE, 2x CHANNEL
+                                     0x00, 0x53, 0x10, 0x45};// Described-type, ulong type, open descriptor, list0.
+
+        int capacity = transport.capacity();
+        assertTrue("Unexpected transport capacity: " + capacity, capacity > bytes.length);
+
+        transport.tail().put(bytes);
+        transport.process();
+
+        assertEvents(collector, Event.Type.TRANSPORT_ERROR, Event.Type.TRANSPORT_TAIL_CLOSED);
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 2, transport.writes.size());
+        FrameBody frameBody = transport.writes.get(1);
+        assertTrue("Unexpected frame type", frameBody instanceof Close);
+
+        // Expect the close frame generated to contain a decode error condition referencing the missing container-id.
+        ErrorCondition expectedCondition = new ErrorCondition();
+        expectedCondition.setCondition(AmqpError.DECODE_ERROR);
+        expectedCondition.setDescription("The container-id field cannot be omitted");
+
+        assertEquals("Unexpected condition", expectedCondition, ((Close) frameBody).getError());
+    }
+
+    private void prepareAndOpenConnection(MockTransportImpl transport, Connection connection) {
+        transport.bind(connection);
+        connection.open();
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 1, transport.writes.size());
+        assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
+
+        // Give the necessary response to open
+        transport.handleFrame(new TransportFrame(0, new Open(), null));
+    }
+
+    @Test
     public void testProtocolTracingLogsFrameToTracer()
     {
         Connection connection = new ConnectionImpl();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org