You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/11/13 18:41:05 UTC

svn commit: r1408852 [1/2] - in /activemq/activemq-apollo/trunk: apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/ apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/ apollo-amqp/src/main/scala/org/apache/activemq/apollo/...

Author: chirino
Date: Tue Nov 13 17:41:01 2012
New Revision: 1408852

URL: http://svn.apache.org/viewvc?rev=1408852&view=rev
Log:
More progress on the Proton-based AMQP implementation.  Started extracting a client API, built on the Proton library, that can be used in HawtDispatch client scenarios.

Added:
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpConnectOptions.java
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpConnection.java
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpDeliveryListener.java
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpEndpointBase.java
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpLink.java
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpReceiver.java
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpSender.java
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpSession.java
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/Callback.java
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/ChainedCallback.java
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/DeliveryAttachment.java
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/Future.java
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/MessageDelivery.java
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/Promise.java
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/QoS.java
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/TransportState.java
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpHeader.java
      - copied, changed from r1406782, activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpHeader.java
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpListener.java
      - copied, changed from r1406782, activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpListener.java
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpProtocolCodec.java
      - copied, changed from r1406782, activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpProtocolCodec.java
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpTransport.java
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/Defer.java
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/DroppingWritableBuffer.java
      - copied, changed from r1406782, activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/DroppingWritableBuffer.java
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/EndpointContext.java
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/Support.java
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/Watch.java
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/WatchBase.java
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/jul.properties
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/AmqpConnectionTest.scala
Removed:
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpConnection.java
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpHeader.java
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpListener.java
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpProtocolCodec.java
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/DroppingWritableBuffer.java
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/hawtdispatch/TransportConnectionTest.java
Modified:
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocol.scala
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/QpidJmsTest.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala

Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala?rev=1408852&r1=1408851&r2=1408852&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala Tue Nov 13 17:41:01 2012
@@ -17,9 +17,9 @@
 package org.apache.activemq.apollo.amqp
 
 
+import hawtdispatch.impl.DroppingWritableBuffer
 import org.apache.activemq.apollo.broker.protocol
 import protocol.{MessageCodecFactory, MessageCodec}
-import hawtdispatch.DroppingWritableBuffer
 import java.nio.ByteBuffer
 import org.apache.qpid.proton.codec.{WritableBuffer, CompositeWritableBuffer}
 import org.fusesource.hawtbuf.Buffer._

Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocol.scala?rev=1408852&r1=1408851&r2=1408852&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocol.scala Tue Nov 13 17:41:01 2012
@@ -17,7 +17,7 @@
 package org.apache.activemq.apollo.amqp
 
 import _root_.org.fusesource.hawtbuf._
-import hawtdispatch.AmqpProtocolCodec
+import hawtdispatch.impl.AmqpProtocolCodec
 import org.apache.activemq.apollo.broker._
 import org.apache.activemq.apollo.broker.protocol.Protocol
 

Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala?rev=1408852&r1=1408851&r2=1408852&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala Tue Nov 13 17:41:01 2012
@@ -25,17 +25,18 @@ import org.fusesource.hawtbuf._
 import org.apache.activemq.apollo.util._
 import org.apache.activemq.apollo.dto._
 import org.apache.activemq.apollo.broker._
-import org.apache.activemq.apollo.util.path.{PathParser, Path, LiteralPart}
+import org.apache.activemq.apollo.util.path.{PathParser, Path}
+import path.LiteralPart
 import protocol.ProtocolHandler
 import org.apache.activemq.apollo.broker.security.SecurityContext
 import org.apache.activemq.apollo.amqp.dto._
-import hawtdispatch.{AmqpProtocolCodec, AmqpListener, AmqpConnection}
+import hawtdispatch.impl.{AmqpProtocolCodec, AmqpListener, AmqpTransport}
 import org.apache.qpid.proton.engine
-import engine.impl.{LinkImpl, TransportImpl}
-import engine.{Receiver, Sasl, Sender, Link, EndpointError, EndpointState}
+import org.apache.qpid.proton.engine.impl.{ProtocolTracer, DeliveryImpl, LinkImpl, TransportImpl}
+import org.apache.qpid.proton.engine._
 import org.fusesource.hawtbuf.Buffer._
 import org.apache.qpid.proton.`type`.transaction.{TransactionalState, Coordinator}
-import org.apache.qpid.proton.`type`.messaging.{Data, Source, Target}
+import org.apache.qpid.proton.`type`.messaging.{Accepted, Data, Source, Target, Modified}
 import org.apache.activemq.apollo.broker.Delivery
 import org.apache.activemq.apollo.filter.{FilterException, BooleanExpression}
 import org.apache.qpid.proton.`type`.{Symbol => AmqpSymbol, Binary, DescribedType}
@@ -43,8 +44,11 @@ import org.apache.activemq.apollo.select
 import org.apache.qpid.proton.`type`.transport.SenderSettleMode
 import java.util
 import java.io.IOException
+import scala.Some
 import org.apache.activemq.apollo.broker.FullSink
 import org.apache.activemq.apollo.broker.SubscriptionAddress
+import org.apache.activemq.apollo.broker.Session
+import org.apache.qpid.proton.framing.TransportFrame
 
 object AmqpProtocolHandler extends Log {
 
@@ -94,6 +98,7 @@ class AmqpProtocolHandler extends Protoc
   var config: AmqpDTO = _
   var dead = false
   var protocol_convert = "full"
+  var prefetch = 100
 
   def session_id = security_context.session_id
 
@@ -171,11 +176,11 @@ class AmqpProtocolHandler extends Protoc
     // heart_beat_monitor.resumeRead
   }
 
-  val amqp_connection = new AmqpConnection()
+  var amqp_connection:AmqpTransport = _
 
   def codec = connection.transport.getProtocolCodec.asInstanceOf[AmqpProtocolCodec]
 
-  def proton = amqp_connection.getProtonConnection
+  def proton = amqp_connection.connection()
 
   def pump_out = {
     queue.assertExecuting()
@@ -196,8 +201,21 @@ class AmqpProtocolHandler extends Protoc
 
     val connector_config = connection.connector.config.asInstanceOf[AcceptingConnectorDTO]
     config = connector_config.protocols.find(_.isInstanceOf[AmqpDTO]).map(_.asInstanceOf[AmqpDTO]).getOrElse(new AmqpDTO)
-    amqp_connection.bind(connection.transport)
+    amqp_connection = AmqpTransport.accept(connection.transport)
     amqp_connection.setListener(amqp_listener)
+    if( false ) {
+      amqp_connection.setProtocolTracer(new ProtocolTracer() {
+        def receivedFrame(transportFrame: TransportFrame) = {
+  //        println("RECV: %s | %s".format(security_context.remote_address, transportFrame.getBody()));
+  //        connection_log.trace("RECV: %s | %s", security_context.remote_address, transportFrame.getBody());
+        }
+        def sentFrame(transportFrame: TransportFrame) = {
+          println("SEND: %s | %s".format(security_context.remote_address, transportFrame.getBody()));
+  //        connection_log.trace("SEND: %s | %s", security_context.remote_address, transportFrame.getBody());
+        }
+      });
+    }
+    connection.transport.resumeRead()
   }
 
   val amqp_listener = new AmqpListener() {
@@ -209,7 +227,6 @@ class AmqpProtocolHandler extends Protoc
       sasl
     }
 
-
     override def processSaslEvent(sasl: Sasl): Sasl = {
       // Lets try to complete the sasl handshake.
       if (sasl.getRemoteMechanisms().length > 0) {
@@ -238,14 +255,45 @@ class AmqpProtocolHandler extends Protoc
       }
     }
 
-    override def processConnectionOpen(conn: engine.Connection, onComplete: Task) {
-      println("connection opened.")
+    override def processRemoteOpen(endpoint: Endpoint, onComplete: Task) {
+      endpoint match {
+        case connection:engine.Connection =>
+          processConnectionOpen(connection, onComplete)
+        case session:engine.Session =>
+          session.open(); onComplete.run()
+        case sender:engine.Sender =>
+          processSenderOpen(sender, onComplete)
+        case receiver:engine.Receiver =>
+          processReceiverOpen(receiver, onComplete)
+        case _ =>
+          async_die("system-error", "Unknown Endpoint")
+      }
+    }
+
+    override def processRemoteClose(endpoint: Endpoint, onComplete: Task) {
+      endpoint match {
+        case connection:engine.Connection =>
+          processConnectionClose(connection, onComplete)
+        case session:engine.Session =>
+          session.close(); onComplete.run()
+        case sender:engine.Sender =>
+          processSenderClose(sender, onComplete)
+        case receiver:engine.Receiver =>
+          processReceiverClose(receiver, onComplete)
+        case _ =>
+          async_die("system-error", "Unknown Endpoint")
+      }
+    }
+
+
+    def processConnectionOpen(conn: engine.Connection, onComplete: Task) {
       security_context.session_id = Some(conn.getRemoteContainer())
 
       suspend_read("host lookup")
       broker.dispatch_queue {
         val virtual_host = proton.getRemoteHostname match {
           case null => broker.default_virtual_host
+          case "" => broker.default_virtual_host
           case host => broker.get_virtual_host(ascii(host))
         }
         queue {
@@ -292,7 +340,7 @@ class AmqpProtocolHandler extends Protoc
       }
     }
 
-    override def processReceiverOpen(receiver: Receiver, onComplete: Task) {
+    def processReceiverOpen(receiver: Receiver, onComplete: Task) {
       // Client producer is attaching..
       receiver.setSource(receiver.getRemoteSource());
       receiver.setTarget(receiver.getRemoteTarget());
@@ -323,18 +371,15 @@ class AmqpProtocolHandler extends Protoc
           host.dispatch_queue {
             val rc = host.router.connect(route.addresses, route, security_context)
             queue {
-              println(rc)
               rc match {
                 case Some(failure) =>
-                  println(failure)
                   close_with_error(receiver, "Could not connect", failure)
                   onComplete.run()
                 case None =>
-                  println("ok")
                   // If the remote has not closed on us yet...
                   if (receiver.getRemoteState == EndpointState.ACTIVE) {
-                    receiver.setContext(route)
-                    receiver.flow(1024 * 64);
+                    set_attachment(receiver, route)
+                    receiver.flow(prefetch);
                     receiver.open()
                   } else {
                     receiver.close()
@@ -346,14 +391,22 @@ class AmqpProtocolHandler extends Protoc
       }
     }
 
-    override def processReceiverClose(receiver: Receiver, onComplete: Task) {
-      receiver.getContext match {
+    def get_attachment(endpoint:Endpoint):AnyRef = {
+      amqp_connection.context(endpoint).getAttachment()
+    }
+
+    def set_attachment(endpoint:Endpoint, value:AnyRef) = {
+      amqp_connection.context(endpoint).setAttachment(value)
+    }
+
+    def processReceiverClose(receiver: Receiver, onComplete: Task) {
+      get_attachment(receiver) match {
         case null =>
           receiver.close()
           onComplete.run()
         case route: AmqpProducerRoute =>
           // Lets disconnect the route.
-          receiver.setContext(null)
+          set_attachment(receiver, null)
           host.dispatch_queue {
             host.router.disconnect(route.addresses, route)
             queue {
@@ -366,14 +419,18 @@ class AmqpProtocolHandler extends Protoc
       }
     }
 
-    override def processDelivery(receiver: Receiver, delivery: engine.Delivery) {
-      receiver.getContext match {
+    override def processDelivery(delivery: engine.Delivery) {
+      get_attachment(delivery.getLink) match {
         case null =>
-        case route: AmqpProducerRoute => route.process(delivery)
+        case route: AmqpProducerRoute =>
+          route.process(delivery.asInstanceOf[DeliveryImpl])
+        case consumer: AmqpConsumer =>
+          consumer.process(delivery.asInstanceOf[DeliveryImpl])
+        // TODO
       }
     }
 
-    override def processSenderOpen(sender: Sender, onComplete: Task) {
+    def processSenderOpen(sender: Sender, onComplete: Task) {
       // Client consumer is attaching..
       sender.setSource(sender.getRemoteSource());
       sender.setTarget(sender.getRemoteTarget());
@@ -468,7 +525,7 @@ class AmqpProtocolHandler extends Protoc
               close_with_error(sender, "subscribe-failed", reason)
               onComplete.run()
             case None =>
-              sender.setContext(consumer)
+              set_attachment(sender, consumer)
               sender.open()
               onComplete.run()
           }
@@ -476,6 +533,11 @@ class AmqpProtocolHandler extends Protoc
       }
     }
 
+    def processSenderClose(sender: Sender, onComplete: Task) {
+      sender.close()
+      onComplete.run()
+    }
+
     var gracefully_closed = false
     override def processFailure(e: Throwable) {
       var msg = "Internal Server Error: " + e
@@ -493,10 +555,11 @@ class AmqpProtocolHandler extends Protoc
       on_transport_disconnected()
     }
 
-    override def processConnectionClose(conn: engine.Connection, onComplete: Task) {
+    def processConnectionClose(conn: engine.Connection, onComplete: Task) {
       gracefully_closed = true
       on_transport_disconnected()
-      super.processConnectionClose(conn, onComplete)
+      conn.close()
+      onComplete.run()
       queue.after(die_delay, TimeUnit.MILLISECONDS) {
         connection.stop(NOOP)
       }
@@ -574,7 +637,7 @@ class AmqpProtocolHandler extends Protoc
 
     def receiver: Receiver
 
-    def process(delivery: engine.Delivery): Unit = {
+    def process(delivery: DeliveryImpl): Unit = {
       if (!delivery.isReadable()) {
         System.out.println("it was not readable!");
         return;
@@ -599,15 +662,12 @@ class AmqpProtocolHandler extends Protoc
         }
       }
 
-      receiver.advance();
-      delivery.settle(); // TODO: do this once accepted by the broker.
-
       val buffer = current.toBuffer();
       current = null;
       onMessage(delivery, new AmqpMessage(buffer));
     }
 
-    def onMessage(delivery: engine.Delivery, buffer: AmqpMessage): Unit
+    def onMessage(delivery: DeliveryImpl, buffer: AmqpMessage): Unit
   }
 
   def decode_target(target: Target) = {
@@ -669,13 +729,17 @@ class AmqpProtocolHandler extends Protoc
 
     override def dispatch_queue = queue
 
-    refiller = ^ {
-      resume_read
+    val producer_overflow = new OverflowSink[Delivery](this) {
+      /**
+       * Called for each value that is passed on to the downstream sink.
+       */
+      override protected def onDelivered(value: Delivery) {
+        receiver.flow(1)
+        pump_out
+      }
     }
 
-    val producer_overflow = new OverflowSink[Delivery](this)
-
-    def onMessage(delivery: engine.Delivery, message: AmqpMessage) = {
+    def onMessage(delivery: DeliveryImpl, message: AmqpMessage) = {
       val d = new Delivery
       d.message = message
       d.size = message.encoded.length
@@ -699,15 +763,19 @@ class AmqpProtocolHandler extends Protoc
           queue {
             result match {
               case Consumed =>
+                delivery.disposition(new Accepted())
                 delivery.settle()
               case _ =>
                 async_die("uknown", "Unexpected NAK from broker")
             }
           }
         }
+      } else {
+        delivery.settle()
       }
 
-      producer_overflow.offer(d)
+      val accepted = producer_overflow.offer(d)
+      assert(accepted)
       receiver.advance();
     }
   }
@@ -790,7 +858,7 @@ class AmqpProtocolHandler extends Protoc
       override def time_stamp = broker.now
 
       var currentBuffer: Buffer = _;
-      var currentDelivery: org.apache.qpid.proton.engine.Delivery = _;
+      var currentDelivery: DeliveryImpl = _;
 
       override def drain_overflow: Unit = {
         queue.assertExecuting()
@@ -798,13 +866,15 @@ class AmqpProtocolHandler extends Protoc
         try {
           while (true) {
             while (currentBuffer != null) {
-              var sent = sender.send(currentBuffer.data, currentBuffer.offset, currentBuffer.length);
-              if (sent > 0) {
-                pumpNeeded = true
+              if (sender.getCredit > 0) {
+                val sent = sender.send(currentBuffer.data, currentBuffer.offset, currentBuffer.length);
                 currentBuffer.moveHead(sent);
+                val (session, apollo_delivery) = currentDelivery.getContext.asInstanceOf[(Session[Delivery], Delivery)]
+                delivered(session, apollo_delivery.size)
+                pumpNeeded = true
                 if (currentBuffer.length == 0) {
                   if (presettle) {
-                    settle(currentDelivery, Consumed);
+                    settle(currentDelivery, Consumed, false);
                   } else {
                     sender.advance();
                   }
@@ -849,10 +919,10 @@ class AmqpProtocolHandler extends Protoc
 
               currentBuffer = new AmqpMessage(null, message).encoded;
               if (presettle) {
-                currentDelivery = sender.delivery(EMPTY_BYTE_ARRAY, 0, 0);
+                currentDelivery = sender.delivery(EMPTY_BYTE_ARRAY, 0, 0).asInstanceOf[DeliveryImpl];
               } else {
                 val tag = nextTag
-                currentDelivery = sender.delivery(tag, 0, tag.length);
+                currentDelivery = sender.delivery(tag, 0, tag.length).asInstanceOf[DeliveryImpl];
                 unsettled.put(new AsciiBuffer(tag), currentDelivery)
               }
               currentDelivery.setContext(value);
@@ -860,6 +930,7 @@ class AmqpProtocolHandler extends Protoc
           }
         } finally {
           if( pumpNeeded ) {
+            pumpNeeded = false
             pump_out
           }
         }
@@ -872,36 +943,68 @@ class AmqpProtocolHandler extends Protoc
           redeliveries.removeFirst()
         }
       }
+    }
 
-      private def settle(delivery: org.apache.qpid.proton.engine.Delivery, ackType: DeliveryResult) {
-        val tag: Array[Byte] = delivery.getTag
-        if (tag != null && tag.length > 0) {
-          checkinTag(tag)
-          unsettled.remove(new AsciiBuffer(tag))
-        }
-        // Don't ack.. redeliver
-        val (session, apollo_delivery) = delivery.getContext.asInstanceOf[(Session[Delivery], Delivery)]
-        if (ackType == null) {
-          delivery.settle
-          redeliveries.addFirst((session, apollo_delivery))
-          drain_overflow
-        } else {
-
-          val remoteState = delivery.getRemoteState
-          if (remoteState != null && remoteState.isInstanceOf[TransactionalState]) {
-            val s: TransactionalState = remoteState.asInstanceOf[TransactionalState]
-            val txid = toLong(s.getTxnId)
-            async_die("txs-not-supported", "Transactions not yet supported")
-            return
+    def process(proton_delivery:DeliveryImpl) = {
+      val state = proton_delivery.getRemoteState();
+      state match {
+        case null =>
+        case accepted:Accepted =>
+          if( !proton_delivery.remotelySettled() ) {
+              proton_delivery.disposition(new Accepted());
           }
-
-          if (apollo_delivery.ack != null) {
-            apollo_delivery.ack(ackType, null)
+          settle(proton_delivery, Consumed, false);
+        case rejected:Rejected =>
+          // re-deliver w/ incremented delivery counter.
+          settle(proton_delivery, null, true);
+        case release:Released =>
+          // re-deliver && don't increment the counter.
+          settle(proton_delivery, null, false);
+        case modified:Modified =>
+          def b(v:java.lang.Boolean) = v!=null && v.booleanValue()
+          var ackType = if(b(modified.getUndeliverableHere())) {
+              // receiver does not want the message..
+              // perhaps we should DLQ it?
+              Poisoned;
+          } else {
+            // Delivered ??
+            null
           }
-          delivery.settle
-          pump_out
+          settle(proton_delivery, ackType, b(modified.getDeliveryFailed()));
+      }
+    }
+
+    def settle(delivery:DeliveryImpl, ackType:DeliveryResult, incrementRedelivery:Boolean) {
+      val (session, apollo_delivery) = delivery.getContext.asInstanceOf[(Session[Delivery], Delivery)]
+      if( incrementRedelivery ) {
+        apollo_delivery.redelivered
+      }
+
+      val tag = delivery.getTag();
+      if( tag !=null && tag.length>0 ) {
+          checkinTag(tag);
+      }
+
+      if( ackType == null ) {
+        redeliveries.addFirst((session, apollo_delivery))
+        session_manager.drain_overflow
+      } else {
+
+        val remoteState = delivery.getRemoteState
+        if (remoteState != null && remoteState.isInstanceOf[TransactionalState]) {
+          val s: TransactionalState = remoteState.asInstanceOf[TransactionalState]
+          val txid = toLong(s.getTxnId)
+          async_die("txs-not-supported", "Transactions not yet supported")
+          return
+        }
+
+        if( apollo_delivery.ack != null ) {
+          apollo_delivery.ack(ackType, null)
         }
       }
+      delivery.settle()
+      pump_out
+
     }
 
     class AmqpConsumerSession(p: DeliveryProducer) extends DeliverySession with SessionSinkFilter[Delivery] {

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpConnectOptions.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpConnectOptions.java?rev=1408852&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpConnectOptions.java (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpConnectOptions.java Tue Nov 13 17:41:01 2012
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.amqp.hawtdispatch.api;
+
+import org.fusesource.hawtdispatch.DispatchQueue;
+import org.fusesource.hawtdispatch.transport.TcpTransport;
+
+import javax.net.ssl.SSLContext;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.*;
+
+/**
+ * Mutable bean holding the options handed to
+ * {@link AmqpConnection#connect(AmqpConnectOptions)}.
+ * <p>
+ * Besides the per-connection settings, the class owns a lazily created,
+ * process-wide {@link ThreadPoolExecutor} (see {@link #getBlockingThreadPool()})
+ * whose {@code shutdown()} and {@code shutdownNow()} calls are deliberately no-ops.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpConnectOptions implements Cloneable {
+
+    // ------------------------------------------------------------------
+    // Shared statics
+    // ------------------------------------------------------------------
+
+    /** Idle keep-alive (milliseconds) of shared pool threads; system property {@code amqp.thread.keep_alive}. */
+    private static final long KEEP_ALIVE = Long.parseLong(System.getProperty("amqp.thread.keep_alive", String.valueOf(1000)));
+    /** Stack size requested for shared pool threads; system property {@code amqp.thread.stack_size}. */
+    private static final long STACK_SIZE = Long.parseLong(System.getProperty("amqp.thread.stack_size", String.valueOf(1024*512)));
+    /** The shared pool, created on first use or injected through {@link #setBlockingThreadPool}. */
+    private static ThreadPoolExecutor blockingThreadPool;
+    /** Host used until one of the {@code setHost} overloads is called. */
+    private static final URI DEFAULT_HOST = URI.create("tcp://localhost");
+
+    /** Creates daemon threads named "AMQP Task" with the configured stack size. */
+    private static final class TaskThreadFactory implements ThreadFactory {
+        public Thread newThread(Runnable task) {
+            Thread thread = new Thread(null, task, "AMQP Task", STACK_SIZE);
+            thread.setDaemon(true);
+            return thread;
+        }
+    }
+
+    /** Unbounded hand-off pool that ignores shutdown requests because it is shared. */
+    private static final class SharedThreadPool extends ThreadPoolExecutor {
+        SharedThreadPool() {
+            super(0, Integer.MAX_VALUE, KEEP_ALIVE, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new TaskThreadFactory());
+        }
+
+        @Override
+        public void shutdown() {
+            // we don't ever shutdown since we are shared..
+        }
+
+        @Override
+        public List<Runnable> shutdownNow() {
+            // we don't ever shutdown since we are shared..
+            return Collections.emptyList();
+        }
+    }
+
+    /**
+     * @return the shared pool, creating the default one if none has been set yet
+     */
+    public synchronized static ThreadPoolExecutor getBlockingThreadPool() {
+        if( blockingThreadPool != null ) {
+            return blockingThreadPool;
+        }
+        blockingThreadPool = new SharedThreadPool();
+        return blockingThreadPool;
+    }
+
+    /**
+     * Replaces the shared pool; passing {@code null} makes the next
+     * {@link #getBlockingThreadPool()} call build a fresh default pool.
+     */
+    public synchronized static void setBlockingThreadPool(ThreadPoolExecutor pool) {
+        blockingThreadPool = pool;
+    }
+
+    // ------------------------------------------------------------------
+    // Per-connection settings
+    // ------------------------------------------------------------------
+
+    URI host = DEFAULT_HOST;
+    URI localAddress;
+    SSLContext sslContext;
+    DispatchQueue dispatchQueue;
+    Executor blockingExecutor;
+    int maxReadRate;
+    int maxWriteRate;
+    int trafficClass = TcpTransport.IPTOS_THROUGHPUT;
+    boolean useLocalHost;
+    int receiveBufferSize = 1024*64;
+    int sendBufferSize = 1024*64;
+    String localContainerId;
+    String remoteContainerId;
+    String user;
+    String password;
+
+    /**
+     * @return a field-by-field (shallow) copy of these options
+     */
+    @Override
+    public AmqpConnectOptions clone() {
+        final Object copy;
+        try {
+            copy = super.clone();
+        } catch (CloneNotSupportedException e) {
+            throw new RuntimeException(e);
+        }
+        return (AmqpConnectOptions) copy;
+    }
+
+    // ---- addressing ---------------------------------------------------
+
+    public URI getHost() {
+        return host;
+    }
+
+    public void setHost(URI host) {
+        this.host = host;
+    }
+
+    /** Parses {@code host} as a URI and stores it. */
+    public void setHost(String host) throws URISyntaxException {
+        URI parsed = new URI(host);
+        this.setHost(parsed);
+    }
+
+    /** Stores the URI {@code tcp://host:port}. */
+    public void setHost(String host, int port) throws URISyntaxException {
+        URI parsed = new URI("tcp://"+host+":"+port);
+        this.setHost(parsed);
+    }
+
+    public URI getLocalAddress() {
+        return localAddress;
+    }
+
+    public void setLocalAddress(URI localAddress) {
+        this.localAddress = localAddress;
+    }
+
+    /** Parses {@code localAddress} as a URI and stores it. */
+    public void setLocalAddress(String localAddress) throws URISyntaxException {
+        URI parsed = new URI(localAddress);
+        this.setLocalAddress(parsed);
+    }
+
+    public boolean isUseLocalHost() {
+        return useLocalHost;
+    }
+
+    public void setUseLocalHost(boolean useLocalHost) {
+        this.useLocalHost = useLocalHost;
+    }
+
+    // ---- identity and credentials -------------------------------------
+
+    public String getLocalContainerId() {
+        return localContainerId;
+    }
+
+    public void setLocalContainerId(String localContainerId) {
+        this.localContainerId = localContainerId;
+    }
+
+    public String getRemoteContainerId() {
+        return remoteContainerId;
+    }
+
+    public void setRemoteContainerId(String remoteContainerId) {
+        this.remoteContainerId = remoteContainerId;
+    }
+
+    public String getUser() {
+        return user;
+    }
+
+    public void setUser(String user) {
+        this.user = user;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    public SSLContext getSslContext() {
+        return sslContext;
+    }
+
+    public void setSslContext(SSLContext sslContext) {
+        this.sslContext = sslContext;
+    }
+
+    // ---- execution ----------------------------------------------------
+
+    public DispatchQueue getDispatchQueue() {
+        return dispatchQueue;
+    }
+
+    public void setDispatchQueue(DispatchQueue dispatchQueue) {
+        this.dispatchQueue = dispatchQueue;
+    }
+
+    public Executor getBlockingExecutor() {
+        return blockingExecutor;
+    }
+
+    public void setBlockingExecutor(Executor blockingExecutor) {
+        this.blockingExecutor = blockingExecutor;
+    }
+
+    // ---- socket tuning ------------------------------------------------
+
+    public int getMaxReadRate() {
+        return maxReadRate;
+    }
+
+    public void setMaxReadRate(int maxReadRate) {
+        this.maxReadRate = maxReadRate;
+    }
+
+    public int getMaxWriteRate() {
+        return maxWriteRate;
+    }
+
+    public void setMaxWriteRate(int maxWriteRate) {
+        this.maxWriteRate = maxWriteRate;
+    }
+
+    public int getReceiveBufferSize() {
+        return receiveBufferSize;
+    }
+
+    public void setReceiveBufferSize(int receiveBufferSize) {
+        this.receiveBufferSize = receiveBufferSize;
+    }
+
+    public int getSendBufferSize() {
+        return sendBufferSize;
+    }
+
+    public void setSendBufferSize(int sendBufferSize) {
+        this.sendBufferSize = sendBufferSize;
+    }
+
+    public int getTrafficClass() {
+        return trafficClass;
+    }
+
+    public void setTrafficClass(int trafficClass) {
+        this.trafficClass = trafficClass;
+    }
+}

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpConnection.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpConnection.java?rev=1408852&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpConnection.java (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpConnection.java Tue Nov 13 17:41:01 2012
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.amqp.hawtdispatch.api;
+
+import org.apache.activemq.apollo.amqp.hawtdispatch.impl.AmqpListener;
+import org.apache.activemq.apollo.amqp.hawtdispatch.impl.AmqpTransport;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Endpoint;
+import org.apache.qpid.proton.engine.impl.ConnectionImpl;
+import org.apache.qpid.proton.engine.impl.ProtocolTracer;
+import org.apache.qpid.proton.engine.impl.SessionImpl;
+import org.fusesource.hawtdispatch.DispatchQueue;
+import org.fusesource.hawtdispatch.Task;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpConnection extends AmqpEndpointBase  {
+
+    AmqpTransport transport;
+    ConnectionImpl connection;
+    HashSet<AmqpSender> senders = new HashSet<AmqpSender>();
+    boolean closing = false;
+
+    public static AmqpConnection connect(AmqpConnectOptions options) {
+        return new AmqpConnection(options);
+    }
+
+    private AmqpConnection(AmqpConnectOptions options) {
+        transport = AmqpTransport.connect(options);
+        transport.setListener(new AmqpListener() {
+            @Override
+            public void processDelivery(Delivery delivery) {
+                Attachment attachment = (Attachment) getTransport().context(delivery.getLink()).getAttachment();
+                AmqpLink link = (AmqpLink) attachment.endpoint();
+                link.processDelivery(delivery);
+            }
+
+            @Override
+            public void processRefill() {
+                for(AmqpSender sender: new ArrayList<AmqpSender>(senders)) {
+                    sender.pumpDeliveries();
+                }
+                pumpOut();
+            }
+
+        });
+        connection = transport.connection();
+        connection.open();
+        attach();
+    }
+
+    public void waitForConnected() throws Exception {
+        assertNotOnDispatchQueue();
+        getConnectedFuture().await();
+    }
+
+    public Future<Void> getConnectedFuture() {
+        final Promise<Void> rc = new Promise<Void>();
+        queue().execute(new Task() {
+            @Override
+            public void run() {
+                onConnected(rc);
+            }
+        });
+        return rc;
+    }
+
+    public void onConnected(Callback<Void> cb) {
+        transport.onTransportConnected(cb);
+    }
+
+    @Override
+    protected Endpoint getEndpoint() {
+        return connection;
+    }
+
+    @Override
+    protected AmqpConnection getConnection() {
+        return this;
+    }
+
+    @Override
+    protected AmqpEndpointBase getParent() {
+        return null;
+    }
+
+    public AmqpSession createSession() {
+        assertExecuting();
+        SessionImpl session = connection.session();
+        session.open();
+        pumpOut();
+        return new AmqpSession(this, session);
+    }
+
+    public int getMaxSessions() {
+        return connection.getMaxChannels();
+    }
+
+    public void disconnect() {
+        closing = true;
+        transport.disconnect();
+    }
+
+    public void waitForDisconnected() throws Exception {
+        assertNotOnDispatchQueue();
+        getDisconnectedFuture().await();
+    }
+
+    public Future<Void> getDisconnectedFuture() {
+        final Promise<Void> rc = new Promise<Void>();
+        queue().execute(new Task() {
+            @Override
+            public void run() {
+                onDisconnected(rc);
+            }
+        });
+        return rc;
+    }
+
+    public void onDisconnected(Callback<Void> cb) {
+        transport.onTransportDisconnected(cb);
+    }
+
+    public TransportState getTransportState() {
+        return transport.getState();
+    }
+
+    public Throwable getTransportFailure() {
+        return transport.getFailure();
+    }
+
+    public Future<Throwable> getTransportFailureFuture() {
+        final Promise<Throwable> rc = new Promise<Throwable>();
+        queue().execute(new Task() {
+            @Override
+            public void run() {
+                onTransportFailure(rc);
+            }
+        });
+        return rc;
+    }
+
+    public void onTransportFailure(Callback<Throwable> cb) {
+        transport.onTransportFailure(cb);
+    }
+
+    @Override
+    public DispatchQueue queue() {
+        return super.queue();
+    }
+
+    public void setProtocolTracer(ProtocolTracer protocolTracer) {
+        transport.setProtocolTracer(protocolTracer);
+    }
+
+    public ProtocolTracer getProtocolTracer() {
+        return transport.getProtocolTracer();
+    }
+}

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpDeliveryListener.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpDeliveryListener.java?rev=1408852&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpDeliveryListener.java (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpDeliveryListener.java Tue Nov 13 17:41:01 2012
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.amqp.hawtdispatch.api;
+
+/**
+ * Callback interface with a single hook, {@link #onMessageDelivery}, through
+ * which a {@link MessageDelivery} is handed to application code.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface AmqpDeliveryListener {
+
+    /**
+     * Invoked with a message delivery for the listener to handle.
+     * Caller should suspend/resume the AmqpReceiver to
+     * flow control the delivery of messages.
+     *
+     * @param delivery the delivery being handed to the listener
+     */
+    void onMessageDelivery(MessageDelivery delivery);
+}

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpEndpointBase.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpEndpointBase.java?rev=1408852&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpEndpointBase.java (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpEndpointBase.java Tue Nov 13 17:41:01 2012
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.amqp.hawtdispatch.api;
+
+import org.apache.activemq.apollo.amqp.hawtdispatch.impl.*;
+import org.apache.qpid.proton.engine.Endpoint;
+import org.apache.qpid.proton.engine.EndpointError;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.fusesource.hawtdispatch.Dispatch;
+import org.fusesource.hawtdispatch.DispatchQueue;
+import org.fusesource.hawtdispatch.Task;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract class AmqpEndpointBase extends WatchBase {
+    abstract protected Endpoint getEndpoint();
+    abstract protected AmqpEndpointBase getParent();
+
+    protected AmqpConnection getConnection() {
+        return getParent().getConnection();
+    }
+
+    protected AmqpTransport getTransport() {
+        return getConnection().transport;
+    }
+
+    protected DispatchQueue queue() {
+        return getTransport().queue();
+    }
+
+    protected void assertExecuting() {
+        getTransport().assertExecuting();
+    }
+
+    public void waitForRemoteOpen() throws Exception {
+        assertNotOnDispatchQueue();
+        getRemoteOpenFuture().await();
+    }
+
+    public Future<Void> getRemoteOpenFuture() {
+        final Promise<Void> rc = new Promise<Void>();
+        queue().execute(new Task() {
+            @Override
+            public void run() {
+                onRemoteOpen(rc);
+            }
+        });
+        return rc;
+    }
+
+    public void onRemoteOpen(final Callback<Void> cb) {
+        addWatch(new Watch() {
+            @Override
+            public boolean execute() {
+                switch (getEndpoint().getRemoteState()) {
+                    case ACTIVE:
+                        cb.onSuccess(null);
+                        return true;
+                    case CLOSED:
+                        cb.onFailure(Support.illegalState("closed"));
+                        return true;
+                }
+                return false;
+            }
+        });
+    }
+
+    public EndpointError waitForRemoteClose() throws Exception {
+        assertNotOnDispatchQueue();
+        return getRemoteCloseFuture().await();
+    }
+
+    public Future<EndpointError> getRemoteCloseFuture() {
+        final Promise<EndpointError> rc = new Promise<EndpointError>();
+        queue().execute(new Task() {
+            @Override
+            public void run() {
+                onRemoteClose(rc);
+            }
+        });
+        return rc;
+    }
+
+    public void onRemoteClose(final Callback<EndpointError> cb) {
+        addWatch(new Watch() {
+            @Override
+            public boolean execute() {
+                if (getEndpoint().getRemoteState() == EndpointState.CLOSED) {
+                    cb.onSuccess(getEndpoint().getRemoteError());
+                    return true;
+                }
+                return false;
+            }
+        });
+    }
+
+    public void close() {
+        getEndpoint().close();
+        pumpOut();
+    }
+
+    public EndpointState getRemoteState() {
+        return getEndpoint().getRemoteState();
+    }
+
+    public EndpointError getRemoteError() {
+        return getEndpoint().getRemoteError();
+    }
+
+    static protected EndpointError toError(Throwable value) {
+        return new EndpointError("error", value.toString());
+    }
+
+    class Attachment extends Task {
+        AmqpEndpointBase endpoint() {
+            return AmqpEndpointBase.this;
+        }
+
+        @Override
+        public void run() {
+            fireWatches();
+        }
+    }
+
+    protected void attach() {
+        getTransport().context(getEndpoint()).setAttachment(new Attachment());
+    }
+
+    protected void defer(Defer defer) {
+        getTransport().defer(defer);
+    }
+
+    protected void pumpOut() {
+        getTransport().pumpOut();
+    }
+
+    static protected void assertNotOnDispatchQueue() {
+        assert Dispatch.getCurrentQueue()==null : "Not allowed to be called when executing on a dispatch queue";
+    }
+
+}

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpLink.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpLink.java?rev=1408852&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpLink.java (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpLink.java Tue Nov 13 17:41:01 2012
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.amqp.hawtdispatch.api;
+
+import org.apache.qpid.proton.engine.Delivery;
+
+/**
+ * Base class shared by {@link AmqpSender} and {@link AmqpReceiver}. It adds one
+ * hook, {@code processDelivery}, which the transport listener installed by
+ * {@link AmqpConnection} calls with each proton {@link Delivery} whose link is
+ * attached to this object.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract public class AmqpLink extends AmqpEndpointBase {
+    abstract protected void processDelivery(Delivery delivery);
+}

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpReceiver.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpReceiver.java?rev=1408852&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpReceiver.java (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpReceiver.java Tue Nov 13 17:41:01 2012
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.amqp.hawtdispatch.api;
+
+import org.apache.activemq.apollo.amqp.hawtdispatch.impl.Defer;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.impl.DeliveryImpl;
+import org.apache.qpid.proton.engine.impl.ReceiverImpl;
+import org.apache.qpid.proton.type.messaging.Accepted;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.ByteArrayOutputStream;
+
+import java.util.LinkedList;
+
+/**
+ * Receiving side of a link. Incoming transfer data is accumulated until a
+ * delivery is complete, wrapped in a {@link MessageDelivery}, queued in
+ * {@code inbound} and handed to the {@link AmqpDeliveryListener} while the
+ * receiver is resumed.
+ * <p>
+ * A new receiver starts with a resume count of zero, so {@link #resume()} must be
+ * called before the listener sees any delivery.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpReceiver extends AmqpLink {
+
+    final AmqpSession parent;
+    final ReceiverImpl receiver;
+
+    /**
+     * @param parent   session that owns this receiver
+     * @param receiver proton receiver being wrapped
+     * @param qos      currently not consulted by this class
+     */
+    public AmqpReceiver(AmqpSession parent, ReceiverImpl receiver, QoS qos) {
+        this.parent = parent;
+        this.receiver = receiver;
+        attach();
+    }
+
+    @Override
+    protected ReceiverImpl getEndpoint() {
+        return receiver;
+    }
+    @Override
+    protected AmqpSession getParent() {
+        return parent;
+    }
+
+    /** Accumulates the bytes of the delivery currently being received; null between deliveries. */
+    ByteArrayOutputStream current = new ByteArrayOutputStream();
+
+    /**
+     * Reads whatever the receiver currently has for the delivery. When the final
+     * {@code recv} call returns 0 the method returns and waits for more data;
+     * otherwise the link is advanced and the accumulated buffer is passed to
+     * {@link #onMessage}.
+     */
+    @Override
+    protected void processDelivery(Delivery delivery) {
+        if( !delivery.isReadable() ) {
+            // TODO: route this through a logger instead of stdout.
+            System.out.println("it was not readable!");
+            return;
+        }
+
+        if( current==null ) {
+            current = new ByteArrayOutputStream();
+        }
+
+        int count;
+        byte data[] = new byte[1024*4];
+        while( (count = receiver.recv(data, 0, data.length)) > 0 ) {
+            current.write(data, 0, count);
+        }
+
+        // Expecting more deliveries..
+        if( count == 0 ) {
+            return;
+        }
+
+        receiver.advance();
+        Buffer buffer = current.toBuffer();
+        current = null;
+        onMessage(delivery, buffer);
+
+    }
+
+    /** Completed deliveries waiting to be handed to the listener. */
+    LinkedList<MessageDelivery> inbound = new LinkedList<MessageDelivery>();
+
+    /**
+     * Wraps a completed delivery, stores the wrapper as the delivery's context,
+     * queues it and attempts to hand it to the listener.
+     */
+    protected void onMessage(Delivery delivery, Buffer buffer) {
+        MessageDelivery md = new MessageDelivery(buffer) {
+            @Override
+            AmqpLink link() {
+                return AmqpReceiver.this;
+            }
+
+            @Override
+            public void settle() {
+                if( !delivery.isSettled() ) {
+                    delivery.disposition(new Accepted());
+                    delivery.settle();
+                }
+                drain();
+            }
+        };
+        md.delivery = (DeliveryImpl) delivery;
+        delivery.setContext(md);
+        inbound.add(md);
+        drainInbound();
+    }
+
+    /** Schedules a deferred attempt to hand queued deliveries to the listener. */
+    public void drain() {
+        defer(deferedDrain);
+    }
+
+    Defer deferedDrain = new Defer(){
+        public void run() {
+            drainInbound();
+        }
+    };
+
+    /** Deliveries are handed out only while this counter is positive. */
+    int resumed = 0;
+
+    /**
+     * Increments the resume count. Once the receiver is resumed a deferred drain
+     * is scheduled so deliveries that queued up while it was suspended are handed
+     * to the listener without waiting for the next delivery to arrive.
+     */
+    public void resume() {
+        resumed++;
+        if( resumed > 0 ) {
+            drain();
+        }
+    }
+
+    /** Decrements the resume count; queued deliveries are held while it is not positive. */
+    public void suspend() {
+        resumed--;
+    }
+
+    AmqpDeliveryListener deliveryListener;
+
+    /**
+     * Hands queued deliveries to the listener for as long as one is set, the
+     * receiver is resumed and the queue is non-empty, issuing one link credit
+     * per delivery handed out.
+     */
+    private void drainInbound() {
+        while( deliveryListener!=null && !inbound.isEmpty() && resumed>0) {
+            deliveryListener.onMessageDelivery(inbound.removeFirst());
+            receiver.flow(1);
+        }
+    }
+
+    public AmqpDeliveryListener getDeliveryListener() {
+        return deliveryListener;
+    }
+
+    /** Installs the listener and immediately attempts to hand it any queued deliveries. */
+    public void setDeliveryListener(AmqpDeliveryListener deliveryListener) {
+        this.deliveryListener = deliveryListener;
+        drainInbound();
+    }
+}

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpSender.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpSender.java?rev=1408852&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpSender.java (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpSender.java Tue Nov 13 17:41:01 2012
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.amqp.hawtdispatch.api;
+
+import org.apache.activemq.apollo.amqp.hawtdispatch.impl.Defer;
+import org.apache.activemq.apollo.amqp.hawtdispatch.impl.Watch;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.EndpointError;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.apache.qpid.proton.engine.impl.DeliveryImpl;
+import org.apache.qpid.proton.engine.impl.SenderImpl;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.type.messaging.Accepted;
+import org.apache.qpid.proton.type.messaging.Modified;
+import org.apache.qpid.proton.type.messaging.Rejected;
+import org.apache.qpid.proton.type.messaging.Released;
+import org.apache.qpid.proton.type.transport.DeliveryState;
+import org.fusesource.hawtbuf.Buffer;
+
+import java.io.UnsupportedEncodingException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpSender extends AmqpLink {
+
+    private  byte[] EMPTY_BYTE_ARRAY = new byte[]{};
+    long nextTagId = 0;
+    HashSet<byte[]> tagCache = new HashSet<byte[]>();
+
+    final AmqpSession parent;
+    private final QoS qos;
+    final SenderImpl sender;
+
+    public AmqpSender(AmqpSession parent, SenderImpl sender, QoS qos) {
+        this.parent = parent;
+        this.sender = sender;
+        this.qos = qos;
+        attach(); // stores an Attachment for this sender on the transport context of its endpoint
+        getConnection().senders.add(this); // AmqpConnection's processRefill pumps every sender in this set
+    }
+
+    @Override
+    public void close() {
+        super.close(); // closes the proton endpoint and flushes the transport
+        getConnection().senders.remove(this); // undo the registration made in the constructor
+    }
+
+    @Override
+    protected SenderImpl getEndpoint() {
+        return sender; // the proton sender supplied to the constructor
+    }
+
+    @Override
+    protected AmqpSession getParent() {
+        return parent; // the session supplied to the constructor
+    }
+
+    final LinkedList<MessageDelivery> outbound = new LinkedList<MessageDelivery>();
+    long outboundBufferSize;
+
+    public MessageDelivery send(Message message) { // Queues message for sending and returns the MessageDelivery that tracks it.
+        assertExecuting();
+        MessageDelivery rc = new MessageDelivery(message) {
+            @Override
+            AmqpLink link() {
+                return AmqpSender.this;
+            }
+
+            @Override
+            public void redeliver(boolean incrementDeliveryCounter) {
+                super.redeliver(incrementDeliveryCounter);
+                outbound.add(this); // re-queue at the tail, with the same size accounting as the first send
+                outboundBufferSize += initialSize;
+                defer(deferedPumpDeliveries); // pumping is deferred here rather than done inline
+            }
+        };
+        outbound.add(rc);
+        outboundBufferSize += rc.initialSize; // pumpDeliveries subtracts this again when it dequeues the delivery
+        pumpDeliveries();
+        pumpOut();
+        return rc;
+    }
+
+    Buffer currentBuffer;
+    DeliveryImpl currentDelivery;
+
+    Defer deferedPumpDeliveries = new Defer() { // passed to defer() by redeliver() to request a later pumpDeliveries() run
+        public void run() {
+            pumpDeliveries();
+        }
+    };
+
+    public long getOverflowBufferSize() {
+        return outboundBufferSize; // sum of initialSize over the deliveries still waiting in outbound
+    }
+
+    protected void pumpDeliveries() { // Feeds queued deliveries to the proton sender for as long as it reports credit.
+        assertExecuting();
+        try {
+            while(true) {
+                while( currentBuffer !=null ) { // finish the in-flight message before starting another
+                    if( sender.getCredit() > 0 ) {
+                        int sent = sender.send(currentBuffer.data, currentBuffer.offset, currentBuffer.length);
+                        currentBuffer.moveHead(sent); // skip the bytes proton accepted; any remainder is retried on the next pass
+                        if( currentBuffer.length == 0 ) { // the whole message has been handed over
+                            DeliveryImpl current = currentDelivery;
+                            MessageDelivery md = (MessageDelivery) current.getContext();
+                            currentBuffer = null;
+                            currentDelivery = null;
+                            if( qos == QoS.AT_MOST_ONCE ) {
+                                current.settle(); // settled by the sender right away, without waiting for the peer
+                            } else {
+                                sender.advance();
+                            }
+                            md.fireWatches();
+                        }
+                    } else {
+                        return; // no credit: currentBuffer/currentDelivery are kept for the next call
+                    }
+                }
+
+                if( outbound.isEmpty() ) {
+                    return;
+                }
+
+                final MessageDelivery md = outbound.removeFirst(); // start the next queued message
+                outboundBufferSize -= md.initialSize; // undo the accounting done when it was queued
+                currentBuffer = md.encoded();
+                if( qos == QoS.AT_MOST_ONCE ) {
+                    currentDelivery = sender.delivery(EMPTY_BYTE_ARRAY, 0, 0); // empty tag for this QoS
+                } else {
+                    final byte[] tag = nextTag();
+                    currentDelivery = sender.delivery(tag, 0, tag.length);
+                }
+                md.delivery = currentDelivery;
+                currentDelivery.setContext(md); // lets the completion branch above recover md from the delivery
+            }
+        } finally {
+            fireWatches(); // runs on every exit path, including the early returns
+        }
+    }
+
+    @Override
+    protected void processDelivery(Delivery delivery) {
+        final MessageDelivery md  = (MessageDelivery) delivery.getContext();
+        if( delivery.remotelySettled() && delivery.getTag().length > 0 ) {
+            checkinTag(delivery.getTag());
+        }
+        final DeliveryState state = delivery.getRemoteState();
+        if( state!=null ) {
+            if( state instanceof Accepted) {
+                if( !delivery.remotelySettled() ) {
+                    delivery.disposition(new Accepted());
+                }
+            } else if( state instanceof Rejected) {
+                // re-deliver /w incremented delivery counter.
+                md.delivery = null;
+                md.incrementDeliveryCount();
+                outbound.addLast(md);
+            } else if( state instanceof Released) {
+                // re-deliver && don't increment the counter.
+                md.delivery = null;
+                outbound.addLast(md);
+            } else if( state instanceof Modified) {
+                Modified modified = (Modified) state;
+                if ( modified.getDeliveryFailed() ) {
+                  // increment delivery counter..
+                  md.incrementDeliveryCount();
+                }
+            }
+            delivery.settle();
+        }
+        md.fireWatches();
+    }
+
+    byte[] nextTag() {
+        byte[] rc;
+        if (tagCache != null && !tagCache.isEmpty()) {
+            final Iterator<byte[]> iterator = tagCache.iterator();
+            rc = iterator.next();
+            iterator.remove();
+        } else {
+            try {
+                rc = Long.toHexString(nextTagId++).getBytes("UTF-8");
+            } catch (UnsupportedEncodingException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        return rc;
+    }
+
+    void checkinTag(byte[] data) {
+        if( tagCache.size() < 1024 ) {
+            tagCache.add(data);
+        }
+    }
+
+    public void onOverflowBufferDrained(final Callback<Void> cb) {
+        addWatch(new Watch() {
+            @Override
+            public boolean execute() {
+                if (outboundBufferSize==0) {
+                    cb.onSuccess(null);
+                    return true;
+                }
+                return false;
+            }
+        });
+    }
+}

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpSession.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpSession.java?rev=1408852&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpSession.java (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpSession.java Tue Nov 13 17:41:01 2012
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.amqp.hawtdispatch.api;
+
+import org.apache.qpid.proton.engine.Endpoint;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.impl.ReceiverImpl;
+import org.apache.qpid.proton.engine.impl.SenderImpl;
+import org.apache.qpid.proton.engine.impl.SessionImpl;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.type.Binary;
+import org.apache.qpid.proton.type.messaging.*;
+import org.apache.qpid.proton.type.transport.SenderSettleMode;
+
+import java.util.UUID;
+
+/**
+ * A session of an {@link AmqpConnection}.  It creates the sender and receiver
+ * links of the session and offers helpers for building simple messages.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpSession extends AmqpEndpointBase {
+
+    final AmqpConnection parent;
+    final SessionImpl session;
+
+    /**
+     * @param parent  the connection that owns the session
+     * @param session the underlying proton session
+     */
+    public AmqpSession(AmqpConnection parent, SessionImpl session) {
+        this.parent = parent;
+        this.session = session;
+        attach();
+    }
+
+    @Override
+    protected Endpoint getEndpoint() {
+        return session;
+    }
+
+    @Override
+    protected AmqpConnection getParent() {
+        return parent;
+    }
+
+    /**
+     * Creates a sender using {@link QoS#AT_LEAST_ONCE} and a random link name.
+     */
+    public AmqpSender createSender(Target target) {
+        return createSender(target, QoS.AT_LEAST_ONCE);
+    }
+
+    /**
+     * Creates a sender using a random UUID as the link name.
+     */
+    public AmqpSender createSender(Target target, QoS qos) {
+        final String linkName = UUID.randomUUID().toString();
+        return createSender(target, qos, linkName);
+    }
+
+    /**
+     * Creates and opens a sender link.  No source is set on the link.
+     *
+     * @param target the target of the link
+     * @param qos    the quality of service, mapped to the link settle modes
+     * @param name   the link name
+     * @return the sender wrapping the new link
+     */
+    public AmqpSender createSender(Target target, QoS qos, String name) {
+        assertExecuting();
+        final SenderImpl link = session.sender(name);
+        // NOTE(review): the session is attached again here, unlike in createReceiver - confirm it is needed.
+        attach();
+        link.setTarget(target);
+        configureQos(link, qos);
+        link.open();
+        pumpOut();
+        return new AmqpSender(this, link, qos);
+    }
+
+    /**
+     * Creates a receiver using {@link QoS#AT_LEAST_ONCE}, a prefetch of 100
+     * and a random link name.
+     */
+    public AmqpReceiver createReceiver(Source source) {
+        return createReceiver(source, QoS.AT_LEAST_ONCE);
+    }
+
+    /**
+     * Creates a receiver using a prefetch of 100 and a random link name.
+     */
+    public AmqpReceiver createReceiver(Source source, QoS qos) {
+        return createReceiver(source, qos, 100);
+    }
+
+    /**
+     * Creates a receiver using a random UUID as the link name.
+     */
+    public AmqpReceiver createReceiver(Source source, QoS qos, int prefetch) {
+        final String linkName = UUID.randomUUID().toString();
+        return createReceiver(source, qos, prefetch, linkName);
+    }
+
+    /**
+     * Creates and opens a receiver link.  No target is set on the link.
+     *
+     * @param source   the source of the link
+     * @param qos      the quality of service, mapped to the link settle modes
+     * @param prefetch the amount passed to the link's {@code flow} call
+     * @param name     the link name
+     * @return the receiver wrapping the new link
+     */
+    public AmqpReceiver createReceiver(Source source, QoS qos, int prefetch, String name) {
+        assertExecuting();
+        final ReceiverImpl link = session.receiver(name);
+        link.setSource(source);
+        link.flow(prefetch);
+        configureQos(link, qos);
+        link.open();
+        pumpOut();
+        return new AmqpReceiver(this, link, qos);
+    }
+
+    /**
+     * Applies the settle modes matching the requested quality of service.
+     * The link is left untouched for a QoS value not listed below.
+     */
+    private void configureQos(Link link, QoS qos) {
+        // NOTE(review): the receiver mode is expressed with SenderSettleMode constants - confirm against the proton API.
+        final SenderSettleMode senderMode;
+        final SenderSettleMode receiverMode;
+        switch (qos) {
+            case AT_MOST_ONCE:
+                senderMode = SenderSettleMode.SETTLED;
+                receiverMode = SenderSettleMode.UNSETTLED;
+                break;
+            case AT_LEAST_ONCE:
+                senderMode = SenderSettleMode.UNSETTLED;
+                receiverMode = SenderSettleMode.SETTLED;
+                break;
+            case EXACTLY_ONCE:
+                senderMode = SenderSettleMode.UNSETTLED;
+                receiverMode = SenderSettleMode.MIXED;
+                break;
+            default:
+                return;
+        }
+        link.setSenderSettleMode(senderMode);
+        link.setReceiverSettleMode(receiverMode);
+    }
+
+    /**
+     * @return a new message whose body is an {@link AmqpValue} holding the text.
+     */
+    public Message createTextMessage(String value) {
+        final Message result = new Message();
+        result.setBody(new AmqpValue(value));
+        return result;
+    }
+
+    /**
+     * @return a new message whose body is a {@link Data} section over the whole array.
+     */
+    public Message createBinaryMessage(byte[] value) {
+        return createBinaryMessage(value, 0, value.length);
+    }
+
+    /**
+     * @return a new message whose body is a {@link Data} section over the
+     *         given range of the array.
+     */
+    public Message createBinaryMessage(byte[] value, int offset, int len) {
+        final Message result = new Message();
+        final Binary payload = new Binary(value, offset, len);
+        result.setBody(new Data(payload));
+        return result;
+    }
+}

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/Callback.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/Callback.java?rev=1408852&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/Callback.java (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/Callback.java Tue Nov 13 17:41:01 2012
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.amqp.hawtdispatch.api;
+
+/**
+ * <p>
+ * Receives the outcome of an operation: either one result value or the
+ * failure that prevented it.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface Callback<T> {
+
+    /**
+     * Called with the result value.
+     */
+    void onSuccess(T value);
+
+    /**
+     * Called with the failure.
+     */
+    void onFailure(Throwable value);
+}

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/ChainedCallback.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/ChainedCallback.java?rev=1408852&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/ChainedCallback.java (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/ChainedCallback.java Tue Nov 13 17:41:01 2012
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.amqp.hawtdispatch.api;
+
+/**
+ * <p>
+ * A callback that sits in front of another callback.  Failures are passed
+ * straight to the next callback; subclasses decide how a successful
+ * <code>In</code> value is handled.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public abstract class ChainedCallback<In,Out> implements Callback<In> {
+
+    /** The callback that failures are forwarded to. */
+    public final Callback<Out> next;
+
+    /**
+     * @param next the callback that follows this one in the chain
+     */
+    public ChainedCallback(Callback<Out> next) {
+        this.next = next;
+    }
+
+    /**
+     * Forwards the failure to the next callback.
+     */
+    @Override
+    public void onFailure(Throwable value) {
+        next.onFailure(value);
+    }
+}

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/DeliveryAttachment.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/DeliveryAttachment.java?rev=1408852&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/DeliveryAttachment.java (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/DeliveryAttachment.java Tue Nov 13 17:41:01 2012
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.amqp.hawtdispatch.api;
+
+import org.apache.qpid.proton.engine.Delivery;
+
+/**
+ * Base class for objects that process a {@link Delivery}.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public abstract class DeliveryAttachment {
+
+    /**
+     * Processes the given delivery.
+     */
+    abstract void processDelivery(Delivery delivery);
+}

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/Future.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/Future.java?rev=1408852&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/Future.java (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/Future.java Tue Nov 13 17:41:01 2012
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.amqp.hawtdispatch.api;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * <p>A simplified interface for obtaining the result of a function that
+ * completes later.</p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface Future<T> {
+
+    /**
+     * Waits for the result and returns it.
+     */
+    T await() throws Exception;
+
+    /**
+     * Waits up to the given amount of time for the result and returns it.
+     */
+    T await(long amount, TimeUnit unit) throws Exception;
+
+    /**
+     * Registers the callback that receives the result.
+     */
+    void then(Callback<T> callback);
+}

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/MessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/MessageDelivery.java?rev=1408852&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/MessageDelivery.java (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/MessageDelivery.java Tue Nov 13 17:41:01 2012
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.amqp.hawtdispatch.api;
+
+import org.apache.activemq.apollo.amqp.hawtdispatch.impl.DroppingWritableBuffer;
+import org.apache.activemq.apollo.amqp.hawtdispatch.impl.Watch;
+import org.apache.activemq.apollo.amqp.hawtdispatch.impl.WatchBase;
+import org.apache.qpid.proton.codec.CompositeWritableBuffer;
+import org.apache.qpid.proton.codec.WritableBuffer;
+import org.apache.qpid.proton.engine.impl.DeliveryImpl;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.type.messaging.Accepted;
+import org.apache.qpid.proton.type.messaging.Modified;
+import org.apache.qpid.proton.type.messaging.Rejected;
+import org.apache.qpid.proton.type.messaging.Released;
+import org.apache.qpid.proton.type.transport.DeliveryState;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtdispatch.Task;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Tracks one message on a link: its decoded and encoded forms, the proton
+ * delivery it is bound to (null while no delivery is assigned) and the
+ * watches waiting on its progress.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public abstract class MessageDelivery extends WatchBase {
+
+    /** Size of the buffer used for the first attempt at encoding a message. */
+    private static final int DEFAULT_SIZE_HINT = 1024*4;
+
+    /** Encoded size of the message when this object was created. */
+    final int initialSize;
+    private Message message;
+    private Buffer encoded;
+    /** The proton delivery carrying the message; null while none is assigned. */
+    public DeliveryImpl delivery;
+    private int sizeHint = DEFAULT_SIZE_HINT;
+
+    /**
+     * Encodes a message.  A buffer of {@code sizeHint} bytes is tried first;
+     * if the encoding overflows it, the message is encoded again into a
+     * buffer enlarged by the number of overflowed bytes.
+     *
+     * @param message  the message to encode
+     * @param sizeHint the size of the first buffer
+     * @return the encoded bytes
+     */
+    static Buffer encode(Message message, int sizeHint) {
+        ByteBuffer buffer = ByteBuffer.wrap(new byte[sizeHint]);
+        DroppingWritableBuffer overflow = new DroppingWritableBuffer();
+        int c = message.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow));
+        if( overflow.position() > 0 ) {
+            buffer = ByteBuffer.wrap(new byte[sizeHint+overflow.position()]);
+            c = message.encode(new WritableBuffer.ByteBufferWrapper(buffer));
+        }
+        return new Buffer(buffer.array(), 0, c);
+    }
+
+    /**
+     * Decodes a message out of the buffer.
+     *
+     * @param buffer the encoded message
+     * @return the decoded message
+     * @throws IllegalStateException if a decode step consumes no bytes
+     */
+    static Message decode(Buffer buffer) {
+        Message msg = new Message();
+        int offset = buffer.offset;
+        int len = buffer.length;
+        while( len > 0 ) {
+            int decoded = msg.decode(buffer.data, offset, len);
+            // An explicit check: an assert would let this loop spin forever
+            // when assertions are disabled.
+            if( decoded <= 0 ) {
+                throw new IllegalStateException("No progress made decoding the message");
+            }
+            offset += decoded;
+            len -= decoded;
+        }
+        return msg;
+    }
+
+    /**
+     * Creates a delivery for a message; the message is encoded right away.
+     */
+    public MessageDelivery(Message message) {
+        this(message, encode(message, DEFAULT_SIZE_HINT));
+    }
+
+    /**
+     * Creates a delivery for an encoded message; it is decoded on demand.
+     */
+    public MessageDelivery(Buffer encoded) {
+        this(null, encoded);
+    }
+
+    /**
+     * @param message the decoded message, may be null
+     * @param encoded the encoded message, must not be null
+     */
+    public MessageDelivery(Message message, Buffer encoded) {
+        this.message = message;
+        this.encoded = encoded;
+        sizeHint = this.encoded.length;
+        initialSize = sizeHint;
+    }
+
+    /**
+     * @return the message, decoding it from the encoded form on first use.
+     */
+    public Message getMessage() {
+        if( message == null ) {
+            message = decode(encoded);
+        }
+        return message;
+    }
+
+    /**
+     * @return the encoded message, encoding it again if the previous
+     *         encoding was discarded.
+     */
+    public Buffer encoded() {
+        if( encoded == null ) {
+            encoded = encode(message, sizeHint);
+            sizeHint = encoded.length;
+        }
+        return encoded;
+    }
+
+    /**
+     * @return true if a delivery is assigned and it is settled.
+     */
+    public boolean isSettled() {
+        return delivery!=null && delivery.isSettled();
+    }
+
+    /**
+     * @return the remote state of the delivery, or null if no delivery is assigned.
+     */
+    public DeliveryState getRemoteState() {
+        return delivery==null ? null : delivery.getRemoteState();
+    }
+
+    /**
+     * @return the local state of the delivery, or null if no delivery is assigned.
+     */
+    public DeliveryState getLocalState() {
+        return delivery==null ? null : delivery.getLocalState();
+    }
+
+    /**
+     * Registers a watch that calls {@code cb.onSuccess(null)} once a delivery
+     * has been assigned to this message.
+     */
+    public void onEncoded(final Callback<Void> cb) {
+        addWatch(new Watch() {
+            @Override
+            public boolean execute() {
+                if( delivery!=null ) {
+                    cb.onSuccess(null);
+                    return true;
+                }
+                return false;
+            }
+        });
+    }
+
+    /**
+     * @return the remote delivery state when it changes.
+     * @throws Exception
+     */
+    public DeliveryState getRemoteStateChange() throws Exception {
+        AmqpEndpointBase.assertNotOnDispatchQueue();
+        return getRemoteStateChangeFuture().await();
+    }
+
+    /**
+     * @return the future remote delivery state when it changes.
+     */
+    public Future<DeliveryState> getRemoteStateChangeFuture() {
+        final Promise<DeliveryState> rc = new Promise<DeliveryState>();
+        link().queue().execute(new Task() {
+            @Override
+            public void run() {
+                onRemoteStateChange(rc);
+            }
+        });
+        return rc;
+    }
+
+    /**
+     * @return the link this delivery belongs to.
+     */
+    abstract AmqpLink link();
+
+    /** True while a remote state change watch is pending. */
+    boolean watchingRemoteStateChange;
+
+    /**
+     * Registers a watch that reports the remote state once it differs from
+     * the state seen at registration time.  The state is read through
+     * {@link #getRemoteState()} so that a missing delivery counts as a null
+     * state instead of failing.
+     *
+     * @param cb the callback that receives the new remote state
+     */
+    public void onRemoteStateChange(final Callback<DeliveryState> cb) {
+        watchingRemoteStateChange = true;
+        final DeliveryState original = getRemoteState();
+        addWatch(new Watch() {
+            @Override
+            public boolean execute() {
+                final DeliveryState current = MessageDelivery.this.getRemoteState();
+                final boolean changed;
+                if (original == null) {
+                    changed = current != null;
+                } else {
+                    changed = !original.equals(current);
+                }
+                if (changed) {
+                    cb.onSuccess(current);
+                    watchingRemoteStateChange = false;
+                    return true;
+                }
+                return false;
+            }
+        });
+    }
+
+    /**
+     * @return the remote delivery state once settled.
+     * @throws Exception
+     */
+    public DeliveryState getSettle() throws Exception {
+        AmqpEndpointBase.assertNotOnDispatchQueue();
+        return getSettleFuture().await();
+    }
+
+    /**
+     * @return the future remote delivery state once the delivery is settled.
+     */
+    public Future<DeliveryState> getSettleFuture() {
+        final Promise<DeliveryState> rc = new Promise<DeliveryState>();
+        link().queue().execute(new Task() {
+            @Override
+            public void run() {
+                onSettle(rc);
+            }
+        });
+        return rc;
+    }
+
+    /**
+     * Registers a watch that reports the remote state once the delivery is
+     * settled locally or remotely.
+     *
+     * @param cb the callback that receives the remote state
+     */
+    public void onSettle(final Callback<DeliveryState> cb) {
+        addWatch(new Watch() {
+            @Override
+            public boolean execute() {
+                if( delivery!=null && (delivery.isSettled() || delivery.remotelySettled()) ) {
+                    cb.onSuccess(delivery.getRemoteState());
+                    return true;
+                }
+                return false;
+            }
+        });
+    }
+
+    /**
+     * Delegates to the superclass implementation.
+     */
+    @Override
+    protected void fireWatches() {
+        super.fireWatches();
+    }
+
+    /**
+     * Increments the delivery count of the message and discards the encoded
+     * form so that the next call to {@link #encoded()} encodes it again.
+     */
+    void incrementDeliveryCount() {
+        Message msg = getMessage();
+        msg.setDeliveryCount(msg.getDeliveryCount()+1);
+        encoded = null;
+    }
+
+    /**
+     * Prepares the message for redelivery.
+     *
+     * @param incrementDeliveryCounter true to increment the delivery count
+     */
+    public void redeliver(boolean incrementDeliveryCounter) {
+        if( incrementDeliveryCounter ) {
+            incrementDeliveryCount();
+        }
+    }
+
+    /**
+     * Settles the delivery unless it is already settled; does nothing while
+     * no delivery is assigned.
+     */
+    public void settle() {
+        if( delivery!=null && !delivery.isSettled() ) {
+            delivery.settle();
+        }
+    }
+}

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/Promise.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/Promise.java?rev=1408852&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/Promise.java (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/Promise.java Tue Nov 13 17:41:01 2012
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.amqp.hawtdispatch.api;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class Promise<T> implements Callback<T>, Future<T> {
+
+    private final CountDownLatch latch = new CountDownLatch(1);
+    Callback<T> next;
+    Throwable error;
+    T value;
+
+    public void onFailure(Throwable value) {
+        Callback<T> callback = null;
+        synchronized(this)  {
+            error = value;
+            latch.countDown();
+            callback = next;
+        }
+        if( callback!=null ) {
+            callback.onFailure(value);
+        }
+    }
+
+    public void onSuccess(T value) {
+        Callback<T> callback = null;
+        synchronized(this)  {
+            this.value = value;
+            latch.countDown();
+            callback = next;
+        }
+        if( callback!=null ) {
+            callback.onSuccess(value);
+        }
+    }
+
+    public void then(Callback<T> callback) {
+        boolean fire = false;
+        synchronized(this)  {
+            next = callback;
+            if( latch.getCount() == 0 ) {
+                fire = true;
+            }
+        }
+        if( fire ) {
+            if( error!=null ) {
+                callback.onFailure(error);
+            } else {
+                callback.onSuccess(value);
+            }
+        }
+    }
+
+    public T await(long amount, TimeUnit unit) throws Exception {
+        if( latch.await(amount, unit) ) {
+            return get();
+        } else {
+            throw new TimeoutException();
+        }
+    }
+
+    public T await() throws Exception {
+        latch.await();
+        return get();
+    }
+
+    private T get() throws Exception {
+        Throwable e = error;
+        if( e !=null ) {
+            if( e instanceof RuntimeException ) {
+                throw (RuntimeException) e;
+            } else if( e instanceof Exception) {
+                throw (Exception) e;
+            } else if( e instanceof Error) {
+                throw (Error) e;
+            } else {
+                // don't expect to hit this case.
+                throw new RuntimeException(e);
+            }
+        }
+        return value;
+    }
+
+}