You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2016/04/13 15:53:16 UTC

qpid-proton git commit: PROTON-1171: update transport ssl wrapper to ensure attempt to pass all decoded bytes to the underlying input or throw, and avoid processing the underlying layer when no bytes have been passed on

Repository: qpid-proton
Updated Branches:
  refs/heads/master c55ae663f -> 3ea4287d4


PROTON-1171: update transport ssl wrapper to ensure attempt to pass all decoded bytes to the underlying input or throw, and avoid processing the underlying layer when no bytes have been passed on

This closes #73


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/3ea4287d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/3ea4287d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/3ea4287d

Branch: refs/heads/master
Commit: 3ea4287d483f3ce275be75bf50ac1728527d697a
Parents: c55ae66
Author: Robert Gemmell <ro...@apache.org>
Authored: Wed Apr 13 14:36:52 2016 +0100
Committer: Robert Gemmell <ro...@apache.org>
Committed: Wed Apr 13 14:36:52 2016 +0100

----------------------------------------------------------------------
 .../impl/ssl/SimpleSslTransportWrapper.java     |  32 ++-
 .../proton/engine/impl/TransportTestHelper.java |   3 +-
 .../impl/ssl/CapitalisingDummySslEngine.java    |  25 +-
 .../impl/ssl/RememberingTransportInput.java     |  63 ++++-
 .../impl/ssl/SimpleSslTransportWrapperTest.java | 245 ++++++++++++++++++-
 5 files changed, 337 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3ea4287d/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapper.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapper.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapper.java
index db8c290..a30e88b 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapper.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapper.java
@@ -115,27 +115,35 @@ public class SimpleSslTransportWrapper implements SslTransportWrapper
             HandshakeStatus hstatus = result.getHandshakeStatus();
 
             int capacity = _underlyingInput.capacity();
-            if (capacity == Transport.END_OF_STREAM) {
+            if (capacity == Transport.END_OF_STREAM || capacity <= 0) {
                 _tail_closed = true;
                 if (_decodedInputBuffer.position() > 0) {
                     throw new TransportException("bytes left unconsumed");
                 }
             } else {
-                ByteBuffer tail = _underlyingInput.tail();
                 _decodedInputBuffer.flip();
-                int limit = _decodedInputBuffer.limit();
-                int overflow = _decodedInputBuffer.remaining() - capacity;
-                if (overflow > 0) {
-                    _decodedInputBuffer.limit(limit - overflow);
+
+                while (_decodedInputBuffer.hasRemaining() && capacity > 0) {
+                    ByteBuffer tail = _underlyingInput.tail();
+                    int limit = _decodedInputBuffer.limit();
+                    int overflow = _decodedInputBuffer.remaining() - capacity;
+                    if (overflow > 0) {
+                        _decodedInputBuffer.limit(limit - overflow);
+                    }
+                    tail.put(_decodedInputBuffer);
+                    _decodedInputBuffer.limit(limit);
+                    _underlyingInput.process();
+                    capacity = _underlyingInput.capacity();
                 }
-                tail.put(_decodedInputBuffer);
-                _decodedInputBuffer.limit(limit);
-                _decodedInputBuffer.compact();
-                _underlyingInput.process();
-                capacity = _underlyingInput.capacity();
-                if (capacity == Transport.END_OF_STREAM) {
+
+                if (capacity == Transport.END_OF_STREAM || capacity <= 0) {
                     _tail_closed = true;
+                    if (_decodedInputBuffer.hasRemaining()) {
+                        throw new TransportException("bytes left unconsumed");
+                    }
                 }
+
+                _decodedInputBuffer.compact();
             }
 
             switch (status) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3ea4287d/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportTestHelper.java
----------------------------------------------------------------------
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportTestHelper.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportTestHelper.java
index f94cf04..995ed54 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportTestHelper.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportTestHelper.java
@@ -21,6 +21,7 @@ package org.apache.qpid.proton.engine.impl;
 import static org.junit.Assert.assertEquals;
 
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 
 public class TransportTestHelper
 {
@@ -47,7 +48,7 @@ public class TransportTestHelper
     {
         byte[] buf = new byte[sizeRequested];
         int numberRead = ByteBufferUtils.pourBufferToArray(source, buf, 0, sizeRequested);
-        return new String(buf, 0, numberRead);
+        return new String(buf, 0, numberRead, StandardCharsets.UTF_8);
     }
 
     public static String stringOfLength(String value, int repeat)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3ea4287d/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/CapitalisingDummySslEngine.java
----------------------------------------------------------------------
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/CapitalisingDummySslEngine.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/CapitalisingDummySslEngine.java
index 7bde5fb..10a64d4 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/CapitalisingDummySslEngine.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/CapitalisingDummySslEngine.java
@@ -41,7 +41,7 @@ import javax.net.ssl.SSLException;
 public class CapitalisingDummySslEngine implements ProtonSslEngine
 {
     static final int SHORT_ENCODED_CHUNK_SIZE = 2;
-    private static final int MAX_ENCODED_CHUNK_SIZE = 5;
+    static final int MAX_ENCODED_CHUNK_SIZE = 5;
     private static final char ENCODED_TEXT_BEGIN = '<';
     private static final char ENCODED_TEXT_END = '>';
     private static final char ENCODED_TEXT_INNER_CHAR = '-';
@@ -49,6 +49,9 @@ public class CapitalisingDummySslEngine implements ProtonSslEngine
     private static final int CLEAR_CHUNK_SIZE = 2;
     private static final char CLEARTEXT_PADDING = '_';
     private SSLException _nextException;
+    private int _applicationBufferSize = CLEAR_CHUNK_SIZE;
+    private int _packetBufferSize = MAX_ENCODED_CHUNK_SIZE;
+    private int _unwrapCount;
 
     /**
      * Converts a_ to <-A->.  z_ is special and encodes as <> (to give us packets of different lengths).
@@ -116,6 +119,8 @@ public class CapitalisingDummySslEngine implements ProtonSslEngine
     public SSLEngineResult unwrap(ByteBuffer src, ByteBuffer dst)
             throws SSLException
     {
+        _unwrapCount++;
+
         if(_nextException != null)
         {
             throw _nextException;
@@ -185,13 +190,23 @@ public class CapitalisingDummySslEngine implements ProtonSslEngine
 
     private int getApplicationBufferSize()
     {
-        return CLEAR_CHUNK_SIZE;
+        return _applicationBufferSize;
     }
 
     @Override
     public int getPacketBufferSize()
     {
-        return MAX_ENCODED_CHUNK_SIZE;
+        return _packetBufferSize;
+    }
+
+    public void setApplicationBufferSize(int value)
+    {
+        _applicationBufferSize = value;
+    }
+
+    public void setPacketBufferSize(int value)
+    {
+        _packetBufferSize = value;
     }
 
     @Override
@@ -244,4 +259,8 @@ public class CapitalisingDummySslEngine implements ProtonSslEngine
     {
         _nextException = nextException;
     }
+
+    int getUnwrapCount() {
+        return _unwrapCount;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3ea4287d/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/RememberingTransportInput.java
----------------------------------------------------------------------
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/RememberingTransportInput.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/RememberingTransportInput.java
index 191cd58..366feb3 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/RememberingTransportInput.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/RememberingTransportInput.java
@@ -30,7 +30,10 @@ class RememberingTransportInput implements TransportInput
     private StringBuilder _receivedInput = new StringBuilder();
     private String _nextError;
     private int _inputBufferSize = 1024;
-    private ByteBuffer _buffer = ByteBuffer.allocate(_inputBufferSize);
+    private ByteBuffer _buffer;
+    private int _processCount = 0;
+    private Integer _zeroCapacityAtCount = null;
+    private int _capacityCount = 0;
 
     String getAcceptedInput()
     {
@@ -46,24 +49,37 @@ class RememberingTransportInput implements TransportInput
     @Override
     public int capacity()
     {
+        initIntermediateBuffer();
+
+        _capacityCount++;
+        if(_zeroCapacityAtCount != null && _capacityCount >= _zeroCapacityAtCount) {
+            return 0;
+        }
+
         return _buffer.remaining();
     }
 
     @Override
     public int position()
     {
+        initIntermediateBuffer();
         return _buffer.position();
     }
 
     @Override
     public ByteBuffer tail()
     {
+        initIntermediateBuffer();
         return _buffer;
     }
 
     @Override
     public void process() throws TransportException
     {
+        _processCount++;
+
+        initIntermediateBuffer();
+
         if(_nextError != null)
         {
             throw new TransportException(_nextError);
@@ -87,8 +103,51 @@ class RememberingTransportInput implements TransportInput
         _nextError = nextError;
     }
 
-    public void setInputBufferSize(int inputBufferSize)
+    /**
+     * If called before the object is otherwise used, the intermediate input buffer will be
+     * initiated to the given size. If called after use, an ISE will be thrown.
+     *
+     * @param inputBufferSize size of the intermediate input buffer
+     * @throws IllegalStateException if the buffer was already initialised
+     */
+    public void setInputBufferSize(int inputBufferSize) throws IllegalStateException
     {
+        if (_buffer != null)
+        {
+            throw new IllegalStateException("Intermediate input buffer already initialised");
+        }
+
         _inputBufferSize = inputBufferSize;
     }
+
+    private void initIntermediateBuffer()
+    {
+        if (_buffer == null)
+        {
+            _buffer = ByteBuffer.allocate(_inputBufferSize);
+        }
+    }
+
+    int getProcessCount()
+    {
+        return _processCount;
+    }
+
+    int getCapacityCount()
+    {
+        return _capacityCount;
+    }
+
+    /**
+     * Sets a point at which calls to capacity will return 0 regardless of the actual buffer state.
+     *
+     * @param zeroCapacityAtCount number of calls to capacity at which zero starts being returned.
+     */
+    void setZeroCapacityAtCount(Integer zeroCapacityAtCount)
+    {
+        if(zeroCapacityAtCount != null && zeroCapacityAtCount < 1) {
+            throw new IllegalArgumentException("Value must be null, or at least 1");
+        }
+        _zeroCapacityAtCount = zeroCapacityAtCount;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3ea4287d/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapperTest.java
----------------------------------------------------------------------
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapperTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapperTest.java
index 92455e1..2a7aca7 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapperTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapperTest.java
@@ -29,6 +29,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 
 import javax.net.ssl.SSLException;
 
@@ -46,12 +47,10 @@ import org.junit.rules.ExpectedException;
  */
 public class SimpleSslTransportWrapperTest
 {
-    private RememberingTransportInput _underlyingInput = new RememberingTransportInput();
-    private CannedTransportOutput _underlyingOutput = new CannedTransportOutput();
-
+    private RememberingTransportInput _underlyingInput;
+    private CannedTransportOutput _underlyingOutput;
     private SimpleSslTransportWrapper _sslWrapper;
-
-    private CapitalisingDummySslEngine _dummySslEngine = new CapitalisingDummySslEngine();
+    private CapitalisingDummySslEngine _dummySslEngine;
 
     @Rule
     public ExpectedException _expectedException = ExpectedException.none();
@@ -59,6 +58,9 @@ public class SimpleSslTransportWrapperTest
     @Before
     public void setUp()
     {
+        _underlyingInput = new RememberingTransportInput();
+        _underlyingOutput = new CannedTransportOutput();
+        _dummySslEngine = new CapitalisingDummySslEngine();
         _sslWrapper = new SimpleSslTransportWrapper(_dummySslEngine, _underlyingInput, _underlyingOutput);
     }
 
@@ -70,8 +72,16 @@ public class SimpleSslTransportWrapperTest
         putBytesIntoTransport(encodedBytes);
 
         assertEquals("a_", _underlyingInput.getAcceptedInput());
+        assertEquals(CapitalisingDummySslEngine.MAX_ENCODED_CHUNK_SIZE, _sslWrapper.capacity());
+        assertEquals(2, _dummySslEngine.getUnwrapCount());// 1 packet, 1 underflow
+        assertEquals(1, _underlyingInput.getProcessCount());
     }
 
+    /**
+     * Note that this only feeds 1 encoded packet in at a time due to default settings of the dummy engine,
+     * See {@link #testUnderlyingInputUsingSmallBuffer_receivesAllDecodedInputRequiringMultipleUnwraps}
+     * for a related test that passes multiple encoded packets into the ssl wrapper at once.
+     */
     @Test
     public void testInputWithMultiplePackets()
     {
@@ -80,6 +90,9 @@ public class SimpleSslTransportWrapperTest
         putBytesIntoTransport(encodedBytes);
 
         assertEquals("a_b_c_z_", _underlyingInput.getAcceptedInput());
+        assertEquals(CapitalisingDummySslEngine.MAX_ENCODED_CHUNK_SIZE, _sslWrapper.capacity());
+        assertEquals(8, _dummySslEngine.getUnwrapCount()); // (1 decode + 1 underflow) * 4 packets
+        assertEquals(4, _underlyingInput.getProcessCount()); // 1 process per decoded packet
     }
 
     @Test
@@ -90,9 +103,15 @@ public class SimpleSslTransportWrapperTest
 
         putBytesIntoTransport(incompleteEncodedBytes);
         assertEquals("a_b_", _underlyingInput.getAcceptedInput());
+        assertEquals(5, _dummySslEngine.getUnwrapCount()); // 2 * (1 decode + 1 underflow) + 1 underflow
+        assertEquals(2, _underlyingInput.getProcessCount()); // 1 process per decoded packet
 
         putBytesIntoTransport(remainingEncodedBytes);
         assertEquals("a_b_c_d_", _underlyingInput.getAcceptedInput());
+        assertEquals(4, _underlyingInput.getProcessCount()); // earlier + 2
+        assertEquals(9, _dummySslEngine.getUnwrapCount()); // Earlier + 2 * (1 decode + 1 underflow)
+                                                           // due to way the bytes are fed in across
+                                                           // boundary of encoded packets
     }
 
     /**
@@ -108,22 +127,222 @@ public class SimpleSslTransportWrapperTest
 
         putBytesIntoTransport(firstEncodedBytes);
         assertEquals("a_b_", _underlyingInput.getAcceptedInput());
+        assertEquals(5, _dummySslEngine.getUnwrapCount()); // 2 * (1 decode + 1 underflow) + 1 underflow
+        assertEquals(2, _underlyingInput.getProcessCount()); // 1 process per decoded packet
 
         putBytesIntoTransport(secondEncodedBytes);
         assertEquals("a_b_", _underlyingInput.getAcceptedInput());
+        assertEquals(6, _dummySslEngine.getUnwrapCount()); // earlier + 1 underflow
+        assertEquals(2, _underlyingInput.getProcessCount()); // as earlier
 
         putBytesIntoTransport(thirdEncodedBytes);
         assertEquals("a_b_c_d_", _underlyingInput.getAcceptedInput());
+        assertEquals(4, _underlyingInput.getProcessCount()); // 1 process per decoded packet
+        assertEquals(10, _dummySslEngine.getUnwrapCount()); // Earlier + (decode + underflow) * 2
+                                                           // due to way the bytes are fed in across
+                                                           // boundary of encoded packets
     }
 
+    /**
+     * Tests that when a small underlying input buffer (1 byte here) is used, all of the encoded
+     * data packet (5 bytes each here) can be processed despite multiple attempts being required to
+     * pass the decoded bytes (2 bytes here) to the underlying input layer for processing.
+     */
     @Test
-    public void testUnderlyingInputUsingSmallBuffer_receivesAllDecodedInput() throws Exception
+    public void testUnderlyingInputUsingSmallBuffer_receivesAllDecodedInputRequiringMultipleUnderlyingProcesses()
     {
-        _underlyingInput.setInputBufferSize(1);
+        int underlyingInputBufferSize = 1;
+        int encodedPacketSize = 5;
 
-        putBytesIntoTransport("<-A->");
+        _underlyingInput.setInputBufferSize(underlyingInputBufferSize);
+        assertEquals("Unexpected underlying input capacity", underlyingInputBufferSize, _underlyingInput.capacity());
 
-        assertEquals("a_", _underlyingInput.getAcceptedInput());
+        assertEquals("Unexpected max encoded chunk size", encodedPacketSize, CapitalisingDummySslEngine.MAX_ENCODED_CHUNK_SIZE);
+
+        byte[] bytes = "<-A-><-B->".getBytes(StandardCharsets.UTF_8);
+        ByteBuffer encodedByteSource = ByteBuffer.wrap(bytes);
+
+        assertEquals("Unexpected initial capacity", encodedPacketSize, _sslWrapper.capacity());
+
+        // Process the first 'encoded packet' (<-A->)
+        int numberPoured = pour(encodedByteSource, _sslWrapper.tail());
+        assertEquals("Unexpected number of bytes poured into the wrapper input buffer", encodedPacketSize, numberPoured);
+        assertEquals("Unexpected position in encoded source byte buffer", encodedPacketSize * 1, encodedByteSource.position());
+        assertEquals("Unexpected capacity", 0, _sslWrapper.capacity());
+        _sslWrapper.process();
+        assertEquals("Unexpected capacity", encodedPacketSize, _sslWrapper.capacity());
+
+        assertEquals("unexpected underlying output after first wrapper process", "a_", _underlyingInput.getAcceptedInput());
+        assertEquals("unexpected underlying process count after first wrapper process", 2 , _underlyingInput.getProcessCount());
+
+        // Process the second 'encoded packet' (<-B->)
+        numberPoured = pour(encodedByteSource, _sslWrapper.tail());
+        assertEquals("Unexpected number of bytes poured into the wrapper input buffer", encodedPacketSize, numberPoured);
+        assertEquals("Unexpected position in encoded source byte buffer", encodedPacketSize * 2, encodedByteSource.position());
+        assertEquals("Unexpected capacity", 0, _sslWrapper.capacity());
+        _sslWrapper.process();
+        assertEquals("Unexpected capacity", encodedPacketSize, _sslWrapper.capacity());
+
+        assertEquals("unexpected underlying output after second wrapper process", "a_b_", _underlyingInput.getAcceptedInput());
+        assertEquals("unexpected underlying process count after second wrapper process", 4 , _underlyingInput.getProcessCount());
+    }
+
+    /**
+     * Tests that when a small underlying input buffer (1 byte here) is used, all of the encoded
+     * data packets (20 bytes total here) can be processed despite multiple unwraps being required
+     * to process a given set of input (3 packets, 15 bytes here) and then as a result also multiple
+     * attempts to pass the decoded packet (2 bytes here) to the underlying input layer for processing.
+     */
+    @Test
+    public void testUnderlyingInputUsingSmallBuffer_receivesAllDecodedInputRequiringMultipleUnwraps()
+    {
+        int underlyingInputBufferSize = 1;
+        int encodedPacketSize = 5;
+        int sslEngineBufferSize = 15;
+
+        assertEquals("Unexpected max encoded chunk size", encodedPacketSize, CapitalisingDummySslEngine.MAX_ENCODED_CHUNK_SIZE);
+
+        _underlyingOutput = new CannedTransportOutput();
+        _underlyingInput = new RememberingTransportInput();
+        _underlyingInput.setInputBufferSize(underlyingInputBufferSize);
+        assertEquals("Unexpected underlying input capacity", underlyingInputBufferSize, _underlyingInput.capacity());
+
+        // Create a dummy ssl engine that has buffers that holds multiple encoded/decoded
+        // packets, but still can't fit all of the input
+        _dummySslEngine = new CapitalisingDummySslEngine();
+        _dummySslEngine.setApplicationBufferSize(sslEngineBufferSize);
+        _dummySslEngine.setPacketBufferSize(sslEngineBufferSize);
+
+        _sslWrapper = new SimpleSslTransportWrapper(_dummySslEngine, _underlyingInput, _underlyingOutput);
+
+        byte[] bytes = "<-A-><-B-><-C-><-D->".getBytes(StandardCharsets.UTF_8);
+        ByteBuffer encodedByteSource = ByteBuffer.wrap(bytes);
+
+        assertEquals("Unexpected initial capacity", sslEngineBufferSize, _sslWrapper.capacity());
+
+        // Process the first two 'encoded packets' (<-A-><-B-><-C->). This will require 3 'proper' unwraps, and
+        // as each decoded packet is 2 bytes, each of those will require 2 underlying input processes.
+        int numberPoured = pour(encodedByteSource, _sslWrapper.tail());
+        assertEquals("Unexpected number of bytes poured into the wrapper input buffer", sslEngineBufferSize, numberPoured);
+        assertEquals("Unexpected position in encoded source byte buffer", encodedPacketSize * 3, encodedByteSource.position());
+        assertEquals("Unexpected capacity", 0, _sslWrapper.capacity());
+
+        _sslWrapper.process();
+
+        assertEquals("a_b_c_", _underlyingInput.getAcceptedInput());
+        assertEquals("Unexpected capacity", sslEngineBufferSize, _sslWrapper.capacity());
+        assertEquals("unexpected underlying process count after wrapper process", 6 , _underlyingInput.getProcessCount());
+        assertEquals(4, _dummySslEngine.getUnwrapCount()); // 3 decodes + 1 underflow
+
+        // Process the fourth 'encoded packet' (<-D->)
+        numberPoured = pour(encodedByteSource, _sslWrapper.tail());
+        assertEquals("Unexpected number of bytes poured into the wrapper input buffer", encodedPacketSize, numberPoured);
+        assertEquals("Unexpected position in encoded source byte buffer", encodedPacketSize * 4, encodedByteSource.position());
+        assertEquals("Unexpected capacity", sslEngineBufferSize - encodedPacketSize, _sslWrapper.capacity());
+
+        _sslWrapper.process();
+
+        assertEquals("a_b_c_d_", _underlyingInput.getAcceptedInput());
+        assertEquals("Unexpected capacity", sslEngineBufferSize, _sslWrapper.capacity());
+        assertEquals("unexpected underlying process count after second wrapper process", 8 , _underlyingInput.getProcessCount());
+        assertEquals(6, _dummySslEngine.getUnwrapCount()); // earlier + 1 decode + 1 underflow
+    }
+
+    /**
+     * Tests that an exception is thrown when the underlying input has zero capacity when the call
+     * with newly decoded input is initially made.
+     */
+    @Test (timeout = 5000)
+    public void testUnderlyingInputHasZeroCapacityInitially()
+    {
+        int underlyingInputBufferSize = 1;
+        int encodedPacketSize = 5;
+
+        assertEquals("Unexpected max encoded chunk size", encodedPacketSize, CapitalisingDummySslEngine.MAX_ENCODED_CHUNK_SIZE);
+
+        // Set the input to have a small buffer, but then return 0 from the 2nd capacity call onward.
+        _underlyingInput.setInputBufferSize(underlyingInputBufferSize);
+        _underlyingInput.setZeroCapacityAtCount(2);
+        assertEquals("Unexpected initial underlying input capacity", underlyingInputBufferSize, _underlyingInput.capacity());
+        assertEquals("Unexpected underlying input capacity", 0, _underlyingInput.capacity());
+
+        // Now try decoding the input, should fail
+        byte[] bytes = "<-A->".getBytes(StandardCharsets.UTF_8);
+        ByteBuffer encodedByteSource = ByteBuffer.wrap(bytes);
+
+        assertEquals("Unexpected initial wrapper capacity", encodedPacketSize, _sslWrapper.capacity());
+
+        int numberPoured = pour(encodedByteSource, _sslWrapper.tail());
+        assertEquals("Unexpected number of bytes poured into the wrapper input buffer", encodedPacketSize, numberPoured);
+        assertEquals("Unexpected position in encoded source byte buffer", encodedPacketSize, encodedByteSource.position());
+        assertEquals("Unexpected wrapper capacity", 0, _sslWrapper.capacity());
+
+        try
+        {
+            _sslWrapper.process();
+            fail("Expected an exception");
+        }
+        catch (TransportException te)
+        {
+            System.out.println("Caught exception:" + te);
+            // expected.
+        }
+
+        //Check we got no chars of decoded output.
+        assertEquals("", _underlyingInput.getAcceptedInput());
+        assertEquals("Unexpected wrapper capacity", -1, _sslWrapper.capacity());
+        assertEquals("unexpected underlying process count after wrapper process", 0 , _underlyingInput.getProcessCount());
+        assertEquals("unexpected underlying capacity count after wrapper process", 3, _underlyingInput.getCapacityCount());
+        assertEquals("unexpected underlying capacity after wrapper process", 0 , _underlyingInput.capacity());
+        assertEquals(1, _dummySslEngine.getUnwrapCount()); // 1 decode (then exception)
+    }
+
+    /**
+     * Tests that an exception is thrown when the underlying input has no capacity (but isn't closed)
+     * during the process of incrementally passing the decoded bytes to its smaller input buffer
+     * for processing.
+     */
+    @Test (timeout = 5000)
+    public void testUnderlyingInputHasZeroCapacityMidProcessing()
+    {
+        int underlyingInputBufferSize = 1;
+        int encodedPacketSize = 5;
+
+        assertEquals("Unexpected max encoded chunk size", encodedPacketSize, CapitalisingDummySslEngine.MAX_ENCODED_CHUNK_SIZE);
+
+        // Set the input to have a small buffer, but then return 0 from the 3rd capacity call onward.
+        _underlyingInput.setInputBufferSize(underlyingInputBufferSize);
+        _underlyingInput.setZeroCapacityAtCount(3);
+        assertEquals("Unexpected initial underlying input capacity", underlyingInputBufferSize, _underlyingInput.capacity());
+
+        // Now try decoding the input, should fail
+        byte[] bytes = "<-A->".getBytes(StandardCharsets.UTF_8);
+        ByteBuffer encodedByteSource = ByteBuffer.wrap(bytes);
+
+        assertEquals("Unexpected initial wrapper capacity", encodedPacketSize, _sslWrapper.capacity());
+
+        int numberPoured = pour(encodedByteSource, _sslWrapper.tail());
+        assertEquals("Unexpected number of bytes poured into the wrapper input buffer", encodedPacketSize, numberPoured);
+        assertEquals("Unexpected position in encoded source byte buffer", encodedPacketSize, encodedByteSource.position());
+        assertEquals("Unexpected wrapper capacity", 0, _sslWrapper.capacity());
+
+        try
+        {
+            _sslWrapper.process();
+            fail("Expected an exception");
+        }
+        catch (TransportException te)
+        {
+            // expected.
+        }
+
+        //Check we got the first char (a) of decoded output, but not the second (_).
+        assertEquals("a", _underlyingInput.getAcceptedInput());
+        assertEquals("Unexpected wrapper capacity", -1, _sslWrapper.capacity());
+        assertEquals("unexpected underlying process count after wrapper process", 1 , _underlyingInput.getProcessCount());
+        assertEquals("unexpected underlying capacity count after wrapper process", 3, _underlyingInput.getCapacityCount());
+        assertEquals("unexpected underlying capacity after wrapper process", 0 , _underlyingInput.capacity());
+        assertEquals(1, _dummySslEngine.getUnwrapCount()); // 1 decode (then exception)
     }
 
     @Test
@@ -132,7 +351,7 @@ public class SimpleSslTransportWrapperTest
         SSLException sslException = new SSLException("unwrap exception");
         _dummySslEngine.rejectNextEncodedPacket(sslException);
 
-        _sslWrapper.tail().put("<-A->".getBytes());
+        _sslWrapper.tail().put("<-A->".getBytes(StandardCharsets.UTF_8));
         _sslWrapper.process();
         assertEquals(_sslWrapper.capacity(), Transport.END_OF_STREAM);
     }
@@ -143,7 +362,7 @@ public class SimpleSslTransportWrapperTest
         String underlyingErrorDescription = "dummy underlying error";
         _underlyingInput.rejectNextInput(underlyingErrorDescription);
 
-        _sslWrapper.tail().put("<-A->".getBytes());
+        _sslWrapper.tail().put("<-A->".getBytes(StandardCharsets.UTF_8));
 
         try {
             _sslWrapper.process();
@@ -167,7 +386,7 @@ public class SimpleSslTransportWrapperTest
 
         ByteBuffer outputBuffer = _sslWrapper.head();
 
-        assertByteBufferContentEquals("<-A->".getBytes(), outputBuffer);
+        assertByteBufferContentEquals("<-A->".getBytes(StandardCharsets.UTF_8), outputBuffer);
     }
 
     @Test
@@ -218,7 +437,7 @@ public class SimpleSslTransportWrapperTest
 
     private void putBytesIntoTransport(String encodedBytes)
     {
-        ByteBuffer byteBuffer = ByteBuffer.wrap(encodedBytes.getBytes());
+        ByteBuffer byteBuffer = ByteBuffer.wrap(encodedBytes.getBytes(StandardCharsets.UTF_8));
         while(byteBuffer.hasRemaining())
         {
             int numberPoured = pour(byteBuffer, _sslWrapper.tail());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org