You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2012/11/28 22:48:40 UTC

svn commit: r1414946 [1/2] - in /qpid/proton/trunk/proton-j: ./ contrib/proton-hawtdispatch/ contrib/proton-hawtdispatch/src/ contrib/proton-hawtdispatch/src/main/ contrib/proton-hawtdispatch/src/main/java/ contrib/proton-hawtdispatch/src/main/java/org...

Author: tross
Date: Wed Nov 28 21:48:33 2012
New Revision: 1414946

URL: http://svn.apache.org/viewvc?rev=1414946&view=rev
Log:
PROTON-22 - HawtDispatch Module
Contributed by Hiram Chirino

Added:
    qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/
    qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/pom.xml   (with props)
    qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/
    qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/
    qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/
    qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/
    qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/
    qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/
    qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/
    qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/
    qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/
    qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnectOptions.java
    qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnection.java
    qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpDeliveryListener.java
    qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpEndpointBase.java
    qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpLink.java
    qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpReceiver.java
    qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSender.java
    qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSession.java
    qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Callback.java
    qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/ChainedCallback.java
    qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/DeliveryAttachment.java
    qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Future.java
    qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java
    qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Promise.java
    qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/QoS.java
    qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/TransportState.java
    qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/
    qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpHeader.java
    qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpListener.java
    qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpProtocolCodec.java
    qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpTransport.java
    qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Defer.java
    qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/DroppingWritableBuffer.java
    qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/EndpointContext.java
    qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Support.java
    qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Watch.java
    qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/WatchBase.java
Modified:
    qpid/proton/trunk/proton-j/pom.xml

Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/pom.xml
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/pom.xml?rev=1414946&view=auto
==============================================================================
Binary file - no diff available.

Propchange: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/pom.xml
------------------------------------------------------------------------------
    svn:mime-type = application/xml

Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnectOptions.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnectOptions.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnectOptions.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnectOptions.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import org.fusesource.hawtdispatch.DispatchQueue;
+import org.fusesource.hawtdispatch.transport.TcpTransport;
+
+import javax.net.ssl.SSLContext;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.*;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpConnectOptions implements Cloneable {
+
+    private static final long KEEP_ALIVE = Long.parseLong(System.getProperty("amqp.thread.keep_alive", ""+1000));
+    private static final long STACK_SIZE = Long.parseLong(System.getProperty("amqp.thread.stack_size", ""+1024*512));
+    private static ThreadPoolExecutor blockingThreadPool;
+
+    public synchronized static ThreadPoolExecutor getBlockingThreadPool() {
+        if( blockingThreadPool == null ) {
+            blockingThreadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, KEEP_ALIVE, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
+                    public Thread newThread(Runnable r) {
+                        Thread rc = new Thread(null, r, "AMQP Task", STACK_SIZE);
+                        rc.setDaemon(true);
+                        return rc;
+                    }
+                }) {
+
+                    @Override
+                    public void shutdown() {
+                        // we don't ever shutdown since we are shared..
+                    }
+
+                    @Override
+                    public List<Runnable> shutdownNow() {
+                        // we don't ever shutdown since we are shared..
+                        return Collections.emptyList();
+                    }
+                };
+        }
+        return blockingThreadPool;
+    }
+    public synchronized static void setBlockingThreadPool(ThreadPoolExecutor pool) {
+        blockingThreadPool = pool;
+    }
+
+    private static final URI DEFAULT_HOST;
+    static {
+        try {
+            DEFAULT_HOST = new URI("tcp://localhost");
+        } catch (URISyntaxException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    URI host = DEFAULT_HOST;
+    URI localAddress;
+    SSLContext sslContext;
+    DispatchQueue dispatchQueue;
+    Executor blockingExecutor;
+    int maxReadRate;
+    int maxWriteRate;
+    int trafficClass = TcpTransport.IPTOS_THROUGHPUT;
+    boolean useLocalHost;
+    int receiveBufferSize = 1024*64;
+    int sendBufferSize = 1024*64;
+    String localContainerId;
+    String remoteContainerId;
+    String user;
+    String password;
+
+
+    @Override
+    public AmqpConnectOptions clone() {
+        try {
+            return (AmqpConnectOptions) super.clone();
+        } catch (CloneNotSupportedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public String getLocalContainerId() {
+        return localContainerId;
+    }
+
+    public void setLocalContainerId(String localContainerId) {
+        this.localContainerId = localContainerId;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    public String getRemoteContainerId() {
+        return remoteContainerId;
+    }
+
+    public void setRemoteContainerId(String remoteContainerId) {
+        this.remoteContainerId = remoteContainerId;
+    }
+
+    public String getUser() {
+        return user;
+    }
+
+    public void setUser(String user) {
+        this.user = user;
+    }
+
+    public Executor getBlockingExecutor() {
+        return blockingExecutor;
+    }
+
+    public void setBlockingExecutor(Executor blockingExecutor) {
+        this.blockingExecutor = blockingExecutor;
+    }
+
+    public DispatchQueue getDispatchQueue() {
+        return dispatchQueue;
+    }
+
+    public void setDispatchQueue(DispatchQueue dispatchQueue) {
+        this.dispatchQueue = dispatchQueue;
+    }
+
+    public URI getLocalAddress() {
+        return localAddress;
+    }
+
+    public void setLocalAddress(String localAddress) throws URISyntaxException {
+        this.setLocalAddress(new URI(localAddress));
+    }
+    public void setLocalAddress(URI localAddress) {
+        this.localAddress = localAddress;
+    }
+
+    public int getMaxReadRate() {
+        return maxReadRate;
+    }
+
+    public void setMaxReadRate(int maxReadRate) {
+        this.maxReadRate = maxReadRate;
+    }
+
+    public int getMaxWriteRate() {
+        return maxWriteRate;
+    }
+
+    public void setMaxWriteRate(int maxWriteRate) {
+        this.maxWriteRate = maxWriteRate;
+    }
+
+    public int getReceiveBufferSize() {
+        return receiveBufferSize;
+    }
+
+    public void setReceiveBufferSize(int receiveBufferSize) {
+        this.receiveBufferSize = receiveBufferSize;
+    }
+
+    public URI getHost() {
+        return host;
+    }
+    public void setHost(String host, int port) throws URISyntaxException {
+        this.setHost(new URI("tcp://"+host+":"+port));
+    }
+    public void setHost(String host) throws URISyntaxException {
+        this.setHost(new URI(host));
+    }
+    public void setHost(URI host) {
+        this.host = host;
+    }
+
+    public int getSendBufferSize() {
+        return sendBufferSize;
+    }
+
+    public void setSendBufferSize(int sendBufferSize) {
+        this.sendBufferSize = sendBufferSize;
+    }
+
+    public SSLContext getSslContext() {
+        return sslContext;
+    }
+
+    public void setSslContext(SSLContext sslContext) {
+        this.sslContext = sslContext;
+    }
+
+    public int getTrafficClass() {
+        return trafficClass;
+    }
+
+    public void setTrafficClass(int trafficClass) {
+        this.trafficClass = trafficClass;
+    }
+
+    public boolean isUseLocalHost() {
+        return useLocalHost;
+    }
+
+    public void setUseLocalHost(boolean useLocalHost) {
+        this.useLocalHost = useLocalHost;
+    }
+}

Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnection.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnection.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnection.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnection.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import org.apache.qpid.proton.hawtdispatch.impl.AmqpListener;
+import org.apache.qpid.proton.hawtdispatch.impl.AmqpTransport;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Endpoint;
+import org.apache.qpid.proton.engine.EndpointError;
+import org.apache.qpid.proton.engine.impl.ConnectionImpl;
+import org.apache.qpid.proton.engine.impl.ProtocolTracer;
+import org.apache.qpid.proton.engine.impl.SessionImpl;
+import org.fusesource.hawtdispatch.DispatchQueue;
+import org.fusesource.hawtdispatch.Task;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpConnection extends AmqpEndpointBase  {
+
+    AmqpTransport transport;
+    ConnectionImpl connection;
+    HashSet<AmqpSender> senders = new HashSet<AmqpSender>();
+    boolean closing = false;
+
+    public static AmqpConnection connect(AmqpConnectOptions options) {
+        return new AmqpConnection(options);
+    }
+
+    private AmqpConnection(AmqpConnectOptions options) {
+        transport = AmqpTransport.connect(options);
+        transport.setListener(new AmqpListener() {
+            @Override
+            public void processDelivery(Delivery delivery) {
+                Attachment attachment = (Attachment) getTransport().context(delivery.getLink()).getAttachment();
+                AmqpLink link = (AmqpLink) attachment.endpoint();
+                link.processDelivery(delivery);
+            }
+
+            @Override
+            public void processRefill() {
+                for(AmqpSender sender: new ArrayList<AmqpSender>(senders)) {
+                    sender.pumpDeliveries();
+                }
+                pumpOut();
+            }
+
+        });
+        connection = transport.connection();
+        connection.open();
+        attach();
+    }
+
+    public void waitForConnected() throws Exception {
+        assertNotOnDispatchQueue();
+        getConnectedFuture().await();
+    }
+
+    public Future<Void> getConnectedFuture() {
+        final Promise<Void> rc = new Promise<Void>();
+        queue().execute(new Task() {
+            @Override
+            public void run() {
+                onConnected(rc);
+            }
+        });
+        return rc;
+    }
+
+    public void onConnected(Callback<Void> cb) {
+        transport.onTransportConnected(cb);
+    }
+
+    @Override
+    protected Endpoint getEndpoint() {
+        return connection;
+    }
+
+    @Override
+    protected AmqpConnection getConnection() {
+        return this;
+    }
+
+    @Override
+    protected AmqpEndpointBase getParent() {
+        return null;
+    }
+
+    public AmqpSession createSession() {
+        assertExecuting();
+        SessionImpl session = connection.session();
+        session.open();
+        pumpOut();
+        return new AmqpSession(this, session);
+    }
+
+    public int getMaxSessions() {
+        return connection.getMaxChannels();
+    }
+
+    public void disconnect() {
+        closing = true;
+        transport.disconnect();
+    }
+
+    public void waitForDisconnected() throws Exception {
+        assertNotOnDispatchQueue();
+        getDisconnectedFuture().await();
+    }
+
+    public Future<Void> getDisconnectedFuture() {
+        final Promise<Void> rc = new Promise<Void>();
+        queue().execute(new Task() {
+            @Override
+            public void run() {
+                onDisconnected(rc);
+            }
+        });
+        return rc;
+    }
+
+    public void onDisconnected(Callback<Void> cb) {
+        transport.onTransportDisconnected(cb);
+    }
+
+    public TransportState getTransportState() {
+        return transport.getState();
+    }
+
+    public Throwable getTransportFailure() {
+        return transport.getFailure();
+    }
+
+    public Future<Throwable> getTransportFailureFuture() {
+        final Promise<Throwable> rc = new Promise<Throwable>();
+        queue().execute(new Task() {
+            @Override
+            public void run() {
+                onTransportFailure(rc);
+            }
+        });
+        return rc;
+    }
+
+    public void onTransportFailure(Callback<Throwable> cb) {
+        transport.onTransportFailure(cb);
+    }
+
+    @Override
+    public DispatchQueue queue() {
+        return super.queue();
+    }
+
+    public void setProtocolTracer(ProtocolTracer protocolTracer) {
+        transport.setProtocolTracer(protocolTracer);
+    }
+
+    public ProtocolTracer getProtocolTracer() {
+        return transport.getProtocolTracer();
+    }
+
+    /**
+     * Once the remote end, closes the transport is disconnected.
+     */
+    @Override
+    public void close() {
+        super.close();
+        onRemoteClose(new Callback<EndpointError>() {
+            @Override
+            public void onSuccess(EndpointError value) {
+                disconnect();
+            }
+
+            @Override
+            public void onFailure(Throwable value) {
+                disconnect();
+            }
+        });
+    }
+}

Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpDeliveryListener.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpDeliveryListener.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpDeliveryListener.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpDeliveryListener.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.api;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface AmqpDeliveryListener {
+
+    /**
+     * Caller should suspend/resume the AmqpReceiver to
+     * flow control the delivery of messages.
+     *
+     * @param delivery
+     */
+    void onMessageDelivery(MessageDelivery delivery);
+}

Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpEndpointBase.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpEndpointBase.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpEndpointBase.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpEndpointBase.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import org.apache.qpid.proton.hawtdispatch.impl.*;
+import org.apache.qpid.proton.engine.Endpoint;
+import org.apache.qpid.proton.engine.EndpointError;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.fusesource.hawtdispatch.Dispatch;
+import org.fusesource.hawtdispatch.DispatchQueue;
+import org.fusesource.hawtdispatch.Task;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract class AmqpEndpointBase extends WatchBase {
+    abstract protected Endpoint getEndpoint();
+    abstract protected AmqpEndpointBase getParent();
+
+    protected AmqpConnection getConnection() {
+        return getParent().getConnection();
+    }
+
+    protected AmqpTransport getTransport() {
+        return getConnection().transport;
+    }
+
+    protected DispatchQueue queue() {
+        return getTransport().queue();
+    }
+
+    protected void assertExecuting() {
+        getTransport().assertExecuting();
+    }
+
+    public void waitForRemoteOpen() throws Exception {
+        assertNotOnDispatchQueue();
+        getRemoteOpenFuture().await();
+    }
+
+    public Future<Void> getRemoteOpenFuture() {
+        final Promise<Void> rc = new Promise<Void>();
+        queue().execute(new Task() {
+            @Override
+            public void run() {
+                onRemoteOpen(rc);
+            }
+        });
+        return rc;
+    }
+
+    public void onRemoteOpen(final Callback<Void> cb) {
+        addWatch(new Watch() {
+            @Override
+            public boolean execute() {
+                switch (getEndpoint().getRemoteState()) {
+                    case ACTIVE:
+                        cb.onSuccess(null);
+                        return true;
+                    case CLOSED:
+                        cb.onFailure(Support.illegalState("closed"));
+                        return true;
+                }
+                return false;
+            }
+        });
+    }
+
+    public EndpointError waitForRemoteClose() throws Exception {
+        assertNotOnDispatchQueue();
+        return getRemoteCloseFuture().await();
+    }
+
+    public Future<EndpointError> getRemoteCloseFuture() {
+        final Promise<EndpointError> rc = new Promise<EndpointError>();
+        queue().execute(new Task() {
+            @Override
+            public void run() {
+                onRemoteClose(rc);
+            }
+        });
+        return rc;
+    }
+
+    public void onRemoteClose(final Callback<EndpointError> cb) {
+        addWatch(new Watch() {
+            @Override
+            public boolean execute() {
+                if (getEndpoint().getRemoteState() == EndpointState.CLOSED) {
+                    cb.onSuccess(getEndpoint().getRemoteError());
+                    return true;
+                }
+                return false;
+            }
+        });
+    }
+
+    public void close() {
+        getEndpoint().close();
+        pumpOut();
+    }
+
+    public EndpointState getRemoteState() {
+        return getEndpoint().getRemoteState();
+    }
+
+    public EndpointError getRemoteError() {
+        return getEndpoint().getRemoteError();
+    }
+
+    static protected EndpointError toError(Throwable value) {
+        return new EndpointError("error", value.toString());
+    }
+
+    class Attachment extends Task {
+        AmqpEndpointBase endpoint() {
+            return AmqpEndpointBase.this;
+        }
+
+        @Override
+        public void run() {
+            fireWatches();
+        }
+    }
+
+    protected void attach() {
+        getTransport().context(getEndpoint()).setAttachment(new Attachment());
+    }
+
+    protected void defer(Defer defer) {
+        getTransport().defer(defer);
+    }
+
+    protected void pumpOut() {
+        getTransport().pumpOut();
+    }
+
+    static protected void assertNotOnDispatchQueue() {
+        assert Dispatch.getCurrentQueue()==null : "Not allowed to be called when executing on a dispatch queue";
+    }
+
+}

Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpLink.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpLink.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpLink.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpLink.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import org.apache.qpid.proton.engine.Delivery;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract public class AmqpLink extends AmqpEndpointBase {
+    abstract protected void processDelivery(Delivery delivery);
+}

Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpReceiver.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpReceiver.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpReceiver.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpReceiver.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import org.apache.qpid.proton.hawtdispatch.impl.Defer;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.impl.DeliveryImpl;
+import org.apache.qpid.proton.engine.impl.ReceiverImpl;
+import org.apache.qpid.proton.type.messaging.Accepted;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.ByteArrayOutputStream;
+
+import java.util.LinkedList;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpReceiver extends AmqpLink {
+
+    final AmqpSession parent;
+    final ReceiverImpl receiver;
+
+    public AmqpReceiver(AmqpSession parent, ReceiverImpl receiver, QoS qos) {
+        this.parent = parent;
+        this.receiver = receiver;
+        attach();
+    }
+
+    @Override
+    protected ReceiverImpl getEndpoint() {
+        return receiver;
+    }
+    @Override
+    protected AmqpSession getParent() {
+        return parent;
+    }
+
+    ByteArrayOutputStream current = new ByteArrayOutputStream();
+
+    @Override
+    protected void processDelivery(Delivery delivery) {
+        if( !delivery.isReadable() ) {
+            System.out.println("it was not readable!");
+            return;
+        }
+
+        if( current==null ) {
+            current = new ByteArrayOutputStream();
+        }
+
+        int count;
+        byte data[] = new byte[1024*4];
+        while( (count = receiver.recv(data, 0, data.length)) > 0 ) {
+            current.write(data, 0, count);
+        }
+
+        // Expecting more deliveries..
+        if( count == 0 ) {
+            return;
+        }
+
+        receiver.advance();
+        Buffer buffer = current.toBuffer();
+        current = null;
+        onMessage(delivery, buffer);
+
+    }
+
+    LinkedList<MessageDelivery> inbound = new LinkedList<MessageDelivery>();
+
+    protected void onMessage(Delivery delivery, Buffer buffer) {
+        MessageDelivery md = new MessageDelivery(buffer) {
+            @Override
+            AmqpLink link() {
+                return AmqpReceiver.this;
+            }
+
+            @Override
+            public void settle() {
+                if( !delivery.isSettled() ) {
+                    delivery.disposition(new Accepted());
+                    delivery.settle();
+                }
+                drain();
+            }
+        };
+        md.delivery = (DeliveryImpl) delivery;
+        delivery.setContext(md);
+        inbound.add(md);
+        drainInbound();
+    }
+
+    public void drain() {
+        defer(deferedDrain);
+    }
+
+    Defer deferedDrain = new Defer(){
+        public void run() {
+            drainInbound();
+        }
+    };
+    int resumed = 0;
+
+    public void resume() {
+        resumed++;
+    }
+    public void suspend() {
+        resumed--;
+    }
+
+    AmqpDeliveryListener deliveryListener;
+    private void drainInbound() {
+        while( deliveryListener!=null && !inbound.isEmpty() && resumed>0) {
+            deliveryListener.onMessageDelivery(inbound.removeFirst());
+            receiver.flow(1);
+        }
+    }
+
+    public AmqpDeliveryListener getDeliveryListener() {
+        return deliveryListener;
+    }
+
+    public void setDeliveryListener(AmqpDeliveryListener deliveryListener) {
+        this.deliveryListener = deliveryListener;
+        drainInbound();
+    }
+}

Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSender.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSender.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSender.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSender.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import org.apache.qpid.proton.hawtdispatch.impl.Defer;
+import org.apache.qpid.proton.hawtdispatch.impl.Watch;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.impl.DeliveryImpl;
+import org.apache.qpid.proton.engine.impl.SenderImpl;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.type.messaging.Accepted;
+import org.apache.qpid.proton.type.messaging.Modified;
+import org.apache.qpid.proton.type.messaging.Rejected;
+import org.apache.qpid.proton.type.messaging.Released;
+import org.apache.qpid.proton.type.transport.DeliveryState;
+import org.fusesource.hawtbuf.Buffer;
+
+import java.io.UnsupportedEncodingException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpSender extends AmqpLink {
+
+    private  byte[] EMPTY_BYTE_ARRAY = new byte[]{};
+    long nextTagId = 0;
+    HashSet<byte[]> tagCache = new HashSet<byte[]>();
+
+    final AmqpSession parent;
+    private final QoS qos;
+    final SenderImpl sender;
+
+    public AmqpSender(AmqpSession parent, SenderImpl sender, QoS qos) {
+        this.parent = parent;
+        this.sender = sender;
+        this.qos = qos;
+        attach();
+        getConnection().senders.add(this);
+    }
+
+    @Override
+    public void close() {
+        super.close();
+        getConnection().senders.remove(this);
+    }
+
+    @Override
+    protected SenderImpl getEndpoint() {
+        return sender;
+    }
+
+    @Override
+    protected AmqpSession getParent() {
+        return parent;
+    }
+
+    final LinkedList<MessageDelivery> outbound = new LinkedList<MessageDelivery>();
+    long outboundBufferSize;
+
+    public MessageDelivery send(Message message) {
+        assertExecuting();
+        MessageDelivery rc = new MessageDelivery(message) {
+            @Override
+            AmqpLink link() {
+                return AmqpSender.this;
+            }
+
+            @Override
+            public void redeliver(boolean incrementDeliveryCounter) {
+                super.redeliver(incrementDeliveryCounter);
+                outbound.add(this);
+                outboundBufferSize += initialSize;
+                defer(deferedPumpDeliveries);
+            }
+        };
+        outbound.add(rc);
+        outboundBufferSize += rc.initialSize;
+        pumpDeliveries();
+        pumpOut();
+        return rc;
+    }
+
+    Buffer currentBuffer;
+    DeliveryImpl currentDelivery;
+
+    Defer deferedPumpDeliveries = new Defer() {
+        public void run() {
+            pumpDeliveries();
+        }
+    };
+
+    public long getOverflowBufferSize() {
+        return outboundBufferSize;
+    }
+
+    protected void pumpDeliveries() {
+        assertExecuting();
+        try {
+            while(true) {
+                while( currentBuffer !=null ) {
+                    if( sender.getCredit() > 0 ) {
+                        int sent = sender.send(currentBuffer.data, currentBuffer.offset, currentBuffer.length);
+                        currentBuffer.moveHead(sent);
+                        if( currentBuffer.length == 0 ) {
+                            DeliveryImpl current = currentDelivery;
+                            MessageDelivery md = (MessageDelivery) current.getContext();
+                            currentBuffer = null;
+                            currentDelivery = null;
+                            if( qos == QoS.AT_MOST_ONCE ) {
+                                current.settle();
+                            } else {
+                                sender.advance();
+                            }
+                            md.fireWatches();
+                        }
+                    } else {
+                        return;
+                    }
+                }
+
+                if( outbound.isEmpty() ) {
+                    return;
+                }
+
+                final MessageDelivery md = outbound.removeFirst();
+                outboundBufferSize -= md.initialSize;
+                currentBuffer = md.encoded();
+                if( qos == QoS.AT_MOST_ONCE ) {
+                    currentDelivery = sender.delivery(EMPTY_BYTE_ARRAY, 0, 0);
+                } else {
+                    final byte[] tag = nextTag();
+                    currentDelivery = sender.delivery(tag, 0, tag.length);
+                }
+                md.delivery = currentDelivery;
+                currentDelivery.setContext(md);
+            }
+        } finally {
+            fireWatches();
+        }
+    }
+
+    @Override
+    protected void processDelivery(Delivery delivery) {
+        final MessageDelivery md  = (MessageDelivery) delivery.getContext();
+        if( delivery.remotelySettled() ) {
+            if( delivery.getTag().length > 0 ) {
+                checkinTag(delivery.getTag());
+            }
+
+            final DeliveryState state = delivery.getRemoteState();
+            if( state==null || state instanceof Accepted) {
+                if( !delivery.remotelySettled() ) {
+                    delivery.disposition(new Accepted());
+                }
+            } else if( state instanceof Rejected) {
+                // re-deliver /w incremented delivery counter.
+                md.delivery = null;
+                md.incrementDeliveryCount();
+                outbound.addLast(md);
+            } else if( state instanceof Released) {
+                // re-deliver && don't increment the counter.
+                md.delivery = null;
+                outbound.addLast(md);
+            } else if( state instanceof Modified) {
+                Modified modified = (Modified) state;
+                if ( modified.getDeliveryFailed() ) {
+                  // increment delivery counter..
+                  md.incrementDeliveryCount();
+                }
+            }
+            delivery.settle();
+        }
+        md.fireWatches();
+    }
+
+    byte[] nextTag() {
+        byte[] rc;
+        if (tagCache != null && !tagCache.isEmpty()) {
+            final Iterator<byte[]> iterator = tagCache.iterator();
+            rc = iterator.next();
+            iterator.remove();
+        } else {
+            try {
+                rc = Long.toHexString(nextTagId++).getBytes("UTF-8");
+            } catch (UnsupportedEncodingException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        return rc;
+    }
+
+    void checkinTag(byte[] data) {
+        if( tagCache.size() < 1024 ) {
+            tagCache.add(data);
+        }
+    }
+
+    public void onOverflowBufferDrained(final Callback<Void> cb) {
+        addWatch(new Watch() {
+            @Override
+            public boolean execute() {
+                if (outboundBufferSize==0) {
+                    cb.onSuccess(null);
+                    return true;
+                }
+                return false;
+            }
+        });
+    }
+}

Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSession.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSession.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSession.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSession.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import org.apache.qpid.proton.engine.Endpoint;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.impl.ReceiverImpl;
+import org.apache.qpid.proton.engine.impl.SenderImpl;
+import org.apache.qpid.proton.engine.impl.SessionImpl;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.type.Binary;
+import org.apache.qpid.proton.type.messaging.*;
+import org.apache.qpid.proton.type.transport.SenderSettleMode;
+
+import java.util.UUID;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpSession extends AmqpEndpointBase {
+
+    final AmqpConnection parent;
+    final SessionImpl session;
+
+
+    public AmqpSession(AmqpConnection parent, SessionImpl session) {
+        this.parent = parent;
+        this.session = session;
+        attach();
+    }
+
+    @Override
+    protected Endpoint getEndpoint() {
+        return session;
+    }
+
+    @Override
+    protected AmqpConnection getParent() {
+        return parent;
+    }
+
+    public AmqpSender createSender(Target target) {
+        return createSender(target, QoS.AT_LEAST_ONCE);
+    }
+
+    public AmqpSender createSender(Target target, QoS qos) {
+        return createSender(target, qos, UUID.randomUUID().toString());
+    }
+
+    public AmqpSender createSender(Target target, QoS qos, String name) {
+        assertExecuting();
+        SenderImpl sender = session.sender(name);
+        attach();
+//        Source source = new Source();
+//        source.setAddress(UUID.randomUUID().toString());
+//        sender.setSource(source);
+        sender.setTarget(target);
+        configureQos(sender, qos);
+        sender.open();
+        pumpOut();
+        return new AmqpSender(this, sender, qos);
+    }
+
+    public AmqpReceiver createReceiver(Source source) {
+        return createReceiver(source, QoS.AT_LEAST_ONCE);
+    }
+
+    public AmqpReceiver createReceiver(Source source, QoS qos) {
+        return createReceiver(source, qos, 100);
+    }
+
+    public AmqpReceiver createReceiver(Source source, QoS qos, int prefetch) {
+        return createReceiver(source, qos, prefetch,  UUID.randomUUID().toString());
+    }
+
+    public AmqpReceiver createReceiver(Source source, QoS qos, int prefetch, String name) {
+        assertExecuting();
+        ReceiverImpl receiver = session.receiver(name);
+        receiver.setSource(source);
+//        Target target = new Target();
+//        target.setAddress(UUID.randomUUID().toString());
+//        receiver.setTarget(target);
+        receiver.flow(prefetch);
+        configureQos(receiver, qos);
+        receiver.open();
+        pumpOut();
+        return new AmqpReceiver(this, receiver, qos);
+    }
+
+    private void configureQos(Link link, QoS qos) {
+        switch (qos) {
+            case AT_MOST_ONCE:
+                link.setSenderSettleMode(SenderSettleMode.SETTLED);
+                link.setReceiverSettleMode(SenderSettleMode.UNSETTLED);
+                break;
+            case AT_LEAST_ONCE:
+                link.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+                link.setReceiverSettleMode(SenderSettleMode.SETTLED);
+                break;
+            case EXACTLY_ONCE:
+                link.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+                link.setReceiverSettleMode(SenderSettleMode.MIXED);
+                break;
+        }
+    }
+
+    public Message createTextMessage(String value) {
+        Message msg = new Message();
+        Section body = new AmqpValue(value);
+        msg.setBody(body);
+        return msg;
+    }
+
+    public Message createBinaryMessage(byte value[]) {
+        return createBinaryMessage(value, 0, value.length);
+    }
+
+    public Message createBinaryMessage(byte value[], int offset, int len) {
+        Message msg = new Message();
+        Data body = new Data(new Binary(value, offset,len));
+        msg.setBody(body);
+        return msg;
+    }
+}

Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Callback.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Callback.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Callback.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Callback.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.hawtdispatch.api;
+
+/**
+ * <p>
+ * Function Result that carries one value.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface Callback<T> {
+    public void onSuccess(T value);
+    public void onFailure(Throwable value);
+}

Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/ChainedCallback.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/ChainedCallback.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/ChainedCallback.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/ChainedCallback.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.hawtdispatch.api;
+
+/**
+ * <p>
+ * Function Result that carries one value.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract public class ChainedCallback<In,Out> implements Callback<In> {
+
+    public final Callback<Out> next;
+
+    public ChainedCallback(Callback<Out> next) {
+        this.next = next;
+    }
+
+    public void onFailure(Throwable value) {
+        next.onFailure(value);
+    }
+}

Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/DeliveryAttachment.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/DeliveryAttachment.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/DeliveryAttachment.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/DeliveryAttachment.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import org.apache.qpid.proton.engine.Delivery;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract public class DeliveryAttachment {
+    abstract void processDelivery(Delivery delivery);
+}

Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Future.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Future.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Future.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Future.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * <p>A simplified Future function results interface.</p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface Future<T> {
+    T await() throws Exception;
+    T await(long amount, TimeUnit unit) throws Exception;
+    void then(Callback<T> callback);
+
+}

Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import org.apache.qpid.proton.hawtdispatch.impl.DroppingWritableBuffer;
+import org.apache.qpid.proton.hawtdispatch.impl.Watch;
+import org.apache.qpid.proton.hawtdispatch.impl.WatchBase;
+import org.apache.qpid.proton.codec.CompositeWritableBuffer;
+import org.apache.qpid.proton.codec.WritableBuffer;
+import org.apache.qpid.proton.engine.impl.DeliveryImpl;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.type.transport.DeliveryState;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtdispatch.Task;
+
+import java.nio.ByteBuffer;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public abstract class MessageDelivery extends WatchBase {
+
+    final int initialSize;
+    private Message message;
+    private Buffer encoded;
+    public DeliveryImpl delivery;
+    private int sizeHint = 1024*4;
+
+    static Buffer encode(Message message, int sizeHint) {
+        ByteBuffer buffer = ByteBuffer.wrap(new byte[sizeHint]);
+        DroppingWritableBuffer overflow = new DroppingWritableBuffer();
+        int c = message.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow));
+        if( overflow.position() > 0 ) {
+            buffer = ByteBuffer.wrap(new byte[sizeHint+overflow.position()]);
+            c = message.encode(new WritableBuffer.ByteBufferWrapper(buffer));
+        }
+        return new Buffer(buffer.array(), 0, c);
+    }
+
+    static Message decode(Buffer buffer) {
+        Message msg = new Message();
+        int offset = buffer.offset;
+        int len = buffer.length;
+        while( len > 0 ) {
+            int decoded = msg.decode(buffer.data, offset, len);
+            assert decoded > 0: "Make progress decoding the message";
+            offset += decoded;
+            len -= decoded;
+        }
+        return msg;
+    }
+
+    public MessageDelivery(Message message) {
+        this(message, encode(message, 1024*4));
+    }
+
+    public MessageDelivery(Buffer encoded) {
+        this(null, encoded);
+    }
+
+    public MessageDelivery(Message message, Buffer encoded) {
+        this.message = message;
+        this.encoded = encoded;
+        sizeHint = this.encoded.length;
+        initialSize = sizeHint;
+    }
+
+    public Message getMessage() {
+        if( message == null ) {
+            message = decode(encoded);
+        }
+        return message;
+    }
+
+    public Buffer encoded() {
+        if( encoded == null ) {
+            encoded = encode(message, sizeHint);
+            sizeHint = encoded.length;
+        }
+        return encoded;
+    }
+
+    public boolean isSettled() {
+        return delivery!=null && delivery.isSettled();
+    }
+
+    public DeliveryState getRemoteState() {
+        return delivery==null ? null : delivery.getRemoteState();
+    }
+
+    public DeliveryState getLocalState() {
+        return delivery==null ? null : delivery.getLocalState();
+    }
+
+    public void onEncoded(final Callback<Void> cb) {
+        addWatch(new Watch() {
+            @Override
+            public boolean execute() {
+                if( delivery!=null ) {
+                    cb.onSuccess(null);
+                    return true;
+                }
+                return false;
+            }
+        });
+    }
+
+    /**
+     * @return the remote delivery state when it changes.
+     * @throws Exception
+     */
+    public DeliveryState getRemoteStateChange() throws Exception {
+        AmqpEndpointBase.assertNotOnDispatchQueue();
+        return getRemoteStateChangeFuture().await();
+    }
+
+    /**
+     * @return the future remote delivery state when it changes.
+     */
+    public Future<DeliveryState> getRemoteStateChangeFuture() {
+        final Promise<DeliveryState> rc = new Promise<DeliveryState>();
+        link().queue().execute(new Task() {
+            @Override
+            public void run() {
+                onRemoteStateChange(rc);
+            }
+        });
+        return rc;
+    }
+
+    abstract AmqpLink link();
+
+    boolean watchingRemoteStateChange;
+    public void onRemoteStateChange(final Callback<DeliveryState> cb) {
+        watchingRemoteStateChange = true;
+        final DeliveryState original = delivery.getRemoteState();
+        addWatch(new Watch() {
+            @Override
+            public boolean execute() {
+                if (original == null) {
+                    if( delivery.getRemoteState()!=null ) {
+                        cb.onSuccess(delivery.getRemoteState());
+                        watchingRemoteStateChange = false;
+                        return true;
+                    }
+                } else {
+                    if( !original.equals(delivery.getRemoteState()) ) {
+                        cb.onSuccess(delivery.getRemoteState());
+                        watchingRemoteStateChange = false;
+                        return true;
+                    }
+                }
+                return false;
+            }
+        });
+    }
+
+    /**
+     * @return the remote delivery state once settled.
+     * @throws Exception
+     */
+    public DeliveryState getSettle() throws Exception {
+        AmqpEndpointBase.assertNotOnDispatchQueue();
+        return getSettleFuture().await();
+    }
+
+    /**
+     * @return the future remote delivery state once the delivery is settled.
+     */
+    public Future<DeliveryState> getSettleFuture() {
+        final Promise<DeliveryState> rc = new Promise<DeliveryState>();
+        link().queue().execute(new Task() {
+            @Override
+            public void run() {
+                onSettle(rc);
+            }
+        });
+        return rc;
+    }
+
+    public void onSettle(final Callback<DeliveryState> cb) {
+        addWatch(new Watch() {
+            @Override
+            public boolean execute() {
+                if( delivery!=null && delivery.isSettled() ) {
+                    cb.onSuccess(delivery.getRemoteState());
+                    return true;
+                }
+                return false;
+            }
+        });
+    }
+
+    @Override
+    protected void fireWatches() {
+        super.fireWatches();
+    }
+
+    void incrementDeliveryCount() {
+        Message msg = getMessage();
+        msg.setDeliveryCount(msg.getDeliveryCount()+1);
+        encoded = null;
+    }
+
+    public void redeliver(boolean incrementDeliveryCounter) {
+        if( incrementDeliveryCounter ) {
+            incrementDeliveryCount();
+        }
+    }
+
+    public void settle() {
+        if( !delivery.isSettled() ) {
+            delivery.settle();
+        }
+    }
+}

Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Promise.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Promise.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Promise.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Promise.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class Promise<T> implements Callback<T>, Future<T> {
+
+    private final CountDownLatch latch = new CountDownLatch(1);
+    Callback<T> next;
+    Throwable error;
+    T value;
+
+    public void onFailure(Throwable value) {
+        Callback<T> callback = null;
+        synchronized(this)  {
+            error = value;
+            latch.countDown();
+            callback = next;
+        }
+        if( callback!=null ) {
+            callback.onFailure(value);
+        }
+    }
+
+    public void onSuccess(T value) {
+        Callback<T> callback = null;
+        synchronized(this)  {
+            this.value = value;
+            latch.countDown();
+            callback = next;
+        }
+        if( callback!=null ) {
+            callback.onSuccess(value);
+        }
+    }
+
+    public void then(Callback<T> callback) {
+        boolean fire = false;
+        synchronized(this)  {
+            next = callback;
+            if( latch.getCount() == 0 ) {
+                fire = true;
+            }
+        }
+        if( fire ) {
+            if( error!=null ) {
+                callback.onFailure(error);
+            } else {
+                callback.onSuccess(value);
+            }
+        }
+    }
+
+    public T await(long amount, TimeUnit unit) throws Exception {
+        if( latch.await(amount, unit) ) {
+            return get();
+        } else {
+            throw new TimeoutException();
+        }
+    }
+
+    public T await() throws Exception {
+        latch.await();
+        return get();
+    }
+
+    private T get() throws Exception {
+        Throwable e = error;
+        if( e !=null ) {
+            if( e instanceof RuntimeException ) {
+                throw (RuntimeException) e;
+            } else if( e instanceof Exception) {
+                throw (Exception) e;
+            } else if( e instanceof Error) {
+                throw (Error) e;
+            } else {
+                // don't expect to hit this case.
+                throw new RuntimeException(e);
+            }
+        }
+        return value;
+    }
+
+}

Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/QoS.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/QoS.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/QoS.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/QoS.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.hawtdispatch.api;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public enum QoS {
+    AT_MOST_ONCE,
+    AT_LEAST_ONCE,
+    EXACTLY_ONCE
+}

Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/TransportState.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/TransportState.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/TransportState.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/TransportState.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.api;
+
+/**
+* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+*/
+public enum TransportState {
+    CREATED,
+    CONNECTING,
+    CONNECTED,
+    DISCONNECTING,
+    DISCONNECTED
+}

Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpHeader.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpHeader.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpHeader.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpHeader.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.hawtdispatch.impl;
+
+import org.fusesource.hawtbuf.Buffer;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpHeader {
+
+    static final Buffer PREFIX = new Buffer(new byte[]{
+      'A', 'M', 'Q', 'P'
+    });
+
+    private Buffer buffer;
+
+    public AmqpHeader(){
+        this(new Buffer(new byte[]{
+          'A', 'M', 'Q', 'P', 0, 1, 0, 0
+        }));
+    }
+
+    public AmqpHeader(Buffer buffer){
+        setBuffer(buffer);
+    }
+
+    public int getProtocolId() {
+        return buffer.get(4) & 0xFF;
+    }
+    public void setProtocolId(int value) {
+        buffer.data[buffer.offset+4] = (byte) value;
+    }
+
+    public int getMajor() {
+        return buffer.get(5) & 0xFF;
+    }
+    public void setMajor(int value) {
+        buffer.data[buffer.offset+5] = (byte) value;
+    }
+
+    public int getMinor() {
+        return buffer.get(6) & 0xFF;
+    }
+    public void setMinor(int value) {
+        buffer.data[buffer.offset+6] = (byte) value;
+    }
+
+    public int getRevision() {
+        return buffer.get(7) & 0xFF;
+    }
+    public void setRevision(int value) {
+        buffer.data[buffer.offset+7] = (byte) value;
+    }
+
+    public Buffer getBuffer() {
+        return buffer;
+    }
+    public void setBuffer(Buffer value) {
+        if( !value.startsWith(PREFIX) || value.length()!=8 ) {
+            throw new IllegalArgumentException("Not an AMQP header buffer");
+        }
+        buffer = value.buffer();
+    }
+
+
+    @Override
+    public String toString() {
+        return buffer.toString();
+    }
+}

Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpListener.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpListener.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpListener.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpListener.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.impl;
+
+import org.apache.qpid.proton.engine.*;
+import org.apache.qpid.proton.engine.impl.EndpointImpl;
+import org.apache.qpid.proton.engine.impl.TransportImpl;
+import org.fusesource.hawtdispatch.Task;
+
+import java.io.IOException;
+
+/**
+* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+*/
+public class AmqpListener {
+
+    public Sasl processSaslConnect(TransportImpl transport) {
+        return null;
+    }
+
+    public Sasl processSaslEvent(Sasl sasl) {
+        return sasl;
+    }
+
+    public void processRemoteOpen(Endpoint endpoint, Task onComplete) {
+        ((EndpointImpl)endpoint).setLocalError(new EndpointError("error", "Not supported"));
+        endpoint.close();
+        onComplete.run();
+    }
+
+    public void processRemoteClose(Endpoint endpoint, Task onComplete) {
+        endpoint.close();
+        onComplete.run();
+    }
+
+    public void processDelivery(Delivery delivery){
+    }
+
+    public void processTransportConnected() {
+    }
+
+    public void processTransportFailure(IOException e) {
+        this.processFailure(e);
+    }
+
+    public void processFailure(Throwable e) {
+        e.printStackTrace();
+    }
+
+    public void processRefill() {
+    }
+
+}

Added: qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpProtocolCodec.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpProtocolCodec.java?rev=1414946&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpProtocolCodec.java (added)
+++ qpid/proton/trunk/proton-j/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpProtocolCodec.java Wed Nov 28 21:48:33 2012
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.impl;
+
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtdispatch.transport.AbstractProtocolCodec;
+
+import java.io.IOException;
+
+/**
+ * A HawtDispatch protocol codec that encodes/decodes AMQP 1.0 frames.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpProtocolCodec extends AbstractProtocolCodec {
+
+    int maxFrameSize = 4*1024*1024;
+
+    @Override
+    protected void encode(Object object) throws IOException {
+        nextWriteBuffer.write((Buffer) object);
+    }
+
+    @Override
+    protected Action initialDecodeAction() {
+        return new Action() {
+            public Object apply() throws IOException {
+                Buffer magic = readBytes(8);
+                if (magic != null) {
+                    nextDecodeAction = readFrameSize;
+                    return new AmqpHeader(magic);
+                } else {
+                    return null;
+                }
+            }
+        };
+    }
+
+    private final Action readFrameSize = new Action() {
+        public Object apply() throws IOException {
+            Buffer sizeBytes = peekBytes(4);
+            if (sizeBytes != null) {
+                int size = sizeBytes.bigEndianEditor().readInt();
+                if (size < 8) {
+                    throw new IOException(String.format("specified frame size %d is smaller than minimum frame size", size));
+                }
+                if( size > maxFrameSize ) {
+                    throw new IOException(String.format("specified frame size %d is larger than maximum frame size", size));
+                }
+
+                // TODO: check frame min and max size..
+                nextDecodeAction = readFrame(size);
+                return nextDecodeAction.apply();
+            } else {
+                return null;
+            }
+        }
+    };
+
+
+    private final Action readFrame(final int size) {
+        return new Action() {
+            public Object apply() throws IOException {
+                Buffer frameData = readBytes(size);
+                if (frameData != null) {
+                    nextDecodeAction = readFrameSize;
+                    return frameData;
+                } else {
+                    return null;
+                }
+            }
+        };
+    }
+
+    public int getReadBytesPendingDecode() {
+        return readBuffer.position() - readStart;
+    }
+
+    public void skipProtocolHeader() {
+        nextDecodeAction = readFrameSize;
+    }
+
+    public void readProtocolHeader() {
+        nextDecodeAction = initialDecodeAction();
+    }
+
+    public int getMaxFrameSize() {
+        return maxFrameSize;
+    }
+
+    public void setMaxFrameSize(int maxFrameSize) {
+        this.maxFrameSize = maxFrameSize;
+    }
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org