You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2014/02/06 17:00:36 UTC
svn commit: r1565316 - in /qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton: driver/impl/ConnectorImpl.java engine/impl/SaslImpl.java
Author: rhs
Date: Thu Feb 6 16:00:36 2014
New Revision: 1565316
URL: http://svn.apache.org/r1565316
Log:
PROTON-508: applied patch from Timothy Bish
Modified:
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java
Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java?rev=1565316&r1=1565315&r2=1565316&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java Thu Feb 6 16:00:36 2014
@@ -154,24 +154,22 @@ class ConnectorImpl<C> implements Connec
boolean processed = false;
int interest = _key.interestOps();
- int pending = _transport.pending();
- if (pending >= 0)
+ boolean writeBlocked = false;
+
+ while (_transport.pending() >= 0 && !writeBlocked)
{
int wrote = _channel.write(_transport.head());
if (wrote > 0) {
processed = true;
_transport.pop(wrote);
+ } else {
+ writeBlocked = true;
}
}
- else
- {
- _outputDone = true;
- }
- pending = _transport.pending();
+ int pending = _transport.pending();
if (pending > 0) {
interest |= SelectionKey.OP_WRITE;
-
} else {
interest &= ~SelectionKey.OP_WRITE;
if (pending < 0) {
Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java?rev=1565316&r1=1565315&r2=1565316&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java Thu Feb 6 16:00:36 2014
@@ -21,9 +21,7 @@
package org.apache.qpid.proton.engine.impl;
-import static org.apache.qpid.proton.engine.impl.ByteBufferUtils.newReadableBuffer;
import static org.apache.qpid.proton.engine.impl.ByteBufferUtils.newWriteableBuffer;
-import static org.apache.qpid.proton.engine.impl.ByteBufferUtils.pour;
import static org.apache.qpid.proton.engine.impl.ByteBufferUtils.pourAll;
import static org.apache.qpid.proton.engine.impl.ByteBufferUtils.pourBufferToArray;
@@ -42,7 +40,6 @@ import org.apache.qpid.proton.amqp.secur
import org.apache.qpid.proton.codec.AMQPDefinedTypes;
import org.apache.qpid.proton.codec.DecoderImpl;
import org.apache.qpid.proton.codec.EncoderImpl;
-import org.apache.qpid.proton.codec.WritableBuffer;
import org.apache.qpid.proton.engine.Sasl;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.TransportException;
@@ -96,8 +93,7 @@ public class SaslImpl implements Sasl, S
AMQPDefinedTypes.registerAllTypes(_decoder,_encoder);
_frameParser = new SaslFrameParser(this, _decoder);
- _frameWriter = new FrameWriter(_encoder, maxFrameSize, FrameWriter.SASL_FRAME_TYPE, null,
- this);
+ _frameWriter = new FrameWriter(_encoder, maxFrameSize, FrameWriter.SASL_FRAME_TYPE, null, this);
}
@Override
@@ -168,7 +164,6 @@ public class SaslImpl implements Sasl, S
{
processResponse();
}
-
}
}
@@ -203,7 +198,6 @@ public class SaslImpl implements Sasl, S
final int processHeader()
{
-
if(!_headerWritten)
{
_frameWriter.writeHeader(AmqpHeader.SASL_HEADER);
@@ -329,7 +323,6 @@ public class SaslImpl implements Sasl, S
if(saslInit.getInitialResponse() != null)
{
setPending(saslInit.getInitialResponse().asByteBuffer());
-
}
}
@@ -441,7 +434,6 @@ public class SaslImpl implements Sasl, S
System.arraycopy(passwordBytes, 0, data, 2+usernameBytes.length, passwordBytes.length);
setChallengeResponse(new Binary(data));
-
}
@Override
@@ -468,7 +460,6 @@ public class SaslImpl implements Sasl, S
_role = Role.SERVER;
}
-
public TransportWrapper wrap(final TransportInput input, final TransportOutput output)
{
return new SaslTransportWrapper(input, output);
@@ -487,14 +478,12 @@ public class SaslImpl implements Sasl, S
return builder.toString();
}
-
private class SaslTransportWrapper implements TransportWrapper
{
private final TransportInput _underlyingInput;
private final TransportOutput _underlyingOutput;
private boolean _outputComplete;
-
- private ByteBuffer _head;
+ private final ByteBuffer _head;
private SaslTransportWrapper(TransportInput input, TransportOutput output)
{
@@ -513,29 +502,6 @@ public class SaslImpl implements Sasl, S
{
_outputComplete = true;
}
-
- // TODO if sasl is now 'done', it would be more efficient if any remaining space is
- // now offered directly to _output rather than awaiting the next invocation.
- }
- else
- {
- int pending = _underlyingOutput.pending();
- if (pending == Transport.END_OF_STREAM)
- {
- _head_closed = true;
- }
- else
- {
- ByteBuffer outputBuffer = _underlyingOutput.head();
- pour(outputBuffer, _outputBuffer);
-
- _underlyingOutput.pop(outputBuffer.position());
-
- if(_logger.isLoggable(Level.FINER))
- {
- _logger.log(Level.FINER, SaslImpl.this + " filled output buffer with plain output");
- }
- }
}
}
@@ -557,24 +523,24 @@ public class SaslImpl implements Sasl, S
public int capacity()
{
if (_tail_closed) return Transport.END_OF_STREAM;
- if (isInputInSaslMode()) {
+ if (isInputInSaslMode())
+ {
return _inputBuffer.remaining();
- } else {
- int capacity = _underlyingInput.capacity();
- if (capacity < 0) {
- return capacity;
- } else {
- return _inputBuffer.remaining();
- }
+ }
+ else
+ {
+ return _underlyingInput.capacity();
}
}
@Override
public ByteBuffer tail()
{
- // TODO if the SASL negotiation is complete, it would be more efficient
- // to return the underlying transport input's buffer.
- // The same optimisation would be possible in getOutputBuffer
+ if (!isInputInSaslMode())
+ {
+ return _underlyingInput.tail();
+ }
+
return _inputBuffer;
}
@@ -618,10 +584,19 @@ public class SaslImpl implements Sasl, S
_logger.log(Level.FINER, SaslImpl.this + " about to call plain input");
}
- int bytes = pourAll(_inputBuffer, _underlyingInput);
- if (bytes == Transport.END_OF_STREAM)
+ if (_inputBuffer.hasRemaining())
+ {
+ int bytes = pourAll(_inputBuffer, _underlyingInput);
+ if (bytes == Transport.END_OF_STREAM)
+ {
+ _tail_closed = true;
+ }
+
+ _underlyingInput.process();
+ }
+ else
{
- _tail_closed = true;
+ _underlyingInput.process();
}
}
}
@@ -629,31 +604,55 @@ public class SaslImpl implements Sasl, S
@Override
public int pending()
{
- fillOutputBuffer();
- _head.limit(_outputBuffer.position());
+ if (isOutputInSaslMode() || _outputBuffer.position() != 0)
+ {
+ fillOutputBuffer();
+ _head.limit(_outputBuffer.position());
- if (_head_closed && _outputBuffer.position() == 0) {
- return Transport.END_OF_STREAM;
- } else {
- return _outputBuffer.position();
+ if (_head_closed && _outputBuffer.position() == 0)
+ {
+ return Transport.END_OF_STREAM;
+ }
+ else
+ {
+ return _outputBuffer.position();
+ }
+ }
+ else
+ {
+ return _underlyingOutput.pending();
}
}
@Override
public ByteBuffer head()
{
- pending();
- return _head;
+ if (isOutputInSaslMode() || _outputBuffer.position() != 0)
+ {
+ pending();
+ return _head;
+ }
+ else
+ {
+ return _underlyingOutput.head();
+ }
}
@Override
public void pop(int bytes)
{
- _outputBuffer.flip();
- _outputBuffer.position(bytes);
- _outputBuffer.compact();
- _head.position(0);
- _head.limit(_outputBuffer.position());
+ if (isOutputInSaslMode() || _outputBuffer.position() != 0)
+ {
+ _outputBuffer.flip();
+ _outputBuffer.position(bytes);
+ _outputBuffer.compact();
+ _head.position(0);
+ _head.limit(_outputBuffer.position());
+ }
+ else
+ {
+ _underlyingOutput.pop(bytes);
+ }
}
@Override
@@ -661,6 +660,5 @@ public class SaslImpl implements Sasl, S
{
_underlyingOutput.close_head();
}
-
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org