You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2018/02/26 20:54:51 UTC

qpid-proton-j git commit: PROTON-1774 Remove sasl processing when complete

Repository: qpid-proton-j
Updated Branches:
  refs/heads/master 45bdc03dc -> 88310a5a7


PROTON-1774 Remove sasl processing when complete

Once SASL negotiation is complete, remove any impact of the
SaslTransportWrapper from the transport chain by switching input and
output back to the next TransportWrapper as soon as the pending buffers
are drained.  Move the SASL input and output buffers into the wrapper,
which can be GC'd once it has been removed from the chain.

Project: http://git-wip-us.apache.org/repos/asf/qpid-proton-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton-j/commit/88310a5a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton-j/tree/88310a5a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton-j/diff/88310a5a

Branch: refs/heads/master
Commit: 88310a5a7fe1073e95504c1cfbc69206e4b643db
Parents: 45bdc03
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Feb 26 15:53:35 2018 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon Feb 26 15:53:35 2018 -0500

----------------------------------------------------------------------
 .../qpid/proton/engine/impl/SaslImpl.java       | 133 ++++++++++++++++---
 .../qpid/proton/systemtests/EngineTestBase.java |  15 ++-
 2 files changed, 124 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/88310a5a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java
index 9125625..cee798f 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java
@@ -58,9 +58,8 @@ public class SaslImpl implements Sasl, SaslFrameBody.SaslFrameBodyHandler<Void>,
     private final TransportImpl _transport;
 
     private boolean _tail_closed = false;
-    private final ByteBuffer _inputBuffer;
     private boolean _head_closed = false;
-    private final ByteBuffer _outputBuffer;
+    private final int _maxFrameSize;
     private final FrameWriter _frameWriter;
 
     private ByteBuffer _pending;
@@ -96,8 +95,7 @@ public class SaslImpl implements Sasl, SaslFrameBody.SaslFrameBodyHandler<Void>,
     SaslImpl(TransportImpl transport, int maxFrameSize)
     {
         _transport = transport;
-        _inputBuffer = newWriteableBuffer(maxFrameSize);
-        _outputBuffer = newWriteableBuffer(maxFrameSize);
+        _maxFrameSize = maxFrameSize;
 
         AMQPDefinedTypes.registerAllTypes(_decoder,_encoder);
         _frameParser = new SaslFrameParser(this, _decoder, maxFrameSize);
@@ -122,17 +120,6 @@ public class SaslImpl implements Sasl, SaslFrameBody.SaslFrameBodyHandler<Void>,
         return _done && (_role==Role.CLIENT || _initReceived);
     }
 
-    private void writeSaslOutput()
-    {
-        process();
-        _frameWriter.readBytes(_outputBuffer);
-
-        if(_logger.isLoggable(Level.FINER))
-        {
-            _logger.log(Level.FINER, "Finished writing SASL output. Output Buffer : " + _outputBuffer);
-        }
-    }
-
     private void process()
     {
         processHeader();
@@ -226,7 +213,6 @@ public class SaslImpl implements Sasl, SaslFrameBody.SaslFrameBodyHandler<Void>,
         if(!_headerWritten)
         {
             _frameWriter.writeHeader(AmqpHeader.SASL_HEADER);
-
             _headerWritten = true;
             return AmqpHeader.SASL_HEADER.length;
         }
@@ -514,7 +500,7 @@ public class SaslImpl implements Sasl, SaslFrameBody.SaslFrameBodyHandler<Void>,
 
     public TransportWrapper wrap(final TransportInput input, final TransportOutput output)
     {
-        return new SaslSniffer(new SaslTransportWrapper(input, output),
+        return new SaslSniffer(new SwitchingSaslTransportWrapper(input, output),
                                new PlainTransportWrapper(output, input)) {
             protected boolean isDeterminationMade() {
                 if (_role == Role.SERVER && _allowSkip) {
@@ -545,13 +531,23 @@ public class SaslImpl implements Sasl, SaslFrameBody.SaslFrameBodyHandler<Void>,
         private final TransportInput _underlyingInput;
         private final TransportOutput _underlyingOutput;
         private boolean _outputComplete;
+
+        private final ByteBuffer _outputBuffer;
+        private final ByteBuffer _inputBuffer;
         private final ByteBuffer _head;
 
-        private SaslTransportWrapper(TransportInput input, TransportOutput output)
+        private final SwitchingSaslTransportWrapper _parent;
+
+        private SaslTransportWrapper(SwitchingSaslTransportWrapper parent, TransportInput input, TransportOutput output)
         {
             _underlyingInput = input;
             _underlyingOutput = output;
 
+            _inputBuffer = newWriteableBuffer(_maxFrameSize);
+            _outputBuffer = newWriteableBuffer(_maxFrameSize);
+
+            _parent = parent;
+
             if (_transport.isUseReadOnlyOutputBuffer()) {
                 _head = _outputBuffer.asReadOnlyBuffer();
             } else {
@@ -565,7 +561,7 @@ public class SaslImpl implements Sasl, SaslFrameBody.SaslFrameBodyHandler<Void>,
         {
             if(isOutputInSaslMode())
             {
-                SaslImpl.this.writeSaslOutput();
+                writeSaslOutput();
                 if(_done)
                 {
                     _outputComplete = true;
@@ -579,7 +575,7 @@ public class SaslImpl implements Sasl, SaslFrameBody.SaslFrameBodyHandler<Void>,
          */
         private boolean isInputInSaslMode()
         {
-            return _role == null || (_role == Role.CLIENT && !_done) ||(_role == Role.SERVER && (!_initReceived || !_done));
+            return _role == null || (_role == Role.CLIENT && !_done) || (_role == Role.SERVER && (!_initReceived || !_done));
         }
 
         private boolean isOutputInSaslMode()
@@ -680,12 +676,17 @@ public class SaslImpl implements Sasl, SaslFrameBody.SaslFrameBodyHandler<Void>,
                         _tail_closed = true;
                     }
 
-                    _underlyingInput.process();
+                    if (!_inputBuffer.hasRemaining())
+                    {
+                        _parent.switchToNextInput();
+                    }
                 }
                 else
                 {
-                    _underlyingInput.process();
+                    _parent.switchToNextInput();
                 }
+
+                _underlyingInput.process();
             }
         }
 
@@ -708,6 +709,7 @@ public class SaslImpl implements Sasl, SaslFrameBody.SaslFrameBodyHandler<Void>,
             }
             else
             {
+                _parent.switchToNextOutput();
                 return _underlyingOutput.pending();
             }
         }
@@ -722,6 +724,7 @@ public class SaslImpl implements Sasl, SaslFrameBody.SaslFrameBodyHandler<Void>,
             }
             else
             {
+                _parent.switchToNextOutput();
                 return _underlyingOutput.head();
             }
         }
@@ -739,6 +742,7 @@ public class SaslImpl implements Sasl, SaslFrameBody.SaslFrameBodyHandler<Void>,
             }
             else
             {
+                _parent.switchToNextOutput();
                 _underlyingOutput.pop(bytes);
             }
         }
@@ -746,8 +750,93 @@ public class SaslImpl implements Sasl, SaslFrameBody.SaslFrameBodyHandler<Void>,
         @Override
         public void close_head()
         {
+            _parent.switchToNextOutput();
             _underlyingOutput.close_head();
         }
+
+        private void writeSaslOutput()
+        {
+            SaslImpl.this.process();
+            _frameWriter.readBytes(_outputBuffer);
+
+            if(_logger.isLoggable(Level.FINER))
+            {
+                _logger.log(Level.FINER, "Finished writing SASL output. Output Buffer : " + _outputBuffer);
+            }
+        }
+    }
+
+    private class SwitchingSaslTransportWrapper implements TransportWrapper {
+
+        private final TransportInput _underlyingInput;
+        private final TransportOutput _underlyingOutput;
+
+        private TransportInput currentInput;
+        private TransportOutput currentOutput;
+
+        private SwitchingSaslTransportWrapper(TransportInput input, TransportOutput output) {
+            _underlyingInput = input;
+            _underlyingOutput = output;
+
+            // The wrapper can be GC'd after both currentInput and currentOutput are switched to next.
+            SaslTransportWrapper saslProcessor = new SaslTransportWrapper(this, input, output);
+
+            currentInput = saslProcessor;
+            currentOutput = saslProcessor;
+        }
+
+        @Override
+        public int capacity() {
+            return currentInput.capacity();
+        }
+
+        @Override
+        public int position() {
+            return currentInput.position();
+        }
+
+        @Override
+        public ByteBuffer tail() throws TransportException {
+            return currentInput.tail();
+        }
+
+        @Override
+        public void process() throws TransportException {
+            currentInput.process();
+        }
+
+        @Override
+        public void close_tail() {
+            currentInput.close_tail();
+        }
+
+        @Override
+        public int pending() {
+            return currentOutput.pending();
+        }
+
+        @Override
+        public ByteBuffer head() {
+            return currentOutput.head();
+        }
+
+        @Override
+        public void pop(int bytes) {
+            currentOutput.pop(bytes);
+        }
+
+        @Override
+        public void close_head() {
+            currentOutput.close_head();
+        }
+
+        void switchToNextInput() {
+            currentInput = _underlyingInput;
+        }
+
+        void switchToNextOutput() {
+            currentOutput = _underlyingOutput;
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/88310a5a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/EngineTestBase.java
----------------------------------------------------------------------
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/EngineTestBase.java b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/EngineTestBase.java
index 9dadf29..6868496 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/EngineTestBase.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/EngineTestBase.java
@@ -36,6 +36,11 @@ public abstract class EngineTestBase
     private final ProtonContainer _client = new ProtonContainer("clientContainer");
     private final ProtonContainer _server = new ProtonContainer("serverContainer");
 
+    protected boolean shouldLogPumpedBytes()
+    {
+        return true;
+    }
+
     protected TestLoggingHelper getTestLoggingHelper()
     {
         return _testLoggingHelper;
@@ -61,7 +66,10 @@ public abstract class EngineTestBase
     {
         ByteBuffer serverBuffer = getServer().transport.getOutputBuffer();
 
-        getTestLoggingHelper().prettyPrint("          <<<" + TestLoggingHelper.SERVER_PREFIX + " ", serverBuffer);
+        if (shouldLogPumpedBytes())
+        {
+            getTestLoggingHelper().prettyPrint("          <<<" + TestLoggingHelper.SERVER_PREFIX + " ", serverBuffer);
+        }
         assertTrue("Server expected to produce some output", serverBuffer.hasRemaining());
 
         ByteBuffer clientBuffer = getClient().transport.getInputBuffer();
@@ -78,7 +86,10 @@ public abstract class EngineTestBase
     {
         ByteBuffer clientBuffer = getClient().transport.getOutputBuffer();
 
-        getTestLoggingHelper().prettyPrint(TestLoggingHelper.CLIENT_PREFIX + ">>> ", clientBuffer);
+        if (shouldLogPumpedBytes())
+        {
+            getTestLoggingHelper().prettyPrint(TestLoggingHelper.CLIENT_PREFIX + ">>> ", clientBuffer);
+        }
         assertTrue("Client expected to produce some output", clientBuffer.hasRemaining());
 
         ByteBuffer serverBuffer = getServer().transport.getInputBuffer();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org