You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/11/05 21:32:49 UTC

svn commit: r1405942 [1/2] - in /activemq/activemq-apollo/trunk: apollo-amqp/ apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/ apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/ apollo-amqp/src/main/scala/org/apache...

Author: chirino
Date: Mon Nov  5 20:32:47 2012
New Revision: 1405942

URL: http://svn.apache.org/viewvc?rev=1405942&view=rev
Log:
Switching the apollo-amqp module to use the qpid-proton lib to talk AMQP.

Added:
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpConnection.java
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpHeader.java
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpListener.java
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpProtocolCodec.java
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/DroppingWritableBuffer.java
    activemq/activemq-apollo/trunk/apollo-amqp/src/test-swiftmq/scala/org/apache/activemq/apollo/amqp/test/
    activemq/activemq-apollo/trunk/apollo-amqp/src/test-swiftmq/scala/org/apache/activemq/apollo/amqp/test/SwiftMQClientTest.scala
      - copied, changed from r1405941, activemq/activemq-apollo/trunk/apollo-amqp/src/test-swiftmq/scala/org/apache/activemq/apollo/amqp/SwiftMQClientTest.scala
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/provider.properties
      - copied, changed from r1405941, activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/hawtdispatch/
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/hawtdispatch/TransportConnectionTest.java
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/joram/
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/joram/ApolloAdmin.java
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/joram/JoramJmsTest.java
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/AmqpTest.scala
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/QpidJmsTest.scala
Removed:
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpCodec.scala
    activemq/activemq-apollo/trunk/apollo-amqp/src/test-swiftmq/scala/org/apache/activemq/apollo/amqp/SwiftMQClientTest.scala
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/AmqpTest.scala
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/FuseSourceClientTest.scala
Modified:
    activemq/activemq-apollo/trunk/apollo-amqp/pom.xml
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocol.scala
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala

Modified: activemq/activemq-apollo/trunk/apollo-amqp/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/pom.xml?rev=1405942&r1=1405941&r2=1405942&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/pom.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/pom.xml Mon Nov  5 20:32:47 2012
@@ -44,11 +44,10 @@
       <artifactId>apollo-broker</artifactId>
       <version>99-trunk-SNAPSHOT</version>
     </dependency>
-
     <dependency>
-      <groupId>org.fusesource.fuse-extra</groupId>
-      <artifactId>fusesource-amqp</artifactId>
-      <version>99-master-SNAPSHOT</version>
+      <groupId>org.apache.qpid</groupId>
+      <artifactId>qpid-proton</artifactId>
+      <version>1.0-SNAPSHOT</version>
     </dependency>
 
     <!-- Scala Support -->
@@ -102,6 +101,27 @@
       <scope>test</scope>
     </dependency>
 
+    <!--  Joram JMS conformance tests -->
+    <dependency>
+      <groupId>org.fusesource.joram-jms-tests</groupId>
+      <artifactId>joram-jms-tests</artifactId>
+      <version>1.0</version>
+      <scope>test</scope>
+    </dependency>
+
+    <!-- qpid jms client -->
+    <dependency>
+      <groupId>org.apache.qpid</groupId>
+      <artifactId>qpid-amqp-1-0-client-jms</artifactId>
+      <version>0.18</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.geronimo.specs</groupId>
+      <artifactId>geronimo-jms_1.1_spec</artifactId>
+      <version>1.1.1</version>
+      <scope>test</scope>
+    </dependency>
 
     <dependency>
       <groupId>junit</groupId>

Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index?rev=1405942&r1=1405941&r2=1405942&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index Mon Nov  5 20:32:47 2012
@@ -14,4 +14,4 @@
 ## See the License for the specific language governing permissions and
 ## limitations under the License.
 ## ---------------------------------------------------------------------------
-org.apache.activemq.apollo.amqp.AmqpProtocolFactory
\ No newline at end of file
+org.apache.activemq.apollo.amqp.AmqpProtocol
\ No newline at end of file

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala?rev=1405942&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala Mon Nov  5 20:32:47 2012
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.amqp
+
+
+import org.apache.activemq.apollo.broker.protocol
+import protocol.{MessageCodecFactory, MessageCodec}
+import hawtdispatch.DroppingWritableBuffer
+import java.nio.ByteBuffer
+import org.apache.qpid.proton.codec.{WritableBuffer, CompositeWritableBuffer}
+import org.fusesource.hawtbuf.Buffer._
+import org.apache.activemq.apollo.broker.Message
+import org.apache.activemq.apollo.broker.store.MessageRecord
+import org.fusesource.hawtbuf.Buffer
+import org.fusesource.hawtbuf.AsciiBuffer
+import org.fusesource.hawtbuf.UTF8Buffer
+
+object AmqpMessageCodecFactory extends MessageCodecFactory.Provider {
+  def create = Array[MessageCodec](AmqpMessageCodec)
+}
+
+object AmqpMessageCodec extends MessageCodec {
+
+  def ascii_id = ascii("amqp-1.0")
+  def id = "amqp-1.0"
+
+  def encode(message: Message):MessageRecord = {
+    val rc = new MessageRecord
+    rc.codec = ascii_id
+    rc.buffer = message.encoded
+    rc
+  }
+
+  def decode(message: MessageRecord) = {
+    assert( message.codec == ascii_id )
+    new AmqpMessage(message.buffer, null)
+  }
+
+}
+
+
+object AmqpMessage {
+  val SENDER_CONTAINER_KEY = "sender-container"
+}
+import AmqpMessage._
+
+class AmqpMessage(private var encoded_buffer:Buffer, private var decoded_message:org.apache.qpid.proton.message.Message=null) extends org.apache.activemq.apollo.broker.Message {
+
+  /**
+   * The encoder/decoder of the message
+   */
+  def codec = AmqpMessageCodec
+
+  def decoded = {
+    if( decoded_message==null ) {
+      val amqp = new org.apache.qpid.proton.message.Message();
+      var offset = encoded_buffer.offset
+      var len = encoded_buffer.length
+      while( len > 0 ) {
+          var decoded = amqp.decode(encoded_buffer.data, offset, len);
+          assert(decoded > 0, "Make progress decoding the message")
+          offset += decoded;
+          len -= decoded;
+      }
+      decoded_message = amqp
+
+    }
+    decoded_message
+  }
+
+  override def encoded = {
+    if( encoded_buffer == null ) {
+      var buffer = ByteBuffer.wrap(new Array[Byte](1024*4));
+      val overflow = new DroppingWritableBuffer();
+      var c = decoded_message.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow));
+      if( overflow.position() > 0 ) {
+          buffer = ByteBuffer.wrap(new Array[Byte](1024*4+overflow.position()));
+          c = decoded_message.encode(new WritableBuffer.ByteBufferWrapper(buffer));
+      }
+      encoded_buffer = new Buffer(buffer.array(), 0, c)
+    }
+    encoded_buffer
+  }
+
+  def getBodyAs[T](toType : Class[T]): T = {
+    if (toType == classOf[Buffer]) {
+      encoded
+    } else if( toType == classOf[String] ) {
+      encoded.utf8
+    } else if (toType == classOf[AsciiBuffer]) {
+      encoded.ascii
+    } else if (toType == classOf[UTF8Buffer]) {
+      encoded.utf8
+    } else {
+      null
+    }
+  }.asInstanceOf[T]
+
+  def getLocalConnectionId: AnyRef = {
+    if ( decoded.getDeliveryAnnotations!=null ) {
+      decoded.getDeliveryAnnotations.getValue.get(SENDER_CONTAINER_KEY) match {
+        case x:String => x
+        case _ => null
+      }
+    } else {
+      null
+    }
+  }
+
+  def getProperty(name: String) = {
+    if( decoded.getApplicationProperties !=null ) {
+      decoded.getApplicationProperties.getValue.get(name).asInstanceOf[AnyRef]
+    } else {
+      null
+    }
+  }
+
+  def release() {}
+  def retain() {}
+  def retained(): Int = 0
+}

Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocol.scala?rev=1405942&r1=1405941&r2=1405942&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocol.scala Mon Nov  5 20:32:47 2012
@@ -17,25 +17,20 @@
 package org.apache.activemq.apollo.amqp
 
 import _root_.org.fusesource.hawtbuf._
+import hawtdispatch.AmqpProtocolCodec
 import org.apache.activemq.apollo.broker._
-import org.apache.activemq.apollo.broker.protocol.{MessageCodecFactory, MessageCodec, ProtocolCodecFactory, Protocol}
-import org.apache.activemq.apollo.broker.store._
-import AmqpCodec._
-import org.fusesource.amqp.codec.AMQPProtocolCodec
+import org.apache.activemq.apollo.broker.protocol.Protocol
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-/**
- * Creates AmqpCodec objects that encode/decode the
- * <a href="http://activemq.apache.org/amqp/">Amqp</a> protocol.
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-class AmqpProtocolCodecFactory extends ProtocolCodecFactory.Provider {
-  def id = PROTOCOL
+object AmqpProtocol extends Protocol {
+
+  def id = "amqp"
+  val PROTOCOL_ID = Buffer.ascii(id)
+  val PROTOCOL_MAGIC = new Buffer(Array[Byte]('A', 'M', 'Q', 'P'))
 
-  def createProtocolCodec(connector:Connector) = new AMQPProtocolCodec();
+  def createProtocolCodec(connector:Connector) = new AmqpProtocolCodec();
 
   def isIdentifiable() = true
 
@@ -48,26 +43,21 @@ class AmqpProtocolCodecFactory extends P
       header.startsWith(PROTOCOL_MAGIC)
     }
   }
-}
 
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-object AmqpProtocol extends AmqpProtocolCodecFactory with Protocol {
   def createProtocolHandler = new AmqpProtocolHandler
 }
 
-object AmqpMessageCodecFactory extends MessageCodecFactory.Provider {
-  def create = Array[MessageCodec](AmqpMessageCodec)
-}
-
-  /**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-object AmqpMessageCodec extends MessageCodec {
-  def id = PROTOCOL
-  def encode(message: Message) = AmqpCodec.encode(message)
-  def decode(message: MessageRecord) = AmqpCodec.decode(message)
-}
+//object AmqpMessageCodecFactory extends MessageCodecFactory.Provider {
+//  def create = Array[MessageCodec](AmqpMessageCodec)
+//}
+//
+//  /**
+// * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+// */
+//object AmqpMessageCodec extends MessageCodec {
+//  def id = AmqpProtocol.id
+//  def encode(message: Message) = AmqpCodec.encode(message)
+//  def decode(message: MessageRecord) = AmqpCodec.decode(message)
+//}
 
 

Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala?rev=1405942&r1=1405941&r2=1405942&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala Mon Nov  5 20:32:47 2012
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,95 +17,41 @@
 package org.apache.activemq.apollo.amqp
 
 import java.util.concurrent.TimeUnit
-import java.util.Date
-import scala.collection.mutable.{ListBuffer, HashMap}
+import collection.mutable.{ListBuffer, HashMap}
 
 import org.fusesource.hawtdispatch._
 import org.fusesource.hawtbuf._
-import org.fusesource.hawtbuf.Buffer._
 
 import org.apache.activemq.apollo.util._
 import org.apache.activemq.apollo.dto._
 import org.apache.activemq.apollo.broker._
 import org.apache.activemq.apollo.util.path.{PathParser, Path, LiteralPart}
-import org.apache.activemq.apollo.selector.SelectorParser
-import org.apache.activemq.apollo.filter.{BooleanExpression, FilterException}
-import org.apache.activemq.apollo.broker.protocol.ProtocolHandler
+import protocol.ProtocolHandler
 import org.apache.activemq.apollo.broker.security.SecurityContext
 import org.apache.activemq.apollo.amqp.dto._
-
-import org.fusesource.amqp._
-import org.fusesource.amqp.callback._
-import org.fusesource.amqp.callback.Callback
-import org.fusesource.amqp.types._
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-object AMQPMessage {
-
-  def apply(annotated:Envelope):AMQPMessage = {
-    val payload = MessageSupport.toBuffer(annotated)
-    val rc = AMQPMessage(payload)
-    rc._annotated = annotated
-    rc
-  }
-}
-
-case class AMQPMessage(payload:Buffer) extends org.apache.activemq.apollo.broker.Message {
-  import AmqpProtocolHandler._
-  def codec = AmqpMessageCodec
-
-  var _annotated:Envelope = _
-  def annotated = {
-    if ( _annotated ==null ) {
-      _annotated = MessageSupport.decodeEnvelope(payload)
-    }
-    _annotated
-  }
-  
-  def getBodyAs[T](toType : Class[T]): T = {
-    if (toType == classOf[Buffer]) {
-      payload
-    } else if( toType == classOf[String] ) {
-      payload.utf8
-    } else if (toType == classOf[AsciiBuffer]) {
-      payload.ascii
-    } else if (toType == classOf[UTF8Buffer]) {
-      payload.utf8
-    } else {
-      null
-    }
-  }.asInstanceOf[T]
-
-  def getLocalConnectionId: AnyRef = annotated.getDeliveryAnnotations.getValue.get(SENDER_CONTAINER_KEY) match {
-    case x:AMQPString => x.getValue
-    case _ => null
-  }
-
-  def getProperty(name: String)= annotated match {
-    case null => null
-    case annotated =>
-      annotated.getMessage.getApplicationProperties match {
-        case null => null
-        case props =>
-          props.getValue.get(new AMQPString(name)).asInstanceOf[Object]
-      }
-  }
-
-  def release() {}
-  def retain() {}
-  def retained(): Int = 0
-}
-
+import hawtdispatch.{AmqpProtocolCodec, AmqpListener, AmqpConnection}
+import org.apache.qpid.proton.engine
+import engine.impl.{LinkImpl, TransportImpl}
+import engine.{Receiver, Sasl, Sender, Link, EndpointError, EndpointState}
+import org.fusesource.hawtbuf.Buffer._
+import org.apache.qpid.proton.`type`.transaction.{TransactionalState, Coordinator}
+import org.apache.qpid.proton.`type`.messaging.{Data, Source, Target}
+import org.apache.activemq.apollo.broker.Delivery
+import org.apache.activemq.apollo.filter.{FilterException, BooleanExpression}
+import org.apache.qpid.proton.`type`.{Symbol => AmqpSymbol, Binary, DescribedType}
+import org.apache.activemq.apollo.selector.SelectorParser
+import org.apache.qpid.proton.`type`.transport.SenderSettleMode
+import java.util
+import java.io.IOException
+import org.apache.activemq.apollo.broker.FullSink
+import org.apache.activemq.apollo.broker.SubscriptionAddress
 
 object AmqpProtocolHandler extends Log {
-  val SENDER_CONTAINER_KEY = new AMQPString("sender-container")
 
   // How long we hold a failed connection open so that the remote end
   // can get the resulting error message.
-  val DEFAULT_DIE_DELAY = 5*1000L
-  val WAITING_ON_CLIENT_REQUEST = ()=> "client request"
+  val DEFAULT_DIE_DELAY = 5 * 1000L
+  val WAITING_ON_CLIENT_REQUEST = () => "client request"
 
   val DEFAULT_DETINATION_PARSER = new DestinationParser
   DEFAULT_DETINATION_PARSER.queue_prefix = "/queue/"
@@ -118,25 +64,43 @@ object AmqpProtocolHandler extends Log {
   DEFAULT_DETINATION_PARSER.any_child_wildcard = "*"
   DEFAULT_DETINATION_PARSER.any_descendant_wildcard = "**"
 
+  val JMS_SELECTOR = AmqpSymbol.valueOf("jms-selector")
+  val EMPTY_BYTE_ARRAY = Array[Byte]()
+
+  def toBytes(value: Long): Array[Byte] = {
+    val buffer: Buffer = new Buffer(8)
+    buffer.bigEndianEditor.writeLong(value)
+    return buffer.data
+  }
+
+  private def toLong(value: Binary): Long = {
+    val buffer: Buffer = new Buffer(value.getArray, value.getArrayOffset, value.getLength)
+    return buffer.bigEndianEditor.readLong
+  }
 }
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 class AmqpProtocolHandler extends ProtocolHandler {
+
   import AmqpProtocolHandler._
 
   val security_context = new SecurityContext
 
-  var connection_log:Log = AmqpProtocolHandler
-  var host:VirtualHost = null
+  var connection_log: Log = AmqpProtocolHandler
+  var host: VirtualHost = null
   var waiting_on = WAITING_ON_CLIENT_REQUEST
-  var config:AmqpDTO = _
+  var config: AmqpDTO = _
   var dead = false
+  var protocol_convert = "full"
 
   def session_id = security_context.session_id
-  def protocol = AmqpCodec.PROTOCOL
+
+  def protocol = AmqpProtocol.id
+
   def broker = connection.connector.broker
+
   def queue = connection.dispatch_queue
 
   def die_delay = {
@@ -144,7 +108,6 @@ class AmqpProtocolHandler extends Protoc
   }
 
   lazy val buffer_size = MemoryPropertyEditor.parse(Option(config.buffer_size).getOrElse("640k")).toInt
-  var amqp_connection:AMQPConnection = _
   var messages_sent = 0L
   var messages_received = 0L
 
@@ -152,33 +115,39 @@ class AmqpProtocolHandler extends Protoc
     var rc = new AmqpConnectionStatusDTO
     rc.protocol_version = "1.0.0"
     rc.user = security_context.user
-//    rc.subscription_count = consumers.size
+    //    rc.subscription_count = consumers.size
     rc.waiting_on = waiting_on()
     rc.messages_sent = messages_sent
     rc.messages_received = messages_received
     rc
   }
 
-  class ProtocolException(msg:String) extends RuntimeException(msg)
+  class ProtocolException(msg: String) extends RuntimeException(msg)
+
   class Break extends RuntimeException
 
-  private def async_die(msg:String, e:Throwable=null) = try {
-    die(msg, e)
+  private def async_die(error_code: String, msg: String, e: Throwable = null) = try {
+    die(error_code, msg, e)
   } catch {
-    case x:Break=>
+    case x: Break =>
   }
 
-  private def die[T](msg:String, e:Throwable=null):T = {
-    if( e!=null) {
+  private def die[T](error_code: String, msg: String, e: Throwable = null): T = {
+    if (e != null) {
       connection_log.info(e, "AMQP connection '%s' error: %s", security_context.remote_address, msg, e)
     } else {
       connection_log.info("AMQP connection '%s' error: %s", security_context.remote_address, msg)
     }
-    if( !dead ) {
+    if (!dead) {
       dead = true
-      waiting_on = ()=>"shutdown"
+      waiting_on = () => "shutdown"
       connection.transport.resumeRead
 
+      on_transport_disconnected()
+      proton.setLocalError(amqp_error(error_code, msg));
+      proton.close()
+      pump_out
+
       // TODO: if there are too many open connections we should just close the connection
       // without waiting for the error to get sent to the client.
       queue.after(die_delay, TimeUnit.MILLISECONDS) {
@@ -188,398 +157,757 @@ class AmqpProtocolHandler extends Protoc
     throw new Break()
   }
 
+  def amqp_error(name: String = "", message: String = "") = new EndpointError(name, message)
+
+  def suspend_read(reason: => String) = {
+    waiting_on = reason _
+    connection.transport.suspendRead
+    // heart_beat_monitor.suspendRead
+  }
+
+  def resume_read() = {
+    waiting_on = WAITING_ON_CLIENT_REQUEST
+    connection.transport.resumeRead
+    // heart_beat_monitor.resumeRead
+  }
+
+  val amqp_connection = new AmqpConnection()
+
+  def codec = connection.transport.getProtocolCodec.asInstanceOf[AmqpProtocolCodec]
+
+  def proton = amqp_connection.getProtonConnection
+
+  def pump_out = {
+    queue.assertExecuting()
+    amqp_connection.pumpOut()
+  }
+
+  override def on_transport_connected() = sys.error("should not get called")
+  override def on_transport_command(command: AnyRef): Unit = sys.error("should not get called")
+
   override def set_connection(connection: BrokerConnection) = {
     super.set_connection(connection)
     import collection.JavaConversions._
 
+    security_context.connector_id = connection.connector.id
+    security_context.certificates = connection.certificates
+    security_context.local_address = connection.transport.getLocalAddress
+    security_context.remote_address = connection.transport.getRemoteAddress
+
     val connector_config = connection.connector.config.asInstanceOf[AcceptingConnectorDTO]
-    config = connector_config.protocols.find( _.isInstanceOf[AmqpDTO]).map(_.asInstanceOf[AmqpDTO]).getOrElse(new AmqpDTO)
+    config = connector_config.protocols.find(_.isInstanceOf[AmqpDTO]).map(_.asInstanceOf[AmqpDTO]).getOrElse(new AmqpDTO)
+    amqp_connection.bind(connection.transport)
+    amqp_connection.setListener(amqp_listener)
+  }
+
+  val amqp_listener = new AmqpListener() {
+
+    override def processSaslConnect(protonTransport: TransportImpl) = {
+      val sasl = protonTransport.sasl();
+      sasl.setMechanisms(Array("ANONYMOUS", "PLAIN"));
+      sasl.server();
+      sasl
+    }
 
-    val options = new AMQPServerConnectionOptions
-    options.setTransport(connection.transport);
-    options.setMaxFrameSize(1024*4)
-    options.setIdleTimeout(-1);
-    options.setLogger(new AMQPConnectionOptions.Logger {
-      override def _debug(format: String, args: Array[AnyRef]) {
-        println(System.currentTimeMillis()+": "+format.format(args: _*))
-        // connection_log.debug(format, args:_*)
-      }
-      override def _trace(format: String, args: Array[AnyRef]) {
-        println(System.currentTimeMillis()+": "+format.format(args: _*))
-        // connection_log.trace(format, args:_*)
-      }
-    })
-    options.setListener(new AMQPConnection.Listener(){
-
-      override def onBegin(begin: Begin) = {
-        val rc = new AMQPServerSessionOptions
-        rc.setIncomingWindow(100)
-        rc.setOutgoingWindow(100)
-        rc.setListener(session_listener)
-        rc
+
+    override def processSaslEvent(sasl: Sasl): Sasl = {
+      // Lets try to complete the sasl handshake.
+      if (sasl.getRemoteMechanisms().length > 0) {
+        if ("PLAIN" == sasl.getRemoteMechanisms()(0)) {
+          val data = new Array[Byte](sasl.pending());
+          sasl.recv(data, 0, data.length);
+          val parts = new Buffer(data).split(0);
+          if (parts.length > 0) {
+            security_context.user = parts(0).utf8.toString
+          }
+          if (parts.length > 1) {
+            security_context.password = parts(1).utf8.toString
+          }
+          // We can't really auth at this point since we don't know the client id yet.. :(
+          sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
+          null
+        } else if ("ANONYMOUS" == sasl.getRemoteMechanisms()(0)) {
+          sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
+          null
+        } else {
+          sasl.done(Sasl.SaslOutcome.PN_SASL_PERM);
+          null
+        }
+      } else {
+        sasl
       }
+    }
+
+    override def processConnectionOpen(conn: engine.Connection, onComplete: Task) {
+      println("connection opened.")
+      security_context.session_id = Some(conn.getRemoteContainer())
 
-      override def onAccepted(session: AMQPSession) {
-        connection_log.info("accepted: "+session)
+      suspend_read("host lookup")
+      broker.dispatch_queue {
+        val virtual_host = proton.getRemoteHostname match {
+          case null => broker.default_virtual_host
+          case host => broker.get_virtual_host(ascii(host))
+        }
+        queue {
+          resume_read
+          if (virtual_host == null) {
+            onComplete.run()
+            async_die("invalid virtual host", "invalid virtual host: " + proton.getRemoteHostname)
+          } else if (!virtual_host.service_state.is_started) {
+            onComplete.run()
+            async_die("virtual host not ready", "")
+          } else {
+            connection_log = virtual_host.connection_log
+            host = virtual_host
+            proton.setLocalContainerId(virtual_host.id)
+            //                proton.open()
+            //                callback.onSuccess(response)
+            if (virtual_host.authenticator != null && virtual_host.authorizer != null) {
+              suspend_read("authenticating and authorizing connect")
+              virtual_host.authenticator.authenticate(security_context) {
+                auth_failure =>
+                  queue {
+                    if (auth_failure != null) {
+                      onComplete.run()
+                      async_die("Authentication failure", "%s. Credentials=%s".format(auth_failure, security_context.credential_dump))
+                    } else if (!virtual_host.authorizer.can(security_context, "connect", connection.connector)) {
+                      onComplete.run()
+                      async_die("Authorization failure", "Not authorized to connect to connector '%s'. Principals=%s".format(connection.connector.id, security_context.principal_dump))
+                    } else if (!virtual_host.authorizer.can(security_context, "connect", virtual_host)) {
+                      onComplete.run()
+                      async_die("Authorization failure", "Not authorized to connect to virtual host '%s'. Principals=%s".format(virtual_host.id, security_context.principal_dump))
+                    } else {
+                      resume_read
+                      proton.open()
+                      onComplete.run()
+                    }
+                  }
+              }
+            } else {
+              proton.open()
+              onComplete.run()
+            }
+          }
+        }
       }
-      override def onException(error: Throwable) {
-        error.printStackTrace();
+    }
+
+    override def processReceiverOpen(receiver: Receiver, onComplete: Task) {
+      // Client producer is attaching..
+      receiver.setSource(receiver.getRemoteSource());
+      receiver.setTarget(receiver.getRemoteTarget());
+
+      receiver.getRemoteTarget() match {
+        case target: Coordinator =>
+          //          pumpProtonToSocket();
+          //          receiver.setContext(coordinatorContext);
+          //          receiver.flow(1024 * 64);
+          //          receiver.open();
+          //          pumpProtonToSocket();
+          close_with_error(receiver, "txs not supported")
+          onComplete.run()
+        case amqp_target: Target =>
+
+          val (address, addresses, actualTarget) = decode_target(amqp_target)
+          receiver.setTarget(actualTarget);
+          if (addresses == null) {
+            close_with_error(receiver, "invalid-address", "Invaild address: " + address)
+            onComplete.run()
+            return
+          }
+
+          link_counter += 1
+          val route = new AmqpProducerRoute(link_counter, receiver, addresses)
+          producers += (link_counter -> route)
+
+          host.dispatch_queue {
+            val rc = host.router.connect(route.addresses, route, security_context)
+            queue {
+              println(rc)
+              rc match {
+                case Some(failure) =>
+                  println(failure)
+                  close_with_error(receiver, "Could not connect", failure)
+                  onComplete.run()
+                case None =>
+                  println("ok")
+                  // If the remote has not closed on us yet...
+                  if (receiver.getRemoteState == EndpointState.ACTIVE) {
+                    receiver.setContext(route)
+                    receiver.flow(1024 * 64);
+                    receiver.open()
+                  } else {
+                    receiver.close()
+                  }
+                  onComplete.run()
+              }
+            }
+          }
       }
+    }
 
-      override def onOpen(request: Open, response: Open, callback: Callback[Open]) {
-        handle_open(request, response, callback)
+    override def processReceiverClose(receiver: Receiver, onComplete: Task) {
+      receiver.getContext match {
+        case null =>
+          receiver.close()
+          onComplete.run()
+        case route: AmqpProducerRoute =>
+          // Lets disconnect the route.
+          receiver.setContext(null)
+          host.dispatch_queue {
+            host.router.disconnect(route.addresses, route)
+            queue {
+              receiver.close()
+              producers -= route.id
+              route.release
+              onComplete.run()
+            }
+          }
       }
-    })
-    amqp_connection = AMQP.open(options, new Callback[AMQPConnection] {
-      override def onSuccess(value: AMQPConnection) {
-        println("AMQP connection is open.")
+    }
+
+    override def processDelivery(receiver: Receiver, delivery: engine.Delivery) {
+      receiver.getContext match {
+        case null =>
+        case route: AmqpProducerRoute => route.process(delivery)
       }
-      override def onFailure(value: Throwable) {
-        println("Failed to open AMQP connection: "+value)
+    }
+
+    override def processSenderOpen(sender: Sender, onComplete: Task) {
+      // Client consumer is attaching..
+      sender.setSource(sender.getRemoteSource());
+      sender.setTarget(sender.getRemoteTarget());
+
+      val source = sender.getRemoteSource().asInstanceOf[Source]
+      val (address, requested_addresses, actual) = decode_source(source)
+      sender.setSource(actual);
+      if (requested_addresses == null) {
+        close_with_error(sender, "invalid-address", "Invaild address: " + address)
+        onComplete.run()
+        return
+      }
+
+      val filter = source.getFilter()
+      val selector = if (filter != null) {
+        val value = filter.get(JMS_SELECTOR).asInstanceOf[DescribedType]
+        if (value != null) {
+          val selector = value.getDescribed().toString()
+          try {
+            (selector, SelectorParser.parse(selector))
+          } catch {
+            case e: FilterException =>
+              close_with_error(sender, "amqp:invalid-field", "Invalid selector expression '%s': %s".format(selector, e.getMessage))
+              onComplete.run()
+              return
+          }
+        } else {
+          null
+        }
+      } else {
+        null
       }
-    })
 
-  }
-  override def on_transport_connected() = sys.error("should not get called")
-  override def on_transport_disconnected() = sys.error("should not get called")
-  override def on_transport_command(command:AnyRef):Unit = sys.error("should not get called")
+      val presettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
 
-  def suspend_read(reason: =>String) = {
-    waiting_on = reason _
-    connection.transport.suspendRead
-    // heart_beat_monitor.suspendRead
-  }
-  def resume_read() = {
-    waiting_on = WAITING_ON_CLIENT_REQUEST
-    connection.transport.resumeRead
-    // heart_beat_monitor.resumeRead
+      var browser = false
+      var browser_end = browser && true
+      var exclusive = !browser && false
+      var include_seq: Option[Long] = None
+      val from_seq_opt: Option[Long] = None
+
+      def is_multi_destination = if (requested_addresses.length > 1) {
+        true
+      } else {
+        PathParser.containsWildCards(requested_addresses(0).path)
+      }
+
+      if (from_seq_opt.isDefined && is_multi_destination) {
+        close_with_error(sender, "invalid-from-seq", "The from-seq header is only supported when you subscribe to one destination")
+        onComplete.run()
+        return
+      }
+
+      var persistent = source.getDurable != null && source.getDurable.intValue() == 1
+      val addresses: Array[_ <: BindAddress] = if (persistent) {
+        val dsubs = ListBuffer[BindAddress]()
+        val topics = ListBuffer[BindAddress]()
+        requested_addresses.foreach {
+          address =>
+            address.domain match {
+              case "dsub" => dsubs += address
+              case "topic" => topics += address
+              case _ =>
+                close_with_error(sender, "invalid-from-seq", "A durable link can only be used on a topic destination")
+                onComplete.run()
+                return
+            }
+        }
+        sender.getName()
+        val s = if (selector == null) null else selector._1
+        dsubs += SubscriptionAddress(destination_parser.decode_path(sender.getName), s, topics.toArray)
+        dsubs.toArray
+      } else {
+        requested_addresses
+      }
+
+      val from_seq = from_seq_opt.getOrElse(0L)
+
+
+      link_counter += 1
+      val id = link_counter
+      val consumer = new AmqpConsumer(sender, link_counter, requested_addresses, presettle, selector, browser, exclusive, include_seq, from_seq, browser_end);
+      consumers += (id -> consumer)
+
+      host.dispatch_queue {
+        val rc = host.router.bind(requested_addresses, consumer, security_context)
+        queue {
+          rc match {
+            case Some(reason) =>
+              consumers -= id
+              consumer.release
+              close_with_error(sender, "subscribe-failed", reason)
+              onComplete.run()
+            case None =>
+              sender.setContext(consumer)
+              sender.open()
+              onComplete.run()
+          }
+        }
+      }
+    }
+
+    var gracefully_closed = false
+    override def processFailure(e: Throwable) {
+      var msg = "Internal Server Error: " + e
+      if( connection_log!=AmqpProtocolHandler ) {
+        // but we also want the error on the apollo.log file.
+        warn(e, msg)
+      }
+      async_die("internal-error", msg, e)
+    }
+
+    override def processTransportFailure(error: IOException) {
+      if( !gracefully_closed ) {
+        connection_log.info("Shutting connection '%s'  down due to: %s", security_context.remote_address, error)
+      }
+      on_transport_disconnected()
+    }
+
+    override def processConnectionClose(conn: engine.Connection, onComplete: Task) {
+      gracefully_closed = true
+      on_transport_disconnected()
+      super.processConnectionClose(conn, onComplete)
+      queue.after(die_delay, TimeUnit.MILLISECONDS) {
+        connection.stop(NOOP)
+      }
+    }
+
+    override def processRefill() = {
+      for( c <- consumers.values ) {
+        c.session_manager.drain_overflow
+      }
+    }
   }
 
-  def handle_open(request: Open, response: Open, callback: Callback[Open]) = {
-    broker.dispatch_queue {
-      suspend_read("host lookup")
-      val host = request.getHostname match {
-        case null => broker.default_virtual_host
-        case host=> broker.get_virtual_host(ascii(host))
-      }
-      queue {
-        resume_read
-        if(host==null) {
-          callback.onFailure(new AMQPException("Invalid virtual host: "+host));
-        } else if(!host.service_state.is_started) {
-          callback.onFailure(new AMQPException("virtual host not ready"));
-        } else {
-          response.setContainerID(host.id)
-          this.host=host
-          callback.onSuccess(response)
-//          security_context.session_id = Some("%s-%x".format(this.host.config.id, this.host.session_counter.incrementAndGet))
-//          connection_log = host.connection_log
-//          if( host.authenticator!=null &&  host.authorizer!=null ) {
-//            suspend_read("authenticating and authorizing connect")
-//            host.authenticator.authenticate(security_context) { auth_failure=>
-//              dispatchQueue {
-//                if( auth_failure!=null ) {
-//                  async_die("%s. Credentials=%s".format(auth_failure, security_context.credential_dump))
-//                } else if( !host.authorizer.can(security_context, "connect", connection.connector) ) {
-//                  async_die("Not authorized to connect to connector '%s'. Principals=%s".format(connection.connector.id, security_context.principal_dump))
-//                } else if( !host.authorizer.can(security_context, "connect", this.host) ) {
-//                  async_die("Not authorized to connect to virtual host '%s'. Principals=%s".format(this.host.id, security_context.principal_dump))
-//                } else {
-//                  resume_read
-//                  send_connected
-//                }
-//              }
-//            }
-//          } else {
-//            send_connected
-//          }
+  var disconnected = false
+  override def on_transport_disconnected() = {
+    queue.assertExecuting()
+    if( !disconnected ) {
+      disconnected = true
+
+//      // Rollback any in-progress transactions..
+//      for( (id, tx) <- transactions ) {
+//        tx.rollback
+//      }
+//      transactions.clear()
+//
+      for (producer <- producers.values) {
+        val addresses = producer.addresses
+        host.dispatch_queue {
+          host.router.disconnect(producer.addresses, producer)
+          producer.release()
         }
       }
+      producers = Map()
+
+
+      for (consumer <- consumers.values) {
+        val addresses = consumer.addresses
+        host.dispatch_queue {
+          host.router.unbind(addresses, consumer, false , security_context)
+          consumer.release()
+        }
+      }
+      consumers = Map()
+      security_context.logout( e => {
+        if(e!=null) {
+          connection_log.info(e, "STOMP connection '%s' log out error: %s", security_context.remote_address, e)
+        }
+      })
+      trace("amqp protocol resources released")
     }
   }
 
   var destination_parser = DEFAULT_DETINATION_PARSER
   var temp_destination_map = HashMap[SimpleAddress, SimpleAddress]()
 
-  def decode_addresses(value:String):Array[SimpleAddress] = {
+  def decode_addresses(value: String): Array[SimpleAddress] = {
     val rc = destination_parser.decode_multi_destination(value)
-    if( rc==null ) {
+    if (rc == null) {
       return null
     }
-    rc.map { dest =>
-      if( dest.domain.startsWith("temp-") ) {
-        temp_destination_map.getOrElseUpdate(dest, {
-          val parts = LiteralPart("temp") :: LiteralPart(broker.id) :: LiteralPart(session_id.get) :: dest.path.parts
-          SimpleAddress(dest.domain.stripPrefix("temp-"), Path(parts))
-        })
-      } else {
-        dest
-      }
+    rc.map {
+      dest =>
+        if (dest.domain.startsWith("temp-")) {
+          temp_destination_map.getOrElseUpdate(dest, {
+            val parts = LiteralPart("temp") :: LiteralPart(broker.id) :: LiteralPart(session_id.get) :: dest.path.parts
+            SimpleAddress(dest.domain.stripPrefix("temp-"), Path(parts))
+          })
+        } else {
+          dest
+        }
     }
   }
 
-  class AmqpProducerRoute(val dest: String, val addresses:Array[SimpleAddress]) extends DeliveryProducerRoute(host.router) {
+  trait ProducerSupport {
+    var current = new ByteArrayOutputStream();
 
-    val key = addresses.toList
-    var is_connected = false
+    def receiver: Receiver
 
-    override def send_buffer_size = buffer_size
+    def process(delivery: engine.Delivery): Unit = {
+      if (!delivery.isReadable()) {
+        System.out.println("it was not readable!");
+        return;
+      }
 
-    override def connection = Some(AmqpProtocolHandler.this.connection)
+      if (current == null) {
+        current = new ByteArrayOutputStream();
+      }
 
-    override def connected() = is_connected = true
+      var data = new Array[Byte](1024 * 4);
+      var done = false
+      while (!done) {
+        val count = receiver.recv(data, 0, data.length)
+        if (count > 0) {
+          current.write(data, 0, count);
+        } else {
+          if (count == 0) {
+            // Expecting more deliveries..
+            return;
+          }
+          done = true
+        }
+      }
 
-    override def dispatch_queue = queue
+      receiver.advance();
+      delivery.settle(); // TODO: do this once accepted by the broker.
 
-    refiller = ^ {
-      resume_read
+      val buffer = current.toBuffer();
+      current = null;
+      onMessage(delivery, new AmqpMessage(buffer));
     }
+
+    def onMessage(delivery: engine.Delivery, buffer: AmqpMessage): Unit
   }
 
-  object session_listener extends AMQPSession.Listener{
-    override def onAttach(attach: Attach, callback: Callback[AMQPEndpoint]) {
-      if( attach.getRole == Role.SENDER.getValue ) {
-        val target = attach.getTarget.asInstanceOf[Target]
-        target.getAddress match {
-          case address:AMQPString =>
-            var dest = address.getValue
-            decode_addresses(dest) match {
-              case null => callback.onFailure(new Exception("Invaild address: "+dest))
-              case addresses => attach_sender(attach, dest, addresses, callback)
-            }
-          case _ =>
-            callback.onFailure(new Exception("Invaild address: "+target.getAddress))
-        }
-      } else {
-        val source = attach.getSource.asInstanceOf[Source]
-        source.getAddress match {
-          case address:AMQPString =>
-            var dest = address.getValue
-            decode_addresses(dest) match {
-              case null => callback.onFailure(new Exception("Invaild address: "+dest))
-              case addresses => attach_receiver(attach, dest, addresses, callback)
-            }
-          case _ =>
-            callback.onFailure(new Exception("Invaild address: "+source.getAddress))
-        }
+  def decode_target(target: Target) = {
+    var dynamic = target.getDynamic()
+    if (dynamic) {
+      temp_dest_counter += 1
+      val parts = LiteralPart("temp") :: LiteralPart(broker.id) :: LiteralPart(session_id.get) :: LiteralPart(temp_dest_counter.toString) :: Nil
+      val rc = SimpleAddress("queue", Path(parts))
+      val actual = new Target();
+      var address = destination_parser.encode_destination(rc)
+      actual.setAddress(address);
+      actual.setDynamic(true);
+      (address, Array(rc), actual)
+    } else {
+      val address = target.getAddress
+      decode_addresses(address) match {
+        case null =>
+          (address, null, target)
+        case addresses =>
+          (address, addresses, target)
       }
     }
+  }
 
-    override def onClose(error: Error) {
-      if( error!=null ) {
-        info("peer closed the AMQP session due to: "+error)
+  def decode_source(source: Source) = {
+    var dynamic = source.getDynamic()
+    if (dynamic) {
+      temp_dest_counter += 1
+      val parts = LiteralPart("temp") :: LiteralPart(broker.id) :: LiteralPart(session_id.get) :: LiteralPart(temp_dest_counter.toString) :: Nil
+      val rc = SimpleAddress("queue", Path(parts))
+      val actual = new Source();
+      var address = destination_parser.encode_destination(rc)
+      actual.setAddress(address);
+      actual.setDynamic(true);
+      (address, Array(rc), actual)
+    } else {
+      val address = source.getAddress
+      decode_addresses(address) match {
+        case null =>
+          (address, null, source)
+        case addresses =>
+          (address, addresses, source)
       }
     }
-
   }
 
-  def attach_sender(attach: Attach, address: String, addresses:Array[SimpleAddress], callback: Callback[AMQPEndpoint]) {
-    val target = new AmqpProducerRoute(address, addresses)
-    var receiver: AMQPReceiver = null
-    // create the producer route...
-    val options = new AMQPReceiverOptions();
-    options.setSource(attach.getSource.asInstanceOf[Source])
-    options.setTarget(attach.getTarget.asInstanceOf[Target])
-    options.setName(attach.getName)
-    options.setSenderSettleMode(SenderSettleMode.valueOf(attach.getSndSettleMode))
-    options.setReceiverSettleMode(ReceiverSettleMode.valueOf(attach.getRcvSettleMode))
-    options.setMaxMessageSize(10 * 1024 * 1024);
+  var temp_dest_counter = 0L
 
-    def pump = {
-      while (target.is_connected && !target.full && receiver.peek() != null) {
+  class AmqpProducerRoute(val id:Long, val receiver: Receiver, val addresses: Array[SimpleAddress]) extends DeliveryProducerRoute(host.router) with ProducerSupport {
 
-        val amqpDelivery = receiver.poll()
+    val key = addresses.toList
+    var is_connected = false
 
-        // Update the message /w who sent it to us..
-        val amqpMessage = amqpDelivery.getMessage;
-        if (amqpMessage.getDeliveryAnnotations == null) {
-          amqpMessage.setDeliveryAnnotations(new DeliveryAnnotations(new MapEntries()))
-        }
-        amqpMessage.getDeliveryAnnotations.getValue.add(SENDER_CONTAINER_KEY, new AMQPString(amqp_connection.remoteContainerId()))
+    override def send_buffer_size = buffer_size
 
-        val apolloDelivery = new Delivery
-        apolloDelivery.message = AMQPMessage(amqpMessage)
-        apolloDelivery.size = amqpDelivery.payload.length()
-        //                delivery.expiration = message.expiration
-        //                delivery.persistent = message.persistent
-        //                delivery.uow = uow
-        //                get(frame.headers, RETAIN).foreach { retain =>
-        //                  delivery.retain = retain match {
-        //                    case SET => RetainSet
-        //                    case REMOVE => RetainRemove
-        //                    case _ => RetainIgnore
-        //                  }
-        //                }
+    override def connection = Some(AmqpProtocolHandler.this.connection)
 
-        if (!amqpDelivery.isSettled) {
-          apolloDelivery.ack = {
-            (consumed, uow) =>
-              queue <<| ^ {
-                amqpDelivery.ack()
-              }
-          }
-        }
+    override def connected() = is_connected = true
 
-        target.offer(apolloDelivery)
-      }
-    }
+    override def dispatch_queue = queue
 
-    target.refiller = ^ {
-      pump
+    refiller = ^ {
+      resume_read
     }
-    options.setListener(new AMQPEndpoint.Listener {
-      override def onTransfer() = pump
 
-      override def onClosed(senderClosed: Boolean, error: Error) {
-        if (error != null) {
-          debug("Peer closed link due to error: %s", error)
-        }
-        host.dispatch_queue {
-          host.router.disconnect(target.addresses, target)
+    val producer_overflow = new OverflowSink[Delivery](this)
+
+    def onMessage(delivery: engine.Delivery, message: AmqpMessage) = {
+      val d = new Delivery
+      d.message = message
+      d.size = message.encoded.length
+      var decoded = message.decoded
+      if (decoded.getProperties != null) {
+        if (decoded.getProperties.getAbsoluteExpiryTime != null) {
+          d.expiration = decoded.getProperties.getAbsoluteExpiryTime.getTime
+        }
+      }
+      if (decoded.getHeader != null) {
+        if (decoded.getHeader.getDurable != null) {
+          d.persistent = decoded.getHeader.getDurable.booleanValue()
+        }
+        if (decoded.getHeader.getDeliveryCount != null) {
+          d.redeliveries = decoded.getHeader.getDeliveryCount.shortValue()
         }
       }
-    })
 
-    // start with 0 credit window so that we don't receive any messages
-    // until we have verified if that we can connect to the destination..
-    options.credit = 0
-    receiver = AMQP.createReceiver(options)
-    callback.onSuccess(receiver)
-
-    host.dispatch_queue {
-      val rc = host.router.connect(target.addresses, target, security_context)
-      queue {
-        rc match {
-          case Some(failure) =>
-            receiver.detach(true, failure, null)
-          case None =>
-            // Add credit to start receiving messages.
-            receiver.addCredit(50)
+      if (!delivery.remotelySettled()) {
+        d.ack = (result, uow) => {
+          queue {
+            result match {
+              case Consumed =>
+                delivery.settle()
+              case _ =>
+                async_die("uknown", "Unexpected NAK from broker")
+            }
+          }
         }
       }
+
+      producer_overflow.offer(d)
+      receiver.advance();
     }
   }
-  var protocol_convert = "full"
 
-  class AMQPConsumer(
-      val subscription_id:String,
-      val addresses:Array[_ <: BindAddress],
-      val selector:(String, BooleanExpression),
-      override val browser:Boolean,
-      override val exclusive:Boolean,
-      val include_seq:Option[Long],
-      val from_seq:Long,
-      override val close_on_drain:Boolean
-    ) extends BaseRetained with DeliveryConsumer {
+  def close_with_error(link: Link, error_name: String = "", error_message: String = "") = {
+    link.asInstanceOf[LinkImpl].setLocalError(amqp_error(error_name, error_message))
+    link.close()
+  }
+
+  var link_counter = 0L
+  var producers = Map[Long, AmqpProducerRoute]()
+  var consumers = Map[Long, AmqpConsumer]()
+  var message_id_counter = 0L
+
+  class AmqpConsumer(sender: Sender,
+                     val subscription_id: Long,
+                     val addresses: Array[_ <: BindAddress],
+                     val presettle: Boolean,
+                     val selector: (String, BooleanExpression),
+                     override val browser: Boolean,
+                     override val exclusive: Boolean,
+                     val include_seq: Option[Long],
+                     val from_seq: Long,
+                     override val close_on_drain: Boolean
+                            ) extends BaseRetained with DeliveryConsumer {
 
-    var sender:AMQPSender = _
-
-    override def toString = "amqp subscription:"+subscription_id+", remote address: "+security_context.remote_address
+    override def toString = "amqp subscription:" + sender.getName + ", remote address: " + security_context.remote_address
 
     ///////////////////////////////////////////////////////////////////
     // DeliveryConsumer Interface..
     ///////////////////////////////////////////////////////////////////
-    def connect(p:DeliveryProducer) = new AMQPConsumerSession(p)
+    def connect(p: DeliveryProducer) = new AmqpConsumerSession(p)
     def dispatch_queue = queue
     override def connection = Option(AmqpProtocolHandler.this.connection)
+
     def is_persistent = false
     def matches(message: Delivery) = true
-
     override def start_from_tail = from_seq == -1
-    override def jms_selector = if(selector!=null){ selector._1 } else { null }
+
+    override def jms_selector = if (selector != null) {
+      selector._1
+    } else {
+      null
+    }
+
     override def user = security_context.user
 
-    var starting_seq:Long = 0L
-    override def set_starting_seq(seq: Long):Unit = {
-      starting_seq=seq
+    var starting_seq: Long = 0L
+
+    override def set_starting_seq(seq: Long): Unit = {
+      starting_seq = seq
     }
 
-    ///////////////////////////////////////////////////////////////////
-    // Sink[(Session[Delivery], Delivery)] interface..
-    ///////////////////////////////////////////////////////////////////
-    object sink extends Sink[(Session[Delivery], Delivery)] {
-      var refiller: Task = NOOP
-      def full: Boolean = sender.full()
-      def offer(value: (Session[Delivery], Delivery)): Boolean = {
-        if( full ) {
-          return false
-        }
-        
-        val (session, delivery) = value
-        val message = delivery.message
-
-        
-        val header = new Header()
-        header.setDeliveryCount(delivery.redeliveries)
-        header.setDurable(delivery.persistent)
-        header.setFirstAcquirer(delivery.redeliveries == 0)
-
-//        if( include_seq.isDefined ) {
-//          frame = frame.append_headers((include_seq.get, ascii(delivery.seq.toString))::Nil)
-//        }
-
-        var annotated = if( message.codec eq AmqpMessageCodec ) {
-          val original = message.asInstanceOf[AMQPMessage].annotated
-          var annotated = new Envelope
-          annotated.setHeader(header)
-          annotated.setDeliveryAnnotations(original.getDeliveryAnnotations)
-          annotated.setMessageAnnotations(original.getMessageAnnotations)
-          annotated.setMessage(original.getMessage)
-          annotated.setFooter(original.getFooter)
-          annotated
-        } else {
-          
-          val (body, content_type) =  protocol_convert match{
-            case "body" => (message.getBodyAs(classOf[Buffer]), "protocol/"+message.codec.id+";conv=body")
-            case _ => (message.encoded, "protocol/"+message.codec.id)
-          }
-          
-          val bare = new types.Message
-          bare.setData(new Data(body))
-          var properties = new Properties()
-          properties.setContentType(ascii(content_type))
-          if( delivery.expiration!= 0 ) {
-            properties.setAbsoluteExpiryTime(new Date(delivery.expiration))
-          }
-          bare.setProperties(properties)
-          var annotated = new Envelope
-          annotated.setHeader(header)
-          annotated.setMessage(bare)
-          annotated
-        }
-        
-        sender.send(annotated, null)
 
-        messages_sent += 1
-        return true
+    var nextTagId = 0L;
+    val tagCache = new util.HashSet[Array[Byte]]();
+    val unsettled = new HashMap[AsciiBuffer, org.apache.qpid.proton.engine.Delivery]()
+
+    def nextTag: Array[Byte] = {
+      var rc: Array[Byte] = null
+      if (tagCache != null && !tagCache.isEmpty()) {
+        val iterator = tagCache.iterator();
+        rc = iterator.next();
+        iterator.remove();
+      } else {
+        rc = java.lang.Long.toHexString(nextTagId).getBytes("UTF-8");
+        nextTagId += 1
       }
+      return rc;
     }
 
-    ///////////////////////////////////////////////////////////////////
-    // AMQPEndpoint.Listener interface..
-    ///////////////////////////////////////////////////////////////////
-    object endpoint_listener extends AMQPEndpoint.Listener {
-      override def onTransfer = queue {
-        sink.refiller.run()
-      }
-      override def onClosed(senderClosed: Boolean, error: Error) {
-        if (error != null) {
-          debug("Peer closed link due to error: %s", error)
-        }
-        host.dispatch_queue {
-          host.router.unbind(addresses, AMQPConsumer.this, false, security_context)
-        }
+    def checkinTag(data: Array[Byte]) = {
+      if (tagCache.size() < 1024) {
+        tagCache.add(data);
       }
     }
 
-    val session_manager = new SessionSinkMux[Delivery](sink, queue, Delivery, 1, buffer_size) {
+    val redeliveries = new util.LinkedList[(Session[Delivery], Delivery)]()
+    val session_manager = new SessionSinkMux[Delivery](FullSink(), queue, Delivery, 1, buffer_size) {
       override def time_stamp = broker.now
+
+      var currentBuffer: Buffer = _;
+      var currentDelivery: org.apache.qpid.proton.engine.Delivery = _;
+
+      override def drain_overflow: Unit = {
+        queue.assertExecuting()
+        var pumpNeeded = false
+        try {
+          while (true) {
+            while (currentBuffer != null) {
+              var sent = sender.send(currentBuffer.data, currentBuffer.offset, currentBuffer.length);
+              if (sent > 0) {
+                pumpNeeded = true
+                currentBuffer.moveHead(sent);
+                if (currentBuffer.length == 0) {
+                  if (presettle) {
+                    settle(currentDelivery, Consumed);
+                  } else {
+                    sender.advance();
+                  }
+                  currentBuffer = null;
+                  currentDelivery = null;
+                }
+              } else {
+                return;
+              }
+            }
+
+            val value = poll
+            if (value == null) {
+              return
+            } else {
+              val (session, delivery) = value
+              val message = if (delivery.message.codec == AmqpMessageCodec) {
+                delivery.message.asInstanceOf[AmqpMessage].decoded
+              } else {
+                val (body, content_type) = protocol_convert match {
+                  case "body" => (delivery.message.getBodyAs(classOf[Buffer]), "protocol/" + delivery.message.codec.id + ";conv=body")
+                  case _ => (delivery.message.encoded, "protocol/" + delivery.message.codec.id())
+                }
+
+                message_id_counter += 1
+
+                val message = new org.apache.qpid.proton.message.Message
+                message.setMessageId(session_id.get + message_id_counter)
+                message.setBody(new Data(new Binary(body.data, body.offset, body.length)))
+                message.setContentType(content_type)
+                message.setDurable(delivery.persistent)
+                if (delivery.expiration > 0) {
+                  message.setExpiryTime(delivery.expiration)
+                }
+                message
+              }
+
+              if (delivery.redeliveries > 0) {
+                message.setDeliveryCount(delivery.redeliveries)
+                message.setFirstAcquirer(false)
+              }
+
+              currentBuffer = new AmqpMessage(null, message).encoded;
+              if (presettle) {
+                currentDelivery = sender.delivery(EMPTY_BYTE_ARRAY, 0, 0);
+              } else {
+                val tag = nextTag
+                currentDelivery = sender.delivery(tag, 0, tag.length);
+                unsettled.put(new AsciiBuffer(tag), currentDelivery)
+              }
+              currentDelivery.setContext(value);
+            }
+          }
+        } finally {
+          if( pumpNeeded ) {
+            pump_out
+          }
+        }
+      }
+
+      override def poll: (Session[Delivery], Delivery) = {
+        if( redeliveries.isEmpty ) {
+          super.poll
+        } else {
+          redeliveries.removeFirst()
+        }
+      }
+
+      private def settle(delivery: org.apache.qpid.proton.engine.Delivery, ackType: DeliveryResult) {
+        val tag: Array[Byte] = delivery.getTag
+        if (tag != null && tag.length > 0) {
+          checkinTag(tag)
+          unsettled.remove(new AsciiBuffer(tag))
+        }
+        // Don't ack.. redeliver
+        val (session, apollo_delivery) = delivery.getContext.asInstanceOf[(Session[Delivery], Delivery)]
+        if (ackType == null) {
+          delivery.settle
+          redeliveries.addFirst((session, apollo_delivery))
+          drain_overflow
+        } else {
+
+          val remoteState = delivery.getRemoteState
+          if (remoteState != null && remoteState.isInstanceOf[TransactionalState]) {
+            val s: TransactionalState = remoteState.asInstanceOf[TransactionalState]
+            val txid = toLong(s.getTxnId)
+            async_die("txs-not-supported", "Transactions not yet supported")
+            return
+          }
+
+          if (apollo_delivery.ack != null) {
+            apollo_delivery.ack(ackType, null)
+          }
+          delivery.settle
+          pump_out
+        }
+      }
     }
 
-    class AMQPConsumerSession(p:DeliveryProducer) extends DeliverySession with SessionSinkFilter[Delivery] {
+    class AmqpConsumerSession(p: DeliveryProducer) extends DeliverySession with SessionSinkFilter[Delivery] {
 
       def producer = p
-      def consumer = AMQPConsumer.this
+      def consumer = AmqpConsumer.this
       val downstream = session_manager.open(producer.dispatch_queue)
 
       // Delegate all the flow control stuff to the session
@@ -588,8 +916,8 @@ class AmqpProtocolHandler extends Protoc
         rc
       }
 
-      def offer(delivery:Delivery) = {
-        if( full ) {
+      def offer(delivery: Delivery) = {
+        if (full) {
           false
         } else {
           delivery.message.retain()
@@ -599,107 +927,38 @@ class AmqpProtocolHandler extends Protoc
         }
       }
 
-      def close {}
-    }
-  }
-
-  def attach_receiver(attach: Attach, address: String, requested_addresses:Array[SimpleAddress], callback: Callback[AMQPEndpoint]) = try {
-    val options = new AMQPSenderOptions();
-
-    val src = attach.getSource.asInstanceOf[Source]
-    src.setDefaultOutcome(new Released())
-
-    if (attach.getSndSettleMode == SenderSettleMode.SETTLED.getValue) {
-      // if we are settling... then no other outcomes are possible..
-      src.setOutcomes(Array())
-    } else {
-      src.setOutcomes(Array(
-        new AMQPSymbol(Accepted.SYMBOLIC_ID),
-        new AMQPSymbol(Rejected.SYMBOLIC_ID),
-        new AMQPSymbol(Released.SYMBOLIC_ID),
-        new AMQPSymbol(Modified.SYMBOLIC_ID)
-      ))
-    }
-    options.setSource(src)
-
-    options.setTarget(attach.getTarget.asInstanceOf[Target])
-    options.setName(attach.getName)
-    options.setSenderSettleMode(SenderSettleMode.valueOf(attach.getSndSettleMode))
-    options.setReceiverSettleMode(ReceiverSettleMode.valueOf(attach.getRcvSettleMode))
-    options.setMaxMessageSize(10 * 1024 * 1024);
-
-
-    val subscription_id = attach.getName
-    var persistent = false
-    var browser = false
-    var browser_end = browser && true
-    var exclusive = !browser && false
-    var include_seq: Option[Long] = None
-    val from_seq_opt: Option[Long] = None
-
-    def is_multi_destination = if (requested_addresses.length > 1) {
-      true
-    } else {
-      PathParser.containsWildCards(requested_addresses(0).path)
-    }
-    if (from_seq_opt.isDefined && is_multi_destination) {
-      die("The from-seq header is only supported when you subscribe to one destination");
-    }
-
-    val selector = attach.getProperties match {
-      case null => null
-      case properties =>
-        properties.get(new AMQPString("selector")) match {
-          case null => null
-          case x:AMQPString =>
-            try {
-              (x.getValue, SelectorParser.parse(x.getValue))
-            } catch {
-              case e: FilterException =>
-                die("Invalid selector expression '%s': ".format(x, e.getMessage))
-            }
-          case x =>
-            die("Invalid selector expression '%s'".format(x))
-        }
-    }
-
-    val addresses:Array[_ <: BindAddress] = if (persistent) {
-      val dsubs = ListBuffer[BindAddress]()
-      val topics = ListBuffer[BindAddress]()
-      requested_addresses.foreach { address =>
-          address.domain match {
-            case "dsub" => dsubs += address
-            case "topic" => topics += address
-            case _ => die("A persistent subscription can only be used on a topic destination")
+      def close {
+        session_manager.close(downstream, (delivery)=>{
+          if( delivery.ack !=null ) {
+            delivery.ack(Undelivered, null)
           }
+        })
       }
-      val s = if (selector == null) null else selector._1
-      dsubs += SubscriptionAddress(destination_parser.decode_path(subscription_id), s, topics.toArray)
-      dsubs.toArray
-    } else {
-      requested_addresses
     }
 
-    val from_seq = from_seq_opt.getOrElse(0L)
-
-    val source = new AMQPConsumer(subscription_id, addresses, selector, browser, exclusive, include_seq, from_seq, browser_end);
-    options.setListener(source.endpoint_listener)
-    source.sender = AMQP.createSender(options)
+    override def dispose() = queue {
+      def reject(value:(Session[Delivery], Delivery), result:DeliveryResult) ={
+        val (_, delivery) = value
+        if( delivery.ack!=null ) {
+          delivery.ack(result, null)
+        }
+      }
 
-    host.dispatch_queue {
-      val rc = host.router.bind(addresses, source, security_context)
-      source.release
-      queue {
-        rc match {
-          case Some(failure) =>
-            source.sender.detach(true, failure, null)
-          case None =>
+      for( v <- unsettled.values ) {
+        val value = v.getContext.asInstanceOf[(Session[Delivery], Delivery)]
+        if( value!=null ) {
+          v.setContext(null)
+          reject(value, Delivered)
         }
       }
+
+      var next = session_manager.poll
+      while( next!=null ) {
+        reject(next, Undelivered)
+      }
+      super.dispose()
     }
-    callback.onSuccess(source.sender)
-  } catch {
-    case e => callback.onFailure(e)
+
   }
 
 }

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpConnection.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpConnection.java?rev=1405942&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpConnection.java (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpConnection.java Mon Nov  5 20:32:47 2012
@@ -0,0 +1,283 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.activemq.apollo.amqp.hawtdispatch;
+
+import org.apache.qpid.proton.engine.*;
+import org.apache.qpid.proton.engine.impl.ConnectionImpl;
+import org.apache.qpid.proton.engine.impl.ProtocolTracer;
+import org.apache.qpid.proton.engine.impl.TransportImpl;
+import org.apache.qpid.proton.framing.TransportFrame;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtdispatch.Dispatch;
+import org.fusesource.hawtdispatch.DispatchQueue;
+import org.fusesource.hawtdispatch.Task;
+import org.fusesource.hawtdispatch.transport.DefaultTransportListener;
+import org.fusesource.hawtdispatch.transport.Transport;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Integrates a proton transport/connection /w HawtDispatch transports.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpConnection {
+
+    static final ExecutorService blockingExecutor = Executors.newCachedThreadPool();
+    final ConnectionImpl protonConnection = new ConnectionImpl();
+
+    Transport hawtdispatchTransport;
+    TransportImpl protonTransport = new TransportImpl();
+    HashSet<Object> endpointsBeingProcessed = new HashSet<Object>();
+
+    public Sasl sasl;
+
+    public void bind(Transport transport) throws Exception {
+        this.protonTransport = new TransportImpl();
+        this.protonTransport.setProtocolTracer(new ProtocolTracer() {
+            public void receivedFrame(TransportFrame transportFrame) {
+                System.out.println(String.format("RECV: %s:%05d | %s", hawtdispatchTransport.getRemoteAddress(), transportFrame.getChannel(), transportFrame.getBody()));
+            }
+
+            public void sentFrame(TransportFrame transportFrame) {
+                System.out.println(String.format("SEND: %s:%05d | %s", hawtdispatchTransport.getRemoteAddress(), transportFrame.getChannel(), transportFrame.getBody()));
+            }
+        });
+        this.protonTransport.bind(protonConnection);
+
+        this.hawtdispatchTransport = transport;
+        this.hawtdispatchTransport.setBlockingExecutor(blockingExecutor);
+        if( this.hawtdispatchTransport.getProtocolCodec()==null ) {
+            this.hawtdispatchTransport.setProtocolCodec(new AmqpProtocolCodec());
+        }
+        this.hawtdispatchTransport.setTransportListener(new DefaultTransportListener() {
+
+            @Override
+            public void onTransportConnected() {
+                hawtdispatchTransport.resumeRead();
+                listener.processTransportConnected();
+            }
+
+            @Override
+            public void onTransportCommand(Object command) {
+                try {
+                    Buffer buffer;
+                    if( command.getClass() == AmqpHeader.class ) {
+                        AmqpHeader header = (AmqpHeader)command;
+                        switch( header.getProtocolId() ) {
+                            case 0:
+                                break; // nothing to do..
+                            case 3: // Client will be using SASL for auth..
+                                sasl = listener.processSaslConnect(protonTransport);
+                                break;
+                            default:
+                        }
+                        buffer = header.getBuffer();
+                    } else {
+                        buffer = (Buffer) command;
+                    }
+                    protonTransport.input(buffer.data, buffer.offset, buffer.length);
+                    fireListenerEvents();
+                    pumpOut();
+                } catch (Exception e) {
+                    listener.processFailure(e);
+                }
+            }
+
+            public void onRefill() {
+                pumpOut();
+            }
+
+            @Override
+            public void onTransportFailure(IOException error) {
+                stop(Dispatch.NOOP);
+                listener.processTransportFailure(error);
+            }
+        });
+    }
+
+    boolean hawtdispatchClosed = false;
+
+    public static final EnumSet<EndpointState> UNINITIALIZED_SET = EnumSet.of(EndpointState.UNINITIALIZED);
+    public static final EnumSet<EndpointState> INITIALIZED_SET = EnumSet.complementOf(UNINITIALIZED_SET);
+    public static final EnumSet<EndpointState> ACTIVE_STATE = EnumSet.of(EndpointState.ACTIVE);
+    public static final EnumSet<EndpointState> CLOSED_STATE = EnumSet.of(EndpointState.CLOSED);
+
+    public void pumpOut() {
+        if(hawtdispatchClosed) {
+            return;
+        }
+        int size = hawtdispatchTransport.getProtocolCodec().getWriteBufferSize();
+        byte data[] = new byte[size];
+        boolean done = false;
+        while( !done && !hawtdispatchTransport.full() ) {
+            int count = protonTransport.output(data, 0, size);
+            if( count > 0 ) {
+                hawtdispatchTransport.offer(new Buffer(data, 0, count));
+            } else {
+                done = true;
+            }
+        }
+        if( !hawtdispatchTransport.full() ) {
+            listener.processRefill();
+        }
+    }
+
+    AmqpListener listener = new AmqpListener();
+
+    public DispatchQueue queue() {
+        return getHawtdispatchTransport().getDispatchQueue();
+    }
+
+    class ProcessedTask extends Task {
+        private final Object value;
+
+        ProcessedTask(Object value) {
+            this.value = value;
+        }
+
+        @Override
+        public void run() {
+            queue().assertExecuting();
+            endpointsBeingProcessed.remove(value);
+            pumpOut();
+        }
+    }
+
+
+    public void fireListenerEvents() {
+
+        if( sasl!=null ) {
+            sasl = listener.processSaslEvent(sasl);
+            if( sasl==null ) {
+                // once sasl handshake is done.. we need to read the protocol header again.
+                ((AmqpProtocolCodec)this.hawtdispatchTransport.getProtocolCodec()).readProtocolHeader();
+            }
+        }
+
+        if(protonConnection.getLocalState() == EndpointState.UNINITIALIZED && protonConnection.getRemoteState() != EndpointState.UNINITIALIZED && !endpointsBeingProcessed.contains(protonConnection))
+        {
+            endpointsBeingProcessed.add(protonConnection);
+            listener.processConnectionOpen(protonConnection, new ProcessedTask(protonConnection));
+        }
+
+        Session session = protonConnection.sessionHead(UNINITIALIZED_SET, INITIALIZED_SET);
+        while(session != null)
+        {
+            if( !endpointsBeingProcessed.contains(session) ) {
+                endpointsBeingProcessed.add(session);
+                listener.proccessSessionOpen(session, new ProcessedTask(session));
+            }
+            session = session.next(UNINITIALIZED_SET, INITIALIZED_SET);
+        }
+
+        Link link = protonConnection.linkHead(UNINITIALIZED_SET, INITIALIZED_SET);
+        while(link != null)
+        {
+            if( !endpointsBeingProcessed.contains(link) ) {
+                endpointsBeingProcessed.add(link);
+                link.setSource(link.getRemoteSource());
+                link.setTarget(link.getRemoteTarget());
+                ProcessedTask onComplete = new ProcessedTask(link);
+                if( link instanceof Sender) {
+                    listener.processSenderOpen((Sender) link, onComplete);
+                } else {
+                    listener.processReceiverOpen((Receiver) link, onComplete);
+                }
+            }
+            link = link.next(UNINITIALIZED_SET, INITIALIZED_SET);
+        }
+
+
+        Delivery delivery = protonConnection.getWorkHead();
+        while(delivery != null)
+        {
+            if(delivery.getLink() instanceof Receiver) {
+                listener.processDelivery((Receiver) delivery.getLink(), delivery);
+            } else {
+                listener.processDelivery((Sender) delivery.getLink(), delivery);
+            }
+            delivery = delivery.getWorkNext();
+        }
+
+        link = protonConnection.linkHead(ACTIVE_STATE, CLOSED_STATE);
+        while(link != null)
+        {
+            if( !endpointsBeingProcessed.contains(link) ) {
+                endpointsBeingProcessed.add(link);
+                ProcessedTask onComplete = new ProcessedTask(link);
+                if( link instanceof Receiver) {
+                    listener.processReceiverClose((Receiver) link, onComplete);
+                } else {
+                    listener.processSenderClose((Sender) link, onComplete);
+                }
+            }
+            link = link.next(ACTIVE_STATE, CLOSED_STATE);
+        }
+
+        session = protonConnection.sessionHead(ACTIVE_STATE, CLOSED_STATE);
+        while(session != null)
+        {
+            //TODO - close links?
+            if( !endpointsBeingProcessed.contains(session) ) {
+                endpointsBeingProcessed.add(session);
+                listener.processSessionClose(session, new ProcessedTask(session));
+            }
+            session = session.next(ACTIVE_STATE, CLOSED_STATE);
+        }
+        if(protonConnection.getLocalState() == EndpointState.ACTIVE && protonConnection.getRemoteState() == EndpointState.CLOSED && !endpointsBeingProcessed.contains(protonConnection))
+        {
+            listener.processConnectionClose(protonConnection, new ProcessedTask(protonConnection));
+            protonConnection.close();
+        }
+
+    }
+
+    public AmqpListener getListener() {
+        return listener;
+    }
+
+    public void setListener(AmqpListener listener) {
+        this.listener = listener;
+    }
+
+    public void start(Task onComplete) {
+        hawtdispatchTransport.start(onComplete);
+    }
+
+    public void stop(Task onComplete) {
+        hawtdispatchClosed = true;
+        hawtdispatchTransport.stop(onComplete);
+    }
+
+    public ConnectionImpl getProtonConnection() {
+        return protonConnection;
+    }
+
+    public Transport getHawtdispatchTransport() {
+        return hawtdispatchTransport;
+    }
+
+}

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpHeader.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpHeader.java?rev=1405942&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpHeader.java (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpHeader.java Mon Nov  5 20:32:47 2012
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.amqp.hawtdispatch;
+
+import org.fusesource.hawtbuf.Buffer;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpHeader {
+
+    static final Buffer PREFIX = new Buffer(new byte[]{
+      'A', 'M', 'Q', 'P'
+    });
+
+    private Buffer buffer;
+
+    public AmqpHeader(){
+        this(new Buffer(new byte[]{
+          'A', 'M', 'Q', 'P', 0, 1, 0, 0
+        }));
+    }
+
+    public AmqpHeader(Buffer buffer){
+        setBuffer(buffer);
+    }
+
+    public int getProtocolId() {
+        return buffer.get(4) & 0xFF;
+    }
+    public void setProtocolId(int value) {
+        buffer.data[buffer.offset+4] = (byte) value;
+    }
+
+    public int getMajor() {
+        return buffer.get(5) & 0xFF;
+    }
+    public void setMajor(int value) {
+        buffer.data[buffer.offset+5] = (byte) value;
+    }
+
+    public int getMinor() {
+        return buffer.get(6) & 0xFF;
+    }
+    public void setMinor(int value) {
+        buffer.data[buffer.offset+6] = (byte) value;
+    }
+
+    public int getRevision() {
+        return buffer.get(7) & 0xFF;
+    }
+    public void setRevision(int value) {
+        buffer.data[buffer.offset+7] = (byte) value;
+    }
+
+    public Buffer getBuffer() {
+        return buffer;
+    }
+    public void setBuffer(Buffer value) {
+        if( !value.startsWith(PREFIX) || value.length()!=8 ) {
+            throw new IllegalArgumentException("Not an AMQP header buffer");
+        }
+        buffer = value.buffer();
+    }
+
+
+    @Override
+    public String toString() {
+        return buffer.toString();
+    }
+}

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpListener.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpListener.java?rev=1405942&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpListener.java (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpListener.java Mon Nov  5 20:32:47 2012
@@ -0,0 +1,78 @@
+package org.apache.activemq.apollo.amqp.hawtdispatch;
+
+import org.apache.qpid.proton.engine.*;
+import org.apache.qpid.proton.engine.impl.TransportImpl;
+import org.apache.qpid.proton.type.messaging.Accepted;
+import org.fusesource.hawtdispatch.Task;
+
+import java.io.IOException;
+
+/**
+* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+*/
+public class AmqpListener {
+
+    public Sasl processSaslConnect(TransportImpl protonTransport) {
+        return null;
+    }
+
+    public Sasl processSaslEvent(Sasl sasl) {
+        return sasl;
+    }
+
+    public void processConnectionOpen(Connection conn, Task onComplete) {
+        conn.open();
+        onComplete.run();
+    }
+    public void processConnectionClose(Connection conn, Task onComplete){
+        conn.close();
+        onComplete.run();
+    }
+
+    public void proccessSessionOpen(Session session, Task onComplete){
+        session.open();
+        onComplete.run();
+    }
+    public void processSessionClose(Session session, Task onComplete){
+        session.close();
+        onComplete.run();
+    }
+
+    public void processSenderOpen(Sender sender, Task onComplete) {
+        sender.close();
+        onComplete.run();
+    }
+    public void processSenderClose(Sender sender, Task onComplete){
+        sender.close();
+        onComplete.run();
+    }
+
+    public void processReceiverOpen(Receiver receiver, Task onComplete) {
+        receiver.open();
+        onComplete.run();
+    }
+    public void processReceiverClose(Receiver receiver, Task onComplete) {
+        receiver.close();
+        onComplete.run();
+    }
+
+    public void processDelivery(Receiver receiver, Delivery delivery){
+    }
+
+    public void processDelivery(Sender sender, Delivery delivery) {
+    }
+
+    public void processFailure(Throwable e) {
+        e.printStackTrace();
+    }
+
+    public void processRefill() {
+    }
+
+    public void processTransportConnected() {
+    }
+
+    public void processTransportFailure(IOException e) {
+        e.printStackTrace();
+    }
+}

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpProtocolCodec.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpProtocolCodec.java?rev=1405942&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpProtocolCodec.java (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpProtocolCodec.java Mon Nov  5 20:32:47 2012
@@ -0,0 +1,99 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.activemq.apollo.amqp.hawtdispatch;
+
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtdispatch.transport.AbstractProtocolCodec;
+
+import java.io.IOException;
+
+/**
+ * A HawtDispatch protocol codec that encodes/decodes AMQP 1.0 frames.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpProtocolCodec extends AbstractProtocolCodec {
+
+    @Override
+    protected void encode(Object object) throws IOException {
+        nextWriteBuffer.write((Buffer) object);
+    }
+
+    @Override
+    protected Action initialDecodeAction() {
+        return new Action() {
+            public Object apply() throws IOException {
+                Buffer magic = readBytes(8);
+                if (magic != null) {
+                    nextDecodeAction = readFrameSize;
+                    return new AmqpHeader(magic);
+                } else {
+                    return null;
+                }
+            }
+        };
+    }
+
+    private final Action readFrameSize = new Action() {
+        public Object apply() throws IOException {
+            Buffer sizeBytes = peekBytes(4);
+            if (sizeBytes != null) {
+                int size = sizeBytes.bigEndianEditor().readInt();
+                if (size < 8) {
+                    throw new IOException(String.format("specified frame size %d smaller than minimum frame size", size));
+                }
+                // TODO: check frame min and max size..
+                nextDecodeAction = readFrame(size);
+                return nextDecodeAction.apply();
+            } else {
+                return null;
+            }
+        }
+    };
+
+
+    private final Action readFrame(final int size) {
+        return new Action() {
+            public Object apply() throws IOException {
+                Buffer frameData = readBytes(size);
+                if (frameData != null) {
+                    nextDecodeAction = readFrameSize;
+                    return frameData;
+                } else {
+                    return null;
+                }
+            }
+        };
+    }
+
+    public int getReadBytesPendingDecode() {
+        return readBuffer.position() - readStart;
+    }
+
+    public void skipProtocolHeader() {
+        nextDecodeAction = readFrameSize;
+    }
+
+    public void readProtocolHeader() {
+        nextDecodeAction = initialDecodeAction();
+    }
+
+}