You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by fa...@apache.org on 2014/09/06 13:23:14 UTC

svn commit: r1622849 [2/9] - in /qpid/proton/branches/fadams-javascript-binding: ./ contrib/ contrib/proton-hawtdispatch/ contrib/proton-hawtdispatch/src/ contrib/proton-hawtdispatch/src/main/ contrib/proton-hawtdispatch/src/main/java/ contrib/proton-h...

Added: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/ChainedCallback.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/ChainedCallback.java?rev=1622849&view=auto
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/ChainedCallback.java (added)
+++ qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/ChainedCallback.java Sat Sep  6 11:23:10 2014
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.hawtdispatch.api;
+
+/**
+ * <p>
+ * Base class for a {@link Callback} that is chained in front of another
+ * callback. Failures are handed straight to the {@link #next} callback;
+ * the class is abstract and leaves the handling of a successful
+ * <code>In</code> result to subclasses.
+ * </p>
+ *
+ * @param <In>  result type accepted by this callback
+ * @param <Out> result type accepted by the chained callback
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public abstract class ChainedCallback<In,Out> implements Callback<In> {
+
+    /** The callback that failures are forwarded to. */
+    public final Callback<Out> next;
+
+    /**
+     * @param next the callback placed after this one in the chain
+     */
+    public ChainedCallback(Callback<Out> next) {
+        this.next = next;
+    }
+
+    /**
+     * Passes the failure through to the chained callback unchanged.
+     *
+     * @param value the failure being reported
+     */
+    public void onFailure(Throwable value) {
+        this.next.onFailure(value);
+    }
+}

Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/ChainedCallback.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/ChainedCallback.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/DeliveryAttachment.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/DeliveryAttachment.java?rev=1622849&view=auto
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/DeliveryAttachment.java (added)
+++ qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/DeliveryAttachment.java Sat Sep  6 11:23:10 2014
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import org.apache.qpid.proton.engine.Delivery;
+
+/**
+ * Abstract type exposing a single package-private callback that is handed a
+ * proton {@link Delivery}.
+ * NOTE(review): the name suggests instances are attached to a delivery as its
+ * context object - confirm against the callers in this package.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public abstract class DeliveryAttachment {
+
+    /**
+     * Handles the given delivery.
+     *
+     * @param delivery the delivery to process
+     */
+    abstract void processDelivery(Delivery delivery);
+}

Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/DeliveryAttachment.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/DeliveryAttachment.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Future.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Future.java?rev=1622849&view=auto
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Future.java (added)
+++ qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Future.java Sat Sep  6 11:23:10 2014
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * <p>A simplified Future function results interface.</p>
+ *
+ * <p>A result can either be waited for with one of the <code>await</code>
+ * methods or delivered to a {@link Callback} registered through
+ * {@link #then(Callback)}. {@link Promise} is the implementation found in
+ * this package.</p>
+ *
+ * @param <T> type of the result
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface Future<T> {
+
+    /**
+     * Waits for the result without a time limit.
+     *
+     * @return the result
+     * @throws Exception if waiting or the underlying operation failed
+     */
+    T await() throws Exception;
+
+    /**
+     * Waits for the result for at most the given amount of time.
+     *
+     * @param amount how long to wait
+     * @param unit   the unit <code>amount</code> is expressed in
+     * @return the result
+     * @throws Exception if waiting or the underlying operation failed
+     */
+    T await(long amount, TimeUnit unit) throws Exception;
+
+    /**
+     * Registers the callback that should receive the result.
+     *
+     * @param callback the callback to notify
+     */
+    void then(Callback<T> callback);
+
+}

Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Future.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Future.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java?rev=1622849&view=auto
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java (added)
+++ qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java Sat Sep  6 11:23:10 2014
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.hawtdispatch.impl.Watch;
+import org.apache.qpid.proton.hawtdispatch.impl.WatchBase;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.message.ProtonJMessage;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtdispatch.Task;
+
+/**
+ * Couples a {@link Message} with its encoded {@link Buffer} form and with the
+ * proton {@link Delivery} used to track it. The message and the encoded
+ * buffer are each derived lazily from the other, and callers can register
+ * callbacks (or obtain futures) that fire once the delivery is assigned, its
+ * remote state changes, or it is settled.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public abstract class MessageDelivery extends WatchBase {
+
+    /** Length of the encoded buffer at construction time. */
+    final int initialSize;
+    /** Decoded message; null until supplied or decoded on demand. */
+    private Message message;
+    /** Encoded message; reset to null when the message is modified. */
+    private Buffer encoded;
+    /** The proton delivery; null until it is assigned from outside this class. */
+    public Delivery delivery;
+    /** Buffer size tried first by the next {@link #encoded()} call. */
+    private int sizeHint = 32;
+
+    /**
+     * Encodes the message into a new buffer.
+     *
+     * @param message  the message to encode; must be a {@link ProtonJMessage}
+     * @param sizeHint size of the buffer tried first
+     * @return a buffer holding exactly the encoded bytes
+     */
+    static Buffer encode(Message message, int sizeHint) {
+        byte[] buffer = new byte[sizeHint];
+        int size = ((ProtonJMessage)message).encode2(buffer, 0, sizeHint);
+        if( size > sizeHint ) {
+            // The first attempt reported a size larger than the hint, so
+            // encode again into a buffer of the reported size.
+            buffer = new byte[size];
+            size = message.encode(buffer, 0, size);
+        }
+        return new Buffer(buffer, 0, size);
+    }
+
+    /**
+     * Decodes the content of the buffer into a new message, calling the
+     * decoder repeatedly until every byte has been consumed.
+     *
+     * @param buffer the encoded message
+     * @return the decoded message
+     * @throws IllegalStateException if the decoder stops consuming bytes
+     */
+    static Message decode(Buffer buffer) {
+        Message msg = Message.Factory.create();
+        int offset = buffer.offset;
+        int len = buffer.length;
+        while( len > 0 ) {
+            int decoded = msg.decode(buffer.data, offset, len);
+            assert decoded > 0: "Make progress decoding the message";
+            if( decoded <= 0 ) {
+                // With assertions disabled the check above is skipped and a
+                // decoder that makes no progress would keep this loop
+                // spinning forever; fail instead.
+                throw new IllegalStateException("Make progress decoding the message");
+            }
+            offset += decoded;
+            len -= decoded;
+        }
+        return msg;
+    }
+
+    /**
+     * Creates a delivery for a message, encoding it right away.
+     *
+     * @param message the message to deliver
+     */
+    public MessageDelivery(Message message) {
+        this(message, encode(message, 32));
+    }
+
+    /**
+     * Creates a delivery from an already encoded message; the message itself
+     * is decoded on the first {@link #getMessage()} call.
+     *
+     * @param encoded the encoded message
+     */
+    public MessageDelivery(Buffer encoded) {
+        this(null, encoded);
+    }
+
+    /**
+     * @param message the message, or null to decode it lazily from
+     *                <code>encoded</code>
+     * @param encoded the encoded form of the message; its length becomes the
+     *                initial size and the size hint
+     */
+    public MessageDelivery(Message message, Buffer encoded) {
+        this.message = message;
+        this.encoded = encoded;
+        sizeHint = this.encoded.length;
+        initialSize = sizeHint;
+    }
+
+    /**
+     * @return the message, decoding it from the encoded buffer if needed.
+     */
+    public Message getMessage() {
+        if( message == null ) {
+            message = decode(encoded);
+        }
+        return message;
+    }
+
+    /**
+     * @return the encoded message, re-encoding it if the cached buffer was
+     * discarded.
+     */
+    public Buffer encoded() {
+        if( encoded == null ) {
+            encoded = encode(message, sizeHint);
+            // Remember the real size so the next encode starts big enough.
+            sizeHint = encoded.length;
+        }
+        return encoded;
+    }
+
+    /**
+     * @return true if a delivery has been assigned and it is settled.
+     */
+    public boolean isSettled() {
+        return delivery!=null && delivery.isSettled();
+    }
+
+    /**
+     * @return the remote delivery state, or null if no delivery is assigned.
+     */
+    public DeliveryState getRemoteState() {
+        return delivery==null ? null : delivery.getRemoteState();
+    }
+
+    /**
+     * @return the local delivery state, or null if no delivery is assigned.
+     */
+    public DeliveryState getLocalState() {
+        return delivery==null ? null : delivery.getLocalState();
+    }
+
+    /**
+     * Registers a watch that calls <code>cb.onSuccess(null)</code> once a
+     * delivery has been assigned.
+     *
+     * @param cb the callback to notify
+     */
+    public void onEncoded(final Callback<Void> cb) {
+        addWatch(new Watch() {
+            @Override
+            public boolean execute() {
+                if( delivery!=null ) {
+                    cb.onSuccess(null);
+                    return true;
+                }
+                return false;
+            }
+        });
+    }
+
+    /**
+     * Blocks until the remote delivery state changes. Calls
+     * <code>AmqpEndpointBase.assertNotOnDispatchQueue()</code> before
+     * waiting.
+     *
+     * @return the remote delivery state when it changes.
+     * @throws Exception
+     */
+    public DeliveryState getRemoteStateChange() throws Exception {
+        AmqpEndpointBase.assertNotOnDispatchQueue();
+        return getRemoteStateChangeFuture().await();
+    }
+
+    /**
+     * Schedules {@link #onRemoteStateChange(Callback)} on the queue of the
+     * link and exposes its outcome as a future.
+     *
+     * @return the future remote delivery state when it changes.
+     */
+    public Future<DeliveryState> getRemoteStateChangeFuture() {
+        final Promise<DeliveryState> rc = new Promise<DeliveryState>();
+        link().queue().execute(new Task() {
+            @Override
+            public void run() {
+                onRemoteStateChange(rc);
+            }
+        });
+        return rc;
+    }
+
+    /**
+     * @return the link whose queue is used to schedule the watches created
+     * by the <code>get...Future()</code> methods.
+     */
+    abstract AmqpLink link();
+
+    /** True from the moment a remote state watch is registered until it fires. */
+    boolean watchingRemoteStateChange;
+
+    /**
+     * Registers a watch that reports the remote delivery state as soon as it
+     * differs from the state observed when this method was called. A missing
+     * delivery counts as a null state, matching {@link #getRemoteState()},
+     * so the watch may be registered before the delivery is assigned.
+     *
+     * @param cb the callback that receives the new remote state
+     */
+    public void onRemoteStateChange(final Callback<DeliveryState> cb) {
+        watchingRemoteStateChange = true;
+        final DeliveryState original = getRemoteState();
+        addWatch(new Watch() {
+            @Override
+            public boolean execute() {
+                DeliveryState current = MessageDelivery.this.getRemoteState();
+                boolean changed;
+                if (original == null) {
+                    changed = current != null;
+                } else {
+                    changed = !original.equals(current);
+                }
+                if( changed ) {
+                    cb.onSuccess(current);
+                    watchingRemoteStateChange = false;
+                    return true;
+                }
+                return false;
+            }
+        });
+    }
+
+    /**
+     * Blocks until the delivery is settled. Calls
+     * <code>AmqpEndpointBase.assertNotOnDispatchQueue()</code> before
+     * waiting.
+     *
+     * @return the remote delivery state once settled.
+     * @throws Exception
+     */
+    public DeliveryState getSettle() throws Exception {
+        AmqpEndpointBase.assertNotOnDispatchQueue();
+        return getSettleFuture().await();
+    }
+
+    /**
+     * Schedules {@link #onSettle(Callback)} on the queue of the link and
+     * exposes its outcome as a future.
+     *
+     * @return the future remote delivery state once the delivery is settled.
+     */
+    public Future<DeliveryState> getSettleFuture() {
+        final Promise<DeliveryState> rc = new Promise<DeliveryState>();
+        link().queue().execute(new Task() {
+            @Override
+            public void run() {
+                onSettle(rc);
+            }
+        });
+        return rc;
+    }
+
+    /**
+     * Registers a watch that reports the remote delivery state once a
+     * delivery has been assigned and is settled.
+     *
+     * @param cb the callback that receives the remote state
+     */
+    public void onSettle(final Callback<DeliveryState> cb) {
+        addWatch(new Watch() {
+            @Override
+            public boolean execute() {
+                if( delivery!=null && delivery.isSettled() ) {
+                    cb.onSuccess(delivery.getRemoteState());
+                    return true;
+                }
+                return false;
+            }
+        });
+    }
+
+    /**
+     * Delegates to the inherited implementation without adding behavior.
+     * NOTE(review): presumably re-declared so that classes in this package
+     * can invoke the protected method - confirm.
+     */
+    @Override
+    protected void fireWatches() {
+        super.fireWatches();
+    }
+
+    /**
+     * Adds one to the delivery count of the message and discards the cached
+     * encoding so that the next {@link #encoded()} call re-encodes it.
+     */
+    void incrementDeliveryCount() {
+        Message msg = getMessage();
+        msg.setDeliveryCount(msg.getDeliveryCount()+1);
+        encoded = null;
+    }
+
+    /**
+     * @param incrementDeliveryCounter true to increment the delivery count
+     *                                 of the message; nothing happens
+     *                                 otherwise
+     */
+    public void redeliver(boolean incrementDeliveryCounter) {
+        if( incrementDeliveryCounter ) {
+            incrementDeliveryCount();
+        }
+    }
+
+    /**
+     * Settles the delivery unless it is already settled. Does nothing while
+     * no delivery is assigned, in line with {@link #isSettled()} treating a
+     * missing delivery as a regular state.
+     */
+    public void settle() {
+        if( delivery!=null && !delivery.isSettled() ) {
+            delivery.settle();
+        }
+    }
+}

Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Promise.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Promise.java?rev=1622849&view=auto
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Promise.java (added)
+++ qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Promise.java Sat Sep  6 11:23:10 2014
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class Promise<T> implements Callback<T>, Future<T> {
+
+    private final CountDownLatch latch = new CountDownLatch(1);
+    Callback<T> next;
+    Throwable error;
+    T value;
+
+    public void onFailure(Throwable value) {
+        Callback<T> callback = null;
+        synchronized(this)  {
+            error = value;
+            latch.countDown();
+            callback = next;
+        }
+        if( callback!=null ) {
+            callback.onFailure(value);
+        }
+    }
+
+    public void onSuccess(T value) {
+        Callback<T> callback = null;
+        synchronized(this)  {
+            this.value = value;
+            latch.countDown();
+            callback = next;
+        }
+        if( callback!=null ) {
+            callback.onSuccess(value);
+        }
+    }
+
+    public void then(Callback<T> callback) {
+        boolean fire = false;
+        synchronized(this)  {
+            next = callback;
+            if( latch.getCount() == 0 ) {
+                fire = true;
+            }
+        }
+        if( fire ) {
+            if( error!=null ) {
+                callback.onFailure(error);
+            } else {
+                callback.onSuccess(value);
+            }
+        }
+    }
+
+    public T await(long amount, TimeUnit unit) throws Exception {
+        if( latch.await(amount, unit) ) {
+            return get();
+        } else {
+            throw new TimeoutException();
+        }
+    }
+
+    public T await() throws Exception {
+        latch.await();
+        return get();
+    }
+
+    private T get() throws Exception {
+        Throwable e = error;
+        if( e !=null ) {
+            if( e instanceof RuntimeException ) {
+                throw (RuntimeException) e;
+            } else if( e instanceof Exception) {
+                throw (Exception) e;
+            } else if( e instanceof Error) {
+                throw (Error) e;
+            } else {
+                // don't expect to hit this case.
+                throw new RuntimeException(e);
+            }
+        }
+        return value;
+    }
+
+}

Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Promise.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Promise.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/QoS.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/QoS.java?rev=1622849&view=auto
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/QoS.java (added)
+++ qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/QoS.java Sat Sep  6 11:23:10 2014
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.hawtdispatch.api;
+
+/**
+ * The three levels named by this enum. NOTE(review): presumably the
+ * quality of service (delivery guarantee) requested for a link; how each
+ * level is honored is not visible in this file - confirm against the code
+ * that consumes the enum.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public enum QoS {
+    AT_MOST_ONCE,
+    AT_LEAST_ONCE,
+    EXACTLY_ONCE
+}

Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/QoS.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/QoS.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/TransportState.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/TransportState.java?rev=1622849&view=auto
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/TransportState.java (added)
+++ qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/TransportState.java Sat Sep  6 11:23:10 2014
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.api;
+
+/**
+* The states named by this enum. NOTE(review): presumably the lifecycle of a
+* transport, traversed in declaration order from CREATED to DISCONNECTED;
+* the transitions themselves are not visible in this file - confirm against
+* the transport implementation.
+*
+* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+*/
+public enum TransportState {
+    CREATED,
+    CONNECTING,
+    CONNECTED,
+    DISCONNECTING,
+    DISCONNECTED
+}

Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/TransportState.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/TransportState.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpHeader.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpHeader.java?rev=1622849&view=auto
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpHeader.java (added)
+++ qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpHeader.java Sat Sep  6 11:23:10 2014
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.hawtdispatch.impl;
+
+import org.fusesource.hawtbuf.Buffer;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpHeader {
+
+    static final Buffer PREFIX = new Buffer(new byte[]{
+      'A', 'M', 'Q', 'P'
+    });
+
+    private Buffer buffer;
+
+    public AmqpHeader(){
+        this(new Buffer(new byte[]{
+          'A', 'M', 'Q', 'P', 0, 1, 0, 0
+        }));
+    }
+
+    public AmqpHeader(Buffer buffer){
+        setBuffer(buffer);
+    }
+
+    public int getProtocolId() {
+        return buffer.get(4) & 0xFF;
+    }
+    public void setProtocolId(int value) {
+        buffer.data[buffer.offset+4] = (byte) value;
+    }
+
+    public int getMajor() {
+        return buffer.get(5) & 0xFF;
+    }
+    public void setMajor(int value) {
+        buffer.data[buffer.offset+5] = (byte) value;
+    }
+
+    public int getMinor() {
+        return buffer.get(6) & 0xFF;
+    }
+    public void setMinor(int value) {
+        buffer.data[buffer.offset+6] = (byte) value;
+    }
+
+    public int getRevision() {
+        return buffer.get(7) & 0xFF;
+    }
+    public void setRevision(int value) {
+        buffer.data[buffer.offset+7] = (byte) value;
+    }
+
+    public Buffer getBuffer() {
+        return buffer;
+    }
+    public void setBuffer(Buffer value) {
+        if( !value.startsWith(PREFIX) || value.length()!=8 ) {
+            throw new IllegalArgumentException("Not an AMQP header buffer");
+        }
+        buffer = value.buffer();
+    }
+
+
+    @Override
+    public String toString() {
+        return buffer.toString();
+    }
+}

Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpHeader.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpHeader.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpListener.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpListener.java?rev=1622849&view=auto
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpListener.java (added)
+++ qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpListener.java Sat Sep  6 11:23:10 2014
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.impl;
+
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.*;
+import org.fusesource.hawtdispatch.Task;
+
+import java.io.IOException;
+
+
+/**
+* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+*/
+public class AmqpListener {
+
+    public Sasl processSaslConnect(ProtonJTransport protonTransport) {
+        return null;
+    }
+
+    public Sasl processSaslEvent(Sasl sasl) {
+        return sasl;
+    }
+
+    public void processRemoteOpen(Endpoint endpoint, Task onComplete) {
+        ErrorCondition condition = endpoint.getCondition();
+        condition.setCondition(Symbol.valueOf("error"));
+        condition.setDescription("Not supported");
+        endpoint.close();
+        onComplete.run();
+    }
+
+    public void processRemoteClose(Endpoint endpoint, Task onComplete) {
+        endpoint.close();
+        onComplete.run();
+    }
+
+    public void processDelivery(Delivery delivery){
+    }
+
+    public void processTransportConnected() {
+    }
+
+    public void processTransportFailure(IOException e) {
+        this.processFailure(e);
+    }
+
+    public void processFailure(Throwable e) {
+        e.printStackTrace();
+    }
+
+    public void processRefill() {
+    }
+
+}

Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpListener.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpProtocolCodec.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpProtocolCodec.java?rev=1622849&view=auto
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpProtocolCodec.java (added)
+++ qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpProtocolCodec.java Sat Sep  6 11:23:10 2014
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.impl;
+
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtdispatch.transport.AbstractProtocolCodec;
+
+import java.io.IOException;
+
+/**
+ * HawtDispatch protocol codec for AMQP 1.0 traffic.
+ * <p>
+ * Decoding starts with the 8 byte protocol header, which is surfaced as an
+ * {@link AmqpHeader}.  After that each frame is surfaced as one {@link Buffer}
+ * whose length is taken from the 4 byte big-endian size prefix; the prefix is
+ * only peeked at, so it remains part of the returned frame.  Encoding simply
+ * copies the supplied {@link Buffer} into the pending write buffer.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpProtocolCodec extends AbstractProtocolCodec {
+
+    /** Largest frame size accepted while decoding; defaults to 4 MiB. */
+    int maxFrameSize = 4*1024*1024;
+
+    /**
+     * Appends an outgoing command to the write buffer.
+     *
+     * @param object the command to write; must be a {@link Buffer}
+     */
+    @Override
+    protected void encode(Object object) throws IOException {
+        Buffer outgoing = (Buffer) object;
+        nextWriteBuffer.write(outgoing);
+    }
+
+    /**
+     * Builds the action that consumes the 8 byte protocol header and then
+     * switches the decoder over to frame decoding.
+     */
+    @Override
+    protected Action initialDecodeAction() {
+        return new Action() {
+            public Object apply() throws IOException {
+                Buffer header = readBytes(8);
+                if (header == null) {
+                    // the whole header has not arrived yet
+                    return null;
+                }
+                nextDecodeAction = readFrameSize;
+                return new AmqpHeader(header);
+            }
+        };
+    }
+
+    /**
+     * Peeks at the 4 byte size prefix of the next frame, validates it against
+     * the minimum (8) and maximum ({@link #maxFrameSize}) frame sizes and
+     * hands over to an action that reads the complete frame.
+     */
+    private final Action readFrameSize = new Action() {
+        public Object apply() throws IOException {
+            Buffer prefix = peekBytes(4);
+            if (prefix == null) {
+                // the size prefix has not arrived yet
+                return null;
+            }
+
+            int frameLength = prefix.bigEndianEditor().readInt();
+            if (frameLength < 8) {
+                throw new IOException(String.format("specified frame size %d is smaller than minimum frame size", frameLength));
+            }
+            if (frameLength > maxFrameSize) {
+                throw new IOException(String.format("specified frame size %d is larger than maximum frame size", frameLength));
+            }
+
+            nextDecodeAction = readFrame(frameLength);
+            return nextDecodeAction.apply();
+        }
+    };
+
+
+    /**
+     * Creates an action that reads one frame of exactly {@code size} bytes and
+     * afterwards goes back to reading the next size prefix.
+     *
+     * @param size total number of bytes in the frame
+     */
+    private final Action readFrame(final int size) {
+        return new Action() {
+            public Object apply() throws IOException {
+                Buffer frame = readBytes(size);
+                if (frame == null) {
+                    // the frame is still incomplete
+                    return null;
+                }
+                nextDecodeAction = readFrameSize;
+                return frame;
+            }
+        };
+    }
+
+    /**
+     * @return the distance between the read buffer's position and the start
+     *         of the data that has not been decoded yet
+     */
+    public int getReadBytesPendingDecode() {
+        int position = readBuffer.position();
+        return position - readStart;
+    }
+
+    /** Makes the decoder expect a frame next, without a protocol header first. */
+    public void skipProtocolHeader() {
+        nextDecodeAction = readFrameSize;
+    }
+
+    /** Makes the decoder expect an 8 byte protocol header next. */
+    public void readProtocolHeader() {
+        nextDecodeAction = initialDecodeAction();
+    }
+
+    /** @return the largest frame size accepted while decoding */
+    public int getMaxFrameSize() {
+        return this.maxFrameSize;
+    }
+
+    /** @param maxFrameSize the largest frame size to accept while decoding */
+    public void setMaxFrameSize(int maxFrameSize) {
+        this.maxFrameSize = maxFrameSize;
+    }
+}

Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpProtocolCodec.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpProtocolCodec.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpTransport.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpTransport.java?rev=1622849&view=auto
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpTransport.java (added)
+++ qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpTransport.java Sat Sep  6 11:23:10 2014
@@ -0,0 +1,587 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.hawtdispatch.impl;
+
+import org.apache.qpid.proton.hawtdispatch.api.AmqpConnectOptions;
+import org.apache.qpid.proton.hawtdispatch.api.Callback;
+import org.apache.qpid.proton.hawtdispatch.api.ChainedCallback;
+import org.apache.qpid.proton.hawtdispatch.api.TransportState;
+import org.apache.qpid.proton.engine.*;
+import org.apache.qpid.proton.engine.impl.ByteBufferUtils;
+import org.apache.qpid.proton.engine.impl.ProtocolTracer;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.DataByteArrayOutputStream;
+import org.fusesource.hawtbuf.UTF8Buffer;
+import org.fusesource.hawtdispatch.*;
+import org.fusesource.hawtdispatch.transport.DefaultTransportListener;
+import org.fusesource.hawtdispatch.transport.SslTransport;
+import org.fusesource.hawtdispatch.transport.TcpTransport;
+import org.fusesource.hawtdispatch.transport.Transport;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.LinkedList;
+
+import static org.apache.qpid.proton.hawtdispatch.api.TransportState.*;
+import static org.fusesource.hawtdispatch.Dispatch.NOOP;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpTransport extends WatchBase {
+
+    /** Lifecycle state of this transport; starts out as CREATED. */
+    private TransportState state = CREATED;
+
+    /** Dispatch queue supplied at construction; the defer source is created against it. */
+    final DispatchQueue queue;
+    /** Proton connection created in the constructor and bound to {@link #protonTransport} in bind(). */
+    final ProtonJConnection connection;
+    /** Underlying HawtDispatch transport; assigned in bind() and set to null once disconnect() completes. */
+    Transport hawtdispatchTransport;
+    /** Proton engine transport; assigned in bind() and set to null once disconnect() completes. */
+    ProtonJTransport protonTransport;
+    /** Failure recorded for this transport, or null when none has been recorded. */
+    Throwable failure;
+    /** Dispatch source that aggregates {@link Defer} tasks into a list and runs them from its event handler. */
+    CustomDispatchSource<Defer,LinkedList<Defer>> defers;
+
+    /** Set holding every {@link EndpointState}; used as the filter when walking sessions and links. */
+    public static final EnumSet<EndpointState> ALL_SET = EnumSet.allOf(EndpointState.class);
+
+    /**
+     * Creates a transport tied to the given dispatch queue, together with its
+     * proton connection and the dispatch source that runs deferred tasks.
+     *
+     * @param queue the dispatch queue deferred tasks are run against
+     */
+    private AmqpTransport(DispatchQueue queue) {
+        this.queue = queue;
+        this.connection = (ProtonJConnection) Connection.Factory.create();
+
+        defers = Dispatch.createSource(EventAggregators.<Defer>linkedList(), this.queue);
+        defers.setEventHandler(new Task(){
+            public void run() {
+                for( Defer defer: defers.getData() ) {
+                    // Check (rather than assign) the flag: defer() marks a task
+                    // before merging it, so every task seen here must be marked.
+                    assert defer.defered : "task was merged without being marked as deferred";
+                    // Clear the flag before running so the task can re-defer itself.
+                    defer.defered = false;
+                    defer.run();
+                }
+            }
+        });
+        defers.resume();
+    }
+
+    /**
+     * Starts connecting a new transport using a private copy of the supplied
+     * options.  A dispatch queue and a blocking executor are filled in on the
+     * copy when the caller did not provide them.
+     *
+     * @param options the connect options; the instance passed in is cloned, not modified
+     * @return the new transport, already in the process of connecting
+     */
+    static public AmqpTransport connect(AmqpConnectOptions options) {
+        final AmqpConnectOptions settings = options.clone();
+        if( settings.getDispatchQueue() == null ) {
+            settings.setDispatchQueue(Dispatch.createQueue());
+        }
+        if( settings.getBlockingExecutor() == null ) {
+            settings.setBlockingExecutor(AmqpConnectOptions.getBlockingThreadPool());
+        }
+        final AmqpTransport created = new AmqpTransport(settings.getDispatchQueue());
+        return created.connecting(settings);
+    }
+
+    /**
+     * Moves this transport from CREATED to CONNECTING, configures the proton
+     * connection from the options and starts the underlying transport.
+     * Any exception thrown while doing so is recorded in {@link #failure}
+     * instead of being propagated; watches are fired in either case.
+     *
+     * @param options the connect options to apply
+     * @return this transport
+     */
+    private AmqpTransport connecting(final AmqpConnectOptions options) {
+        assert state == CREATED;
+        try {
+            state = CONNECTING;
+            if( options.getLocalContainerId()!=null ) {
+                connection.setLocalContainerId(options.getLocalContainerId());
+            }
+            if( options.getRemoteContainerId()!=null ) {
+                connection.setContainer(options.getRemoteContainerId());
+            }
+            connection.setHostname(options.getHost().getHost());
+            // Final step of connection setup: install the regular listener on
+            // success, or record the failure and disconnect.
+            Callback<Void> onConnect = new Callback<Void>() {
+                @Override
+                public void onSuccess(Void value) {
+                    if( state == CONNECTED ) {
+                        hawtdispatchTransport.setTransportListener(new AmqpTransportListener());
+                        fireWatches();
+                    }
+                }
+
+                @Override
+                public void onFailure(Throwable value) {
+                    if( state == CONNECTED || state == CONNECTING ) {
+                        failure = value;
+                        disconnect();
+                        fireWatches();
+                    }
+                }
+            };
+            // When a user is configured, run the SASL client handshake first
+            // and only then hand over to the callback above.
+            if( options.getUser()!=null ) {
+                onConnect = new SaslClientHandler(options, onConnect);
+            }
+            createTransport(options, onConnect);
+        } catch (Throwable e) {
+            // NOTE(review): only the failure is recorded here; the state is
+            // left as CONNECTING.
+            failure = e;
+        }
+        fireWatches();
+        return this;
+    }
+
+    /**
+     * @return the current lifecycle state of this transport
+     */
+    public TransportState getState() {
+        return this.state;
+    }
+
+    /**
+     * Creates and starts a transport to the AMQP server.  Notifies onConnect
+     * once the transport is connected, or when connecting fails.
+     * <p>
+     * An SSL transport is used when the options carry an SSL context, a plain
+     * TCP transport otherwise.  When the host URI has no port, the standard
+     * port is used: 5671 for AMQP over TLS and 5672 for plain AMQP.
+     *
+     * @param options   the connect options used to configure the transport
+     * @param onConnect callback notified of the outcome of the connect attempt
+     * @throws Exception if the transport cannot be set up
+     */
+    void createTransport(AmqpConnectOptions options, final Callback<Void> onConnect) throws Exception {
+        final boolean secure = options.getSslContext() != null;
+        final TcpTransport transport;
+        if( secure ) {
+            SslTransport ssl = new SslTransport();
+            ssl.setSSLContext(options.getSslContext());
+            transport = ssl;
+        } else {
+            transport = new TcpTransport();
+        }
+
+        URI host = options.getHost();
+        if( host.getPort() == -1 ) {
+            // IANA registered ports: amqps (TLS) is 5671, plain amqp is 5672.
+            final int defaultPort = secure ? 5671 : 5672;
+            host = new URI(host.getScheme()+"://"+host.getHost()+":"+defaultPort);
+        }
+
+
+        transport.setBlockingExecutor(options.getBlockingExecutor());
+        transport.setDispatchQueue(options.getDispatchQueue());
+
+        transport.setMaxReadRate(options.getMaxReadRate());
+        transport.setMaxWriteRate(options.getMaxWriteRate());
+        transport.setReceiveBufferSize(options.getReceiveBufferSize());
+        transport.setSendBufferSize(options.getSendBufferSize());
+        transport.setTrafficClass(options.getTrafficClass());
+        transport.setUseLocalHost(options.isUseLocalHost());
+
+        transport.setTransportListener(new DefaultTransportListener(){
+            public void onTransportConnected() {
+                if(state==CONNECTING) {
+                    state = CONNECTED;
+                    onConnect.onSuccess(null);
+                    transport.resumeRead();
+                }
+            }
+
+            public void onTransportFailure(final IOException error) {
+                if(state==CONNECTING) {
+                    onConnect.onFailure(error);
+                }
+            }
+
+        });
+        // Prepare the connection exactly once, after all socket options have
+        // been applied; a second call would only redo the same setup.
+        transport.connecting(host, options.getLocalAddress());
+        bind(transport);
+        transport.start(NOOP);
+    }
+
+    /**
+     * Callback that runs the client side of the SASL handshake once the
+     * transport is connected, and only then notifies the next callback in the
+     * chain.  PLAIN is used when the remote offers it, otherwise ANONYMOUS;
+     * if neither is offered the next callback is failed.
+     */
+    class SaslClientHandler extends ChainedCallback<Void, Void> {
+
+        /** Connect options providing the user name and password. */
+        private final AmqpConnectOptions options;
+
+        /**
+         * @param options connect options providing the credentials
+         * @param next    callback notified with the outcome of the handshake
+         */
+        public SaslClientHandler(AmqpConnectOptions options, Callback<Void> next) {
+            super(next);
+            this.options = options;
+        }
+
+        /**
+         * Puts the proton SASL layer into client mode, schedules output to be
+         * pumped and installs a transport listener that drives the handshake.
+         */
+        public void onSuccess(final Void value) {
+            final Sasl s = protonTransport.sasl();
+            s.client();
+            pumpOut();
+            hawtdispatchTransport.setTransportListener(new AmqpTransportListener() {
+
+                // Handshake in progress; set to null once it has finished or failed.
+                Sasl sasl = s;
+
+                @Override
+                void process() {
+                    if (sasl != null) {
+                        sasl = processSaslEvent(sasl);
+                        if (sasl == null) {
+                            // once sasl handshake is done.. we need to read the protocol header again.
+                            ((AmqpProtocolCodec) hawtdispatchTransport.getProtocolCodec()).readProtocolHeader();
+                        }
+                    }
+                }
+
+                @Override
+                public void onTransportFailure(IOException error) {
+                    next.onFailure(error);
+                }
+
+                @Override
+                void onFailure(Throwable error) {
+                    next.onFailure(error);
+                }
+
+                // True once an initial response has been sent, so it is sent only once.
+                boolean authSent = false;
+
+                /**
+                 * Advances the handshake by one step.
+                 *
+                 * @return the sasl object while the handshake is still in
+                 *         progress, or null once it succeeded or was failed
+                 */
+                private Sasl processSaslEvent(Sasl sasl) {
+                    if (sasl.getOutcome() == Sasl.SaslOutcome.PN_SASL_OK) {
+                        next.onSuccess(null);
+                        return null;
+                    }
+                    HashSet<String> mechanisims = new HashSet<String>(Arrays.asList(sasl.getRemoteMechanisms()));
+                    if (!authSent && !mechanisims.isEmpty()) {
+                        if (mechanisims.contains("PLAIN")) {
+                            authSent = true;
+                            // Initial response layout: NUL, user, NUL, then the
+                            // password when one is configured.
+                            DataByteArrayOutputStream os = new DataByteArrayOutputStream();
+                            try {
+                                os.writeByte(0);
+                                os.write(new UTF8Buffer(options.getUser()));
+                                os.writeByte(0);
+                                if (options.getPassword() != null) {
+                                    os.write(new UTF8Buffer(options.getPassword()));
+                                }
+                            } catch (IOException e) {
+                                throw new RuntimeException(e);
+                            }
+                            Buffer buffer = os.toBuffer();
+                            sasl.setMechanisms(new String[]{"PLAIN"});
+                            sasl.send(buffer.data, buffer.offset, buffer.length);
+                        } else if (mechanisims.contains("ANONYMOUS")) {
+                            authSent = true;
+                            sasl.setMechanisms(new String[]{"ANONYMOUS"});
+                            sasl.send(new byte[0], 0, 0);
+                        } else {
+                            next.onFailure(Support.illegalState("Remote does not support plain password authentication."));
+                            return null;
+                        }
+                    }
+                    return sasl;
+                }
+            });
+        }
+    }
+
+    /**
+     * Transport listener installed on accepted transports.  It inspects the
+     * first protocol header: protocol id 3 starts a SASL handshake driven by
+     * the {@link AmqpListener}; any other header hands the transport over to a
+     * plain {@link AmqpTransportListener} straight away.
+     */
+    class SaslServerListener extends AmqpTransportListener {
+        // Handshake in progress, or null when there is none.
+        Sasl sasl;
+
+        @Override
+        public void onTransportCommand(Object command) {
+            try {
+                if (command.getClass() == AmqpHeader.class) {
+                    AmqpHeader header = (AmqpHeader)command;
+                    switch( header.getProtocolId() ) {
+                        case 3: // Client will be using SASL for auth..
+                            // Here "listener" is still the enclosing transport's AmqpListener.
+                            if( listener!=null ) {
+                                sasl = listener.processSaslConnect(protonTransport);
+                                break;
+                            }
+                            // No listener set: falls through to the default case.
+                        default:
+                            // This local shadows the enclosing transport's listener field
+                            // for the rest of the switch.
+                            AmqpTransportListener listener = new AmqpTransportListener();
+                            hawtdispatchTransport.setTransportListener(listener);
+                            listener.onTransportCommand(command);
+                            return;
+                    }
+                    command = header.getBuffer();
+                }
+            } catch (Exception e) {
+                // NOTE(review): after reporting the failure, execution still
+                // continues with super.onTransportCommand(command) below.
+                onFailure(e);
+            }
+            super.onTransportCommand(command);
+        }
+
+        @Override
+        void process() {
+            if (sasl != null) {
+                sasl = listener.processSaslEvent(sasl);
+            }
+            if (sasl == null) {
+                // once sasl handshake is done.. we need to read the protocol header again.
+                ((AmqpProtocolCodec) hawtdispatchTransport.getProtocolCodec()).readProtocolHeader();
+                hawtdispatchTransport.setTransportListener(new AmqpTransportListener());
+            }
+        }
+    }
+
+    /**
+     * Wraps an already accepted HawtDispatch transport in a new AMQP
+     * transport that shares the transport's dispatch queue.
+     *
+     * @param transport the accepted transport
+     * @return the new AMQP transport, in the CONNECTED state
+     */
+    static public AmqpTransport accept(Transport transport) {
+        final AmqpTransport created = new AmqpTransport(transport.getDispatchQueue());
+        return created.accepted(transport);
+    }
+
+    private AmqpTransport accepted(final Transport transport) {
+        state = CONNECTED;
+        bind(transport);
+        hawtdispatchTransport.setTransportListener(new SaslServerListener());
+        return this;
+    }
+
+    /**
+     * Remembers the HawtDispatch transport, creates the proton transport and
+     * binds it to the connection.  An {@link AmqpProtocolCodec} is installed
+     * when the transport has no protocol codec yet.
+     *
+     * @param transport the HawtDispatch transport to use
+     */
+    private void bind(final Transport transport) {
+        hawtdispatchTransport = transport;
+        protonTransport = (ProtonJTransport) org.apache.qpid.proton.engine.Transport.Factory.create();
+        protonTransport.bind(connection);
+        if( transport.getProtocolCodec() != null ) {
+            // a codec is already in place; keep it
+            return;
+        }
+        try {
+            transport.setProtocolCodec(new AmqpProtocolCodec());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Schedules a task through the defer source, unless the task is already
+     * marked as deferred.
+     *
+     * @param defer the task to schedule
+     */
+    public void defer(Defer defer) {
+        if( defer.defered ) {
+            // already scheduled and not run yet
+            return;
+        }
+        defer.defered = true;
+        defers.merge(defer);
+    }
+
+    /**
+     * Requests that pending proton output be written to the transport.
+     * The work is not done inline: it is deferred, so repeated requests made
+     * before the deferred task runs result in a single run.
+     */
+    public void pumpOut() {
+        assertExecuting();
+        defer(deferedPumpOut);
+    }
+
+    /** Deferred task scheduled by pumpOut(); runs doPumpOut(). */
+    private Defer deferedPumpOut = new Defer() {
+        public void run() {
+            doPumpOut();
+        }
+    };
+
+    private void doPumpOut() {
+        switch(state) {
+            case CONNECTING:
+            case CONNECTED:
+                break;
+            default:
+                return;
+        }
+
+        int size = hawtdispatchTransport.getProtocolCodec().getWriteBufferSize();
+        byte data[] = new byte[size];
+        boolean done = false;
+        int pumped = 0;
+        while( !done && !hawtdispatchTransport.full() ) {
+            int count = protonTransport.output(data, 0, size);
+            if( count > 0 ) {
+                pumped += count;
+                boolean accepted = hawtdispatchTransport.offer(new Buffer(data, 0, count));
+                assert accepted: "Should be accepted since the transport was not full";
+            } else {
+                done = true;
+            }
+        }
+        if( pumped > 0 && !hawtdispatchTransport.full() ) {
+            listener.processRefill();
+        }
+    }
+
+    /** SASL handshake processed by fireListenerEvents(), or null when there is none. */
+    public Sasl sasl;
+
+    /**
+     * Fires the watches and then dispatches pending events to the listener:
+     * the SASL handshake (if one is set), the connection, every session,
+     * every link, every delivery on the work list and finally a refill
+     * notification.
+     */
+    public void fireListenerEvents() {
+        fireWatches();
+
+        if( sasl != null ) {
+            sasl = listener.processSaslEvent(sasl);
+            if( sasl == null ) {
+                // once sasl handshake is done.. we need to read the protocol header again.
+                AmqpProtocolCodec codec = (AmqpProtocolCodec) this.hawtdispatchTransport.getProtocolCodec();
+                codec.readProtocolHeader();
+            }
+        }
+
+        context(connection).fireListenerEvents(listener);
+
+        for( Session session = connection.sessionHead(ALL_SET, ALL_SET);
+             session != null;
+             session = session.next(ALL_SET, ALL_SET) ) {
+            context(session).fireListenerEvents(listener);
+        }
+
+        for( Link link = connection.linkHead(ALL_SET, ALL_SET);
+             link != null;
+             link = link.next(ALL_SET, ALL_SET) ) {
+            context(link).fireListenerEvents(listener);
+        }
+
+        for( Delivery delivery = connection.getWorkHead();
+             delivery != null;
+             delivery = delivery.getWorkNext() ) {
+            listener.processDelivery(delivery);
+        }
+
+        listener.processRefill();
+    }
+
+
+    /**
+     * @return the proton connection owned by this transport
+     */
+    public ProtonJConnection connection() {
+        return this.connection;
+    }
+
+    /** Receiver of transport and endpoint events; starts out as a default AmqpListener. */
+    AmqpListener listener = new AmqpListener();
+
+    /**
+     * @return the listener currently receiving events from this transport
+     */
+    public AmqpListener getListener() {
+        return this.listener;
+    }
+
+    /**
+     * Replaces the listener that receives events from this transport.
+     *
+     * @param listener the new listener
+     */
+    public void setListener(AmqpListener listener) {
+        this.listener = listener;
+    }
+
+    /**
+     * Returns the {@link EndpointContext} stored on the endpoint, creating
+     * and storing a new one the first time it is requested.
+     *
+     * @param endpoint the endpoint whose context is wanted
+     * @return the context attached to the endpoint
+     */
+    public EndpointContext context(Endpoint endpoint) {
+        final EndpointContext existing = (EndpointContext) endpoint.getContext();
+        if( existing != null ) {
+            return existing;
+        }
+        final EndpointContext created = new EndpointContext(this, endpoint);
+        endpoint.setContext(created);
+        return created;
+    }
+
+    /**
+     * Transport listener that feeds received data into the proton transport
+     * and forwards transport events to the {@link AmqpListener}, when one is set.
+     */
+    class AmqpTransportListener extends DefaultTransportListener {
+
+        /** Forwards the connected notification to the listener. */
+        @Override
+        public void onTransportConnected() {
+            if( listener == null ) {
+                return;
+            }
+            listener.processTransportConnected();
+        }
+
+        /** Forwards the refill notification to the listener. */
+        @Override
+        public void onRefill() {
+            if( listener == null ) {
+                return;
+            }
+            listener.processRefill();
+        }
+
+        /**
+         * Pours the bytes of a received command into the proton transport,
+         * then processes the resulting events and schedules output to be
+         * pumped.  Commands are ignored unless the state is CONNECTED; any
+         * exception is routed to {@link #onFailure(Throwable)}.
+         *
+         * @param command an {@link AmqpHeader} or a {@link Buffer}
+         */
+        @Override
+        public void onTransportCommand(Object command) {
+            if( state != CONNECTED ) {
+                return;
+            }
+            try {
+                final Buffer payload = (command.getClass() == AmqpHeader.class)
+                        ? ((AmqpHeader) command).getBuffer()
+                        : (Buffer) command;
+                final ByteBuffer pending = payload.toByteBuffer();
+                // Keep feeding proton until every received byte has been handed over.
+                do {
+                    ByteBuffer target = protonTransport.getInputBuffer();
+                    ByteBufferUtils.pour(pending, target);
+                    protonTransport.processInput();
+                } while (pending.hasRemaining());
+                process();
+                pumpOut();
+            } catch (Exception e) {
+                onFailure(e);
+            }
+        }
+
+        /** Processes the events produced by the most recent input. */
+        void process() {
+            fireListenerEvents();
+        }
+
+        /**
+         * Records a transport failure while CONNECTED; the listener is
+         * notified and the watches are fired only when a listener is set.
+         */
+        @Override
+        public void onTransportFailure(IOException error) {
+            if( state != CONNECTED ) {
+                return;
+            }
+            failure = error;
+            if( listener == null ) {
+                return;
+            }
+            listener.processTransportFailure(error);
+            fireWatches();
+        }
+
+        /**
+         * Records a processing failure; the listener is notified and the
+         * watches are fired only when a listener is set.
+         */
+        void onFailure(Throwable error) {
+            failure = error;
+            if( listener == null ) {
+                return;
+            }
+            listener.processFailure(error);
+            fireWatches();
+        }
+    }
+
+    /**
+     * Starts disconnecting when the transport is CONNECTING or CONNECTED;
+     * otherwise does nothing.  The state becomes DISCONNECTING immediately
+     * and DISCONNECTED once the underlying transport has stopped, at which
+     * point both transports are released and the watches are fired.
+     * <p>
+     * NOTE(review): when no underlying transport has been bound yet, the
+     * state is left at DISCONNECTING and no watches are fired.
+     */
+    public void disconnect() {
+        assertExecuting();
+        if( state != CONNECTING && state != CONNECTED ) {
+            return;
+        }
+        state = DISCONNECTING;
+        if( hawtdispatchTransport == null ) {
+            return;
+        }
+        final Task onStopped = new Task(){
+            public void run() {
+                state = DISCONNECTED;
+                hawtdispatchTransport = null;
+                protonTransport = null;
+                fireWatches();
+            }
+        };
+        hawtdispatchTransport.stop(onStopped);
+    }
+
+    /**
+     * @return the dispatch queue this transport was created with
+     */
+    public DispatchQueue queue() {
+        return this.queue;
+    }
+
+    /**
+     * Delegates to the {@code assertExecuting()} check of this transport's
+     * dispatch queue.
+     */
+    public void assertExecuting() {
+        queue().assertExecuting();
+    }
+
+    /**
+     * Registers a one-shot watch on the connect outcome.  The callback fails
+     * with the recorded failure if there is one, and succeeds as soon as the
+     * state is anything other than CONNECTING; until then the watch stays
+     * registered.
+     *
+     * @param cb callback to notify
+     */
+    public void onTransportConnected(final Callback<Void> cb) {
+        addWatch(new Watch() {
+            @Override
+            public boolean execute() {
+                if( failure != null ) {
+                    cb.onFailure(failure);
+                } else if( state != CONNECTING ) {
+                    cb.onSuccess(null);
+                } else {
+                    // still connecting: keep watching
+                    return false;
+                }
+                return true;
+            }
+        });
+    }
+
+    /**
+     * Registers a one-shot watch that notifies the callback once the state
+     * is DISCONNECTED.
+     *
+     * @param cb callback to notify
+     */
+    public void onTransportDisconnected(final Callback<Void> cb) {
+        addWatch(new Watch() {
+            @Override
+            public boolean execute() {
+                if( state != DISCONNECTED ) {
+                    return false;
+                }
+                cb.onSuccess(null);
+                return true;
+            }
+        });
+    }
+
+    /**
+     * Registers a one-shot watch that hands the recorded failure to the
+     * callback's {@code onSuccess} once a failure has been recorded.
+     *
+     * @param cb callback receiving the failure
+     */
+    public void onTransportFailure(final Callback<Throwable> cb) {
+        addWatch(new Watch() {
+            @Override
+            public boolean execute() {
+                if( failure == null ) {
+                    return false;
+                }
+                cb.onSuccess(failure);
+                return true;
+            }
+        });
+    }
+
+    /**
+     * @return the failure recorded for this transport, or null if there is none
+     */
+    public Throwable getFailure() {
+        return this.failure;
+    }
+
+    /**
+     * Installs a protocol tracer on the proton transport.
+     * The proton transport must currently be set: it is assigned in bind()
+     * and cleared again once a disconnect has completed.
+     *
+     * @param protocolTracer the tracer to install
+     */
+    public void setProtocolTracer(ProtocolTracer protocolTracer) {
+        protonTransport.setProtocolTracer(protocolTracer);
+    }
+
+    /**
+     * Returns the protocol tracer of the proton transport.
+     * The proton transport must currently be set: it is assigned in bind()
+     * and cleared again once a disconnect has completed.
+     *
+     * @return the tracer reported by the proton transport
+     */
+    public ProtocolTracer getProtocolTracer() {
+        return protonTransport.getProtocolTracer();
+    }
+}

Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpTransport.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpTransport.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Defer.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Defer.java?rev=1622849&view=auto
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Defer.java (added)
+++ qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Defer.java Sat Sep  6 11:23:10 2014
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.impl;
+
+import org.fusesource.hawtdispatch.Task;
+
+/**
+ * A task that remembers whether it is currently scheduled for deferred
+ * execution.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public abstract class Defer extends Task {
+    /** True while the task is scheduled and has not been run yet. */
+    boolean defered = false;
+}

Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Defer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Defer.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/EndpointContext.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/EndpointContext.java?rev=1622849&view=auto
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/EndpointContext.java (added)
+++ qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/EndpointContext.java Sat Sep  6 11:23:10 2014
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.impl;
+
+import org.apache.qpid.proton.engine.Endpoint;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.fusesource.hawtdispatch.Task;
+
+/**
+ * Per-endpoint state kept by an {@link AmqpTransport}: tracks whether a
+ * listener callback for the endpoint is still in progress and carries an
+ * optional attachment.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class EndpointContext {
+
+    private final AmqpTransport transport;
+    private final Endpoint endpoint;
+    private Object attachment;
+    /** True from the moment a listener callback is started until its completion task runs. */
+    boolean listenerProcessing;
+
+    /**
+     * @param transport the transport the endpoint belongs to
+     * @param endpoint  the endpoint this context describes
+     */
+    public EndpointContext(AmqpTransport transport, Endpoint endpoint) {
+        this.transport = transport;
+        this.endpoint = endpoint;
+    }
+
+    /**
+     * Completion task handed to the listener: clears the in-progress flag
+     * and asks the transport to pump its output.
+     */
+    class ProcessedTask extends Task {
+        @Override
+        public void run() {
+            transport.assertExecuting();
+            listenerProcessing = false;
+            transport.pumpOut();
+        }
+    }
+
+    /** True when the remote side has left UNINITIALIZED while the local side has not. */
+    private boolean remoteOpenPending() {
+        return endpoint.getLocalState() == EndpointState.UNINITIALIZED
+            && endpoint.getRemoteState() != EndpointState.UNINITIALIZED;
+    }
+
+    /** True when the remote side is CLOSED while the local side is still ACTIVE. */
+    private boolean remoteClosePending() {
+        return endpoint.getLocalState() == EndpointState.ACTIVE
+            && endpoint.getRemoteState() == EndpointState.CLOSED;
+    }
+
+    /**
+     * Notifies the listener of a pending remote open or remote close, unless
+     * there is no listener or an earlier notification is still in progress.
+     * Afterwards, an attachment that is a {@link Task} is run.
+     *
+     * @param listener the listener to notify; may be null
+     */
+    public void fireListenerEvents(AmqpListener listener) {
+        final boolean idle = listener != null && !listenerProcessing;
+        if( idle ) {
+            if( remoteOpenPending() ) {
+                listenerProcessing = true;
+                listener.processRemoteOpen(endpoint, new ProcessedTask());
+            } else if( remoteClosePending() ) {
+                listenerProcessing = true;
+                listener.processRemoteClose(endpoint, new ProcessedTask());
+            }
+        }
+        // instanceof is false for null, so no separate null check is needed
+        if( attachment instanceof Task ) {
+            ((Task) attachment).run();
+        }
+    }
+
+    /** @return the attachment, or null when none is set */
+    public Object getAttachment() {
+        return this.attachment;
+    }
+
+    /**
+     * @param clazz the expected type of the attachment
+     * @return the attachment cast to {@code clazz}
+     */
+    public <T> T getAttachment(Class<T> clazz) {
+        final Object current = getAttachment();
+        return clazz.cast(current);
+    }
+
+    /** @param attachment the object to attach to this context */
+    public void setAttachment(Object attachment) {
+        this.attachment = attachment;
+    }
+}

Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/EndpointContext.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/EndpointContext.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Support.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Support.java?rev=1622849&view=auto
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Support.java (added)
+++ qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Support.java Sat Sep  6 11:23:10 2014
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.impl;
+
+/**
+ * Static factory methods for the {@link IllegalStateException}s used in this
+ * package. Every method returns the exception instead of throwing it, so the
+ * caller decides whether to throw it or hand it to a callback.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class Support {
+
+    /**
+     * Creates an {@link IllegalStateException} with the given message.
+     *
+     * @param msg the detail message; may be {@code null}
+     * @return a new exception whose stack trace starts in this method
+     */
+    public static IllegalStateException illegalState(String msg) {
+        // The Throwable constructor already captures the stack trace at this
+        // point, so an extra fillInStackTrace() call (and the downcast it
+        // forces) would only repeat the same work.
+        return new IllegalStateException(msg);
+    }
+
+    /**
+     * @return a new exception with the message {@code "Unhandled event."}
+     */
+    public static IllegalStateException createUnhandledEventError() {
+        return illegalState("Unhandled event.");
+    }
+
+    /**
+     * @return a new exception reporting that no connection listener is set
+     *         to handle a message received from the server
+     */
+    public static IllegalStateException createListenerNotSetError() {
+        return illegalState("No connection listener set to handle message received from the server.");
+    }
+
+    /**
+     * @return a new exception with the message {@code "Disconnected"}
+     */
+    public static IllegalStateException createDisconnectedError() {
+        return illegalState("Disconnected");
+    }
+
+}

Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Support.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Support.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Watch.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Watch.java?rev=1622849&view=auto
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Watch.java (added)
+++ qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Watch.java Sat Sep  6 11:23:10 2014
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.impl;
+
+/**
+* A unit of work that reports, through the return value of
+* {@link #execute()}, whether it has been triggered.
+*
+* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+*/
+public abstract class Watch {
+    /**
+     * Runs the watch.
+     *
+     * @return {@code true} if the watch has been triggered
+     */
+    public abstract boolean execute();
+}

Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Watch.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Watch.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/WatchBase.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/WatchBase.java?rev=1622849&view=auto
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/WatchBase.java (added)
+++ qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/WatchBase.java Sat Sep  6 11:23:10 2014
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.impl;
+
+import org.fusesource.hawtdispatch.Dispatch;
+import org.fusesource.hawtdispatch.Task;
+
+import java.util.LinkedList;
+
+/**
+ * Base class that keeps a list of pending {@link Watch}es and evaluates them
+ * in a task submitted to the current dispatch queue.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract public class WatchBase {
+
+    /** Watches that have not yet reported being triggered. */
+    private LinkedList<Watch> watches = new LinkedList<Watch>();
+
+    /**
+     * Registers a watch and schedules an evaluation pass.
+     *
+     * @param task the watch to register
+     */
+    protected void addWatch(final Watch task) {
+        this.watches.add(task);
+        fireWatches();
+    }
+
+    /**
+     * Schedules an evaluation pass over the registered watches on the current
+     * dispatch queue. Nothing is scheduled when no watch is registered.
+     */
+    protected void fireWatches() {
+        if( watches.isEmpty() ) {
+            return;
+        }
+        final Task sweep = new Task(){
+            @Override
+            public void run() {
+                // Swap in a fresh list before iterating: watches registered
+                // while a watch executes land in the new list instead of the
+                // one being traversed.
+                final LinkedList<Watch> pending = watches;
+                watches = new LinkedList<Watch>();
+                for (Watch candidate : pending) {
+                    if( candidate.execute() ) {
+                        // Triggered, so it is not kept.
+                        continue;
+                    }
+                    watches.add(candidate);
+                }
+            }
+        };
+        Dispatch.getCurrentQueue().execute(sweep);
+    }
+
+}

Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/WatchBase.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/WatchBase.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/api/SampleTest.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/api/SampleTest.java?rev=1622849&view=auto
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/api/SampleTest.java (added)
+++ qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/api/SampleTest.java Sat Sep  6 11:23:10 2014
@@ -0,0 +1,292 @@
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import static junit.framework.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.net.URISyntaxException;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.Target;
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.hawtdispatch.test.MessengerServer;
+import org.apache.qpid.proton.message.Message;
+import org.fusesource.hawtdispatch.Task;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+/**
+ * End-to-end sample that drives the hawtdispatch AMQP client API against an
+ * in-process {@link MessengerServer}.
+ * <p>
+ * The test opens a connection, creates one sender and one receiver on a
+ * single session and sends a message. Every time an incoming delivery is
+ * settled another message is sent, until the countdown reaches zero; then
+ * sender, receiver, session and connection are closed in staggered steps.
+ * Finally the number of messages counted by the server is verified.
+ */
+public class SampleTest {
+
+	private static final Logger _logger = Logger.getLogger(SampleTest.class.getName());
+
+	private MessengerServer server;
+
+	/** Starts a fresh server before each test. */
+	@Before
+	public void startServer() {
+		server = new MessengerServer();
+		server.start();
+	}
+
+	/** Stops the server after each test. */
+	@After
+	public void stopServer() {
+		server.stop();
+	}
+
+	/**
+	 * Runs the request/response exchange described on the class and asserts
+	 * that the server received the expected number of messages.
+	 *
+	 * @throws Exception if the connection options cannot be built or waiting
+	 *         for the disconnect fails
+	 */
+	@Test
+	public void test() throws Exception {
+		int expected = 10;
+		final AtomicInteger countdown = new AtomicInteger(expected);
+		AmqpConnectOptions options = new AmqpConnectOptions();
+		final String container = UUID.randomUUID().toString();
+		// Let a configuration failure (e.g. URISyntaxException) fail the test
+		// immediately instead of printing it and connecting with
+		// half-configured options.
+		options.setHost(server.getHost(), server.getPort());
+		options.setLocalContainerId(container);
+		options.setUser("anonymous");
+		options.setPassword("changeit");
+		final AmqpConnection conn = AmqpConnection.connect(options );
+		_logger.fine("connection queue");
+		// All callback registration happens on the connection's own queue.
+		conn.queue().execute(new Task() {
+
+			@Override
+			public void run() {
+				_logger.fine("connection running, setup callbacks");
+				conn.onTransportFailure(new Callback<Throwable>() {
+
+					@Override
+					public void onSuccess(Throwable value) {
+						_logger.fine("transportFailure Success? " + str(value));
+						conn.close();
+					}
+
+					@Override
+					public void onFailure(Throwable value) {
+						_logger.fine("transportFailure Trouble! " + str(value));
+						conn.close();
+					}
+				});
+
+				conn.onConnected(new Callback<Void>() {
+
+					@Override
+					public void onSuccess(Void value) {
+						_logger.fine("on connect Success! in container " + container);
+						final AmqpSession session = conn.createSession();
+						Target rqtarget = new Target();
+						rqtarget.setAddress("rq-tgt");
+						final AmqpSender sender = session.createSender(rqtarget, QoS.AT_LEAST_ONCE, "request-yyy");
+						Source rqsource = new Source();
+						rqsource.setAddress("rs-src");
+						sender.getEndpoint().setSource(rqsource);
+						Source rssource = new Source();
+						rssource.setAddress("rs-src");
+						final AmqpReceiver receiver = session.createReceiver(rssource , QoS.AT_LEAST_ONCE, 10, "response-yyy");
+						Target rstarget = new Target();
+						final String address = "rs-tgt";
+						rstarget.setAddress(address);
+						receiver.getEndpoint().setTarget(rstarget);
+						sender.onRemoteClose(new Callback<ErrorCondition>() {
+
+							@Override
+							public void onSuccess(ErrorCondition value) {
+								_logger.fine("sender remote close!" + str(value));
+							}
+
+							@Override
+							public void onFailure(Throwable value) {
+								_logger.fine("sender remote close Trouble!" + str(value));
+								conn.close();
+
+							}
+
+						});
+						receiver.onRemoteClose(new Callback<ErrorCondition>() {
+
+							@Override
+							public void onSuccess(ErrorCondition value) {
+								_logger.fine("receiver remote close!" + str(value));
+							}
+
+							@Override
+							public void onFailure(Throwable value) {
+								_logger.fine("receiver remote close Trouble!" + str(value));
+								conn.close();
+
+							}
+
+						});
+
+						// Sends one numbered request whose reply-to points back
+						// at this container's receiver address.
+						final Task work = new Task() {
+
+							private AtomicInteger count = new AtomicInteger();
+
+							@Override
+							public void run() {
+								Message message = session.createTextMessage("hello world! " + String.valueOf(count.incrementAndGet()));
+								message.setAddress("amqp://joze/rq-src");
+								String reply_to = "amqp://" + container + "/" + address;
+								message.setReplyTo(reply_to);
+								message.setCorrelationId("correlator");
+								final MessageDelivery md = sender.send(message);
+								md.onRemoteStateChange(new Callback<DeliveryState>() {
+
+									@Override
+									public void onSuccess(DeliveryState value) {
+										_logger.fine("delivery remote state change! " + str(value) +
+												" local: "+ str(md.getLocalState()) +
+												" remote: " + str(md.getRemoteState()));
+									}
+
+									@Override
+									public void onFailure(Throwable value) {
+										_logger.fine("remote state change Trouble!" + str(value));
+										conn.close();
+									}
+
+								});
+								md.onSettle(new Callback<DeliveryState>() {
+
+									@Override
+									public void onSuccess(DeliveryState value) {
+										_logger.fine("delivery settled! " + str(value) +
+												" local: "+ str(md.getLocalState()) +
+												" remote: " + str(md.getRemoteState()));
+										_logger.fine("sender settle mode state " +
+												" local receiver " + str(sender.getEndpoint().getReceiverSettleMode()) +
+												" local sender " + str(sender.getEndpoint().getSenderSettleMode()) +
+												" remote receiver " + str(sender.getEndpoint().getRemoteReceiverSettleMode()) +
+												" remote sender " + str(sender.getEndpoint().getRemoteSenderSettleMode()) +
+												""
+												);
+									}
+
+									@Override
+									public void onFailure(Throwable value) {
+										_logger.fine("delivery sending Trouble!" + str(value));
+										conn.close();
+									}
+								});
+							}
+
+						};
+						receiver.setDeliveryListener(new AmqpDeliveryListener() {
+
+							@Override
+							public void onMessageDelivery(
+									MessageDelivery delivery) {
+								Message message = delivery.getMessage();
+								_logger.fine("incoming message delivery! " +
+										" local " + str(delivery.getLocalState()) +
+										" remote " + str(delivery.getRemoteState()) +
+										" message " + str(message.getBody()) +
+										"");
+								delivery.onSettle(new Callback<DeliveryState>() {
+
+									@Override
+									public void onSuccess(DeliveryState value) {
+										_logger.fine("incoming message settled! ");
+										int i = countdown.decrementAndGet();
+										if ( i > 0 ) {
+											_logger.fine("More work " + str(i));
+											work.run();
+										} else {
+											// All rounds done: tear down in staggered
+											// steps, innermost endpoint first.
+											conn.queue().executeAfter(100, TimeUnit.MILLISECONDS, new Task() {
+
+												@Override
+												public void run() {
+													_logger.fine("stopping sender");
+													sender.close();
+												}
+											});
+											conn.queue().executeAfter(200, TimeUnit.MILLISECONDS, new Task() {
+
+												@Override
+												public void run() {
+													_logger.fine("stopping receiver");
+													receiver.close();
+
+												}
+											});
+											conn.queue().executeAfter(300, TimeUnit.MILLISECONDS, new Task() {
+
+												@Override
+												public void run() {
+													_logger.fine("stopping session");
+													session.close();
+
+												}
+											});
+											conn.queue().executeAfter(400, TimeUnit.MILLISECONDS, new Task() {
+
+												@Override
+												public void run() {
+													_logger.fine("stopping connection");
+													conn.close();
+
+												}
+											});
+										}
+									}
+
+									@Override
+									public void onFailure(Throwable value) {
+										_logger.fine("trouble settling incoming message " + str(value));
+										conn.close();
+									}
+								});
+								delivery.settle();
+							}
+
+						});
+
+						// start the receiver
+						receiver.resume();
+
+						// send first message
+						conn.queue().execute(work);
+					}
+
+					@Override
+					public void onFailure(Throwable value) {
+						_logger.fine("on connect Failure?" + str(value));
+						conn.close();
+					}
+				});
+				_logger.fine("connection setup done");
+
+
+			}
+
+		});
+		try {
+			_logger.fine("Waiting...");
+			Future<Void> disconnectedFuture = conn.getDisconnectedFuture();
+			disconnectedFuture.await(10, TimeUnit.SECONDS);
+			_logger.fine("done");
+			assertEquals(expected, server.getMessagesReceived());
+		} catch (Exception e) {
+			_logger.log(Level.SEVERE, "Test failed, possibly due to timeout", e);
+			throw e;
+		}
+	}
+
+	/**
+	 * Null-safe {@code toString()} used when building log messages.
+	 *
+	 * @param value any object, possibly {@code null}
+	 * @return {@code "null"} for a {@code null} argument, otherwise
+	 *         {@code value.toString()}
+	 */
+	private String str(Object value) {
+		if (value == null)
+			return "null";
+		return value.toString();
+	}
+
+
+}

Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/api/SampleTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/api/SampleTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/test/MessengerServer.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/test/MessengerServer.java?rev=1622849&view=auto
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/test/MessengerServer.java (added)
+++ qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/test/MessengerServer.java Sat Sep  6 11:23:10 2014
@@ -0,0 +1,135 @@
+package org.apache.qpid.proton.hawtdispatch.test;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.qpid.proton.InterruptException;
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.messenger.Messenger;
+import org.apache.qpid.proton.messenger.Tracker;
+
+/**
+ * Small echo server for tests, built on the proton {@link Messenger} API.
+ * <p>
+ * {@link #start()} launches a daemon thread that subscribes on
+ * {@code amqp://~host:port}, counts every incoming message, accepts it (or
+ * rejects it when its body value equals {@link #REJECT_ME}) and, when the
+ * message carries a reply-to address, sends the message back to that address.
+ * Failures of the server thread are recorded and rethrown by {@link #stop()}.
+ */
+public class MessengerServer {
+	/** Body value that makes the server reject instead of accept a message. */
+	public static final String REJECT_ME = "*REJECT-ME*";
+	/** Maximum time, in milliseconds, {@link #stop()} waits for the server thread. */
+	private int timeout = 1000;
+	private String host = "127.0.0.1";
+	private int port = 55555;
+	private Messenger msgr;
+	private AtomicInteger messagesReceived = new AtomicInteger(0);
+	private AtomicInteger messagesSent = new AtomicInteger(0);
+	private AtomicBoolean serverShouldRun = new AtomicBoolean();
+	/** First unexpected exception raised by the server thread, if any. */
+	private AtomicReference<Throwable> issues = new AtomicReference<Throwable>();
+	private Thread thread;
+	/** Released by the server thread once the subscription is in place. */
+	private CountDownLatch serverStart;
+
+	public MessengerServer() {
+	}
+
+	/**
+	 * Starts the server thread and blocks until it has subscribed.
+	 * <p>
+	 * NOTE(review): if the server thread fails before subscribing, the latch
+	 * is never released and this call keeps waiting - confirm whether that
+	 * needs handling.
+	 *
+	 * @throws IllegalStateException if the server is already running
+	 */
+	public void start() {
+		if (!serverShouldRun.compareAndSet(false, true)) {
+			throw new IllegalStateException("started twice");
+		}
+		msgr = Proton.messenger();
+		serverStart = new CountDownLatch(1);
+		thread = new Thread(new Runnable() {
+
+			@Override
+			public void run() {
+				try {
+					msgr.start();
+					msgr.subscribe("amqp://~"+host+":"+String.valueOf(port));
+					serverStart.countDown();
+					try {
+						while(serverShouldRun.get()) {
+							msgr.recv(100);
+							while (msgr.incoming() > 0) {
+								Message msg = msgr.get();
+								messagesReceived.incrementAndGet();
+								Tracker tracker = msgr.incomingTracker();
+								if (isRejectRequest(msg)) {
+									msgr.reject(tracker , 0);
+								} else {
+									msgr.accept(tracker, 0);
+								}
+								String reply_to = msg.getReplyTo();
+								if (reply_to != null) {
+									msg.setAddress(reply_to);
+									msgr.put(msg);
+									// Count the echo so getMessagesSent() reflects it.
+									messagesSent.incrementAndGet();
+									msgr.settle(msgr.outgoingTracker(), 0);
+								}
+							}
+						}
+					} finally {
+						msgr.stop();
+					}
+				} catch (InterruptException ex) {
+					// we're done
+				} catch (Exception ex) {
+					issues.set(ex);
+				}
+			}
+
+		});
+		thread.setName("MessengerServer");
+		thread.setDaemon(true);
+		thread.start();
+		try {
+			serverStart.await();
+		} catch (InterruptedException e) {
+			msgr.interrupt();
+			// Keep the interruption visible to the caller.
+			Thread.currentThread().interrupt();
+		}
+	}
+
+	/**
+	 * Stops the server thread, waiting up to the configured timeout for it to
+	 * finish, and makes sure the messenger is stopped. Does nothing when the
+	 * server is not running.
+	 *
+	 * @throws RuntimeException wrapping the recorded failure if the server
+	 *         thread terminated with an unexpected exception
+	 */
+	public void stop() {
+		if (!serverShouldRun.compareAndSet(true, false)) {
+			return;
+		}
+		// Only wake the messenger up once the server thread got past subscribe.
+		if (serverStart.getCount() == 0)
+			msgr.interrupt();
+		try {
+			thread.join(timeout);
+		} catch (InterruptedException e) {
+			// Carry on with the cleanup, but preserve the interrupt status.
+			Thread.currentThread().interrupt();
+		}
+		thread = null;
+		if (!msgr.stopped())
+			msgr.stop();
+		Throwable throwable = issues.get();
+		if (throwable != null)
+			throw new RuntimeException("Messenger server had problems", throwable);
+	}
+
+	/**
+	 * Tells whether the message asks to be rejected. The body is a
+	 * {@link Section}, so the marker has to be compared with the value wrapped
+	 * by an AmqpValue section rather than with the section object itself.
+	 */
+	private static boolean isRejectRequest(Message msg) {
+		Section body = msg.getBody();
+		if (body instanceof org.apache.qpid.proton.amqp.messaging.AmqpValue) {
+			Object value = ((org.apache.qpid.proton.amqp.messaging.AmqpValue) body).getValue();
+			return REJECT_ME.equals(value);
+		}
+		return false;
+	}
+
+	public String getHost() {
+		return host;
+	}
+
+	public int getPort() {
+		return port;
+	}
+
+	public void setHost(String host) {
+		this.host = host;
+	}
+
+	public void setPort(int port) {
+		this.port = port;
+	}
+
+	/** @return the time in milliseconds {@link #stop()} waits for the server thread */
+	public int getTimeout() {
+		return timeout;
+	}
+
+	/** @param timeout the time in milliseconds {@link #stop()} waits for the server thread */
+	public void setTimeout(int timeout) {
+		this.timeout = timeout;
+	}
+
+	/** @return the number of messages taken from the incoming queue so far */
+	public int getMessagesReceived() {
+		return messagesReceived.get();
+	}
+
+	/** @return the number of messages put on the outgoing queue for a reply-to address so far */
+	public int getMessagesSent() {
+		return messagesSent.get();
+	}
+}

Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/test/MessengerServer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/proton/branches/fadams-javascript-binding/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/test/MessengerServer.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org