You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/11/05 21:32:49 UTC
svn commit: r1405942 [1/2] - in /activemq/activemq-apollo/trunk: apollo-amqp/
apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/
apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/
apollo-amqp/src/main/scala/org/apache...
Author: chirino
Date: Mon Nov 5 20:32:47 2012
New Revision: 1405942
URL: http://svn.apache.org/viewvc?rev=1405942&view=rev
Log:
Switching the apollo-amqp module to use the qpid-proton lib to talk AMQP.
Added:
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpConnection.java
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpHeader.java
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpListener.java
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpProtocolCodec.java
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/DroppingWritableBuffer.java
activemq/activemq-apollo/trunk/apollo-amqp/src/test-swiftmq/scala/org/apache/activemq/apollo/amqp/test/
activemq/activemq-apollo/trunk/apollo-amqp/src/test-swiftmq/scala/org/apache/activemq/apollo/amqp/test/SwiftMQClientTest.scala
- copied, changed from r1405941, activemq/activemq-apollo/trunk/apollo-amqp/src/test-swiftmq/scala/org/apache/activemq/apollo/amqp/SwiftMQClientTest.scala
activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/provider.properties
- copied, changed from r1405941, activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index
activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/hawtdispatch/
activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/hawtdispatch/TransportConnectionTest.java
activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/joram/
activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/joram/ApolloAdmin.java
activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/joram/JoramJmsTest.java
activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/
activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/AmqpTest.scala
activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/QpidJmsTest.scala
Removed:
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpCodec.scala
activemq/activemq-apollo/trunk/apollo-amqp/src/test-swiftmq/scala/org/apache/activemq/apollo/amqp/SwiftMQClientTest.scala
activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/AmqpTest.scala
activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/FuseSourceClientTest.scala
Modified:
activemq/activemq-apollo/trunk/apollo-amqp/pom.xml
activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocol.scala
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
Modified: activemq/activemq-apollo/trunk/apollo-amqp/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/pom.xml?rev=1405942&r1=1405941&r2=1405942&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/pom.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/pom.xml Mon Nov 5 20:32:47 2012
@@ -44,11 +44,10 @@
<artifactId>apollo-broker</artifactId>
<version>99-trunk-SNAPSHOT</version>
</dependency>
-
<dependency>
- <groupId>org.fusesource.fuse-extra</groupId>
- <artifactId>fusesource-amqp</artifactId>
- <version>99-master-SNAPSHOT</version>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-proton</artifactId>
+ <version>1.0-SNAPSHOT</version>
</dependency>
<!-- Scala Support -->
@@ -102,6 +101,27 @@
<scope>test</scope>
</dependency>
+ <!-- Joram JMS conformance tests -->
+ <dependency>
+ <groupId>org.fusesource.joram-jms-tests</groupId>
+ <artifactId>joram-jms-tests</artifactId>
+ <version>1.0</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- qpid jms client -->
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-amqp-1-0-client-jms</artifactId>
+ <version>0.18</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-jms_1.1_spec</artifactId>
+ <version>1.1.1</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>junit</groupId>
Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index?rev=1405942&r1=1405941&r2=1405942&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index Mon Nov 5 20:32:47 2012
@@ -14,4 +14,4 @@
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
-org.apache.activemq.apollo.amqp.AmqpProtocolFactory
\ No newline at end of file
+org.apache.activemq.apollo.amqp.AmqpProtocol
\ No newline at end of file
Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala?rev=1405942&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala Mon Nov 5 20:32:47 2012
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.amqp
+
+
+import org.apache.activemq.apollo.broker.protocol
+import protocol.{MessageCodecFactory, MessageCodec}
+import hawtdispatch.DroppingWritableBuffer
+import java.nio.ByteBuffer
+import org.apache.qpid.proton.codec.{WritableBuffer, CompositeWritableBuffer}
+import org.fusesource.hawtbuf.Buffer._
+import org.apache.activemq.apollo.broker.Message
+import org.apache.activemq.apollo.broker.store.MessageRecord
+import org.fusesource.hawtbuf.Buffer
+import org.fusesource.hawtbuf.AsciiBuffer
+import org.fusesource.hawtbuf.UTF8Buffer
+
+object AmqpMessageCodecFactory extends MessageCodecFactory.Provider {
+ def create = Array[MessageCodec](AmqpMessageCodec)
+}
+
+object AmqpMessageCodec extends MessageCodec {
+
+ def ascii_id = ascii("amqp-1.0")
+ def id = "amqp-1.0"
+
+ def encode(message: Message):MessageRecord = {
+ val rc = new MessageRecord
+ rc.codec = ascii_id
+ rc.buffer = message.encoded
+ rc
+ }
+
+ def decode(message: MessageRecord) = {
+ assert( message.codec == ascii_id )
+ new AmqpMessage(message.buffer, null)
+ }
+
+}
+
+
+object AmqpMessage {
+ val SENDER_CONTAINER_KEY = "sender-container"
+}
+import AmqpMessage._
+
+class AmqpMessage(private var encoded_buffer:Buffer, private var decoded_message:org.apache.qpid.proton.message.Message=null) extends org.apache.activemq.apollo.broker.Message {
+
+ /**
+ * The encoder/decoder of the message
+ */
+ def codec = AmqpMessageCodec
+
+ def decoded = {
+ if( decoded_message==null ) {
+ val amqp = new org.apache.qpid.proton.message.Message();
+ var offset = encoded_buffer.offset
+ var len = encoded_buffer.length
+ while( len > 0 ) {
+ var decoded = amqp.decode(encoded_buffer.data, offset, len);
+ assert(decoded > 0, "Make progress decoding the message")
+ offset += decoded;
+ len -= decoded;
+ }
+ decoded_message = amqp
+
+ }
+ decoded_message
+ }
+
+ override def encoded = {
+ if( encoded_buffer == null ) {
+ var buffer = ByteBuffer.wrap(new Array[Byte](1024*4));
+ val overflow = new DroppingWritableBuffer();
+ var c = decoded_message.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow));
+ if( overflow.position() > 0 ) {
+ buffer = ByteBuffer.wrap(new Array[Byte](1024*4+overflow.position()));
+ c = decoded_message.encode(new WritableBuffer.ByteBufferWrapper(buffer));
+ }
+ encoded_buffer = new Buffer(buffer.array(), 0, c)
+ }
+ encoded_buffer
+ }
+
+ def getBodyAs[T](toType : Class[T]): T = {
+ if (toType == classOf[Buffer]) {
+ encoded
+ } else if( toType == classOf[String] ) {
+ encoded.utf8
+ } else if (toType == classOf[AsciiBuffer]) {
+ encoded.ascii
+ } else if (toType == classOf[UTF8Buffer]) {
+ encoded.utf8
+ } else {
+ null
+ }
+ }.asInstanceOf[T]
+
+ def getLocalConnectionId: AnyRef = {
+ if ( decoded.getDeliveryAnnotations!=null ) {
+ decoded.getDeliveryAnnotations.getValue.get(SENDER_CONTAINER_KEY) match {
+ case x:String => x
+ case _ => null
+ }
+ } else {
+ null
+ }
+ }
+
+ def getProperty(name: String) = {
+ if( decoded.getApplicationProperties !=null ) {
+ decoded.getApplicationProperties.getValue.get(name).asInstanceOf[AnyRef]
+ } else {
+ null
+ }
+ }
+
+ def release() {}
+ def retain() {}
+ def retained(): Int = 0
+}
Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocol.scala?rev=1405942&r1=1405941&r2=1405942&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocol.scala Mon Nov 5 20:32:47 2012
@@ -17,25 +17,20 @@
package org.apache.activemq.apollo.amqp
import _root_.org.fusesource.hawtbuf._
+import hawtdispatch.AmqpProtocolCodec
import org.apache.activemq.apollo.broker._
-import org.apache.activemq.apollo.broker.protocol.{MessageCodecFactory, MessageCodec, ProtocolCodecFactory, Protocol}
-import org.apache.activemq.apollo.broker.store._
-import AmqpCodec._
-import org.fusesource.amqp.codec.AMQPProtocolCodec
+import org.apache.activemq.apollo.broker.protocol.Protocol
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-/**
- * Creates AmqpCodec objects that encode/decode the
- * <a href="http://activemq.apache.org/amqp/">Amqp</a> protocol.
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-class AmqpProtocolCodecFactory extends ProtocolCodecFactory.Provider {
- def id = PROTOCOL
+object AmqpProtocol extends Protocol {
+
+ def id = "amqp"
+ val PROTOCOL_ID = Buffer.ascii(id)
+ val PROTOCOL_MAGIC = new Buffer(Array[Byte]('A', 'M', 'Q', 'P'))
- def createProtocolCodec(connector:Connector) = new AMQPProtocolCodec();
+ def createProtocolCodec(connector:Connector) = new AmqpProtocolCodec();
def isIdentifiable() = true
@@ -48,26 +43,21 @@ class AmqpProtocolCodecFactory extends P
header.startsWith(PROTOCOL_MAGIC)
}
}
-}
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-object AmqpProtocol extends AmqpProtocolCodecFactory with Protocol {
def createProtocolHandler = new AmqpProtocolHandler
}
-object AmqpMessageCodecFactory extends MessageCodecFactory.Provider {
- def create = Array[MessageCodec](AmqpMessageCodec)
-}
-
- /**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-object AmqpMessageCodec extends MessageCodec {
- def id = PROTOCOL
- def encode(message: Message) = AmqpCodec.encode(message)
- def decode(message: MessageRecord) = AmqpCodec.decode(message)
-}
+//object AmqpMessageCodecFactory extends MessageCodecFactory.Provider {
+// def create = Array[MessageCodec](AmqpMessageCodec)
+//}
+//
+// /**
+// * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+// */
+//object AmqpMessageCodec extends MessageCodec {
+// def id = AmqpProtocol.id
+// def encode(message: Message) = AmqpCodec.encode(message)
+// def decode(message: MessageRecord) = AmqpCodec.decode(message)
+//}
Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala?rev=1405942&r1=1405941&r2=1405942&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala Mon Nov 5 20:32:47 2012
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,95 +17,41 @@
package org.apache.activemq.apollo.amqp
import java.util.concurrent.TimeUnit
-import java.util.Date
-import scala.collection.mutable.{ListBuffer, HashMap}
+import collection.mutable.{ListBuffer, HashMap}
import org.fusesource.hawtdispatch._
import org.fusesource.hawtbuf._
-import org.fusesource.hawtbuf.Buffer._
import org.apache.activemq.apollo.util._
import org.apache.activemq.apollo.dto._
import org.apache.activemq.apollo.broker._
import org.apache.activemq.apollo.util.path.{PathParser, Path, LiteralPart}
-import org.apache.activemq.apollo.selector.SelectorParser
-import org.apache.activemq.apollo.filter.{BooleanExpression, FilterException}
-import org.apache.activemq.apollo.broker.protocol.ProtocolHandler
+import protocol.ProtocolHandler
import org.apache.activemq.apollo.broker.security.SecurityContext
import org.apache.activemq.apollo.amqp.dto._
-
-import org.fusesource.amqp._
-import org.fusesource.amqp.callback._
-import org.fusesource.amqp.callback.Callback
-import org.fusesource.amqp.types._
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-object AMQPMessage {
-
- def apply(annotated:Envelope):AMQPMessage = {
- val payload = MessageSupport.toBuffer(annotated)
- val rc = AMQPMessage(payload)
- rc._annotated = annotated
- rc
- }
-}
-
-case class AMQPMessage(payload:Buffer) extends org.apache.activemq.apollo.broker.Message {
- import AmqpProtocolHandler._
- def codec = AmqpMessageCodec
-
- var _annotated:Envelope = _
- def annotated = {
- if ( _annotated ==null ) {
- _annotated = MessageSupport.decodeEnvelope(payload)
- }
- _annotated
- }
-
- def getBodyAs[T](toType : Class[T]): T = {
- if (toType == classOf[Buffer]) {
- payload
- } else if( toType == classOf[String] ) {
- payload.utf8
- } else if (toType == classOf[AsciiBuffer]) {
- payload.ascii
- } else if (toType == classOf[UTF8Buffer]) {
- payload.utf8
- } else {
- null
- }
- }.asInstanceOf[T]
-
- def getLocalConnectionId: AnyRef = annotated.getDeliveryAnnotations.getValue.get(SENDER_CONTAINER_KEY) match {
- case x:AMQPString => x.getValue
- case _ => null
- }
-
- def getProperty(name: String)= annotated match {
- case null => null
- case annotated =>
- annotated.getMessage.getApplicationProperties match {
- case null => null
- case props =>
- props.getValue.get(new AMQPString(name)).asInstanceOf[Object]
- }
- }
-
- def release() {}
- def retain() {}
- def retained(): Int = 0
-}
-
+import hawtdispatch.{AmqpProtocolCodec, AmqpListener, AmqpConnection}
+import org.apache.qpid.proton.engine
+import engine.impl.{LinkImpl, TransportImpl}
+import engine.{Receiver, Sasl, Sender, Link, EndpointError, EndpointState}
+import org.fusesource.hawtbuf.Buffer._
+import org.apache.qpid.proton.`type`.transaction.{TransactionalState, Coordinator}
+import org.apache.qpid.proton.`type`.messaging.{Data, Source, Target}
+import org.apache.activemq.apollo.broker.Delivery
+import org.apache.activemq.apollo.filter.{FilterException, BooleanExpression}
+import org.apache.qpid.proton.`type`.{Symbol => AmqpSymbol, Binary, DescribedType}
+import org.apache.activemq.apollo.selector.SelectorParser
+import org.apache.qpid.proton.`type`.transport.SenderSettleMode
+import java.util
+import java.io.IOException
+import org.apache.activemq.apollo.broker.FullSink
+import org.apache.activemq.apollo.broker.SubscriptionAddress
object AmqpProtocolHandler extends Log {
- val SENDER_CONTAINER_KEY = new AMQPString("sender-container")
// How long we hold a failed connection open so that the remote end
// can get the resulting error message.
- val DEFAULT_DIE_DELAY = 5*1000L
- val WAITING_ON_CLIENT_REQUEST = ()=> "client request"
+ val DEFAULT_DIE_DELAY = 5 * 1000L
+ val WAITING_ON_CLIENT_REQUEST = () => "client request"
val DEFAULT_DETINATION_PARSER = new DestinationParser
DEFAULT_DETINATION_PARSER.queue_prefix = "/queue/"
@@ -118,25 +64,43 @@ object AmqpProtocolHandler extends Log {
DEFAULT_DETINATION_PARSER.any_child_wildcard = "*"
DEFAULT_DETINATION_PARSER.any_descendant_wildcard = "**"
+ val JMS_SELECTOR = AmqpSymbol.valueOf("jms-selector")
+ val EMPTY_BYTE_ARRAY = Array[Byte]()
+
+ def toBytes(value: Long): Array[Byte] = {
+ val buffer: Buffer = new Buffer(8)
+ buffer.bigEndianEditor.writeLong(value)
+ return buffer.data
+ }
+
+ private def toLong(value: Binary): Long = {
+ val buffer: Buffer = new Buffer(value.getArray, value.getArrayOffset, value.getLength)
+ return buffer.bigEndianEditor.readLong
+ }
}
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
class AmqpProtocolHandler extends ProtocolHandler {
+
import AmqpProtocolHandler._
val security_context = new SecurityContext
- var connection_log:Log = AmqpProtocolHandler
- var host:VirtualHost = null
+ var connection_log: Log = AmqpProtocolHandler
+ var host: VirtualHost = null
var waiting_on = WAITING_ON_CLIENT_REQUEST
- var config:AmqpDTO = _
+ var config: AmqpDTO = _
var dead = false
+ var protocol_convert = "full"
def session_id = security_context.session_id
- def protocol = AmqpCodec.PROTOCOL
+
+ def protocol = AmqpProtocol.id
+
def broker = connection.connector.broker
+
def queue = connection.dispatch_queue
def die_delay = {
@@ -144,7 +108,6 @@ class AmqpProtocolHandler extends Protoc
}
lazy val buffer_size = MemoryPropertyEditor.parse(Option(config.buffer_size).getOrElse("640k")).toInt
- var amqp_connection:AMQPConnection = _
var messages_sent = 0L
var messages_received = 0L
@@ -152,33 +115,39 @@ class AmqpProtocolHandler extends Protoc
var rc = new AmqpConnectionStatusDTO
rc.protocol_version = "1.0.0"
rc.user = security_context.user
-// rc.subscription_count = consumers.size
+ // rc.subscription_count = consumers.size
rc.waiting_on = waiting_on()
rc.messages_sent = messages_sent
rc.messages_received = messages_received
rc
}
- class ProtocolException(msg:String) extends RuntimeException(msg)
+ class ProtocolException(msg: String) extends RuntimeException(msg)
+
class Break extends RuntimeException
- private def async_die(msg:String, e:Throwable=null) = try {
- die(msg, e)
+ private def async_die(error_code: String, msg: String, e: Throwable = null) = try {
+ die(error_code, msg, e)
} catch {
- case x:Break=>
+ case x: Break =>
}
- private def die[T](msg:String, e:Throwable=null):T = {
- if( e!=null) {
+ private def die[T](error_code: String, msg: String, e: Throwable = null): T = {
+ if (e != null) {
connection_log.info(e, "AMQP connection '%s' error: %s", security_context.remote_address, msg, e)
} else {
connection_log.info("AMQP connection '%s' error: %s", security_context.remote_address, msg)
}
- if( !dead ) {
+ if (!dead) {
dead = true
- waiting_on = ()=>"shutdown"
+ waiting_on = () => "shutdown"
connection.transport.resumeRead
+ on_transport_disconnected()
+ proton.setLocalError(amqp_error(error_code, msg));
+ proton.close()
+ pump_out
+
// TODO: if there are too many open connections we should just close the connection
// without waiting for the error to get sent to the client.
queue.after(die_delay, TimeUnit.MILLISECONDS) {
@@ -188,398 +157,757 @@ class AmqpProtocolHandler extends Protoc
throw new Break()
}
+ def amqp_error(name: String = "", message: String = "") = new EndpointError(name, message)
+
+ def suspend_read(reason: => String) = {
+ waiting_on = reason _
+ connection.transport.suspendRead
+ // heart_beat_monitor.suspendRead
+ }
+
+ def resume_read() = {
+ waiting_on = WAITING_ON_CLIENT_REQUEST
+ connection.transport.resumeRead
+ // heart_beat_monitor.resumeRead
+ }
+
+ val amqp_connection = new AmqpConnection()
+
+ def codec = connection.transport.getProtocolCodec.asInstanceOf[AmqpProtocolCodec]
+
+ def proton = amqp_connection.getProtonConnection
+
+ def pump_out = {
+ queue.assertExecuting()
+ amqp_connection.pumpOut()
+ }
+
+ override def on_transport_connected() = sys.error("should not get called")
+ override def on_transport_command(command: AnyRef): Unit = sys.error("should not get called")
+
override def set_connection(connection: BrokerConnection) = {
super.set_connection(connection)
import collection.JavaConversions._
+ security_context.connector_id = connection.connector.id
+ security_context.certificates = connection.certificates
+ security_context.local_address = connection.transport.getLocalAddress
+ security_context.remote_address = connection.transport.getRemoteAddress
+
val connector_config = connection.connector.config.asInstanceOf[AcceptingConnectorDTO]
- config = connector_config.protocols.find( _.isInstanceOf[AmqpDTO]).map(_.asInstanceOf[AmqpDTO]).getOrElse(new AmqpDTO)
+ config = connector_config.protocols.find(_.isInstanceOf[AmqpDTO]).map(_.asInstanceOf[AmqpDTO]).getOrElse(new AmqpDTO)
+ amqp_connection.bind(connection.transport)
+ amqp_connection.setListener(amqp_listener)
+ }
+
+ val amqp_listener = new AmqpListener() {
+
+ override def processSaslConnect(protonTransport: TransportImpl) = {
+ val sasl = protonTransport.sasl();
+ sasl.setMechanisms(Array("ANONYMOUS", "PLAIN"));
+ sasl.server();
+ sasl
+ }
- val options = new AMQPServerConnectionOptions
- options.setTransport(connection.transport);
- options.setMaxFrameSize(1024*4)
- options.setIdleTimeout(-1);
- options.setLogger(new AMQPConnectionOptions.Logger {
- override def _debug(format: String, args: Array[AnyRef]) {
- println(System.currentTimeMillis()+": "+format.format(args: _*))
- // connection_log.debug(format, args:_*)
- }
- override def _trace(format: String, args: Array[AnyRef]) {
- println(System.currentTimeMillis()+": "+format.format(args: _*))
- // connection_log.trace(format, args:_*)
- }
- })
- options.setListener(new AMQPConnection.Listener(){
-
- override def onBegin(begin: Begin) = {
- val rc = new AMQPServerSessionOptions
- rc.setIncomingWindow(100)
- rc.setOutgoingWindow(100)
- rc.setListener(session_listener)
- rc
+
+ override def processSaslEvent(sasl: Sasl): Sasl = {
+ // Lets try to complete the sasl handshake.
+ if (sasl.getRemoteMechanisms().length > 0) {
+ if ("PLAIN" == sasl.getRemoteMechanisms()(0)) {
+ val data = new Array[Byte](sasl.pending());
+ sasl.recv(data, 0, data.length);
+ val parts = new Buffer(data).split(0);
+ if (parts.length > 0) {
+ security_context.user = parts(0).utf8.toString
+ }
+ if (parts.length > 1) {
+ security_context.password = parts(1).utf8.toString
+ }
+ // We can't really auth at this point since we don't know the client id yet.. :(
+ sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
+ null
+ } else if ("ANONYMOUS" == sasl.getRemoteMechanisms()(0)) {
+ sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
+ null
+ } else {
+ sasl.done(Sasl.SaslOutcome.PN_SASL_PERM);
+ null
+ }
+ } else {
+ sasl
}
+ }
+
+ override def processConnectionOpen(conn: engine.Connection, onComplete: Task) {
+ println("connection opened.")
+ security_context.session_id = Some(conn.getRemoteContainer())
- override def onAccepted(session: AMQPSession) {
- connection_log.info("accepted: "+session)
+ suspend_read("host lookup")
+ broker.dispatch_queue {
+ val virtual_host = proton.getRemoteHostname match {
+ case null => broker.default_virtual_host
+ case host => broker.get_virtual_host(ascii(host))
+ }
+ queue {
+ resume_read
+ if (virtual_host == null) {
+ onComplete.run()
+ async_die("invalid virtual host", "invalid virtual host: " + proton.getRemoteHostname)
+ } else if (!virtual_host.service_state.is_started) {
+ onComplete.run()
+ async_die("virtual host not ready", "")
+ } else {
+ connection_log = virtual_host.connection_log
+ host = virtual_host
+ proton.setLocalContainerId(virtual_host.id)
+ // proton.open()
+ // callback.onSuccess(response)
+ if (virtual_host.authenticator != null && virtual_host.authorizer != null) {
+ suspend_read("authenticating and authorizing connect")
+ virtual_host.authenticator.authenticate(security_context) {
+ auth_failure =>
+ queue {
+ if (auth_failure != null) {
+ onComplete.run()
+ async_die("Authentication failure", "%s. Credentials=%s".format(auth_failure, security_context.credential_dump))
+ } else if (!virtual_host.authorizer.can(security_context, "connect", connection.connector)) {
+ onComplete.run()
+ async_die("Authorization failure", "Not authorized to connect to connector '%s'. Principals=%s".format(connection.connector.id, security_context.principal_dump))
+ } else if (!virtual_host.authorizer.can(security_context, "connect", virtual_host)) {
+ onComplete.run()
+ async_die("Authorization failure", "Not authorized to connect to virtual host '%s'. Principals=%s".format(virtual_host.id, security_context.principal_dump))
+ } else {
+ resume_read
+ proton.open()
+ onComplete.run()
+ }
+ }
+ }
+ } else {
+ proton.open()
+ onComplete.run()
+ }
+ }
+ }
}
- override def onException(error: Throwable) {
- error.printStackTrace();
+ }
+
+ override def processReceiverOpen(receiver: Receiver, onComplete: Task) {
+ // Client producer is attaching..
+ receiver.setSource(receiver.getRemoteSource());
+ receiver.setTarget(receiver.getRemoteTarget());
+
+ receiver.getRemoteTarget() match {
+ case target: Coordinator =>
+ // pumpProtonToSocket();
+ // receiver.setContext(coordinatorContext);
+ // receiver.flow(1024 * 64);
+ // receiver.open();
+ // pumpProtonToSocket();
+ close_with_error(receiver, "txs not supported")
+ onComplete.run()
+ case amqp_target: Target =>
+
+ val (address, addresses, actualTarget) = decode_target(amqp_target)
+ receiver.setTarget(actualTarget);
+ if (addresses == null) {
+ close_with_error(receiver, "invalid-address", "Invaild address: " + address)
+ onComplete.run()
+ return
+ }
+
+ link_counter += 1
+ val route = new AmqpProducerRoute(link_counter, receiver, addresses)
+ producers += (link_counter -> route)
+
+ host.dispatch_queue {
+ val rc = host.router.connect(route.addresses, route, security_context)
+ queue {
+ println(rc)
+ rc match {
+ case Some(failure) =>
+ println(failure)
+ close_with_error(receiver, "Could not connect", failure)
+ onComplete.run()
+ case None =>
+ println("ok")
+ // If the remote has not closed on us yet...
+ if (receiver.getRemoteState == EndpointState.ACTIVE) {
+ receiver.setContext(route)
+ receiver.flow(1024 * 64);
+ receiver.open()
+ } else {
+ receiver.close()
+ }
+ onComplete.run()
+ }
+ }
+ }
}
+ }
- override def onOpen(request: Open, response: Open, callback: Callback[Open]) {
- handle_open(request, response, callback)
+ override def processReceiverClose(receiver: Receiver, onComplete: Task) {
+ receiver.getContext match {
+ case null =>
+ receiver.close()
+ onComplete.run()
+ case route: AmqpProducerRoute =>
+ // Lets disconnect the route.
+ receiver.setContext(null)
+ host.dispatch_queue {
+ host.router.disconnect(route.addresses, route)
+ queue {
+ receiver.close()
+ producers -= route.id
+ route.release
+ onComplete.run()
+ }
+ }
}
- })
- amqp_connection = AMQP.open(options, new Callback[AMQPConnection] {
- override def onSuccess(value: AMQPConnection) {
- println("AMQP connection is open.")
+ }
+
+ override def processDelivery(receiver: Receiver, delivery: engine.Delivery) {
+ receiver.getContext match {
+ case null =>
+ case route: AmqpProducerRoute => route.process(delivery)
}
- override def onFailure(value: Throwable) {
- println("Failed to open AMQP connection: "+value)
+ }
+
+ override def processSenderOpen(sender: Sender, onComplete: Task) {
+ // Client consumer is attaching..
+ sender.setSource(sender.getRemoteSource());
+ sender.setTarget(sender.getRemoteTarget());
+
+ val source = sender.getRemoteSource().asInstanceOf[Source]
+ val (address, requested_addresses, actual) = decode_source(source)
+ sender.setSource(actual);
+ if (requested_addresses == null) {
+ close_with_error(sender, "invalid-address", "Invaild address: " + address)
+ onComplete.run()
+ return
+ }
+
+ val filter = source.getFilter()
+ val selector = if (filter != null) {
+ val value = filter.get(JMS_SELECTOR).asInstanceOf[DescribedType]
+ if (value != null) {
+ val selector = value.getDescribed().toString()
+ try {
+ (selector, SelectorParser.parse(selector))
+ } catch {
+ case e: FilterException =>
+ close_with_error(sender, "amqp:invalid-field", "Invalid selector expression '%s': %s".format(selector, e.getMessage))
+ onComplete.run()
+ return
+ }
+ } else {
+ null
+ }
+ } else {
+ null
}
- })
- }
- override def on_transport_connected() = sys.error("should not get called")
- override def on_transport_disconnected() = sys.error("should not get called")
- override def on_transport_command(command:AnyRef):Unit = sys.error("should not get called")
+ val presettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
- def suspend_read(reason: =>String) = {
- waiting_on = reason _
- connection.transport.suspendRead
- // heart_beat_monitor.suspendRead
- }
- def resume_read() = {
- waiting_on = WAITING_ON_CLIENT_REQUEST
- connection.transport.resumeRead
- // heart_beat_monitor.resumeRead
+ var browser = false
+ var browser_end = browser && true
+ var exclusive = !browser && false
+ var include_seq: Option[Long] = None
+ val from_seq_opt: Option[Long] = None
+
+ def is_multi_destination = if (requested_addresses.length > 1) {
+ true
+ } else {
+ PathParser.containsWildCards(requested_addresses(0).path)
+ }
+
+ if (from_seq_opt.isDefined && is_multi_destination) {
+ close_with_error(sender, "invalid-from-seq", "The from-seq header is only supported when you subscribe to one destination")
+ onComplete.run()
+ return
+ }
+
+ var persistent = source.getDurable != null && source.getDurable.intValue() == 1
+ val addresses: Array[_ <: BindAddress] = if (persistent) {
+ val dsubs = ListBuffer[BindAddress]()
+ val topics = ListBuffer[BindAddress]()
+ requested_addresses.foreach {
+ address =>
+ address.domain match {
+ case "dsub" => dsubs += address
+ case "topic" => topics += address
+ case _ =>
+ close_with_error(sender, "invalid-from-seq", "A durable link can only be used on a topic destination")
+ onComplete.run()
+ return
+ }
+ }
+ sender.getName()
+ val s = if (selector == null) null else selector._1
+ dsubs += SubscriptionAddress(destination_parser.decode_path(sender.getName), s, topics.toArray)
+ dsubs.toArray
+ } else {
+ requested_addresses
+ }
+
+ val from_seq = from_seq_opt.getOrElse(0L)
+
+
+ link_counter += 1
+ val id = link_counter
+ val consumer = new AmqpConsumer(sender, link_counter, requested_addresses, presettle, selector, browser, exclusive, include_seq, from_seq, browser_end);
+ consumers += (id -> consumer)
+
+ host.dispatch_queue {
+ val rc = host.router.bind(requested_addresses, consumer, security_context)
+ queue {
+ rc match {
+ case Some(reason) =>
+ consumers -= id
+ consumer.release
+ close_with_error(sender, "subscribe-failed", reason)
+ onComplete.run()
+ case None =>
+ sender.setContext(consumer)
+ sender.open()
+ onComplete.run()
+ }
+ }
+ }
+ }
+
+ var gracefully_closed = false
+ override def processFailure(e: Throwable) {
+ var msg = "Internal Server Error: " + e
+ if( connection_log!=AmqpProtocolHandler ) {
+ // but we also want the error on the apollo.log file.
+ warn(e, msg)
+ }
+ async_die("internal-error", msg, e)
+ }
+
+ override def processTransportFailure(error: IOException) {
+ if( !gracefully_closed ) {
+ connection_log.info("Shutting connection '%s' down due to: %s", security_context.remote_address, error)
+ }
+ on_transport_disconnected()
+ }
+
+ override def processConnectionClose(conn: engine.Connection, onComplete: Task) {
+ gracefully_closed = true
+ on_transport_disconnected()
+ super.processConnectionClose(conn, onComplete)
+ queue.after(die_delay, TimeUnit.MILLISECONDS) {
+ connection.stop(NOOP)
+ }
+ }
+
+ override def processRefill() = {
+ for( c <- consumers.values ) {
+ c.session_manager.drain_overflow
+ }
+ }
}
- def handle_open(request: Open, response: Open, callback: Callback[Open]) = {
- broker.dispatch_queue {
- suspend_read("host lookup")
- val host = request.getHostname match {
- case null => broker.default_virtual_host
- case host=> broker.get_virtual_host(ascii(host))
- }
- queue {
- resume_read
- if(host==null) {
- callback.onFailure(new AMQPException("Invalid virtual host: "+host));
- } else if(!host.service_state.is_started) {
- callback.onFailure(new AMQPException("virtual host not ready"));
- } else {
- response.setContainerID(host.id)
- this.host=host
- callback.onSuccess(response)
-// security_context.session_id = Some("%s-%x".format(this.host.config.id, this.host.session_counter.incrementAndGet))
-// connection_log = host.connection_log
-// if( host.authenticator!=null && host.authorizer!=null ) {
-// suspend_read("authenticating and authorizing connect")
-// host.authenticator.authenticate(security_context) { auth_failure=>
-// dispatchQueue {
-// if( auth_failure!=null ) {
-// async_die("%s. Credentials=%s".format(auth_failure, security_context.credential_dump))
-// } else if( !host.authorizer.can(security_context, "connect", connection.connector) ) {
-// async_die("Not authorized to connect to connector '%s'. Principals=%s".format(connection.connector.id, security_context.principal_dump))
-// } else if( !host.authorizer.can(security_context, "connect", this.host) ) {
-// async_die("Not authorized to connect to virtual host '%s'. Principals=%s".format(this.host.id, security_context.principal_dump))
-// } else {
-// resume_read
-// send_connected
-// }
-// }
-// }
-// } else {
-// send_connected
-// }
+ var disconnected = false
+ override def on_transport_disconnected() = {
+ queue.assertExecuting()
+ if( !disconnected ) {
+ disconnected = true
+
+// // Rollback any in-progress transactions..
+// for( (id, tx) <- transactions ) {
+// tx.rollback
+// }
+// transactions.clear()
+//
+ for (producer <- producers.values) {
+ val addresses = producer.addresses
+ host.dispatch_queue {
+ host.router.disconnect(producer.addresses, producer)
+ producer.release()
}
}
+ producers = Map()
+
+
+ for (consumer <- consumers.values) {
+ val addresses = consumer.addresses
+ host.dispatch_queue {
+ host.router.unbind(addresses, consumer, false , security_context)
+ consumer.release()
+ }
+ }
+ consumers = Map()
+ security_context.logout( e => {
+ if(e!=null) {
+ connection_log.info(e, "STOMP connection '%s' log out error: %s", security_context.remote_address, e)
+ }
+ })
+ trace("amqp protocol resources released")
}
}
var destination_parser = DEFAULT_DETINATION_PARSER
var temp_destination_map = HashMap[SimpleAddress, SimpleAddress]()
- def decode_addresses(value:String):Array[SimpleAddress] = {
+ def decode_addresses(value: String): Array[SimpleAddress] = {
val rc = destination_parser.decode_multi_destination(value)
- if( rc==null ) {
+ if (rc == null) {
return null
}
- rc.map { dest =>
- if( dest.domain.startsWith("temp-") ) {
- temp_destination_map.getOrElseUpdate(dest, {
- val parts = LiteralPart("temp") :: LiteralPart(broker.id) :: LiteralPart(session_id.get) :: dest.path.parts
- SimpleAddress(dest.domain.stripPrefix("temp-"), Path(parts))
- })
- } else {
- dest
- }
+ rc.map {
+ dest =>
+ if (dest.domain.startsWith("temp-")) {
+ temp_destination_map.getOrElseUpdate(dest, {
+ val parts = LiteralPart("temp") :: LiteralPart(broker.id) :: LiteralPart(session_id.get) :: dest.path.parts
+ SimpleAddress(dest.domain.stripPrefix("temp-"), Path(parts))
+ })
+ } else {
+ dest
+ }
}
}
- class AmqpProducerRoute(val dest: String, val addresses:Array[SimpleAddress]) extends DeliveryProducerRoute(host.router) {
+ trait ProducerSupport {
+ var current = new ByteArrayOutputStream();
- val key = addresses.toList
- var is_connected = false
+ def receiver: Receiver
- override def send_buffer_size = buffer_size
+ def process(delivery: engine.Delivery): Unit = {
+ if (!delivery.isReadable()) {
+ System.out.println("it was not readable!");
+ return;
+ }
- override def connection = Some(AmqpProtocolHandler.this.connection)
+ if (current == null) {
+ current = new ByteArrayOutputStream();
+ }
- override def connected() = is_connected = true
+ var data = new Array[Byte](1024 * 4);
+ var done = false
+ while (!done) {
+ val count = receiver.recv(data, 0, data.length)
+ if (count > 0) {
+ current.write(data, 0, count);
+ } else {
+ if (count == 0) {
+ // Expecting more deliveries..
+ return;
+ }
+ done = true
+ }
+ }
- override def dispatch_queue = queue
+ receiver.advance();
+ delivery.settle(); // TODO: do this once accepted by the broker.
- refiller = ^ {
- resume_read
+ val buffer = current.toBuffer();
+ current = null;
+ onMessage(delivery, new AmqpMessage(buffer));
}
+
+ def onMessage(delivery: engine.Delivery, buffer: AmqpMessage): Unit
}
- object session_listener extends AMQPSession.Listener{
- override def onAttach(attach: Attach, callback: Callback[AMQPEndpoint]) {
- if( attach.getRole == Role.SENDER.getValue ) {
- val target = attach.getTarget.asInstanceOf[Target]
- target.getAddress match {
- case address:AMQPString =>
- var dest = address.getValue
- decode_addresses(dest) match {
- case null => callback.onFailure(new Exception("Invaild address: "+dest))
- case addresses => attach_sender(attach, dest, addresses, callback)
- }
- case _ =>
- callback.onFailure(new Exception("Invaild address: "+target.getAddress))
- }
- } else {
- val source = attach.getSource.asInstanceOf[Source]
- source.getAddress match {
- case address:AMQPString =>
- var dest = address.getValue
- decode_addresses(dest) match {
- case null => callback.onFailure(new Exception("Invaild address: "+dest))
- case addresses => attach_receiver(attach, dest, addresses, callback)
- }
- case _ =>
- callback.onFailure(new Exception("Invaild address: "+source.getAddress))
- }
+ def decode_target(target: Target) = {
+ var dynamic = target.getDynamic()
+ if (dynamic) {
+ temp_dest_counter += 1
+ val parts = LiteralPart("temp") :: LiteralPart(broker.id) :: LiteralPart(session_id.get) :: LiteralPart(temp_dest_counter.toString) :: Nil
+ val rc = SimpleAddress("queue", Path(parts))
+ val actual = new Target();
+ var address = destination_parser.encode_destination(rc)
+ actual.setAddress(address);
+ actual.setDynamic(true);
+ (address, Array(rc), actual)
+ } else {
+ val address = target.getAddress
+ decode_addresses(address) match {
+ case null =>
+ (address, null, target)
+ case addresses =>
+ (address, addresses, target)
}
}
+ }
- override def onClose(error: Error) {
- if( error!=null ) {
- info("peer closed the AMQP session due to: "+error)
+ def decode_source(source: Source) = {
+ var dynamic = source.getDynamic()
+ if (dynamic) {
+ temp_dest_counter += 1
+ val parts = LiteralPart("temp") :: LiteralPart(broker.id) :: LiteralPart(session_id.get) :: LiteralPart(temp_dest_counter.toString) :: Nil
+ val rc = SimpleAddress("queue", Path(parts))
+ val actual = new Source();
+ var address = destination_parser.encode_destination(rc)
+ actual.setAddress(address);
+ actual.setDynamic(true);
+ (address, Array(rc), actual)
+ } else {
+ val address = source.getAddress
+ decode_addresses(address) match {
+ case null =>
+ (address, null, source)
+ case addresses =>
+ (address, addresses, source)
}
}
-
}
- def attach_sender(attach: Attach, address: String, addresses:Array[SimpleAddress], callback: Callback[AMQPEndpoint]) {
- val target = new AmqpProducerRoute(address, addresses)
- var receiver: AMQPReceiver = null
- // create the producer route...
- val options = new AMQPReceiverOptions();
- options.setSource(attach.getSource.asInstanceOf[Source])
- options.setTarget(attach.getTarget.asInstanceOf[Target])
- options.setName(attach.getName)
- options.setSenderSettleMode(SenderSettleMode.valueOf(attach.getSndSettleMode))
- options.setReceiverSettleMode(ReceiverSettleMode.valueOf(attach.getRcvSettleMode))
- options.setMaxMessageSize(10 * 1024 * 1024);
+ var temp_dest_counter = 0L
- def pump = {
- while (target.is_connected && !target.full && receiver.peek() != null) {
+ class AmqpProducerRoute(val id:Long, val receiver: Receiver, val addresses: Array[SimpleAddress]) extends DeliveryProducerRoute(host.router) with ProducerSupport {
- val amqpDelivery = receiver.poll()
+ val key = addresses.toList
+ var is_connected = false
- // Update the message /w who sent it to us..
- val amqpMessage = amqpDelivery.getMessage;
- if (amqpMessage.getDeliveryAnnotations == null) {
- amqpMessage.setDeliveryAnnotations(new DeliveryAnnotations(new MapEntries()))
- }
- amqpMessage.getDeliveryAnnotations.getValue.add(SENDER_CONTAINER_KEY, new AMQPString(amqp_connection.remoteContainerId()))
+ override def send_buffer_size = buffer_size
- val apolloDelivery = new Delivery
- apolloDelivery.message = AMQPMessage(amqpMessage)
- apolloDelivery.size = amqpDelivery.payload.length()
- // delivery.expiration = message.expiration
- // delivery.persistent = message.persistent
- // delivery.uow = uow
- // get(frame.headers, RETAIN).foreach { retain =>
- // delivery.retain = retain match {
- // case SET => RetainSet
- // case REMOVE => RetainRemove
- // case _ => RetainIgnore
- // }
- // }
+ override def connection = Some(AmqpProtocolHandler.this.connection)
- if (!amqpDelivery.isSettled) {
- apolloDelivery.ack = {
- (consumed, uow) =>
- queue <<| ^ {
- amqpDelivery.ack()
- }
- }
- }
+ override def connected() = is_connected = true
- target.offer(apolloDelivery)
- }
- }
+ override def dispatch_queue = queue
- target.refiller = ^ {
- pump
+ refiller = ^ {
+ resume_read
}
- options.setListener(new AMQPEndpoint.Listener {
- override def onTransfer() = pump
- override def onClosed(senderClosed: Boolean, error: Error) {
- if (error != null) {
- debug("Peer closed link due to error: %s", error)
- }
- host.dispatch_queue {
- host.router.disconnect(target.addresses, target)
+ val producer_overflow = new OverflowSink[Delivery](this)
+
+ def onMessage(delivery: engine.Delivery, message: AmqpMessage) = {
+ val d = new Delivery
+ d.message = message
+ d.size = message.encoded.length
+ var decoded = message.decoded
+ if (decoded.getProperties != null) {
+ if (decoded.getProperties.getAbsoluteExpiryTime != null) {
+ d.expiration = decoded.getProperties.getAbsoluteExpiryTime.getTime
+ }
+ }
+ if (decoded.getHeader != null) {
+ if (decoded.getHeader.getDurable != null) {
+ d.persistent = decoded.getHeader.getDurable.booleanValue()
+ }
+ if (decoded.getHeader.getDeliveryCount != null) {
+ d.redeliveries = decoded.getHeader.getDeliveryCount.shortValue()
}
}
- })
- // start with 0 credit window so that we don't receive any messages
- // until we have verified if that we can connect to the destination..
- options.credit = 0
- receiver = AMQP.createReceiver(options)
- callback.onSuccess(receiver)
-
- host.dispatch_queue {
- val rc = host.router.connect(target.addresses, target, security_context)
- queue {
- rc match {
- case Some(failure) =>
- receiver.detach(true, failure, null)
- case None =>
- // Add credit to start receiving messages.
- receiver.addCredit(50)
+ if (!delivery.remotelySettled()) {
+ d.ack = (result, uow) => {
+ queue {
+ result match {
+ case Consumed =>
+ delivery.settle()
+ case _ =>
+ async_die("uknown", "Unexpected NAK from broker")
+ }
+ }
}
}
+
+ producer_overflow.offer(d)
+ receiver.advance();
}
}
- var protocol_convert = "full"
- class AMQPConsumer(
- val subscription_id:String,
- val addresses:Array[_ <: BindAddress],
- val selector:(String, BooleanExpression),
- override val browser:Boolean,
- override val exclusive:Boolean,
- val include_seq:Option[Long],
- val from_seq:Long,
- override val close_on_drain:Boolean
- ) extends BaseRetained with DeliveryConsumer {
+ def close_with_error(link: Link, error_name: String = "", error_message: String = "") = {
+ link.asInstanceOf[LinkImpl].setLocalError(amqp_error(error_name, error_message))
+ link.close()
+ }
+
+ var link_counter = 0L
+ var producers = Map[Long, AmqpProducerRoute]()
+ var consumers = Map[Long, AmqpConsumer]()
+ var message_id_counter = 0L
+
+ class AmqpConsumer(sender: Sender,
+ val subscription_id: Long,
+ val addresses: Array[_ <: BindAddress],
+ val presettle: Boolean,
+ val selector: (String, BooleanExpression),
+ override val browser: Boolean,
+ override val exclusive: Boolean,
+ val include_seq: Option[Long],
+ val from_seq: Long,
+ override val close_on_drain: Boolean
+ ) extends BaseRetained with DeliveryConsumer {
- var sender:AMQPSender = _
-
- override def toString = "amqp subscription:"+subscription_id+", remote address: "+security_context.remote_address
+ override def toString = "amqp subscription:" + sender.getName + ", remote address: " + security_context.remote_address
///////////////////////////////////////////////////////////////////
// DeliveryConsumer Interface..
///////////////////////////////////////////////////////////////////
- def connect(p:DeliveryProducer) = new AMQPConsumerSession(p)
+ def connect(p: DeliveryProducer) = new AmqpConsumerSession(p)
def dispatch_queue = queue
override def connection = Option(AmqpProtocolHandler.this.connection)
+
def is_persistent = false
def matches(message: Delivery) = true
-
override def start_from_tail = from_seq == -1
- override def jms_selector = if(selector!=null){ selector._1 } else { null }
+
+ override def jms_selector = if (selector != null) {
+ selector._1
+ } else {
+ null
+ }
+
override def user = security_context.user
- var starting_seq:Long = 0L
- override def set_starting_seq(seq: Long):Unit = {
- starting_seq=seq
+ var starting_seq: Long = 0L
+
+ override def set_starting_seq(seq: Long): Unit = {
+ starting_seq = seq
}
- ///////////////////////////////////////////////////////////////////
- // Sink[(Session[Delivery], Delivery)] interface..
- ///////////////////////////////////////////////////////////////////
- object sink extends Sink[(Session[Delivery], Delivery)] {
- var refiller: Task = NOOP
- def full: Boolean = sender.full()
- def offer(value: (Session[Delivery], Delivery)): Boolean = {
- if( full ) {
- return false
- }
-
- val (session, delivery) = value
- val message = delivery.message
-
-
- val header = new Header()
- header.setDeliveryCount(delivery.redeliveries)
- header.setDurable(delivery.persistent)
- header.setFirstAcquirer(delivery.redeliveries == 0)
-
-// if( include_seq.isDefined ) {
-// frame = frame.append_headers((include_seq.get, ascii(delivery.seq.toString))::Nil)
-// }
-
- var annotated = if( message.codec eq AmqpMessageCodec ) {
- val original = message.asInstanceOf[AMQPMessage].annotated
- var annotated = new Envelope
- annotated.setHeader(header)
- annotated.setDeliveryAnnotations(original.getDeliveryAnnotations)
- annotated.setMessageAnnotations(original.getMessageAnnotations)
- annotated.setMessage(original.getMessage)
- annotated.setFooter(original.getFooter)
- annotated
- } else {
-
- val (body, content_type) = protocol_convert match{
- case "body" => (message.getBodyAs(classOf[Buffer]), "protocol/"+message.codec.id+";conv=body")
- case _ => (message.encoded, "protocol/"+message.codec.id)
- }
-
- val bare = new types.Message
- bare.setData(new Data(body))
- var properties = new Properties()
- properties.setContentType(ascii(content_type))
- if( delivery.expiration!= 0 ) {
- properties.setAbsoluteExpiryTime(new Date(delivery.expiration))
- }
- bare.setProperties(properties)
- var annotated = new Envelope
- annotated.setHeader(header)
- annotated.setMessage(bare)
- annotated
- }
-
- sender.send(annotated, null)
- messages_sent += 1
- return true
+ var nextTagId = 0L;
+ val tagCache = new util.HashSet[Array[Byte]]();
+ val unsettled = new HashMap[AsciiBuffer, org.apache.qpid.proton.engine.Delivery]()
+
+ def nextTag: Array[Byte] = {
+ var rc: Array[Byte] = null
+ if (tagCache != null && !tagCache.isEmpty()) {
+ val iterator = tagCache.iterator();
+ rc = iterator.next();
+ iterator.remove();
+ } else {
+ rc = java.lang.Long.toHexString(nextTagId).getBytes("UTF-8");
+ nextTagId += 1
}
+ return rc;
}
- ///////////////////////////////////////////////////////////////////
- // AMQPEndpoint.Listener interface..
- ///////////////////////////////////////////////////////////////////
- object endpoint_listener extends AMQPEndpoint.Listener {
- override def onTransfer = queue {
- sink.refiller.run()
- }
- override def onClosed(senderClosed: Boolean, error: Error) {
- if (error != null) {
- debug("Peer closed link due to error: %s", error)
- }
- host.dispatch_queue {
- host.router.unbind(addresses, AMQPConsumer.this, false, security_context)
- }
+ def checkinTag(data: Array[Byte]) = {
+ if (tagCache.size() < 1024) {
+ tagCache.add(data);
}
}
- val session_manager = new SessionSinkMux[Delivery](sink, queue, Delivery, 1, buffer_size) {
+ val redeliveries = new util.LinkedList[(Session[Delivery], Delivery)]()
+ val session_manager = new SessionSinkMux[Delivery](FullSink(), queue, Delivery, 1, buffer_size) {
override def time_stamp = broker.now
+
+ var currentBuffer: Buffer = _;
+ var currentDelivery: org.apache.qpid.proton.engine.Delivery = _;
+
+ override def drain_overflow: Unit = {
+ queue.assertExecuting()
+ var pumpNeeded = false
+ try {
+ while (true) {
+ while (currentBuffer != null) {
+ var sent = sender.send(currentBuffer.data, currentBuffer.offset, currentBuffer.length);
+ if (sent > 0) {
+ pumpNeeded = true
+ currentBuffer.moveHead(sent);
+ if (currentBuffer.length == 0) {
+ if (presettle) {
+ settle(currentDelivery, Consumed);
+ } else {
+ sender.advance();
+ }
+ currentBuffer = null;
+ currentDelivery = null;
+ }
+ } else {
+ return;
+ }
+ }
+
+ val value = poll
+ if (value == null) {
+ return
+ } else {
+ val (session, delivery) = value
+ val message = if (delivery.message.codec == AmqpMessageCodec) {
+ delivery.message.asInstanceOf[AmqpMessage].decoded
+ } else {
+ val (body, content_type) = protocol_convert match {
+ case "body" => (delivery.message.getBodyAs(classOf[Buffer]), "protocol/" + delivery.message.codec.id + ";conv=body")
+ case _ => (delivery.message.encoded, "protocol/" + delivery.message.codec.id())
+ }
+
+ message_id_counter += 1
+
+ val message = new org.apache.qpid.proton.message.Message
+ message.setMessageId(session_id.get + message_id_counter)
+ message.setBody(new Data(new Binary(body.data, body.offset, body.length)))
+ message.setContentType(content_type)
+ message.setDurable(delivery.persistent)
+ if (delivery.expiration > 0) {
+ message.setExpiryTime(delivery.expiration)
+ }
+ message
+ }
+
+ if (delivery.redeliveries > 0) {
+ message.setDeliveryCount(delivery.redeliveries)
+ message.setFirstAcquirer(false)
+ }
+
+ currentBuffer = new AmqpMessage(null, message).encoded;
+ if (presettle) {
+ currentDelivery = sender.delivery(EMPTY_BYTE_ARRAY, 0, 0);
+ } else {
+ val tag = nextTag
+ currentDelivery = sender.delivery(tag, 0, tag.length);
+ unsettled.put(new AsciiBuffer(tag), currentDelivery)
+ }
+ currentDelivery.setContext(value);
+ }
+ }
+ } finally {
+ if( pumpNeeded ) {
+ pump_out
+ }
+ }
+ }
+
+ override def poll: (Session[Delivery], Delivery) = {
+ if( redeliveries.isEmpty ) {
+ super.poll
+ } else {
+ redeliveries.removeFirst()
+ }
+ }
+
+ private def settle(delivery: org.apache.qpid.proton.engine.Delivery, ackType: DeliveryResult) {
+ val tag: Array[Byte] = delivery.getTag
+ if (tag != null && tag.length > 0) {
+ checkinTag(tag)
+ unsettled.remove(new AsciiBuffer(tag))
+ }
+ // Don't ack.. redeliver
+ val (session, apollo_delivery) = delivery.getContext.asInstanceOf[(Session[Delivery], Delivery)]
+ if (ackType == null) {
+ delivery.settle
+ redeliveries.addFirst((session, apollo_delivery))
+ drain_overflow
+ } else {
+
+ val remoteState = delivery.getRemoteState
+ if (remoteState != null && remoteState.isInstanceOf[TransactionalState]) {
+ val s: TransactionalState = remoteState.asInstanceOf[TransactionalState]
+ val txid = toLong(s.getTxnId)
+ async_die("txs-not-supported", "Transactions not yet supported")
+ return
+ }
+
+ if (apollo_delivery.ack != null) {
+ apollo_delivery.ack(ackType, null)
+ }
+ delivery.settle
+ pump_out
+ }
+ }
}
- class AMQPConsumerSession(p:DeliveryProducer) extends DeliverySession with SessionSinkFilter[Delivery] {
+ class AmqpConsumerSession(p: DeliveryProducer) extends DeliverySession with SessionSinkFilter[Delivery] {
def producer = p
- def consumer = AMQPConsumer.this
+ def consumer = AmqpConsumer.this
val downstream = session_manager.open(producer.dispatch_queue)
// Delegate all the flow control stuff to the session
@@ -588,8 +916,8 @@ class AmqpProtocolHandler extends Protoc
rc
}
- def offer(delivery:Delivery) = {
- if( full ) {
+ def offer(delivery: Delivery) = {
+ if (full) {
false
} else {
delivery.message.retain()
@@ -599,107 +927,38 @@ class AmqpProtocolHandler extends Protoc
}
}
- def close {}
- }
- }
-
- def attach_receiver(attach: Attach, address: String, requested_addresses:Array[SimpleAddress], callback: Callback[AMQPEndpoint]) = try {
- val options = new AMQPSenderOptions();
-
- val src = attach.getSource.asInstanceOf[Source]
- src.setDefaultOutcome(new Released())
-
- if (attach.getSndSettleMode == SenderSettleMode.SETTLED.getValue) {
- // if we are settling... then no other outcomes are possible..
- src.setOutcomes(Array())
- } else {
- src.setOutcomes(Array(
- new AMQPSymbol(Accepted.SYMBOLIC_ID),
- new AMQPSymbol(Rejected.SYMBOLIC_ID),
- new AMQPSymbol(Released.SYMBOLIC_ID),
- new AMQPSymbol(Modified.SYMBOLIC_ID)
- ))
- }
- options.setSource(src)
-
- options.setTarget(attach.getTarget.asInstanceOf[Target])
- options.setName(attach.getName)
- options.setSenderSettleMode(SenderSettleMode.valueOf(attach.getSndSettleMode))
- options.setReceiverSettleMode(ReceiverSettleMode.valueOf(attach.getRcvSettleMode))
- options.setMaxMessageSize(10 * 1024 * 1024);
-
-
- val subscription_id = attach.getName
- var persistent = false
- var browser = false
- var browser_end = browser && true
- var exclusive = !browser && false
- var include_seq: Option[Long] = None
- val from_seq_opt: Option[Long] = None
-
- def is_multi_destination = if (requested_addresses.length > 1) {
- true
- } else {
- PathParser.containsWildCards(requested_addresses(0).path)
- }
- if (from_seq_opt.isDefined && is_multi_destination) {
- die("The from-seq header is only supported when you subscribe to one destination");
- }
-
- val selector = attach.getProperties match {
- case null => null
- case properties =>
- properties.get(new AMQPString("selector")) match {
- case null => null
- case x:AMQPString =>
- try {
- (x.getValue, SelectorParser.parse(x.getValue))
- } catch {
- case e: FilterException =>
- die("Invalid selector expression '%s': ".format(x, e.getMessage))
- }
- case x =>
- die("Invalid selector expression '%s'".format(x))
- }
- }
-
- val addresses:Array[_ <: BindAddress] = if (persistent) {
- val dsubs = ListBuffer[BindAddress]()
- val topics = ListBuffer[BindAddress]()
- requested_addresses.foreach { address =>
- address.domain match {
- case "dsub" => dsubs += address
- case "topic" => topics += address
- case _ => die("A persistent subscription can only be used on a topic destination")
+ def close {
+ session_manager.close(downstream, (delivery)=>{
+ if( delivery.ack !=null ) {
+ delivery.ack(Undelivered, null)
}
+ })
}
- val s = if (selector == null) null else selector._1
- dsubs += SubscriptionAddress(destination_parser.decode_path(subscription_id), s, topics.toArray)
- dsubs.toArray
- } else {
- requested_addresses
}
- val from_seq = from_seq_opt.getOrElse(0L)
-
- val source = new AMQPConsumer(subscription_id, addresses, selector, browser, exclusive, include_seq, from_seq, browser_end);
- options.setListener(source.endpoint_listener)
- source.sender = AMQP.createSender(options)
+ override def dispose() = queue {
+ def reject(value:(Session[Delivery], Delivery), result:DeliveryResult) ={
+ val (_, delivery) = value
+ if( delivery.ack!=null ) {
+ delivery.ack(result, null)
+ }
+ }
- host.dispatch_queue {
- val rc = host.router.bind(addresses, source, security_context)
- source.release
- queue {
- rc match {
- case Some(failure) =>
- source.sender.detach(true, failure, null)
- case None =>
+ for( v <- unsettled.values ) {
+ val value = v.getContext.asInstanceOf[(Session[Delivery], Delivery)]
+ if( value!=null ) {
+ v.setContext(null)
+ reject(value, Delivered)
}
}
+
+ var next = session_manager.poll
+ while( next!=null ) {
+ reject(next, Undelivered)
+ }
+ super.dispose()
}
- callback.onSuccess(source.sender)
- } catch {
- case e => callback.onFailure(e)
+
}
}
Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpConnection.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpConnection.java?rev=1405942&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpConnection.java (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpConnection.java Mon Nov 5 20:32:47 2012
@@ -0,0 +1,283 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.activemq.apollo.amqp.hawtdispatch;
+
+import org.apache.qpid.proton.engine.*;
+import org.apache.qpid.proton.engine.impl.ConnectionImpl;
+import org.apache.qpid.proton.engine.impl.ProtocolTracer;
+import org.apache.qpid.proton.engine.impl.TransportImpl;
+import org.apache.qpid.proton.framing.TransportFrame;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtdispatch.Dispatch;
+import org.fusesource.hawtdispatch.DispatchQueue;
+import org.fusesource.hawtdispatch.Task;
+import org.fusesource.hawtdispatch.transport.DefaultTransportListener;
+import org.fusesource.hawtdispatch.transport.Transport;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Integrates a proton transport/connection /w HawtDispatch transports.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpConnection {
+
+ static final ExecutorService blockingExecutor = Executors.newCachedThreadPool();
+ final ConnectionImpl protonConnection = new ConnectionImpl();
+
+ Transport hawtdispatchTransport;
+ TransportImpl protonTransport = new TransportImpl();
+ HashSet<Object> endpointsBeingProcessed = new HashSet<Object>();
+
+ public Sasl sasl;
+
+ public void bind(Transport transport) throws Exception {
+ this.protonTransport = new TransportImpl();
+ this.protonTransport.setProtocolTracer(new ProtocolTracer() {
+ public void receivedFrame(TransportFrame transportFrame) {
+ System.out.println(String.format("RECV: %s:%05d | %s", hawtdispatchTransport.getRemoteAddress(), transportFrame.getChannel(), transportFrame.getBody()));
+ }
+
+ public void sentFrame(TransportFrame transportFrame) {
+ System.out.println(String.format("SEND: %s:%05d | %s", hawtdispatchTransport.getRemoteAddress(), transportFrame.getChannel(), transportFrame.getBody()));
+ }
+ });
+ this.protonTransport.bind(protonConnection);
+
+ this.hawtdispatchTransport = transport;
+ this.hawtdispatchTransport.setBlockingExecutor(blockingExecutor);
+ if( this.hawtdispatchTransport.getProtocolCodec()==null ) {
+ this.hawtdispatchTransport.setProtocolCodec(new AmqpProtocolCodec());
+ }
+ this.hawtdispatchTransport.setTransportListener(new DefaultTransportListener() {
+
+ @Override
+ public void onTransportConnected() {
+ hawtdispatchTransport.resumeRead();
+ listener.processTransportConnected();
+ }
+
+ @Override
+ public void onTransportCommand(Object command) {
+ try {
+ Buffer buffer;
+ if( command.getClass() == AmqpHeader.class ) {
+ AmqpHeader header = (AmqpHeader)command;
+ switch( header.getProtocolId() ) {
+ case 0:
+ break; // nothing to do..
+ case 3: // Client will be using SASL for auth..
+ sasl = listener.processSaslConnect(protonTransport);
+ break;
+ default:
+ }
+ buffer = header.getBuffer();
+ } else {
+ buffer = (Buffer) command;
+ }
+ protonTransport.input(buffer.data, buffer.offset, buffer.length);
+ fireListenerEvents();
+ pumpOut();
+ } catch (Exception e) {
+ listener.processFailure(e);
+ }
+ }
+
+ public void onRefill() {
+ pumpOut();
+ }
+
+ @Override
+ public void onTransportFailure(IOException error) {
+ stop(Dispatch.NOOP);
+ listener.processTransportFailure(error);
+ }
+ });
+ }
+
+ boolean hawtdispatchClosed = false;
+
+ public static final EnumSet<EndpointState> UNINITIALIZED_SET = EnumSet.of(EndpointState.UNINITIALIZED);
+ public static final EnumSet<EndpointState> INITIALIZED_SET = EnumSet.complementOf(UNINITIALIZED_SET);
+ public static final EnumSet<EndpointState> ACTIVE_STATE = EnumSet.of(EndpointState.ACTIVE);
+ public static final EnumSet<EndpointState> CLOSED_STATE = EnumSet.of(EndpointState.CLOSED);
+
+ public void pumpOut() {
+ if(hawtdispatchClosed) {
+ return;
+ }
+ int size = hawtdispatchTransport.getProtocolCodec().getWriteBufferSize();
+ byte data[] = new byte[size];
+ boolean done = false;
+ while( !done && !hawtdispatchTransport.full() ) {
+ int count = protonTransport.output(data, 0, size);
+ if( count > 0 ) {
+ hawtdispatchTransport.offer(new Buffer(data, 0, count));
+ } else {
+ done = true;
+ }
+ }
+ if( !hawtdispatchTransport.full() ) {
+ listener.processRefill();
+ }
+ }
+
+ AmqpListener listener = new AmqpListener();
+
+ public DispatchQueue queue() {
+ return getHawtdispatchTransport().getDispatchQueue();
+ }
+
+ class ProcessedTask extends Task {
+ private final Object value;
+
+ ProcessedTask(Object value) {
+ this.value = value;
+ }
+
+ @Override
+ public void run() {
+ queue().assertExecuting();
+ endpointsBeingProcessed.remove(value);
+ pumpOut();
+ }
+ }
+
+
+ public void fireListenerEvents() {
+
+ if( sasl!=null ) {
+ sasl = listener.processSaslEvent(sasl);
+ if( sasl==null ) {
+ // once sasl handshake is done.. we need to read the protocol header again.
+ ((AmqpProtocolCodec)this.hawtdispatchTransport.getProtocolCodec()).readProtocolHeader();
+ }
+ }
+
+ if(protonConnection.getLocalState() == EndpointState.UNINITIALIZED && protonConnection.getRemoteState() != EndpointState.UNINITIALIZED && !endpointsBeingProcessed.contains(protonConnection))
+ {
+ endpointsBeingProcessed.add(protonConnection);
+ listener.processConnectionOpen(protonConnection, new ProcessedTask(protonConnection));
+ }
+
+ Session session = protonConnection.sessionHead(UNINITIALIZED_SET, INITIALIZED_SET);
+ while(session != null)
+ {
+ if( !endpointsBeingProcessed.contains(session) ) {
+ endpointsBeingProcessed.add(session);
+ listener.proccessSessionOpen(session, new ProcessedTask(session));
+ }
+ session = session.next(UNINITIALIZED_SET, INITIALIZED_SET);
+ }
+
+ Link link = protonConnection.linkHead(UNINITIALIZED_SET, INITIALIZED_SET);
+ while(link != null)
+ {
+ if( !endpointsBeingProcessed.contains(link) ) {
+ endpointsBeingProcessed.add(link);
+ link.setSource(link.getRemoteSource());
+ link.setTarget(link.getRemoteTarget());
+ ProcessedTask onComplete = new ProcessedTask(link);
+ if( link instanceof Sender) {
+ listener.processSenderOpen((Sender) link, onComplete);
+ } else {
+ listener.processReceiverOpen((Receiver) link, onComplete);
+ }
+ }
+ link = link.next(UNINITIALIZED_SET, INITIALIZED_SET);
+ }
+
+
+ Delivery delivery = protonConnection.getWorkHead();
+ while(delivery != null)
+ {
+ if(delivery.getLink() instanceof Receiver) {
+ listener.processDelivery((Receiver) delivery.getLink(), delivery);
+ } else {
+ listener.processDelivery((Sender) delivery.getLink(), delivery);
+ }
+ delivery = delivery.getWorkNext();
+ }
+
+ link = protonConnection.linkHead(ACTIVE_STATE, CLOSED_STATE);
+ while(link != null)
+ {
+ if( !endpointsBeingProcessed.contains(link) ) {
+ endpointsBeingProcessed.add(link);
+ ProcessedTask onComplete = new ProcessedTask(link);
+ if( link instanceof Receiver) {
+ listener.processReceiverClose((Receiver) link, onComplete);
+ } else {
+ listener.processSenderClose((Sender) link, onComplete);
+ }
+ }
+ link = link.next(ACTIVE_STATE, CLOSED_STATE);
+ }
+
+ session = protonConnection.sessionHead(ACTIVE_STATE, CLOSED_STATE);
+ while(session != null)
+ {
+ //TODO - close links?
+ if( !endpointsBeingProcessed.contains(session) ) {
+ endpointsBeingProcessed.add(session);
+ listener.processSessionClose(session, new ProcessedTask(session));
+ }
+ session = session.next(ACTIVE_STATE, CLOSED_STATE);
+ }
+ if(protonConnection.getLocalState() == EndpointState.ACTIVE && protonConnection.getRemoteState() == EndpointState.CLOSED && !endpointsBeingProcessed.contains(protonConnection))
+ {
+ listener.processConnectionClose(protonConnection, new ProcessedTask(protonConnection));
+ protonConnection.close();
+ }
+
+ }
+
+ public AmqpListener getListener() {
+ return listener;
+ }
+
+ public void setListener(AmqpListener listener) {
+ this.listener = listener;
+ }
+
+ public void start(Task onComplete) {
+ hawtdispatchTransport.start(onComplete);
+ }
+
+ public void stop(Task onComplete) {
+ hawtdispatchClosed = true;
+ hawtdispatchTransport.stop(onComplete);
+ }
+
+ public ConnectionImpl getProtonConnection() {
+ return protonConnection;
+ }
+
+ public Transport getHawtdispatchTransport() {
+ return hawtdispatchTransport;
+ }
+
+}
Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpHeader.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpHeader.java?rev=1405942&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpHeader.java (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpHeader.java Mon Nov 5 20:32:47 2012
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.amqp.hawtdispatch;
+
+import org.fusesource.hawtbuf.Buffer;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpHeader {
+
+ static final Buffer PREFIX = new Buffer(new byte[]{
+ 'A', 'M', 'Q', 'P'
+ });
+
+ private Buffer buffer;
+
+ public AmqpHeader(){
+ this(new Buffer(new byte[]{
+ 'A', 'M', 'Q', 'P', 0, 1, 0, 0
+ }));
+ }
+
+ public AmqpHeader(Buffer buffer){
+ setBuffer(buffer);
+ }
+
+ public int getProtocolId() {
+ return buffer.get(4) & 0xFF;
+ }
+ public void setProtocolId(int value) {
+ buffer.data[buffer.offset+4] = (byte) value;
+ }
+
+ public int getMajor() {
+ return buffer.get(5) & 0xFF;
+ }
+ public void setMajor(int value) {
+ buffer.data[buffer.offset+5] = (byte) value;
+ }
+
+ public int getMinor() {
+ return buffer.get(6) & 0xFF;
+ }
+ public void setMinor(int value) {
+ buffer.data[buffer.offset+6] = (byte) value;
+ }
+
+ public int getRevision() {
+ return buffer.get(7) & 0xFF;
+ }
+ public void setRevision(int value) {
+ buffer.data[buffer.offset+7] = (byte) value;
+ }
+
+ public Buffer getBuffer() {
+ return buffer;
+ }
+ public void setBuffer(Buffer value) {
+ if( !value.startsWith(PREFIX) || value.length()!=8 ) {
+ throw new IllegalArgumentException("Not an AMQP header buffer");
+ }
+ buffer = value.buffer();
+ }
+
+
+ @Override
+ public String toString() {
+ return buffer.toString();
+ }
+}
Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpListener.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpListener.java?rev=1405942&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpListener.java (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpListener.java Mon Nov 5 20:32:47 2012
@@ -0,0 +1,78 @@
+package org.apache.activemq.apollo.amqp.hawtdispatch;
+
+import org.apache.qpid.proton.engine.*;
+import org.apache.qpid.proton.engine.impl.TransportImpl;
+import org.apache.qpid.proton.type.messaging.Accepted;
+import org.fusesource.hawtdispatch.Task;
+
+import java.io.IOException;
+
+/**
+* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+*/
+public class AmqpListener {
+
+ public Sasl processSaslConnect(TransportImpl protonTransport) {
+ return null;
+ }
+
+ public Sasl processSaslEvent(Sasl sasl) {
+ return sasl;
+ }
+
+ public void processConnectionOpen(Connection conn, Task onComplete) {
+ conn.open();
+ onComplete.run();
+ }
+ public void processConnectionClose(Connection conn, Task onComplete){
+ conn.close();
+ onComplete.run();
+ }
+
+ public void proccessSessionOpen(Session session, Task onComplete){
+ session.open();
+ onComplete.run();
+ }
+ public void processSessionClose(Session session, Task onComplete){
+ session.close();
+ onComplete.run();
+ }
+
+ public void processSenderOpen(Sender sender, Task onComplete) {
+ sender.close();
+ onComplete.run();
+ }
+ public void processSenderClose(Sender sender, Task onComplete){
+ sender.close();
+ onComplete.run();
+ }
+
+ public void processReceiverOpen(Receiver receiver, Task onComplete) {
+ receiver.open();
+ onComplete.run();
+ }
+ public void processReceiverClose(Receiver receiver, Task onComplete) {
+ receiver.close();
+ onComplete.run();
+ }
+
+ public void processDelivery(Receiver receiver, Delivery delivery){
+ }
+
+ public void processDelivery(Sender sender, Delivery delivery) {
+ }
+
+ public void processFailure(Throwable e) {
+ e.printStackTrace();
+ }
+
+ public void processRefill() {
+ }
+
+ public void processTransportConnected() {
+ }
+
+ public void processTransportFailure(IOException e) {
+ e.printStackTrace();
+ }
+}
Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpProtocolCodec.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpProtocolCodec.java?rev=1405942&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpProtocolCodec.java (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpProtocolCodec.java Mon Nov 5 20:32:47 2012
@@ -0,0 +1,99 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.activemq.apollo.amqp.hawtdispatch;
+
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtdispatch.transport.AbstractProtocolCodec;
+
+import java.io.IOException;
+
+/**
+ * A HawtDispatch protocol codec that encodes/decodes AMQP 1.0 frames.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpProtocolCodec extends AbstractProtocolCodec {
+
+ @Override
+ protected void encode(Object object) throws IOException {
+ nextWriteBuffer.write((Buffer) object);
+ }
+
+ @Override
+ protected Action initialDecodeAction() {
+ return new Action() {
+ public Object apply() throws IOException {
+ Buffer magic = readBytes(8);
+ if (magic != null) {
+ nextDecodeAction = readFrameSize;
+ return new AmqpHeader(magic);
+ } else {
+ return null;
+ }
+ }
+ };
+ }
+
+ private final Action readFrameSize = new Action() {
+ public Object apply() throws IOException {
+ Buffer sizeBytes = peekBytes(4);
+ if (sizeBytes != null) {
+ int size = sizeBytes.bigEndianEditor().readInt();
+ if (size < 8) {
+ throw new IOException(String.format("specified frame size %d smaller than minimum frame size", size));
+ }
+ // TODO: check frame min and max size..
+ nextDecodeAction = readFrame(size);
+ return nextDecodeAction.apply();
+ } else {
+ return null;
+ }
+ }
+ };
+
+
+ private final Action readFrame(final int size) {
+ return new Action() {
+ public Object apply() throws IOException {
+ Buffer frameData = readBytes(size);
+ if (frameData != null) {
+ nextDecodeAction = readFrameSize;
+ return frameData;
+ } else {
+ return null;
+ }
+ }
+ };
+ }
+
+ public int getReadBytesPendingDecode() {
+ return readBuffer.position() - readStart;
+ }
+
+ public void skipProtocolHeader() {
+ nextDecodeAction = readFrameSize;
+ }
+
+ public void readProtocolHeader() {
+ nextDecodeAction = initialDecodeAction();
+ }
+
+}