You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2014/02/10 20:16:38 UTC
svn commit: r1566706 - in /qpid/proton/trunk: proton-c/src/transport/
proton-j/src/main/java/org/apache/qpid/proton/amqp/
proton-j/src/main/java/org/apache/qpid/proton/driver/impl/
proton-j/src/main/java/org/apache/qpid/proton/engine/ proton-j/src/main...
Author: rhs
Date: Mon Feb 10 19:16:38 2014
New Revision: 1566706
URL: http://svn.apache.org/r1566706
Log:
PROTON-517: fixed ssl sniffing and ssl transport implementation; added certificate API to java Messenger
Modified:
qpid/proton/trunk/proton-c/src/transport/transport.c
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/amqp/Binary.java
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Delivery.java
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapper.java
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslEngineFacadeFactory.java
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslHandshakeSniffingTransportWrapper.java
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslImpl.java
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Address.java
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
qpid/proton/trunk/proton-j/src/main/resources/cengine.py
qpid/proton/trunk/proton-j/src/main/resources/cmessenger.py
qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/CannedTransportOutput.java
qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapperTest.java
qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SslHandshakeSniffingTransportWrapperTest.java
qpid/proton/trunk/tests/python/proton_tests/ssl.py
Modified: qpid/proton/trunk/proton-c/src/transport/transport.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/transport/transport.c?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/transport/transport.c (original)
+++ qpid/proton/trunk/proton-c/src/transport/transport.c Mon Feb 10 19:16:38 2014
@@ -1083,8 +1083,9 @@ int pn_process_conn_setup(pn_transport_t
if (!(endpoint->state & PN_LOCAL_UNINIT) && !transport->open_sent)
{
pn_connection_t *connection = (pn_connection_t *) endpoint;
+ const char *cid = pn_string_get(connection->container);
int err = pn_post_frame(transport->disp, 0, "DL[SS?I?H?InnCCC]", OPEN,
- pn_string_get(connection->container),
+ cid ? cid : "",
pn_string_get(connection->hostname),
// if not zero, advertise our max frame size and idle timeout
(bool)transport->local_max_frame, transport->local_max_frame,
Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/amqp/Binary.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/amqp/Binary.java?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/amqp/Binary.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/amqp/Binary.java Mon Feb 10 19:16:38 2014
@@ -165,7 +165,7 @@ public final class Binary
{
if( buffer == null )
return null;
- if( buffer.isDirect() )
+ if( buffer.isDirect() || buffer.isReadOnly() )
{
byte data[] = new byte [buffer.remaining()];
ByteBuffer dup = buffer.duplicate();
@@ -177,4 +177,5 @@ public final class Binary
return new Binary(buffer.array(), buffer.arrayOffset()+buffer.position(), buffer.remaining());
}
}
+
}
Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java Mon Feb 10 19:16:38 2014
@@ -31,6 +31,7 @@ import org.apache.qpid.proton.driver.Lis
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Sasl;
import org.apache.qpid.proton.engine.Transport;
+import org.apache.qpid.proton.engine.TransportException;
import org.apache.qpid.proton.engine.impl.TransportFactory;
class ConnectorImpl<C> implements Connector<C>
@@ -125,12 +126,17 @@ class ConnectorImpl<C> implements Connec
}
else
{
- int bytesRead = _channel.read(_transport.tail());
+ ByteBuffer tail = _transport.tail();
+ int bytesRead = _channel.read(tail);
if (bytesRead < 0) {
_transport.close_tail();
_inputDone = true;
} else if (bytesRead > 0) {
- _transport.process();
+ try {
+ _transport.process();
+ } catch (TransportException e) {
+ _logger.log(Level.SEVERE, this + " error processing input", e);
+ }
processed = true;
}
}
@@ -156,26 +162,35 @@ class ConnectorImpl<C> implements Connec
int interest = _key.interestOps();
boolean writeBlocked = false;
- while (_transport.pending() >= 0 && !writeBlocked)
- {
- int wrote = _channel.write(_transport.head());
- if (wrote > 0) {
- processed = true;
- _transport.pop(wrote);
- } else {
- writeBlocked = true;
+ try {
+ while (_transport.pending() > 0 && !writeBlocked)
+ {
+ ByteBuffer head = _transport.head();
+ int wrote = _channel.write(head);
+ if (wrote > 0) {
+ processed = true;
+ _transport.pop(wrote);
+ } else {
+ writeBlocked = true;
+ }
}
- }
- int pending = _transport.pending();
- if (pending > 0) {
- interest |= SelectionKey.OP_WRITE;
- } else {
- interest &= ~SelectionKey.OP_WRITE;
- if (pending < 0) {
- _outputDone = true;
+ int pending = _transport.pending();
+ if (pending > 0) {
+ interest |= SelectionKey.OP_WRITE;
+ } else {
+ interest &= ~SelectionKey.OP_WRITE;
+ if (pending < 0) {
+ _outputDone = true;
+ }
}
+ } catch (TransportException e) {
+ _logger.log(Level.SEVERE, this + " error", e);
+ interest &= ~SelectionKey.OP_WRITE;
+ _inputDone = true;
+ _outputDone = true;
}
+
_key.interestOps(interest);
return processed;
Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Delivery.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Delivery.java?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Delivery.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Delivery.java Mon Feb 10 19:16:38 2014
@@ -78,6 +78,8 @@ public interface Delivery
*/
public Delivery getWorkNext();
+ public Delivery next();
+
public boolean isWritable();
/**
Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java Mon Feb 10 19:16:38 2014
@@ -75,9 +75,9 @@ public interface Link extends Endpoint
public Delivery delivery(byte[] tag, int offset, int length);
/**
- * @return the unsettled deliveries for this link
+ * Returns the head delivery on the link.
*/
- public Iterator<Delivery> unsettled();
+ Delivery head();
/**
* Returns the current delivery
Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java Mon Feb 10 19:16:38 2014
@@ -157,6 +157,11 @@ public class DeliveryImpl implements Del
return _linkNext;
}
+ public DeliveryImpl next()
+ {
+ return getLinkNext();
+ }
+
public void free()
{
Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java Mon Feb 10 19:16:38 2014
@@ -177,6 +177,6 @@ public abstract class EndpointImpl imple
@Override
public String toString()
{
- return "EndpointImpl [_localState=" + _localState + ", _remoteState=" + _remoteState + ", _localError=" + _localError + ", _remoteError=" + _remoteError + "]";
+ return "EndpointImpl(" + System.identityHashCode(this) + ") [_localState=" + _localState + ", _remoteState=" + _remoteState + ", _localError=" + _localError + ", _remoteError=" + _remoteError + "]";
}
}
Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java Mon Feb 10 19:16:38 2014
@@ -381,4 +381,10 @@ public abstract class LinkImpl extends E
{
return _credit - _queued;
}
+
+ public DeliveryImpl head()
+ {
+ return _head;
+ }
+
}
Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java Mon Feb 10 19:16:38 2014
@@ -96,11 +96,6 @@ public class ReceiverImpl extends LinkIm
return consumed;
}
- public Iterator<Delivery> unsettled()
- {
- return null; //TODO.
- }
-
public void free()
{
getSession().freeReceiver(this);
Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java Mon Feb 10 19:16:38 2014
@@ -563,6 +563,9 @@ public class SaslImpl implements Sasl, S
public void close_tail()
{
_tail_closed = true;
+ if (isInputInSaslMode()) {
+ _head_closed = true;
+ }
}
private void reallyProcessInput() throws TransportException
Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java Mon Feb 10 19:16:38 2014
@@ -63,12 +63,6 @@ public class SenderImpl extends LinkImp
//TODO.
}
- public Iterator<Delivery> unsettled()
- {
- return null; //TODO.
- }
-
-
public void free()
{
getSession().freeSender(this);
Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java Mon Feb 10 19:16:38 2014
@@ -108,6 +108,7 @@ public class TransportImpl extends Endpo
private boolean _init;
private FrameHandler _frameHandler = this;
+ private boolean _head_closed = false;
/**
* @deprecated This constructor's visibility will be reduced to the default scope in a future release.
@@ -272,7 +273,7 @@ public class TransportImpl extends Endpo
_frameWriter.readBytes(outputBuffer);
- return _isCloseSent;
+ return _isCloseSent || _head_closed;
}
@Override
@@ -716,7 +717,8 @@ public class TransportImpl extends Endpo
if(_connectionEndpoint != null && _connectionEndpoint.getLocalState() != EndpointState.UNINITIALIZED && !_isOpenSent)
{
Open open = new Open();
- open.setContainerId(_connectionEndpoint.getLocalContainerId());
+ String cid = _connectionEndpoint.getLocalContainerId();
+ open.setContainerId(cid == null ? "" : cid);
open.setHostname(_connectionEndpoint.getHostname());
open.setDesiredCapabilities(_connectionEndpoint.getDesiredCapabilities());
open.setOfferedCapabilities(_connectionEndpoint.getOfferedCapabilities());
@@ -1236,8 +1238,13 @@ public class TransportImpl extends Endpo
@Override
public void process() throws TransportException
{
- init();
- _inputProcessor.process();
+ try {
+ init();
+ _inputProcessor.process();
+ } catch (TransportException e) {
+ _head_closed = true;
+ throw e;
+ }
}
@Override
Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapper.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapper.java?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapper.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapper.java Mon Feb 10 19:16:38 2014
@@ -53,20 +53,18 @@ public class SimpleSslTransportWrapper i
private final TransportOutput _underlyingOutput;
private boolean _tail_closed = false;
- private final ByteBuffer _inputBuffer;
+ private ByteBuffer _inputBuffer;
- private boolean _head_closed = true;
- private final ByteBuffer _outputBuffer;
- private final ByteBuffer _head;
+ private boolean _head_closed = false;
+ private ByteBuffer _outputBuffer;
+ private ByteBuffer _head;
/**
* A buffer for the decoded bytes that will be passed to _underlyingInput.
* This extra layer of buffering is necessary in case the underlying input's buffer
* is too small for SSLEngine to ever unwrap into.
*/
- private final ByteBuffer _decodedInputBuffer;
-
- private ByteBuffer _readOnlyOutputBufferView;
+ private ByteBuffer _decodedInputBuffer;
/** could change during the lifetime of the ssl connection owing to renegotiation. */
private String _cipherName;
@@ -91,7 +89,7 @@ public class SimpleSslTransportWrapper i
_head = _outputBuffer.asReadOnlyBuffer();
_head.limit(0);
- _decodedInputBuffer = newReadableBuffer(effectiveAppBufferMax);
+ _decodedInputBuffer = newWriteableBuffer(effectiveAppBufferMax);
if(_logger.isLoggable(Level.FINE))
{
@@ -108,59 +106,81 @@ public class SimpleSslTransportWrapper i
* - On exit, it is still readable and its "remaining" bytes are those that we were unable
* to unwrap (e.g. if they don't form a whole packet).
*/
- private void unwrapInput() throws TransportException
+ private void unwrapInput() throws SSLException
{
- try
- {
- boolean keepLooping = true;
- do
- {
- _decodedInputBuffer.compact();
- SSLEngineResult result = _sslEngine.unwrap(_inputBuffer, _decodedInputBuffer);
+ while (true) {
+ SSLEngineResult result = _sslEngine.unwrap(_inputBuffer, _decodedInputBuffer);
+ logEngineClientModeAndResult(result, "input");
+
+ int read = result.bytesProduced();
+ Status status = result.getStatus();
+ HandshakeStatus hstatus = result.getHandshakeStatus();
+
+ int capacity = _underlyingInput.capacity();
+ if (capacity == Transport.END_OF_STREAM) {
+ _tail_closed = true;
+ if (_decodedInputBuffer.position() > 0) {
+ throw new TransportException("bytes left unconsumed");
+ }
+ } else {
+ ByteBuffer tail = _underlyingInput.tail();
_decodedInputBuffer.flip();
-
- runDelegatedTasks(result);
- updateCipherAndProtocolName(result);
-
- logEngineClientModeAndResult(result, "input");
-
- Status sslResultStatus = result.getStatus();
- HandshakeStatus handshakeStatus = result.getHandshakeStatus();
-
- if(sslResultStatus == SSLEngineResult.Status.OK)
- {
- // continue
+ tail.put(_decodedInputBuffer);
+ _decodedInputBuffer.compact();
+ _underlyingInput.process();
+ capacity = _underlyingInput.capacity();
+ if (capacity == Transport.END_OF_STREAM) {
+ _tail_closed = true;
}
- else if(sslResultStatus == SSLEngineResult.Status.BUFFER_UNDERFLOW)
+ }
+
+ switch (status) {
+ case CLOSED:
+ _tail_closed = true;
+ break;
+ case BUFFER_OVERFLOW:
{
- // Not an error. The not-yet-decoded bytes remain in _inputBuffer and will hopefully be augmented by some more
- // in a subsequent invocation of this method, allowing decoding to be done.
+ ByteBuffer old = _decodedInputBuffer;
+ _decodedInputBuffer = newWriteableBuffer(old.capacity()*2);
+ old.flip();
+ _decodedInputBuffer.put(old);
}
- else
- {
- throw new IllegalStateException("Unexpected SSL Engine state " + sslResultStatus);
+ continue;
+ case BUFFER_UNDERFLOW:
+ if (_tail_closed) {
+ _head_closed = true;
}
+ // wait for more data
+ break;
+ case OK:
+ break;
+ }
- if(result.bytesProduced() > 0)
- {
- if(handshakeStatus != HandshakeStatus.NOT_HANDSHAKING)
- {
- _logger.warning("WARN unexpectedly produced bytes for the underlying input when handshaking");
+ switch (hstatus)
+ {
+ case NEED_WRAP:
+ // wait for write to kick in
+ break;
+ case NEED_TASK:
+ runDelegatedTasks(result);
+ continue;
+ case FINISHED:
+ updateCipherAndProtocolName(result);
+ case NOT_HANDSHAKING:
+ case NEED_UNWRAP:
+ if (_inputBuffer.position() > 0 && status == Status.OK) {
+ continue;
+ } else {
+ if (_inputBuffer.position() == 0 &&
+ hstatus == HandshakeStatus.NEED_UNWRAP &&
+ _tail_closed) {
+ _head_closed = true;
}
-
- pourAll(_decodedInputBuffer, _underlyingInput);
+ break;
}
-
- keepLooping = handshakeStatus == HandshakeStatus.NOT_HANDSHAKING
- && sslResultStatus != Status.BUFFER_UNDERFLOW;
}
- while(keepLooping);
- }
- catch(SSLException e)
- {
- throw new TransportException("Problem during input. useClientMode: "
- + _sslEngine.getUseClientMode(),
- e);
+
+ break;
}
}
@@ -170,57 +190,66 @@ public class SimpleSslTransportWrapper i
* {@link #_outputBuffer} is assumed to be writeable on entry and is guaranteed to
* be still writeable on exit.
*/
- private void wrapOutput()
+ private void wrapOutput() throws SSLException
{
- boolean keepWrapping = hasSpaceForSslPacket(_outputBuffer);
- while(keepWrapping)
- {
+ while (true) {
int pending = _underlyingOutput.pending();
- if (pending == Transport.END_OF_STREAM)
- {
+ if (pending < 0) {
_head_closed = true;
- keepWrapping = false;
}
ByteBuffer clearOutputBuffer = _underlyingOutput.head();
- try
- {
- if(clearOutputBuffer.hasRemaining())
- {
- SSLEngineResult result = _sslEngine.wrap(clearOutputBuffer, _outputBuffer);
+ SSLEngineResult result = _sslEngine.wrap(clearOutputBuffer, _outputBuffer);
+ logEngineClientModeAndResult(result, "output");
- logEngineClientModeAndResult(result, "output");
-
- Status sslResultStatus = result.getStatus();
- if(sslResultStatus == SSLEngineResult.Status.BUFFER_OVERFLOW)
- {
- throw new IllegalStateException("Insufficient space to perform wrap into encoded output buffer. Buffer: " + _outputBuffer);
- }
- else if(sslResultStatus != SSLEngineResult.Status.OK)
- {
- throw new RuntimeException("Unexpected SSLEngineResult status " + sslResultStatus);
- }
-
- runDelegatedTasks(result);
- updateCipherAndProtocolName(result);
+ int written = result.bytesConsumed();
+ _underlyingOutput.pop(written);
+ pending = _underlyingOutput.pending();
+
+ Status status = result.getStatus();
+ switch (status) {
+ case CLOSED:
+ _head_closed = true;
+ break;
+ case OK:
+ break;
+ case BUFFER_OVERFLOW:
+ ByteBuffer old = _outputBuffer;
+ _outputBuffer = newWriteableBuffer(_outputBuffer.capacity()*2);
+ _head = _outputBuffer.asReadOnlyBuffer();
+ old.flip();
+ _outputBuffer.put(old);
+ continue;
+ case BUFFER_UNDERFLOW:
+ throw new IllegalStateException("app buffer underflow");
+ }
- keepWrapping = result.getHandshakeStatus() == HandshakeStatus.NOT_HANDSHAKING
- && hasSpaceForSslPacket(_outputBuffer);
+ HandshakeStatus hstatus = result.getHandshakeStatus();
+ switch (hstatus) {
+ case NEED_UNWRAP:
+ // wait for input data
+ if (_inputBuffer.position() == 0 && _tail_closed) {
+ _head_closed = true;
}
- else
- {
- // no output to wrap
- keepWrapping = false;
+ break;
+ case NEED_WRAP:
+ // keep looping
+ continue;
+ case NEED_TASK:
+ runDelegatedTasks(result);
+ continue;
+ case FINISHED:
+ updateCipherAndProtocolName(result);
+ // intentionally fall through
+ case NOT_HANDSHAKING:
+ if (pending > 0 && status == Status.OK) {
+ continue;
+ } else {
+ break;
}
}
- catch(SSLException e)
- {
- throw new TransportException("Problem during output. useClientMode: " + _sslEngine.getUseClientMode(), e);
- }
- finally
- {
- _underlyingOutput.pop(clearOutputBuffer.position());
- }
+
+ break;
}
}
@@ -275,7 +304,7 @@ public class SimpleSslTransportWrapper i
if(_logger.isLoggable(Level.FINEST))
{
_logger.log(Level.FINEST, "useClientMode = " + _sslEngine.getUseClientMode() + " direction = " + direction
- + " " + resultToString(result));
+ + " " + resultToString(result));
}
}
@@ -311,7 +340,11 @@ public class SimpleSslTransportWrapper i
try
{
- unwrapInput();
+ try {
+ unwrapInput();
+ } catch (SSLException e) {
+ throw new TransportException(e);
+ }
}
catch (TransportException e)
{
@@ -338,7 +371,12 @@ public class SimpleSslTransportWrapper i
@Override
public int pending()
{
- wrapOutput();
+ try {
+ wrapOutput();
+ } catch (SSLException e) {
+ throw new TransportException(e);
+ }
+
_head.limit(_outputBuffer.position());
if (_head_closed && _outputBuffer.position() == 0)
Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslEngineFacadeFactory.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslEngineFacadeFactory.java?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslEngineFacadeFactory.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslEngineFacadeFactory.java Mon Feb 10 19:16:38 2014
@@ -56,6 +56,7 @@ import javax.net.ssl.X509TrustManager;
import org.apache.qpid.proton.engine.SslDomain;
import org.apache.qpid.proton.engine.SslPeerDetails;
import org.apache.qpid.proton.ProtonUnsupportedOperationException;
+import org.apache.qpid.proton.engine.TransportException;
public class SslEngineFacadeFactory
{
@@ -93,7 +94,7 @@ public class SslEngineFacadeFactory
try {
return klass.getConstructor(params);
} catch (NoSuchMethodException e) {
- throw new RuntimeException(e);
+ throw new TransportException(e);
}
}
@@ -105,7 +106,7 @@ public class SslEngineFacadeFactory
try {
return klass.getMethod(name, params);
} catch (NoSuchMethodException e) {
- throw new RuntimeException(e);
+ throw new TransportException(e);
}
}
@@ -260,19 +261,19 @@ public class SslEngineFacadeFactory
}
catch (NoSuchAlgorithmException e)
{
- throw new IllegalStateException("Unexpected exception creating SSLContext", e);
+ throw new TransportException("Unexpected exception creating SSLContext", e);
}
catch (KeyStoreException e)
{
- throw new IllegalStateException("Unexpected exception creating SSLContext", e);
+ throw new TransportException("Unexpected exception creating SSLContext", e);
}
catch (UnrecoverableKeyException e)
{
- throw new IllegalStateException("Unexpected exception creating SSLContext", e);
+ throw new TransportException("Unexpected exception creating SSLContext", e);
}
catch (KeyManagementException e)
{
- throw new IllegalStateException("Unexpected exception creating SSLContext", e);
+ throw new TransportException("Unexpected exception creating SSLContext", e);
}
}
return _sslContext;
@@ -318,7 +319,7 @@ public class SslEngineFacadeFactory
else
{
// Should not happen - readPemObject will have already verified key type
- throw new IllegalStateException("Unexpected key type " + keyOrKeyPair);
+ throw new TransportException("Unexpected key type " + keyOrKeyPair);
}
keystore.setKeyEntry(clientPrivateKeyAlias, clientPrivateKey,
@@ -328,19 +329,19 @@ public class SslEngineFacadeFactory
}
catch (KeyStoreException e)
{
- throw new IllegalStateException("Unexpected exception creating keystore", e);
+ throw new TransportException("Unexpected exception creating keystore", e);
}
catch (NoSuchAlgorithmException e)
{
- throw new IllegalStateException("Unexpected exception creating keystore", e);
+ throw new TransportException("Unexpected exception creating keystore", e);
}
catch (CertificateException e)
{
- throw new IllegalStateException("Unexpected exception creating keystore", e);
+ throw new TransportException("Unexpected exception creating keystore", e);
}
catch (IOException e)
{
- throw new IllegalStateException("Unexpected exception creating keystore", e);
+ throw new TransportException("Unexpected exception creating keystore", e);
}
}
@@ -370,9 +371,10 @@ public class SslEngineFacadeFactory
if (addedAnonymousCipherSuites == 0)
{
- throw new IllegalStateException("None of " + anonymousCipherSuites
- + " anonymous cipher suites are within the supported list "
- + supportedSuites);
+ throw new TransportException
+ ("None of " + anonymousCipherSuites
+ + " anonymous cipher suites are within the supported list "
+ + supportedSuites);
}
if(_logger.isLoggable(Level.FINE))
@@ -423,29 +425,31 @@ public class SslEngineFacadeFactory
Object pemObject = readObjectMeth.invoke(pemReader);
if (!checkPemObjectIsOfAllowedTypes(pemObject, expectedInterfaces))
{
- throw new IllegalStateException("File " + pemFile + " does not provide a object of the required type."
- + " Read an object of class " + pemObject.getClass().getName()
- + " whilst expecting an implementation of one of the following : " + Arrays.asList(expectedInterfaces));
+ throw new TransportException
+ ("File " + pemFile + " does not provide a object of the required type."
+ + " Read an object of class " + pemObject.getClass().getName()
+ + " whilst expecting an implementation of one of the following : "
+ + Arrays.asList(expectedInterfaces));
}
return pemObject;
}
catch(InstantiationException e)
{
_logger.log(Level.SEVERE, "Unable to read PEM object. Perhaps you need the unlimited strength libraries in <java-home>/jre/lib/security/ ?", e);
- throw new IllegalStateException("Unable to read PEM object from file " + pemFile, e);
+ throw new TransportException("Unable to read PEM object from file " + pemFile, e);
}
catch(InvocationTargetException e)
{
_logger.log(Level.SEVERE, "Unable to read PEM object. Perhaps you need the unlimited strength libraries in <java-home>/jre/lib/security/ ?", e);
- throw new IllegalStateException("Unable to read PEM object from file " + pemFile, e);
+ throw new TransportException("Unable to read PEM object from file " + pemFile, e);
}
catch (IllegalAccessException e)
{
- throw new RuntimeException(e);
+ throw new TransportException(e);
}
catch (IOException e)
{
- throw new RuntimeException("Unable to read PEM object from file " + pemFile, e);
+ throw new TransportException("Unable to read PEM object from file " + pemFile, e);
}
finally
{
@@ -508,13 +512,13 @@ public class SslEngineFacadeFactory
return finder;
} catch (NoSuchMethodException e) {
- throw new RuntimeException(e);
+ throw new TransportException(e);
} catch (InstantiationException e) {
- throw new RuntimeException(e);
+ throw new TransportException(e);
} catch (IllegalAccessException e) {
- throw new RuntimeException(e);
+ throw new TransportException(e);
} catch (InvocationTargetException e) {
- throw new RuntimeException(e);
+ throw new TransportException(e);
}
}
Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslHandshakeSniffingTransportWrapper.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslHandshakeSniffingTransportWrapper.java?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslHandshakeSniffingTransportWrapper.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslHandshakeSniffingTransportWrapper.java Mon Feb 10 19:16:38 2014
@@ -33,14 +33,14 @@ public class SslHandshakeSniffingTranspo
private final TransportWrapper _plainTransportWrapper;
private boolean _tail_closed = false;
- private boolean _head_closed = true;
+ private boolean _head_closed = false;
private TransportWrapper _selectedTransportWrapper;
private final ByteBuffer _determinationBuffer = ByteBuffer.allocate(MINIMUM_LENGTH_FOR_DETERMINATION);
- SslHandshakeSniffingTransportWrapper(
- SslTransportWrapper secureTransportWrapper,
- TransportWrapper plainTransportWrapper)
+ SslHandshakeSniffingTransportWrapper
+ (SslTransportWrapper secureTransportWrapper,
+ TransportWrapper plainTransportWrapper)
{
_secureTransportWrapper = secureTransportWrapper;
_plainTransportWrapper = plainTransportWrapper;
@@ -51,12 +51,12 @@ public class SslHandshakeSniffingTranspo
{
if (isDeterminationMade())
{
- return _secureTransportWrapper.capacity();
+ return _selectedTransportWrapper.capacity();
}
else
{
if (_tail_closed) { return Transport.END_OF_STREAM; }
- return _determinationBuffer.capacity();
+ return _determinationBuffer.remaining();
}
}
@@ -80,7 +80,7 @@ public class SslHandshakeSniffingTranspo
{
_selectedTransportWrapper.process();
}
- else
+ else if (_determinationBuffer.remaining() == 0)
{
_determinationBuffer.flip();
byte[] bytesInput = new byte[_determinationBuffer.remaining()];
@@ -91,6 +91,8 @@ public class SslHandshakeSniffingTranspo
// TODO what if the selected transport has insufficient capacity?? Maybe use pour, and then try to finish pouring next time round.
_selectedTransportWrapper.tail().put(_determinationBuffer);
_selectedTransportWrapper.process();
+ } else if (_tail_closed) {
+ throw new TransportException("connection aborted");
}
}
@@ -111,32 +113,43 @@ public class SslHandshakeSniffingTranspo
public int pending()
{
if (_head_closed) { return Transport.END_OF_STREAM; }
- makePlainUnlessDeterminationAlreadyMade();
- return _selectedTransportWrapper.pending();
+ if (isDeterminationMade()) {
+ return _selectedTransportWrapper.pending();
+ } else {
+ return 0;
+ }
+
}
@Override
public ByteBuffer head()
{
- makePlainUnlessDeterminationAlreadyMade();
-
- return _selectedTransportWrapper.head();
+ if (isDeterminationMade()) {
+ return _selectedTransportWrapper.head();
+ } else {
+ return null;
+ }
}
@Override
public void pop(int bytes)
{
- makePlainUnlessDeterminationAlreadyMade();
-
- _selectedTransportWrapper.pop(bytes);
+ if (isDeterminationMade()) {
+ _selectedTransportWrapper.pop(bytes);
+ } else if (bytes > 0) {
+ throw new IllegalStateException("no bytes have been read");
+ }
}
@Override
public void close_head()
{
- makePlainUnlessDeterminationAlreadyMade();
- _selectedTransportWrapper.close_head();
+ if (isDeterminationMade()) {
+ _selectedTransportWrapper.close_head();
+ } else {
+ _head_closed = true;
+ }
}
@Override
@@ -235,11 +248,4 @@ public class SslHandshakeSniffingTranspo
}
}
- private void makePlainUnlessDeterminationAlreadyMade()
- {
- if (!isDeterminationMade())
- {
- _selectedTransportWrapper = _plainTransportWrapper;
- }
- }
}
Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslImpl.java?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslImpl.java Mon Feb 10 19:16:38 2014
@@ -26,6 +26,7 @@ import org.apache.qpid.proton.ProtonUnsu
import org.apache.qpid.proton.engine.Ssl;
import org.apache.qpid.proton.engine.SslDomain;
import org.apache.qpid.proton.engine.SslPeerDetails;
+import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.TransportException;
import org.apache.qpid.proton.engine.impl.TransportInput;
import org.apache.qpid.proton.engine.impl.TransportOutput;
@@ -40,6 +41,7 @@ public class SslImpl implements Ssl
private final ProtonSslEngineProvider _protonSslEngineProvider;
private final SslPeerDetails _peerDetails;
+ private TransportException _initException;
/**
* @param sslDomain must implement {@link ProtonSslEngineProvider}. This is not possible
@@ -103,14 +105,22 @@ public class SslImpl implements Ssl
public int capacity()
{
initTransportWrapperOnFirstIO();
- return _transportWrapper.capacity();
+ if (_initException == null) {
+ return _transportWrapper.capacity();
+ } else {
+ return Transport.END_OF_STREAM;
+ }
}
@Override
public ByteBuffer tail()
{
initTransportWrapperOnFirstIO();
- return _transportWrapper.tail();
+ if (_initException == null) {
+ return _transportWrapper.tail();
+ } else {
+ return null;
+ }
}
@@ -118,42 +128,60 @@ public class SslImpl implements Ssl
public void process() throws TransportException
{
initTransportWrapperOnFirstIO();
- _transportWrapper.process();
+ if (_initException == null) {
+ _transportWrapper.process();
+ } else {
+ throw new TransportException(_initException);
+ }
}
@Override
public void close_tail()
{
initTransportWrapperOnFirstIO();
- _transportWrapper.process();
+ if (_initException == null) {
+ _transportWrapper.close_tail();
+ }
}
@Override
public int pending()
{
initTransportWrapperOnFirstIO();
- return _transportWrapper.pending();
+ if (_initException == null) {
+ return _transportWrapper.pending();
+ } else {
+ throw new TransportException(_initException);
+ }
}
@Override
public ByteBuffer head()
{
initTransportWrapperOnFirstIO();
- return _transportWrapper.head();
+ if (_initException == null) {
+ return _transportWrapper.head();
+ } else {
+ return null;
+ }
}
@Override
public void pop(int bytes)
{
initTransportWrapperOnFirstIO();
- _transportWrapper.pop(bytes);
+ if (_initException == null) {
+ _transportWrapper.pop(bytes);
+ }
}
@Override
public void close_head()
{
initTransportWrapperOnFirstIO();
- _transportWrapper.close_head();
+ if (_initException == null) {
+ _transportWrapper.close_head();
+ }
}
@Override
@@ -184,22 +212,27 @@ public class SslImpl implements Ssl
private void initTransportWrapperOnFirstIO()
{
- if (_transportWrapper == null)
- {
- SslTransportWrapper sslTransportWrapper = new SimpleSslTransportWrapper(
- _protonSslEngineProvider.createSslEngine(_peerDetails),
- _inputProcessor,
- _outputProcessor);
-
- if (_domain.allowUnsecuredClient())
- {
- TransportWrapper plainTransportWrapper = new PlainTransportWrapper(_outputProcessor, _inputProcessor);
- _transportWrapper = new SslHandshakeSniffingTransportWrapper(sslTransportWrapper, plainTransportWrapper);
- }
- else
+ try {
+ if (_initException == null && _transportWrapper == null)
{
- _transportWrapper = sslTransportWrapper;
+ SslTransportWrapper sslTransportWrapper = new SimpleSslTransportWrapper
+ (_protonSslEngineProvider.createSslEngine(_peerDetails),
+ _inputProcessor, _outputProcessor);
+
+ if (_domain.allowUnsecuredClient() && _domain.getMode() == SslDomain.Mode.SERVER)
+ {
+ TransportWrapper plainTransportWrapper = new PlainTransportWrapper
+ (_outputProcessor, _inputProcessor);
+ _transportWrapper = new SslHandshakeSniffingTransportWrapper
+ (sslTransportWrapper, plainTransportWrapper);
+ }
+ else
+ {
+ _transportWrapper = sslTransportWrapper;
+ }
}
+ } catch (TransportException e) {
+ _initException = e;
}
}
}
Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/messenger/Messenger.java?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/messenger/Messenger.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/messenger/Messenger.java Mon Feb 10 19:16:38 2014
@@ -218,8 +218,48 @@ public interface Messenger
*/
Status getStatus(Tracker tracker);
- public void route(String pattern, String address);
+ void route(String pattern, String address);
- public void rewrite(String pattern, String address);
+ void rewrite(String pattern, String address);
+
+ /**
+ * Set the path to the certificate file.
+ */
+ void setCertificate(String certificate);
+
+ /**
+ * Get the path to the certificate file.
+ */
+ String getCertificate();
+
+ /**
+ * Set the path to private key file.
+ */
+ void setPrivateKey(String privateKey);
+
+ /**
+ * Get the path to the private key file.
+ */
+ String getPrivateKey();
+
+ /**
+ * Set the password for private key file.
+ */
+ void setPassword(String password);
+
+ /**
+ * Get the password for the priate key file.
+ */
+ String getPassword();
+
+ /**
+ * Set the path to the trusted certificate database.
+ */
+ void setTrustedCertificates(String trusted);
+
+ /**
+ * Get the path to the trusted certificate database.
+ */
+ String getTrustedCertificates();
}
Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Address.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Address.java?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Address.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Address.java Mon Feb 10 19:16:38 2014
@@ -132,6 +132,21 @@ class Address
return _port;
}
+ public String getImpliedPort()
+ {
+ if (_port == null) {
+ return getDefaultPort();
+ } else {
+ return getPort();
+ }
+ }
+
+ public String getDefaultPort()
+ {
+ if ("amqps".equals(_scheme)) return "5671";
+ else return "5672";
+ }
+
public String getName()
{
return _name;
Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java Mon Feb 10 19:16:38 2014
@@ -45,6 +45,9 @@ import org.apache.qpid.proton.engine.Rec
import org.apache.qpid.proton.engine.Sasl;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
+import org.apache.qpid.proton.engine.SslDomain;
+import org.apache.qpid.proton.engine.Ssl;
+import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.message.MessageFactory;
import org.apache.qpid.proton.messenger.Messenger;
@@ -98,6 +101,11 @@ public class MessengerImpl implements Me
private Transform _routes = new Transform();
private Transform _rewrites = new Transform();
+ private String _certificate;
+ private String _privateKey;
+ private String _password;
+ private String _trustedDb;
+
/**
* @deprecated This constructor's visibility will be reduced to the default scope in a future release.
@@ -137,6 +145,46 @@ public class MessengerImpl implements Me
_blocking = b;
}
+ public void setCertificate(String certificate)
+ {
+ _certificate = certificate;
+ }
+
+ public String getCertificate()
+ {
+ return _certificate;
+ }
+
+ public void setPrivateKey(String privateKey)
+ {
+ _privateKey = privateKey;
+ }
+
+ public String getPrivateKey()
+ {
+ return _privateKey;
+ }
+
+ public void setPassword(String password)
+ {
+ _password = password;
+ }
+
+ public String getPassword()
+ {
+ return _password;
+ }
+
+ public void setTrustedCertificates(String trusted)
+ {
+ _trustedDb = trusted;
+ }
+
+ public String getTrustedCertificates()
+ {
+ return _trustedDb;
+ }
+
public void start() throws IOException
{
_driver = Proton.driver();
@@ -290,9 +338,7 @@ public class MessengerImpl implements Me
restoreMessage(m);
}
- String ports = address.getPort() == null ? defaultPort(address.getScheme()) : address.getPort();
- int port = Integer.valueOf(ports);
- Sender sender = getLink(address.getHost(), port, new SenderFinder(address.getName()));
+ Sender sender = getLink(address, new SenderFinder(address.getName()));
pumpOut(m.getAddress(), sender);
}
@@ -308,17 +354,18 @@ public class MessengerImpl implements Me
}
}
- Iterator<Delivery> dIter = link.unsettled();
- while (dIter != null && dIter.hasNext())
+ Delivery delivery = link.head();
+ while (delivery != null)
{
- Delivery delivery = (Delivery) dIter.next();
StoreEntry entry = (StoreEntry) delivery.getContext();
if (entry != null)
{
entry.setDelivery(null);
- if (delivery.isBuffered())
+ if (delivery.isBuffered()) {
entry.setStatus(Status.ABORTED);
+ }
}
+ delivery = delivery.next();
}
linkRemoved(link);
}
@@ -494,15 +541,14 @@ public class MessengerImpl implements Me
String hostName = address.getHost();
if (hostName == null) throw new MessengerException("Invalid address (hostname cannot be null): " + routed);
- String ports = address.getPort() == null ? defaultPort(address.getScheme()) : address.getPort();
- int port = Integer.valueOf(ports);
+ int port = Integer.valueOf(address.getImpliedPort());
if (address.isPassive())
{
if(_logger.isLoggable(Level.FINE))
{
_logger.fine(this + " about to subscribe to source " + source + " using address " + hostName + ":" + port);
}
- ListenerContext ctx = new ListenerContext( address.getScheme(), hostName, ports );
+ ListenerContext ctx = new ListenerContext(address);
_driver.createListener(hostName, port, ctx);
}
else
@@ -511,7 +557,7 @@ public class MessengerImpl implements Me
{
_logger.fine(this + " about to subscribe to source " + source);
}
- getLink(hostName, port, new ReceiverFinder(address.getName()));
+ getLink(address, new ReceiverFinder(address.getName()));
}
}
@@ -666,9 +712,10 @@ public class MessengerImpl implements Me
Connection connection = Proton.connection();
connection.setContainer(_name);
ListenerContext ctx = (ListenerContext) l.getContext();
- connection.setContext(new ConnectionContext(ctx.getService(), c));
+ connection.setContext(new ConnectionContext(ctx.getAddress(), c));
c.setConnection(connection);
- //TODO: SSL and full SASL
+ Transport transport = c.getTransport();
+ //TODO: full SASL
Sasl sasl = c.sasl();
if (sasl != null)
{
@@ -676,6 +723,7 @@ public class MessengerImpl implements Me
sasl.setMechanisms(new String[]{"ANONYMOUS"});
sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
}
+ transport.ssl(ctx.getDomain());
connection.open();
}
// process connectors, reclaiming credit on closed connectors
@@ -854,13 +902,13 @@ public class MessengerImpl implements Me
return done;
}
- private Connection lookup(String host, String service)
+ private Connection lookup(Address address)
{
for (Connector<?> c : _driver.connectors())
{
Connection connection = c.getConnection();
ConnectionContext ctx = (ConnectionContext) connection.getContext();
- if (host.equals(connection.getRemoteContainer()) || service.equals(ctx.getService()))
+ if (ctx.matches(address))
{
return connection;
}
@@ -1191,18 +1239,19 @@ public class MessengerImpl implements Me
}
}
- private <C extends Link> C getLink(String host, int port, LinkFinder<C> finder)
+ private <C extends Link> C getLink(Address address, LinkFinder<C> finder)
{
- String service = host + ":" + port;
- Connection connection = lookup(host, service);
+ Connection connection = lookup(address);
if (connection == null)
{
+ String host = address.getHost();
+ int port = Integer.valueOf(address.getImpliedPort());
Connector<?> connector = _driver.createConnector(host, port, null);
_logger.log(Level.FINE, "Connecting to " + host + ":" + port);
connection = Proton.connection();
connection.setContainer(_name);
connection.setHostname(host);
- connection.setContext(new ConnectionContext(service, connector));
+ connection.setContext(new ConnectionContext(address, connector));
connector.setConnection(connection);
Sasl sasl = connector.sasl();
if (sasl != null)
@@ -1210,6 +1259,18 @@ public class MessengerImpl implements Me
sasl.client();
sasl.setMechanisms(new String[]{"ANONYMOUS"});
}
+ if ("amqps".equalsIgnoreCase(address.getScheme())) {
+ Transport transport = connector.getTransport();
+ SslDomain domain = makeDomain(address, SslDomain.Mode.CLIENT);
+ if (_trustedDb != null) {
+ domain.setPeerAuthentication(SslDomain.VerifyMode.VERIFY_PEER);
+ //domain.setPeerAuthentication(SslDomain.VerifyMode.VERIFY_PEER_NAME);
+ } else {
+ domain.setPeerAuthentication(SslDomain.VerifyMode.ANONYMOUS_PEER);
+ }
+ Ssl ssl = transport.ssl(domain);
+ //ssl.setPeerHostname(host);
+ }
connection.open();
}
@@ -1363,12 +1424,6 @@ public class MessengerImpl implements Me
else return path.equals(source.getAddress());
}
- private static String defaultPort(String scheme)
- {
- if ("amqps".equals(scheme)) return "5671";
- else return "5672";
- }
-
@Override
public String toString()
{
@@ -1422,18 +1477,27 @@ public class MessengerImpl implements Me
private class ConnectionContext
{
- private String _service;
+ private Address _address;
private Connector _connector;
- public ConnectionContext(String service, Connector connector)
+ public ConnectionContext(Address address, Connector connector)
{
- _service = service;
+ _address = address;
_connector = connector;
}
- public String getService()
+ public Address getAddress()
+ {
+ return _address;
+ }
+
+ public boolean matches(Address address)
{
- return _service;
+ String host = address.getHost();
+ String port = address.getImpliedPort();
+ Connection conn = _connector.getConnection();
+ return host.equals(conn.getRemoteContainer()) ||
+ (_address.getHost().equals(host) && _address.getImpliedPort().equals(port));
}
public Connector getConnector()
@@ -1442,32 +1506,47 @@ public class MessengerImpl implements Me
}
}
- private class ListenerContext
+ private SslDomain makeDomain(Address address, SslDomain.Mode mode)
{
- private String _host;
- private String _port;
- private String _service; // for now. move to subscription later
+ SslDomain domain = Proton.sslDomain();
+ domain.init(mode);
+ if (_certificate != null) {
+ domain.setCredentials(_certificate, _privateKey, _password);
+ }
+ if (_trustedDb != null) {
+ domain.setTrustedCaDb(_trustedDb);
+ }
- public ListenerContext(String service, String host, String port)
- {
- _service = service;
- _host = host;
- _port = port;
+ if ("amqps".equalsIgnoreCase(address.getScheme())) {
+ domain.allowUnsecuredClient(false);
+ } else {
+ domain.allowUnsecuredClient(true);
}
- public String getService()
+ return domain;
+ }
+
+
+ private class ListenerContext
+ {
+ private Address _address;
+ private SslDomain _domain;
+
+ public ListenerContext(Address address)
{
- return _service;
+ _address = address;
+ _domain = makeDomain(address, SslDomain.Mode.SERVER);
}
- public String getHost()
+ public SslDomain getDomain()
{
- return _host;
+ return _domain;
}
- public String getPort()
+ public Address getAddress()
{
- return _port;
+ return _address;
}
+
}
}
Modified: qpid/proton/trunk/proton-j/src/main/resources/cengine.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/resources/cengine.py?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/resources/cengine.py (original)
+++ qpid/proton/trunk/proton-j/src/main/resources/cengine.py Mon Feb 10 19:16:38 2014
@@ -879,10 +879,11 @@ def pn_transport_pending(trans):
return trans.error.set(PN_ERR, str(e))
def pn_transport_peek(trans, size):
- bb = trans.impl.head()
- size = min(bb.remaining(), size)
+ size = min(trans.impl.pending(), size)
ba = zeros(size, 'b')
- bb.get(ba)
+ if size:
+ bb = trans.impl.head()
+ bb.get(ba)
return 0, ba.tostring()
def pn_transport_pop(trans, size):
Modified: qpid/proton/trunk/proton-j/src/main/resources/cmessenger.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/resources/cmessenger.py?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/resources/cmessenger.py (original)
+++ qpid/proton/trunk/proton-j/src/main/resources/cmessenger.py Mon Feb 10 19:16:38 2014
@@ -41,7 +41,10 @@ class pn_messenger_wrapper:
self.error = pn_error(0, None)
def pn_messenger(name):
- return pn_messenger_wrapper(Proton.messenger(name));
+ if name is None:
+ return pn_messenger_wrapper(Proton.messenger())
+ else:
+ return pn_messenger_wrapper(Proton.messenger(name))
def pn_messenger_error(m):
return m.error
@@ -55,7 +58,20 @@ def pn_messenger_set_blocking(m, b):
return 0
def pn_messenger_set_certificate(m, c):
- raise Skipped()
+ m.impl.setCertificate(c)
+ return 0
+
+def pn_messenger_set_private_key(m, p):
+ m.impl.setPrivateKey(p)
+ return 0
+
+def pn_messenger_set_password(m, p):
+ m.impl.setPassword(p)
+ return 0
+
+def pn_messenger_set_trusted_certificates(m, t):
+ m.impl.setTrustedCertificates(t)
+ return 0
def pn_messenger_set_incoming_window(m, w):
m.impl.setIncomingWindow(w)
@@ -103,6 +119,8 @@ def pn_messenger_interrupt(m):
def pn_messenger_buffered(m, t):
raise Skipped()
+from org.apache.qpid.proton.engine import TransportException
+
def pn_messenger_stop(m):
m.impl.stop()
return 0
Modified: qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/CannedTransportOutput.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/CannedTransportOutput.java?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/CannedTransportOutput.java (original)
+++ qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/CannedTransportOutput.java Mon Feb 10 19:16:38 2014
@@ -29,6 +29,7 @@ public class CannedTransportOutput imple
private ByteBuffer _cannedOutput;
private ByteBuffer _head;
+ private int _popped;
public CannedTransportOutput()
{
@@ -43,6 +44,7 @@ public class CannedTransportOutput imple
{
_cannedOutput = ByteBuffer.wrap(output.getBytes());
_head = _cannedOutput.asReadOnlyBuffer();
+ _popped = 0;
}
@Override
@@ -60,8 +62,8 @@ public class CannedTransportOutput imple
@Override
public void pop(int bytes)
{
- _cannedOutput.position(bytes);
- _head.position(bytes);
+ _popped += bytes;
+ _head.position(_popped);
}
@Override
Modified: qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapperTest.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapperTest.java?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapperTest.java (original)
+++ qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapperTest.java Mon Feb 10 19:16:38 2014
@@ -238,17 +238,18 @@ public class SimpleSslTransportWrapperTe
private String getAllBytesFromTransport()
{
StringBuilder readBytes = new StringBuilder();
- boolean continueLooping;
- do
+ while (true)
{
- ByteBuffer buffer = _sslWrapper.head();
- continueLooping = buffer.hasRemaining();
-
- readBytes.append(pourBufferToString(buffer));
-
- _sslWrapper.pop(buffer.position());
+ int pending = _sslWrapper.pending();
+ if (pending > 0) {
+ ByteBuffer buffer = _sslWrapper.head();
+ readBytes.append(pourBufferToString(buffer));
+ _sslWrapper.pop(pending);
+ continue;
+ } else {
+ break;
+ }
}
- while(continueLooping);
return readBytes.toString();
}
Modified: qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SslHandshakeSniffingTransportWrapperTest.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SslHandshakeSniffingTransportWrapperTest.java?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SslHandshakeSniffingTransportWrapperTest.java (original)
+++ qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SslHandshakeSniffingTransportWrapperTest.java Mon Feb 10 19:16:38 2014
@@ -29,6 +29,7 @@ import static org.mockito.Mockito.when;
import java.nio.ByteBuffer;
+import org.apache.qpid.proton.engine.TransportException;
import org.apache.qpid.proton.engine.impl.TransportWrapper;
import org.junit.Rule;
import org.junit.Test;
@@ -107,8 +108,9 @@ public class SslHandshakeSniffingTranspo
try
{
_sniffingWrapper.tail().put(sourceBuffer);
+ _sniffingWrapper.close_tail();
- _expectedException.expect(IllegalArgumentException.class);
+ _expectedException.expect(TransportException.class);
_sniffingWrapper.process();
}
finally
Modified: qpid/proton/trunk/tests/python/proton_tests/ssl.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/ssl.py?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/ssl.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/ssl.py Mon Feb 10 19:16:38 2014
@@ -817,6 +817,10 @@ class MessengerSSLTests(common.Test):
key="server-private-key.pem",
password="server-password",
exception=None):
+ import sys
+ # java doesn't do validation in the same way (yet)
+ if exception and "java" in sys.platform:
+ raise Skipped()
self.server.certificate = _testpath(cert)
self.server.private_key = _testpath(key)
self.server.password = password
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org