You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2014/02/06 17:00:36 UTC

svn commit: r1565316 - in /qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton: driver/impl/ConnectorImpl.java engine/impl/SaslImpl.java

Author: rhs
Date: Thu Feb  6 16:00:36 2014
New Revision: 1565316

URL: http://svn.apache.org/r1565316
Log:
PROTON-508: applied patch from Timothy Bish

Modified:
    qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
    qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java

Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java?rev=1565316&r1=1565315&r2=1565316&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java Thu Feb  6 16:00:36 2014
@@ -154,24 +154,22 @@ class ConnectorImpl<C> implements Connec
         boolean processed = false;
 
         int interest = _key.interestOps();
-        int pending = _transport.pending();
-        if (pending >= 0)
+        boolean writeBlocked = false;
+
+        while (_transport.pending() >= 0 && !writeBlocked)
         {
             int wrote = _channel.write(_transport.head());
             if (wrote > 0) {
                 processed = true;
                 _transport.pop(wrote);
+            } else {
+                writeBlocked = true;
             }
         }
-        else
-        {
-            _outputDone = true;
-        }
 
-        pending = _transport.pending();
+        int pending = _transport.pending();
         if (pending > 0) {
             interest |= SelectionKey.OP_WRITE;
-
         } else {
             interest &= ~SelectionKey.OP_WRITE;
             if (pending < 0) {

Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java?rev=1565316&r1=1565315&r2=1565316&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java Thu Feb  6 16:00:36 2014
@@ -21,9 +21,7 @@
 
 package org.apache.qpid.proton.engine.impl;
 
-import static org.apache.qpid.proton.engine.impl.ByteBufferUtils.newReadableBuffer;
 import static org.apache.qpid.proton.engine.impl.ByteBufferUtils.newWriteableBuffer;
-import static org.apache.qpid.proton.engine.impl.ByteBufferUtils.pour;
 import static org.apache.qpid.proton.engine.impl.ByteBufferUtils.pourAll;
 import static org.apache.qpid.proton.engine.impl.ByteBufferUtils.pourBufferToArray;
 
@@ -42,7 +40,6 @@ import org.apache.qpid.proton.amqp.secur
 import org.apache.qpid.proton.codec.AMQPDefinedTypes;
 import org.apache.qpid.proton.codec.DecoderImpl;
 import org.apache.qpid.proton.codec.EncoderImpl;
-import org.apache.qpid.proton.codec.WritableBuffer;
 import org.apache.qpid.proton.engine.Sasl;
 import org.apache.qpid.proton.engine.Transport;
 import org.apache.qpid.proton.engine.TransportException;
@@ -96,8 +93,7 @@ public class SaslImpl implements Sasl, S
 
         AMQPDefinedTypes.registerAllTypes(_decoder,_encoder);
         _frameParser = new SaslFrameParser(this, _decoder);
-        _frameWriter = new FrameWriter(_encoder, maxFrameSize, FrameWriter.SASL_FRAME_TYPE, null,
-                                       this);
+        _frameWriter = new FrameWriter(_encoder, maxFrameSize, FrameWriter.SASL_FRAME_TYPE, null, this);
     }
 
     @Override
@@ -168,7 +164,6 @@ public class SaslImpl implements Sasl, S
             {
                 processResponse();
             }
-
         }
     }
 
@@ -203,7 +198,6 @@ public class SaslImpl implements Sasl, S
 
     final int processHeader()
     {
-
         if(!_headerWritten)
         {
             _frameWriter.writeHeader(AmqpHeader.SASL_HEADER);
@@ -329,7 +323,6 @@ public class SaslImpl implements Sasl, S
         if(saslInit.getInitialResponse() != null)
         {
             setPending(saslInit.getInitialResponse().asByteBuffer());
-
         }
     }
 
@@ -441,7 +434,6 @@ public class SaslImpl implements Sasl, S
         System.arraycopy(passwordBytes, 0, data, 2+usernameBytes.length, passwordBytes.length);
 
         setChallengeResponse(new Binary(data));
-
     }
 
     @Override
@@ -468,7 +460,6 @@ public class SaslImpl implements Sasl, S
         _role = Role.SERVER;
     }
 
-
     public TransportWrapper wrap(final TransportInput input, final TransportOutput output)
     {
         return new SaslTransportWrapper(input, output);
@@ -487,14 +478,12 @@ public class SaslImpl implements Sasl, S
         return builder.toString();
     }
 
-
     private class SaslTransportWrapper implements TransportWrapper
     {
         private final TransportInput _underlyingInput;
         private final TransportOutput _underlyingOutput;
         private boolean _outputComplete;
-
-        private ByteBuffer _head;
+        private final ByteBuffer _head;
 
         private SaslTransportWrapper(TransportInput input, TransportOutput output)
         {
@@ -513,29 +502,6 @@ public class SaslImpl implements Sasl, S
                 {
                     _outputComplete = true;
                 }
-
-                // TODO if sasl is now 'done', it would be more efficient if any remaining space is
-                // now offered directly to _output rather than awaiting the next invocation.
-            }
-            else
-            {
-                int pending = _underlyingOutput.pending();
-                if (pending == Transport.END_OF_STREAM)
-                {
-                    _head_closed = true;
-                }
-                else
-                {
-                    ByteBuffer outputBuffer = _underlyingOutput.head();
-                    pour(outputBuffer, _outputBuffer);
-
-                    _underlyingOutput.pop(outputBuffer.position());
-
-                    if(_logger.isLoggable(Level.FINER))
-                    {
-                        _logger.log(Level.FINER, SaslImpl.this + " filled output buffer with plain output");
-                    }
-                }
             }
         }
 
@@ -557,24 +523,24 @@ public class SaslImpl implements Sasl, S
         public int capacity()
         {
             if (_tail_closed) return Transport.END_OF_STREAM;
-            if (isInputInSaslMode()) {
+            if (isInputInSaslMode())
+            {
                 return _inputBuffer.remaining();
-            } else {
-                int capacity = _underlyingInput.capacity();
-                if (capacity < 0) {
-                    return capacity;
-                } else {
-                    return _inputBuffer.remaining();
-                }
+            }
+            else
+            {
+                return _underlyingInput.capacity();
             }
         }
 
         @Override
         public ByteBuffer tail()
         {
-            // TODO if the SASL negotiation is complete, it would be more efficient
-            // to return the underlying transport input's buffer.
-            // The same optimisation would be possible in getOutputBuffer
+            if (!isInputInSaslMode())
+            {
+                return _underlyingInput.tail();
+            }
+
             return _inputBuffer;
         }
 
@@ -618,10 +584,19 @@ public class SaslImpl implements Sasl, S
                     _logger.log(Level.FINER, SaslImpl.this + " about to call plain input");
                 }
 
-                int bytes = pourAll(_inputBuffer, _underlyingInput);
-                if (bytes == Transport.END_OF_STREAM)
+                if (_inputBuffer.hasRemaining())
+                {
+                    int bytes = pourAll(_inputBuffer, _underlyingInput);
+                    if (bytes == Transport.END_OF_STREAM)
+                    {
+                        _tail_closed = true;
+                    }
+
+                    _underlyingInput.process();
+                }
+                else
                 {
-                    _tail_closed = true;
+                    _underlyingInput.process();
                 }
             }
         }
@@ -629,31 +604,55 @@ public class SaslImpl implements Sasl, S
         @Override
         public int pending()
         {
-            fillOutputBuffer();
-            _head.limit(_outputBuffer.position());
+            if (isOutputInSaslMode() || _outputBuffer.position() != 0)
+            {
+                fillOutputBuffer();
+                _head.limit(_outputBuffer.position());
 
-            if (_head_closed && _outputBuffer.position() == 0) {
-                return Transport.END_OF_STREAM;
-            } else {
-                return _outputBuffer.position();
+                if (_head_closed && _outputBuffer.position() == 0)
+                {
+                    return Transport.END_OF_STREAM;
+                }
+                else
+                {
+                    return _outputBuffer.position();
+                }
+            }
+            else
+            {
+                return _underlyingOutput.pending();
             }
         }
 
         @Override
         public ByteBuffer head()
         {
-            pending();
-            return _head;
+            if (isOutputInSaslMode() || _outputBuffer.position() != 0)
+            {
+                pending();
+                return _head;
+            }
+            else
+            {
+                return _underlyingOutput.head();
+            }
         }
 
         @Override
         public void pop(int bytes)
         {
-            _outputBuffer.flip();
-            _outputBuffer.position(bytes);
-            _outputBuffer.compact();
-            _head.position(0);
-            _head.limit(_outputBuffer.position());
+            if (isOutputInSaslMode() || _outputBuffer.position() != 0)
+            {
+                _outputBuffer.flip();
+                _outputBuffer.position(bytes);
+                _outputBuffer.compact();
+                _head.position(0);
+                _head.limit(_outputBuffer.position());
+            }
+            else
+            {
+                _underlyingOutput.pop(bytes);
+            }
         }
 
         @Override
@@ -661,6 +660,5 @@ public class SaslImpl implements Sasl, S
         {
             _underlyingOutput.close_head();
         }
-
     }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org