You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2014/11/28 14:50:03 UTC

[25/51] [abbrv] qpid-proton git commit: Update with merge of latest proton codebase and checked against latest emscripten incoming branch

Update with merge of latest proton codebase and checked against latest emscripten incoming branch

git-svn-id: https://svn.apache.org/repos/asf/qpid/proton/branches/fadams-javascript-binding@1622849 13f79535-47bb-0310-9956-ffa450edef68


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/1c2f4894
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/1c2f4894
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/1c2f4894

Branch: refs/heads/master
Commit: 1c2f4894270775269abb6e8303e75684331a3212
Parents: 4a78327
Author: fadams <fa...@unknown>
Authored: Sat Sep 6 11:23:10 2014 +0000
Committer: fadams <fa...@unknown>
Committed: Sat Sep 6 11:23:10 2014 +0000

----------------------------------------------------------------------
 CMakeLists.txt                                  |   14 +
 DEVELOPERS                                      |   25 +-
 config.bat.in                                   |   66 +
 config.sh                                       |   79 --
 config.sh.in                                    |   61 +
 contrib/proton-hawtdispatch/pom.xml             |   61 +
 .../hawtdispatch/api/AmqpConnectOptions.java    |  228 ++++
 .../proton/hawtdispatch/api/AmqpConnection.java |  201 ++++
 .../hawtdispatch/api/AmqpDeliveryListener.java  |   32 +
 .../hawtdispatch/api/AmqpEndpointBase.java      |  158 +++
 .../qpid/proton/hawtdispatch/api/AmqpLink.java  |   27 +
 .../proton/hawtdispatch/api/AmqpReceiver.java   |  141 +++
 .../proton/hawtdispatch/api/AmqpSender.java     |  227 ++++
 .../proton/hawtdispatch/api/AmqpSession.java    |  141 +++
 .../qpid/proton/hawtdispatch/api/Callback.java  |   29 +
 .../hawtdispatch/api/ChainedCallback.java       |   37 +
 .../hawtdispatch/api/DeliveryAttachment.java    |   27 +
 .../qpid/proton/hawtdispatch/api/Future.java    |   31 +
 .../hawtdispatch/api/MessageDelivery.java       |  226 ++++
 .../qpid/proton/hawtdispatch/api/Promise.java   |  107 ++
 .../qpid/proton/hawtdispatch/api/QoS.java       |   26 +
 .../proton/hawtdispatch/api/TransportState.java |   29 +
 .../proton/hawtdispatch/impl/AmqpHeader.java    |   85 ++
 .../proton/hawtdispatch/impl/AmqpListener.java  |   71 ++
 .../hawtdispatch/impl/AmqpProtocolCodec.java    |  109 ++
 .../proton/hawtdispatch/impl/AmqpTransport.java |  587 +++++++++
 .../qpid/proton/hawtdispatch/impl/Defer.java    |   27 +
 .../hawtdispatch/impl/EndpointContext.java      |   76 ++
 .../qpid/proton/hawtdispatch/impl/Support.java  |   41 +
 .../qpid/proton/hawtdispatch/impl/Watch.java    |   26 +
 .../proton/hawtdispatch/impl/WatchBase.java     |   54 +
 .../proton/hawtdispatch/api/SampleTest.java     |  292 +++++
 .../hawtdispatch/test/MessengerServer.java      |  135 +++
 contrib/proton-jms/pom.xml                      |   50 +
 .../jms/AMQPNativeInboundTransformer.java       |   40 +
 .../jms/AMQPNativeOutboundTransformer.java      |  103 ++
 .../proton/jms/AMQPRawInboundTransformer.java   |   47 +
 .../proton/jms/AutoOutboundTransformer.java     |   46 +
 .../apache/qpid/proton/jms/EncodedMessage.java  |   75 ++
 .../qpid/proton/jms/InboundTransformer.java     |  314 +++++
 .../jms/JMSMappingInboundTransformer.java       |  102 ++
 .../jms/JMSMappingOutboundTransformer.java      |  246 ++++
 .../org/apache/qpid/proton/jms/JMSVendor.java   |   45 +
 .../qpid/proton/jms/OutboundTransformer.java    |   52 +
 examples/CMakeLists.txt                         |    1 +
 examples/messenger/java/recv                    |    2 +-
 examples/messenger/java/send                    |    2 +-
 .../org/apache/qpid/proton/example/Send.java    |   10 +-
 examples/messenger/javascript/send.html         |   12 +-
 examples/messenger/perl/recv.pl                 |   34 +-
 examples/messenger/perl/send.pl                 |   16 +-
 examples/messenger/perl/server.pl               |    6 +-
 proton-c/CMakeLists.txt                         |   49 +-
 proton-c/bindings/CMakeLists.txt                |    2 +
 proton-c/bindings/javascript/CMakeLists.txt     |   10 +-
 proton-c/bindings/perl/CMakeLists.txt           |    4 +-
 .../bindings/perl/lib/qpid/proton/Constants.pm  |    7 +-
 proton-c/bindings/perl/lib/qpid/proton/Data.pm  |   93 +-
 .../bindings/perl/lib/qpid/proton/Message.pm    |   28 +-
 proton-c/bindings/perl/lib/qpid/proton/utils.pm |   31 +
 proton-c/bindings/perl/lib/qpid_proton.pm       |    1 +
 proton-c/bindings/perl/perl.i                   |    7 -
 proton-c/bindings/php/CMakeLists.txt            |    3 +
 proton-c/bindings/php/proton.php                |    2 +-
 proton-c/bindings/python/CMakeLists.txt         |   90 +-
 proton-c/bindings/python/cproton.i              |  379 ++++++
 proton-c/bindings/python/proton.py              |  242 ++--
 proton-c/bindings/python/python.i               |  378 ------
 proton-c/bindings/python/setup.py.in            |  107 ++
 proton-c/bindings/ruby/CMakeLists.txt           |    4 +-
 proton-c/bindings/ruby/lib/qpid_proton.rb       |    1 +
 .../bindings/ruby/lib/qpid_proton/version.rb    |   29 +
 proton-c/bindings/ruby/qpid_proton.gemspec      |    1 +
 proton-c/bindings/ruby/ruby.i                   |    1 -
 proton-c/docs/man/CMakeLists.txt                |    2 +-
 proton-c/docs/man/proton-dump.1                 |   19 +
 proton-c/include/proton/buffer.h                |    6 +
 proton-c/include/proton/cproton.i               |   28 +-
 proton-c/include/proton/event.h                 |  153 ++-
 proton-c/include/proton/io.h                    |    2 +
 proton-c/include/proton/object.h                |   11 +-
 proton-c/include/proton/sasl.h                  |   15 +-
 proton-c/include/proton/selector.h              |    4 +-
 proton-c/include/proton/terminus.h              |    8 +-
 proton-c/include/proton/transport.h             |   13 +-
 proton-c/include/proton/types.h                 |    6 +-
 proton-c/src/buffer.c                           |   12 +
 proton-c/src/codec/codec.c                      |   48 +-
 proton-c/src/codec/data.h                       |   34 +-
 proton-c/src/codec/decoder.c                    |    2 +-
 proton-c/src/codec/encoder.c                    |    2 +-
 proton-c/src/dispatch_actions.h                 |   45 +
 proton-c/src/dispatcher/dispatcher.c            |   63 +-
 proton-c/src/dispatcher/dispatcher.h            |   21 +-
 proton-c/src/engine/engine-internal.h           |   77 +-
 proton-c/src/engine/engine.c                    |  186 +--
 proton-c/src/engine/event.c                     |  249 ++--
 proton-c/src/engine/event.h                     |    9 +-
 proton-c/src/error.c                            |    2 +-
 proton-c/src/message/message.c                  |   25 +-
 proton-c/src/messenger/messenger.c              |  137 ++-
 proton-c/src/messenger/store.c                  |   22 +-
 proton-c/src/messenger/subscription.c           |    2 +-
 proton-c/src/messenger/transform.c              |    4 +-
 proton-c/src/object/object.c                    |  104 +-
 proton-c/src/parser.c                           |    2 +-
 proton-c/src/platform.h                         |    2 +
 proton-c/src/posix/driver.c                     |   24 +-
 proton-c/src/posix/io.c                         |   12 +-
 proton-c/src/posix/selector.c                   |    4 +-
 proton-c/src/protocol.h.py                      |   98 +-
 proton-c/src/proton-dump.c                      |   28 +
 proton-c/src/proton.c                           |    4 +-
 proton-c/src/sasl/sasl.c                        |   91 +-
 proton-c/src/selectable.c                       |    2 +-
 proton-c/src/ssl/openssl.c                      |   57 +-
 proton-c/src/tests/object.c                     |   53 +-
 proton-c/src/tests/parse-url.c                  |    7 +
 proton-c/src/transport/transport.c              |  366 +++---
 proton-c/src/types.c                            |   14 +-
 proton-c/src/util.c                             |   22 +-
 proton-c/src/windows/io.c                       |  226 ++--
 proton-c/src/windows/iocp.c                     | 1138 ++++++++++++++++++
 proton-c/src/windows/iocp.h                     |  141 +++
 proton-c/src/windows/selector.c                 |  325 +++--
 proton-c/src/windows/write_pipeline.c           |  312 +++++
 .../java/org/apache/qpid/proton/Proton.java     |  168 +--
 .../org/apache/qpid/proton/ProtonFactory.java   |   31 -
 .../apache/qpid/proton/ProtonFactoryImpl.java   |   28 -
 .../apache/qpid/proton/ProtonFactoryLoader.java |  111 --
 .../amqp/messaging/DeliveryAnnotations.java     |   11 +-
 .../amqp/messaging/MessageAnnotations.java      |    8 +-
 .../org/apache/qpid/proton/codec/Codec.java     |   40 +
 .../java/org/apache/qpid/proton/codec/Data.java |   10 +
 .../apache/qpid/proton/codec/DataFactory.java   |   28 -
 .../qpid/proton/codec/impl/DataFactoryImpl.java |   35 -
 .../org/apache/qpid/proton/driver/Driver.java   |   10 +
 .../qpid/proton/driver/DriverFactory.java       |   29 -
 .../qpid/proton/driver/impl/ConnectorImpl.java  |    1 -
 .../proton/driver/impl/DriverFactoryImpl.java   |   35 -
 .../qpid/proton/driver/impl/DriverImpl.java     |    2 +-
 .../apache/qpid/proton/engine/Collector.java    |    8 +
 .../apache/qpid/proton/engine/Connection.java   |   11 +
 .../org/apache/qpid/proton/engine/Engine.java   |   60 +
 .../qpid/proton/engine/EngineFactory.java       |   29 -
 .../org/apache/qpid/proton/engine/Event.java    |   41 +-
 .../org/apache/qpid/proton/engine/Sasl.java     |   10 +-
 .../apache/qpid/proton/engine/SslDomain.java    |   10 +
 .../qpid/proton/engine/SslPeerDetails.java      |   12 +-
 .../apache/qpid/proton/engine/Transport.java    |   30 +-
 .../qpid/proton/engine/impl/CollectorImpl.java  |   42 +-
 .../qpid/proton/engine/impl/ConnectionImpl.java |   67 +-
 .../qpid/proton/engine/impl/EndpointImpl.java   |   54 +-
 .../proton/engine/impl/EngineFactoryImpl.java   |   59 -
 .../qpid/proton/engine/impl/EventImpl.java      |  111 +-
 .../qpid/proton/engine/impl/FrameHandler.java   |    3 +-
 .../qpid/proton/engine/impl/FrameParser.java    |   58 +-
 .../qpid/proton/engine/impl/FrameWriter.java    |   20 +-
 .../qpid/proton/engine/impl/LinkImpl.java       |   36 +-
 .../qpid/proton/engine/impl/ReceiverImpl.java   |    7 +-
 .../org/apache/qpid/proton/engine/impl/Ref.java |   46 +
 .../qpid/proton/engine/impl/SaslImpl.java       |   18 +-
 .../qpid/proton/engine/impl/SenderImpl.java     |    6 +-
 .../qpid/proton/engine/impl/SessionImpl.java    |   42 +-
 .../proton/engine/impl/TransportFactory.java    |   34 -
 .../engine/impl/TransportFactoryImpl.java       |   41 -
 .../qpid/proton/engine/impl/TransportImpl.java  |  298 +++--
 .../qpid/proton/engine/impl/TransportLink.java  |   28 +-
 .../engine/impl/TransportOutputAdaptor.java     |   66 +-
 .../engine/impl/TransportOutputWriter.java      |    4 +-
 .../proton/engine/impl/TransportSession.java    |   32 +-
 .../impl/ssl/SimpleSslTransportWrapper.java     |   33 +-
 .../proton/engine/impl/ssl/SslDomainImpl.java   |    1 -
 .../engine/impl/ssl/SslPeerDetailsImpl.java     |    1 -
 .../org/apache/qpid/proton/message/Message.java |   23 +
 .../qpid/proton/message/MessageFactory.java     |   37 -
 .../proton/message/impl/MessageFactoryImpl.java |   54 -
 .../qpid/proton/message/impl/MessageImpl.java   |   12 +-
 .../apache/qpid/proton/messenger/Messenger.java |   14 +
 .../qpid/proton/messenger/MessengerFactory.java |   28 -
 .../messenger/impl/MessengerFactoryImpl.java    |   42 -
 .../proton/messenger/impl/MessengerImpl.java    |    9 +-
 proton-j/src/main/resources/cengine.py          |  140 ++-
 proton-j/src/main/resources/csasl.py            |   10 +-
 .../qpid/proton/ProtonFactoryLoaderTest.java    |  129 --
 .../proton/engine/impl/FrameParserTest.java     |   16 +-
 .../proton/engine/impl/TransportImplTest.java   |   15 +
 .../engine/impl/TransportOutputAdaptorTest.java |    4 +-
 .../impl/ssl/SimpleSslTransportWrapperTest.java |   13 +-
 .../DummyProtonCFactory.java                    |   29 -
 .../DummyProtonFactory.java                     |   25 -
 .../DummyProtonJFactory.java                    |   29 -
 .../systemtests/ProtonEngineExampleTest.java    |   34 +-
 .../proton/systemtests/ProtonFactoryTest.java   |   60 -
 .../qpid/proton/systemtests/SimpleTest.java     |   13 +-
 .../systemtests/engine/ConnectionTest.java      |   31 +-
 .../engine/ProtonFactoryTestFixture.java        |   54 -
 .../systemtests/engine/TransportTest.java       |    6 +-
 .../org/apache/qpid/proton/InteropTest.java     |    5 +-
 tests/python/proton_tests/common.py             |   81 +-
 tests/python/proton_tests/engine.py             |  242 +++-
 tests/python/proton_tests/messenger.py          |   98 +-
 tests/python/proton_tests/sasl.py               |   77 +-
 tests/python/proton_tests/ssl.py                |   72 +-
 tests/python/proton_tests/transport.py          |  117 +-
 tests/tools/apps/c/CMakeLists.txt               |    2 +
 tests/tools/apps/c/msgr-common.h                |    4 +
 version.txt                                     |    2 +-
 208 files changed, 10388 insertions(+), 3391 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index a503467..7917258 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -18,6 +18,14 @@
 #
 cmake_minimum_required (VERSION 2.6)
 
+# Set default build type. Must come before project() which sets default to ""
+set (CMAKE_BUILD_TYPE RelWithDebInfo CACHE string
+  "Build type: Debug, Release, RelWithDebInfo or MinSizeRel (default RelWithDebInfo)")
+if (CMAKE_BUILD_TYPE MATCHES "Deb")
+  set (has_debug_symbols " (has debug symbols)")
+endif (CMAKE_BUILD_TYPE MATCHES "Deb")
+message("Build type is \"${CMAKE_BUILD_TYPE}\"${has_debug_symbols}")
+
 option(BUILD_WITH_CXX "Compile Proton using C++" OFF)
 if ("${CMAKE_GENERATOR}" MATCHES "^Visual Studio")
   # No C99 capability, use C++
@@ -152,3 +160,9 @@ if (JAVA_FOUND AND MAVEN_EXE)
 else (JAVA_FOUND AND MAVEN_EXE)
   message (STATUS "Cannot find both Java and Maven: testing disabled for Proton-J")
 endif (JAVA_FOUND AND MAVEN_EXE)
+
+# Generate test environment settings
+configure_file(${CMAKE_SOURCE_DIR}/config.sh.in
+               ${CMAKE_BINARY_DIR}/config.sh @ONLY)
+configure_file(${CMAKE_SOURCE_DIR}/config.bat.in
+               ${CMAKE_BINARY_DIR}/config.bat @ONLY)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/DEVELOPERS
----------------------------------------------------------------------
diff --git a/DEVELOPERS b/DEVELOPERS
index 97cfdaf..b0e0015 100644
--- a/DEVELOPERS
+++ b/DEVELOPERS
@@ -5,32 +5,15 @@ DEVELOPMENT ENVIRONMENT
 =======================
 
 To setup the variables for your development environment, simply source
-the file $REPO/config.sh:
+the file $BLDDIR/config.sh [$BLDDIR points to the proton build directory]:
 
   $ source config.sh
 
 This file sets the needed environment variables for all supported dynamic
-languages (Python, Perl, Ruby, PHP) as well as for Java and for testing. It,
-by default, assumes that you're building Proton in the directory:
-
-  $REPO/build
-
-where $REPO points the location where the Proton Subversion or Git repo has
-been checked out.
-
-If, however, you use a different location for your build files, then you'll want
-to set the environment variable CPROTON_BUILD to point to it first. So, for
-example, if you're building in:
-
-  /home/yourname/devel/proton/cmake
-
-then you would have the following environment variable set:
-
-  $ export CPROTON_BUILD=/hojme/yourname/devel/proton/cmake/proton-c
-
-NOTE: You need to point the environment variable to the proton-c directory under
-where your build is done.
+languages (Python, Perl, Ruby, PHP) as well as for Java and for testing.
 
+You will need to have set up the build directory first with cmake before the file
+will exist (see the instructions in README).
 
 
 MAILING LIST

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/config.bat.in
----------------------------------------------------------------------
diff --git a/config.bat.in b/config.bat.in
new file mode 100644
index 0000000..a73a88e
--- /dev/null
+++ b/config.bat.in
@@ -0,0 +1,66 @@
+REM
+REM Licensed to the Apache Software Foundation (ASF) under one
+REM or more contributor license agreements.  See the NOTICE file
+REM distributed with this work for additional information
+REM regarding copyright ownership.  The ASF licenses this file
+REM to you under the Apache License, Version 2.0 (the
+REM "License"); you may not use this file except in compliance
+REM with the License.  You may obtain a copy of the License at
+REM
+REM   http://www.apache.org/licenses/LICENSE-2.0
+REM
+REM Unless required by applicable law or agreed to in writing,
+REM software distributed under the License is distributed on an
+REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+REM KIND, either express or implied.  See the License for the
+REM specific language governing permissions and limitations
+REM under the License.
+REM
+
+REM This is a generated file and will be overwritten the next
+REM time that cmake is run.
+
+REM This build may be one of @CMAKE_CONFIGURATION_TYPES@
+REM Choose the configuration this script should reference:
+SET PROTON_BUILD_CONFIGURATION=relwithdebinfo
+
+REM PROTON_HOME  is the root of the proton checkout
+REM PROTON_BUILD is where cmake was run
+
+set PROTON_HOME=@CMAKE_SOURCE_DIR@
+set PROTON_BUILD=@CMAKE_BINARY_DIR@
+
+set PROTON_HOME=%PROTON_HOME:/=\%
+set PROTON_BUILD=%PROTON_BUILD:/=\%
+
+set PROTON_BINDINGS=%PROTON_BUILD%\proton-c\bindings
+set PROTON_JARS=%PROTON_BUILD%\proton-j\proton-j.jar
+
+REM Python & Jython
+set PYTHON_BINDINGS=%PROTON_BINDINGS%\python
+set COMMON_PYPATH=%PROTON_HOME%\tests\python;%PROTON_HOME%\proton-c\bindings\python
+set PYTHONPATH=%COMMON_PYPATH%;%PYTHON_BINDINGS%
+set JYTHONPATH=%COMMON_PYPATH%;%PROTON_HOME%\proton-j\src\main\resources;%PROTON_JARS%
+set CLASSPATH=%PROTON_JARS%
+
+REM PHP
+set PHP_BINDINGS=%PROTON_BINDINGS%\php
+if EXIST %PHP_BINDINGS% (
+    echo include_path="%PHP_BINDINGS%;%PROTON_HOME%\proton-c\bindings\php" >  %PHP_BINDINGS%\php.ini
+    echo extension="%PHP_BINDINGS%\cproton.so"                             >> %PHP_BINDINGS%\php.ini
+    set PHPRC=%PHP_BINDINGS%\php.ini
+)
+
+REM Ruby
+set RUBY_BINDINGS=%PROTON_BINDINGS%\ruby
+set RUBYLIB=%RUBY_BINDINGS%;%PROTON_HOME%\proton-c\bindings\ruby\lib;%PROTON_HOME%\tests\ruby
+
+REM Perl
+set PERL_BINDINGS=%PROTON_BINDINGS%\perl
+set PERL5LIB=%PERL5LIB%;%PERL_BINDINGS%;%PROTON_HOME%\proton-c\bindings\perl\lib
+
+REM test applications
+set PATH=%PATH%;%PROTON_BUILD%\tests\tools\apps\c
+set PATH=%PATH%;%PROTON_HOME%\tests\tools\apps\python
+set PATH=%PATH%;%PROTON_HOME%\tests\python
+set PATH=%PATH%;%PROTON_BUILD%\proton-c\%PROTON_BUILD_CONFIGURATION%

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/config.sh
----------------------------------------------------------------------
diff --git a/config.sh b/config.sh
deleted file mode 100755
index 90ad707..0000000
--- a/config.sh
+++ /dev/null
@@ -1,79 +0,0 @@
-#!/bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-cd $(dirname ${BASH_SOURCE[0]}) > /dev/null
-export PROTON_HOME=$(pwd)
-cd - > /dev/null
-
-if [ -z "$CPROTON_BUILD" ]; then
-    if [ -d $PROTON_HOME/build/proton-c ]; then
-        PROTON_BINDINGS=$PROTON_HOME/build/proton-c/bindings
-    else
-        PROTON_BINDINGS=$PROTON_HOME/proton-c/bindings
-    fi
-    if [ -d $PROTON_HOME/build/proton-j ]; then
-        PROTON_JARS=$PROTON_HOME/build/proton-j/proton-api/proton-api.jar:$PROTON_HOME/build/proton-j/proton/proton-j-impl.jar
-    else
-        PROTON_JARS=$PROTON_HOME/proton-j/proton-api/proton-api.jar:$PROTON_HOME/proton-j/proton/proton-j-impl.jar
-    fi
-else
-    PROTON_BINDINGS=$CPROTON_BUILD/bindings
-fi
-
-# Python & Jython
-export PYTHON_BINDINGS=$PROTON_BINDINGS/python
-export COMMON_PYPATH=$PROTON_HOME/tests/python
-export PYTHONPATH=$COMMON_PYPATH:$PROTON_HOME/proton-c/bindings/python:$PYTHON_BINDINGS
-export JYTHONPATH=$COMMON_PYPATH:$PROTON_HOME/proton-j/proton-api/src/main/resources:$PROTON_JARS
-export CLASSPATH=$PROTON_JARS
-
-# PHP
-export PHP_BINDINGS=$PROTON_BINDINGS/php
-if [ -d $PHP_BINDINGS ]; then
-    cat <<EOF > $PHP_BINDINGS/php.ini
-include_path="$PHP_BINDINGS:$PROTON_HOME/proton-c/bindings/php"
-extension="$PHP_BINDINGS/cproton.so"
-EOF
-    export PHPRC=$PHP_BINDINGS/php.ini
-fi
-
-# Ruby
-export RUBY_BINDINGS=$PROTON_BINDINGS/ruby
-export RUBYLIB=$RUBY_BINDINGS:$PROTON_HOME/proton-c/bindings/ruby/lib:$PROTON_HOME/tests/ruby
-
-# Perl
-export PERL_BINDINGS=$PROTON_BINDINGS/perl
-export PERL5LIB=$PERL5LIB:$PERL_BINDINGS:$PROTON_HOME/proton-c/bindings/perl/lib
-
-# test applications
-if [ -d $PROTON_HOME/build/tests/tools/apps/c ]; then
-    export PATH="$PATH:$PROTON_HOME/build/tests/tools/apps/c"
-fi
-if [ -d $PROTON_HOME/tests/tools/apps/python ]; then
-    export PATH="$PATH:$PROTON_HOME/tests/tools/apps/python"
-fi
-
-# test applications
-export PATH="$PATH:$PROTON_HOME/tests/python"
-
-# can the test harness use valgrind?
-if [[ -x "$(type -p valgrind)" ]] ; then
-    export VALGRIND=1
-fi

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/config.sh.in
----------------------------------------------------------------------
diff --git a/config.sh.in b/config.sh.in
new file mode 100755
index 0000000..4b60b2f
--- /dev/null
+++ b/config.sh.in
@@ -0,0 +1,61 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+PROTON_HOME=@CMAKE_SOURCE_DIR@
+PROTON_BUILD=@CMAKE_BINARY_DIR@
+
+PROTON_BINDINGS=$PROTON_BUILD/proton-c/bindings
+PROTON_JARS=$PROTON_BUILD/proton-j/proton-j.jar
+
+PYTHON_BINDINGS=$PROTON_BINDINGS/python
+PHP_BINDINGS=$PROTON_BINDINGS/php
+RUBY_BINDINGS=$PROTON_BINDINGS/ruby
+PERL_BINDINGS=$PROTON_BINDINGS/perl
+
+# Python & Jython
+COMMON_PYPATH=$PROTON_HOME/tests/python:$PROTON_HOME/proton-c/bindings/python
+export PYTHONPATH=$COMMON_PYPATH:$PYTHON_BINDINGS
+export JYTHONPATH=$COMMON_PYPATH:$PROTON_HOME/proton-j/src/main/resources:$PROTON_JARS
+export CLASSPATH=$PROTON_JARS
+
+# PHP
+if [ -d $PHP_BINDINGS ]; then
+    cat <<EOF > $PHP_BINDINGS/php.ini
+include_path="$PHP_BINDINGS:$PROTON_HOME/proton-c/bindings/php"
+extension="$PHP_BINDINGS/cproton.so"
+EOF
+    export PHPRC=$PHP_BINDINGS/php.ini
+fi
+
+# Ruby
+export RUBYLIB=$RUBY_BINDINGS:$PROTON_HOME/proton-c/bindings/ruby/lib:$PROTON_HOME/tests/ruby
+
+# Perl
+export PERL5LIB=$PERL5LIB:$PERL_BINDINGS:$PROTON_HOME/proton-c/bindings/perl/lib
+
+# test applications
+export PATH="$PATH:$PROTON_BUILD/tests/tools/apps/c"
+export PATH="$PATH:$PROTON_HOME/tests/tools/apps/python"
+export PATH="$PATH:$PROTON_HOME/tests/python"
+
+# can the test harness use valgrind?
+if [[ -x "$(type -p valgrind)" ]] ; then
+    export VALGRIND=$(type -p valgrind)
+fi

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/pom.xml b/contrib/proton-hawtdispatch/pom.xml
new file mode 100644
index 0000000..0eaa171
--- /dev/null
+++ b/contrib/proton-hawtdispatch/pom.xml
@@ -0,0 +1,61 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <parent>
+    <groupId>org.apache.qpid</groupId>
+    <artifactId>proton-project</artifactId>
+    <version>1.0-SNAPSHOT</version>
+    <relativePath>../..</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>proton-hawtdispatch</artifactId>
+  <name>proton-hawtdispatch</name>
+
+  <properties>
+    <hawtbuf-version>1.9</hawtbuf-version>
+    <hawtdispatch-version>1.18</hawtdispatch-version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.qpid</groupId>
+      <artifactId>proton-j</artifactId>
+      <version>${project.parent.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.fusesource.hawtdispatch</groupId>
+      <artifactId>hawtdispatch-transport</artifactId>
+      <version>${hawtdispatch-version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.fusesource.hawtbuf</groupId>
+      <artifactId>hawtbuf</artifactId>
+      <version>${hawtbuf-version}</version>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+  </build>
+  <scm>
+    <url>http://svn.apache.org/viewvc/qpid/proton/</url>
+  </scm>
+
+</project>

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnectOptions.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnectOptions.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnectOptions.java
new file mode 100644
index 0000000..3c3543d
--- /dev/null
+++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnectOptions.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import org.fusesource.hawtdispatch.DispatchQueue;
+import org.fusesource.hawtdispatch.transport.TcpTransport;
+
+import javax.net.ssl.SSLContext;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.*;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpConnectOptions implements Cloneable {
+
+    private static final long KEEP_ALIVE = Long.parseLong(System.getProperty("amqp.thread.keep_alive", ""+1000));
+    private static final long STACK_SIZE = Long.parseLong(System.getProperty("amqp.thread.stack_size", ""+1024*512));
+    private static ThreadPoolExecutor blockingThreadPool;
+
+    public synchronized static ThreadPoolExecutor getBlockingThreadPool() {
+        if( blockingThreadPool == null ) {
+            blockingThreadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, KEEP_ALIVE, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
+                    public Thread newThread(Runnable r) {
+                        Thread rc = new Thread(null, r, "AMQP Task", STACK_SIZE);
+                        rc.setDaemon(true);
+                        return rc;
+                    }
+                }) {
+
+                    @Override
+                    public void shutdown() {
+                        // we don't ever shutdown since we are shared..
+                    }
+
+                    @Override
+                    public List<Runnable> shutdownNow() {
+                        // we don't ever shutdown since we are shared..
+                        return Collections.emptyList();
+                    }
+                };
+        }
+        return blockingThreadPool;
+    }
+    public synchronized static void setBlockingThreadPool(ThreadPoolExecutor pool) {
+        blockingThreadPool = pool;
+    }
+
+    private static final URI DEFAULT_HOST;
+    static {
+        try {
+            DEFAULT_HOST = new URI("tcp://localhost");
+        } catch (URISyntaxException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    URI host = DEFAULT_HOST;
+    URI localAddress;
+    SSLContext sslContext;
+    DispatchQueue dispatchQueue;
+    Executor blockingExecutor;
+    int maxReadRate;
+    int maxWriteRate;
+    int trafficClass = TcpTransport.IPTOS_THROUGHPUT;
+    boolean useLocalHost;
+    int receiveBufferSize = 1024*64;
+    int sendBufferSize = 1024*64;
+    String localContainerId;
+    String remoteContainerId;
+    String user;
+    String password;
+
+
+    @Override
+    public AmqpConnectOptions clone() {
+        try {
+            return (AmqpConnectOptions) super.clone();
+        } catch (CloneNotSupportedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public String getLocalContainerId() {
+        return localContainerId;
+    }
+
+    public void setLocalContainerId(String localContainerId) {
+        this.localContainerId = localContainerId;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    public String getRemoteContainerId() {
+        return remoteContainerId;
+    }
+
+    public void setRemoteContainerId(String remoteContainerId) {
+        this.remoteContainerId = remoteContainerId;
+    }
+
+    public String getUser() {
+        return user;
+    }
+
+    public void setUser(String user) {
+        this.user = user;
+    }
+
+    public Executor getBlockingExecutor() {
+        return blockingExecutor;
+    }
+
+    public void setBlockingExecutor(Executor blockingExecutor) {
+        this.blockingExecutor = blockingExecutor;
+    }
+
+    public DispatchQueue getDispatchQueue() {
+        return dispatchQueue;
+    }
+
+    public void setDispatchQueue(DispatchQueue dispatchQueue) {
+        this.dispatchQueue = dispatchQueue;
+    }
+
+    public URI getLocalAddress() {
+        return localAddress;
+    }
+
+    public void setLocalAddress(String localAddress) throws URISyntaxException {
+        this.setLocalAddress(new URI(localAddress));
+    }
+    public void setLocalAddress(URI localAddress) {
+        this.localAddress = localAddress;
+    }
+
+    public int getMaxReadRate() {
+        return maxReadRate;
+    }
+
+    public void setMaxReadRate(int maxReadRate) {
+        this.maxReadRate = maxReadRate;
+    }
+
+    public int getMaxWriteRate() {
+        return maxWriteRate;
+    }
+
+    public void setMaxWriteRate(int maxWriteRate) {
+        this.maxWriteRate = maxWriteRate;
+    }
+
+    public int getReceiveBufferSize() {
+        return receiveBufferSize;
+    }
+
+    public void setReceiveBufferSize(int receiveBufferSize) {
+        this.receiveBufferSize = receiveBufferSize;
+    }
+
+    public URI getHost() {
+        return host;
+    }
+    public void setHost(String host, int port) throws URISyntaxException {
+        this.setHost(new URI("tcp://"+host+":"+port));
+    }
+    public void setHost(String host) throws URISyntaxException {
+        this.setHost(new URI(host));
+    }
+    public void setHost(URI host) {
+        this.host = host;
+    }
+
+    public int getSendBufferSize() {
+        return sendBufferSize;
+    }
+
+    public void setSendBufferSize(int sendBufferSize) {
+        this.sendBufferSize = sendBufferSize;
+    }
+
+    public SSLContext getSslContext() {
+        return sslContext;
+    }
+
+    public void setSslContext(SSLContext sslContext) {
+        this.sslContext = sslContext;
+    }
+
+    public int getTrafficClass() {
+        return trafficClass;
+    }
+
+    public void setTrafficClass(int trafficClass) {
+        this.trafficClass = trafficClass;
+    }
+
+    public boolean isUseLocalHost() {
+        return useLocalHost;
+    }
+
+    public void setUseLocalHost(boolean useLocalHost) {
+        this.useLocalHost = useLocalHost;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnection.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnection.java
new file mode 100644
index 0000000..b308209
--- /dev/null
+++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnection.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.hawtdispatch.impl.AmqpListener;
+import org.apache.qpid.proton.hawtdispatch.impl.AmqpTransport;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Endpoint;
+import org.apache.qpid.proton.engine.ProtonJConnection;
+import org.apache.qpid.proton.engine.ProtonJSession;
+import org.apache.qpid.proton.engine.impl.ProtocolTracer;
+import org.fusesource.hawtdispatch.DispatchQueue;
+import org.fusesource.hawtdispatch.Task;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpConnection extends AmqpEndpointBase  {
+
+    AmqpTransport transport;
+    ProtonJConnection connection;
+    HashSet<AmqpSender> senders = new HashSet<AmqpSender>();
+    boolean closing = false;
+
+    public static AmqpConnection connect(AmqpConnectOptions options) {
+        return new AmqpConnection(options);
+    }
+
+    private AmqpConnection(AmqpConnectOptions options) {
+        transport = AmqpTransport.connect(options);
+        transport.setListener(new AmqpListener() {
+            @Override
+            public void processDelivery(Delivery delivery) {
+                Attachment attachment = (Attachment) getTransport().context(delivery.getLink()).getAttachment();
+                AmqpLink link = (AmqpLink) attachment.endpoint();
+                link.processDelivery(delivery);
+            }
+
+            @Override
+            public void processRefill() {
+                for(AmqpSender sender: new ArrayList<AmqpSender>(senders)) {
+                    sender.pumpDeliveries();
+                }
+                pumpOut();
+            }
+
+            public void processTransportFailure(final IOException e) {
+            }
+        });
+        connection = transport.connection();
+        connection.open();
+        attach();
+    }
+
+    public void waitForConnected() throws Exception {
+        assertNotOnDispatchQueue();
+        getConnectedFuture().await();
+    }
+
+    public Future<Void> getConnectedFuture() {
+        final Promise<Void> rc = new Promise<Void>();
+        queue().execute(new Task() {
+            @Override
+            public void run() {
+                onConnected(rc);
+            }
+        });
+        return rc;
+    }
+
+    public void onConnected(Callback<Void> cb) {
+        transport.onTransportConnected(cb);
+    }
+
+    @Override
+    protected Endpoint getEndpoint() {
+        return connection;
+    }
+
+    @Override
+    protected AmqpConnection getConnection() {
+        return this;
+    }
+
+    @Override
+    protected AmqpEndpointBase getParent() {
+        return null;
+    }
+
+    public AmqpSession createSession() {
+        assertExecuting();
+        ProtonJSession session = connection.session();
+        session.open();
+        pumpOut();
+        return new AmqpSession(this, session);
+    }
+
+    public int getMaxSessions() {
+        return connection.getMaxChannels();
+    }
+
+    public void disconnect() {
+        closing = true;
+        transport.disconnect();
+    }
+
+    public void waitForDisconnected() throws Exception {
+        assertNotOnDispatchQueue();
+        getDisconnectedFuture().await();
+    }
+
+    public Future<Void> getDisconnectedFuture() {
+        final Promise<Void> rc = new Promise<Void>();
+        queue().execute(new Task() {
+            @Override
+            public void run() {
+                onDisconnected(rc);
+            }
+        });
+        return rc;
+    }
+
+    public void onDisconnected(Callback<Void> cb) {
+        transport.onTransportDisconnected(cb);
+    }
+
+    public TransportState getTransportState() {
+        return transport.getState();
+    }
+
+    public Throwable getTransportFailure() {
+        return transport.getFailure();
+    }
+
+    public Future<Throwable> getTransportFailureFuture() {
+        final Promise<Throwable> rc = new Promise<Throwable>();
+        queue().execute(new Task() {
+            @Override
+            public void run() {
+                onTransportFailure(rc);
+            }
+        });
+        return rc;
+    }
+
+    public void onTransportFailure(Callback<Throwable> cb) {
+        transport.onTransportFailure(cb);
+    }
+
+    @Override
+    public DispatchQueue queue() {
+        return super.queue();
+    }
+
+    public void setProtocolTracer(ProtocolTracer protocolTracer) {
+        transport.setProtocolTracer(protocolTracer);
+    }
+
+    public ProtocolTracer getProtocolTracer() {
+        return transport.getProtocolTracer();
+    }
+
+    /**
+     * Once the remote end, closes the transport is disconnected.
+     */
+    @Override
+    public void close() {
+        super.close();
+        onRemoteClose(new Callback<ErrorCondition>() {
+            @Override
+            public void onSuccess(ErrorCondition value) {
+                disconnect();
+            }
+
+            @Override
+            public void onFailure(Throwable value) {
+                disconnect();
+            }
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpDeliveryListener.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpDeliveryListener.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpDeliveryListener.java
new file mode 100644
index 0000000..1e9f4e2
--- /dev/null
+++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpDeliveryListener.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.api;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface AmqpDeliveryListener {
+
+    /**
+     * Caller should suspend/resume the AmqpReceiver to
+     * flow control the delivery of messages.
+     *
+     * @param delivery
+     */
+    void onMessageDelivery(MessageDelivery delivery);
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpEndpointBase.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpEndpointBase.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpEndpointBase.java
new file mode 100644
index 0000000..4ebd8e2
--- /dev/null
+++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpEndpointBase.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.hawtdispatch.impl.*;
+import org.apache.qpid.proton.engine.Endpoint;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.fusesource.hawtdispatch.Dispatch;
+import org.fusesource.hawtdispatch.DispatchQueue;
+import org.fusesource.hawtdispatch.Task;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract class AmqpEndpointBase extends WatchBase {
+    abstract protected Endpoint getEndpoint();
+    abstract protected AmqpEndpointBase getParent();
+
+    protected AmqpConnection getConnection() {
+        return getParent().getConnection();
+    }
+
+    protected AmqpTransport getTransport() {
+        return getConnection().transport;
+    }
+
+    protected DispatchQueue queue() {
+        return getTransport().queue();
+    }
+
+    protected void assertExecuting() {
+        getTransport().assertExecuting();
+    }
+
+    public void waitForRemoteOpen() throws Exception {
+        assertNotOnDispatchQueue();
+        getRemoteOpenFuture().await();
+    }
+
+    public Future<Void> getRemoteOpenFuture() {
+        final Promise<Void> rc = new Promise<Void>();
+        queue().execute(new Task() {
+            @Override
+            public void run() {
+                onRemoteOpen(rc);
+            }
+        });
+        return rc;
+    }
+
+    public void onRemoteOpen(final Callback<Void> cb) {
+        addWatch(new Watch() {
+            @Override
+            public boolean execute() {
+                switch (getEndpoint().getRemoteState()) {
+                    case ACTIVE:
+                        cb.onSuccess(null);
+                        return true;
+                    case CLOSED:
+                        cb.onFailure(Support.illegalState("closed"));
+                        return true;
+                }
+                return false;
+            }
+        });
+    }
+
+    public ErrorCondition waitForRemoteClose() throws Exception {
+        assertNotOnDispatchQueue();
+        return getRemoteCloseFuture().await();
+    }
+
+    public Future<ErrorCondition> getRemoteCloseFuture() {
+        final Promise<ErrorCondition> rc = new Promise<ErrorCondition>();
+        queue().execute(new Task() {
+            @Override
+            public void run() {
+                onRemoteClose(rc);
+            }
+        });
+        return rc;
+    }
+
+    public void onRemoteClose(final Callback<ErrorCondition> cb) {
+        addWatch(new Watch() {
+            @Override
+            public boolean execute() {
+                if (getEndpoint().getRemoteState() == EndpointState.CLOSED) {
+                    cb.onSuccess(getEndpoint().getRemoteCondition());
+                    return true;
+                }
+                return false;
+            }
+        });
+    }
+
+    public void close() {
+        getEndpoint().close();
+        pumpOut();
+    }
+
+    public EndpointState getRemoteState() {
+        return getEndpoint().getRemoteState();
+    }
+
+    public ErrorCondition getRemoteError() {
+        return getEndpoint().getRemoteCondition();
+    }
+
+    static protected ErrorCondition toError(Throwable value) {
+        return new ErrorCondition(Symbol.valueOf("error"), value.toString());
+    }
+
+    class Attachment extends Task {
+        AmqpEndpointBase endpoint() {
+            return AmqpEndpointBase.this;
+        }
+
+        @Override
+        public void run() {
+            fireWatches();
+        }
+    }
+
+    protected void attach() {
+        getTransport().context(getEndpoint()).setAttachment(new Attachment());
+    }
+
+    protected void defer(Defer defer) {
+        getTransport().defer(defer);
+    }
+
+    protected void pumpOut() {
+        getTransport().pumpOut();
+    }
+
+    static protected void assertNotOnDispatchQueue() {
+        assert Dispatch.getCurrentQueue()==null : "Not allowed to be called when executing on a dispatch queue";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpLink.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpLink.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpLink.java
new file mode 100644
index 0000000..dd6f32e
--- /dev/null
+++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpLink.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import org.apache.qpid.proton.engine.Delivery;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract public class AmqpLink extends AmqpEndpointBase {
+    abstract protected void processDelivery(Delivery delivery);
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpReceiver.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpReceiver.java
new file mode 100644
index 0000000..644f72a
--- /dev/null
+++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpReceiver.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import org.apache.qpid.proton.hawtdispatch.impl.Defer;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.ByteArrayOutputStream;
+
+import java.util.LinkedList;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpReceiver extends AmqpLink {
+
+    final AmqpSession parent;
+    final Receiver receiver;
+
+    public AmqpReceiver(AmqpSession parent, Receiver receiver2, QoS qos) {
+        this.parent = parent;
+        this.receiver = receiver2;
+        attach();
+    }
+
+    @Override
+    protected Receiver getEndpoint() {
+        return receiver;
+    }
+    @Override
+    protected AmqpSession getParent() {
+        return parent;
+    }
+
+    ByteArrayOutputStream current = new ByteArrayOutputStream();
+
+    @Override
+    protected void processDelivery(Delivery delivery) {
+        if( !delivery.isReadable() ) {
+            System.out.println("it was not readable!");
+            return;
+        }
+
+        if( current==null ) {
+            current = new ByteArrayOutputStream();
+        }
+
+        int count;
+        byte data[] = new byte[1024*4];
+        while( (count = receiver.recv(data, 0, data.length)) > 0 ) {
+            current.write(data, 0, count);
+        }
+
+        // Expecting more deliveries..
+        if( count == 0 ) {
+            return;
+        }
+
+        receiver.advance();
+        Buffer buffer = current.toBuffer();
+        current = null;
+        onMessage(delivery, buffer);
+
+    }
+
+    LinkedList<MessageDelivery> inbound = new LinkedList<MessageDelivery>();
+
+    protected void onMessage(Delivery delivery, Buffer buffer) {
+        MessageDelivery md = new MessageDelivery(buffer) {
+            @Override
+            AmqpLink link() {
+                return AmqpReceiver.this;
+            }
+
+            @Override
+            public void settle() {
+                if( !delivery.isSettled() ) {
+                    delivery.disposition(new Accepted());
+                    delivery.settle();
+                }
+                drain();
+            }
+        };
+        md.delivery = delivery;
+        delivery.setContext(md);
+        inbound.add(md);
+        drainInbound();
+    }
+
+    public void drain() {
+        defer(deferedDrain);
+    }
+
+    Defer deferedDrain = new Defer(){
+        public void run() {
+            drainInbound();
+        }
+    };
+    int resumed = 0;
+
+    public void resume() {
+        resumed++;
+    }
+    public void suspend() {
+        resumed--;
+    }
+
+    AmqpDeliveryListener deliveryListener;
+    private void drainInbound() {
+        while( deliveryListener!=null && !inbound.isEmpty() && resumed>0) {
+            deliveryListener.onMessageDelivery(inbound.removeFirst());
+            receiver.flow(1);
+        }
+    }
+
+    public AmqpDeliveryListener getDeliveryListener() {
+        return deliveryListener;
+    }
+
+    public void setDeliveryListener(AmqpDeliveryListener deliveryListener) {
+        this.deliveryListener = deliveryListener;
+        drainInbound();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSender.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSender.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSender.java
new file mode 100644
index 0000000..9a672d5
--- /dev/null
+++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSender.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import org.apache.qpid.proton.hawtdispatch.impl.Defer;
+import org.apache.qpid.proton.hawtdispatch.impl.Watch;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.messaging.Modified;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.messaging.Released;
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.fusesource.hawtbuf.Buffer;
+
+import java.io.UnsupportedEncodingException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpSender extends AmqpLink {
+
+    private  byte[] EMPTY_BYTE_ARRAY = new byte[]{};
+    long nextTagId = 0;
+    HashSet<byte[]> tagCache = new HashSet<byte[]>();
+
+    final AmqpSession parent;
+    private final QoS qos;
+    final Sender sender;
+
+    public AmqpSender(AmqpSession parent, Sender sender2, QoS qos) {
+        this.parent = parent;
+        this.sender = sender2;
+        this.qos = qos;
+        attach();
+        getConnection().senders.add(this);
+    }
+
+    @Override
+    public void close() {
+        super.close();
+        getConnection().senders.remove(this);
+    }
+
+    @Override
+    protected Sender getEndpoint() {
+        return sender;
+    }
+
+    @Override
+    protected AmqpSession getParent() {
+        return parent;
+    }
+
+    final LinkedList<MessageDelivery> outbound = new LinkedList<MessageDelivery>();
+    long outboundBufferSize;
+
+    public MessageDelivery send(Message message) {
+        assertExecuting();
+        MessageDelivery rc = new MessageDelivery(message) {
+            @Override
+            AmqpLink link() {
+                return AmqpSender.this;
+            }
+
+            @Override
+            public void redeliver(boolean incrementDeliveryCounter) {
+                super.redeliver(incrementDeliveryCounter);
+                outbound.add(this);
+                outboundBufferSize += initialSize;
+                defer(deferedPumpDeliveries);
+            }
+        };
+        outbound.add(rc);
+        outboundBufferSize += rc.initialSize;
+        pumpDeliveries();
+        pumpOut();
+        return rc;
+    }
+
+    Buffer currentBuffer;
+    Delivery currentDelivery;
+
+    Defer deferedPumpDeliveries = new Defer() {
+        public void run() {
+            pumpDeliveries();
+        }
+    };
+
+    public long getOverflowBufferSize() {
+        return outboundBufferSize;
+    }
+
+    protected void pumpDeliveries() {
+        assertExecuting();
+        try {
+            while(true) {
+                while( currentBuffer !=null ) {
+                    if( sender.getCredit() > 0 ) {
+                        int sent = sender.send(currentBuffer.data, currentBuffer.offset, currentBuffer.length);
+                        currentBuffer.moveHead(sent);
+                        if( currentBuffer.length == 0 ) {
+                            Delivery current = currentDelivery;
+                            MessageDelivery md = (MessageDelivery) current.getContext();
+                            currentBuffer = null;
+                            currentDelivery = null;
+                            if( qos == QoS.AT_MOST_ONCE ) {
+                                current.settle();
+                            } else {
+                                sender.advance();
+                            }
+                            md.fireWatches();
+                        }
+                    } else {
+                        return;
+                    }
+                }
+
+                if( outbound.isEmpty() ) {
+                    return;
+                }
+
+                final MessageDelivery md = outbound.removeFirst();
+                outboundBufferSize -= md.initialSize;
+                currentBuffer = md.encoded();
+                if( qos == QoS.AT_MOST_ONCE ) {
+                    currentDelivery = sender.delivery(EMPTY_BYTE_ARRAY, 0, 0);
+                } else {
+                    final byte[] tag = nextTag();
+                    currentDelivery = sender.delivery(tag, 0, tag.length);
+                }
+                md.delivery = currentDelivery;
+                currentDelivery.setContext(md);
+            }
+        } finally {
+            fireWatches();
+        }
+    }
+
+    @Override
+    protected void processDelivery(Delivery delivery) {
+        final MessageDelivery md  = (MessageDelivery) delivery.getContext();
+        if( delivery.remotelySettled() ) {
+            if( delivery.getTag().length > 0 ) {
+                checkinTag(delivery.getTag());
+            }
+
+            final DeliveryState state = delivery.getRemoteState();
+            if( state==null || state instanceof Accepted) {
+                if( !delivery.remotelySettled() ) {
+                    delivery.disposition(new Accepted());
+                }
+            } else if( state instanceof Rejected) {
+                // re-deliver /w incremented delivery counter.
+                md.delivery = null;
+                md.incrementDeliveryCount();
+                outbound.addLast(md);
+            } else if( state instanceof Released) {
+                // re-deliver && don't increment the counter.
+                md.delivery = null;
+                outbound.addLast(md);
+            } else if( state instanceof Modified) {
+                Modified modified = (Modified) state;
+                if ( modified.getDeliveryFailed() ) {
+                  // increment delivery counter..
+                  md.incrementDeliveryCount();
+                }
+            }
+            delivery.settle();
+        }
+        md.fireWatches();
+    }
+
+    byte[] nextTag() {
+        byte[] rc;
+        if (tagCache != null && !tagCache.isEmpty()) {
+            final Iterator<byte[]> iterator = tagCache.iterator();
+            rc = iterator.next();
+            iterator.remove();
+        } else {
+            try {
+                rc = Long.toHexString(nextTagId++).getBytes("UTF-8");
+            } catch (UnsupportedEncodingException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        return rc;
+    }
+
+    void checkinTag(byte[] data) {
+        if( tagCache.size() < 1024 ) {
+            tagCache.add(data);
+        }
+    }
+
+    public void onOverflowBufferDrained(final Callback<Void> cb) {
+        addWatch(new Watch() {
+            @Override
+            public boolean execute() {
+                if (outboundBufferSize==0) {
+                    cb.onSuccess(null);
+                    return true;
+                }
+                return false;
+            }
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSession.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSession.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSession.java
new file mode 100644
index 0000000..b25a1b7
--- /dev/null
+++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSession.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.engine.Endpoint;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.ProtonJSession;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.messaging.*;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+
+import java.util.UUID;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpSession extends AmqpEndpointBase {
+
+    final AmqpConnection parent;
+    final ProtonJSession session;
+
+
+    public AmqpSession(AmqpConnection parent, ProtonJSession session) {
+        this.parent = parent;
+        this.session = session;
+        attach();
+    }
+
+    @Override
+    protected Endpoint getEndpoint() {
+        return session;
+    }
+
+    @Override
+    protected AmqpConnection getParent() {
+        return parent;
+    }
+
+    public AmqpSender createSender(Target target) {
+        return createSender(target, QoS.AT_LEAST_ONCE);
+    }
+
+    public AmqpSender createSender(Target target, QoS qos) {
+        return createSender(target, qos, UUID.randomUUID().toString());
+    }
+
+    public AmqpSender createSender(Target target, QoS qos, String name) {
+        assertExecuting();
+        Sender sender = session.sender(name);
+        attach();
+//        Source source = new Source();
+//        source.setAddress(UUID.randomUUID().toString());
+//        sender.setSource(source);
+        sender.setTarget(target);
+        configureQos(sender, qos);
+        sender.open();
+        pumpOut();
+        return new AmqpSender(this, sender, qos);
+    }
+
+    public AmqpReceiver createReceiver(Source source) {
+        return createReceiver(source, QoS.AT_LEAST_ONCE);
+    }
+
+    public AmqpReceiver createReceiver(Source source, QoS qos) {
+        return createReceiver(source, qos, 100);
+    }
+
+    public AmqpReceiver createReceiver(Source source, QoS qos, int prefetch) {
+        return createReceiver(source, qos, prefetch,  UUID.randomUUID().toString());
+    }
+
+    public AmqpReceiver createReceiver(Source source, QoS qos, int prefetch, String name) {
+        assertExecuting();
+        Receiver receiver = session.receiver(name);
+        receiver.setSource(source);
+//        Target target = new Target();
+//        target.setAddress(UUID.randomUUID().toString());
+//        receiver.setTarget(target);
+        receiver.flow(prefetch);
+        configureQos(receiver, qos);
+        receiver.open();
+        pumpOut();
+        return new AmqpReceiver(this, receiver, qos);
+    }
+
+    private void configureQos(Link link, QoS qos) {
+        switch (qos) {
+            case AT_MOST_ONCE:
+                link.setSenderSettleMode(SenderSettleMode.SETTLED);
+                link.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+                break;
+            case AT_LEAST_ONCE:
+                link.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+                link.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+                break;
+            case EXACTLY_ONCE:
+                link.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+                link.setReceiverSettleMode(ReceiverSettleMode.SECOND);
+                break;
+        }
+    }
+
+    public Message createTextMessage(String value) {
+        Message msg = Message.Factory.create();
+        Section body = new AmqpValue(value);
+        msg.setBody(body);
+        return msg;
+    }
+
+    public Message createBinaryMessage(byte value[]) {
+        return createBinaryMessage(value, 0, value.length);
+    }
+
+    public Message createBinaryMessage(byte value[], int offset, int len) {
+        Message msg = Message.Factory.create();
+        Data body = new Data(new Binary(value, offset,len));
+        msg.setBody(body);
+        return msg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Callback.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Callback.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Callback.java
new file mode 100644
index 0000000..89fbdd1
--- /dev/null
+++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Callback.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.hawtdispatch.api;
+
+/**
+ * <p>
+ * Function Result that carries one value.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface Callback<T> {
+    public void onSuccess(T value);
+    public void onFailure(Throwable value);
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/ChainedCallback.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/ChainedCallback.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/ChainedCallback.java
new file mode 100644
index 0000000..e53f512
--- /dev/null
+++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/ChainedCallback.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.hawtdispatch.api;
+
+/**
+ * <p>
+ * Function Result that carries one value.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract public class ChainedCallback<In,Out> implements Callback<In> {
+
+    public final Callback<Out> next;
+
+    public ChainedCallback(Callback<Out> next) {
+        this.next = next;
+    }
+
+    public void onFailure(Throwable value) {
+        next.onFailure(value);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/DeliveryAttachment.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/DeliveryAttachment.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/DeliveryAttachment.java
new file mode 100644
index 0000000..290076f
--- /dev/null
+++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/DeliveryAttachment.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import org.apache.qpid.proton.engine.Delivery;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract public class DeliveryAttachment {
+    abstract void processDelivery(Delivery delivery);
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Future.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Future.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Future.java
new file mode 100644
index 0000000..4a9eb5e
--- /dev/null
+++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Future.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * <p>A simplified Future function results interface.</p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface Future<T> {
+    T await() throws Exception;
+    T await(long amount, TimeUnit unit) throws Exception;
+    void then(Callback<T> callback);
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java
new file mode 100644
index 0000000..b115557
--- /dev/null
+++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.hawtdispatch.impl.Watch;
+import org.apache.qpid.proton.hawtdispatch.impl.WatchBase;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.message.ProtonJMessage;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtdispatch.Task;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public abstract class MessageDelivery extends WatchBase {
+
+    final int initialSize;
+    private Message message;
+    private Buffer encoded;
+    public Delivery delivery;
+    private int sizeHint = 32;
+
+    static Buffer encode(Message message, int sizeHint) {
+        byte[] buffer = new byte[sizeHint];
+        int size = ((ProtonJMessage)message).encode2(buffer, 0, sizeHint);
+        if( size > sizeHint ) {
+            buffer = new byte[size];
+            size = message.encode(buffer, 0, size);
+        }
+        return new Buffer(buffer, 0, size);
+    }
+
+    static Message decode(Buffer buffer) {
+        Message msg = Message.Factory.create();
+        int offset = buffer.offset;
+        int len = buffer.length;
+        while( len > 0 ) {
+            int decoded = msg.decode(buffer.data, offset, len);
+            assert decoded > 0: "Make progress decoding the message";
+            offset += decoded;
+            len -= decoded;
+        }
+        return msg;
+    }
+
+    public MessageDelivery(Message message) {
+        this(message, encode(message, 32));
+    }
+
+    public MessageDelivery(Buffer encoded) {
+        this(null, encoded);
+    }
+
+    public MessageDelivery(Message message, Buffer encoded) {
+        this.message = message;
+        this.encoded = encoded;
+        sizeHint = this.encoded.length;
+        initialSize = sizeHint;
+    }
+
+    public Message getMessage() {
+        if( message == null ) {
+            message = decode(encoded);
+        }
+        return message;
+    }
+
+    public Buffer encoded() {
+        if( encoded == null ) {
+            encoded = encode(message, sizeHint);
+            sizeHint = encoded.length;
+        }
+        return encoded;
+    }
+
+    public boolean isSettled() {
+        return delivery!=null && delivery.isSettled();
+    }
+
+    public DeliveryState getRemoteState() {
+        return delivery==null ? null : delivery.getRemoteState();
+    }
+
+    public DeliveryState getLocalState() {
+        return delivery==null ? null : delivery.getLocalState();
+    }
+
+    public void onEncoded(final Callback<Void> cb) {
+        addWatch(new Watch() {
+            @Override
+            public boolean execute() {
+                if( delivery!=null ) {
+                    cb.onSuccess(null);
+                    return true;
+                }
+                return false;
+            }
+        });
+    }
+
+    /**
+     * @return the remote delivery state when it changes.
+     * @throws Exception
+     */
+    public DeliveryState getRemoteStateChange() throws Exception {
+        AmqpEndpointBase.assertNotOnDispatchQueue();
+        return getRemoteStateChangeFuture().await();
+    }
+
+    /**
+     * @return the future remote delivery state when it changes.
+     */
+    public Future<DeliveryState> getRemoteStateChangeFuture() {
+        final Promise<DeliveryState> rc = new Promise<DeliveryState>();
+        link().queue().execute(new Task() {
+            @Override
+            public void run() {
+                onRemoteStateChange(rc);
+            }
+        });
+        return rc;
+    }
+
+    abstract AmqpLink link();
+
+    boolean watchingRemoteStateChange;
+    public void onRemoteStateChange(final Callback<DeliveryState> cb) {
+        watchingRemoteStateChange = true;
+        final DeliveryState original = delivery.getRemoteState();
+        addWatch(new Watch() {
+            @Override
+            public boolean execute() {
+                if (original == null) {
+                    if( delivery.getRemoteState()!=null ) {
+                        cb.onSuccess(delivery.getRemoteState());
+                        watchingRemoteStateChange = false;
+                        return true;
+                    }
+                } else {
+                    if( !original.equals(delivery.getRemoteState()) ) {
+                        cb.onSuccess(delivery.getRemoteState());
+                        watchingRemoteStateChange = false;
+                        return true;
+                    }
+                }
+                return false;
+            }
+        });
+    }
+
+    /**
+     * @return the remote delivery state once settled.
+     * @throws Exception
+     */
+    public DeliveryState getSettle() throws Exception {
+        AmqpEndpointBase.assertNotOnDispatchQueue();
+        return getSettleFuture().await();
+    }
+
+    /**
+     * @return the future remote delivery state once the delivery is settled.
+     */
+    public Future<DeliveryState> getSettleFuture() {
+        final Promise<DeliveryState> rc = new Promise<DeliveryState>();
+        link().queue().execute(new Task() {
+            @Override
+            public void run() {
+                onSettle(rc);
+            }
+        });
+        return rc;
+    }
+
+    public void onSettle(final Callback<DeliveryState> cb) {
+        addWatch(new Watch() {
+            @Override
+            public boolean execute() {
+                if( delivery!=null && delivery.isSettled() ) {
+                    cb.onSuccess(delivery.getRemoteState());
+                    return true;
+                }
+                return false;
+            }
+        });
+    }
+
+    @Override
+    protected void fireWatches() {
+        super.fireWatches();
+    }
+
+    void incrementDeliveryCount() {
+        Message msg = getMessage();
+        msg.setDeliveryCount(msg.getDeliveryCount()+1);
+        encoded = null;
+    }
+
+    public void redeliver(boolean incrementDeliveryCounter) {
+        if( incrementDeliveryCounter ) {
+            incrementDeliveryCount();
+        }
+    }
+
+    public void settle() {
+        if( !delivery.isSettled() ) {
+            delivery.settle();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Promise.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Promise.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Promise.java
new file mode 100644
index 0000000..b914b44
--- /dev/null
+++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Promise.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class Promise<T> implements Callback<T>, Future<T> {
+
+    private final CountDownLatch latch = new CountDownLatch(1);
+    Callback<T> next;
+    Throwable error;
+    T value;
+
+    public void onFailure(Throwable value) {
+        Callback<T> callback = null;
+        synchronized(this)  {
+            error = value;
+            latch.countDown();
+            callback = next;
+        }
+        if( callback!=null ) {
+            callback.onFailure(value);
+        }
+    }
+
+    public void onSuccess(T value) {
+        Callback<T> callback = null;
+        synchronized(this)  {
+            this.value = value;
+            latch.countDown();
+            callback = next;
+        }
+        if( callback!=null ) {
+            callback.onSuccess(value);
+        }
+    }
+
+    public void then(Callback<T> callback) {
+        boolean fire = false;
+        synchronized(this)  {
+            next = callback;
+            if( latch.getCount() == 0 ) {
+                fire = true;
+            }
+        }
+        if( fire ) {
+            if( error!=null ) {
+                callback.onFailure(error);
+            } else {
+                callback.onSuccess(value);
+            }
+        }
+    }
+
+    public T await(long amount, TimeUnit unit) throws Exception {
+        if( latch.await(amount, unit) ) {
+            return get();
+        } else {
+            throw new TimeoutException();
+        }
+    }
+
+    public T await() throws Exception {
+        latch.await();
+        return get();
+    }
+
+    private T get() throws Exception {
+        Throwable e = error;
+        if( e !=null ) {
+            if( e instanceof RuntimeException ) {
+                throw (RuntimeException) e;
+            } else if( e instanceof Exception) {
+                throw (Exception) e;
+            } else if( e instanceof Error) {
+                throw (Error) e;
+            } else {
+                // don't expect to hit this case.
+                throw new RuntimeException(e);
+            }
+        }
+        return value;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/QoS.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/QoS.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/QoS.java
new file mode 100644
index 0000000..5b4a8dc
--- /dev/null
+++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/QoS.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.hawtdispatch.api;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public enum QoS {
+    AT_MOST_ONCE,
+    AT_LEAST_ONCE,
+    EXACTLY_ONCE
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org