You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/11/13 18:41:05 UTC
svn commit: r1408852 [1/2] - in /activemq/activemq-apollo/trunk:
apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/
apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/
apollo-amqp/src/main/scala/org/apache/activemq/apollo/...
Author: chirino
Date: Tue Nov 13 17:41:01 2012
New Revision: 1408852
URL: http://svn.apache.org/viewvc?rev=1408852&view=rev
Log:
More progress on the Proton-based AMQP implementation. Started extracting a client API, built on the Proton library, that can be used in HawtDispatch client scenarios.
Added:
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpConnectOptions.java
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpConnection.java
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpDeliveryListener.java
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpEndpointBase.java
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpLink.java
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpReceiver.java
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpSender.java
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpSession.java
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/Callback.java
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/ChainedCallback.java
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/DeliveryAttachment.java
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/Future.java
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/MessageDelivery.java
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/Promise.java
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/QoS.java
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/TransportState.java
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpHeader.java
- copied, changed from r1406782, activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpHeader.java
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpListener.java
- copied, changed from r1406782, activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpListener.java
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpProtocolCodec.java
- copied, changed from r1406782, activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpProtocolCodec.java
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpTransport.java
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/Defer.java
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/DroppingWritableBuffer.java
- copied, changed from r1406782, activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/DroppingWritableBuffer.java
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/EndpointContext.java
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/Support.java
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/Watch.java
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/WatchBase.java
activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/jul.properties
activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/AmqpConnectionTest.scala
Removed:
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpConnection.java
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpHeader.java
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpListener.java
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpProtocolCodec.java
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/DroppingWritableBuffer.java
activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/hawtdispatch/TransportConnectionTest.java
Modified:
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocol.scala
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/QpidJmsTest.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala?rev=1408852&r1=1408851&r2=1408852&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpMessage.scala Tue Nov 13 17:41:01 2012
@@ -17,9 +17,9 @@
package org.apache.activemq.apollo.amqp
+import hawtdispatch.impl.DroppingWritableBuffer
import org.apache.activemq.apollo.broker.protocol
import protocol.{MessageCodecFactory, MessageCodec}
-import hawtdispatch.DroppingWritableBuffer
import java.nio.ByteBuffer
import org.apache.qpid.proton.codec.{WritableBuffer, CompositeWritableBuffer}
import org.fusesource.hawtbuf.Buffer._
Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocol.scala?rev=1408852&r1=1408851&r2=1408852&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocol.scala Tue Nov 13 17:41:01 2012
@@ -17,7 +17,7 @@
package org.apache.activemq.apollo.amqp
import _root_.org.fusesource.hawtbuf._
-import hawtdispatch.AmqpProtocolCodec
+import hawtdispatch.impl.AmqpProtocolCodec
import org.apache.activemq.apollo.broker._
import org.apache.activemq.apollo.broker.protocol.Protocol
Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala?rev=1408852&r1=1408851&r2=1408852&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala Tue Nov 13 17:41:01 2012
@@ -25,17 +25,18 @@ import org.fusesource.hawtbuf._
import org.apache.activemq.apollo.util._
import org.apache.activemq.apollo.dto._
import org.apache.activemq.apollo.broker._
-import org.apache.activemq.apollo.util.path.{PathParser, Path, LiteralPart}
+import org.apache.activemq.apollo.util.path.{PathParser, Path}
+import path.LiteralPart
import protocol.ProtocolHandler
import org.apache.activemq.apollo.broker.security.SecurityContext
import org.apache.activemq.apollo.amqp.dto._
-import hawtdispatch.{AmqpProtocolCodec, AmqpListener, AmqpConnection}
+import hawtdispatch.impl.{AmqpProtocolCodec, AmqpListener, AmqpTransport}
import org.apache.qpid.proton.engine
-import engine.impl.{LinkImpl, TransportImpl}
-import engine.{Receiver, Sasl, Sender, Link, EndpointError, EndpointState}
+import org.apache.qpid.proton.engine.impl.{ProtocolTracer, DeliveryImpl, LinkImpl, TransportImpl}
+import org.apache.qpid.proton.engine._
import org.fusesource.hawtbuf.Buffer._
import org.apache.qpid.proton.`type`.transaction.{TransactionalState, Coordinator}
-import org.apache.qpid.proton.`type`.messaging.{Data, Source, Target}
+import org.apache.qpid.proton.`type`.messaging.{Accepted, Data, Source, Target, Modified}
import org.apache.activemq.apollo.broker.Delivery
import org.apache.activemq.apollo.filter.{FilterException, BooleanExpression}
import org.apache.qpid.proton.`type`.{Symbol => AmqpSymbol, Binary, DescribedType}
@@ -43,8 +44,11 @@ import org.apache.activemq.apollo.select
import org.apache.qpid.proton.`type`.transport.SenderSettleMode
import java.util
import java.io.IOException
+import scala.Some
import org.apache.activemq.apollo.broker.FullSink
import org.apache.activemq.apollo.broker.SubscriptionAddress
+import org.apache.activemq.apollo.broker.Session
+import org.apache.qpid.proton.framing.TransportFrame
object AmqpProtocolHandler extends Log {
@@ -94,6 +98,7 @@ class AmqpProtocolHandler extends Protoc
var config: AmqpDTO = _
var dead = false
var protocol_convert = "full"
+ var prefetch = 100
def session_id = security_context.session_id
@@ -171,11 +176,11 @@ class AmqpProtocolHandler extends Protoc
// heart_beat_monitor.resumeRead
}
- val amqp_connection = new AmqpConnection()
+ var amqp_connection:AmqpTransport = _
def codec = connection.transport.getProtocolCodec.asInstanceOf[AmqpProtocolCodec]
- def proton = amqp_connection.getProtonConnection
+ def proton = amqp_connection.connection()
def pump_out = {
queue.assertExecuting()
@@ -196,8 +201,21 @@ class AmqpProtocolHandler extends Protoc
val connector_config = connection.connector.config.asInstanceOf[AcceptingConnectorDTO]
config = connector_config.protocols.find(_.isInstanceOf[AmqpDTO]).map(_.asInstanceOf[AmqpDTO]).getOrElse(new AmqpDTO)
- amqp_connection.bind(connection.transport)
+ amqp_connection = AmqpTransport.accept(connection.transport)
amqp_connection.setListener(amqp_listener)
+ if( false ) {
+ amqp_connection.setProtocolTracer(new ProtocolTracer() {
+ def receivedFrame(transportFrame: TransportFrame) = {
+ // println("RECV: %s | %s".format(security_context.remote_address, transportFrame.getBody()));
+ // connection_log.trace("RECV: %s | %s", security_context.remote_address, transportFrame.getBody());
+ }
+ def sentFrame(transportFrame: TransportFrame) = {
+ println("SEND: %s | %s".format(security_context.remote_address, transportFrame.getBody()));
+ // connection_log.trace("SEND: %s | %s", security_context.remote_address, transportFrame.getBody());
+ }
+ });
+ }
+ connection.transport.resumeRead()
}
val amqp_listener = new AmqpListener() {
@@ -209,7 +227,6 @@ class AmqpProtocolHandler extends Protoc
sasl
}
-
override def processSaslEvent(sasl: Sasl): Sasl = {
// Let's try to complete the SASL handshake.
if (sasl.getRemoteMechanisms().length > 0) {
@@ -238,14 +255,45 @@ class AmqpProtocolHandler extends Protoc
}
}
- override def processConnectionOpen(conn: engine.Connection, onComplete: Task) {
- println("connection opened.")
+ override def processRemoteOpen(endpoint: Endpoint, onComplete: Task) {
+ endpoint match {
+ case connection:engine.Connection =>
+ processConnectionOpen(connection, onComplete)
+ case session:engine.Session =>
+ session.open(); onComplete.run()
+ case sender:engine.Sender =>
+ processSenderOpen(sender, onComplete)
+ case receiver:engine.Receiver =>
+ processReceiverOpen(receiver, onComplete)
+ case _ =>
+ async_die("system-error", "Unknown Endpoint")
+ }
+ }
+
+ override def processRemoteClose(endpoint: Endpoint, onComplete: Task) {
+ endpoint match {
+ case connection:engine.Connection =>
+ processConnectionClose(connection, onComplete)
+ case session:engine.Session =>
+ session.close(); onComplete.run()
+ case sender:engine.Sender =>
+ processSenderClose(sender, onComplete)
+ case receiver:engine.Receiver =>
+ processReceiverClose(receiver, onComplete)
+ case _ =>
+ async_die("system-error", "Unknown Endpoint")
+ }
+ }
+
+
+ def processConnectionOpen(conn: engine.Connection, onComplete: Task) {
security_context.session_id = Some(conn.getRemoteContainer())
suspend_read("host lookup")
broker.dispatch_queue {
val virtual_host = proton.getRemoteHostname match {
case null => broker.default_virtual_host
+ case "" => broker.default_virtual_host
case host => broker.get_virtual_host(ascii(host))
}
queue {
@@ -292,7 +340,7 @@ class AmqpProtocolHandler extends Protoc
}
}
- override def processReceiverOpen(receiver: Receiver, onComplete: Task) {
+ def processReceiverOpen(receiver: Receiver, onComplete: Task) {
// Client producer is attaching..
receiver.setSource(receiver.getRemoteSource());
receiver.setTarget(receiver.getRemoteTarget());
@@ -323,18 +371,15 @@ class AmqpProtocolHandler extends Protoc
host.dispatch_queue {
val rc = host.router.connect(route.addresses, route, security_context)
queue {
- println(rc)
rc match {
case Some(failure) =>
- println(failure)
close_with_error(receiver, "Could not connect", failure)
onComplete.run()
case None =>
- println("ok")
// If the remote has not closed on us yet...
if (receiver.getRemoteState == EndpointState.ACTIVE) {
- receiver.setContext(route)
- receiver.flow(1024 * 64);
+ set_attachment(receiver, route)
+ receiver.flow(prefetch);
receiver.open()
} else {
receiver.close()
@@ -346,14 +391,22 @@ class AmqpProtocolHandler extends Protoc
}
}
- override def processReceiverClose(receiver: Receiver, onComplete: Task) {
- receiver.getContext match {
+ def get_attachment(endpoint:Endpoint):AnyRef = {
+ amqp_connection.context(endpoint).getAttachment()
+ }
+
+ def set_attachment(endpoint:Endpoint, value:AnyRef) = {
+ amqp_connection.context(endpoint).setAttachment(value)
+ }
+
+ def processReceiverClose(receiver: Receiver, onComplete: Task) {
+ get_attachment(receiver) match {
case null =>
receiver.close()
onComplete.run()
case route: AmqpProducerRoute =>
// Let's disconnect the route.
- receiver.setContext(null)
+ set_attachment(receiver, null)
host.dispatch_queue {
host.router.disconnect(route.addresses, route)
queue {
@@ -366,14 +419,18 @@ class AmqpProtocolHandler extends Protoc
}
}
- override def processDelivery(receiver: Receiver, delivery: engine.Delivery) {
- receiver.getContext match {
+ override def processDelivery(delivery: engine.Delivery) {
+ get_attachment(delivery.getLink) match {
case null =>
- case route: AmqpProducerRoute => route.process(delivery)
+ case route: AmqpProducerRoute =>
+ route.process(delivery.asInstanceOf[DeliveryImpl])
+ case consumer: AmqpConsumer =>
+ consumer.process(delivery.asInstanceOf[DeliveryImpl])
+ // TODO
}
}
- override def processSenderOpen(sender: Sender, onComplete: Task) {
+ def processSenderOpen(sender: Sender, onComplete: Task) {
// Client consumer is attaching..
sender.setSource(sender.getRemoteSource());
sender.setTarget(sender.getRemoteTarget());
@@ -468,7 +525,7 @@ class AmqpProtocolHandler extends Protoc
close_with_error(sender, "subscribe-failed", reason)
onComplete.run()
case None =>
- sender.setContext(consumer)
+ set_attachment(sender, consumer)
sender.open()
onComplete.run()
}
@@ -476,6 +533,11 @@ class AmqpProtocolHandler extends Protoc
}
}
+ def processSenderClose(sender: Sender, onComplete: Task) {
+ sender.close()
+ onComplete.run()
+ }
+
var gracefully_closed = false
override def processFailure(e: Throwable) {
var msg = "Internal Server Error: " + e
@@ -493,10 +555,11 @@ class AmqpProtocolHandler extends Protoc
on_transport_disconnected()
}
- override def processConnectionClose(conn: engine.Connection, onComplete: Task) {
+ def processConnectionClose(conn: engine.Connection, onComplete: Task) {
gracefully_closed = true
on_transport_disconnected()
- super.processConnectionClose(conn, onComplete)
+ conn.close()
+ onComplete.run()
queue.after(die_delay, TimeUnit.MILLISECONDS) {
connection.stop(NOOP)
}
@@ -574,7 +637,7 @@ class AmqpProtocolHandler extends Protoc
def receiver: Receiver
- def process(delivery: engine.Delivery): Unit = {
+ def process(delivery: DeliveryImpl): Unit = {
if (!delivery.isReadable()) {
System.out.println("it was not readable!");
return;
@@ -599,15 +662,12 @@ class AmqpProtocolHandler extends Protoc
}
}
- receiver.advance();
- delivery.settle(); // TODO: do this once accepted by the broker.
-
val buffer = current.toBuffer();
current = null;
onMessage(delivery, new AmqpMessage(buffer));
}
- def onMessage(delivery: engine.Delivery, buffer: AmqpMessage): Unit
+ def onMessage(delivery: DeliveryImpl, buffer: AmqpMessage): Unit
}
def decode_target(target: Target) = {
@@ -669,13 +729,17 @@ class AmqpProtocolHandler extends Protoc
override def dispatch_queue = queue
- refiller = ^ {
- resume_read
+ val producer_overflow = new OverflowSink[Delivery](this) {
+ /**
+ * Called for each value that is passed on to the downstream sink.
+ */
+ override protected def onDelivered(value: Delivery) {
+ receiver.flow(1)
+ pump_out
+ }
}
- val producer_overflow = new OverflowSink[Delivery](this)
-
- def onMessage(delivery: engine.Delivery, message: AmqpMessage) = {
+ def onMessage(delivery: DeliveryImpl, message: AmqpMessage) = {
val d = new Delivery
d.message = message
d.size = message.encoded.length
@@ -699,15 +763,19 @@ class AmqpProtocolHandler extends Protoc
queue {
result match {
case Consumed =>
+ delivery.disposition(new Accepted())
delivery.settle()
case _ =>
async_die("uknown", "Unexpected NAK from broker")
}
}
}
+ } else {
+ delivery.settle()
}
- producer_overflow.offer(d)
+ val accepted = producer_overflow.offer(d)
+ assert(accepted)
receiver.advance();
}
}
@@ -790,7 +858,7 @@ class AmqpProtocolHandler extends Protoc
override def time_stamp = broker.now
var currentBuffer: Buffer = _;
- var currentDelivery: org.apache.qpid.proton.engine.Delivery = _;
+ var currentDelivery: DeliveryImpl = _;
override def drain_overflow: Unit = {
queue.assertExecuting()
@@ -798,13 +866,15 @@ class AmqpProtocolHandler extends Protoc
try {
while (true) {
while (currentBuffer != null) {
- var sent = sender.send(currentBuffer.data, currentBuffer.offset, currentBuffer.length);
- if (sent > 0) {
- pumpNeeded = true
+ if (sender.getCredit > 0) {
+ val sent = sender.send(currentBuffer.data, currentBuffer.offset, currentBuffer.length);
currentBuffer.moveHead(sent);
+ val (session, apollo_delivery) = currentDelivery.getContext.asInstanceOf[(Session[Delivery], Delivery)]
+ delivered(session, apollo_delivery.size)
+ pumpNeeded = true
if (currentBuffer.length == 0) {
if (presettle) {
- settle(currentDelivery, Consumed);
+ settle(currentDelivery, Consumed, false);
} else {
sender.advance();
}
@@ -849,10 +919,10 @@ class AmqpProtocolHandler extends Protoc
currentBuffer = new AmqpMessage(null, message).encoded;
if (presettle) {
- currentDelivery = sender.delivery(EMPTY_BYTE_ARRAY, 0, 0);
+ currentDelivery = sender.delivery(EMPTY_BYTE_ARRAY, 0, 0).asInstanceOf[DeliveryImpl];
} else {
val tag = nextTag
- currentDelivery = sender.delivery(tag, 0, tag.length);
+ currentDelivery = sender.delivery(tag, 0, tag.length).asInstanceOf[DeliveryImpl];
unsettled.put(new AsciiBuffer(tag), currentDelivery)
}
currentDelivery.setContext(value);
@@ -860,6 +930,7 @@ class AmqpProtocolHandler extends Protoc
}
} finally {
if( pumpNeeded ) {
+ pumpNeeded = false
pump_out
}
}
@@ -872,36 +943,68 @@ class AmqpProtocolHandler extends Protoc
redeliveries.removeFirst()
}
}
+ }
- private def settle(delivery: org.apache.qpid.proton.engine.Delivery, ackType: DeliveryResult) {
- val tag: Array[Byte] = delivery.getTag
- if (tag != null && tag.length > 0) {
- checkinTag(tag)
- unsettled.remove(new AsciiBuffer(tag))
- }
- // Don't ack.. redeliver
- val (session, apollo_delivery) = delivery.getContext.asInstanceOf[(Session[Delivery], Delivery)]
- if (ackType == null) {
- delivery.settle
- redeliveries.addFirst((session, apollo_delivery))
- drain_overflow
- } else {
-
- val remoteState = delivery.getRemoteState
- if (remoteState != null && remoteState.isInstanceOf[TransactionalState]) {
- val s: TransactionalState = remoteState.asInstanceOf[TransactionalState]
- val txid = toLong(s.getTxnId)
- async_die("txs-not-supported", "Transactions not yet supported")
- return
+ def process(proton_delivery:DeliveryImpl) = {
+ val state = proton_delivery.getRemoteState();
+ state match {
+ case null =>
+ case accepted:Accepted =>
+ if( !proton_delivery.remotelySettled() ) {
+ proton_delivery.disposition(new Accepted());
}
-
- if (apollo_delivery.ack != null) {
- apollo_delivery.ack(ackType, null)
+ settle(proton_delivery, Consumed, false);
+ case rejected:Rejected =>
+ // re-deliver with an incremented delivery counter.
+ settle(proton_delivery, null, true);
+ case release:Released =>
+ // re-deliver && don't increment the counter.
+ settle(proton_delivery, null, false);
+ case modified:Modified =>
+ def b(v:java.lang.Boolean) = v!=null && v.booleanValue()
+ var ackType = if(b(modified.getUndeliverableHere())) {
+ // receiver does not want the message..
+ // perhaps we should DLQ it?
+ Poisoned;
+ } else {
+ // Delivered ??
+ null
}
- delivery.settle
- pump_out
+ settle(proton_delivery, ackType, b(modified.getDeliveryFailed()));
+ }
+ }
+
+ def settle(delivery:DeliveryImpl, ackType:DeliveryResult, incrementRedelivery:Boolean) {
+ val (session, apollo_delivery) = delivery.getContext.asInstanceOf[(Session[Delivery], Delivery)]
+ if( incrementRedelivery ) {
+ apollo_delivery.redelivered
+ }
+
+ val tag = delivery.getTag();
+ if( tag !=null && tag.length>0 ) {
+ checkinTag(tag);
+ }
+
+ if( ackType == null ) {
+ redeliveries.addFirst((session, apollo_delivery))
+ session_manager.drain_overflow
+ } else {
+
+ val remoteState = delivery.getRemoteState
+ if (remoteState != null && remoteState.isInstanceOf[TransactionalState]) {
+ val s: TransactionalState = remoteState.asInstanceOf[TransactionalState]
+ val txid = toLong(s.getTxnId)
+ async_die("txs-not-supported", "Transactions not yet supported")
+ return
+ }
+
+ if( apollo_delivery.ack != null ) {
+ apollo_delivery.ack(ackType, null)
}
}
+ delivery.settle()
+ pump_out
+
}
class AmqpConsumerSession(p: DeliveryProducer) extends DeliverySession with SessionSinkFilter[Delivery] {
Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpConnectOptions.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpConnectOptions.java?rev=1408852&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpConnectOptions.java (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpConnectOptions.java Tue Nov 13 17:41:01 2012
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.amqp.hawtdispatch.api;
+
+import org.fusesource.hawtdispatch.DispatchQueue;
+import org.fusesource.hawtdispatch.transport.TcpTransport;
+
+import javax.net.ssl.SSLContext;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.*;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpConnectOptions implements Cloneable {
+
+ private static final long KEEP_ALIVE = Long.parseLong(System.getProperty("amqp.thread.keep_alive", ""+1000));
+ private static final long STACK_SIZE = Long.parseLong(System.getProperty("amqp.thread.stack_size", ""+1024*512));
+ private static ThreadPoolExecutor blockingThreadPool;
+
+ public synchronized static ThreadPoolExecutor getBlockingThreadPool() {
+ if( blockingThreadPool == null ) {
+ blockingThreadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, KEEP_ALIVE, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
+ public Thread newThread(Runnable r) {
+ Thread rc = new Thread(null, r, "AMQP Task", STACK_SIZE);
+ rc.setDaemon(true);
+ return rc;
+ }
+ }) {
+
+ @Override
+ public void shutdown() {
+ // we don't ever shutdown since we are shared..
+ }
+
+ @Override
+ public List<Runnable> shutdownNow() {
+ // we don't ever shutdown since we are shared..
+ return Collections.emptyList();
+ }
+ };
+ }
+ return blockingThreadPool;
+ }
+ public synchronized static void setBlockingThreadPool(ThreadPoolExecutor pool) {
+ blockingThreadPool = pool;
+ }
+
+ private static final URI DEFAULT_HOST;
+ static {
+ try {
+ DEFAULT_HOST = new URI("tcp://localhost");
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ URI host = DEFAULT_HOST;
+ URI localAddress;
+ SSLContext sslContext;
+ DispatchQueue dispatchQueue;
+ Executor blockingExecutor;
+ int maxReadRate;
+ int maxWriteRate;
+ int trafficClass = TcpTransport.IPTOS_THROUGHPUT;
+ boolean useLocalHost;
+ int receiveBufferSize = 1024*64;
+ int sendBufferSize = 1024*64;
+ String localContainerId;
+ String remoteContainerId;
+ String user;
+ String password;
+
+
+ @Override
+ public AmqpConnectOptions clone() {
+ try {
+ return (AmqpConnectOptions) super.clone();
+ } catch (CloneNotSupportedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public String getLocalContainerId() {
+ return localContainerId;
+ }
+
+ public void setLocalContainerId(String localContainerId) {
+ this.localContainerId = localContainerId;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public String getRemoteContainerId() {
+ return remoteContainerId;
+ }
+
+ public void setRemoteContainerId(String remoteContainerId) {
+ this.remoteContainerId = remoteContainerId;
+ }
+
+ public String getUser() {
+ return user;
+ }
+
+ public void setUser(String user) {
+ this.user = user;
+ }
+
+ public Executor getBlockingExecutor() {
+ return blockingExecutor;
+ }
+
+ public void setBlockingExecutor(Executor blockingExecutor) {
+ this.blockingExecutor = blockingExecutor;
+ }
+
+ public DispatchQueue getDispatchQueue() {
+ return dispatchQueue;
+ }
+
+ public void setDispatchQueue(DispatchQueue dispatchQueue) {
+ this.dispatchQueue = dispatchQueue;
+ }
+
+ public URI getLocalAddress() {
+ return localAddress;
+ }
+
+ public void setLocalAddress(String localAddress) throws URISyntaxException {
+ this.setLocalAddress(new URI(localAddress));
+ }
+ public void setLocalAddress(URI localAddress) {
+ this.localAddress = localAddress;
+ }
+
+ public int getMaxReadRate() {
+ return maxReadRate;
+ }
+
+ public void setMaxReadRate(int maxReadRate) {
+ this.maxReadRate = maxReadRate;
+ }
+
+ public int getMaxWriteRate() {
+ return maxWriteRate;
+ }
+
+ public void setMaxWriteRate(int maxWriteRate) {
+ this.maxWriteRate = maxWriteRate;
+ }
+
+ public int getReceiveBufferSize() {
+ return receiveBufferSize;
+ }
+
+ public void setReceiveBufferSize(int receiveBufferSize) {
+ this.receiveBufferSize = receiveBufferSize;
+ }
+
+ public URI getHost() {
+ return host;
+ }
+ public void setHost(String host, int port) throws URISyntaxException {
+ this.setHost(new URI("tcp://"+host+":"+port));
+ }
+ public void setHost(String host) throws URISyntaxException {
+ this.setHost(new URI(host));
+ }
+ public void setHost(URI host) {
+ this.host = host;
+ }
+
+ public int getSendBufferSize() {
+ return sendBufferSize;
+ }
+
+ public void setSendBufferSize(int sendBufferSize) {
+ this.sendBufferSize = sendBufferSize;
+ }
+
+ public SSLContext getSslContext() {
+ return sslContext;
+ }
+
+ public void setSslContext(SSLContext sslContext) {
+ this.sslContext = sslContext;
+ }
+
+ public int getTrafficClass() {
+ return trafficClass;
+ }
+
+ public void setTrafficClass(int trafficClass) {
+ this.trafficClass = trafficClass;
+ }
+
+ public boolean isUseLocalHost() {
+ return useLocalHost;
+ }
+
+ public void setUseLocalHost(boolean useLocalHost) {
+ this.useLocalHost = useLocalHost;
+ }
+}
Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpConnection.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpConnection.java?rev=1408852&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpConnection.java (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpConnection.java Tue Nov 13 17:41:01 2012
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.amqp.hawtdispatch.api;
+
+import org.apache.activemq.apollo.amqp.hawtdispatch.impl.AmqpListener;
+import org.apache.activemq.apollo.amqp.hawtdispatch.impl.AmqpTransport;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Endpoint;
+import org.apache.qpid.proton.engine.impl.ConnectionImpl;
+import org.apache.qpid.proton.engine.impl.ProtocolTracer;
+import org.apache.qpid.proton.engine.impl.SessionImpl;
+import org.fusesource.hawtdispatch.DispatchQueue;
+import org.fusesource.hawtdispatch.Task;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpConnection extends AmqpEndpointBase {
+
+ AmqpTransport transport;
+ ConnectionImpl connection;
+ HashSet<AmqpSender> senders = new HashSet<AmqpSender>();
+ boolean closing = false;
+
+ public static AmqpConnection connect(AmqpConnectOptions options) {
+ return new AmqpConnection(options);
+ }
+
+ private AmqpConnection(AmqpConnectOptions options) {
+ transport = AmqpTransport.connect(options);
+ transport.setListener(new AmqpListener() {
+ @Override
+ public void processDelivery(Delivery delivery) {
+ Attachment attachment = (Attachment) getTransport().context(delivery.getLink()).getAttachment();
+ AmqpLink link = (AmqpLink) attachment.endpoint();
+ link.processDelivery(delivery);
+ }
+
+ @Override
+ public void processRefill() {
+ for(AmqpSender sender: new ArrayList<AmqpSender>(senders)) {
+ sender.pumpDeliveries();
+ }
+ pumpOut();
+ }
+
+ });
+ connection = transport.connection();
+ connection.open();
+ attach();
+ }
+
+ public void waitForConnected() throws Exception {
+ assertNotOnDispatchQueue();
+ getConnectedFuture().await();
+ }
+
+ public Future<Void> getConnectedFuture() {
+ final Promise<Void> rc = new Promise<Void>();
+ queue().execute(new Task() {
+ @Override
+ public void run() {
+ onConnected(rc);
+ }
+ });
+ return rc;
+ }
+
+ public void onConnected(Callback<Void> cb) {
+ transport.onTransportConnected(cb);
+ }
+
+ @Override
+ protected Endpoint getEndpoint() {
+ return connection;
+ }
+
+ @Override
+ protected AmqpConnection getConnection() {
+ return this;
+ }
+
+ @Override
+ protected AmqpEndpointBase getParent() {
+ return null;
+ }
+
+ public AmqpSession createSession() {
+ assertExecuting();
+ SessionImpl session = connection.session();
+ session.open();
+ pumpOut();
+ return new AmqpSession(this, session);
+ }
+
+ public int getMaxSessions() {
+ return connection.getMaxChannels();
+ }
+
+ public void disconnect() {
+ closing = true;
+ transport.disconnect();
+ }
+
+ public void waitForDisconnected() throws Exception {
+ assertNotOnDispatchQueue();
+ getDisconnectedFuture().await();
+ }
+
+ public Future<Void> getDisconnectedFuture() {
+ final Promise<Void> rc = new Promise<Void>();
+ queue().execute(new Task() {
+ @Override
+ public void run() {
+ onDisconnected(rc);
+ }
+ });
+ return rc;
+ }
+
+ public void onDisconnected(Callback<Void> cb) {
+ transport.onTransportDisconnected(cb);
+ }
+
+ public TransportState getTransportState() {
+ return transport.getState();
+ }
+
+ public Throwable getTransportFailure() {
+ return transport.getFailure();
+ }
+
+ public Future<Throwable> getTransportFailureFuture() {
+ final Promise<Throwable> rc = new Promise<Throwable>();
+ queue().execute(new Task() {
+ @Override
+ public void run() {
+ onTransportFailure(rc);
+ }
+ });
+ return rc;
+ }
+
+ public void onTransportFailure(Callback<Throwable> cb) {
+ transport.onTransportFailure(cb);
+ }
+
+ @Override
+ public DispatchQueue queue() {
+ return super.queue();
+ }
+
+ public void setProtocolTracer(ProtocolTracer protocolTracer) {
+ transport.setProtocolTracer(protocolTracer);
+ }
+
+ public ProtocolTracer getProtocolTracer() {
+ return transport.getProtocolTracer();
+ }
+}
Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpDeliveryListener.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpDeliveryListener.java?rev=1408852&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpDeliveryListener.java (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpDeliveryListener.java Tue Nov 13 17:41:01 2012
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.amqp.hawtdispatch.api;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface AmqpDeliveryListener {
+
+ /**
+ * Invoked with each message delivery handed to this listener.
+ *
+ * Caller should suspend/resume the AmqpReceiver to
+ * flow control the delivery of messages.
+ *
+ * @param delivery the message delivery being passed to the listener
+ */
+ void onMessageDelivery(MessageDelivery delivery);
+}
Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpEndpointBase.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpEndpointBase.java?rev=1408852&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpEndpointBase.java (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpEndpointBase.java Tue Nov 13 17:41:01 2012
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.amqp.hawtdispatch.api;
+
+import org.apache.activemq.apollo.amqp.hawtdispatch.impl.*;
+import org.apache.qpid.proton.engine.Endpoint;
+import org.apache.qpid.proton.engine.EndpointError;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.fusesource.hawtdispatch.Dispatch;
+import org.fusesource.hawtdispatch.DispatchQueue;
+import org.fusesource.hawtdispatch.Task;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract class AmqpEndpointBase extends WatchBase {
+ abstract protected Endpoint getEndpoint();
+ abstract protected AmqpEndpointBase getParent();
+
+ protected AmqpConnection getConnection() {
+ return getParent().getConnection();
+ }
+
+ protected AmqpTransport getTransport() {
+ return getConnection().transport;
+ }
+
+ protected DispatchQueue queue() {
+ return getTransport().queue();
+ }
+
+ protected void assertExecuting() {
+ getTransport().assertExecuting();
+ }
+
+ public void waitForRemoteOpen() throws Exception {
+ assertNotOnDispatchQueue();
+ getRemoteOpenFuture().await();
+ }
+
+ public Future<Void> getRemoteOpenFuture() {
+ final Promise<Void> rc = new Promise<Void>();
+ queue().execute(new Task() {
+ @Override
+ public void run() {
+ onRemoteOpen(rc);
+ }
+ });
+ return rc;
+ }
+
+ public void onRemoteOpen(final Callback<Void> cb) {
+ addWatch(new Watch() {
+ @Override
+ public boolean execute() {
+ switch (getEndpoint().getRemoteState()) {
+ case ACTIVE:
+ cb.onSuccess(null);
+ return true;
+ case CLOSED:
+ cb.onFailure(Support.illegalState("closed"));
+ return true;
+ }
+ return false;
+ }
+ });
+ }
+
+ public EndpointError waitForRemoteClose() throws Exception {
+ assertNotOnDispatchQueue();
+ return getRemoteCloseFuture().await();
+ }
+
+ public Future<EndpointError> getRemoteCloseFuture() {
+ final Promise<EndpointError> rc = new Promise<EndpointError>();
+ queue().execute(new Task() {
+ @Override
+ public void run() {
+ onRemoteClose(rc);
+ }
+ });
+ return rc;
+ }
+
+ public void onRemoteClose(final Callback<EndpointError> cb) {
+ addWatch(new Watch() {
+ @Override
+ public boolean execute() {
+ if (getEndpoint().getRemoteState() == EndpointState.CLOSED) {
+ cb.onSuccess(getEndpoint().getRemoteError());
+ return true;
+ }
+ return false;
+ }
+ });
+ }
+
+ public void close() {
+ getEndpoint().close();
+ pumpOut();
+ }
+
+ public EndpointState getRemoteState() {
+ return getEndpoint().getRemoteState();
+ }
+
+ public EndpointError getRemoteError() {
+ return getEndpoint().getRemoteError();
+ }
+
+ static protected EndpointError toError(Throwable value) {
+ return new EndpointError("error", value.toString());
+ }
+
+ class Attachment extends Task {
+ AmqpEndpointBase endpoint() {
+ return AmqpEndpointBase.this;
+ }
+
+ @Override
+ public void run() {
+ fireWatches();
+ }
+ }
+
+ protected void attach() {
+ getTransport().context(getEndpoint()).setAttachment(new Attachment());
+ }
+
+ protected void defer(Defer defer) {
+ getTransport().defer(defer);
+ }
+
+ protected void pumpOut() {
+ getTransport().pumpOut();
+ }
+
+ static protected void assertNotOnDispatchQueue() {
+ assert Dispatch.getCurrentQueue()==null : "Not allowed to be called when executing on a dispatch queue";
+ }
+
+}
Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpLink.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpLink.java?rev=1408852&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpLink.java (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpLink.java Tue Nov 13 17:41:01 2012
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.amqp.hawtdispatch.api;
+
+import org.apache.qpid.proton.engine.Delivery;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract public class AmqpLink extends AmqpEndpointBase {
+ abstract protected void processDelivery(Delivery delivery);
+}
Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpReceiver.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpReceiver.java?rev=1408852&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpReceiver.java (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpReceiver.java Tue Nov 13 17:41:01 2012
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.amqp.hawtdispatch.api;
+
+import org.apache.activemq.apollo.amqp.hawtdispatch.impl.Defer;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.impl.DeliveryImpl;
+import org.apache.qpid.proton.engine.impl.ReceiverImpl;
+import org.apache.qpid.proton.type.messaging.Accepted;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.ByteArrayOutputStream;
+
+import java.util.LinkedList;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpReceiver extends AmqpLink {
+
+ final AmqpSession parent;
+ final ReceiverImpl receiver;
+
+ public AmqpReceiver(AmqpSession parent, ReceiverImpl receiver, QoS qos) {
+ this.parent = parent;
+ this.receiver = receiver;
+ attach();
+ }
+
+ @Override
+ protected ReceiverImpl getEndpoint() {
+ return receiver;
+ }
+ @Override
+ protected AmqpSession getParent() {
+ return parent;
+ }
+
+ ByteArrayOutputStream current = new ByteArrayOutputStream();
+
+ /**
+ * Reads the bytes currently available on the receiver into the
+ * accumulation buffer. When the receiver signals the end of the
+ * delivery (a negative count from <code>recv</code>) the link is advanced
+ * and the assembled bytes are passed to
+ * {@link #onMessage(Delivery, Buffer)}.
+ *
+ * @param delivery the delivery for which the transport reported an update
+ */
+ @Override
+ protected void processDelivery(Delivery delivery) {
+ // Nothing to read for this update: ignore it quietly instead of
+ // printing debug output to stdout from library code.
+ if( !delivery.isReadable() ) {
+ return;
+ }
+
+ if( current==null ) {
+ current = new ByteArrayOutputStream();
+ }
+
+ int count;
+ byte data[] = new byte[1024*4];
+ while( (count = receiver.recv(data, 0, data.length)) > 0 ) {
+ current.write(data, 0, count);
+ }
+
+ // A zero count means no more bytes are available right now; keep the
+ // partial content in 'current' and wait for the next update.
+ if( count == 0 ) {
+ return;
+ }
+
+ receiver.advance();
+ Buffer buffer = current.toBuffer();
+ // Reset so the next delivery starts with a fresh accumulation buffer.
+ current = null;
+ onMessage(delivery, buffer);
+
+ }
+
+ LinkedList<MessageDelivery> inbound = new LinkedList<MessageDelivery>();
+
+ protected void onMessage(Delivery delivery, Buffer buffer) {
+ MessageDelivery md = new MessageDelivery(buffer) {
+ @Override
+ AmqpLink link() {
+ return AmqpReceiver.this;
+ }
+
+ @Override
+ public void settle() {
+ if( !delivery.isSettled() ) {
+ delivery.disposition(new Accepted());
+ delivery.settle();
+ }
+ drain();
+ }
+ };
+ md.delivery = (DeliveryImpl) delivery;
+ delivery.setContext(md);
+ inbound.add(md);
+ drainInbound();
+ }
+
+ public void drain() {
+ defer(deferedDrain);
+ }
+
+ Defer deferedDrain = new Defer(){
+ public void run() {
+ drainInbound();
+ }
+ };
+ int resumed = 0;
+
+ /**
+ * Increments the resume counter. Queued inbound deliveries are only
+ * dispatched to the listener while the counter is positive, so once it
+ * is positive a deferred drain is scheduled; otherwise deliveries that
+ * were queued while suspended would wait until some unrelated event
+ * triggered a drain.
+ */
+ public void resume() {
+ resumed++;
+ if( resumed > 0 ) {
+ drain();
+ }
+ }
+ public void suspend() {
+ resumed--;
+ }
+
+ AmqpDeliveryListener deliveryListener;
+ private void drainInbound() {
+ while( deliveryListener!=null && !inbound.isEmpty() && resumed>0) {
+ deliveryListener.onMessageDelivery(inbound.removeFirst());
+ receiver.flow(1);
+ }
+ }
+
+ public AmqpDeliveryListener getDeliveryListener() {
+ return deliveryListener;
+ }
+
+ public void setDeliveryListener(AmqpDeliveryListener deliveryListener) {
+ this.deliveryListener = deliveryListener;
+ drainInbound();
+ }
+}
Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpSender.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpSender.java?rev=1408852&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpSender.java (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpSender.java Tue Nov 13 17:41:01 2012
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.amqp.hawtdispatch.api;
+
+import org.apache.activemq.apollo.amqp.hawtdispatch.impl.Defer;
+import org.apache.activemq.apollo.amqp.hawtdispatch.impl.Watch;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.EndpointError;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.apache.qpid.proton.engine.impl.DeliveryImpl;
+import org.apache.qpid.proton.engine.impl.SenderImpl;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.type.messaging.Accepted;
+import org.apache.qpid.proton.type.messaging.Modified;
+import org.apache.qpid.proton.type.messaging.Rejected;
+import org.apache.qpid.proton.type.messaging.Released;
+import org.apache.qpid.proton.type.transport.DeliveryState;
+import org.fusesource.hawtbuf.Buffer;
+
+import java.io.UnsupportedEncodingException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpSender extends AmqpLink {
+
+ private byte[] EMPTY_BYTE_ARRAY = new byte[]{};
+ long nextTagId = 0;
+ HashSet<byte[]> tagCache = new HashSet<byte[]>();
+
+ final AmqpSession parent;
+ private final QoS qos;
+ final SenderImpl sender;
+
+ public AmqpSender(AmqpSession parent, SenderImpl sender, QoS qos) {
+ this.parent = parent;
+ this.sender = sender;
+ this.qos = qos;
+ attach();
+ getConnection().senders.add(this);
+ }
+
+ @Override
+ public void close() {
+ super.close();
+ getConnection().senders.remove(this);
+ }
+
+ @Override
+ protected SenderImpl getEndpoint() {
+ return sender;
+ }
+
+ @Override
+ protected AmqpSession getParent() {
+ return parent;
+ }
+
+ final LinkedList<MessageDelivery> outbound = new LinkedList<MessageDelivery>();
+ long outboundBufferSize;
+
+ public MessageDelivery send(Message message) {
+ assertExecuting();
+ MessageDelivery rc = new MessageDelivery(message) {
+ @Override
+ AmqpLink link() {
+ return AmqpSender.this;
+ }
+
+ @Override
+ public void redeliver(boolean incrementDeliveryCounter) {
+ super.redeliver(incrementDeliveryCounter);
+ outbound.add(this);
+ outboundBufferSize += initialSize;
+ defer(deferedPumpDeliveries);
+ }
+ };
+ outbound.add(rc);
+ outboundBufferSize += rc.initialSize;
+ pumpDeliveries();
+ pumpOut();
+ return rc;
+ }
+
+ Buffer currentBuffer;
+ DeliveryImpl currentDelivery;
+
+ Defer deferedPumpDeliveries = new Defer() {
+ public void run() {
+ pumpDeliveries();
+ }
+ };
+
+ public long getOverflowBufferSize() {
+ return outboundBufferSize;
+ }
+
+ /**
+ * Writes queued outbound messages to the sender link for as long as the
+ * link has credit. One message is in flight at a time, tracked by
+ * <code>currentBuffer</code> / <code>currentDelivery</code>; when it has
+ * been written completely the next entry of <code>outbound</code> is
+ * taken. The sender's watches are fired on every exit path.
+ */
+ protected void pumpDeliveries() {
+ assertExecuting();
+ try {
+ while(true) {
+ while( currentBuffer !=null ) { // finish the in-flight message first
+ if( sender.getCredit() > 0 ) {
+ int sent = sender.send(currentBuffer.data, currentBuffer.offset, currentBuffer.length);
+ currentBuffer.moveHead(sent); // drop the bytes that were accepted
+ if( currentBuffer.length == 0 ) { // whole message handed to the link
+ DeliveryImpl current = currentDelivery;
+ MessageDelivery md = (MessageDelivery) current.getContext();
+ currentBuffer = null;
+ currentDelivery = null;
+ if( qos == QoS.AT_MOST_ONCE ) {
+ current.settle(); // settled locally right after sending
+ } else {
+ sender.advance();
+ }
+ md.fireWatches();
+ }
+ } else {
+ return; // no credit: stop until pumped again
+ }
+ }
+
+ if( outbound.isEmpty() ) {
+ return;
+ }
+
+ final MessageDelivery md = outbound.removeFirst();
+ outboundBufferSize -= md.initialSize; // message no longer counts as queued
+ currentBuffer = md.encoded();
+ if( qos == QoS.AT_MOST_ONCE ) {
+ currentDelivery = sender.delivery(EMPTY_BYTE_ARRAY, 0, 0); // empty tag for this QoS
+ } else {
+ final byte[] tag = nextTag();
+ currentDelivery = sender.delivery(tag, 0, tag.length);
+ }
+ md.delivery = currentDelivery;
+ currentDelivery.setContext(md); // lets the delivery be mapped back to its message
+ }
+ } finally {
+ fireWatches(); // re-evaluate watches registered on this sender
+ }
+ }
+
+ /**
+ * Handles a remote update for a delivery sent on this link.
+ * <p>
+ * A remotely settled delivery returns its tag to the tag cache. A
+ * remote state of Rejected or Released puts the message back on the
+ * outbound queue (Rejected also increments the delivery count), and
+ * Modified increments the delivery count when the peer flagged the
+ * delivery as failed. Whenever a remote state is present the delivery
+ * is settled locally.
+ * </p>
+ *
+ * @param delivery the delivery whose remote state changed
+ */
+ @Override
+ protected void processDelivery(Delivery delivery) {
+ final MessageDelivery md = (MessageDelivery) delivery.getContext();
+ if( delivery.remotelySettled() && delivery.getTag().length > 0 ) {
+ checkinTag(delivery.getTag());
+ }
+ final DeliveryState state = delivery.getRemoteState();
+ if( state!=null ) {
+ boolean requeue = false;
+ if( state instanceof Accepted) {
+ if( !delivery.remotelySettled() ) {
+ delivery.disposition(new Accepted());
+ }
+ } else if( state instanceof Rejected) {
+ // re-deliver /w incremented delivery counter.
+ md.delivery = null;
+ md.incrementDeliveryCount();
+ requeue = true;
+ } else if( state instanceof Released) {
+ // re-deliver && don't increment the counter.
+ md.delivery = null;
+ requeue = true;
+ } else if( state instanceof Modified) {
+ Modified modified = (Modified) state;
+ if ( modified.getDeliveryFailed() ) {
+ // increment delivery counter..
+ md.incrementDeliveryCount();
+ }
+ }
+ delivery.settle();
+ if( requeue ) {
+ // pumpDeliveries() subtracts initialSize for every message it
+ // dequeues, so the size must be added back here; otherwise the
+ // counter drifts negative and onOverflowBufferDrained never fires.
+ outbound.addLast(md);
+ outboundBufferSize += md.initialSize;
+ // Schedule a pump so the re-queued message actually gets resent.
+ defer(deferedPumpDeliveries);
+ }
+ }
+ md.fireWatches();
+ }
+
+ /**
+ * Supplies a delivery tag: a recycled one from the tag cache when one is
+ * available, otherwise the hex form of the next tag id encoded as UTF-8.
+ */
+ byte[] nextTag() {
+ if (tagCache == null || tagCache.isEmpty()) {
+ // Nothing to recycle: mint a fresh tag from the counter.
+ try {
+ return Long.toHexString(nextTagId++).getBytes("UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ // Take any cached tag and remove it from the cache.
+ final Iterator<byte[]> pooled = tagCache.iterator();
+ final byte[] recycled = pooled.next();
+ pooled.remove();
+ return recycled;
+ }
+
+ void checkinTag(byte[] data) {
+ if( tagCache.size() < 1024 ) {
+ tagCache.add(data);
+ }
+ }
+
+ public void onOverflowBufferDrained(final Callback<Void> cb) {
+ addWatch(new Watch() {
+ @Override
+ public boolean execute() {
+ if (outboundBufferSize==0) {
+ cb.onSuccess(null);
+ return true;
+ }
+ return false;
+ }
+ });
+ }
+}
Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpSession.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpSession.java?rev=1408852&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpSession.java (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/AmqpSession.java Tue Nov 13 17:41:01 2012
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.amqp.hawtdispatch.api;
+
+import org.apache.qpid.proton.engine.Endpoint;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.impl.ReceiverImpl;
+import org.apache.qpid.proton.engine.impl.SenderImpl;
+import org.apache.qpid.proton.engine.impl.SessionImpl;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.type.Binary;
+import org.apache.qpid.proton.type.messaging.*;
+import org.apache.qpid.proton.type.transport.SenderSettleMode;
+
+import java.util.UUID;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class AmqpSession extends AmqpEndpointBase {
+
+ final AmqpConnection parent;
+ final SessionImpl session;
+
+
+ public AmqpSession(AmqpConnection parent, SessionImpl session) {
+ this.parent = parent;
+ this.session = session;
+ attach();
+ }
+
+ @Override
+ protected Endpoint getEndpoint() {
+ return session;
+ }
+
+ @Override
+ protected AmqpConnection getParent() {
+ return parent;
+ }
+
+ public AmqpSender createSender(Target target) {
+ return createSender(target, QoS.AT_LEAST_ONCE);
+ }
+
+ public AmqpSender createSender(Target target, QoS qos) {
+ return createSender(target, qos, UUID.randomUUID().toString());
+ }
+
+ /**
+ * Creates and opens a sender link on this session.
+ *
+ * @param target the target the sender link is configured with
+ * @param qos the quality of service used to configure the link's settle modes
+ * @param name the link name
+ * @return the wrapper around the newly opened sender link
+ */
+ public AmqpSender createSender(Target target, QoS qos, String name) {
+ assertExecuting();
+ SenderImpl sender = session.sender(name);
+ // The AmqpSender constructor attaches the sender endpoint itself; the
+ // session was already attached in this class' constructor, so it is not
+ // re-attached here (matching createReceiver).
+ sender.setTarget(target);
+ configureQos(sender, qos);
+ sender.open();
+ pumpOut();
+ return new AmqpSender(this, sender, qos);
+ }
+
+ public AmqpReceiver createReceiver(Source source) {
+ return createReceiver(source, QoS.AT_LEAST_ONCE);
+ }
+
+ public AmqpReceiver createReceiver(Source source, QoS qos) {
+ return createReceiver(source, qos, 100);
+ }
+
+ public AmqpReceiver createReceiver(Source source, QoS qos, int prefetch) {
+ return createReceiver(source, qos, prefetch, UUID.randomUUID().toString());
+ }
+
+ public AmqpReceiver createReceiver(Source source, QoS qos, int prefetch, String name) {
+ assertExecuting();
+ ReceiverImpl receiver = session.receiver(name);
+ receiver.setSource(source);
+// Target target = new Target();
+// target.setAddress(UUID.randomUUID().toString());
+// receiver.setTarget(target);
+ receiver.flow(prefetch);
+ configureQos(receiver, qos);
+ receiver.open();
+ pumpOut();
+ return new AmqpReceiver(this, receiver, qos);
+ }
+
+ private void configureQos(Link link, QoS qos) {
+ switch (qos) {
+ case AT_MOST_ONCE:
+ link.setSenderSettleMode(SenderSettleMode.SETTLED);
+ link.setReceiverSettleMode(SenderSettleMode.UNSETTLED);
+ break;
+ case AT_LEAST_ONCE:
+ link.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+ link.setReceiverSettleMode(SenderSettleMode.SETTLED);
+ break;
+ case EXACTLY_ONCE:
+ link.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+ link.setReceiverSettleMode(SenderSettleMode.MIXED);
+ break;
+ }
+ }
+
+ public Message createTextMessage(String value) {
+ Message msg = new Message();
+ Section body = new AmqpValue(value);
+ msg.setBody(body);
+ return msg;
+ }
+
+ public Message createBinaryMessage(byte value[]) {
+ return createBinaryMessage(value, 0, value.length);
+ }
+
+ public Message createBinaryMessage(byte value[], int offset, int len) {
+ Message msg = new Message();
+ Data body = new Data(new Binary(value, offset,len));
+ msg.setBody(body);
+ return msg;
+ }
+}
Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/Callback.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/Callback.java?rev=1408852&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/Callback.java (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/Callback.java Tue Nov 13 17:41:01 2012
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.amqp.hawtdispatch.api;
+
+/**
+ * <p>
+ * Function Result that carries one value: implementations are handed a
+ * value of type <code>T</code> through {@link #onSuccess(Object)} and a
+ * {@link Throwable} through {@link #onFailure(Throwable)}.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface Callback<T> {
+ public void onSuccess(T value);
+ public void onFailure(Throwable value);
+}
Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/ChainedCallback.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/ChainedCallback.java?rev=1408852&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/ChainedCallback.java (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/ChainedCallback.java Tue Nov 13 17:41:01 2012
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.amqp.hawtdispatch.api;
+
+/**
+ * <p>
+ * Base class for a callback that sits in front of another callback.
+ * Failures are passed straight through to {@link #next}; how a successful
+ * <code>In</code> value is handled is left to the subclass, which must
+ * supply <code>onSuccess</code>.
+ * </p>
+ *
+ * @param <In>  type of value this callback receives
+ * @param <Out> type of value the downstream callback receives
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public abstract class ChainedCallback<In, Out> implements Callback<In> {
+
+    /** Downstream callback; every failure seen by this callback is forwarded to it. */
+    public final Callback<Out> next;
+
+    /**
+     * @param next the downstream callback that failures are forwarded to
+     */
+    public ChainedCallback(Callback<Out> next) {
+        this.next = next;
+    }
+
+    /**
+     * Hands the failure, unchanged, to {@link #next}.
+     *
+     * @param value the cause of the failure
+     */
+    @Override
+    public void onFailure(Throwable value) {
+        next.onFailure(value);
+    }
+}
Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/DeliveryAttachment.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/DeliveryAttachment.java?rev=1408852&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/DeliveryAttachment.java (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/DeliveryAttachment.java Tue Nov 13 17:41:01 2012
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.amqp.hawtdispatch.api;
+
+import org.apache.qpid.proton.engine.Delivery;
+
+/**
+ * Base type for objects that can process a proton {@link Delivery}.
+ * <p>
+ * NOTE(review): judging by the name, instances are presumably attached to a
+ * delivery as its context object; confirm against the callers.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public abstract class DeliveryAttachment {
+
+    /**
+     * Processes the given delivery. Declared without an access modifier, so
+     * it can only be implemented and called from within this package.
+     *
+     * @param delivery the delivery to process
+     */
+    abstract void processDelivery(Delivery delivery);
+}
Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/Future.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/Future.java?rev=1408852&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/Future.java (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/Future.java Tue Nov 13 17:41:01 2012
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.amqp.hawtdispatch.api;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * <p>A simplified interface for obtaining the result of a function, either
+ * by waiting for it or by registering a {@link Callback}.</p>
+ *
+ * @param <T> type of the result
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface Future<T> {
+
+    /**
+     * Registers the callback that should receive the result.
+     *
+     * @param callback receiver of the value or of the failure
+     */
+    void then(Callback<T> callback);
+
+    /**
+     * Waits for the result and returns it.
+     *
+     * @return the result
+     * @throws Exception if the result could not be obtained
+     */
+    T await() throws Exception;
+
+    /**
+     * Waits for the result for at most the given amount of time.
+     *
+     * @param amount how long to wait
+     * @param unit   unit of <code>amount</code>
+     * @return the result
+     * @throws Exception if the result could not be obtained
+     */
+    T await(long amount, TimeUnit unit) throws Exception;
+}
Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/MessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/MessageDelivery.java?rev=1408852&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/MessageDelivery.java (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/MessageDelivery.java Tue Nov 13 17:41:01 2012
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.amqp.hawtdispatch.api;
+
+import org.apache.activemq.apollo.amqp.hawtdispatch.impl.DroppingWritableBuffer;
+import org.apache.activemq.apollo.amqp.hawtdispatch.impl.Watch;
+import org.apache.activemq.apollo.amqp.hawtdispatch.impl.WatchBase;
+import org.apache.qpid.proton.codec.CompositeWritableBuffer;
+import org.apache.qpid.proton.codec.WritableBuffer;
+import org.apache.qpid.proton.engine.impl.DeliveryImpl;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.type.messaging.Accepted;
+import org.apache.qpid.proton.type.messaging.Modified;
+import org.apache.qpid.proton.type.messaging.Rejected;
+import org.apache.qpid.proton.type.messaging.Released;
+import org.apache.qpid.proton.type.transport.DeliveryState;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtdispatch.Task;
+
+import java.nio.ByteBuffer;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public abstract class MessageDelivery extends WatchBase {
+
+ final int initialSize;
+ private Message message;
+ private Buffer encoded;
+ public DeliveryImpl delivery;
+ private int sizeHint = 1024*4;
+
+ static Buffer encode(Message message, int sizeHint) {
+ ByteBuffer buffer = ByteBuffer.wrap(new byte[sizeHint]);
+ DroppingWritableBuffer overflow = new DroppingWritableBuffer();
+ int c = message.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow));
+ if( overflow.position() > 0 ) {
+ buffer = ByteBuffer.wrap(new byte[sizeHint+overflow.position()]);
+ c = message.encode(new WritableBuffer.ByteBufferWrapper(buffer));
+ }
+ return new Buffer(buffer.array(), 0, c);
+ }
+
+ static Message decode(Buffer buffer) {
+ Message msg = new Message();
+ int offset = buffer.offset;
+ int len = buffer.length;
+ while( len > 0 ) {
+ int decoded = msg.decode(buffer.data, offset, len);
+ assert decoded > 0: "Make progress decoding the message";
+ offset += decoded;
+ len -= decoded;
+ }
+ return msg;
+ }
+
+ public MessageDelivery(Message message) {
+ this(message, encode(message, 1024*4));
+ }
+
+ public MessageDelivery(Buffer encoded) {
+ this(null, encoded);
+ }
+
+ public MessageDelivery(Message message, Buffer encoded) {
+ this.message = message;
+ this.encoded = encoded;
+ sizeHint = this.encoded.length;
+ initialSize = sizeHint;
+ }
+
+ public Message getMessage() {
+ if( message == null ) {
+ message = decode(encoded);
+ }
+ return message;
+ }
+
+ public Buffer encoded() {
+ if( encoded == null ) {
+ encoded = encode(message, sizeHint);
+ sizeHint = encoded.length;
+ }
+ return encoded;
+ }
+
+ public boolean isSettled() {
+ return delivery!=null && delivery.isSettled();
+ }
+
+ public DeliveryState getRemoteState() {
+ return delivery==null ? null : delivery.getRemoteState();
+ }
+
+ public DeliveryState getLocalState() {
+ return delivery==null ? null : delivery.getLocalState();
+ }
+
+ public void onEncoded(final Callback<Void> cb) {
+ addWatch(new Watch() {
+ @Override
+ public boolean execute() {
+ if( delivery!=null ) {
+ cb.onSuccess(null);
+ return true;
+ }
+ return false;
+ }
+ });
+ }
+
+ /**
+ * @return the remote delivery state when it changes.
+ * @throws Exception
+ */
+ public DeliveryState getRemoteStateChange() throws Exception {
+ AmqpEndpointBase.assertNotOnDispatchQueue();
+ return getRemoteStateChangeFuture().await();
+ }
+
+ /**
+ * @return the future remote delivery state when it changes.
+ */
+ public Future<DeliveryState> getRemoteStateChangeFuture() {
+ final Promise<DeliveryState> rc = new Promise<DeliveryState>();
+ link().queue().execute(new Task() {
+ @Override
+ public void run() {
+ onRemoteStateChange(rc);
+ }
+ });
+ return rc;
+ }
+
+ abstract AmqpLink link();
+
+ boolean watchingRemoteStateChange;
+ public void onRemoteStateChange(final Callback<DeliveryState> cb) {
+ watchingRemoteStateChange = true;
+ final DeliveryState original = delivery.getRemoteState();
+ addWatch(new Watch() {
+ @Override
+ public boolean execute() {
+ if (original == null) {
+ if( delivery.getRemoteState()!=null ) {
+ cb.onSuccess(delivery.getRemoteState());
+ watchingRemoteStateChange = false;
+ return true;
+ }
+ } else {
+ if( !original.equals(delivery.getRemoteState()) ) {
+ cb.onSuccess(delivery.getRemoteState());
+ watchingRemoteStateChange = false;
+ return true;
+ }
+ }
+ return false;
+ }
+ });
+ }
+
+ /**
+ * @return the remote delivery state once settled.
+ * @throws Exception
+ */
+ public DeliveryState getSettle() throws Exception {
+ AmqpEndpointBase.assertNotOnDispatchQueue();
+ return getSettleFuture().await();
+ }
+
+ /**
+ * @return the future remote delivery state once the delivery is settled.
+ */
+ public Future<DeliveryState> getSettleFuture() {
+ final Promise<DeliveryState> rc = new Promise<DeliveryState>();
+ link().queue().execute(new Task() {
+ @Override
+ public void run() {
+ onSettle(rc);
+ }
+ });
+ return rc;
+ }
+
+ public void onSettle(final Callback<DeliveryState> cb) {
+ addWatch(new Watch() {
+ @Override
+ public boolean execute() {
+ if( delivery!=null && (delivery.isSettled() || delivery.remotelySettled()) ) {
+ cb.onSuccess(delivery.getRemoteState());
+ return true;
+ }
+ return false;
+ }
+ });
+ }
+
+ @Override
+ protected void fireWatches() {
+ super.fireWatches();
+ }
+
+ void incrementDeliveryCount() {
+ Message msg = getMessage();
+ msg.setDeliveryCount(msg.getDeliveryCount()+1);
+ encoded = null;
+ }
+
+ public void redeliver(boolean incrementDeliveryCounter) {
+ if( incrementDeliveryCounter ) {
+ incrementDeliveryCount();
+ }
+ }
+
+ public void settle() {
+ if( !delivery.isSettled() ) {
+ delivery.settle();
+ }
+ }
+}
Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/Promise.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/Promise.java?rev=1408852&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/Promise.java (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/Promise.java Tue Nov 13 17:41:01 2012
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.amqp.hawtdispatch.api;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class Promise<T> implements Callback<T>, Future<T> {
+
+ private final CountDownLatch latch = new CountDownLatch(1);
+ Callback<T> next;
+ Throwable error;
+ T value;
+
+ public void onFailure(Throwable value) {
+ Callback<T> callback = null;
+ synchronized(this) {
+ error = value;
+ latch.countDown();
+ callback = next;
+ }
+ if( callback!=null ) {
+ callback.onFailure(value);
+ }
+ }
+
+ public void onSuccess(T value) {
+ Callback<T> callback = null;
+ synchronized(this) {
+ this.value = value;
+ latch.countDown();
+ callback = next;
+ }
+ if( callback!=null ) {
+ callback.onSuccess(value);
+ }
+ }
+
+ public void then(Callback<T> callback) {
+ boolean fire = false;
+ synchronized(this) {
+ next = callback;
+ if( latch.getCount() == 0 ) {
+ fire = true;
+ }
+ }
+ if( fire ) {
+ if( error!=null ) {
+ callback.onFailure(error);
+ } else {
+ callback.onSuccess(value);
+ }
+ }
+ }
+
+ public T await(long amount, TimeUnit unit) throws Exception {
+ if( latch.await(amount, unit) ) {
+ return get();
+ } else {
+ throw new TimeoutException();
+ }
+ }
+
+ public T await() throws Exception {
+ latch.await();
+ return get();
+ }
+
+ private T get() throws Exception {
+ Throwable e = error;
+ if( e !=null ) {
+ if( e instanceof RuntimeException ) {
+ throw (RuntimeException) e;
+ } else if( e instanceof Exception) {
+ throw (Exception) e;
+ } else if( e instanceof Error) {
+ throw (Error) e;
+ } else {
+ // don't expect to hit this case.
+ throw new RuntimeException(e);
+ }
+ }
+ return value;
+ }
+
+}