You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2014/11/28 14:50:03 UTC
[25/51] [abbrv] qpid-proton git commit: Update with merge of latest
proton codebase and checked against latest emscripten incoming branch
Update with merge of latest proton codebase and checked against latest emscripten incoming branch
git-svn-id: https://svn.apache.org/repos/asf/qpid/proton/branches/fadams-javascript-binding@1622849 13f79535-47bb-0310-9956-ffa450edef68
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/1c2f4894
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/1c2f4894
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/1c2f4894
Branch: refs/heads/master
Commit: 1c2f4894270775269abb6e8303e75684331a3212
Parents: 4a78327
Author: fadams <fa...@unknown>
Authored: Sat Sep 6 11:23:10 2014 +0000
Committer: fadams <fa...@unknown>
Committed: Sat Sep 6 11:23:10 2014 +0000
----------------------------------------------------------------------
CMakeLists.txt | 14 +
DEVELOPERS | 25 +-
config.bat.in | 66 +
config.sh | 79 --
config.sh.in | 61 +
contrib/proton-hawtdispatch/pom.xml | 61 +
.../hawtdispatch/api/AmqpConnectOptions.java | 228 ++++
.../proton/hawtdispatch/api/AmqpConnection.java | 201 ++++
.../hawtdispatch/api/AmqpDeliveryListener.java | 32 +
.../hawtdispatch/api/AmqpEndpointBase.java | 158 +++
.../qpid/proton/hawtdispatch/api/AmqpLink.java | 27 +
.../proton/hawtdispatch/api/AmqpReceiver.java | 141 +++
.../proton/hawtdispatch/api/AmqpSender.java | 227 ++++
.../proton/hawtdispatch/api/AmqpSession.java | 141 +++
.../qpid/proton/hawtdispatch/api/Callback.java | 29 +
.../hawtdispatch/api/ChainedCallback.java | 37 +
.../hawtdispatch/api/DeliveryAttachment.java | 27 +
.../qpid/proton/hawtdispatch/api/Future.java | 31 +
.../hawtdispatch/api/MessageDelivery.java | 226 ++++
.../qpid/proton/hawtdispatch/api/Promise.java | 107 ++
.../qpid/proton/hawtdispatch/api/QoS.java | 26 +
.../proton/hawtdispatch/api/TransportState.java | 29 +
.../proton/hawtdispatch/impl/AmqpHeader.java | 85 ++
.../proton/hawtdispatch/impl/AmqpListener.java | 71 ++
.../hawtdispatch/impl/AmqpProtocolCodec.java | 109 ++
.../proton/hawtdispatch/impl/AmqpTransport.java | 587 +++++++++
.../qpid/proton/hawtdispatch/impl/Defer.java | 27 +
.../hawtdispatch/impl/EndpointContext.java | 76 ++
.../qpid/proton/hawtdispatch/impl/Support.java | 41 +
.../qpid/proton/hawtdispatch/impl/Watch.java | 26 +
.../proton/hawtdispatch/impl/WatchBase.java | 54 +
.../proton/hawtdispatch/api/SampleTest.java | 292 +++++
.../hawtdispatch/test/MessengerServer.java | 135 +++
contrib/proton-jms/pom.xml | 50 +
.../jms/AMQPNativeInboundTransformer.java | 40 +
.../jms/AMQPNativeOutboundTransformer.java | 103 ++
.../proton/jms/AMQPRawInboundTransformer.java | 47 +
.../proton/jms/AutoOutboundTransformer.java | 46 +
.../apache/qpid/proton/jms/EncodedMessage.java | 75 ++
.../qpid/proton/jms/InboundTransformer.java | 314 +++++
.../jms/JMSMappingInboundTransformer.java | 102 ++
.../jms/JMSMappingOutboundTransformer.java | 246 ++++
.../org/apache/qpid/proton/jms/JMSVendor.java | 45 +
.../qpid/proton/jms/OutboundTransformer.java | 52 +
examples/CMakeLists.txt | 1 +
examples/messenger/java/recv | 2 +-
examples/messenger/java/send | 2 +-
.../org/apache/qpid/proton/example/Send.java | 10 +-
examples/messenger/javascript/send.html | 12 +-
examples/messenger/perl/recv.pl | 34 +-
examples/messenger/perl/send.pl | 16 +-
examples/messenger/perl/server.pl | 6 +-
proton-c/CMakeLists.txt | 49 +-
proton-c/bindings/CMakeLists.txt | 2 +
proton-c/bindings/javascript/CMakeLists.txt | 10 +-
proton-c/bindings/perl/CMakeLists.txt | 4 +-
.../bindings/perl/lib/qpid/proton/Constants.pm | 7 +-
proton-c/bindings/perl/lib/qpid/proton/Data.pm | 93 +-
.../bindings/perl/lib/qpid/proton/Message.pm | 28 +-
proton-c/bindings/perl/lib/qpid/proton/utils.pm | 31 +
proton-c/bindings/perl/lib/qpid_proton.pm | 1 +
proton-c/bindings/perl/perl.i | 7 -
proton-c/bindings/php/CMakeLists.txt | 3 +
proton-c/bindings/php/proton.php | 2 +-
proton-c/bindings/python/CMakeLists.txt | 90 +-
proton-c/bindings/python/cproton.i | 379 ++++++
proton-c/bindings/python/proton.py | 242 ++--
proton-c/bindings/python/python.i | 378 ------
proton-c/bindings/python/setup.py.in | 107 ++
proton-c/bindings/ruby/CMakeLists.txt | 4 +-
proton-c/bindings/ruby/lib/qpid_proton.rb | 1 +
.../bindings/ruby/lib/qpid_proton/version.rb | 29 +
proton-c/bindings/ruby/qpid_proton.gemspec | 1 +
proton-c/bindings/ruby/ruby.i | 1 -
proton-c/docs/man/CMakeLists.txt | 2 +-
proton-c/docs/man/proton-dump.1 | 19 +
proton-c/include/proton/buffer.h | 6 +
proton-c/include/proton/cproton.i | 28 +-
proton-c/include/proton/event.h | 153 ++-
proton-c/include/proton/io.h | 2 +
proton-c/include/proton/object.h | 11 +-
proton-c/include/proton/sasl.h | 15 +-
proton-c/include/proton/selector.h | 4 +-
proton-c/include/proton/terminus.h | 8 +-
proton-c/include/proton/transport.h | 13 +-
proton-c/include/proton/types.h | 6 +-
proton-c/src/buffer.c | 12 +
proton-c/src/codec/codec.c | 48 +-
proton-c/src/codec/data.h | 34 +-
proton-c/src/codec/decoder.c | 2 +-
proton-c/src/codec/encoder.c | 2 +-
proton-c/src/dispatch_actions.h | 45 +
proton-c/src/dispatcher/dispatcher.c | 63 +-
proton-c/src/dispatcher/dispatcher.h | 21 +-
proton-c/src/engine/engine-internal.h | 77 +-
proton-c/src/engine/engine.c | 186 +--
proton-c/src/engine/event.c | 249 ++--
proton-c/src/engine/event.h | 9 +-
proton-c/src/error.c | 2 +-
proton-c/src/message/message.c | 25 +-
proton-c/src/messenger/messenger.c | 137 ++-
proton-c/src/messenger/store.c | 22 +-
proton-c/src/messenger/subscription.c | 2 +-
proton-c/src/messenger/transform.c | 4 +-
proton-c/src/object/object.c | 104 +-
proton-c/src/parser.c | 2 +-
proton-c/src/platform.h | 2 +
proton-c/src/posix/driver.c | 24 +-
proton-c/src/posix/io.c | 12 +-
proton-c/src/posix/selector.c | 4 +-
proton-c/src/protocol.h.py | 98 +-
proton-c/src/proton-dump.c | 28 +
proton-c/src/proton.c | 4 +-
proton-c/src/sasl/sasl.c | 91 +-
proton-c/src/selectable.c | 2 +-
proton-c/src/ssl/openssl.c | 57 +-
proton-c/src/tests/object.c | 53 +-
proton-c/src/tests/parse-url.c | 7 +
proton-c/src/transport/transport.c | 366 +++---
proton-c/src/types.c | 14 +-
proton-c/src/util.c | 22 +-
proton-c/src/windows/io.c | 226 ++--
proton-c/src/windows/iocp.c | 1138 ++++++++++++++++++
proton-c/src/windows/iocp.h | 141 +++
proton-c/src/windows/selector.c | 325 +++--
proton-c/src/windows/write_pipeline.c | 312 +++++
.../java/org/apache/qpid/proton/Proton.java | 168 +--
.../org/apache/qpid/proton/ProtonFactory.java | 31 -
.../apache/qpid/proton/ProtonFactoryImpl.java | 28 -
.../apache/qpid/proton/ProtonFactoryLoader.java | 111 --
.../amqp/messaging/DeliveryAnnotations.java | 11 +-
.../amqp/messaging/MessageAnnotations.java | 8 +-
.../org/apache/qpid/proton/codec/Codec.java | 40 +
.../java/org/apache/qpid/proton/codec/Data.java | 10 +
.../apache/qpid/proton/codec/DataFactory.java | 28 -
.../qpid/proton/codec/impl/DataFactoryImpl.java | 35 -
.../org/apache/qpid/proton/driver/Driver.java | 10 +
.../qpid/proton/driver/DriverFactory.java | 29 -
.../qpid/proton/driver/impl/ConnectorImpl.java | 1 -
.../proton/driver/impl/DriverFactoryImpl.java | 35 -
.../qpid/proton/driver/impl/DriverImpl.java | 2 +-
.../apache/qpid/proton/engine/Collector.java | 8 +
.../apache/qpid/proton/engine/Connection.java | 11 +
.../org/apache/qpid/proton/engine/Engine.java | 60 +
.../qpid/proton/engine/EngineFactory.java | 29 -
.../org/apache/qpid/proton/engine/Event.java | 41 +-
.../org/apache/qpid/proton/engine/Sasl.java | 10 +-
.../apache/qpid/proton/engine/SslDomain.java | 10 +
.../qpid/proton/engine/SslPeerDetails.java | 12 +-
.../apache/qpid/proton/engine/Transport.java | 30 +-
.../qpid/proton/engine/impl/CollectorImpl.java | 42 +-
.../qpid/proton/engine/impl/ConnectionImpl.java | 67 +-
.../qpid/proton/engine/impl/EndpointImpl.java | 54 +-
.../proton/engine/impl/EngineFactoryImpl.java | 59 -
.../qpid/proton/engine/impl/EventImpl.java | 111 +-
.../qpid/proton/engine/impl/FrameHandler.java | 3 +-
.../qpid/proton/engine/impl/FrameParser.java | 58 +-
.../qpid/proton/engine/impl/FrameWriter.java | 20 +-
.../qpid/proton/engine/impl/LinkImpl.java | 36 +-
.../qpid/proton/engine/impl/ReceiverImpl.java | 7 +-
.../org/apache/qpid/proton/engine/impl/Ref.java | 46 +
.../qpid/proton/engine/impl/SaslImpl.java | 18 +-
.../qpid/proton/engine/impl/SenderImpl.java | 6 +-
.../qpid/proton/engine/impl/SessionImpl.java | 42 +-
.../proton/engine/impl/TransportFactory.java | 34 -
.../engine/impl/TransportFactoryImpl.java | 41 -
.../qpid/proton/engine/impl/TransportImpl.java | 298 +++--
.../qpid/proton/engine/impl/TransportLink.java | 28 +-
.../engine/impl/TransportOutputAdaptor.java | 66 +-
.../engine/impl/TransportOutputWriter.java | 4 +-
.../proton/engine/impl/TransportSession.java | 32 +-
.../impl/ssl/SimpleSslTransportWrapper.java | 33 +-
.../proton/engine/impl/ssl/SslDomainImpl.java | 1 -
.../engine/impl/ssl/SslPeerDetailsImpl.java | 1 -
.../org/apache/qpid/proton/message/Message.java | 23 +
.../qpid/proton/message/MessageFactory.java | 37 -
.../proton/message/impl/MessageFactoryImpl.java | 54 -
.../qpid/proton/message/impl/MessageImpl.java | 12 +-
.../apache/qpid/proton/messenger/Messenger.java | 14 +
.../qpid/proton/messenger/MessengerFactory.java | 28 -
.../messenger/impl/MessengerFactoryImpl.java | 42 -
.../proton/messenger/impl/MessengerImpl.java | 9 +-
proton-j/src/main/resources/cengine.py | 140 ++-
proton-j/src/main/resources/csasl.py | 10 +-
.../qpid/proton/ProtonFactoryLoaderTest.java | 129 --
.../proton/engine/impl/FrameParserTest.java | 16 +-
.../proton/engine/impl/TransportImplTest.java | 15 +
.../engine/impl/TransportOutputAdaptorTest.java | 4 +-
.../impl/ssl/SimpleSslTransportWrapperTest.java | 13 +-
.../DummyProtonCFactory.java | 29 -
.../DummyProtonFactory.java | 25 -
.../DummyProtonJFactory.java | 29 -
.../systemtests/ProtonEngineExampleTest.java | 34 +-
.../proton/systemtests/ProtonFactoryTest.java | 60 -
.../qpid/proton/systemtests/SimpleTest.java | 13 +-
.../systemtests/engine/ConnectionTest.java | 31 +-
.../engine/ProtonFactoryTestFixture.java | 54 -
.../systemtests/engine/TransportTest.java | 6 +-
.../org/apache/qpid/proton/InteropTest.java | 5 +-
tests/python/proton_tests/common.py | 81 +-
tests/python/proton_tests/engine.py | 242 +++-
tests/python/proton_tests/messenger.py | 98 +-
tests/python/proton_tests/sasl.py | 77 +-
tests/python/proton_tests/ssl.py | 72 +-
tests/python/proton_tests/transport.py | 117 +-
tests/tools/apps/c/CMakeLists.txt | 2 +
tests/tools/apps/c/msgr-common.h | 4 +
version.txt | 2 +-
208 files changed, 10388 insertions(+), 3391 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index a503467..7917258 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -18,6 +18,14 @@
#
cmake_minimum_required (VERSION 2.6)
+# Set default build type. Must come before project() which sets default to ""
+set (CMAKE_BUILD_TYPE RelWithDebInfo CACHE string
+ "Build type: Debug, Release, RelWithDebInfo or MinSizeRel (default RelWithDebInfo)")
+if (CMAKE_BUILD_TYPE MATCHES "Deb")
+ set (has_debug_symbols " (has debug symbols)")
+endif (CMAKE_BUILD_TYPE MATCHES "Deb")
+message("Build type is \"${CMAKE_BUILD_TYPE}\"${has_debug_symbols}")
+
option(BUILD_WITH_CXX "Compile Proton using C++" OFF)
if ("${CMAKE_GENERATOR}" MATCHES "^Visual Studio")
# No C99 capability, use C++
@@ -152,3 +160,9 @@ if (JAVA_FOUND AND MAVEN_EXE)
else (JAVA_FOUND AND MAVEN_EXE)
message (STATUS "Cannot find both Java and Maven: testing disabled for Proton-J")
endif (JAVA_FOUND AND MAVEN_EXE)
+
+# Generate test environment settings
+configure_file(${CMAKE_SOURCE_DIR}/config.sh.in
+ ${CMAKE_BINARY_DIR}/config.sh @ONLY)
+configure_file(${CMAKE_SOURCE_DIR}/config.bat.in
+ ${CMAKE_BINARY_DIR}/config.bat @ONLY)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/DEVELOPERS
----------------------------------------------------------------------
diff --git a/DEVELOPERS b/DEVELOPERS
index 97cfdaf..b0e0015 100644
--- a/DEVELOPERS
+++ b/DEVELOPERS
@@ -5,32 +5,15 @@ DEVELOPMENT ENVIRONMENT
=======================
To setup the variables for your development environment, simply source
-the file $REPO/config.sh:
+the file $BLDDIR/config.sh [$BLDDIR points to the proton build directory]:
$ source config.sh
This file sets the needed environment variables for all supported dynamic
-languages (Python, Perl, Ruby, PHP) as well as for Java and for testing. It,
-by default, assumes that you're building Proton in the directory:
-
- $REPO/build
-
-where $REPO points the location where the Proton Subversion or Git repo has
-been checked out.
-
-If, however, you use a different location for your build files, then you'll want
-to set the environment variable CPROTON_BUILD to point to it first. So, for
-example, if you're building in:
-
- /home/yourname/devel/proton/cmake
-
-then you would have the following environment variable set:
-
- $ export CPROTON_BUILD=/hojme/yourname/devel/proton/cmake/proton-c
-
-NOTE: You need to point the environment variable to the proton-c directory under
-where your build is done.
+languages (Python, Perl, Ruby, PHP) as well as for Java and for testing.
+You will need to have set up the build directory first with cmake before the file
+will exist (see the instructions in README).
MAILING LIST
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/config.bat.in
----------------------------------------------------------------------
diff --git a/config.bat.in b/config.bat.in
new file mode 100644
index 0000000..a73a88e
--- /dev/null
+++ b/config.bat.in
@@ -0,0 +1,66 @@
+REM
+REM Licensed to the Apache Software Foundation (ASF) under one
+REM or more contributor license agreements. See the NOTICE file
+REM distributed with this work for additional information
+REM regarding copyright ownership. The ASF licenses this file
+REM to you under the Apache License, Version 2.0 (the
+REM "License"); you may not use this file except in compliance
+REM with the License. You may obtain a copy of the License at
+REM
+REM http://www.apache.org/licenses/LICENSE-2.0
+REM
+REM Unless required by applicable law or agreed to in writing,
+REM software distributed under the License is distributed on an
+REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+REM KIND, either express or implied. See the License for the
+REM specific language governing permissions and limitations
+REM under the License.
+REM
+
+REM This is a generated file and will be overwritten the next
+REM time that cmake is run.
+
+REM This build may be one of @CMAKE_CONFIGURATION_TYPES@
+REM Choose the configuration this script should reference:
+SET PROTON_BUILD_CONFIGURATION=relwithdebinfo
+
+REM PROTON_HOME is the root of the proton checkout
+REM PROTON_BUILD is where cmake was run
+
+set PROTON_HOME=@CMAKE_SOURCE_DIR@
+set PROTON_BUILD=@CMAKE_BINARY_DIR@
+
+set PROTON_HOME=%PROTON_HOME:/=\%
+set PROTON_BUILD=%PROTON_BUILD:/=\%
+
+set PROTON_BINDINGS=%PROTON_BUILD%\proton-c\bindings
+set PROTON_JARS=%PROTON_BUILD%\proton-j\proton-j.jar
+
+REM Python & Jython
+set PYTHON_BINDINGS=%PROTON_BINDINGS%\python
+set COMMON_PYPATH=%PROTON_HOME%\tests\python;%PROTON_HOME%\proton-c\bindings\python
+set PYTHONPATH=%COMMON_PYPATH%;%PYTHON_BINDINGS%
+set JYTHONPATH=%COMMON_PYPATH%;%PROTON_HOME%\proton-j\src\main\resources;%PROTON_JARS%
+set CLASSPATH=%PROTON_JARS%
+
+REM PHP
+set PHP_BINDINGS=%PROTON_BINDINGS%\php
+if EXIST %PHP_BINDINGS% (
+ echo include_path="%PHP_BINDINGS%;%PROTON_HOME%\proton-c\bindings\php" > %PHP_BINDINGS%\php.ini
+ echo extension="%PHP_BINDINGS%\cproton.so" >> %PHP_BINDINGS%\php.ini
+ set PHPRC=%PHP_BINDINGS%\php.ini
+)
+
+REM Ruby
+set RUBY_BINDINGS=%PROTON_BINDINGS%\ruby
+set RUBYLIB=%RUBY_BINDINGS%;%PROTON_HOME%\proton-c\bindings\ruby\lib;%PROTON_HOME%\tests\ruby
+
+REM Perl
+set PERL_BINDINGS=%PROTON_BINDINGS%\perl
+set PERL5LIB=%PERL5LIB%;%PERL_BINDINGS%;%PROTON_HOME%\proton-c\bindings\perl\lib
+
+REM test applications
+set PATH=%PATH%;%PROTON_BUILD%\tests\tools\apps\c
+set PATH=%PATH%;%PROTON_HOME%\tests\tools\apps\python
+set PATH=%PATH%;%PROTON_HOME%\tests\python
+set PATH=%PATH%;%PROTON_BUILD%\proton-c\%PROTON_BUILD_CONFIGURATION%
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/config.sh
----------------------------------------------------------------------
diff --git a/config.sh b/config.sh
deleted file mode 100755
index 90ad707..0000000
--- a/config.sh
+++ /dev/null
@@ -1,79 +0,0 @@
-#!/bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-cd $(dirname ${BASH_SOURCE[0]}) > /dev/null
-export PROTON_HOME=$(pwd)
-cd - > /dev/null
-
-if [ -z "$CPROTON_BUILD" ]; then
- if [ -d $PROTON_HOME/build/proton-c ]; then
- PROTON_BINDINGS=$PROTON_HOME/build/proton-c/bindings
- else
- PROTON_BINDINGS=$PROTON_HOME/proton-c/bindings
- fi
- if [ -d $PROTON_HOME/build/proton-j ]; then
- PROTON_JARS=$PROTON_HOME/build/proton-j/proton-api/proton-api.jar:$PROTON_HOME/build/proton-j/proton/proton-j-impl.jar
- else
- PROTON_JARS=$PROTON_HOME/proton-j/proton-api/proton-api.jar:$PROTON_HOME/proton-j/proton/proton-j-impl.jar
- fi
-else
- PROTON_BINDINGS=$CPROTON_BUILD/bindings
-fi
-
-# Python & Jython
-export PYTHON_BINDINGS=$PROTON_BINDINGS/python
-export COMMON_PYPATH=$PROTON_HOME/tests/python
-export PYTHONPATH=$COMMON_PYPATH:$PROTON_HOME/proton-c/bindings/python:$PYTHON_BINDINGS
-export JYTHONPATH=$COMMON_PYPATH:$PROTON_HOME/proton-j/proton-api/src/main/resources:$PROTON_JARS
-export CLASSPATH=$PROTON_JARS
-
-# PHP
-export PHP_BINDINGS=$PROTON_BINDINGS/php
-if [ -d $PHP_BINDINGS ]; then
- cat <<EOF > $PHP_BINDINGS/php.ini
-include_path="$PHP_BINDINGS:$PROTON_HOME/proton-c/bindings/php"
-extension="$PHP_BINDINGS/cproton.so"
-EOF
- export PHPRC=$PHP_BINDINGS/php.ini
-fi
-
-# Ruby
-export RUBY_BINDINGS=$PROTON_BINDINGS/ruby
-export RUBYLIB=$RUBY_BINDINGS:$PROTON_HOME/proton-c/bindings/ruby/lib:$PROTON_HOME/tests/ruby
-
-# Perl
-export PERL_BINDINGS=$PROTON_BINDINGS/perl
-export PERL5LIB=$PERL5LIB:$PERL_BINDINGS:$PROTON_HOME/proton-c/bindings/perl/lib
-
-# test applications
-if [ -d $PROTON_HOME/build/tests/tools/apps/c ]; then
- export PATH="$PATH:$PROTON_HOME/build/tests/tools/apps/c"
-fi
-if [ -d $PROTON_HOME/tests/tools/apps/python ]; then
- export PATH="$PATH:$PROTON_HOME/tests/tools/apps/python"
-fi
-
-# test applications
-export PATH="$PATH:$PROTON_HOME/tests/python"
-
-# can the test harness use valgrind?
-if [[ -x "$(type -p valgrind)" ]] ; then
- export VALGRIND=1
-fi
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/config.sh.in
----------------------------------------------------------------------
diff --git a/config.sh.in b/config.sh.in
new file mode 100755
index 0000000..4b60b2f
--- /dev/null
+++ b/config.sh.in
@@ -0,0 +1,61 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+PROTON_HOME=@CMAKE_SOURCE_DIR@
+PROTON_BUILD=@CMAKE_BINARY_DIR@
+
+PROTON_BINDINGS=$PROTON_BUILD/proton-c/bindings
+PROTON_JARS=$PROTON_BUILD/proton-j/proton-j.jar
+
+PYTHON_BINDINGS=$PROTON_BINDINGS/python
+PHP_BINDINGS=$PROTON_BINDINGS/php
+RUBY_BINDINGS=$PROTON_BINDINGS/ruby
+PERL_BINDINGS=$PROTON_BINDINGS/perl
+
+# Python & Jython
+COMMON_PYPATH=$PROTON_HOME/tests/python:$PROTON_HOME/proton-c/bindings/python
+export PYTHONPATH=$COMMON_PYPATH:$PYTHON_BINDINGS
+export JYTHONPATH=$COMMON_PYPATH:$PROTON_HOME/proton-j/src/main/resources:$PROTON_JARS
+export CLASSPATH=$PROTON_JARS
+
+# PHP
+if [ -d $PHP_BINDINGS ]; then
+ cat <<EOF > $PHP_BINDINGS/php.ini
+include_path="$PHP_BINDINGS:$PROTON_HOME/proton-c/bindings/php"
+extension="$PHP_BINDINGS/cproton.so"
+EOF
+ export PHPRC=$PHP_BINDINGS/php.ini
+fi
+
+# Ruby
+export RUBYLIB=$RUBY_BINDINGS:$PROTON_HOME/proton-c/bindings/ruby/lib:$PROTON_HOME/tests/ruby
+
+# Perl
+export PERL5LIB=$PERL5LIB:$PERL_BINDINGS:$PROTON_HOME/proton-c/bindings/perl/lib
+
+# test applications
+export PATH="$PATH:$PROTON_BUILD/tests/tools/apps/c"
+export PATH="$PATH:$PROTON_HOME/tests/tools/apps/python"
+export PATH="$PATH:$PROTON_HOME/tests/python"
+
+# can the test harness use valgrind?
+if [[ -x "$(type -p valgrind)" ]] ; then
+ export VALGRIND=$(type -p valgrind)
+fi
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/pom.xml b/contrib/proton-hawtdispatch/pom.xml
new file mode 100644
index 0000000..0eaa171
--- /dev/null
+++ b/contrib/proton-hawtdispatch/pom.xml
@@ -0,0 +1,61 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <parent>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>proton-project</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ <relativePath>../..</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>proton-hawtdispatch</artifactId>
+ <name>proton-hawtdispatch</name>
+
+ <properties>
+ <hawtbuf-version>1.9</hawtbuf-version>
+ <hawtdispatch-version>1.18</hawtdispatch-version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>proton-j</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.fusesource.hawtdispatch</groupId>
+ <artifactId>hawtdispatch-transport</artifactId>
+ <version>${hawtdispatch-version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.fusesource.hawtbuf</groupId>
+ <artifactId>hawtbuf</artifactId>
+ <version>${hawtbuf-version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ </build>
+ <scm>
+ <url>http://svn.apache.org/viewvc/qpid/proton/</url>
+ </scm>
+
+</project>
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnectOptions.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnectOptions.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnectOptions.java
new file mode 100644
index 0000000..3c3543d
--- /dev/null
+++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnectOptions.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import org.fusesource.hawtdispatch.DispatchQueue;
+import org.fusesource.hawtdispatch.transport.TcpTransport;
+
+import javax.net.ssl.SSLContext;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.*;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpConnectOptions implements Cloneable {
+
+ private static final long KEEP_ALIVE = Long.parseLong(System.getProperty("amqp.thread.keep_alive", ""+1000));
+ private static final long STACK_SIZE = Long.parseLong(System.getProperty("amqp.thread.stack_size", ""+1024*512));
+ private static ThreadPoolExecutor blockingThreadPool;
+
+ public synchronized static ThreadPoolExecutor getBlockingThreadPool() {
+ if( blockingThreadPool == null ) {
+ blockingThreadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, KEEP_ALIVE, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
+ public Thread newThread(Runnable r) {
+ Thread rc = new Thread(null, r, "AMQP Task", STACK_SIZE);
+ rc.setDaemon(true);
+ return rc;
+ }
+ }) {
+
+ @Override
+ public void shutdown() {
+ // we don't ever shutdown since we are shared..
+ }
+
+ @Override
+ public List<Runnable> shutdownNow() {
+ // we don't ever shutdown since we are shared..
+ return Collections.emptyList();
+ }
+ };
+ }
+ return blockingThreadPool;
+ }
+ public synchronized static void setBlockingThreadPool(ThreadPoolExecutor pool) {
+ blockingThreadPool = pool;
+ }
+
+ private static final URI DEFAULT_HOST;
+ static {
+ try {
+ DEFAULT_HOST = new URI("tcp://localhost");
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ URI host = DEFAULT_HOST;
+ URI localAddress;
+ SSLContext sslContext;
+ DispatchQueue dispatchQueue;
+ Executor blockingExecutor;
+ int maxReadRate;
+ int maxWriteRate;
+ int trafficClass = TcpTransport.IPTOS_THROUGHPUT;
+ boolean useLocalHost;
+ int receiveBufferSize = 1024*64;
+ int sendBufferSize = 1024*64;
+ String localContainerId;
+ String remoteContainerId;
+ String user;
+ String password;
+
+
+ @Override
+ public AmqpConnectOptions clone() {
+ try {
+ return (AmqpConnectOptions) super.clone();
+ } catch (CloneNotSupportedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public String getLocalContainerId() {
+ return localContainerId;
+ }
+
+ public void setLocalContainerId(String localContainerId) {
+ this.localContainerId = localContainerId;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public String getRemoteContainerId() {
+ return remoteContainerId;
+ }
+
+ public void setRemoteContainerId(String remoteContainerId) {
+ this.remoteContainerId = remoteContainerId;
+ }
+
+ public String getUser() {
+ return user;
+ }
+
+ public void setUser(String user) {
+ this.user = user;
+ }
+
+ public Executor getBlockingExecutor() {
+ return blockingExecutor;
+ }
+
+ public void setBlockingExecutor(Executor blockingExecutor) {
+ this.blockingExecutor = blockingExecutor;
+ }
+
+ public DispatchQueue getDispatchQueue() {
+ return dispatchQueue;
+ }
+
+ public void setDispatchQueue(DispatchQueue dispatchQueue) {
+ this.dispatchQueue = dispatchQueue;
+ }
+
+ public URI getLocalAddress() {
+ return localAddress;
+ }
+
+ public void setLocalAddress(String localAddress) throws URISyntaxException {
+ this.setLocalAddress(new URI(localAddress));
+ }
+ public void setLocalAddress(URI localAddress) {
+ this.localAddress = localAddress;
+ }
+
+ public int getMaxReadRate() {
+ return maxReadRate;
+ }
+
+ public void setMaxReadRate(int maxReadRate) {
+ this.maxReadRate = maxReadRate;
+ }
+
+ public int getMaxWriteRate() {
+ return maxWriteRate;
+ }
+
+ public void setMaxWriteRate(int maxWriteRate) {
+ this.maxWriteRate = maxWriteRate;
+ }
+
+ public int getReceiveBufferSize() {
+ return receiveBufferSize;
+ }
+
+ public void setReceiveBufferSize(int receiveBufferSize) {
+ this.receiveBufferSize = receiveBufferSize;
+ }
+
+ public URI getHost() {
+ return host;
+ }
+ public void setHost(String host, int port) throws URISyntaxException {
+ this.setHost(new URI("tcp://"+host+":"+port));
+ }
+ public void setHost(String host) throws URISyntaxException {
+ this.setHost(new URI(host));
+ }
+ public void setHost(URI host) {
+ this.host = host;
+ }
+
+ public int getSendBufferSize() {
+ return sendBufferSize;
+ }
+
+ public void setSendBufferSize(int sendBufferSize) {
+ this.sendBufferSize = sendBufferSize;
+ }
+
+ public SSLContext getSslContext() {
+ return sslContext;
+ }
+
+ public void setSslContext(SSLContext sslContext) {
+ this.sslContext = sslContext;
+ }
+
+ public int getTrafficClass() {
+ return trafficClass;
+ }
+
+ public void setTrafficClass(int trafficClass) {
+ this.trafficClass = trafficClass;
+ }
+
+ public boolean isUseLocalHost() {
+ return useLocalHost;
+ }
+
+ public void setUseLocalHost(boolean useLocalHost) {
+ this.useLocalHost = useLocalHost;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnection.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnection.java
new file mode 100644
index 0000000..b308209
--- /dev/null
+++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpConnection.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.hawtdispatch.impl.AmqpListener;
+import org.apache.qpid.proton.hawtdispatch.impl.AmqpTransport;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Endpoint;
+import org.apache.qpid.proton.engine.ProtonJConnection;
+import org.apache.qpid.proton.engine.ProtonJSession;
+import org.apache.qpid.proton.engine.impl.ProtocolTracer;
+import org.fusesource.hawtdispatch.DispatchQueue;
+import org.fusesource.hawtdispatch.Task;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpConnection extends AmqpEndpointBase {
+
+ AmqpTransport transport;
+ ProtonJConnection connection;
+ HashSet<AmqpSender> senders = new HashSet<AmqpSender>();
+ boolean closing = false;
+
+ public static AmqpConnection connect(AmqpConnectOptions options) {
+ return new AmqpConnection(options);
+ }
+
+ private AmqpConnection(AmqpConnectOptions options) {
+ transport = AmqpTransport.connect(options);
+ transport.setListener(new AmqpListener() {
+ @Override
+ public void processDelivery(Delivery delivery) {
+ Attachment attachment = (Attachment) getTransport().context(delivery.getLink()).getAttachment();
+ AmqpLink link = (AmqpLink) attachment.endpoint();
+ link.processDelivery(delivery);
+ }
+
+ @Override
+ public void processRefill() {
+ for(AmqpSender sender: new ArrayList<AmqpSender>(senders)) {
+ sender.pumpDeliveries();
+ }
+ pumpOut();
+ }
+
+ public void processTransportFailure(final IOException e) {
+ }
+ });
+ connection = transport.connection();
+ connection.open();
+ attach();
+ }
+
+ public void waitForConnected() throws Exception {
+ assertNotOnDispatchQueue();
+ getConnectedFuture().await();
+ }
+
+ public Future<Void> getConnectedFuture() {
+ final Promise<Void> rc = new Promise<Void>();
+ queue().execute(new Task() {
+ @Override
+ public void run() {
+ onConnected(rc);
+ }
+ });
+ return rc;
+ }
+
+ public void onConnected(Callback<Void> cb) {
+ transport.onTransportConnected(cb);
+ }
+
+ @Override
+ protected Endpoint getEndpoint() {
+ return connection;
+ }
+
+ @Override
+ protected AmqpConnection getConnection() {
+ return this;
+ }
+
+ @Override
+ protected AmqpEndpointBase getParent() {
+ return null;
+ }
+
+ public AmqpSession createSession() {
+ assertExecuting();
+ ProtonJSession session = connection.session();
+ session.open();
+ pumpOut();
+ return new AmqpSession(this, session);
+ }
+
+ public int getMaxSessions() {
+ return connection.getMaxChannels();
+ }
+
+ public void disconnect() {
+ closing = true;
+ transport.disconnect();
+ }
+
+ public void waitForDisconnected() throws Exception {
+ assertNotOnDispatchQueue();
+ getDisconnectedFuture().await();
+ }
+
+ public Future<Void> getDisconnectedFuture() {
+ final Promise<Void> rc = new Promise<Void>();
+ queue().execute(new Task() {
+ @Override
+ public void run() {
+ onDisconnected(rc);
+ }
+ });
+ return rc;
+ }
+
+ public void onDisconnected(Callback<Void> cb) {
+ transport.onTransportDisconnected(cb);
+ }
+
+ public TransportState getTransportState() {
+ return transport.getState();
+ }
+
+ public Throwable getTransportFailure() {
+ return transport.getFailure();
+ }
+
+ public Future<Throwable> getTransportFailureFuture() {
+ final Promise<Throwable> rc = new Promise<Throwable>();
+ queue().execute(new Task() {
+ @Override
+ public void run() {
+ onTransportFailure(rc);
+ }
+ });
+ return rc;
+ }
+
+ public void onTransportFailure(Callback<Throwable> cb) {
+ transport.onTransportFailure(cb);
+ }
+
+ @Override
+ public DispatchQueue queue() {
+ return super.queue();
+ }
+
+ public void setProtocolTracer(ProtocolTracer protocolTracer) {
+ transport.setProtocolTracer(protocolTracer);
+ }
+
+ public ProtocolTracer getProtocolTracer() {
+ return transport.getProtocolTracer();
+ }
+
+ /**
+ * Once the remote end, closes the transport is disconnected.
+ */
+ @Override
+ public void close() {
+ super.close();
+ onRemoteClose(new Callback<ErrorCondition>() {
+ @Override
+ public void onSuccess(ErrorCondition value) {
+ disconnect();
+ }
+
+ @Override
+ public void onFailure(Throwable value) {
+ disconnect();
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpDeliveryListener.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpDeliveryListener.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpDeliveryListener.java
new file mode 100644
index 0000000..1e9f4e2
--- /dev/null
+++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpDeliveryListener.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.api;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface AmqpDeliveryListener {
+
+ /**
+ * Caller should suspend/resume the AmqpReceiver to
+ * flow control the delivery of messages.
+ *
+ * @param delivery
+ */
+ void onMessageDelivery(MessageDelivery delivery);
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpEndpointBase.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpEndpointBase.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpEndpointBase.java
new file mode 100644
index 0000000..4ebd8e2
--- /dev/null
+++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpEndpointBase.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.hawtdispatch.impl.*;
+import org.apache.qpid.proton.engine.Endpoint;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.fusesource.hawtdispatch.Dispatch;
+import org.fusesource.hawtdispatch.DispatchQueue;
+import org.fusesource.hawtdispatch.Task;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract class AmqpEndpointBase extends WatchBase {
+ abstract protected Endpoint getEndpoint();
+ abstract protected AmqpEndpointBase getParent();
+
+ protected AmqpConnection getConnection() {
+ return getParent().getConnection();
+ }
+
+ protected AmqpTransport getTransport() {
+ return getConnection().transport;
+ }
+
+ protected DispatchQueue queue() {
+ return getTransport().queue();
+ }
+
+ protected void assertExecuting() {
+ getTransport().assertExecuting();
+ }
+
+ public void waitForRemoteOpen() throws Exception {
+ assertNotOnDispatchQueue();
+ getRemoteOpenFuture().await();
+ }
+
+ public Future<Void> getRemoteOpenFuture() {
+ final Promise<Void> rc = new Promise<Void>();
+ queue().execute(new Task() {
+ @Override
+ public void run() {
+ onRemoteOpen(rc);
+ }
+ });
+ return rc;
+ }
+
+ public void onRemoteOpen(final Callback<Void> cb) {
+ addWatch(new Watch() {
+ @Override
+ public boolean execute() {
+ switch (getEndpoint().getRemoteState()) {
+ case ACTIVE:
+ cb.onSuccess(null);
+ return true;
+ case CLOSED:
+ cb.onFailure(Support.illegalState("closed"));
+ return true;
+ }
+ return false;
+ }
+ });
+ }
+
+ public ErrorCondition waitForRemoteClose() throws Exception {
+ assertNotOnDispatchQueue();
+ return getRemoteCloseFuture().await();
+ }
+
+ public Future<ErrorCondition> getRemoteCloseFuture() {
+ final Promise<ErrorCondition> rc = new Promise<ErrorCondition>();
+ queue().execute(new Task() {
+ @Override
+ public void run() {
+ onRemoteClose(rc);
+ }
+ });
+ return rc;
+ }
+
+ public void onRemoteClose(final Callback<ErrorCondition> cb) {
+ addWatch(new Watch() {
+ @Override
+ public boolean execute() {
+ if (getEndpoint().getRemoteState() == EndpointState.CLOSED) {
+ cb.onSuccess(getEndpoint().getRemoteCondition());
+ return true;
+ }
+ return false;
+ }
+ });
+ }
+
+ public void close() {
+ getEndpoint().close();
+ pumpOut();
+ }
+
+ public EndpointState getRemoteState() {
+ return getEndpoint().getRemoteState();
+ }
+
+ public ErrorCondition getRemoteError() {
+ return getEndpoint().getRemoteCondition();
+ }
+
+ static protected ErrorCondition toError(Throwable value) {
+ return new ErrorCondition(Symbol.valueOf("error"), value.toString());
+ }
+
+ class Attachment extends Task {
+ AmqpEndpointBase endpoint() {
+ return AmqpEndpointBase.this;
+ }
+
+ @Override
+ public void run() {
+ fireWatches();
+ }
+ }
+
+ protected void attach() {
+ getTransport().context(getEndpoint()).setAttachment(new Attachment());
+ }
+
+ protected void defer(Defer defer) {
+ getTransport().defer(defer);
+ }
+
+ protected void pumpOut() {
+ getTransport().pumpOut();
+ }
+
+ static protected void assertNotOnDispatchQueue() {
+ assert Dispatch.getCurrentQueue()==null : "Not allowed to be called when executing on a dispatch queue";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpLink.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpLink.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpLink.java
new file mode 100644
index 0000000..dd6f32e
--- /dev/null
+++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpLink.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import org.apache.qpid.proton.engine.Delivery;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract public class AmqpLink extends AmqpEndpointBase {
+ abstract protected void processDelivery(Delivery delivery);
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpReceiver.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpReceiver.java
new file mode 100644
index 0000000..644f72a
--- /dev/null
+++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpReceiver.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import org.apache.qpid.proton.hawtdispatch.impl.Defer;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.ByteArrayOutputStream;
+
+import java.util.LinkedList;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpReceiver extends AmqpLink {
+
+ final AmqpSession parent;
+ final Receiver receiver;
+
+ public AmqpReceiver(AmqpSession parent, Receiver receiver2, QoS qos) {
+ this.parent = parent;
+ this.receiver = receiver2;
+ attach();
+ }
+
+ @Override
+ protected Receiver getEndpoint() {
+ return receiver;
+ }
+ @Override
+ protected AmqpSession getParent() {
+ return parent;
+ }
+
+ ByteArrayOutputStream current = new ByteArrayOutputStream();
+
+ @Override
+ protected void processDelivery(Delivery delivery) {
+ if( !delivery.isReadable() ) {
+ System.out.println("it was not readable!");
+ return;
+ }
+
+ if( current==null ) {
+ current = new ByteArrayOutputStream();
+ }
+
+ int count;
+ byte data[] = new byte[1024*4];
+ while( (count = receiver.recv(data, 0, data.length)) > 0 ) {
+ current.write(data, 0, count);
+ }
+
+ // Expecting more deliveries..
+ if( count == 0 ) {
+ return;
+ }
+
+ receiver.advance();
+ Buffer buffer = current.toBuffer();
+ current = null;
+ onMessage(delivery, buffer);
+
+ }
+
+ LinkedList<MessageDelivery> inbound = new LinkedList<MessageDelivery>();
+
+ protected void onMessage(Delivery delivery, Buffer buffer) {
+ MessageDelivery md = new MessageDelivery(buffer) {
+ @Override
+ AmqpLink link() {
+ return AmqpReceiver.this;
+ }
+
+ @Override
+ public void settle() {
+ if( !delivery.isSettled() ) {
+ delivery.disposition(new Accepted());
+ delivery.settle();
+ }
+ drain();
+ }
+ };
+ md.delivery = delivery;
+ delivery.setContext(md);
+ inbound.add(md);
+ drainInbound();
+ }
+
+ public void drain() {
+ defer(deferedDrain);
+ }
+
+ Defer deferedDrain = new Defer(){
+ public void run() {
+ drainInbound();
+ }
+ };
+ int resumed = 0;
+
+ public void resume() {
+ resumed++;
+ }
+ public void suspend() {
+ resumed--;
+ }
+
+ AmqpDeliveryListener deliveryListener;
+ private void drainInbound() {
+ while( deliveryListener!=null && !inbound.isEmpty() && resumed>0) {
+ deliveryListener.onMessageDelivery(inbound.removeFirst());
+ receiver.flow(1);
+ }
+ }
+
+ public AmqpDeliveryListener getDeliveryListener() {
+ return deliveryListener;
+ }
+
+ public void setDeliveryListener(AmqpDeliveryListener deliveryListener) {
+ this.deliveryListener = deliveryListener;
+ drainInbound();
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSender.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSender.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSender.java
new file mode 100644
index 0000000..9a672d5
--- /dev/null
+++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSender.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import org.apache.qpid.proton.hawtdispatch.impl.Defer;
+import org.apache.qpid.proton.hawtdispatch.impl.Watch;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.messaging.Modified;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.messaging.Released;
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.fusesource.hawtbuf.Buffer;
+
+import java.io.UnsupportedEncodingException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpSender extends AmqpLink {
+
+ private byte[] EMPTY_BYTE_ARRAY = new byte[]{};
+ long nextTagId = 0;
+ HashSet<byte[]> tagCache = new HashSet<byte[]>();
+
+ final AmqpSession parent;
+ private final QoS qos;
+ final Sender sender;
+
+ public AmqpSender(AmqpSession parent, Sender sender2, QoS qos) {
+ this.parent = parent;
+ this.sender = sender2;
+ this.qos = qos;
+ attach();
+ getConnection().senders.add(this);
+ }
+
+ @Override
+ public void close() {
+ super.close();
+ getConnection().senders.remove(this);
+ }
+
+ @Override
+ protected Sender getEndpoint() {
+ return sender;
+ }
+
+ @Override
+ protected AmqpSession getParent() {
+ return parent;
+ }
+
+ final LinkedList<MessageDelivery> outbound = new LinkedList<MessageDelivery>();
+ long outboundBufferSize;
+
+ public MessageDelivery send(Message message) {
+ assertExecuting();
+ MessageDelivery rc = new MessageDelivery(message) {
+ @Override
+ AmqpLink link() {
+ return AmqpSender.this;
+ }
+
+ @Override
+ public void redeliver(boolean incrementDeliveryCounter) {
+ super.redeliver(incrementDeliveryCounter);
+ outbound.add(this);
+ outboundBufferSize += initialSize;
+ defer(deferedPumpDeliveries);
+ }
+ };
+ outbound.add(rc);
+ outboundBufferSize += rc.initialSize;
+ pumpDeliveries();
+ pumpOut();
+ return rc;
+ }
+
+ Buffer currentBuffer;
+ Delivery currentDelivery;
+
+ Defer deferedPumpDeliveries = new Defer() {
+ public void run() {
+ pumpDeliveries();
+ }
+ };
+
+ public long getOverflowBufferSize() {
+ return outboundBufferSize;
+ }
+
+ protected void pumpDeliveries() {
+ assertExecuting();
+ try {
+ while(true) {
+ while( currentBuffer !=null ) {
+ if( sender.getCredit() > 0 ) {
+ int sent = sender.send(currentBuffer.data, currentBuffer.offset, currentBuffer.length);
+ currentBuffer.moveHead(sent);
+ if( currentBuffer.length == 0 ) {
+ Delivery current = currentDelivery;
+ MessageDelivery md = (MessageDelivery) current.getContext();
+ currentBuffer = null;
+ currentDelivery = null;
+ if( qos == QoS.AT_MOST_ONCE ) {
+ current.settle();
+ } else {
+ sender.advance();
+ }
+ md.fireWatches();
+ }
+ } else {
+ return;
+ }
+ }
+
+ if( outbound.isEmpty() ) {
+ return;
+ }
+
+ final MessageDelivery md = outbound.removeFirst();
+ outboundBufferSize -= md.initialSize;
+ currentBuffer = md.encoded();
+ if( qos == QoS.AT_MOST_ONCE ) {
+ currentDelivery = sender.delivery(EMPTY_BYTE_ARRAY, 0, 0);
+ } else {
+ final byte[] tag = nextTag();
+ currentDelivery = sender.delivery(tag, 0, tag.length);
+ }
+ md.delivery = currentDelivery;
+ currentDelivery.setContext(md);
+ }
+ } finally {
+ fireWatches();
+ }
+ }
+
+ @Override
+ protected void processDelivery(Delivery delivery) {
+ final MessageDelivery md = (MessageDelivery) delivery.getContext();
+ if( delivery.remotelySettled() ) {
+ if( delivery.getTag().length > 0 ) {
+ checkinTag(delivery.getTag());
+ }
+
+ final DeliveryState state = delivery.getRemoteState();
+ if( state==null || state instanceof Accepted) {
+ if( !delivery.remotelySettled() ) {
+ delivery.disposition(new Accepted());
+ }
+ } else if( state instanceof Rejected) {
+ // re-deliver /w incremented delivery counter.
+ md.delivery = null;
+ md.incrementDeliveryCount();
+ outbound.addLast(md);
+ } else if( state instanceof Released) {
+ // re-deliver && don't increment the counter.
+ md.delivery = null;
+ outbound.addLast(md);
+ } else if( state instanceof Modified) {
+ Modified modified = (Modified) state;
+ if ( modified.getDeliveryFailed() ) {
+ // increment delivery counter..
+ md.incrementDeliveryCount();
+ }
+ }
+ delivery.settle();
+ }
+ md.fireWatches();
+ }
+
+ byte[] nextTag() {
+ byte[] rc;
+ if (tagCache != null && !tagCache.isEmpty()) {
+ final Iterator<byte[]> iterator = tagCache.iterator();
+ rc = iterator.next();
+ iterator.remove();
+ } else {
+ try {
+ rc = Long.toHexString(nextTagId++).getBytes("UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return rc;
+ }
+
+ void checkinTag(byte[] data) {
+ if( tagCache.size() < 1024 ) {
+ tagCache.add(data);
+ }
+ }
+
+ public void onOverflowBufferDrained(final Callback<Void> cb) {
+ addWatch(new Watch() {
+ @Override
+ public boolean execute() {
+ if (outboundBufferSize==0) {
+ cb.onSuccess(null);
+ return true;
+ }
+ return false;
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSession.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSession.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSession.java
new file mode 100644
index 0000000..b25a1b7
--- /dev/null
+++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/AmqpSession.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.engine.Endpoint;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.ProtonJSession;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.messaging.*;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+
+import java.util.UUID;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpSession extends AmqpEndpointBase {
+
+ final AmqpConnection parent;
+ final ProtonJSession session;
+
+
+ public AmqpSession(AmqpConnection parent, ProtonJSession session) {
+ this.parent = parent;
+ this.session = session;
+ attach();
+ }
+
+ @Override
+ protected Endpoint getEndpoint() {
+ return session;
+ }
+
+ @Override
+ protected AmqpConnection getParent() {
+ return parent;
+ }
+
+ public AmqpSender createSender(Target target) {
+ return createSender(target, QoS.AT_LEAST_ONCE);
+ }
+
+ public AmqpSender createSender(Target target, QoS qos) {
+ return createSender(target, qos, UUID.randomUUID().toString());
+ }
+
+ public AmqpSender createSender(Target target, QoS qos, String name) {
+ assertExecuting();
+ Sender sender = session.sender(name);
+ attach();
+// Source source = new Source();
+// source.setAddress(UUID.randomUUID().toString());
+// sender.setSource(source);
+ sender.setTarget(target);
+ configureQos(sender, qos);
+ sender.open();
+ pumpOut();
+ return new AmqpSender(this, sender, qos);
+ }
+
+ public AmqpReceiver createReceiver(Source source) {
+ return createReceiver(source, QoS.AT_LEAST_ONCE);
+ }
+
+ public AmqpReceiver createReceiver(Source source, QoS qos) {
+ return createReceiver(source, qos, 100);
+ }
+
+ public AmqpReceiver createReceiver(Source source, QoS qos, int prefetch) {
+ return createReceiver(source, qos, prefetch, UUID.randomUUID().toString());
+ }
+
+ public AmqpReceiver createReceiver(Source source, QoS qos, int prefetch, String name) {
+ assertExecuting();
+ Receiver receiver = session.receiver(name);
+ receiver.setSource(source);
+// Target target = new Target();
+// target.setAddress(UUID.randomUUID().toString());
+// receiver.setTarget(target);
+ receiver.flow(prefetch);
+ configureQos(receiver, qos);
+ receiver.open();
+ pumpOut();
+ return new AmqpReceiver(this, receiver, qos);
+ }
+
+ private void configureQos(Link link, QoS qos) {
+ switch (qos) {
+ case AT_MOST_ONCE:
+ link.setSenderSettleMode(SenderSettleMode.SETTLED);
+ link.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+ break;
+ case AT_LEAST_ONCE:
+ link.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+ link.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+ break;
+ case EXACTLY_ONCE:
+ link.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+ link.setReceiverSettleMode(ReceiverSettleMode.SECOND);
+ break;
+ }
+ }
+
+ public Message createTextMessage(String value) {
+ Message msg = Message.Factory.create();
+ Section body = new AmqpValue(value);
+ msg.setBody(body);
+ return msg;
+ }
+
+ public Message createBinaryMessage(byte value[]) {
+ return createBinaryMessage(value, 0, value.length);
+ }
+
+ public Message createBinaryMessage(byte value[], int offset, int len) {
+ Message msg = Message.Factory.create();
+ Data body = new Data(new Binary(value, offset,len));
+ msg.setBody(body);
+ return msg;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Callback.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Callback.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Callback.java
new file mode 100644
index 0000000..89fbdd1
--- /dev/null
+++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Callback.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.hawtdispatch.api;
+
+/**
+ * <p>
+ * Function Result that carries one value.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface Callback<T> {
+ public void onSuccess(T value);
+ public void onFailure(Throwable value);
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/ChainedCallback.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/ChainedCallback.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/ChainedCallback.java
new file mode 100644
index 0000000..e53f512
--- /dev/null
+++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/ChainedCallback.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.hawtdispatch.api;
+
+/**
+ * <p>
+ * Function Result that carries one value.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract public class ChainedCallback<In,Out> implements Callback<In> {
+
+ public final Callback<Out> next;
+
+ public ChainedCallback(Callback<Out> next) {
+ this.next = next;
+ }
+
+ public void onFailure(Throwable value) {
+ next.onFailure(value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/DeliveryAttachment.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/DeliveryAttachment.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/DeliveryAttachment.java
new file mode 100644
index 0000000..290076f
--- /dev/null
+++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/DeliveryAttachment.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import org.apache.qpid.proton.engine.Delivery;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract public class DeliveryAttachment {
+ abstract void processDelivery(Delivery delivery);
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Future.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Future.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Future.java
new file mode 100644
index 0000000..4a9eb5e
--- /dev/null
+++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Future.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * <p>A simplified Future function results interface.</p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface Future<T> {
+ T await() throws Exception;
+ T await(long amount, TimeUnit unit) throws Exception;
+ void then(Callback<T> callback);
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java
new file mode 100644
index 0000000..b115557
--- /dev/null
+++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/MessageDelivery.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.hawtdispatch.impl.Watch;
+import org.apache.qpid.proton.hawtdispatch.impl.WatchBase;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.message.ProtonJMessage;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtdispatch.Task;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public abstract class MessageDelivery extends WatchBase {
+
+ final int initialSize;
+ private Message message;
+ private Buffer encoded;
+ public Delivery delivery;
+ private int sizeHint = 32;
+
+ static Buffer encode(Message message, int sizeHint) {
+ byte[] buffer = new byte[sizeHint];
+ int size = ((ProtonJMessage)message).encode2(buffer, 0, sizeHint);
+ if( size > sizeHint ) {
+ buffer = new byte[size];
+ size = message.encode(buffer, 0, size);
+ }
+ return new Buffer(buffer, 0, size);
+ }
+
+ static Message decode(Buffer buffer) {
+ Message msg = Message.Factory.create();
+ int offset = buffer.offset;
+ int len = buffer.length;
+ while( len > 0 ) {
+ int decoded = msg.decode(buffer.data, offset, len);
+ assert decoded > 0: "Make progress decoding the message";
+ offset += decoded;
+ len -= decoded;
+ }
+ return msg;
+ }
+
+ public MessageDelivery(Message message) {
+ this(message, encode(message, 32));
+ }
+
+ public MessageDelivery(Buffer encoded) {
+ this(null, encoded);
+ }
+
+ public MessageDelivery(Message message, Buffer encoded) {
+ this.message = message;
+ this.encoded = encoded;
+ sizeHint = this.encoded.length;
+ initialSize = sizeHint;
+ }
+
+ public Message getMessage() {
+ if( message == null ) {
+ message = decode(encoded);
+ }
+ return message;
+ }
+
+ public Buffer encoded() {
+ if( encoded == null ) {
+ encoded = encode(message, sizeHint);
+ sizeHint = encoded.length;
+ }
+ return encoded;
+ }
+
+ public boolean isSettled() {
+ return delivery!=null && delivery.isSettled();
+ }
+
+ public DeliveryState getRemoteState() {
+ return delivery==null ? null : delivery.getRemoteState();
+ }
+
+ public DeliveryState getLocalState() {
+ return delivery==null ? null : delivery.getLocalState();
+ }
+
+ public void onEncoded(final Callback<Void> cb) {
+ addWatch(new Watch() {
+ @Override
+ public boolean execute() {
+ if( delivery!=null ) {
+ cb.onSuccess(null);
+ return true;
+ }
+ return false;
+ }
+ });
+ }
+
+ /**
+ * @return the remote delivery state when it changes.
+ * @throws Exception
+ */
+ public DeliveryState getRemoteStateChange() throws Exception {
+ AmqpEndpointBase.assertNotOnDispatchQueue();
+ return getRemoteStateChangeFuture().await();
+ }
+
+ /**
+ * @return the future remote delivery state when it changes.
+ */
+ public Future<DeliveryState> getRemoteStateChangeFuture() {
+ final Promise<DeliveryState> rc = new Promise<DeliveryState>();
+ link().queue().execute(new Task() {
+ @Override
+ public void run() {
+ onRemoteStateChange(rc);
+ }
+ });
+ return rc;
+ }
+
+ abstract AmqpLink link();
+
+ boolean watchingRemoteStateChange;
+ public void onRemoteStateChange(final Callback<DeliveryState> cb) {
+ watchingRemoteStateChange = true;
+ final DeliveryState original = delivery.getRemoteState();
+ addWatch(new Watch() {
+ @Override
+ public boolean execute() {
+ if (original == null) {
+ if( delivery.getRemoteState()!=null ) {
+ cb.onSuccess(delivery.getRemoteState());
+ watchingRemoteStateChange = false;
+ return true;
+ }
+ } else {
+ if( !original.equals(delivery.getRemoteState()) ) {
+ cb.onSuccess(delivery.getRemoteState());
+ watchingRemoteStateChange = false;
+ return true;
+ }
+ }
+ return false;
+ }
+ });
+ }
+
+ /**
+ * @return the remote delivery state once settled.
+ * @throws Exception
+ */
+ public DeliveryState getSettle() throws Exception {
+ AmqpEndpointBase.assertNotOnDispatchQueue();
+ return getSettleFuture().await();
+ }
+
+ /**
+ * @return the future remote delivery state once the delivery is settled.
+ */
+ public Future<DeliveryState> getSettleFuture() {
+ final Promise<DeliveryState> rc = new Promise<DeliveryState>();
+ link().queue().execute(new Task() {
+ @Override
+ public void run() {
+ onSettle(rc);
+ }
+ });
+ return rc;
+ }
+
+ public void onSettle(final Callback<DeliveryState> cb) {
+ addWatch(new Watch() {
+ @Override
+ public boolean execute() {
+ if( delivery!=null && delivery.isSettled() ) {
+ cb.onSuccess(delivery.getRemoteState());
+ return true;
+ }
+ return false;
+ }
+ });
+ }
+
+ @Override
+ protected void fireWatches() {
+ super.fireWatches();
+ }
+
+ void incrementDeliveryCount() {
+ Message msg = getMessage();
+ msg.setDeliveryCount(msg.getDeliveryCount()+1);
+ encoded = null;
+ }
+
+ public void redeliver(boolean incrementDeliveryCounter) {
+ if( incrementDeliveryCounter ) {
+ incrementDeliveryCount();
+ }
+ }
+
+ public void settle() {
+ if( !delivery.isSettled() ) {
+ delivery.settle();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Promise.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Promise.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Promise.java
new file mode 100644
index 0000000..b914b44
--- /dev/null
+++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/Promise.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.hawtdispatch.api;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class Promise<T> implements Callback<T>, Future<T> {
+
+ private final CountDownLatch latch = new CountDownLatch(1);
+ Callback<T> next;
+ Throwable error;
+ T value;
+
+ public void onFailure(Throwable value) {
+ Callback<T> callback = null;
+ synchronized(this) {
+ error = value;
+ latch.countDown();
+ callback = next;
+ }
+ if( callback!=null ) {
+ callback.onFailure(value);
+ }
+ }
+
+ public void onSuccess(T value) {
+ Callback<T> callback = null;
+ synchronized(this) {
+ this.value = value;
+ latch.countDown();
+ callback = next;
+ }
+ if( callback!=null ) {
+ callback.onSuccess(value);
+ }
+ }
+
+ public void then(Callback<T> callback) {
+ boolean fire = false;
+ synchronized(this) {
+ next = callback;
+ if( latch.getCount() == 0 ) {
+ fire = true;
+ }
+ }
+ if( fire ) {
+ if( error!=null ) {
+ callback.onFailure(error);
+ } else {
+ callback.onSuccess(value);
+ }
+ }
+ }
+
+ public T await(long amount, TimeUnit unit) throws Exception {
+ if( latch.await(amount, unit) ) {
+ return get();
+ } else {
+ throw new TimeoutException();
+ }
+ }
+
+ public T await() throws Exception {
+ latch.await();
+ return get();
+ }
+
+ private T get() throws Exception {
+ Throwable e = error;
+ if( e !=null ) {
+ if( e instanceof RuntimeException ) {
+ throw (RuntimeException) e;
+ } else if( e instanceof Exception) {
+ throw (Exception) e;
+ } else if( e instanceof Error) {
+ throw (Error) e;
+ } else {
+ // don't expect to hit this case.
+ throw new RuntimeException(e);
+ }
+ }
+ return value;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/QoS.java
----------------------------------------------------------------------
diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/QoS.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/QoS.java
new file mode 100644
index 0000000..5b4a8dc
--- /dev/null
+++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/QoS.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.hawtdispatch.api;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public enum QoS {
+ AT_MOST_ONCE,
+ AT_LEAST_ONCE,
+ EXACTLY_ONCE
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org