You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:45:18 UTC
svn commit: r961077 - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/
activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/
activemq-stomp/src/test/scala/org/apache/activemq/apollo...
Author: chirino
Date: Wed Jul 7 03:45:17 2010
New Revision: 961077
URL: http://svn.apache.org/viewvc?rev=961077&view=rev
Log:
transport tweaks to get perf up
Removed:
activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/CompletionCallback.java
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java
activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java
activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961077&r1=961076&r2=961077&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul 7 03:45:17 2010
@@ -22,6 +22,7 @@ import _root_.java.lang.{String}
import _root_.org.apache.activemq.util.buffer.{Buffer, AsciiBuffer}
import _root_.org.fusesource.hawtdispatch._
import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
+import org.apache.activemq.transport.Transport
trait DeliveryProducer {
def collocate(queue:DispatchQueue):Unit
@@ -384,11 +385,21 @@ case class Delivery (
// }
//}
+trait DeliverySink {
+ def full:Boolean
+ def send(delivery:Delivery):Unit
+}
+
+class TransportDeliverySink(val transport:Transport) extends DeliverySink {
+ def full:Boolean = transport.isFull
+ def send(delivery:Delivery) = transport.oneway(delivery.message, delivery)
+}
+
/**
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class DeliveryBuffer(var maxSize:Int=1024*32) {
+class DeliveryBuffer(var maxSize:Int=1024*32) extends DeliverySink {
var deliveries = new LinkedList[Delivery]()
private var size = 0
@@ -425,7 +436,7 @@ class DeliveryBuffer(var maxSize:Int=102
}
-class DeliveryOverflowBuffer(val delivery_buffer:DeliveryBuffer) {
+class DeliveryOverflowBuffer(val delivery_buffer:DeliverySink) extends DeliverySink {
private var overflow = new LinkedList[Delivery]()
@@ -433,7 +444,7 @@ class DeliveryOverflowBuffer(val deliver
while( !overflow.isEmpty && !full ) {
val delivery = overflow.removeFirst
delivery.release
- send_to_delivery_queue(delivery)
+ send_to_delivery_buffer(delivery)
}
}
@@ -444,11 +455,11 @@ class DeliveryOverflowBuffer(val deliver
delivery.retain
overflow.addLast(delivery)
} else {
- send_to_delivery_queue(delivery)
+ send_to_delivery_buffer(delivery)
}
}
- protected def send_to_delivery_queue(value:Delivery) = {
+ protected def send_to_delivery_buffer(value:Delivery) = {
var delivery = Delivery(value)
delivery.setDisposer(^{
drainOverflow
@@ -461,7 +472,7 @@ class DeliveryOverflowBuffer(val deliver
}
-class DeliverySessionManager(val delivery_buffer:DeliveryBuffer, val queue:DispatchQueue) extends BaseRetained {
+class DeliverySessionManager(val delivery_buffer:DeliverySink, val queue:DispatchQueue) extends BaseRetained {
var sessions = List[SessionServer]()
@@ -524,7 +535,7 @@ class DeliverySessionManager(val deliver
override def full = credits <= 0
- override protected def send_to_delivery_queue(value:Delivery) = {
+ override protected def send_to_delivery_buffer(value:Delivery) = {
var delivery = Delivery(value)
delivery.setDisposer(^{
// This is called from the server/consumer thread
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961077&r1=961076&r2=961077&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul 7 03:45:17 2010
@@ -28,7 +28,6 @@ import AsciiBuffer._
import Stomp._
import BufferConversions._
import StompFrameConstants._
-import org.apache.activemq.transport.CompletionCallback
import java.io.IOException
@@ -90,31 +89,17 @@ class StompProtocolHandler extends Proto
}
}
- val outboundChannel = new DeliveryBuffer
+ var outboundChannel:TransportDeliverySink = null
var closed = false
var consumer:SimpleConsumer = null
var producerRoute:DeliveryProducerRoute=null
var host:VirtualHost = null
- outboundChannel.eventHandler = ^{
- var delivery = outboundChannel.receive
- while( delivery!=null ) {
- connection.transport.oneway(delivery.message, new CompletionCallback() {
- def onCompletion() = {
- outboundChannel.ack(delivery)
- }
- def onFailure(e:Exception) = {
- connection.onFailure(e)
- }
- });
- delivery = outboundChannel.receive
- }
- }
-
private def queue = connection.dispatchQueue
override def onTransportConnected() = {
+ outboundChannel = new TransportDeliverySink(connection.transport)
connection.broker.runtime.getDefaultVirtualHost(
queue.wrap { (host)=>
this.host=host
@@ -172,7 +157,7 @@ class StompProtocolHandler extends Proto
def on_stomp_connect(headers:HeaderMap) = {
- connection.transport.oneway(StompFrame(Responses.CONNECTED))
+ connection.transport.oneway(StompFrame(Responses.CONNECTED), null)
}
def get(headers:HeaderMap, name:AsciiBuffer):Option[AsciiBuffer] = {
@@ -267,7 +252,7 @@ class StompProtocolHandler extends Proto
private def die(msg:String) = {
info("Shutting connection down due to: "+msg)
connection.transport.suspendRead
- connection.transport.oneway(StompFrame(Responses.ERROR, Nil, ascii(msg)))
+ connection.transport.oneway(StompFrame(Responses.ERROR, Nil, ascii(msg)), null)
^ {
connection.stop()
} ->: queue
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala?rev=961077&r1=961076&r2=961077&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala Wed Jul 7 03:45:17 2010
@@ -74,8 +74,19 @@ class StompWireFormat extends WireFormat
ByteBuffer.wrap(Array(x));
}
+
def marshal(command:Any, os:DataOutput) = {
+ marshal(command.asInstanceOf[StompFrame], os)
+ }
+
+ def marshal(command:Any):Buffer= {
val frame = command.asInstanceOf[StompFrame]
+ val os = new DataByteArrayOutputStream(frame.size);
+ marshal(frame, os)
+ os.toBuffer
+ }
+
+ def marshal(frame:StompFrame, os:DataOutput) = {
frame.action.writeTo(os)
os.write(NEWLINE)
@@ -102,15 +113,6 @@ class StompWireFormat extends WireFormat
END_OF_FRAME_BUFFER.writeTo(os)
}
- def marshal(command:Any):Buffer= {
- val frame = command.asInstanceOf[StompFrame]
- // make a little bigger since size can be an estimate and we want to avoid
- // a capacity re-size.
- val os = new DataByteArrayOutputStream(frame.size + 100);
- marshal(frame, os)
- os.toBuffer
- }
-
def unmarshal(packet:Buffer) = {
throw new UnsupportedOperationException
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala?rev=961077&r1=961076&r2=961077&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala Wed Jul 7 03:45:17 2010
@@ -21,7 +21,6 @@ import _root_.org.apache.activemq.apollo
import _root_.org.apache.activemq.apollo.broker.perf._
import _root_.org.apache.activemq.apollo.stomp._
-import _root_.org.apache.activemq.transport.CompletionCallback
import _root_.org.apache.activemq.util.buffer._
import collection.mutable.{ListBuffer, HashMap}
@@ -29,6 +28,7 @@ import AsciiBuffer._
import Stomp._
import _root_.org.apache.activemq.apollo.stomp.StompFrame
import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
+import org.fusesource.hawtdispatch.BaseRetained
object StompBrokerPerfTest {
def main(args:Array[String]) = {
@@ -55,7 +55,7 @@ class StompRemoteConsumer extends Remote
}
var frame = StompFrame(Stomp.Commands.CONNECT);
- transport.oneway(frame);
+ transport.oneway(frame, null);
var headers:List[(AsciiBuffer, AsciiBuffer)] = Nil
headers ::= (Stomp.Headers.Subscribe.DESTINATION, stompDestination)
@@ -63,7 +63,7 @@ class StompRemoteConsumer extends Remote
headers ::= (Stomp.Headers.Subscribe.ACK_MODE, Stomp.Headers.Subscribe.AckModeValues.AUTO)
frame = StompFrame(Stomp.Commands.SUBSCRIBE, headers);
- transport.oneway(frame);
+ transport.oneway(frame, null);
}
def onTransportCommand(command:Object) = {
@@ -99,36 +99,35 @@ class StompRemoteProducer extends Remote
var stompDestination:AsciiBuffer = null
- val send_next:CompletionCallback = new CompletionCallback() {
- def onCompletion() = {
+ def send_next:Unit = {
+ var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil
+ headers ::= (Stomp.Headers.Send.DESTINATION, stompDestination);
+ if (property != null) {
+ headers ::= (ascii(property), ascii(property));
+ }
+// var p = this.priority;
+// if (priorityMod > 0) {
+// p = if ((counter % priorityMod) == 0) { 0 } else { priority }
+// }
+
+ var content = ascii(createPayload());
+ val frame = StompFrame(Stomp.Commands.SEND, headers, content)
+ val delivery = new BaseRetained()
+ delivery.setDisposer(^{
rate.increment();
val task = ^ {
if( !stopping ) {
-
- var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil
- headers ::= (Stomp.Headers.Send.DESTINATION, stompDestination);
- if (property != null) {
- headers ::= (ascii(property), ascii(property));
- }
-// var p = this.priority;
-// if (priorityMod > 0) {
-// p = if ((counter % priorityMod) == 0) { 0 } else { priority }
-// }
-
- var content = ascii(createPayload());
- transport.oneway(StompFrame(Stomp.Commands.SEND, headers, content), send_next)
+ send_next
}
- }
+ }
if( thinkTime > 0 ) {
dispatchQueue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, task)
} else {
dispatchQueue << task
}
- }
- def onFailure(error:Exception) = {
- println("stopping due to: "+error);
- stop
- }
+ })
+ transport.oneway(frame, delivery)
+ delivery.release
}
override def setupProducer() = {
@@ -137,7 +136,8 @@ class StompRemoteProducer extends Remote
} else {
stompDestination = ascii("/topic/"+destination.getName().toString());
}
- transport.oneway(StompFrame(Stomp.Commands.CONNECT), send_next);
+ transport.oneway(StompFrame(Stomp.Commands.CONNECT), null);
+
}
def onTransportCommand(command:Object) = {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala?rev=961077&r1=961076&r2=961077&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala Wed Jul 7 03:45:17 2010
@@ -36,7 +36,7 @@ object StompLoadClient {
import StompLoadClient._
implicit def toAsciiBuffer(value: String) = new AsciiBuffer(value)
- var producerSleep = 1000*1000000;
+ var producerSleep = 0;
var consumerSleep = 0;
var producers = 1;
var consumers = 1;
Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=961077&r1=961076&r2=961077&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Wed Jul 7 03:45:17 2010
@@ -16,7 +16,6 @@
*/
package org.apache.activemq.transport.tcp;
-import org.apache.activemq.transport.CompletionCallback;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.util.buffer.Buffer;
@@ -25,6 +24,7 @@ import org.apache.activemq.wireformat.Wi
import org.fusesource.hawtdispatch.Dispatch;
import org.fusesource.hawtdispatch.DispatchQueue;
import org.fusesource.hawtdispatch.DispatchSource;
+import org.fusesource.hawtdispatch.Retained;
import java.io.EOFException;
import java.io.IOException;
@@ -32,6 +32,7 @@ import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
import java.util.LinkedList;
import java.util.Map;
@@ -40,7 +41,7 @@ import static org.apache.activemq.transp
/**
* An implementation of the {@link Transport} interface using raw tcp/ip
- *
+ *
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public class TcpTransport implements Transport {
@@ -75,20 +76,21 @@ public class TcpTransport implements Tra
private DispatchSource writeSource;
final LinkedList<OneWay> outbound = new LinkedList<OneWay>();
- int maxOutbound = 1024*32;
+ int outboundSize = 0;
+ int maxOutbound = 1024 * 32;
ByteBuffer outbound_frame;
protected boolean useLocalHost = true;
- int READ_BUFFFER_SIZE = 1024*32;
- ByteBuffer readBuffer = ByteBuffer.allocate(1024*32);
+ int READ_BUFFFER_SIZE = 1024 * 32;
+ ByteBuffer readBuffer = ByteBuffer.allocate(1024 * 32);
static final class OneWay {
final Buffer buffer;
- final CompletionCallback callback;
+ final Retained retained;
- public OneWay(Buffer buffer, CompletionCallback callback) {
- this.callback = callback;
+ public OneWay(Buffer buffer, Retained retained) {
+ this.retained = retained;
this.buffer = buffer;
}
}
@@ -111,11 +113,11 @@ public class TcpTransport implements Tra
}
public void setDispatchQueue(DispatchQueue queue) {
- if( dispatchQueue!=null ) {
+ if (dispatchQueue != null) {
dispatchQueue.release();
}
this.dispatchQueue = queue;
- if( dispatchQueue!=null ) {
+ if (dispatchQueue != null) {
dispatchQueue.retain();
}
}
@@ -127,18 +129,18 @@ public class TcpTransport implements Tra
if (listener == null) {
throw new IllegalArgumentException("listener is not set");
}
- if( transportState!=CREATED ) {
+ if (transportState != CREATED) {
throw new IllegalStateException("can only be started from the created stae");
}
- transportState=RUNNING;
-
+ transportState = RUNNING;
+
unmarshalSession = wireformat.createUnmarshalSession();
- if( socketState == CONNECTING ) {
+ if (socketState == CONNECTING) {
channel = SocketChannel.open();
}
channel.configureBlocking(false);
- if( socketState == CONNECTING ) {
+ if (socketState == CONNECTING) {
if (localLocation != null) {
InetSocketAddress localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
@@ -152,7 +154,7 @@ public class TcpTransport implements Tra
final DispatchSource connectSource = Dispatch.createSource(channel, SelectionKey.OP_CONNECT, dispatchQueue);
connectSource.setEventHandler(new Runnable() {
public void run() {
- if( transportState==RUNNING ) {
+ if (transportState == RUNNING) {
try {
socketState = CONNECTED;
channel.finishConnect();
@@ -190,7 +192,7 @@ public class TcpTransport implements Tra
}
readSource = Dispatch.createSource(channel, SelectionKey.OP_READ, dispatchQueue);
- readSource.setEventHandler(new Runnable(){
+ readSource.setEventHandler(new Runnable() {
public void run() {
try {
drainInbound();
@@ -201,12 +203,12 @@ public class TcpTransport implements Tra
});
writeSource = Dispatch.createSource(channel, SelectionKey.OP_WRITE, dispatchQueue);
- writeSource.setEventHandler(new Runnable(){
+ writeSource.setEventHandler(new Runnable() {
public void run() {
- if( transportState==RUNNING ) {
+ if (transportState == RUNNING) {
// once the outbound is drained.. we can suspend getting
// write events.
- if( drainOutbound() ) {
+ if (drainOutbound()) {
writeSource.suspend();
}
}
@@ -219,33 +221,31 @@ public class TcpTransport implements Tra
public void stop() throws Exception {
- if( readSource!=null ) {
+ if (readSource != null) {
readSource.release();
readSource = null;
}
- if( writeSource!=null ) {
+ if (writeSource != null) {
writeSource.release();
writeSource = null;
}
setDispatchQueue(null);
- transportState=DISPOSED;
+ transportState = DISPOSED;
}
- @Deprecated
- public void oneway(Object command) {
- oneway(command, null);
+
+ public boolean isFull() {
+ return outboundSize >= maxOutbound;
}
- public void oneway(Object command, CompletionCallback callback) {
+ public void oneway(Object command, Retained retained) {
assert Dispatch.getCurrentQueue() == dispatchQueue;
try {
- if( socketState != CONNECTED ) {
- throw new IllegalStateException("Not connected.");
- }
- } catch (IllegalStateException e) {
- if( callback!=null ) {
- callback.onFailure(e);
+ if (socketState != CONNECTED) {
+ throw new IOException("Not connected.");
}
+ } catch (IOException e) {
+ listener.onTransportFailure(e);
}
// Marshall the command.
@@ -253,79 +253,95 @@ public class TcpTransport implements Tra
try {
buffer = wireformat.marshal(command);
} catch (IOException e) {
- callback.onFailure(e);
+ listener.onTransportFailure(e);
return;
}
- outbound.add(new OneWay(buffer, callback));
+ OneWay oneway;
+ if (retained!=null && isFull() ) {
+ // retaining blocks the sender it is released.
+ retained.retain();
+ oneway = new OneWay(buffer, retained);
+ } else {
+ oneway = new OneWay(buffer, null);
+ }
+ outbound.add(oneway);
+ outboundSize += buffer.length;
// wait for write ready events if this write
// cannot be drained.
- if( outbound.size()==1 && !drainOutbound() ) {
+ if (outbound.size() == 1 && !drainOutbound()) {
writeSource.resume();
}
}
/**
- * @retruns true if the outbound has been drained of all objects and there are no in progress writes.
- */
+ * @retruns true if the outbound has been drained of all objects and there are no in progress writes.
+ */
private boolean drainOutbound() {
try {
-
- while(socketState == CONNECTED) {
-
- // if we have a pending write that is being sent over the socket...
- if( outbound_frame!=null ) {
-
- channel.write(outbound_frame);
- if( outbound_frame.remaining() != 0 ) {
- return false;
- } else {
- outbound_frame = null;
- }
- } else {
+ while (socketState == CONNECTED) {
- // marshall all the available frames..
- ByteArrayOutputStream buffer = new ByteArrayOutputStream(maxOutbound << 2);
- OneWay oneWay = outbound.poll();
-
- while( oneWay!=null) {
- buffer.write(oneWay.buffer);
- if( oneWay.callback!=null ) {
- oneWay.callback.onCompletion();
- }
- if( buffer.size() < maxOutbound ) {
- oneWay = outbound.poll();
+ // if we have a pending write that is being sent over the socket...
+ if (outbound_frame != null) {
+
+ channel.write(outbound_frame);
+ if (outbound_frame.remaining() != 0) {
+ return false;
} else {
- oneWay = null;
+ outbound_frame = null;
}
- }
-
- if( buffer.size()==0 ) {
- // the source is now drained...
- return true;
} else {
- outbound_frame = buffer.toBuffer().toByteBuffer();
+
+ // marshall all the available frames..
+ OneWay oneWay = outbound.poll();
+
+ int size = 0;
+ ArrayList<Buffer> buffers = new ArrayList<Buffer>(outbound.size());
+ while (oneWay != null) {
+ size+=oneWay.buffer.length;
+ buffers.add(oneWay.buffer);
+ if (oneWay.retained != null) {
+ oneWay.retained.release();
+ }
+ if (size < maxOutbound) {
+ oneWay = outbound.poll();
+ } else {
+ oneWay = null;
+ }
+ }
+
+ if (size == 0) {
+ // the source is now drained...
+ return true;
+ } else {
+ // Make the write just one big buffer.
+ outboundSize -= size;
+ ByteArrayOutputStream buffer = new ByteArrayOutputStream(size);
+ for (Buffer b : buffers) {
+ buffer.write(b);
+ }
+ outbound_frame = buffer.toBuffer().toByteBuffer();
+ }
}
- }
}
} catch (IOException e) {
listener.onTransportFailure(e);
}
-
- return outbound.isEmpty() && outbound_frame==null;
+
+ return outbound.isEmpty() && outbound_frame == null;
}
private void drainInbound() throws IOException {
- if( transportState==DISPOSED || readSource.isSuspended() ) {
+ if (transportState == DISPOSED || readSource.isSuspended()) {
return;
}
- while( true ) {
+ while (true) {
// do we need to read in more data???
if (unmarshalSession.getEndPos() == readBuffer.position()) {
@@ -359,12 +375,12 @@ public class TcpTransport implements Tra
}
}
- Object command=unmarshalSession.unmarshal(readBuffer);
- if( command!=null ) {
+ Object command = unmarshalSession.unmarshal(readBuffer);
+ if (command != null) {
listener.onTransportCommand(command);
// the transport may be suspended after processing a command.
- if( transportState==DISPOSED || readSource.isSuspended() ) {
+ if (transportState == DISPOSED || readSource.isSuspended()) {
return;
}
}
@@ -391,14 +407,15 @@ public class TcpTransport implements Tra
public void resumeRead() {
readSource.resume();
}
-
- public void reconnect(URI uri, CompletionCallback callback) {
+
+ public void reconnect(URI uri) {
throw new UnsupportedOperationException();
}
public TransportListener getTransportListener() {
return listener;
}
+
public void setTransportListener(TransportListener listener) {
this.listener = listener;
}
@@ -406,6 +423,7 @@ public class TcpTransport implements Tra
public WireFormat getWireformat() {
return wireformat;
}
+
public void setWireformat(WireFormat wireformat) {
this.wireformat = wireformat;
}
@@ -417,6 +435,7 @@ public class TcpTransport implements Tra
public boolean isDisposed() {
return transportState == DISPOSED;
}
+
public boolean isFaultTolerant() {
return false;
}
@@ -677,6 +696,7 @@ public class TcpTransport implements Tra
// this.minmumWireFormatVersion = minmumWireFormatVersion;
// }
//
+
public boolean isUseLocalHost() {
return useLocalHost;
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java?rev=961077&r1=961076&r2=961077&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java Wed Jul 7 03:45:17 2010
@@ -22,6 +22,7 @@ import java.net.URI;
import org.apache.activemq.Service;
import org.apache.activemq.wireformat.WireFormat;
import org.fusesource.hawtdispatch.DispatchQueue;
+import org.fusesource.hawtdispatch.Retained;
/**
* Represents an abstract connection. It can be a client side or server side connection.
@@ -30,18 +31,16 @@ import org.fusesource.hawtdispatch.Dispa
*/
public interface Transport extends Service {
- @Deprecated
- void oneway(Object command);
+
+ boolean isFull();
/**
- * A one way asynchronous send. Once the command is transmitted the callback
- * is invoked.
+ * A one way asynchronous send.
*
* @param command
- * @param callback
* @throws IOException
*/
- void oneway(Object command, CompletionCallback callback);
+ void oneway(Object command, Retained retained);
/**
* Returns the current transport listener
@@ -121,6 +120,6 @@ public interface Transport extends Servi
* @param uri
* @throws IOException on failure of if not supported
*/
- void reconnect(URI uri, CompletionCallback callback);
+ void reconnect(URI uri);
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java?rev=961077&r1=961076&r2=961077&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java Wed Jul 7 03:45:17 2010
@@ -21,6 +21,7 @@ import java.net.URI;
import org.apache.activemq.wireformat.WireFormat;
import org.fusesource.hawtdispatch.DispatchQueue;
+import org.fusesource.hawtdispatch.Retained;
/**
* @version $Revision: 1.5 $
@@ -105,16 +106,14 @@ public class TransportFilter implements
return next.toString();
}
- @Deprecated
- public void oneway(Object command) {
- oneway(command, null);
+ public void oneway(Object command, Retained retained) {
+ next.oneway(command, retained);
}
- public void oneway(Object command, CompletionCallback callback) {
- next.oneway(command, callback);
+ public boolean isFull() {
+ return next.isFull();
}
-
public void onTransportFailure(IOException error) {
transportListener.onTransportFailure(error);
}
@@ -154,8 +153,8 @@ public class TransportFilter implements
return next.isConnected();
}
- public void reconnect(URI uri, CompletionCallback callback) {
- next.reconnect(uri, callback);
+ public void reconnect(URI uri) {
+ next.reconnect(uri);
}
public WireFormat getWireformat() {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java?rev=961077&r1=961076&r2=961077&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java Wed Jul 7 03:45:17 2010
@@ -16,15 +16,11 @@
*/
package org.apache.activemq.transport.pipe;
-import org.apache.activemq.transport.CompletionCallback;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.util.buffer.Buffer;
import org.apache.activemq.wireformat.WireFormat;
-import org.fusesource.hawtdispatch.CustomDispatchSource;
-import org.fusesource.hawtdispatch.Dispatch;
-import org.fusesource.hawtdispatch.DispatchQueue;
-import org.fusesource.hawtdispatch.EventAggregators;
+import org.fusesource.hawtdispatch.*;
import java.io.EOFException;
import java.io.IOException;
@@ -139,11 +135,11 @@ public class PipeTransport implements Tr
static final class OneWay {
final Object command;
- final CompletionCallback callback;
+ final Retained retained;
- public OneWay(Object command, CompletionCallback callback) {
- this.callback = callback;
+ public OneWay(Object command, Retained retained) {
this.command = command;
+ this.retained = retained;
}
}
@@ -151,34 +147,34 @@ public class PipeTransport implements Tr
int outbound = 0;
int maxOutbound = 100;
- @Deprecated
- public void oneway(Object command) {
- oneway(command, null);
+ public boolean isFull() {
+ return outbound >= maxOutbound;
}
- public void oneway(Object command, CompletionCallback callback) {
+ public void oneway(Object command, Retained retained) {
if( !connected ) {
throw new IllegalStateException("Not connected.");
}
- if( outbound < maxOutbound ) {
- transmit(command, callback);
+ if( isFull() && retained!=null) {
+ retained.retain();
+ inbound.add(new OneWay(command, retained));
} else {
- inbound.add(new OneWay(command, callback));
+ transmit(command, null);
}
}
private void drainInbound() {
- while( outbound < maxOutbound && !inbound.isEmpty() ) {
+ while( !isFull() && !inbound.isEmpty() ) {
OneWay oneWay = inbound.poll();
- transmit(oneWay.command, oneWay.callback);
+ transmit(oneWay.command, oneWay.retained);
}
}
- private void transmit(Object command, CompletionCallback callback) {
+ private void transmit(Object command, Retained retained) {
outbound++;
peer.dispatchSource.merge(command);
- if( callback!=null ) {
- callback.onCompletion();
+ if( retained!=null ) {
+ retained.release();
}
}
@@ -200,7 +196,7 @@ public class PipeTransport implements Tr
public void resumeRead() {
dispatchSource.resume();
}
- public void reconnect(URI uri, CompletionCallback callback) {
+ public void reconnect(URI uri) {
throw new UnsupportedOperationException();
}