You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by fa...@apache.org on 2014/09/06 13:23:14 UTC

svn commit: r1622849 [8/9] - in /qpid/proton/branches/fadams-javascript-binding: ./ contrib/ contrib/proton-hawtdispatch/ contrib/proton-hawtdispatch/src/ contrib/proton-hawtdispatch/src/main/ contrib/proton-hawtdispatch/src/main/java/ contrib/proton-h...

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java Sat Sep  6 11:23:10 2014
@@ -63,11 +63,11 @@ public class SenderImpl  extends LinkImp
         //TODO.
     }
 
-    public void free()
+    @Override
+    void doFree()
     {
         getSession().freeSender(this);
-        super.free();
-
+        super.doFree();
     }
 
     @Override

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java Sat Sep  6 11:23:10 2014
@@ -45,7 +45,9 @@ public class SessionImpl extends Endpoin
     SessionImpl(ConnectionImpl connection)
     {
         _connection = connection;
+        _connection.incref();
         _node = _connection.addSessionEndpoint(this);
+        _connection.put(Event.Type.SESSION_INIT, this);
     }
 
     public SenderImpl sender(String name)
@@ -90,25 +92,38 @@ public class SessionImpl extends Endpoin
         return getConnectionImpl();
     }
 
-    public void free()
-    {
-        super.free();
+    @Override
+    void postFinal() {
+        _connection.put(Event.Type.SESSION_FINAL, this);
+        _connection.decref();
+    }
 
+    @Override
+    void doFree() {
         _connection.removeSessionEndpoint(_node);
         _node = null;
 
-        for(SenderImpl sender : _senders.values())
-        {
+        for(SenderImpl sender : _senders.values()) {
             sender.free();
         }
         _senders.clear();
-        for(ReceiverImpl receiver : _receivers.values())
-        {
+        for(ReceiverImpl receiver : _receivers.values()) {
             receiver.free();
         }
         _receivers.clear();
     }
 
+    void modifyEndpoints() {
+        for (SenderImpl snd : _senders.values()) {
+            snd.modifyEndpoints();
+        }
+
+        for (ReceiverImpl rcv : _receivers.values()) {
+            rcv.modifyEndpoints();
+        }
+        modified();
+    }
+
     TransportSession getTransportSession()
     {
         return _transportSession;
@@ -184,11 +199,14 @@ public class SessionImpl extends Endpoin
     }
 
     @Override
-    protected void localStateChanged()
+    void localOpen()
     {
-        EventImpl ev = getConnectionImpl().put(Event.Type.SESSION_LOCAL_STATE);
-        if (ev != null) {
-            ev.init(this);
-        }
+        getConnectionImpl().put(Event.Type.SESSION_OPEN, this);
+    }
+
+    @Override
+    void localClose()
+    {
+        getConnectionImpl().put(Event.Type.SESSION_CLOSE, this);
     }
 }

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java Sat Sep  6 11:23:10 2014
@@ -17,14 +17,14 @@
 
 package org.apache.qpid.proton.engine.impl;
 
-import static org.apache.qpid.proton.engine.impl.ByteBufferUtils.newReadableBuffer;
-import static org.apache.qpid.proton.engine.impl.ByteBufferUtils.pour;
 import static org.apache.qpid.proton.engine.impl.ByteBufferUtils.pourArrayToBuffer;
 import static org.apache.qpid.proton.engine.impl.ByteBufferUtils.pourBufferToArray;
 
 import java.nio.ByteBuffer;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
@@ -32,6 +32,7 @@ import org.apache.qpid.proton.amqp.Unsig
 import org.apache.qpid.proton.amqp.transport.Attach;
 import org.apache.qpid.proton.amqp.transport.Begin;
 import org.apache.qpid.proton.amqp.transport.Close;
+import org.apache.qpid.proton.amqp.transport.ConnectionError;
 import org.apache.qpid.proton.amqp.transport.Detach;
 import org.apache.qpid.proton.amqp.transport.Disposition;
 import org.apache.qpid.proton.amqp.transport.End;
@@ -44,10 +45,8 @@ import org.apache.qpid.proton.amqp.trans
 import org.apache.qpid.proton.codec.AMQPDefinedTypes;
 import org.apache.qpid.proton.codec.DecoderImpl;
 import org.apache.qpid.proton.codec.EncoderImpl;
-import org.apache.qpid.proton.codec.WritableBuffer;
 import org.apache.qpid.proton.engine.Connection;
 import org.apache.qpid.proton.engine.EndpointState;
-import org.apache.qpid.proton.engine.EngineFactory;
 import org.apache.qpid.proton.engine.Event;
 import org.apache.qpid.proton.engine.ProtonJTransport;
 import org.apache.qpid.proton.engine.Sasl;
@@ -66,7 +65,20 @@ public class TransportImpl extends Endpo
     implements ProtonJTransport, FrameBody.FrameBodyHandler<Integer>,
         FrameHandler, TransportOutputWriter
 {
-    private static final byte AMQP_FRAME_TYPE = 0;
+    static final int BUFFER_RELEASE_THRESHOLD = Integer.getInteger("proton.transport_buffer_release_threshold", 2 * 1024 * 1024);
+
+    private static final boolean getBooleanEnv(String name)
+    {
+        String value = System.getenv(name);
+        return "true".equalsIgnoreCase(value) ||
+            "1".equals(value) ||
+            "yes".equalsIgnoreCase(value);
+    }
+
+    private static final boolean FRM_ENABLED = getBooleanEnv("PN_TRACE_FRM");
+
+    // trace levels
+    private int _levels = (FRM_ENABLED ? this.TRACE_FRM : 0);
 
     private FrameParser _frameParser;
 
@@ -100,16 +112,16 @@ public class TransportImpl extends Endpo
     private Open _open;
     private SaslImpl _sasl;
     private SslImpl _ssl;
-    private ProtocolTracer _protocolTracer = null;
-
-    private ByteBuffer _lastInputBuffer;
+    private final Ref<ProtocolTracer> _protocolTracer = new Ref(null);
 
     private TransportResult _lastTransportResult = TransportResultFactory.ok();
 
     private boolean _init;
+    private boolean _processingStarted;
 
     private FrameHandler _frameHandler = this;
     private boolean _head_closed = false;
+    private TransportException _tail_error = null;
 
     /**
      * @deprecated This constructor's visibility will be reduced to the default scope in a future release.
@@ -134,7 +146,6 @@ public class TransportImpl extends Endpo
                                        FrameWriter.AMQP_FRAME_TYPE,
                                        _protocolTracer,
                                        this);
-
     }
 
     private void init()
@@ -149,6 +160,11 @@ public class TransportImpl extends Endpo
     }
 
     @Override
+    public void trace(int levels) {
+        _levels = levels;
+    }
+
+    @Override
     public int getMaxFrameSize()
     {
         return _maxFrameSize;
@@ -194,9 +210,10 @@ public class TransportImpl extends Endpo
     @Override
     public void bind(Connection conn)
     {
-        // TODO - check if already bound
-        ((ConnectionImpl) conn).setTransport(this);
         _connectionEndpoint = (ConnectionImpl) conn;
+        // TODO - check if already bound
+        _connectionEndpoint.setTransport(this);
+        _connectionEndpoint.incref();
 
         if(getRemoteState() != EndpointState.UNINITIALIZED)
         {
@@ -211,6 +228,23 @@ public class TransportImpl extends Endpo
     }
 
     @Override
+    public void unbind()
+    {
+        _connectionEndpoint.modifyEndpoints();
+
+        _connectionEndpoint.setTransport(null);
+        _connectionEndpoint.decref();
+
+        for (TransportSession ts: _transportSessionState.values()) {
+            ts.unbind();
+        }
+
+        for (TransportLink tl: _transportLinkState.values()) {
+            tl.unbind();
+        }
+    }
+
+    @Override
     public int input(byte[] bytes, int offset, int length)
     {
         oldApiCheckStateBeforeInput(length).checkIsOk();
@@ -278,8 +312,13 @@ public class TransportImpl extends Endpo
     {
         if(_sasl == null)
         {
+            if(_processingStarted)
+            {
+                throw new IllegalStateException("Sasl can't be initiated after transport has started processing");
+            }
+
             init();
-            _sasl = new SaslImpl(_remoteMaxFrameSize);
+            _sasl = new SaslImpl(this, _remoteMaxFrameSize);
             TransportWrapper transportWrapper = _sasl.wrap(_inputProcessor, _outputProcessor);
             _inputProcessor = transportWrapper;
             _outputProcessor = transportWrapper;
@@ -334,38 +373,37 @@ public class TransportImpl extends Endpo
                        && transportLink.isLocalHandleSet()
                        && !_isCloseSent)
                     {
-                        if(!(link instanceof SenderImpl)
-                           || link.getQueued() == 0
-                           || transportLink.detachReceived()
-                           || transportSession.endReceived()
-                           || _closeReceived)
-                        {
-                            UnsignedInteger localHandle = transportLink.getLocalHandle();
-                            transportLink.clearLocalHandle();
-                            transportSession.freeLocalHandle(localHandle);
+                        if((link instanceof SenderImpl)
+                           && link.getQueued() > 0
+                           && !transportLink.detachReceived()
+                           && !transportSession.endReceived()
+                           && !_closeReceived) {
+                            endpoint = endpoint.transportNext();
+                            continue;
+                        }
 
+                        UnsignedInteger localHandle = transportLink.getLocalHandle();
+                        transportLink.clearLocalHandle();
+                        transportSession.freeLocalHandle(localHandle);
 
-                            Detach detach = new Detach();
-                            detach.setHandle(localHandle);
-                            // TODO - need an API for detaching rather than closing the link
-                            detach.setClosed(true);
 
-                            ErrorCondition localError = link.getCondition();
-                            if( localError.getCondition() !=null )
-                            {
-                                detach.setError(localError);
-                            }
+                        Detach detach = new Detach();
+                        detach.setHandle(localHandle);
+                        // TODO - need an API for detaching rather than closing the link
+                        detach.setClosed(true);
 
+                        ErrorCondition localError = link.getCondition();
+                        if( localError.getCondition() !=null )
+                        {
+                            detach.setError(localError);
+                        }
 
-                            writeFrame(transportSession.getLocalChannel(), detach, null, null);
-                            endpoint.clearModified();
 
-                            // TODO - temporary hack for PROTON-154, this line should be removed and replaced
-                            //        with proper handling for closed links
-                            link.free();
-                        }
+                        writeFrame(transportSession.getLocalChannel(), detach, null, null);
                     }
 
+                    endpoint.clearModified();
+
                 }
                 endpoint = endpoint.transportNext();
             }
@@ -411,8 +449,6 @@ public class TransportImpl extends Endpo
                         sender.setDrained(0);
 
                         writeFlow(transportSession, transportLink);
-
-                        endpoint.clearModified();
                     }
 
                 }
@@ -470,7 +506,7 @@ public class TransportImpl extends Endpo
         if(!delivery.isDone() &&
            (delivery.getDataLength() > 0 || delivery != snd.current()) &&
            tpSession.hasOutgoingCredit() && tpLink.hasCredit() &&
-           tpLink.getLocalHandle() != null)
+           tpLink.getLocalHandle() != null && !_frameWriter.isFull())
         {
             UnsignedInteger deliveryId = tpSession.getOutgoingDeliveryId();
             TransportDelivery tpDelivery = new TransportDelivery(deliveryId, delivery, tpLink);
@@ -529,6 +565,8 @@ public class TransportImpl extends Endpo
                 delivery.setDataLength(payload.remaining());
                 session.incrementOutgoingBytes(-delta);
             }
+
+            getConnectionImpl().put(Event.Type.LINK_FLOW, snd);
         }
 
         if(wasDone && delivery.getLocalState() != null)
@@ -596,10 +634,6 @@ public class TransportImpl extends Endpo
                         {
                             transportLink.addCredit(credits);
                             writeFlow(transportSession, transportLink);
-                            if(receiver.getLocalState() == EndpointState.ACTIVE)
-                            {
-                                endpoint.clearModified();
-                            }
                         }
                     }
                 }
@@ -689,10 +723,6 @@ public class TransportImpl extends Endpo
 
                             writeFrame(transportSession.getLocalChannel(), attach, null, null);
                             transportLink.sentAttach();
-                            if(link.getLocalState() == EndpointState.ACTIVE && (link instanceof SenderImpl || !link.hasCredit()))
-                            {
-                                endpoint.clearModified();
-                            }
                         }
                     }
                 }
@@ -712,15 +742,22 @@ public class TransportImpl extends Endpo
 
     private void processOpen()
     {
-        if(_connectionEndpoint != null && _connectionEndpoint.getLocalState() != EndpointState.UNINITIALIZED && !_isOpenSent)
-        {
+        if ((_tail_error != null ||
+             (_connectionEndpoint != null &&
+              _connectionEndpoint.getLocalState() != EndpointState.UNINITIALIZED)) &&
+            !_isOpenSent) {
             Open open = new Open();
-            String cid = _connectionEndpoint.getLocalContainerId();
-            open.setContainerId(cid == null ? "" : cid);
-            open.setHostname(_connectionEndpoint.getHostname());
-            open.setDesiredCapabilities(_connectionEndpoint.getDesiredCapabilities());
-            open.setOfferedCapabilities(_connectionEndpoint.getOfferedCapabilities());
-            open.setProperties(_connectionEndpoint.getProperties());
+            if (_connectionEndpoint != null) {
+                String cid = _connectionEndpoint.getLocalContainerId();
+                open.setContainerId(cid == null ? "" : cid);
+                open.setHostname(_connectionEndpoint.getHostname());
+                open.setDesiredCapabilities(_connectionEndpoint.getDesiredCapabilities());
+                open.setOfferedCapabilities(_connectionEndpoint.getOfferedCapabilities());
+                open.setProperties(_connectionEndpoint.getProperties());
+            } else {
+                open.setContainerId("");
+            }
+
             if (_maxFrameSize > 0) {
                 open.setMaxFrameSize(UnsignedInteger.valueOf(_maxFrameSize));
             }
@@ -731,7 +768,6 @@ public class TransportImpl extends Endpo
             _isOpenSent = true;
 
             writeFrame(0, open, null, null);
-
         }
     }
 
@@ -762,10 +798,6 @@ public class TransportImpl extends Endpo
 
                         writeFrame(channelId, begin, null, null);
                         transportSession.sentBegin();
-                        if(session.getLocalState() == EndpointState.ACTIVE)
-                        {
-                            endpoint.clearModified();
-                        }
                     }
                 }
                 endpoint = endpoint.transportNext();
@@ -829,21 +861,27 @@ public class TransportImpl extends Endpo
                 SessionImpl session;
                 TransportSession transportSession;
 
-                if((endpoint instanceof SessionImpl)
-                   && (session = (SessionImpl)endpoint).getLocalState() == EndpointState.CLOSED
-                   && (transportSession = session.getTransportSession()).isLocalChannelSet()
-                   && !hasSendableMessages(session)
-                   && !_isCloseSent)
-                {
-                    int channel = freeLocalChannel(transportSession);
-                    End end = new End();
-                    ErrorCondition localError = endpoint.getCondition();
-                    if( localError.getCondition() !=null )
+                if((endpoint instanceof SessionImpl)) {
+                    if ((session = (SessionImpl)endpoint).getLocalState() == EndpointState.CLOSED
+                        && (transportSession = session.getTransportSession()).isLocalChannelSet()
+                        && !_isCloseSent)
                     {
-                        end.setError(localError);
+                        if (hasSendableMessages(session)) {
+                            endpoint = endpoint.transportNext();
+                            continue;
+                        }
+
+                        int channel = freeLocalChannel(transportSession);
+                        End end = new End();
+                        ErrorCondition localError = endpoint.getCondition();
+                        if( localError.getCondition() !=null )
+                        {
+                            end.setError(localError);
+                        }
+
+                        writeFrame(channel, end, null, null);
                     }
 
-                    writeFrame(channel, end, null, null);
                     endpoint.clearModified();
                 }
 
@@ -854,6 +892,9 @@ public class TransportImpl extends Endpo
 
     private boolean hasSendableMessages(SessionImpl session)
     {
+        if (_connectionEndpoint == null) {
+            return false;
+        }
 
         if(!_closeReceived && (session == null || !session.getTransportSession().endReceived()))
         {
@@ -878,14 +919,24 @@ public class TransportImpl extends Endpo
 
     private void processClose()
     {
-        if(_connectionEndpoint != null && _connectionEndpoint.getLocalState() == EndpointState.CLOSED && !_isCloseSent)
-        {
+        if ((_tail_error != null ||
+             (_connectionEndpoint != null &&
+              _connectionEndpoint.getLocalState() == EndpointState.CLOSED)) &&
+            !_isCloseSent) {
             if(!hasSendableMessages(null))
             {
                 Close close = new Close();
 
-                ErrorCondition localError = _connectionEndpoint.getCondition();
-                if( localError.getCondition() !=null )
+                ErrorCondition localError;
+
+                if (_connectionEndpoint == null) {
+                    localError = new ErrorCondition(ConnectionError.FRAMING_ERROR,
+                                                    _tail_error.toString());
+                } else {
+                    localError =  _connectionEndpoint.getCondition();
+                }
+
+                if(localError.getCondition() != null)
                 {
                     close.setError(localError);
                 }
@@ -893,6 +944,10 @@ public class TransportImpl extends Endpo
                 _isCloseSent = true;
 
                 writeFrame(0, close, null, null);
+
+                if (_connectionEndpoint != null) {
+                    _connectionEndpoint.clearModified();
+                }
             }
         }
     }
@@ -912,10 +967,10 @@ public class TransportImpl extends Endpo
     }
 
     @Override
-    public void free()
-    {
-        super.free();
-    }
+    void postFinal() {}
+
+    @Override
+    void doFree() { }
 
     //==================================================================================================================
     // handle incoming amqp data
@@ -967,6 +1022,9 @@ public class TransportImpl extends Endpo
             {
                 // TODO check null
                 transportSession = _localSessions.get(begin.getRemoteChannel().intValue());
+                if (transportSession == null) {
+                    throw new NullPointerException("uncorrelated channel: " + begin.getRemoteChannel());
+                }
                 session = transportSession.getSession();
 
             }
@@ -975,10 +1033,7 @@ public class TransportImpl extends Endpo
             transportSession.setNextIncomingId(begin.getNextOutgoingId());
             _remoteSessions.put(channel, transportSession);
 
-            EventImpl ev = _connectionEndpoint.put(Event.Type.SESSION_REMOTE_STATE);
-            if (ev != null) {
-                ev.init(session);
-            }
+            _connectionEndpoint.put(Event.Type.SESSION_REMOTE_OPEN, session);
         }
 
     }
@@ -1034,10 +1089,7 @@ public class TransportImpl extends Endpo
 
             }
 
-            EventImpl ev = _connectionEndpoint.put(Event.Type.LINK_REMOTE_STATE);
-            if (ev != null) {
-                ev.init(link);
-            }
+            _connectionEndpoint.put(Event.Type.LINK_REMOTE_OPEN, link);
         }
     }
 
@@ -1102,16 +1154,13 @@ public class TransportImpl extends Endpo
                 LinkImpl link = transportLink.getLink();
                 transportLink.receivedDetach();
                 transportSession.freeRemoteHandle(transportLink.getRemoteHandle());
+                _connectionEndpoint.put(Event.Type.LINK_REMOTE_CLOSE, link);
+                transportLink.clearRemoteHandle();
                 link.setRemoteState(EndpointState.CLOSED);
                 if(detach.getError() != null)
                 {
                     link.getRemoteCondition().copyFrom(detach.getError());
                 }
-
-                EventImpl ev = _connectionEndpoint.put(Event.Type.LINK_REMOTE_STATE);
-                if (ev != null) {
-                    ev.init(link);
-                }
             }
             else
             {
@@ -1140,10 +1189,7 @@ public class TransportImpl extends Endpo
                 session.getRemoteCondition().copyFrom(errorCondition);
             }
 
-            EventImpl ev = _connectionEndpoint.put(Event.Type.SESSION_REMOTE_STATE);
-            if (ev != null) {
-                ev.init(session);
-            }
+            _connectionEndpoint.put(Event.Type.SESSION_REMOTE_CLOSE, session);
         }
     }
 
@@ -1160,10 +1206,7 @@ public class TransportImpl extends Endpo
                 _connectionEndpoint.getRemoteCondition().copyFrom(close.getError());
             }
 
-            EventImpl ev = _connectionEndpoint.put(Event.Type.CONNECTION_REMOTE_STATE);
-            if (ev != null) {
-                ev.init(_connectionEndpoint);
-            }
+            _connectionEndpoint.put(Event.Type.CONNECTION_REMOTE_CLOSE, _connectionEndpoint);
         }
 
     }
@@ -1176,11 +1219,12 @@ public class TransportImpl extends Endpo
             throw new IllegalStateException("Transport cannot accept frame: " + frame);
         }
 
-        log(this, INCOMING, frame);
+        log(INCOMING, frame);
 
-        if( _protocolTracer != null )
+        ProtocolTracer tracer = _protocolTracer.get();
+        if( tracer != null )
         {
-            _protocolTracer.receivedFrame(frame);
+            tracer.receivedFrame(frame);
         }
 
         frame.getBody().invoke(this,frame.getPayload(), frame.getChannel());
@@ -1188,10 +1232,15 @@ public class TransportImpl extends Endpo
     }
 
     @Override
-    public void closed()
+    public void closed(TransportException error)
     {
-        if (!_closeReceived) {
-            throw new TransportException("connection aborted");
+        if (!_closeReceived || error != null) {
+            if (error == null) {
+                _tail_error = new TransportException("connection aborted");
+            } else {
+                _tail_error = error;
+            }
+            _head_closed = true;
         }
     }
 
@@ -1204,13 +1253,13 @@ public class TransportImpl extends Endpo
     @Override
     public ProtocolTracer getProtocolTracer()
     {
-        return _protocolTracer;
+        return _protocolTracer.get();
     }
 
     @Override
     public void setProtocolTracer(ProtocolTracer protocolTracer)
     {
-        this._protocolTracer = protocolTracer;
+        this._protocolTracer.set(protocolTracer);
     }
 
     @Override
@@ -1260,6 +1309,8 @@ public class TransportImpl extends Endpo
     @Override
     public void process() throws TransportException
     {
+        _processingStarted = true;
+
         try {
             init();
             _inputProcessor.process();
@@ -1303,6 +1354,12 @@ public class TransportImpl extends Endpo
         _outputProcessor.close_head();
     }
 
+    public boolean isClosed() {
+        int p = pending();
+        int c = capacity();
+        return  p == END_OF_STREAM && c == END_OF_STREAM;
+    }
+
     @Override
     public String toString()
     {
@@ -1337,21 +1394,11 @@ public class TransportImpl extends Endpo
     static String INCOMING = "<-";
     static String OUTGOING = "->";
 
-    private static final boolean getBooleanEnv(String name)
+    void log(String event, TransportFrame frame)
     {
-        String value = System.getenv(name);
-        return "true".equalsIgnoreCase(value) ||
-            "1".equals(value) ||
-            "yes".equalsIgnoreCase(value);
-    }
-
-    private static final boolean ENABLED = getBooleanEnv("PN_TRACE_FRM");
-
-    static void log(Object ctx, String event, TransportFrame frame)
-    {
-        if (ENABLED) {
+        if ((_levels & TRACE_FRM) != 0) {
             StringBuilder msg = new StringBuilder();
-            msg.append("[").append(System.identityHashCode(ctx)).append(":")
+            msg.append("[").append(System.identityHashCode(this)).append(":")
                 .append(frame.getChannel()).append("]");
             msg.append(" ").append(event).append(" ").append(frame.getBody());
             if (frame.getPayload() != null) {
@@ -1366,7 +1413,8 @@ public class TransportImpl extends Endpo
     }
 
     @Override
-    protected void localStateChanged()
-    {
-    }
+    void localOpen() {}
+
+    @Override
+    void localClose() {}
 }

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java Sat Sep  6 11:23:10 2014
@@ -51,6 +51,12 @@ class TransportLink<T extends LinkImpl>
                        : new TransportSender((SenderImpl)link));
     }
 
+    void unbind()
+    {
+        clearLocalHandle();
+        clearRemoteHandle();
+    }
+
     public UnsignedInteger getLocalHandle()
     {
         return _localHandle;
@@ -58,6 +64,9 @@ class TransportLink<T extends LinkImpl>
 
     public void setLocalHandle(UnsignedInteger localHandle)
     {
+        if (_localHandle == null) {
+            _link.incref();
+        }
         _localHandle = localHandle;
     }
 
@@ -78,6 +87,9 @@ class TransportLink<T extends LinkImpl>
 
     public void clearLocalHandle()
     {
+        if (_localHandle != null) {
+            _link.decref();
+        }
         _localHandle = null;
     }
 
@@ -88,9 +100,20 @@ class TransportLink<T extends LinkImpl>
 
     public void setRemoteHandle(UnsignedInteger remoteHandle)
     {
+        if (_remoteHandle == null) {
+            _link.incref();
+        }
         _remoteHandle = remoteHandle;
     }
 
+    public void clearRemoteHandle()
+    {
+        if (_remoteHandle != null) {
+            _link.decref();
+        }
+        _remoteHandle = null;
+    }
+
     public UnsignedInteger getDeliveryCount()
     {
         return _deliveryCount;
@@ -122,10 +145,7 @@ class TransportLink<T extends LinkImpl>
         _remoteLinkCredit = flow.getLinkCredit();
 
 
-        EventImpl ev = _link.getConnectionImpl().put(Event.Type.LINK_FLOW);
-        if (ev != null) {
-            ev.init(_link);
-        }
+        _link.getConnectionImpl().put(Event.Type.LINK_FLOW, _link);
     }
 
     void setLinkCredit(UnsignedInteger linkCredit)

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptor.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptor.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptor.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptor.java Sat Sep  6 11:23:10 2014
@@ -26,23 +26,20 @@ import org.apache.qpid.proton.engine.Tra
 
 class TransportOutputAdaptor implements TransportOutput
 {
-    private TransportOutputWriter _transportOutputWriter;
+    private static final ByteBuffer _emptyHead = newReadableBuffer(0).asReadOnlyBuffer();
 
-    private final ByteBuffer _outputBuffer;
-    private final ByteBuffer _head;
+    private final TransportOutputWriter _transportOutputWriter;
+    private final int _maxFrameSize;
+
+    private ByteBuffer _outputBuffer = null;
+    private ByteBuffer _head = null;
     private boolean _output_done = false;
     private boolean _head_closed = false;
 
     TransportOutputAdaptor(TransportOutputWriter transportOutputWriter, int maxFrameSize)
     {
         _transportOutputWriter = transportOutputWriter;
-        if (maxFrameSize > 0) {
-            _outputBuffer = newWriteableBuffer(maxFrameSize);
-        } else {
-            _outputBuffer = newWriteableBuffer(4*1024);
-        }
-        _head = _outputBuffer.asReadOnlyBuffer();
-        _head.limit(0);
+        _maxFrameSize = maxFrameSize > 0 ? maxFrameSize : 4*1024;
     }
 
     @Override
@@ -52,13 +49,26 @@ class TransportOutputAdaptor implements 
             return Transport.END_OF_STREAM;
         }
 
+        if(_outputBuffer == null)
+        {
+            init_buffers();
+        }
+
         _output_done = _transportOutputWriter.writeInto(_outputBuffer);
         _head.limit(_outputBuffer.position());
 
-        if (_output_done && _outputBuffer.position() == 0) {
+        if (_outputBuffer.position() == 0 && _outputBuffer.capacity() > TransportImpl.BUFFER_RELEASE_THRESHOLD)
+        {
+            release_buffers();
+        }
+
+        if (_output_done && (_outputBuffer == null || _outputBuffer.position() == 0))
+        {
             return Transport.END_OF_STREAM;
-        } else {
-            return _outputBuffer.position();
+        }
+        else
+        {
+            return _outputBuffer == null ? 0 : _outputBuffer.position();
         }
     }
 
@@ -66,24 +76,40 @@ class TransportOutputAdaptor implements 
     public ByteBuffer head()
     {
         pending();
-        return _head;
+        return _head != null ? _head : _emptyHead;
     }
 
     @Override
     public void pop(int bytes)
     {
-        _outputBuffer.flip();
-        _outputBuffer.position(bytes);
-        _outputBuffer.compact();
-        _head.position(0);
-        _head.limit(_outputBuffer.position());
+        if (_outputBuffer != null) {
+            _outputBuffer.flip();
+            _outputBuffer.position(bytes);
+            _outputBuffer.compact();
+            _head.position(0);
+            _head.limit(_outputBuffer.position());
+            if (_outputBuffer.position() == 0 && _outputBuffer.capacity() > TransportImpl.BUFFER_RELEASE_THRESHOLD) {
+                release_buffers();
+            }
+        }
     }
 
     @Override
     public void close_head()
     {
         _head_closed = true;
-        _transportOutputWriter.closed();
+        _transportOutputWriter.closed(null);
+        release_buffers();
+    }
+
+    private void init_buffers() {
+        _outputBuffer = newWriteableBuffer(_maxFrameSize);
+        _head = _outputBuffer.asReadOnlyBuffer();
+        _head.limit(0);
     }
 
+    private void release_buffers() {
+        _head = null;
+        _outputBuffer = null;
+    }
 }

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputWriter.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputWriter.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputWriter.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputWriter.java Sat Sep  6 11:23:10 2014
@@ -20,6 +20,8 @@ package org.apache.qpid.proton.engine.im
 
 import java.nio.ByteBuffer;
 
+import org.apache.qpid.proton.engine.TransportException;
+
 interface TransportOutputWriter
 {
     /**
@@ -28,6 +30,6 @@ interface TransportOutputWriter
      */
     boolean writeInto(ByteBuffer outputBuffer);
 
-    void closed();
+    void closed(TransportException error);
 
 }

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java Sat Sep  6 11:23:10 2014
@@ -69,6 +69,12 @@ class TransportSession
         _session = session;
     }
 
+    void unbind()
+    {
+        unsetLocalChannel();
+        unsetRemoteChannel();
+    }
+
     public SessionImpl getSession()
     {
         return _session;
@@ -81,6 +87,9 @@ class TransportSession
 
     public void setLocalChannel(int localChannel)
     {
+        if (!isLocalChannelSet()) {
+            _session.incref();
+        }
         _localChannel = localChannel;
     }
 
@@ -91,6 +100,9 @@ class TransportSession
 
     public void setRemoteChannel(int remoteChannel)
     {
+        if (!isRemoteChannelSet()) {
+            _session.incref();
+        }
         _remoteChannel = remoteChannel;
     }
 
@@ -116,11 +128,17 @@ class TransportSession
 
     public void unsetLocalChannel()
     {
+        if (isLocalChannelSet()) {
+            _session.decref();
+        }
         _localChannel = -1;
     }
 
     public void unsetRemoteChannel()
     {
+        if (isRemoteChannelSet()) {
+            _session.decref();
+        }
         _remoteChannel = -1;
     }
 
@@ -262,7 +280,7 @@ class TransportSession
             _unsettledIncomingDeliveriesById.put(_incomingDeliveryId, delivery);
             getSession().incrementIncomingDeliveries(1);
         }
-        if( transfer.getState()!=null ) 
+        if( transfer.getState()!=null )
         {
             delivery.setRemoteDeliveryState(transfer.getState());
         }
@@ -308,15 +326,12 @@ class TransportSession
             delivery.getLink().modified(false);
         }
 
-        EventImpl ev = getSession().getConnection().put(Event.Type.DELIVERY);
-        if (ev != null) {
-            ev.init(delivery);
-        }
+        getSession().getConnection().put(Event.Type.DELIVERY, delivery);
     }
 
     public void freeLocalChannel()
     {
-        _localChannel = -1;
+        unsetLocalChannel();
     }
 
     private void setRemoteIncomingWindow(UnsignedInteger incomingWindow)
@@ -394,10 +409,7 @@ class TransportSession
                 }
                 delivery.updateWork();
 
-                EventImpl ev = getSession().getConnection().put(Event.Type.DELIVERY);
-                if (ev != null) {
-                    ev.init(delivery);
-                }
+                getSession().getConnection().put(Event.Type.DELIVERY, delivery);
             }
             id = id.add(UnsignedInteger.ONE);
         }

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapper.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapper.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapper.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapper.java Sat Sep  6 11:23:10 2014
@@ -338,22 +338,13 @@ public class SimpleSslTransportWrapper i
 
         _inputBuffer.flip();
 
-        try
-        {
-            try {
-                unwrapInput();
-            } catch (SSLException e) {
-                throw new TransportException(e);
-            }
-        }
-        catch (TransportException e)
-        {
+        try {
+            unwrapInput();
+        } catch (SSLException e) {
+            _logger.log(Level.WARNING, e.getMessage());
             _inputBuffer.position(_inputBuffer.limit());
             _tail_closed = true;
-            throw e;
-        }
-        finally
-        {
+        } finally {
             _inputBuffer.compact();
         }
     }
@@ -374,17 +365,17 @@ public class SimpleSslTransportWrapper i
         try {
             wrapOutput();
         } catch (SSLException e) {
-            throw new TransportException(e);
+            _logger.log(Level.WARNING, e.getMessage());
+            _head_closed = true;
         }
 
         _head.limit(_outputBuffer.position());
 
-        if (_head_closed && _outputBuffer.position() == 0)
-        {
+        if (_head_closed && _outputBuffer.position() == 0) {
             return Transport.END_OF_STREAM;
-        } else {
-            return _outputBuffer.position();
         }
+
+        return _outputBuffer.position();
     }
 
     @Override
@@ -408,6 +399,10 @@ public class SimpleSslTransportWrapper i
     public void close_head()
     {
         _underlyingOutput.close_head();
+        int p = pending();
+        if (p > 0) {
+            pop(p);
+        }
     }
 
 

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslDomainImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslDomainImpl.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslDomainImpl.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslDomainImpl.java Sat Sep  6 11:23:10 2014
@@ -19,7 +19,6 @@
 package org.apache.qpid.proton.engine.impl.ssl;
 
 import org.apache.qpid.proton.ProtonUnsupportedOperationException;
-import org.apache.qpid.proton.engine.EngineFactory;
 import org.apache.qpid.proton.engine.ProtonJSslDomain;
 import org.apache.qpid.proton.engine.SslDomain;
 import org.apache.qpid.proton.engine.SslPeerDetails;

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslPeerDetailsImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslPeerDetailsImpl.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslPeerDetailsImpl.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslPeerDetailsImpl.java Sat Sep  6 11:23:10 2014
@@ -18,7 +18,6 @@
  */
 package org.apache.qpid.proton.engine.impl.ssl;
 
-import org.apache.qpid.proton.engine.EngineFactory;
 import org.apache.qpid.proton.engine.ProtonJSslPeerDetails;
 
 

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/message/Message.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/message/Message.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/message/Message.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/message/Message.java Sat Sep  6 11:23:10 2014
@@ -28,6 +28,8 @@ import org.apache.qpid.proton.amqp.messa
 import org.apache.qpid.proton.amqp.messaging.Properties;
 import org.apache.qpid.proton.amqp.messaging.Section;
 
+import org.apache.qpid.proton.message.impl.MessageImpl;
+
 /**
  * Represents a Message within Proton.
  *
@@ -36,6 +38,27 @@ import org.apache.qpid.proton.amqp.messa
  */
 public interface Message
 {
+
+    public static final class Factory
+    {
+        public static Message create() {
+            return new MessageImpl();
+        }
+
+        public static Message create(Header header,
+                                     DeliveryAnnotations deliveryAnnotations,
+                                     MessageAnnotations messageAnnotations,
+                                     Properties properties,
+                                     ApplicationProperties applicationProperties,
+                                     Section body,
+                                     Footer footer) {
+            return new MessageImpl(header, deliveryAnnotations,
+                                   messageAnnotations, properties,
+                                   applicationProperties, body, footer);
+        }
+    }
+
+
     short DEFAULT_PRIORITY = 4;
 
     boolean isDurable();

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/message/impl/MessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/message/impl/MessageImpl.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/message/impl/MessageImpl.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/message/impl/MessageImpl.java Sat Sep  6 11:23:10 2014
@@ -573,8 +573,15 @@ public class MessageImpl implements Prot
     @Override
     public int decode(byte[] data, int offset, int length)
     {
-        DecoderImpl decoder = tlsCodec.get().decoder;
         final ByteBuffer buffer = ByteBuffer.wrap(data, offset, length);
+        decode(buffer);
+
+        return length-buffer.remaining();
+    }
+
+    public void decode(ByteBuffer buffer)
+    {
+        DecoderImpl decoder = tlsCodec.get().decoder;
         decoder.setByteBuffer(buffer);
 
         _header = null;
@@ -680,9 +687,6 @@ public class MessageImpl implements Prot
         }
 
         decoder.setByteBuffer(null);
-        
-        return length-buffer.remaining();
-
     }
 
     @Override

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/messenger/Messenger.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/messenger/Messenger.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/messenger/Messenger.java Sat Sep  6 11:23:10 2014
@@ -25,6 +25,8 @@ import java.io.IOException;
 import org.apache.qpid.proton.TimeoutException;
 import org.apache.qpid.proton.message.Message;
 
+import org.apache.qpid.proton.messenger.impl.MessengerImpl;
+
 /**
  *
  *  Messenger defines a high level interface for sending and receiving
@@ -69,6 +71,18 @@ import org.apache.qpid.proton.message.Me
 */
 public interface Messenger
 {
+
+    public static final class Factory
+    {
+        public static Messenger create() {
+            return new MessengerImpl();
+        }
+
+        public static Messenger create(String name) {
+            return new MessengerImpl(name);
+        }
+    }
+
     /**
      * Flag for use with reject(), accept() and settle() methods.
      */

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java Sat Sep  6 11:23:10 2014
@@ -29,17 +29,14 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.qpid.proton.Proton;
-import org.apache.qpid.proton.ProtonFactoryLoader;
 import org.apache.qpid.proton.InterruptException;
 import org.apache.qpid.proton.TimeoutException;
 import org.apache.qpid.proton.driver.Connector;
 import org.apache.qpid.proton.driver.Driver;
-import org.apache.qpid.proton.driver.DriverFactory;
 import org.apache.qpid.proton.driver.Listener;
 import org.apache.qpid.proton.engine.Connection;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.EndpointState;
-import org.apache.qpid.proton.engine.EngineFactory;
 import org.apache.qpid.proton.engine.Link;
 import org.apache.qpid.proton.engine.Receiver;
 import org.apache.qpid.proton.engine.Sasl;
@@ -49,10 +46,8 @@ import org.apache.qpid.proton.engine.Ssl
 import org.apache.qpid.proton.engine.Ssl;
 import org.apache.qpid.proton.engine.Transport;
 import org.apache.qpid.proton.message.Message;
-import org.apache.qpid.proton.message.MessageFactory;
 import org.apache.qpid.proton.messenger.Messenger;
 import org.apache.qpid.proton.messenger.MessengerException;
-import org.apache.qpid.proton.messenger.MessengerFactory;
 import org.apache.qpid.proton.messenger.Status;
 import org.apache.qpid.proton.messenger.Tracker;
 import org.apache.qpid.proton.amqp.messaging.Source;
@@ -1449,14 +1444,16 @@ public class MessengerImpl implements Me
         {
             _receivers++;
             _blocked.add((Receiver)link);
+            link.setContext(Boolean.TRUE);
         }
     }
 
     // a link is being removed, account for it.
     private void linkRemoved(Link _link)
     {
-        if (_link instanceof Receiver)
+        if (_link instanceof Receiver && (Boolean) _link.getContext())
         {
+            _link.setContext(Boolean.FALSE);
             Receiver link = (Receiver)_link;
             assert _receivers > 0;
             _receivers--;

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/resources/cengine.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/resources/cengine.py?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/resources/cengine.py (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/resources/cengine.py Sat Sep  6 11:23:10 2014
@@ -25,7 +25,7 @@ from org.apache.qpid.proton.amqp.transac
 from org.apache.qpid.proton.amqp.transport import ErrorCondition, \
   SenderSettleMode, ReceiverSettleMode
 from org.apache.qpid.proton.engine import EndpointState, Sender, \
-  Receiver, TransportException
+  Receiver, Transport, TransportException
 
 from java.util import EnumSet
 from jarray import array, zeros
@@ -57,10 +57,10 @@ PN_NONDURABLE = 0
 PN_CONFIGURATION = 1
 PN_DELIVERIES = 2
 
-PN_LINK_CLOSE = 0
-PN_SESSION_CLOSE = 1
-PN_CONNECTION_CLOSE = 2
-PN_NEVER = 3
+PN_EXPIRE_WITH_LINK = 0
+PN_EXPIRE_WITH_SESSION = 1
+PN_EXPIRE_WITH_CONNECTION = 2
+PN_EXPIRE_NEVER = 3
 
 PN_DIST_MODE_UNSPECIFIED = 0
 PN_DIST_MODE_COPY = 1
@@ -72,10 +72,10 @@ PN_REJECTED = (0x0000000000000025)
 PN_RELEASED = (0x0000000000000026)
 PN_MODIFIED = (0x0000000000000027)
 
-PN_TRACE_OFF = (0)
-PN_TRACE_RAW = (1)
-PN_TRACE_FRM = (2)
-PN_TRACE_DRV = (4)
+PN_TRACE_OFF = Transport.TRACE_OFF
+PN_TRACE_RAW = Transport.TRACE_RAW
+PN_TRACE_FRM = Transport.TRACE_FRM
+PN_TRACE_DRV = Transport.TRACE_DRV
 
 def wrap(obj, wrapper):
   if obj:
@@ -98,7 +98,11 @@ class pn_condition:
       self.description = None
       self.info.clear()
     else:
-      self.name = impl.getCondition().toString()
+      cond = impl.getCondition()
+      if cond is None:
+        self.name = None
+      else:
+        self.name = cond.toString()
       self.description = impl.getDescription()
       obj2dat(impl.getInfo(), self.info)
 
@@ -222,6 +226,9 @@ def pn_connection_set_container(conn, na
 def pn_connection_remote_container(conn):
   return conn.impl.getRemoteContainer()
 
+def pn_connection_get_hostname(conn):
+  return conn.impl.getHostname()
+
 def pn_connection_set_hostname(conn, name):
   conn.impl.setHostname(name)
 
@@ -244,6 +251,9 @@ def pn_connection_close(conn):
   conn.on_close()
   conn.impl.close()
 
+def pn_connection_free(conn):
+  conn.impl.free()
+
 class pn_session_wrapper(endpoint_wrapper):
   pass
 
@@ -325,7 +335,7 @@ def pn_receiver(ssn, name):
   return wrap(ssn.impl.receiver(name), pn_link_wrapper)
 
 def pn_session_free(ssn):
-  ssn.impl = None
+  ssn.impl.free()
 
 TERMINUS_TYPES_J2P = {
   Source: PN_SOURCE,
@@ -354,17 +364,17 @@ DURABILITY_J2P = {
 }
 
 EXPIRY_POLICY_P2J = {
-  PN_LINK_CLOSE: TerminusExpiryPolicy.LINK_DETACH,
-  PN_SESSION_CLOSE: TerminusExpiryPolicy.SESSION_END,
-  PN_CONNECTION_CLOSE: TerminusExpiryPolicy.CONNECTION_CLOSE,
-  PN_NEVER: TerminusExpiryPolicy.NEVER
+  PN_EXPIRE_WITH_LINK: TerminusExpiryPolicy.LINK_DETACH,
+  PN_EXPIRE_WITH_SESSION: TerminusExpiryPolicy.SESSION_END,
+  PN_EXPIRE_WITH_CONNECTION: TerminusExpiryPolicy.CONNECTION_CLOSE,
+  PN_EXPIRE_NEVER: TerminusExpiryPolicy.NEVER
 }
 
 EXPIRY_POLICY_J2P = {
-  TerminusExpiryPolicy.LINK_DETACH: PN_LINK_CLOSE,
-  TerminusExpiryPolicy.SESSION_END: PN_SESSION_CLOSE,
-  TerminusExpiryPolicy.CONNECTION_CLOSE: PN_CONNECTION_CLOSE,
-  TerminusExpiryPolicy.NEVER: PN_NEVER
+  TerminusExpiryPolicy.LINK_DETACH: PN_EXPIRE_WITH_LINK,
+  TerminusExpiryPolicy.SESSION_END: PN_EXPIRE_WITH_SESSION,
+  TerminusExpiryPolicy.CONNECTION_CLOSE: PN_EXPIRE_WITH_CONNECTION,
+  TerminusExpiryPolicy.NEVER: PN_EXPIRE_NEVER
 }
 
 DISTRIBUTION_MODE_P2J = {
@@ -385,7 +395,7 @@ class pn_terminus:
     self.type = type
     self.address = None
     self.durability = PN_NONDURABLE
-    self.expiry_policy = PN_SESSION_CLOSE
+    self.expiry_policy = PN_EXPIRE_WITH_SESSION
     self.distribution_mode = PN_DIST_MODE_UNSPECIFIED
     self.timeout = 0
     self.dynamic = False
@@ -587,6 +597,9 @@ def pn_link_remote_rcv_settle_mode(link)
 def pn_link_is_sender(link):
   return isinstance(link.impl, Sender)
 
+def pn_link_is_receiver(link):
+  return isinstance(link.impl, Receiver)
+
 def pn_link_head(conn, mask):
   local, remote = mask2set(mask)
   return wrap(conn.impl.linkHead(local, remote), pn_link_wrapper)
@@ -652,7 +665,7 @@ def pn_link_current(link):
   return wrap(link.impl.current(), pn_delivery_wrapper)
 
 def pn_link_free(link):
-  link.impl = None
+  link.impl.free()
 
 def pn_work_head(conn):
   return wrap(conn.impl.getWorkHead(), pn_delivery_wrapper)
@@ -802,6 +815,9 @@ def pn_delivery_get_context(dlv):
 def pn_delivery_set_context(dlv, ctx):
   dlv.context = ctx
 
+def pn_delivery_partial(dlv):
+  return dlv.impl.isPartial()
+
 def pn_delivery_pending(dlv):
   return dlv.impl.pending()
 
@@ -847,7 +863,6 @@ class pn_transport_wrapper:
 
   def __init__(self, impl):
     self.impl = impl
-    self.error = pn_error(0, None)
 
 def pn_transport():
   return wrap(Proton.transport(), pn_transport_wrapper)
@@ -877,15 +892,15 @@ def pn_transport_bind(trans, conn):
   trans.impl.bind(conn.impl)
   return 0
 
+def pn_transport_unbind(trans):
+  trans.impl.unbind()
+  return 0
+
 def pn_transport_trace(trans, n):
-  # XXX
-  pass
+  trans.impl.trace(n)
 
 def pn_transport_pending(trans):
-  try:
-    return trans.impl.pending()
-  except TransportException, e:
-    return trans.error.set(PN_ERR, str(e))
+  return trans.impl.pending()
 
 def pn_transport_peek(trans, size):
   size = min(trans.impl.pending(), size)
@@ -893,6 +908,7 @@ def pn_transport_peek(trans, size):
   if size:
     bb = trans.impl.head()
     bb.get(ba)
+    bb.position(0)
   return 0, ba.tostring()
 
 def pn_transport_pop(trans, size):
@@ -906,47 +922,51 @@ def pn_transport_push(trans, input):
   if cap < 0:
     return cap
   elif len(input) > cap:
-    return PN_OVERFLOW
-  else:
-    bb = trans.impl.tail()
-    bb.put(array(input, 'b'))
-    try:
-      trans.impl.process()
-      return 0
-    except TransportException, e:
-      trans.error = pn_error(PN_ERR, str(e))
-      return PN_ERR
+    input = input[:cap]
+
+  bb = trans.impl.tail()
+  bb.put(array(input, 'b'))
+  trans.impl.process()
+  return len(input)
 
 def pn_transport_close_head(trans):
-  try:
-    trans.impl.close_head()
-    return 0
-  except TransportException, e:
-    trans.error = pn_error(PN_ERR, str(e))
-    return PN_ERR
+  trans.impl.close_head()
+  return 0
 
 def pn_transport_close_tail(trans):
-  try:
-    trans.impl.close_tail()
-    return 0
-  except TransportException, e:
-    trans.error = pn_error(PN_ERR, str(e))
-    return PN_ERR
+  trans.impl.close_tail()
+  return 0
 
-def pn_transport_error(trans):
-  return trans.error
+def pn_transport_closed(trans):
+  return trans.impl.isClosed()
 
 from org.apache.qpid.proton.engine import Event
 
-PN_EVENT_CATEGORY_PROTOCOL = Event.Category.PROTOCOL
-
-PN_CONNECTION_LOCAL_STATE = Event.Type.CONNECTION_LOCAL_STATE
-PN_CONNECTION_REMOTE_STATE = Event.Type.CONNECTION_REMOTE_STATE
-PN_SESSION_LOCAL_STATE = Event.Type.SESSION_LOCAL_STATE
-PN_SESSION_REMOTE_STATE = Event.Type.SESSION_REMOTE_STATE
-PN_LINK_LOCAL_STATE = Event.Type.LINK_LOCAL_STATE
-PN_LINK_REMOTE_STATE = Event.Type.LINK_REMOTE_STATE
+PN_EVENT_CATEGORY_CONNECTION = Event.Category.CONNECTION
+PN_EVENT_CATEGORY_SESSION = Event.Category.SESSION
+PN_EVENT_CATEGORY_LINK = Event.Category.LINK
+PN_EVENT_CATEGORY_DELIVERY = Event.Category.DELIVERY
+PN_EVENT_CATEGORY_TRANSPORT = Event.Category.TRANSPORT
+
+PN_CONNECTION_INIT = Event.Type.CONNECTION_INIT
+PN_CONNECTION_OPEN = Event.Type.CONNECTION_OPEN
+PN_CONNECTION_REMOTE_OPEN = Event.Type.CONNECTION_REMOTE_OPEN
+PN_CONNECTION_CLOSE = Event.Type.CONNECTION_CLOSE
+PN_CONNECTION_REMOTE_CLOSE = Event.Type.CONNECTION_REMOTE_CLOSE
+PN_CONNECTION_FINAL = Event.Type.CONNECTION_FINAL
+PN_SESSION_INIT = Event.Type.SESSION_INIT
+PN_SESSION_OPEN = Event.Type.SESSION_OPEN
+PN_SESSION_REMOTE_OPEN = Event.Type.SESSION_REMOTE_OPEN
+PN_SESSION_CLOSE = Event.Type.SESSION_CLOSE
+PN_SESSION_REMOTE_CLOSE = Event.Type.SESSION_REMOTE_CLOSE
+PN_SESSION_FINAL = Event.Type.SESSION_FINAL
+PN_LINK_INIT = Event.Type.LINK_INIT
+PN_LINK_OPEN = Event.Type.LINK_OPEN
+PN_LINK_REMOTE_OPEN = Event.Type.LINK_REMOTE_OPEN
+PN_LINK_CLOSE = Event.Type.LINK_CLOSE
+PN_LINK_REMOTE_CLOSE = Event.Type.LINK_REMOTE_CLOSE
 PN_LINK_FLOW = Event.Type.LINK_FLOW
+PN_LINK_FINAL = Event.Type.LINK_FINAL
 PN_DELIVERY = Event.Type.DELIVERY
 PN_TRANSPORT = Event.Type.TRANSPORT
 

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/resources/csasl.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/resources/csasl.py?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/resources/csasl.py (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/resources/csasl.py Sat Sep  6 11:23:10 2014
@@ -29,6 +29,7 @@ PN_SASL_AUTH=1
 PN_SASL_SYS=2
 PN_SASL_PERM=3
 PN_SASL_TEMP=4
+PN_SASL_SKIPPED=5
 
 PN_SASL_CONF = 0
 PN_SASL_IDLE = 1
@@ -53,7 +54,8 @@ SASL_OUTCOMES_P2J = {
   PN_SASL_AUTH: Sasl.PN_SASL_AUTH,
   PN_SASL_SYS: Sasl.PN_SASL_SYS,
   PN_SASL_PERM: Sasl.PN_SASL_PERM,
-  PN_SASL_TEMP: Sasl.PN_SASL_TEMP
+  PN_SASL_TEMP: Sasl.PN_SASL_TEMP,
+  PN_SASL_SKIPPED: Sasl.PN_SASL_SKIPPED
 }
 
 SASL_OUTCOMES_J2P = {
@@ -62,7 +64,8 @@ SASL_OUTCOMES_J2P = {
   Sasl.PN_SASL_AUTH: PN_SASL_AUTH,
   Sasl.PN_SASL_SYS: PN_SASL_SYS,
   Sasl.PN_SASL_PERM: PN_SASL_PERM,
-  Sasl.PN_SASL_TEMP: PN_SASL_TEMP
+  Sasl.PN_SASL_TEMP: PN_SASL_TEMP,
+  Sasl.PN_SASL_SKIPPED: PN_SASL_SKIPPED
 }
 
 def pn_sasl_state(sasl):
@@ -77,6 +80,9 @@ def pn_sasl_client(sasl):
 def pn_sasl_server(sasl):
   sasl.server()
 
+def pn_sasl_allow_skip(sasl, allow):
+  sasl.allowSkip(allow)
+
 def pn_sasl_done(sasl, outcome):
   sasl.done(SASL_OUTCOMES_P2J[outcome])
 

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameParserTest.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameParserTest.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameParserTest.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameParserTest.java Sat Sep  6 11:23:10 2014
@@ -41,6 +41,7 @@ import org.apache.qpid.proton.amqp.trans
 import org.apache.qpid.proton.codec.AMQPDefinedTypes;
 import org.apache.qpid.proton.codec.DecoderImpl;
 import org.apache.qpid.proton.codec.EncoderImpl;
+import org.apache.qpid.proton.engine.Transport;
 import org.apache.qpid.proton.engine.TransportException;
 import org.apache.qpid.proton.engine.TransportResult;
 import org.apache.qpid.proton.engine.TransportResult.Status;
@@ -63,9 +64,6 @@ public class FrameParserTest
 
     private final AmqpFramer _amqpFramer = new AmqpFramer();
 
-    @Rule
-    public ExpectedException _expectedException = ExpectedException.none();
-
     @Before
     public void setUp()
     {
@@ -80,16 +78,8 @@ public class FrameParserTest
         String headerMismatchMessage = "AMQP header mismatch";
         ByteBuffer buffer = _frameParser.tail();
         buffer.put("hello".getBytes());
-        try {
-            _frameParser.process();
-            fail("expected exception");
-        } catch (TransportException e) {
-            assertThat(e.getMessage(), containsString(headerMismatchMessage));
-        }
-
-        _expectedException.expect(TransportException.class);
-        _expectedException.expectMessage(headerMismatchMessage);
-        _frameParser.tail();
+        _frameParser.process();
+        assertEquals(_frameParser.capacity(), Transport.END_OF_STREAM);
     }
 
     @Test

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java Sat Sep  6 11:23:10 2014
@@ -207,4 +207,19 @@ public class TransportImplTest
         assertTrue("Expecting second buffer to have bytes", buf.remaining() > 0);
         assertTrue("Expecting second buffer to not be full", buf.remaining() < Transport.MIN_MAX_FRAME_SIZE);
     }
+
+    @Test
+    public void testAttemptToInitiateSaslAfterProcessingBeginsCausesIllegalStateException()
+    {
+        _transport.process();
+
+        try
+        {
+            _transport.sasl();
+        }
+        catch(IllegalStateException ise)
+        {
+            //expected, sasl must be initiated before processing begins
+        }
+    }
 }

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptorTest.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptorTest.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptorTest.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptorTest.java Sat Sep  6 11:23:10 2014
@@ -25,6 +25,8 @@ import static org.junit.Assert.assertEqu
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import org.apache.qpid.proton.engine.TransportException;
+
 import java.nio.ByteBuffer;
 
 import org.junit.Test;
@@ -143,7 +145,7 @@ public class TransportOutputAdaptorTest
             _cannedOutput = cannedOutput;
         }
 
-        public void closed()
+        public void closed(TransportException error)
         {
             // do nothing
         }

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapperTest.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapperTest.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapperTest.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapperTest.java Sat Sep  6 11:23:10 2014
@@ -32,6 +32,7 @@ import java.nio.ByteBuffer;
 
 import javax.net.ssl.SSLException;
 
+import org.apache.qpid.proton.engine.Transport;
 import org.apache.qpid.proton.engine.TransportException;
 import org.junit.Before;
 import org.junit.Rule;
@@ -132,16 +133,8 @@ public class SimpleSslTransportWrapperTe
         _dummySslEngine.rejectNextEncodedPacket(sslException);
 
         _sslWrapper.tail().put("<-A->".getBytes());
-        try {
-            _sslWrapper.process();
-            fail("no exception");
-        } catch (TransportException e) {
-            assertSame(sslException, e.getCause());
-            assertEquals("", _underlyingInput.getAcceptedInput());
-        }
-
-        _expectedException.expect(TransportException.class);
-        _sslWrapper.tail();
+        _sslWrapper.process();
+        assertEquals(_sslWrapper.capacity(), Transport.END_OF_STREAM);
     }
 
     @Test

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtonEngineExampleTest.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtonEngineExampleTest.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtonEngineExampleTest.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtonEngineExampleTest.java Sat Sep  6 11:23:10 2014
@@ -26,7 +26,6 @@ import static org.apache.qpid.proton.eng
 import static org.apache.qpid.proton.engine.EndpointState.CLOSED;
 import static org.apache.qpid.proton.engine.EndpointState.UNINITIALIZED;
 import static org.apache.qpid.proton.systemtests.TestLoggingHelper.bold;
-import static org.apache.qpid.proton.systemtests.engine.ProtonFactoryTestFixture.isProtonJ;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 
@@ -34,7 +33,7 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.logging.Logger;
 
-import org.apache.qpid.proton.ProtonFactoryLoader;
+import org.apache.qpid.proton.Proton;
 import org.apache.qpid.proton.amqp.messaging.Accepted;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.amqp.messaging.Section;
@@ -45,10 +44,8 @@ import org.apache.qpid.proton.amqp.trans
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Endpoint;
 import org.apache.qpid.proton.engine.EndpointState;
-import org.apache.qpid.proton.engine.EngineFactory;
 import org.apache.qpid.proton.engine.Receiver;
 import org.apache.qpid.proton.message.Message;
-import org.apache.qpid.proton.message.MessageFactory;
 import org.junit.Test;
 
 /**
@@ -80,26 +77,23 @@ public class ProtonEngineExampleTest
 
     private final String _targetAddress = _server.containerId + "-link1-target";
 
-    private final EngineFactory _engineFactory = new ProtonFactoryLoader<EngineFactory>(EngineFactory.class).loadFactory();
-    private final MessageFactory _messageFactory = new ProtonFactoryLoader<MessageFactory>(MessageFactory.class).loadFactory();
-
     @Test
     public void test() throws Exception
     {
         LOGGER.fine(bold("======== About to create transports"));
 
-        _client.transport = _engineFactory.createTransport();
+        _client.transport = Proton.transport();
         ProtocolTracerEnabler.setProtocolTracer(_client.transport, TestLoggingHelper.CLIENT_PREFIX);
 
-        _server.transport = _engineFactory.createTransport();
+        _server.transport = Proton.transport();
         ProtocolTracerEnabler.setProtocolTracer(_server.transport, "            " + TestLoggingHelper.SERVER_PREFIX);
 
         doOutputInputCycle();
 
-        _client.connection = _engineFactory.createConnection();
+        _client.connection = Proton.connection();
         _client.transport.bind(_client.connection);
 
-        _server.connection = _engineFactory.createConnection();
+        _server.connection = Proton.connection();
         _server.transport.bind(_server.connection);
 
 
@@ -182,7 +176,7 @@ public class ProtonEngineExampleTest
 
         LOGGER.fine(bold("======== About to create a message and send it to the server"));
 
-        _client.message = _messageFactory.createMessage();
+        _client.message = Proton.message();
         Section messageBody = new AmqpValue("Hello");
         _client.message.setBody(messageBody);
         _client.messageData = new byte[BUFFER_SIZE];
@@ -195,12 +189,7 @@ public class ProtonEngineExampleTest
         assertEquals("For simplicity, assume the sender can accept all the data",
                      lengthOfEncodedMessage, numberOfBytesAcceptedBySender);
 
-        if (isProtonJ(_engineFactory))
-        {
-            // TODO PROTON-261: Proton-c ProtonJNI.pn_delivery_local_state is returning 0, which doesn't map to an
-            // value within the C enum.
-            assertNull(_client.delivery.getLocalState());
-        }
+        assertNull(_client.delivery.getLocalState());
 
         boolean senderAdvanced = _client.sender.advance();
         assertTrue("sender has not advanced", senderAdvanced);
@@ -213,11 +202,8 @@ public class ProtonEngineExampleTest
         _server.delivery = _server.connection.getWorkHead();
         assertEquals("The received delivery should be on our receiver",
                 _server.receiver, _server.delivery.getLink());
-        if (isProtonJ(_engineFactory))
-        {
-            assertNull(_server.delivery.getLocalState());
-            assertNull(_server.delivery.getRemoteState());
-        }
+        assertNull(_server.delivery.getLocalState());
+        assertNull(_server.delivery.getRemoteState());
 
         assertFalse(_server.delivery.isPartial());
         assertTrue(_server.delivery.isReadable());
@@ -226,7 +212,7 @@ public class ProtonEngineExampleTest
         int numberOfBytesProducedByReceiver = _server.receiver.recv(_server.messageData, 0, BUFFER_SIZE);
         assertEquals(numberOfBytesAcceptedBySender, numberOfBytesProducedByReceiver);
 
-        _server.message = _messageFactory.createMessage();
+        _server.message = Proton.message();
         _server.message.decode(_server.messageData, 0, numberOfBytesProducedByReceiver);
 
         boolean messageProcessed = applicationProcessMessage(_server.message);

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/systemtests/SimpleTest.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/systemtests/SimpleTest.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/systemtests/SimpleTest.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/systemtests/SimpleTest.java Sat Sep  6 11:23:10 2014
@@ -20,10 +20,9 @@ package org.apache.qpid.proton.systemtes
 
 import static org.junit.Assert.assertEquals;
 
-import org.apache.qpid.proton.ProtonFactoryLoader;
+import org.apache.qpid.proton.Proton;
 import org.apache.qpid.proton.engine.Connection;
 import org.apache.qpid.proton.engine.EndpointState;
-import org.apache.qpid.proton.engine.EngineFactory;
 import org.apache.qpid.proton.engine.Transport;
 import org.junit.Test;
 
@@ -33,14 +32,12 @@ public class SimpleTest
     @Test
     public void test()
     {
-        EngineFactory engineFactory = new ProtonFactoryLoader<EngineFactory>(EngineFactory.class).loadFactory();
-
-        Connection connection1 = engineFactory.createConnection();
-        Connection connection2 = engineFactory.createConnection();;
-        Transport transport1 = engineFactory.createTransport();
+        Connection connection1 = Proton.connection();
+        Connection connection2 = Proton.connection();;
+        Transport transport1 = Proton.transport();
         transport1.bind(connection1);
 
-        Transport transport2 = engineFactory.createTransport();
+        Transport transport2 = Proton.transport();
         transport2.bind(connection2);
 
         assertEquals(EndpointState.UNINITIALIZED, connection1.getLocalState());

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/systemtests/engine/ConnectionTest.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/systemtests/engine/ConnectionTest.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/systemtests/engine/ConnectionTest.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/systemtests/engine/ConnectionTest.java Sat Sep  6 11:23:10 2014
@@ -23,8 +23,6 @@ import static java.util.EnumSet.of;
 import static org.apache.qpid.proton.engine.EndpointState.ACTIVE;
 import static org.apache.qpid.proton.engine.EndpointState.CLOSED;
 import static org.apache.qpid.proton.engine.EndpointState.UNINITIALIZED;
-import static org.apache.qpid.proton.systemtests.engine.ProtonFactoryTestFixture.isProtonC;
-import static org.apache.qpid.proton.systemtests.engine.ProtonFactoryTestFixture.isProtonJ;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -34,6 +32,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.qpid.proton.Proton;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.transport.Close;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
@@ -41,7 +40,6 @@ import org.apache.qpid.proton.amqp.trans
 import org.apache.qpid.proton.engine.Connection;
 import org.apache.qpid.proton.engine.Endpoint;
 import org.apache.qpid.proton.engine.EndpointState;
-import org.apache.qpid.proton.engine.EngineFactory;
 import org.apache.qpid.proton.engine.Session;
 import org.apache.qpid.proton.engine.Transport;
 import org.apache.qpid.proton.engine.TransportException;
@@ -59,18 +57,13 @@ public class ConnectionTest
     private static final String SERVER_CONTAINER = "serverContainer";
     private static final String CLIENT_CONTAINER = "clientContainer";
 
-    private final ProtonFactoryTestFixture _protonFactoryTestFixture = new ProtonFactoryTestFixture();
-
-    private EngineFactory _clientFactory = _protonFactoryTestFixture.getFactory1();
-    private EngineFactory _serverFactory = _protonFactoryTestFixture.getFactory2();
-
-    private final Transport _clientTransport = _clientFactory.createTransport();
-    private final Transport _serverTransport = _serverFactory.createTransport();
+    private final Transport _clientTransport = Proton.transport();
+    private final Transport _serverTransport = Proton.transport();
 
     private final TransportPumper _pumper = new TransportPumper(_clientTransport, _serverTransport);
 
-    private final Connection _clientConnection = _clientFactory.createConnection();
-    private final Connection _serverConnection = _serverFactory.createConnection();
+    private final Connection _clientConnection = Proton.connection();
+    private final Connection _serverConnection = Proton.connection();
 
     private final AmqpFramer _framer = new AmqpFramer();
 
@@ -87,7 +80,7 @@ public class ConnectionTest
 
 
     /** Container id is a mandatory field so this should cause an error */
-    @Test(expected=TransportException.class)
+    @Test
     public void testReceiptOfOpenWithoutContainerId_causesTODO()
     {
         _pumper.pumpAll();
@@ -97,7 +90,7 @@ public class ConnectionTest
 
         int serverConsumed = _serverTransport.input(openFrameBuffer, 0, openFrameBuffer.length);
         assertEquals(openFrameBuffer.length, serverConsumed);
-        assumeTrue(isProtonJ(_serverFactory));
+        assertEquals(_serverTransport.capacity(), Transport.END_OF_STREAM);
     }
 
     /**
@@ -268,10 +261,7 @@ public class ConnectionTest
         _pumper.pumpOnceFromClientToServer();
 
         assertEnpointState(_clientConnection, CLOSED, UNINITIALIZED);
-        if (!isProtonC(_serverFactory))
-        {
-            assertEnpointState(_serverConnection, UNINITIALIZED, CLOSED);
-        }
+        assertEnpointState(_serverConnection, UNINITIALIZED, CLOSED);
     }
 
     /**
@@ -341,9 +331,6 @@ public class ConnectionTest
     @SuppressWarnings({ "rawtypes", "unchecked" })
     public void testCloseConnectionWithErrorCode_causesCloseFrameContainingErrorCodeToBeSent()
     {
-        // TODO Proton-c fails if no remote condition is set
-        assumeTrue(isProtonJ(_clientFactory) && isProtonJ(_serverFactory));
-
         bindAndOpenConnections();
 
         /*
@@ -411,8 +398,6 @@ public class ConnectionTest
         Close surprisingClose = new Close();
 
         byte[] buf = _framer.generateFrame(0, surprisingClose);
-        assumeTrue(isProtonJ(_serverFactory));
-        // TODO Proton-C: function pn_do_close causes a SEGV fault if you try and close an unopened connection
         _serverTransport.input(buf, 0, buf.length);
 
         // TODO server should indicate error

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/systemtests/engine/TransportTest.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/systemtests/engine/TransportTest.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/systemtests/engine/TransportTest.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/test/java/org/apache/qpid/proton/systemtests/engine/TransportTest.java Sat Sep  6 11:23:10 2014
@@ -22,7 +22,7 @@ package org.apache.qpid.proton.systemtes
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 
-import org.apache.qpid.proton.engine.EngineFactory;
+import org.apache.qpid.proton.Proton;
 import org.apache.qpid.proton.engine.Transport;
 import org.apache.qpid.proton.engine.TransportException;
 import org.junit.Ignore;
@@ -34,9 +34,7 @@ import org.junit.Test;
  */
 public class TransportTest
 {
-    private final ProtonFactoryTestFixture _protonFactoryTestFixture = new ProtonFactoryTestFixture();
-    private final EngineFactory _factory = _protonFactoryTestFixture.getFactory1();
-    private final Transport _transport = _factory.createTransport();
+    private final Transport _transport = Proton.transport();
 
     /**
      * Note that Proton does not yet give the application explicit control over protocol version negotiation

Modified: qpid/proton/branches/fadams-javascript-binding/tests/java/org/apache/qpid/proton/InteropTest.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/tests/java/org/apache/qpid/proton/InteropTest.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/tests/java/org/apache/qpid/proton/InteropTest.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/tests/java/org/apache/qpid/proton/InteropTest.java Sat Sep  6 11:23:10 2014
@@ -19,13 +19,11 @@
 package org.apache.qpid.proton;
 
 import org.apache.qpid.proton.TestDecoder;
-import org.apache.qpid.proton.ProtonFactoryLoader;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.DescribedType;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.message.Message;
-import org.apache.qpid.proton.message.MessageFactory;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertArrayEquals;
@@ -69,8 +67,7 @@ public class InteropTest
     Message decodeMessage(String name) throws IOException
     {
         byte[] data = getBytes(name);
-        MessageFactory mf = new ProtonFactoryLoader<MessageFactory>(MessageFactory.class).loadFactory();
-        Message m = mf.createMessage();
+        Message m = Proton.message();
         m.decode(data, 0, data.length);
         return m;
     }

Modified: qpid/proton/branches/fadams-javascript-binding/tests/python/proton_tests/common.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/tests/python/proton_tests/common.py?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/tests/python/proton_tests/common.py (original)
+++ qpid/proton/branches/fadams-javascript-binding/tests/python/proton_tests/common.py Sat Sep  6 11:23:10 2014
@@ -21,7 +21,7 @@ from random import randint
 from threading import Thread
 from socket import socket, AF_INET, SOCK_STREAM
 from subprocess import Popen,PIPE,STDOUT
-import sys, os
+import sys, os, string
 from proton import Driver, Connection, Transport, SASL, Endpoint, Delivery, \
     SSLDomain, SSLUnavailable
 
@@ -46,42 +46,37 @@ def free_tcp_ports(count=1):
     s.close()
   return ports
 
+def pump_uni(src, dst, buffer_size=1024):
+  """One one-way step from src to dst (transfer or close); False if nothing to do."""
+  pending = src.pending()
+  capacity = dst.capacity()
+
+  if capacity < 0:
+    if pending < 0:
+      return False
+    src.close_head()
+    return True
+
+  if pending < 0:
+    dst.close_tail()
+    return True
+  if pending == 0 or capacity == 0:
+    return False
+
+  chunk = src.peek(min(capacity, buffer_size))
+  dst.push(chunk)
+  src.pop(len(chunk))
+  return True
 
 def pump(transport1, transport2, buffer_size=1024):
   """ Transfer all pending bytes between two Proton engines
-      by repeatedly calling input and output.
-      Asserts that each engine accepts some bytes every time
-      (unless it's already closed).
+      by repeatedly calling peek/pop and push.
+      Loops until neither direction makes progress; a closed side is
+      propagated to its peer via close_head/close_tail.
   """
-
-  out1_leftover_by_t2 = ""
-  out2_leftover_by_t1 = ""
-  i = 0
-
-  while True:
-    out1 = out1_leftover_by_t2 + (transport1.output(buffer_size) or "")
-    out2 = out2_leftover_by_t1 + (transport2.output(buffer_size) or "")
-
-    if out1:
-      number_t2_consumed = transport2.input(out1)
-      if number_t2_consumed is None:
-        # special None return value means input is closed so discard the leftovers
-        out1_leftover_by_t2 = ""
-      else:
-        assert number_t2_consumed > 0, (number_t2_consumed, len(out1), out1[:100])
-        out1_leftover_by_t2 = out1[number_t2_consumed:]
-
-    if out2:
-      number_t1_consumed = transport1.input(out2)
-      if number_t1_consumed is None:
-        # special None return value means input is closed so discard the leftovers
-        out2_leftover_by_t1 = ""
-      else:
-        assert number_t1_consumed > 0, (number_t1_consumed, len(out1), out1[:100])
-        out2_leftover_by_t1 = out2[number_t1_consumed:]
-
-    if not out1 and not out2: break
-    i = i + 1
+  while (pump_uni(transport1, transport2, buffer_size) or
+         pump_uni(transport2, transport1, buffer_size)):
+    pass
 
 def isSSLPresent():
     """ True if a suitable SSL library is available.
@@ -335,6 +330,16 @@ class MessengerApp(object):
         self.password = None
         self._output = None
 
+    def findfile(self, filename, searchpath):
+        """Find filename in searchpath (directories joined by os.pathsep);
+            return the absolute path of the first match, or None.
+        """
+        for directory in searchpath.split(os.pathsep):
+            candidate = os.path.join(directory, filename)
+            if os.path.exists(candidate):
+                return os.path.abspath(candidate)
+        return None
+
     def start(self, verbose=False):
         """ Begin executing the test """
         cmd = self.cmdline()
@@ -343,8 +348,20 @@ class MessengerApp(object):
             print("COMMAND='%s'" % str(cmd))
         #print("ENV='%s'" % str(os.environ.copy()))
         try:
+            if os.name=="nt":
+                # Windows handles python launch by replacing script 'filename' with
+                # 'python abspath-to-filename' in cmdline arg list.
+                if cmd[0].endswith('.py'):
+                    foundfile = self.findfile(cmd[0], os.environ['PATH'])
+                    if foundfile is None:
+                        foundfile = self.findfile(cmd[0], os.environ['PYTHONPATH'])
+                        assert foundfile is not None, "Unable to locate file '%s' in PATH or PYTHONPATH" % cmd[0]
+                    del cmd[0:1]
+                    cmd.insert(0, foundfile)
+                    cmd.insert(0, sys.executable)
             self._process = Popen(cmd, stdout=PIPE, stderr=STDOUT, bufsize=4096)
         except OSError, e:
+            print("ERROR: '%s'" % e)
             assert False, "Unable to execute command '%s', is it in your PATH?" % cmd[0]
         self._ready()  # wait for it to initialize
 
@@ -514,7 +531,7 @@ class MessengerReceiver(MessengerApp):
     def _ready(self):
         """ wait for subscriptions to complete setup. """
         r = self._process.stdout.readline()
-        assert r == "READY\n", "Unexpected input while waiting for receiver to initialize: %s" % r
+        assert r.strip() == "READY", "Unexpected input while waiting for receiver to initialize: %s" % r
 
 class MessengerSenderC(MessengerSender):
     def __init__(self):



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org