You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2014/02/10 20:16:38 UTC

svn commit: r1566706 - in /qpid/proton/trunk: proton-c/src/transport/ proton-j/src/main/java/org/apache/qpid/proton/amqp/ proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ proton-j/src/main/java/org/apache/qpid/proton/engine/ proton-j/src/main...

Author: rhs
Date: Mon Feb 10 19:16:38 2014
New Revision: 1566706

URL: http://svn.apache.org/r1566706
Log:
PROTON-517: fixed ssl sniffing and ssl transport implementation; added certificate API to java Messenger

Modified:
    qpid/proton/trunk/proton-c/src/transport/transport.c
    qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/amqp/Binary.java
    qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
    qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Delivery.java
    qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java
    qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
    qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java
    qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
    qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
    qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java
    qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java
    qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
    qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapper.java
    qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslEngineFacadeFactory.java
    qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslHandshakeSniffingTransportWrapper.java
    qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslImpl.java
    qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
    qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Address.java
    qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
    qpid/proton/trunk/proton-j/src/main/resources/cengine.py
    qpid/proton/trunk/proton-j/src/main/resources/cmessenger.py
    qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/CannedTransportOutput.java
    qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapperTest.java
    qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SslHandshakeSniffingTransportWrapperTest.java
    qpid/proton/trunk/tests/python/proton_tests/ssl.py

Modified: qpid/proton/trunk/proton-c/src/transport/transport.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/transport/transport.c?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/transport/transport.c (original)
+++ qpid/proton/trunk/proton-c/src/transport/transport.c Mon Feb 10 19:16:38 2014
@@ -1083,8 +1083,9 @@ int pn_process_conn_setup(pn_transport_t
     if (!(endpoint->state & PN_LOCAL_UNINIT) && !transport->open_sent)
     {
       pn_connection_t *connection = (pn_connection_t *) endpoint;
+      const char *cid = pn_string_get(connection->container);
       int err = pn_post_frame(transport->disp, 0, "DL[SS?I?H?InnCCC]", OPEN,
-                              pn_string_get(connection->container),
+                              cid ? cid : "",
                               pn_string_get(connection->hostname),
                               // if not zero, advertise our max frame size and idle timeout
                               (bool)transport->local_max_frame, transport->local_max_frame,

Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/amqp/Binary.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/amqp/Binary.java?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/amqp/Binary.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/amqp/Binary.java Mon Feb 10 19:16:38 2014
@@ -165,7 +165,7 @@ public final class Binary
     {
         if( buffer == null )
             return null;
-        if( buffer.isDirect() ) 
+        if( buffer.isDirect() || buffer.isReadOnly() )
         {
             byte data[] = new byte [buffer.remaining()];
             ByteBuffer dup = buffer.duplicate();
@@ -177,4 +177,5 @@ public final class Binary
             return new Binary(buffer.array(), buffer.arrayOffset()+buffer.position(), buffer.remaining());
         }
     }
+
 }

Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java Mon Feb 10 19:16:38 2014
@@ -31,6 +31,7 @@ import org.apache.qpid.proton.driver.Lis
 import org.apache.qpid.proton.engine.Connection;
 import org.apache.qpid.proton.engine.Sasl;
 import org.apache.qpid.proton.engine.Transport;
+import org.apache.qpid.proton.engine.TransportException;
 import org.apache.qpid.proton.engine.impl.TransportFactory;
 
 class ConnectorImpl<C> implements Connector<C>
@@ -125,12 +126,17 @@ class ConnectorImpl<C> implements Connec
         }
         else
         {
-            int bytesRead = _channel.read(_transport.tail());
+            ByteBuffer tail = _transport.tail();
+            int bytesRead = _channel.read(tail);
             if (bytesRead < 0) {
                 _transport.close_tail();
                 _inputDone = true;
             } else if (bytesRead > 0) {
-                _transport.process();
+                try {
+                    _transport.process();
+                } catch (TransportException e) {
+                    _logger.log(Level.SEVERE, this + " error processing input", e);
+                }
                 processed = true;
             }
         }
@@ -156,26 +162,35 @@ class ConnectorImpl<C> implements Connec
         int interest = _key.interestOps();
         boolean writeBlocked = false;
 
-        while (_transport.pending() >= 0 && !writeBlocked)
-        {
-            int wrote = _channel.write(_transport.head());
-            if (wrote > 0) {
-                processed = true;
-                _transport.pop(wrote);
-            } else {
-                writeBlocked = true;
+        try {
+            while (_transport.pending() > 0 && !writeBlocked)
+            {
+                ByteBuffer head = _transport.head();
+                int wrote = _channel.write(head);
+                if (wrote > 0) {
+                    processed = true;
+                    _transport.pop(wrote);
+                } else {
+                    writeBlocked = true;
+                }
             }
-        }
 
-        int pending = _transport.pending();
-        if (pending > 0) {
-            interest |= SelectionKey.OP_WRITE;
-        } else {
-            interest &= ~SelectionKey.OP_WRITE;
-            if (pending < 0) {
-                _outputDone = true;
+            int pending = _transport.pending();
+            if (pending > 0) {
+                interest |= SelectionKey.OP_WRITE;
+            } else {
+                interest &= ~SelectionKey.OP_WRITE;
+                if (pending < 0) {
+                    _outputDone = true;
+                }
             }
+        } catch (TransportException e) {
+            _logger.log(Level.SEVERE, this + " error", e);
+            interest &= ~SelectionKey.OP_WRITE;
+            _inputDone = true;
+            _outputDone = true;
         }
+
         _key.interestOps(interest);
 
         return processed;

Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Delivery.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Delivery.java?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Delivery.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Delivery.java Mon Feb 10 19:16:38 2014
@@ -78,6 +78,8 @@ public interface Delivery
      */
     public Delivery getWorkNext();
 
+    public Delivery next();
+
     public boolean isWritable();
 
     /**

Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java Mon Feb 10 19:16:38 2014
@@ -75,9 +75,9 @@ public interface Link extends Endpoint
     public Delivery delivery(byte[] tag, int offset, int length);
 
     /**
-     * @return the unsettled deliveries for this link
+     * Returns the head delivery on the link.
      */
-    public Iterator<Delivery> unsettled();
+    Delivery head();
 
     /**
      * Returns the current delivery

Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java Mon Feb 10 19:16:38 2014
@@ -157,6 +157,11 @@ public class DeliveryImpl implements Del
         return _linkNext;
     }
 
+    public DeliveryImpl next()
+    {
+        return getLinkNext();
+    }
+
     public void free()
     {
 

Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java Mon Feb 10 19:16:38 2014
@@ -177,6 +177,6 @@ public abstract class EndpointImpl imple
     @Override
     public String toString()
     {
-        return "EndpointImpl [_localState=" + _localState + ", _remoteState=" + _remoteState + ", _localError=" + _localError + ", _remoteError=" + _remoteError + "]";
+        return "EndpointImpl(" + System.identityHashCode(this) + ") [_localState=" + _localState + ", _remoteState=" + _remoteState + ", _localError=" + _localError + ", _remoteError=" + _remoteError + "]";
     }
 }

Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java Mon Feb 10 19:16:38 2014
@@ -381,4 +381,10 @@ public abstract class LinkImpl extends E
     {
         return _credit - _queued;
     }
+
+    public DeliveryImpl head()
+    {
+        return _head;
+    }
+
 }

Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java Mon Feb 10 19:16:38 2014
@@ -96,11 +96,6 @@ public class ReceiverImpl extends LinkIm
         return consumed;
     }
 
-    public Iterator<Delivery> unsettled()
-    {
-        return null;  //TODO.
-    }
-
     public void free()
     {
         getSession().freeReceiver(this);

Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java Mon Feb 10 19:16:38 2014
@@ -563,6 +563,9 @@ public class SaslImpl implements Sasl, S
         public void close_tail()
         {
             _tail_closed = true;
+            if (isInputInSaslMode()) {
+                _head_closed = true;
+            }
         }
 
         private void reallyProcessInput() throws TransportException

Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java Mon Feb 10 19:16:38 2014
@@ -63,12 +63,6 @@ public class SenderImpl  extends LinkImp
         //TODO.
     }
 
-    public Iterator<Delivery> unsettled()
-    {
-        return null;  //TODO.
-    }
-
-
     public void free()
     {
         getSession().freeSender(this);

Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java Mon Feb 10 19:16:38 2014
@@ -108,6 +108,7 @@ public class TransportImpl extends Endpo
     private boolean _init;
 
     private FrameHandler _frameHandler = this;
+    private boolean _head_closed = false;
 
     /**
      * @deprecated This constructor's visibility will be reduced to the default scope in a future release.
@@ -272,7 +273,7 @@ public class TransportImpl extends Endpo
 
         _frameWriter.readBytes(outputBuffer);
 
-        return _isCloseSent;
+        return _isCloseSent || _head_closed;
     }
 
     @Override
@@ -716,7 +717,8 @@ public class TransportImpl extends Endpo
         if(_connectionEndpoint != null && _connectionEndpoint.getLocalState() != EndpointState.UNINITIALIZED && !_isOpenSent)
         {
             Open open = new Open();
-            open.setContainerId(_connectionEndpoint.getLocalContainerId());
+            String cid = _connectionEndpoint.getLocalContainerId();
+            open.setContainerId(cid == null ? "" : cid);
             open.setHostname(_connectionEndpoint.getHostname());
             open.setDesiredCapabilities(_connectionEndpoint.getDesiredCapabilities());
             open.setOfferedCapabilities(_connectionEndpoint.getOfferedCapabilities());
@@ -1236,8 +1238,13 @@ public class TransportImpl extends Endpo
     @Override
     public void process() throws TransportException
     {
-        init();
-        _inputProcessor.process();
+        try {
+            init();
+            _inputProcessor.process();
+        } catch (TransportException e) {
+            _head_closed = true;
+            throw e;
+        }
     }
 
     @Override

Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapper.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapper.java?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapper.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapper.java Mon Feb 10 19:16:38 2014
@@ -53,20 +53,18 @@ public class SimpleSslTransportWrapper i
     private final TransportOutput _underlyingOutput;
 
     private boolean _tail_closed = false;
-    private final ByteBuffer _inputBuffer;
+    private ByteBuffer _inputBuffer;
 
-    private boolean _head_closed = true;
-    private final ByteBuffer _outputBuffer;
-    private final ByteBuffer _head;
+    private boolean _head_closed = false;
+    private ByteBuffer _outputBuffer;
+    private ByteBuffer _head;
 
     /**
      * A buffer for the decoded bytes that will be passed to _underlyingInput.
      * This extra layer of buffering is necessary in case the underlying input's buffer
      * is too small for SSLEngine to ever unwrap into.
      */
-    private final ByteBuffer _decodedInputBuffer;
-
-    private ByteBuffer _readOnlyOutputBufferView;
+    private ByteBuffer _decodedInputBuffer;
 
     /** could change during the lifetime of the ssl connection owing to renegotiation. */
     private String _cipherName;
@@ -91,7 +89,7 @@ public class SimpleSslTransportWrapper i
         _head = _outputBuffer.asReadOnlyBuffer();
         _head.limit(0);
 
-        _decodedInputBuffer = newReadableBuffer(effectiveAppBufferMax);
+        _decodedInputBuffer = newWriteableBuffer(effectiveAppBufferMax);
 
         if(_logger.isLoggable(Level.FINE))
         {
@@ -108,59 +106,81 @@ public class SimpleSslTransportWrapper i
      * - On exit, it is still readable and its "remaining" bytes are those that we were unable
      * to unwrap (e.g. if they don't form a whole packet).
      */
-    private void unwrapInput() throws TransportException
+    private void unwrapInput() throws SSLException
     {
-        try
-        {
-            boolean keepLooping = true;
-            do
-            {
-                _decodedInputBuffer.compact();
-                SSLEngineResult result = _sslEngine.unwrap(_inputBuffer, _decodedInputBuffer);
+        while (true) {
+            SSLEngineResult result = _sslEngine.unwrap(_inputBuffer, _decodedInputBuffer);
+            logEngineClientModeAndResult(result, "input");
+
+            int read = result.bytesProduced();
+            Status status = result.getStatus();
+            HandshakeStatus hstatus = result.getHandshakeStatus();
+
+            int capacity = _underlyingInput.capacity();
+            if (capacity == Transport.END_OF_STREAM) {
+                _tail_closed = true;
+                if (_decodedInputBuffer.position() > 0) {
+                    throw new TransportException("bytes left unconsumed");
+                }
+            } else {
+                ByteBuffer tail = _underlyingInput.tail();
                 _decodedInputBuffer.flip();
-
-                runDelegatedTasks(result);
-                updateCipherAndProtocolName(result);
-
-                logEngineClientModeAndResult(result, "input");
-
-                Status sslResultStatus = result.getStatus();
-                HandshakeStatus handshakeStatus = result.getHandshakeStatus();
-
-                if(sslResultStatus == SSLEngineResult.Status.OK)
-                {
-                    // continue
+                tail.put(_decodedInputBuffer);
+                _decodedInputBuffer.compact();
+                _underlyingInput.process();
+                capacity = _underlyingInput.capacity();
+                if (capacity == Transport.END_OF_STREAM) {
+                    _tail_closed = true;
                 }
-                else if(sslResultStatus == SSLEngineResult.Status.BUFFER_UNDERFLOW)
+            }
+
+            switch (status) {
+            case CLOSED:
+                _tail_closed = true;
+                break;
+            case BUFFER_OVERFLOW:
                 {
-                    // Not an error. The not-yet-decoded bytes remain in _inputBuffer and will hopefully be augmented by some more
-                    // in a subsequent invocation of this method, allowing decoding to be done.
+                    ByteBuffer old = _decodedInputBuffer;
+                    _decodedInputBuffer = newWriteableBuffer(old.capacity()*2);
+                    old.flip();
+                    _decodedInputBuffer.put(old);
                 }
-                else
-                {
-                    throw new IllegalStateException("Unexpected SSL Engine state " + sslResultStatus);
+                continue;
+            case BUFFER_UNDERFLOW:
+                if (_tail_closed) {
+                    _head_closed = true;
                 }
+                // wait for more data
+                break;
+            case OK:
+                break;
+            }
 
-                if(result.bytesProduced() > 0)
-                {
-                    if(handshakeStatus != HandshakeStatus.NOT_HANDSHAKING)
-                    {
-                        _logger.warning("WARN unexpectedly produced bytes for the underlying input when handshaking");
+            switch (hstatus)
+            {
+            case NEED_WRAP:
+                // wait for write to kick in
+                break;
+            case NEED_TASK:
+                runDelegatedTasks(result);
+                continue;
+            case FINISHED:
+                updateCipherAndProtocolName(result);
+            case NOT_HANDSHAKING:
+            case NEED_UNWRAP:
+                if (_inputBuffer.position() > 0 && status == Status.OK) {
+                    continue;
+                } else {
+                    if (_inputBuffer.position() == 0 &&
+                        hstatus == HandshakeStatus.NEED_UNWRAP &&
+                        _tail_closed) {
+                        _head_closed = true;
                     }
-
-                    pourAll(_decodedInputBuffer, _underlyingInput);
+                    break;
                 }
-
-                keepLooping = handshakeStatus == HandshakeStatus.NOT_HANDSHAKING
-                            && sslResultStatus != Status.BUFFER_UNDERFLOW;
             }
-            while(keepLooping);
-        }
-        catch(SSLException e)
-        {
-            throw new TransportException("Problem during input. useClientMode: "
-                                         + _sslEngine.getUseClientMode(),
-                                         e);
+
+            break;
         }
     }
 
@@ -170,57 +190,66 @@ public class SimpleSslTransportWrapper i
      * {@link #_outputBuffer} is assumed to be writeable on entry and is guaranteed to
      * be still writeable on exit.
      */
-    private void wrapOutput()
+    private void wrapOutput() throws SSLException
     {
-        boolean keepWrapping = hasSpaceForSslPacket(_outputBuffer);
-        while(keepWrapping)
-        {
+        while (true) {
             int pending = _underlyingOutput.pending();
-            if (pending == Transport.END_OF_STREAM)
-            {
+            if (pending < 0) {
                 _head_closed = true;
-                keepWrapping = false;
             }
 
             ByteBuffer clearOutputBuffer = _underlyingOutput.head();
-            try
-            {
-                if(clearOutputBuffer.hasRemaining())
-                {
-                    SSLEngineResult result = _sslEngine.wrap(clearOutputBuffer, _outputBuffer);
+            SSLEngineResult result = _sslEngine.wrap(clearOutputBuffer, _outputBuffer);
+            logEngineClientModeAndResult(result, "output");
 
-                    logEngineClientModeAndResult(result, "output");
-
-                    Status sslResultStatus = result.getStatus();
-                    if(sslResultStatus == SSLEngineResult.Status.BUFFER_OVERFLOW)
-                    {
-                        throw new IllegalStateException("Insufficient space to perform wrap into encoded output buffer. Buffer: " + _outputBuffer);
-                    }
-                    else if(sslResultStatus != SSLEngineResult.Status.OK)
-                    {
-                        throw new RuntimeException("Unexpected SSLEngineResult status " + sslResultStatus);
-                    }
-
-                    runDelegatedTasks(result);
-                    updateCipherAndProtocolName(result);
+            int written = result.bytesConsumed();
+            _underlyingOutput.pop(written);
+            pending = _underlyingOutput.pending();
+
+            Status status = result.getStatus();
+            switch (status) {
+            case CLOSED:
+                _head_closed = true;
+                break;
+            case OK:
+                break;
+            case BUFFER_OVERFLOW:
+                ByteBuffer old = _outputBuffer;
+                _outputBuffer = newWriteableBuffer(_outputBuffer.capacity()*2);
+                _head = _outputBuffer.asReadOnlyBuffer();
+                old.flip();
+                _outputBuffer.put(old);
+                continue;
+            case BUFFER_UNDERFLOW:
+                throw new IllegalStateException("app buffer underflow");
+            }
 
-                    keepWrapping = result.getHandshakeStatus() == HandshakeStatus.NOT_HANDSHAKING
-                            && hasSpaceForSslPacket(_outputBuffer);
+            HandshakeStatus hstatus = result.getHandshakeStatus();
+            switch (hstatus) {
+            case NEED_UNWRAP:
+                // wait for input data
+                if (_inputBuffer.position() == 0 && _tail_closed) {
+                    _head_closed = true;
                 }
-                else
-                {
-                    // no output to wrap
-                    keepWrapping = false;
+                break;
+            case NEED_WRAP:
+                // keep looping
+                continue;
+            case NEED_TASK:
+                runDelegatedTasks(result);
+                continue;
+            case FINISHED:
+                updateCipherAndProtocolName(result);
+                // intentionally fall through
+            case NOT_HANDSHAKING:
+                if (pending > 0 && status == Status.OK) {
+                    continue;
+                } else {
+                    break;
                 }
             }
-            catch(SSLException e)
-            {
-                throw new TransportException("Problem during output. useClientMode: " + _sslEngine.getUseClientMode(), e);
-            }
-            finally
-            {
-                _underlyingOutput.pop(clearOutputBuffer.position());
-            }
+
+            break;
         }
     }
 
@@ -275,7 +304,7 @@ public class SimpleSslTransportWrapper i
         if(_logger.isLoggable(Level.FINEST))
         {
             _logger.log(Level.FINEST, "useClientMode = " + _sslEngine.getUseClientMode() + " direction = " + direction
-                    + " " + resultToString(result));
+                        + " " + resultToString(result));
         }
     }
 
@@ -311,7 +340,11 @@ public class SimpleSslTransportWrapper i
 
         try
         {
-            unwrapInput();
+            try {
+                unwrapInput();
+            } catch (SSLException e) {
+                throw new TransportException(e);
+            }
         }
         catch (TransportException e)
         {
@@ -338,7 +371,12 @@ public class SimpleSslTransportWrapper i
     @Override
     public int pending()
     {
-        wrapOutput();
+        try {
+            wrapOutput();
+        } catch (SSLException e) {
+            throw new TransportException(e);
+        }
+
         _head.limit(_outputBuffer.position());
 
         if (_head_closed && _outputBuffer.position() == 0)

Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslEngineFacadeFactory.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslEngineFacadeFactory.java?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslEngineFacadeFactory.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslEngineFacadeFactory.java Mon Feb 10 19:16:38 2014
@@ -56,6 +56,7 @@ import javax.net.ssl.X509TrustManager;
 import org.apache.qpid.proton.engine.SslDomain;
 import org.apache.qpid.proton.engine.SslPeerDetails;
 import org.apache.qpid.proton.ProtonUnsupportedOperationException;
+import org.apache.qpid.proton.engine.TransportException;
 
 public class SslEngineFacadeFactory
 {
@@ -93,7 +94,7 @@ public class SslEngineFacadeFactory
         try {
             return klass.getConstructor(params);
         } catch (NoSuchMethodException e) {
-            throw new RuntimeException(e);
+            throw new TransportException(e);
         }
     }
 
@@ -105,7 +106,7 @@ public class SslEngineFacadeFactory
         try {
             return klass.getMethod(name, params);
         } catch (NoSuchMethodException e) {
-            throw new RuntimeException(e);
+            throw new TransportException(e);
         }
     }
 
@@ -260,19 +261,19 @@ public class SslEngineFacadeFactory
             }
             catch (NoSuchAlgorithmException e)
             {
-                throw new IllegalStateException("Unexpected exception creating SSLContext", e);
+                throw new TransportException("Unexpected exception creating SSLContext", e);
             }
             catch (KeyStoreException e)
             {
-                throw new IllegalStateException("Unexpected exception creating SSLContext", e);
+                throw new TransportException("Unexpected exception creating SSLContext", e);
             }
             catch (UnrecoverableKeyException e)
             {
-                throw new IllegalStateException("Unexpected exception creating SSLContext", e);
+                throw new TransportException("Unexpected exception creating SSLContext", e);
             }
             catch (KeyManagementException e)
             {
-                throw new IllegalStateException("Unexpected exception creating SSLContext", e);
+                throw new TransportException("Unexpected exception creating SSLContext", e);
             }
         }
         return _sslContext;
@@ -318,7 +319,7 @@ public class SslEngineFacadeFactory
                 else
                 {
                     // Should not happen - readPemObject will have already verified key type
-                    throw new IllegalStateException("Unexpected key type " + keyOrKeyPair);
+                    throw new TransportException("Unexpected key type " + keyOrKeyPair);
                 }
 
                 keystore.setKeyEntry(clientPrivateKeyAlias, clientPrivateKey,
@@ -328,19 +329,19 @@ public class SslEngineFacadeFactory
         }
         catch (KeyStoreException e)
         {
-           throw new IllegalStateException("Unexpected exception creating keystore", e);
+           throw new TransportException("Unexpected exception creating keystore", e);
         }
         catch (NoSuchAlgorithmException e)
         {
-            throw new IllegalStateException("Unexpected exception creating keystore", e);
+            throw new TransportException("Unexpected exception creating keystore", e);
         }
         catch (CertificateException e)
         {
-            throw new IllegalStateException("Unexpected exception creating keystore", e);
+            throw new TransportException("Unexpected exception creating keystore", e);
         }
         catch (IOException e)
         {
-            throw new IllegalStateException("Unexpected exception creating keystore", e);
+            throw new TransportException("Unexpected exception creating keystore", e);
         }
     }
 
@@ -370,9 +371,10 @@ public class SslEngineFacadeFactory
 
         if (addedAnonymousCipherSuites == 0)
         {
-            throw new IllegalStateException("None of " + anonymousCipherSuites
-                    + " anonymous cipher suites are within the supported list "
-                    + supportedSuites);
+            throw new TransportException
+                ("None of " + anonymousCipherSuites
+                 + " anonymous cipher suites are within the supported list "
+                 + supportedSuites);
         }
 
         if(_logger.isLoggable(Level.FINE))
@@ -423,29 +425,31 @@ public class SslEngineFacadeFactory
             Object pemObject = readObjectMeth.invoke(pemReader);
             if (!checkPemObjectIsOfAllowedTypes(pemObject, expectedInterfaces))
             {
-                throw new IllegalStateException("File " + pemFile + " does not provide a object of the required type."
-                        + " Read an object of class " + pemObject.getClass().getName()
-                        + " whilst expecting an implementation of one of the following  : " + Arrays.asList(expectedInterfaces));
+                throw new TransportException
+                    ("File " + pemFile + " does not provide a object of the required type."
+                     + " Read an object of class " + pemObject.getClass().getName()
+                     + " whilst expecting an implementation of one of the following  : "
+                     + Arrays.asList(expectedInterfaces));
             }
             return pemObject;
         }
         catch(InstantiationException e)
         {
             _logger.log(Level.SEVERE, "Unable to read PEM object. Perhaps you need the unlimited strength libraries in <java-home>/jre/lib/security/ ?", e);
-            throw new IllegalStateException("Unable to read PEM object from file " + pemFile, e);
+            throw new TransportException("Unable to read PEM object from file " + pemFile, e);
         }
         catch(InvocationTargetException e)
         {
             _logger.log(Level.SEVERE, "Unable to read PEM object. Perhaps you need the unlimited strength libraries in <java-home>/jre/lib/security/ ?", e);
-            throw new IllegalStateException("Unable to read PEM object from file " + pemFile, e);
+            throw new TransportException("Unable to read PEM object from file " + pemFile, e);
         }
         catch (IllegalAccessException e)
         {
-            throw new RuntimeException(e);
+            throw new TransportException(e);
         }
         catch (IOException e)
         {
-            throw new RuntimeException("Unable to read PEM object from file " + pemFile, e);
+            throw new TransportException("Unable to read PEM object from file " + pemFile, e);
         }
         finally
         {
@@ -508,13 +512,13 @@ public class SslEngineFacadeFactory
 
             return finder;
         } catch (NoSuchMethodException e) {
-            throw new RuntimeException(e);
+            throw new TransportException(e);
         } catch (InstantiationException e) {
-            throw new RuntimeException(e);
+            throw new TransportException(e);
         } catch (IllegalAccessException e) {
-            throw new RuntimeException(e);
+            throw new TransportException(e);
         } catch (InvocationTargetException e) {
-            throw new RuntimeException(e);
+            throw new TransportException(e);
         }
     }
 

Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslHandshakeSniffingTransportWrapper.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslHandshakeSniffingTransportWrapper.java?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslHandshakeSniffingTransportWrapper.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslHandshakeSniffingTransportWrapper.java Mon Feb 10 19:16:38 2014
@@ -33,14 +33,14 @@ public class SslHandshakeSniffingTranspo
     private final TransportWrapper _plainTransportWrapper;
 
     private boolean _tail_closed = false;
-    private boolean _head_closed = true;
+    private boolean _head_closed = false;
     private TransportWrapper _selectedTransportWrapper;
 
     private final ByteBuffer _determinationBuffer = ByteBuffer.allocate(MINIMUM_LENGTH_FOR_DETERMINATION);
 
-    SslHandshakeSniffingTransportWrapper(
-            SslTransportWrapper secureTransportWrapper,
-            TransportWrapper plainTransportWrapper)
+    SslHandshakeSniffingTransportWrapper
+        (SslTransportWrapper secureTransportWrapper,
+         TransportWrapper plainTransportWrapper)
     {
         _secureTransportWrapper = secureTransportWrapper;
         _plainTransportWrapper = plainTransportWrapper;
@@ -51,12 +51,12 @@ public class SslHandshakeSniffingTranspo
     {
         if (isDeterminationMade())
         {
-            return _secureTransportWrapper.capacity();
+            return _selectedTransportWrapper.capacity();
         }
         else
         {
             if (_tail_closed) { return Transport.END_OF_STREAM; }
-            return _determinationBuffer.capacity();
+            return _determinationBuffer.remaining();
         }
     }
 
@@ -80,7 +80,7 @@ public class SslHandshakeSniffingTranspo
         {
             _selectedTransportWrapper.process();
         }
-        else
+        else if (_determinationBuffer.remaining() == 0)
         {
             _determinationBuffer.flip();
             byte[] bytesInput = new byte[_determinationBuffer.remaining()];
@@ -91,6 +91,8 @@ public class SslHandshakeSniffingTranspo
             // TODO what if the selected transport has insufficient capacity?? Maybe use pour, and then try to finish pouring next time round.
             _selectedTransportWrapper.tail().put(_determinationBuffer);
             _selectedTransportWrapper.process();
+        } else if (_tail_closed) {
+            throw new TransportException("connection aborted");
         }
     }
 
@@ -111,32 +113,43 @@ public class SslHandshakeSniffingTranspo
     public int pending()
     {
         if (_head_closed) { return Transport.END_OF_STREAM; }
-        makePlainUnlessDeterminationAlreadyMade();
 
-        return _selectedTransportWrapper.pending();
+        if (isDeterminationMade()) {
+            return _selectedTransportWrapper.pending();
+        } else {
+            return 0;
+        }
+
     }
 
     @Override
     public ByteBuffer head()
     {
-        makePlainUnlessDeterminationAlreadyMade();
-
-        return _selectedTransportWrapper.head();
+        if (isDeterminationMade()) {
+            return _selectedTransportWrapper.head();
+        } else {
+            return null;
+        }
     }
 
     @Override
     public void pop(int bytes)
     {
-        makePlainUnlessDeterminationAlreadyMade();
-
-        _selectedTransportWrapper.pop(bytes);
+        if (isDeterminationMade()) {
+            _selectedTransportWrapper.pop(bytes);
+        } else if (bytes > 0) {
+            throw new IllegalStateException("no bytes have been read");
+        }
     }
 
     @Override
     public void close_head()
     {
-        makePlainUnlessDeterminationAlreadyMade();
-        _selectedTransportWrapper.close_head();
+        if (isDeterminationMade()) {
+            _selectedTransportWrapper.close_head();
+        } else {
+            _head_closed = true;
+        }
     }
 
     @Override
@@ -235,11 +248,4 @@ public class SslHandshakeSniffingTranspo
         }
     }
 
-    private void makePlainUnlessDeterminationAlreadyMade()
-    {
-        if (!isDeterminationMade())
-        {
-            _selectedTransportWrapper = _plainTransportWrapper;
-        }
-    }
 }

Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslImpl.java?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslImpl.java Mon Feb 10 19:16:38 2014
@@ -26,6 +26,7 @@ import org.apache.qpid.proton.ProtonUnsu
 import org.apache.qpid.proton.engine.Ssl;
 import org.apache.qpid.proton.engine.SslDomain;
 import org.apache.qpid.proton.engine.SslPeerDetails;
+import org.apache.qpid.proton.engine.Transport;
 import org.apache.qpid.proton.engine.TransportException;
 import org.apache.qpid.proton.engine.impl.TransportInput;
 import org.apache.qpid.proton.engine.impl.TransportOutput;
@@ -40,6 +41,7 @@ public class SslImpl implements Ssl
     private final ProtonSslEngineProvider _protonSslEngineProvider;
 
     private final SslPeerDetails _peerDetails;
+    private TransportException _initException;
 
     /**
      * @param sslDomain must implement {@link ProtonSslEngineProvider}. This is not possible
@@ -103,14 +105,22 @@ public class SslImpl implements Ssl
         public int capacity()
         {
             initTransportWrapperOnFirstIO();
-            return _transportWrapper.capacity();
+            if (_initException == null) {
+                return _transportWrapper.capacity();
+            } else {
+                return Transport.END_OF_STREAM;
+            }
         }
 
         @Override
         public ByteBuffer tail()
         {
             initTransportWrapperOnFirstIO();
-            return _transportWrapper.tail();
+            if (_initException == null) {
+                return _transportWrapper.tail();
+            } else {
+                return null;
+            }
         }
 
 
@@ -118,42 +128,60 @@ public class SslImpl implements Ssl
         public void process() throws TransportException
         {
             initTransportWrapperOnFirstIO();
-            _transportWrapper.process();
+            if (_initException == null) {
+                _transportWrapper.process();
+            } else {
+                throw new TransportException(_initException);
+            }
         }
 
         @Override
         public void close_tail()
         {
             initTransportWrapperOnFirstIO();
-            _transportWrapper.process();
+            if (_initException == null) {
+                _transportWrapper.close_tail();
+            }
         }
 
         @Override
         public int pending()
         {
             initTransportWrapperOnFirstIO();
-            return _transportWrapper.pending();
+            if (_initException == null) {
+                return _transportWrapper.pending();
+            } else {
+                throw new TransportException(_initException);
+            }
         }
 
         @Override
         public ByteBuffer head()
         {
             initTransportWrapperOnFirstIO();
-            return _transportWrapper.head();
+            if (_initException == null) {
+                return _transportWrapper.head();
+            } else {
+                return null;
+            }
         }
 
         @Override
         public void pop(int bytes)
         {
             initTransportWrapperOnFirstIO();
-            _transportWrapper.pop(bytes);
+            if (_initException == null) {
+                _transportWrapper.pop(bytes);
+            }
         }
 
         @Override
         public void close_head()
         {
             initTransportWrapperOnFirstIO();
-            _transportWrapper.close_head();
+            if (_initException == null) {
+                _transportWrapper.close_head();
+            }
         }
 
         @Override
@@ -184,22 +212,27 @@ public class SslImpl implements Ssl
 
         private void initTransportWrapperOnFirstIO()
         {
-            if (_transportWrapper == null)
-            {
-                SslTransportWrapper sslTransportWrapper = new SimpleSslTransportWrapper(
-                        _protonSslEngineProvider.createSslEngine(_peerDetails),
-                        _inputProcessor,
-                        _outputProcessor);
-
-                if (_domain.allowUnsecuredClient())
-                {
-                    TransportWrapper plainTransportWrapper = new PlainTransportWrapper(_outputProcessor, _inputProcessor);
-                    _transportWrapper = new SslHandshakeSniffingTransportWrapper(sslTransportWrapper, plainTransportWrapper);
-                }
-                else
+            try {
+                if (_initException == null && _transportWrapper == null)
                 {
-                    _transportWrapper = sslTransportWrapper;
+                    SslTransportWrapper sslTransportWrapper = new SimpleSslTransportWrapper
+                        (_protonSslEngineProvider.createSslEngine(_peerDetails),
+                         _inputProcessor, _outputProcessor);
+
+                    if (_domain.allowUnsecuredClient() && _domain.getMode() == SslDomain.Mode.SERVER)
+                    {
+                        TransportWrapper plainTransportWrapper = new PlainTransportWrapper
+                            (_outputProcessor, _inputProcessor);
+                        _transportWrapper = new SslHandshakeSniffingTransportWrapper
+                            (sslTransportWrapper, plainTransportWrapper);
+                    }
+                    else
+                    {
+                        _transportWrapper = sslTransportWrapper;
+                    }
                 }
+            } catch (TransportException e) {
+                _initException = e;
             }
         }
     }

Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/messenger/Messenger.java?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/messenger/Messenger.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/messenger/Messenger.java Mon Feb 10 19:16:38 2014
@@ -218,8 +218,48 @@ public interface Messenger
      */
     Status getStatus(Tracker tracker);
 
-    public void route(String pattern, String address);
+    void route(String pattern, String address);
 
-    public void rewrite(String pattern, String address);
+    void rewrite(String pattern, String address);
+
+    /**
+     * Set the path to the certificate file.
+     */
+    void setCertificate(String certificate);
+
+    /**
+     * Get the path to the certificate file.
+     */
+    String getCertificate();
+
+    /**
+     * Set the path to private key file.
+     */
+    void setPrivateKey(String privateKey);
+
+    /**
+     * Get the path to the private key file.
+     */
+    String getPrivateKey();
+
+    /**
+     * Set the password for private key file.
+     */
+    void setPassword(String password);
+
+    /**
+     * Get the password for the priate key file.
+     */
+    String getPassword();
+
+    /**
+     * Set the path to the trusted certificate database.
+     */
+    void setTrustedCertificates(String trusted);
+
+    /**
+     * Get the path to the trusted certificate database.
+     */
+    String getTrustedCertificates();
 
 }

Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Address.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Address.java?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Address.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Address.java Mon Feb 10 19:16:38 2014
@@ -132,6 +132,21 @@ class Address
         return _port;
     }
 
+    public String getImpliedPort()
+    {
+        if (_port == null) {
+            return getDefaultPort();
+        } else {
+            return getPort();
+        }
+    }
+
+    public String getDefaultPort()
+    {
+        if ("amqps".equals(_scheme)) return "5671";
+        else return "5672";
+    }
+
     public String getName()
     {
         return _name;

Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java Mon Feb 10 19:16:38 2014
@@ -45,6 +45,9 @@ import org.apache.qpid.proton.engine.Rec
 import org.apache.qpid.proton.engine.Sasl;
 import org.apache.qpid.proton.engine.Sender;
 import org.apache.qpid.proton.engine.Session;
+import org.apache.qpid.proton.engine.SslDomain;
+import org.apache.qpid.proton.engine.Ssl;
+import org.apache.qpid.proton.engine.Transport;
 import org.apache.qpid.proton.message.Message;
 import org.apache.qpid.proton.message.MessageFactory;
 import org.apache.qpid.proton.messenger.Messenger;
@@ -98,6 +101,11 @@ public class MessengerImpl implements Me
     private Transform _routes = new Transform();
     private Transform _rewrites = new Transform();
 
+    private String _certificate;
+    private String _privateKey;
+    private String _password;
+    private String _trustedDb;
+
 
     /**
      * @deprecated This constructor's visibility will be reduced to the default scope in a future release.
@@ -137,6 +145,46 @@ public class MessengerImpl implements Me
         _blocking = b;
     }
 
+    public void setCertificate(String certificate)
+    {
+        _certificate = certificate;
+    }
+
+    public String getCertificate()
+    {
+        return _certificate;
+    }
+
+    public void setPrivateKey(String privateKey)
+    {
+        _privateKey = privateKey;
+    }
+
+    public String getPrivateKey()
+    {
+        return _privateKey;
+    }
+
+    public void setPassword(String password)
+    {
+        _password = password;
+    }
+
+    public String getPassword()
+    {
+        return _password;
+    }
+
+    public void setTrustedCertificates(String trusted)
+    {
+        _trustedDb = trusted;
+    }
+
+    public String getTrustedCertificates()
+    {
+        return _trustedDb;
+    }
+
     public void start() throws IOException
     {
         _driver = Proton.driver();
@@ -290,9 +338,7 @@ public class MessengerImpl implements Me
             restoreMessage(m);
         }
 
-        String ports = address.getPort() == null ? defaultPort(address.getScheme()) : address.getPort();
-        int port = Integer.valueOf(ports);
-        Sender sender = getLink(address.getHost(), port, new SenderFinder(address.getName()));
+        Sender sender = getLink(address, new SenderFinder(address.getName()));
         pumpOut(m.getAddress(), sender);
     }
 
@@ -308,17 +354,18 @@ public class MessengerImpl implements Me
             }
         }
 
-        Iterator<Delivery> dIter = link.unsettled();
-        while (dIter != null && dIter.hasNext())
+        Delivery delivery = link.head();
+        while (delivery != null)
         {
-            Delivery delivery = (Delivery) dIter.next();
             StoreEntry entry = (StoreEntry) delivery.getContext();
             if (entry != null)
             {
                 entry.setDelivery(null);
-                if (delivery.isBuffered())
+                if (delivery.isBuffered()) {
                     entry.setStatus(Status.ABORTED);
+                }
             }
+            delivery = delivery.next();
         }
         linkRemoved(link);
     }
@@ -494,15 +541,14 @@ public class MessengerImpl implements Me
 
         String hostName = address.getHost();
         if (hostName == null) throw new MessengerException("Invalid address (hostname cannot be null): " + routed);
-        String ports = address.getPort() == null ? defaultPort(address.getScheme()) : address.getPort();
-        int port = Integer.valueOf(ports);
+        int port = Integer.valueOf(address.getImpliedPort());
         if (address.isPassive())
         {
             if(_logger.isLoggable(Level.FINE))
             {
                 _logger.fine(this + " about to subscribe to source " + source + " using address " + hostName + ":" + port);
             }
-            ListenerContext ctx = new ListenerContext( address.getScheme(), hostName, ports );
+            ListenerContext ctx = new ListenerContext(address);
             _driver.createListener(hostName, port, ctx);
         }
         else
@@ -511,7 +557,7 @@ public class MessengerImpl implements Me
             {
                 _logger.fine(this + " about to subscribe to source " + source);
             }
-            getLink(hostName, port, new ReceiverFinder(address.getName()));
+            getLink(address, new ReceiverFinder(address.getName()));
         }
     }
 
@@ -666,9 +712,10 @@ public class MessengerImpl implements Me
             Connection connection = Proton.connection();
             connection.setContainer(_name);
             ListenerContext ctx = (ListenerContext) l.getContext();
-            connection.setContext(new ConnectionContext(ctx.getService(), c));
+            connection.setContext(new ConnectionContext(ctx.getAddress(), c));
             c.setConnection(connection);
-            //TODO: SSL and full SASL
+            Transport transport = c.getTransport();
+            //TODO: full SASL
             Sasl sasl = c.sasl();
             if (sasl != null)
             {
@@ -676,6 +723,7 @@ public class MessengerImpl implements Me
                 sasl.setMechanisms(new String[]{"ANONYMOUS"});
                 sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
             }
+            transport.ssl(ctx.getDomain());
             connection.open();
         }
         // process connectors, reclaiming credit on closed connectors
@@ -854,13 +902,13 @@ public class MessengerImpl implements Me
         return done;
     }
 
-    private Connection lookup(String host, String service)
+    private Connection lookup(Address address)
     {
         for (Connector<?> c : _driver.connectors())
         {
             Connection connection = c.getConnection();
             ConnectionContext ctx = (ConnectionContext) connection.getContext();
-            if (host.equals(connection.getRemoteContainer()) || service.equals(ctx.getService()))
+            if (ctx.matches(address))
             {
                 return connection;
             }
@@ -1191,18 +1239,19 @@ public class MessengerImpl implements Me
         }
     }
 
-    private <C extends Link> C getLink(String host, int port, LinkFinder<C> finder)
+    private <C extends Link> C getLink(Address address, LinkFinder<C> finder)
     {
-        String service = host + ":" + port;
-        Connection connection = lookup(host, service);
+        Connection connection = lookup(address);
         if (connection == null)
         {
+            String host = address.getHost();
+            int port = Integer.valueOf(address.getImpliedPort());
             Connector<?> connector = _driver.createConnector(host, port, null);
             _logger.log(Level.FINE, "Connecting to " + host + ":" + port);
             connection = Proton.connection();
             connection.setContainer(_name);
             connection.setHostname(host);
-            connection.setContext(new ConnectionContext(service, connector));
+            connection.setContext(new ConnectionContext(address, connector));
             connector.setConnection(connection);
             Sasl sasl = connector.sasl();
             if (sasl != null)
@@ -1210,6 +1259,18 @@ public class MessengerImpl implements Me
                 sasl.client();
                 sasl.setMechanisms(new String[]{"ANONYMOUS"});
             }
+            if ("amqps".equalsIgnoreCase(address.getScheme())) {
+                Transport transport = connector.getTransport();
+                SslDomain domain = makeDomain(address, SslDomain.Mode.CLIENT);
+                if (_trustedDb != null) {
+                    domain.setPeerAuthentication(SslDomain.VerifyMode.VERIFY_PEER);
+                    //domain.setPeerAuthentication(SslDomain.VerifyMode.VERIFY_PEER_NAME);
+                } else {
+                    domain.setPeerAuthentication(SslDomain.VerifyMode.ANONYMOUS_PEER);
+                }
+                Ssl ssl = transport.ssl(domain);
+                //ssl.setPeerHostname(host);
+            }
             connection.open();
         }
 
@@ -1363,12 +1424,6 @@ public class MessengerImpl implements Me
         else return path.equals(source.getAddress());
     }
 
-    private static String defaultPort(String scheme)
-    {
-        if ("amqps".equals(scheme)) return "5671";
-        else return "5672";
-    }
-
     @Override
     public String toString()
     {
@@ -1422,18 +1477,27 @@ public class MessengerImpl implements Me
 
     private class ConnectionContext
     {
-        private String _service;
+        private Address _address;
         private Connector _connector;
 
-        public ConnectionContext(String service, Connector connector)
+        public ConnectionContext(Address address, Connector connector)
         {
-            _service = service;
+            _address = address;
             _connector = connector;
         }
 
-        public String getService()
+        public Address getAddress()
+        {
+            return _address;
+        }
+
+        public boolean matches(Address address)
         {
-            return _service;
+            String host = address.getHost();
+            String port = address.getImpliedPort();
+            Connection conn = _connector.getConnection();
+            return host.equals(conn.getRemoteContainer()) ||
+                (_address.getHost().equals(host) && _address.getImpliedPort().equals(port));
         }
 
         public Connector getConnector()
@@ -1442,32 +1506,47 @@ public class MessengerImpl implements Me
         }
     }
 
-    private class ListenerContext
+    private SslDomain makeDomain(Address address, SslDomain.Mode mode)
     {
-        private String _host;
-        private String _port;
-        private String _service;  // for now. move to subscription later
+        SslDomain domain = Proton.sslDomain();
+        domain.init(mode);
+        if (_certificate != null) {
+            domain.setCredentials(_certificate, _privateKey, _password);
+        }
+        if (_trustedDb != null) {
+            domain.setTrustedCaDb(_trustedDb);
+        }
 
-        public ListenerContext(String service, String host, String port)
-        {
-            _service = service;
-            _host = host;
-            _port = port;
+        if ("amqps".equalsIgnoreCase(address.getScheme())) {
+            domain.allowUnsecuredClient(false);
+        } else {
+            domain.allowUnsecuredClient(true);
         }
 
-        public String getService()
+        return domain;
+    }
+
+
+    private class ListenerContext
+    {
+        private Address _address;
+        private SslDomain _domain;
+
+        public ListenerContext(Address address)
         {
-            return _service;
+            _address = address;
+            _domain = makeDomain(address, SslDomain.Mode.SERVER);
         }
 
-        public String getHost()
+        public SslDomain getDomain()
         {
-            return _host;
+            return _domain;
         }
 
-        public String getPort()
+        public Address getAddress()
         {
-            return _port;
+            return _address;
         }
+
     }
 }

Modified: qpid/proton/trunk/proton-j/src/main/resources/cengine.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/resources/cengine.py?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/resources/cengine.py (original)
+++ qpid/proton/trunk/proton-j/src/main/resources/cengine.py Mon Feb 10 19:16:38 2014
@@ -879,10 +879,11 @@ def pn_transport_pending(trans):
     return trans.error.set(PN_ERR, str(e))
 
 def pn_transport_peek(trans, size):
-  bb = trans.impl.head()
-  size = min(bb.remaining(), size)
+  size = min(trans.impl.pending(), size)
   ba = zeros(size, 'b')
-  bb.get(ba)
+  if size:
+    bb = trans.impl.head()
+    bb.get(ba)
   return 0, ba.tostring()
 
 def pn_transport_pop(trans, size):

Modified: qpid/proton/trunk/proton-j/src/main/resources/cmessenger.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/resources/cmessenger.py?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/resources/cmessenger.py (original)
+++ qpid/proton/trunk/proton-j/src/main/resources/cmessenger.py Mon Feb 10 19:16:38 2014
@@ -41,7 +41,10 @@ class pn_messenger_wrapper:
     self.error = pn_error(0, None)
 
 def pn_messenger(name):
-  return pn_messenger_wrapper(Proton.messenger(name));
+  if name is None:
+    return pn_messenger_wrapper(Proton.messenger())
+  else:
+    return pn_messenger_wrapper(Proton.messenger(name))
 
 def pn_messenger_error(m):
   return m.error
@@ -55,7 +58,20 @@ def pn_messenger_set_blocking(m, b):
   return 0
 
 def pn_messenger_set_certificate(m, c):
-  raise Skipped()
+  m.impl.setCertificate(c)
+  return 0
+
+def pn_messenger_set_private_key(m, p):
+  m.impl.setPrivateKey(p)
+  return 0
+
+def pn_messenger_set_password(m, p):
+  m.impl.setPassword(p)
+  return 0
+
+def pn_messenger_set_trusted_certificates(m, t):
+  m.impl.setTrustedCertificates(t)
+  return 0
 
 def pn_messenger_set_incoming_window(m, w):
   m.impl.setIncomingWindow(w)
@@ -103,6 +119,8 @@ def pn_messenger_interrupt(m):
 def pn_messenger_buffered(m, t):
   raise Skipped()
 
+from org.apache.qpid.proton.engine import TransportException
+
 def pn_messenger_stop(m):
   m.impl.stop()
   return 0

Modified: qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/CannedTransportOutput.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/CannedTransportOutput.java?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/CannedTransportOutput.java (original)
+++ qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/CannedTransportOutput.java Mon Feb 10 19:16:38 2014
@@ -29,6 +29,7 @@ public class CannedTransportOutput imple
 
     private ByteBuffer _cannedOutput;
     private ByteBuffer _head;
+    private int _popped;
 
     public CannedTransportOutput()
     {
@@ -43,6 +44,7 @@ public class CannedTransportOutput imple
     {
         _cannedOutput = ByteBuffer.wrap(output.getBytes());
         _head = _cannedOutput.asReadOnlyBuffer();
+        _popped = 0;
     }
 
     @Override
@@ -60,8 +62,8 @@ public class CannedTransportOutput imple
     @Override
     public void pop(int bytes)
     {
-        _cannedOutput.position(bytes);
-        _head.position(bytes);
+        _popped += bytes;
+        _head.position(_popped);
     }
 
     @Override

Modified: qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapperTest.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapperTest.java?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapperTest.java (original)
+++ qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapperTest.java Mon Feb 10 19:16:38 2014
@@ -238,17 +238,18 @@ public class SimpleSslTransportWrapperTe
     private String getAllBytesFromTransport()
     {
         StringBuilder readBytes = new StringBuilder();
-        boolean continueLooping;
-        do
+        while (true)
         {
-            ByteBuffer buffer = _sslWrapper.head();
-            continueLooping = buffer.hasRemaining();
-
-            readBytes.append(pourBufferToString(buffer));
-
-            _sslWrapper.pop(buffer.position());
+            int pending = _sslWrapper.pending();
+            if (pending > 0) {
+                ByteBuffer buffer = _sslWrapper.head();
+                readBytes.append(pourBufferToString(buffer));
+                _sslWrapper.pop(pending);
+                continue;
+            } else {
+                break;
+            }
         }
-        while(continueLooping);
 
         return readBytes.toString();
     }

Modified: qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SslHandshakeSniffingTransportWrapperTest.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SslHandshakeSniffingTransportWrapperTest.java?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SslHandshakeSniffingTransportWrapperTest.java (original)
+++ qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SslHandshakeSniffingTransportWrapperTest.java Mon Feb 10 19:16:38 2014
@@ -29,6 +29,7 @@ import static org.mockito.Mockito.when;
 
 import java.nio.ByteBuffer;
 
+import org.apache.qpid.proton.engine.TransportException;
 import org.apache.qpid.proton.engine.impl.TransportWrapper;
 import org.junit.Rule;
 import org.junit.Test;
@@ -107,8 +108,9 @@ public class SslHandshakeSniffingTranspo
         try
         {
             _sniffingWrapper.tail().put(sourceBuffer);
+            _sniffingWrapper.close_tail();
 
-            _expectedException.expect(IllegalArgumentException.class);
+            _expectedException.expect(TransportException.class);
             _sniffingWrapper.process();
         }
         finally

Modified: qpid/proton/trunk/tests/python/proton_tests/ssl.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/ssl.py?rev=1566706&r1=1566705&r2=1566706&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/ssl.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/ssl.py Mon Feb 10 19:16:38 2014
@@ -817,6 +817,10 @@ class MessengerSSLTests(common.Test):
                                 key="server-private-key.pem",
                                 password="server-password",
                                 exception=None):
+        import sys
+        # java doesn't do validation in the same way (yet)
+        if exception and "java" in sys.platform:
+            raise Skipped()
         self.server.certificate = _testpath(cert)
         self.server.private_key = _testpath(key)
         self.server.password = password



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org