You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:42:14 UTC
svn commit: r961070 - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/
activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/
activemq-broker/src/test/scala/org/apache/activemq/apo...
Author: chirino
Date: Wed Jul 7 03:42:13 2010
New Revision: 961070
URL: http://svn.apache.org/viewvc?rev=961070&view=rev
Log:
bringing back the stomp module.. converting it to scala.
Added:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/SharedQueueTest.java
- copied, changed from r961069, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/SharedQueueTest.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/Stomp.java
- copied, changed from r961069, activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/Stomp.java
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
- copied, changed from r961069, activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompWireFormat.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/package.html
- copied, changed from r961069, activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/package.html
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
Removed:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/BrokerTest.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/BrokerTestBase.java
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/SharedQueueTest.java
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/DefaultFrameTranslator.java
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/FrameTranslator.java
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/Stomp.java
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompConnection.java
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompFrame.java
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompMessageDelivery.java
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompMessageEvaluationContext.java
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompProtocolHandler.java
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompSslTransportFactory.java
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompTransportFactory.java
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompWireFormat.java
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/StompWireFormatFactory.java
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/XStreamFrameTranslator.java
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/package.html
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/java/org/apache/activemq/perf/broker/stomp/StompBrokerTest.java
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/java/org/apache/activemq/perf/broker/stomp/StompRemoteConsumer.java
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/java/org/apache/activemq/perf/broker/stomp/StompRemoteProducer.java
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompConnection.scala
activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompWireFormat.scala
activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java
activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java
activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=961070&r1=961069&r2=961070&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Wed Jul 7 03:42:13 2010
@@ -31,7 +31,7 @@ class ConnectionConfig {
}
abstract class Connection() extends TransportListener with Service {
- val q = createQueue("connection")
+ val dispatchQueue = createQueue("connection")
var name = "connection"
var stopping = false;
@@ -39,7 +39,7 @@ abstract class Connection() extends Tran
var exceptionListener:ExceptionListener = null;
def start() = {
- transport.setDispatchQueue(q);
+ transport.setDispatchQueue(dispatchQueue);
transport.getDispatchQueue.release
transport.setTransportListener(this);
transport.start()
@@ -48,7 +48,7 @@ abstract class Connection() extends Tran
def stop() = {
stopping=true
transport.stop()
- q.release
+ dispatchQueue.release
}
def onException(error:IOException) = {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961070&r1=961069&r2=961070&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul 7 03:42:13 2010
@@ -84,41 +84,41 @@ trait Message {
}
object Delivery {
- def apply(o:Delivery) = new Delivery(o.message, o.encoded, o.encoding, o.size, o.ack, o.tx_id, o.store_id)
+ def apply(o:Delivery) = new Delivery(o.message, o.size, o.encoded, o.encoding, o.ack, o.tx_id, o.store_id)
}
case class Delivery (
- /**
- * the message being delivered
- */
+ /**
+ * the message being delivered
+ */
message: Message,
/**
- * the encoded form of the message being delivered.
+ * memory size of the delivery. Used for resource allocation tracking
*/
- encoded: Buffer,
+ size:Int,
/**
- * the encoding format of the message
+ * the encoded form of the message being delivered.
*/
- encoding: String,
+ encoded: Buffer = null,
/**
- * memory size of the delivery. Used for resource allocation tracking
+ * the encoding format of the message
*/
- size:Int,
+ encoding: String = null,
/**
* true if this delivery requires acknowledgment.
*/
- ack:Boolean,
+ ack:Boolean = false,
/**
* The id used to identify the transaction that the message
* belongs to.
*/
- tx_id:Long,
+ tx_id:Long = -1,
/**
* The id used to identify this message in the message
@@ -126,7 +126,7 @@ case class Delivery (
*
* @return The store tracking or -1 if not set.
*/
- store_id: Long
+ store_id: Long = -1
) extends BaseRetained {
Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/SharedQueueTest.java (from r961069, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/SharedQueueTest.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/SharedQueueTest.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/SharedQueueTest.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/SharedQueueTest.java&r1=961069&r2=961070&rev=961070&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/SharedQueueTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/SharedQueueTest.java Wed Jul 7 03:42:13 2010
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.broker;
+package org.apache.activemq.apollo.broker;
import junit.framework.TestCase;
Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala?rev=961070&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala Wed Jul 7 03:42:13 2010
@@ -0,0 +1,622 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.perf
+
+import _root_.java.beans.ExceptionListener
+import _root_.java.io.{File}
+import _root_.java.net.URI
+import _root_.java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+import _root_.java.util.concurrent.TimeUnit
+import _root_.java.util.{LinkedHashMap, ArrayList, HashMap}
+import _root_.org.apache.activemq.apollo.broker._
+import _root_.org.apache.activemq.broker.store.{StoreFactory, Store}
+import _root_.org.apache.activemq.metric.{Period, MetricAggregator, MetricCounter}
+import _root_.java.lang.{String}
+import _root_.org.apache.activemq.util.buffer.{AsciiBuffer}
+import _root_.org.junit.{Test, Before}
+
+import org.apache.activemq.transport.TransportFactory
+
+import _root_.scala.collection.JavaConversions._
+
+
+abstract class RemoteConsumer extends Connection { // benchmark consumer connection; subclasses implement setupSubscription()
+ val consumerRate = new MetricCounter(); // this consumer's own rate counter; named and registered in start()
+ var totalConsumerRate: MetricAggregator = null // aggregate that start() adds consumerRate to; must be assigned before start()
+ var thinkTime: Long = 0 // not read in this class; presumably a per-message delay applied by subclasses (unit not shown here) - confirm
+ var destination: Destination = null
+ var selector: String = null;
+ var durable = false;
+ var uri: URI = null // target handed to TransportFactory.connect() in start()
+ // Names and registers the rate counter, connects the transport, starts the connection, then subscribes.
+ override def start() = {
+ consumerRate.name("Consumer " + name + " Rate");
+ totalConsumerRate.add(consumerRate);
+ transport = TransportFactory.connect(uri);
+ super.start();
+ setupSubscription();
+ }
+ // Abstract hook called as the last step of start().
+ protected def setupSubscription()
+
+}
+
+
+abstract class RemoteProducer extends Connection { // benchmark producer connection; subclasses implement setupProducer()
+ val rate = new MetricCounter(); // this producer's own rate counter; named and registered in start()
+
+ var messageIdGenerator: AtomicLong = null
+ var priority = 0
+ var persistentDelivery = false
+ var priorityMod = 0
+ var counter = 0 // incremented once per createPayload() call and embedded in the payload text
+ var producerId = 0
+ var destination: Destination = null
+ var property: String = null
+ var totalProducerRate: MetricAggregator = null // aggregate that start() adds rate to; must be assigned before start()
+ var next: Delivery = null
+
+ var filler: String = null // padding text of payloadSize chars, built in start(); stays null when payloadSize <= 0
+ var payloadSize = 20 // target payload length in chars; a negative value makes createPayload() skip padding and truncation
+ var uri: URI = null // target handed to TransportFactory.connect() in start()
+ // Builds the filler text, registers the rate counter, connects the transport, starts the connection, then calls setupProducer().
+ override def start() = {
+
+ if (payloadSize > 0) {
+ var sb = new StringBuilder(payloadSize);
+ for (i <- 0 until payloadSize) {
+ sb.append(('a' + (i % 26)).toChar); // cycles through 'a'..'z'
+ }
+ filler = sb.toString();
+ }
+
+ rate.name("Producer " + name + " Rate");
+ totalProducerRate.add(rate);
+
+ transport = TransportFactory.connect(uri);
+ super.start();
+ setupProducer();
+
+ }
+ // Abstract hook called as the last step of start().
+ def setupProducer()
+ // Returns "<name>:<counter>:" padded with filler, or truncated, to payloadSize chars; when payloadSize < 0 returns "<name>:<counter>" as is.
+def createPayload(): String = {
+ if (payloadSize >= 0) {
+ var sb = new StringBuilder(payloadSize);
+ sb.append(name);
+ sb.append(':');
+ counter += 1
+ sb.append(counter);
+ sb.append(':');
+ var length = sb.length;
+ if (length <= payloadSize) {
+ sb.append(filler.subSequence(0, payloadSize - length)); // relies on start() having built filler; filler is null before start()
+ return sb.toString();
+ } else {
+ return sb.substring(0, payloadSize); // the prefix alone already exceeds payloadSize, so it is cut down
+ }
+ } else {
+ counter += 1
+ return name + ":" + (counter);
+ }
+ }
+
+}
+
+object BaseBrokerPerfTest { // tuning knobs shared by all benchmark methods of the class below
+ var PERFORMANCE_SAMPLES = Integer.parseInt(System.getProperty("PERFORMANCE_SAMPLES", "3")) // number of sampling rounds per test; overridable via system property
+ var IO_WORK_AMOUNT = 0 // not referenced anywhere else in this file
+ var FANIN_COUNT = 10 // producer count used by benchmark_10_1_10 and benchmark_10_1_1
+ var FANOUT_COUNT = 10 // consumer count used by benchmark_10_1_10 and benchmark_1_1_10
+ var PRIORITY_LEVELS = 10 // not referenced anywhere else in this file
+ var USE_INPUT_QUEUES = true // not referenced anywhere else in this file
+
+ var USE_KAHA_DB = true; // createStore() asks StoreFactory for "kaha-db" when true, "memory" otherwise
+ var PURGE_STORE = true; // passed to store.setDeleteAllMessages() in createStore()
+ var PERSISTENT = false; // copied to every producer's persistentDelivery in createConnections()
+ var DURABLE = false; // copied to every consumer's durable flag in createConnections()
+
+}
+abstract class BaseBrokerPerfTest {
+ import BaseBrokerPerfTest._
+
+ // Set to put senders and consumers on separate brokers.
+ protected var multibroker = false;
+
+ // Set to use point-to-point (queue) destinations instead of topics.
+ protected var ptp = false;
+
+ // Set to use tcp IO; otherwise setUp() uses pipe:// URIs.
+ protected var tcp = true;
+ // Set to force marshalling even in the non-tcp case.
+ protected var forceMarshalling = true;
+ // The four URIs below are assigned in setUp().
+ protected var sendBrokerBindURI: String = null
+ protected var receiveBrokerBindURI: String = null
+ protected var sendBrokerConnectURI: String = null
+ protected var receiveBrokerConnectURI: String = null
+ // Topology of the current benchmark; each test method sets these before calling createConnections().
+ protected var producerCount = 0
+ protected var consumerCount = 0
+ protected var destCount = 0
+
+ protected val totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items") // handed to every producer in createProducer()
+ protected val totalConsumerRate = new MetricAggregator().name("Aggregate Consumer Rate").unit("items") // handed to every consumer in createConsumer()
+
+ protected var sendBroker: Broker = null
+ protected var rcvBroker: Broker = null // same instance as sendBroker unless multibroker is set
+ protected val brokers = new ArrayList[Broker]() // every broker built by createConnections(); started by startBrokers(), stopped by stopServices()
+ protected val msgIdGenerator = new AtomicLong() // shared by all producers (assigned in createProducer())
+ protected val stopping = new AtomicBoolean() // set by stopServices(); the async exception listeners stay silent once it is true
+
+ val producers = new ArrayList[RemoteProducer]()
+ val consumers = new ArrayList[RemoteConsumer]()
+ var name: String = null; // test description; see setName()
+
+ @Before
+ def setUp() = { // derives the broker bind/connect URIs from the tcp and forceMarshalling flags
+ if (tcp) {
+ sendBrokerBindURI = "tcp://localhost:10000?wireFormat=" + getBrokerWireFormat();
+ receiveBrokerBindURI = "tcp://localhost:20000?wireFormat=" + getBrokerWireFormat();
+ // Clients use the same host and ports but name the wire format returned by getRemoteWireFormat().
+ sendBrokerConnectURI = "tcp://localhost:10000?wireFormat=" + getRemoteWireFormat();
+ receiveBrokerConnectURI = "tcp://localhost:20000?wireFormat=" + getRemoteWireFormat();
+ } else {
+ sendBrokerConnectURI = "pipe://SendBroker";
+ receiveBrokerConnectURI = "pipe://ReceiveBroker";
+ if (forceMarshalling) {
+ sendBrokerBindURI = sendBrokerConnectURI + "?wireFormat=" + getBrokerWireFormat();
+ receiveBrokerBindURI = receiveBrokerConnectURI + "?wireFormat=" + getBrokerWireFormat();
+ } else {
+ sendBrokerBindURI = sendBrokerConnectURI;
+ receiveBrokerBindURI = receiveBrokerConnectURI;
+ }
+ }
+ }
+
+ def setName(name: String) = { // first call wins: the name is only stored while this.name is still null
+ if (this.name == null) {
+ this.name = name;
+ }
+ }
+
+ def getName() = name // test description stored by setName(); null until then
+
+ def getBrokerWireFormat() = "multi" // wireFormat value that setUp() appends to the broker bind URIs
+
+ def getRemoteWireFormat(): String // abstract: wireFormat value that setUp() appends to the client connect URIs in the tcp case
+
+ @Test
+ def benchmark_1_1_0(): Unit = { // one producer, one destination, no consumers; returns early without running when ptp is set
+ setName("1 producer -> 1 destination -> 0 consumers");
+ if (ptp) {
+ return;
+ }
+ producerCount = 1;
+ destCount = 1;
+
+ createConnections();
+
+ // Start 'em up.
+ startClients();
+ try {
+ reportRates();
+ } finally {
+ stopServices(); // runs even if sampling fails; not reached if createConnections() or startClients() throws
+ }
+ }
+
+ @Test
+ def benchmark_1_1_1() = { // one producer, one destination, one consumer
+ setName("1 producer -> 1 destination -> 1 consumers");
+ producerCount = 1;
+ destCount = 1;
+ consumerCount = 1;
+
+ createConnections();
+
+ // Start 'em up.
+ startClients();
+ try {
+ reportRates();
+ } finally {
+ stopServices(); // runs even if sampling fails; not reached if createConnections() or startClients() throws
+ }
+ }
+
+ @Test
+ def benchmark_10_1_10() = { // FANIN_COUNT producers and FANOUT_COUNT consumers on one destination (both default to 10)
+ setName(format("%d producers -> 1 destination -> %d consumers", FANIN_COUNT, FANOUT_COUNT));
+ producerCount = FANIN_COUNT;
+ consumerCount = FANOUT_COUNT;
+ destCount = 1;
+
+ createConnections();
+
+ // Start 'em up.
+ startClients();
+ try {
+ reportRates();
+ } finally {
+ stopServices(); // runs even if sampling fails; not reached if createConnections() or startClients() throws
+ }
+ }
+
+ @Test
+ def benchmark_10_1_1() = { // fan-in: FANIN_COUNT producers, one destination, one consumer
+ setName(format("%d producers -> 1 destination -> 1 consumer", FANIN_COUNT));
+ producerCount = FANIN_COUNT;
+ destCount = 1;
+ consumerCount = 1;
+
+ createConnections();
+
+ // Start 'em up.
+ startClients();
+ try {
+ reportRates();
+ } finally {
+ stopServices(); // runs even if sampling fails; not reached if createConnections() or startClients() throws
+ }
+ }
+
+ @Test
+ def benchmark_1_1_10() = { // fan-out: one producer, one destination, FANOUT_COUNT consumers
+ setName(format("1 producer -> 1 destination -> %d consumers", FANOUT_COUNT));
+ producerCount = 1;
+ destCount = 1;
+ consumerCount = FANOUT_COUNT;
+
+ createConnections();
+
+ // Start 'em up.
+ startClients();
+ try {
+ reportRates();
+ } finally {
+ stopServices(); // runs even if sampling fails; not reached if createConnections() or startClients() throws
+ }
+ }
+
+ @Test
+ def benchmark_2_2_2() = { // two producers, two destinations, two consumers; createConnections() pairs them by index modulo destCount
+ setName(format("2 producer -> 2 destination -> 2 consumers"));
+ producerCount = 2;
+ destCount = 2;
+ consumerCount = 2;
+
+ createConnections();
+
+ // Start 'em up.
+ startClients();
+ try {
+ reportRates();
+ } finally {
+ stopServices(); // runs even if sampling fails; not reached if createConnections() or startClients() throws
+ }
+ }
+
+ @Test
+ def benchmark_10_10_10() = { // ten producers, ten destinations, ten consumers (hard-coded; FANIN_COUNT/FANOUT_COUNT are not used here)
+ setName(format("10 producers -> 10 destinations -> 10 consumers"));
+ producerCount = 10;
+ destCount = 10;
+ consumerCount = 10;
+
+ createConnections();
+
+ // Start 'em up.
+ startClients();
+ try {
+ reportRates();
+ } finally {
+ stopServices(); // runs even if sampling fails; not reached if createConnections() or startClients() throws
+ }
+ }
+
+ /**
+ * Tests 2 producers sending to 2 destinations with 2 consumers, each consumer
+ * being assigned its own destination. The first consumer is made slow
+ * (thinkTime = 50); the other producer should still be able to send quickly.
+ *
+ * @throws Exception
+ */
+ @Test
+ def benchmark_2_2_2_SlowConsumer() = {
+ setName(format("2 producer -> 2 destination -> 2 slow consumers"));
+ producerCount = 2;
+ destCount = 2;
+ consumerCount = 2;
+
+ createConnections();
+ consumers.get(0).thinkTime = 50; // only the first consumer is slowed down, despite the test name
+
+ // Start 'em up.
+ startClients();
+ try {
+ reportRates();
+ } finally {
+ stopServices(); // runs even if sampling fails; not reached if createConnections() or startClients() throws
+ }
+ }
+
+ @Test
+ def benchmark_2_2_2_Selector() = { // like benchmark_2_2_2, but consumer i gets selector "match<i>" and producer i gets the same property
+ setName(format("2 producer -> 2 destination -> 2 selector consumers"));
+ producerCount = 2;
+ destCount = 2;
+ consumerCount = 2;
+
+ createConnections();
+
+ // Add properties to match producers to their consumers
+ for (i <- 0 until consumerCount) {
+ var property = "match" + i;
+ consumers.get(i).selector = property;
+ producers.get(i).property = property; // indexes producers by consumer index; valid here because both counts are 2
+ }
+
+ // Start 'em up.
+ startClients();
+ try {
+ reportRates();
+ } finally {
+ stopServices(); // runs even if sampling fails; not reached if createConnections() or startClients() throws
+ }
+ }
+
+ /**
+ * Test sending with 1 high priority sender. The high priority sender should
+ * have higher throughput than the other, normal priority sender.
+ *
+ * @throws Exception
+ */
+ @Test
+ def benchmark_2_1_1_HighPriorityProducer() = {
+
+ setName(format("1 high and 1 normal priority producer -> 1 destination -> 1 consumer"));
+ producerCount = 2;
+ destCount = 1;
+ consumerCount = 1;
+
+ createConnections();
+ var producer = producers.get(0);
+ producer.priority = 1
+ producer.rate.setName("High Priority Producer Rate");
+ // NOTE(review): RemoteProducer.start() later calls rate.name("Producer ..."), which may overwrite the name set above - confirm.
+ consumers.get(0).thinkTime = 1;
+
+ // Start 'em up.
+ startClients();
+ try {
+ // Same sampling loop as reportRates(), plus the high priority producer's own rate on each round.
+ System.out.println("Checking rates for test: " + getName());
+ for (i <- 0 until PERFORMANCE_SAMPLES) {
+ var p = new Period();
+ Thread.sleep(1000 * 5);
+ System.out.println(producer.rate.getRateSummary(p));
+ System.out.println(totalProducerRate.getRateSummary(p));
+ System.out.println(totalConsumerRate.getRateSummary(p));
+ totalProducerRate.reset();
+ totalConsumerRate.reset();
+ }
+
+ } finally {
+ stopServices(); // runs even if sampling fails; not reached if createConnections() or startClients() throws
+ }
+ }
+
+ /**
+ * Same setup as benchmark_2_1_1_HighPriorityProducer, except that the high
+ * priority sender also gets priorityMod = 3 (a field this file sets but never reads).
+ *
+ * @throws Exception
+ */
+ @Test
+ def benchmark_2_1_1_MixedHighPriorityProducer() = {
+
+ setName(format("1 high/mixed and 1 normal priority producer -> 1 destination -> 1 consumer"));
+ producerCount = 2;
+ destCount = 1;
+ consumerCount = 1;
+
+ createConnections();
+ var producer = producers.get(0);
+ producer.priority = 1;
+ producer.priorityMod = 3;
+ producer.rate.setName("High Priority Producer Rate");
+ // NOTE(review): RemoteProducer.start() later calls rate.name("Producer ..."), which may overwrite the name set above - confirm.
+ consumers.get(0).thinkTime = 1
+
+ // Start 'em up.
+ startClients();
+ try {
+ // Same sampling loop as reportRates(), plus the high priority producer's own rate on each round.
+ System.out.println("Checking rates for test: " + getName());
+ for (i <- 0 until PERFORMANCE_SAMPLES) {
+ var p = new Period();
+ Thread.sleep(1000 * 5);
+ System.out.println(producer.rate.getRateSummary(p));
+ System.out.println(totalProducerRate.getRateSummary(p));
+ System.out.println(totalConsumerRate.getRateSummary(p));
+ totalProducerRate.reset();
+ totalConsumerRate.reset();
+ }
+
+ } finally {
+ stopServices(); // runs even if sampling fails; not reached if createConnections() or startClients() throws
+ }
+ }
+
+ def reportRates() = { // prints the aggregate producer and consumer rate summaries PERFORMANCE_SAMPLES times
+ System.out.println("Checking rates for test: " + getName() + ", " + (if (ptp) {"ptp"} else {"topic"}));
+ for (i <- 0 until PERFORMANCE_SAMPLES) {
+ var p = new Period();
+ Thread.sleep(1000 * 5); // each sample covers a five second window
+ System.out.println(totalProducerRate.getRateSummary(p));
+ System.out.println(totalConsumerRate.getRateSummary(p));
+ totalProducerRate.reset(); // both aggregates are reset after every sample
+ totalConsumerRate.reset();
+ }
+ }
+
+ def createConnections() = {
+ // Builds and starts the broker(s), then creates destCount destinations, producerCount producers and consumerCount consumers (clients are not started here).
+ if (multibroker) {
+ sendBroker = createBroker("SendBroker", sendBrokerBindURI, sendBrokerConnectURI);
+ rcvBroker = createBroker("RcvBroker", receiveBrokerBindURI, receiveBrokerConnectURI);
+ brokers.add(sendBroker);
+ brokers.add(rcvBroker);
+ } else {
+ sendBroker = createBroker("Broker", sendBrokerBindURI, sendBrokerConnectURI);
+ rcvBroker = sendBroker
+ brokers.add(sendBroker);
+ }
+
+ startBrokers();
+
+ var dests = new Array[Destination](destCount);
+ // Destinations are named dest1..destN; queues are created up front only in ptp mode.
+ for (i <- 0 until destCount) {
+ val domain = if (ptp) {Domain.QUEUE_DOMAIN} else {Domain.TOPIC_DOMAIN}
+ val name = new AsciiBuffer("dest" + (i + 1))
+ var bean = new SingleDestination(domain, name)
+ dests(i) = bean;
+ if (ptp) {
+ sendBroker.defaultVirtualHost.createQueue(dests(i));
+ if (multibroker) {
+ rcvBroker.defaultVirtualHost.createQueue(dests(i));
+ }
+ }
+ }
+ // Producers and consumers are spread over the destinations round-robin by index.
+ for (i <- 0 until producerCount) {
+ var destination = dests(i % destCount);
+ var producer = createProducer(i, destination);
+ producer.persistentDelivery = PERSISTENT;
+ producers.add(producer);
+ }
+
+ for (i <- 0 until consumerCount) {
+ var destination = dests(i % destCount);
+ var consumer = createConsumer(i, destination);
+ consumer.durable = DURABLE;
+ consumers.add(consumer);
+ }
+
+ // Create MultiBroker connections:
+ // if (multibroker) {
+ // Pipe<Message> pipe = new Pipe<Message>();
+ // sendBroker.createBrokerConnection(rcvBroker, pipe);
+ // rcvBroker.createBrokerConnection(sendBroker, pipe.connect());
+ // }
+ }
+
+ def createConsumer(i: Int, destination: Destination): RemoteConsumer = {
+ // Configures a consumer from the no-arg factory: error listener, receive broker URI, destination, name "consumer<i+1>" and the shared aggregate.
+ var consumer = createConsumer();
+ consumer.exceptionListener = new ExceptionListener() {
+ def exceptionThrown(error: Exception) = {
+ if (!stopping.get()) { // errors raised after stopServices() has begun are not reported
+ System.err.println("Consumer Async Error:");
+ error.printStackTrace();
+ }
+ }
+ }
+
+ consumer.uri = new URI(rcvBroker.connectUris.head)
+ consumer.destination = destination
+ consumer.name = "consumer" + (i + 1)
+ consumer.totalConsumerRate = totalConsumerRate
+ return consumer;
+ }
+
+ protected def createConsumer(): RemoteConsumer // abstract factory; the two-argument createConsumer configures the returned instance
+
+ private def createProducer(id: Int, destination: Destination): RemoteProducer = { // configures a producer obtained from the no-arg factory
+ var producer = createProducer();
+ producer.exceptionListener = new ExceptionListener() {
+ def exceptionThrown(error: Exception) = {
+ if (!stopping.get()) { // errors raised after stopServices() has begun are not reported
+ System.err.println("Producer Async Error:");
+ error.printStackTrace();
+ }
+ }
+ }
+ producer.uri = new URI(sendBroker.connectUris.head)
+ producer.producerId = id + 1 // ids and names are 1-based
+ producer.name = "producer" + (id + 1)
+ producer.destination = destination
+ producer.messageIdGenerator = msgIdGenerator
+ producer.totalProducerRate = totalProducerRate
+ producer
+ }
+
+ protected def createProducer(): RemoteProducer // abstract factory; the two-argument createProducer configures the returned instance
+
+ private def createBroker(name: String, bindURI: String, connectUri: String): Broker = { // NOTE(review): the name parameter is never used in this body
+ val broker = new Broker()
+ broker.transportServers.add(TransportFactory.bind(new URI(bindURI)))
+ broker.connectUris.add(connectUri) // read back via connectUris.head when clients are created
+ // TODO:
+ // broker.defaultVirtualHost.setStore(createStore(broker))
+ broker
+ }
+
+ protected def createStore(broker: Broker): Store = { // builds a store under target/test-data/broker-test/<broker.name>; its only call site in this file is commented out
+ val store = if (USE_KAHA_DB) {
+ StoreFactory.createStore("kaha-db");
+ } else {
+ StoreFactory.createStore("memory");
+ }
+ store.setStoreDirectory(new File("target/test-data/broker-test/" + broker.name));
+ store.setDeleteAllMessages(PURGE_STORE);
+ store
+ }
+
+ private def stopServices() = { // stops brokers, then producers, then consumers
+ stopping.set(true); // set first so the async exception listeners stay silent during shutdown
+ for (broker <- brokers) {
+ broker.stop();
+ }
+ for (connection <- producers) {
+ connection.stop();
+ }
+ for (connection <- consumers) {
+ connection.stop();
+ }
+ }
+
+ private def startBrokers() = { // starts every broker registered in the brokers list, in insertion order
+ for (broker <- brokers) {
+ broker.start();
+ }
+ }
+
+ private def startClients() = {
+ // Consumers are started before producers.
+ for (connection <- consumers) {
+ connection.start();
+ }
+
+ for (connection <- producers) {
+ connection.start();
+ }
+ }
+
+}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompConnection.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompConnection.scala?rev=961070&r1=961069&r2=961070&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompConnection.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompConnection.scala Wed Jul 7 03:42:13 2010
@@ -17,7 +17,6 @@
package org.apache.activemq.ng
import _root_.java.util.{LinkedList}
-import _root_.org.apache.activemq.ng.Stomp
import java.nio.channels.SelectionKey._
import _root_.org.apache.activemq.util.buffer._
import _root_.org.fusesource.hawtdispatch._
Modified: activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompWireFormat.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompWireFormat.scala?rev=961070&r1=961069&r2=961070&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompWireFormat.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompWireFormat.scala Wed Jul 7 03:42:13 2010
@@ -17,7 +17,6 @@
package org.apache.activemq.ng
import _root_.java.util.{LinkedList, ArrayList}
-import _root_.org.apache.activemq.ng.Stomp
import java.nio.channels.{SocketChannel}
import java.nio.ByteBuffer
import java.io.{EOFException, IOException}
Copied: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/Stomp.java (from r961069, activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/Stomp.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/Stomp.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/Stomp.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/Stomp.java&r1=961069&r2=961070&rev=961070&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/Stomp.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/Stomp.java Wed Jul 7 03:42:13 2010
@@ -19,15 +19,16 @@ package org.apache.activemq.apollo.stomp
import org.apache.activemq.util.buffer.AsciiBuffer;
import org.apache.activemq.util.buffer.Buffer;
+
public interface Stomp {
-
+
Buffer EMPTY_BUFFER = new Buffer(0);
byte NULL = 0;
Buffer NULL_BUFFER = new Buffer(new byte[]{NULL});
byte NEWLINE = '\n';
Buffer NEWLINE_BUFFER = new Buffer(new byte[]{NEWLINE});
Buffer END_OF_FRAME_BUFFER = new Buffer(new byte[]{NULL, NEWLINE});
-
+
AsciiBuffer TRUE = new AsciiBuffer("true");
AsciiBuffer FALSE = new AsciiBuffer("false");
@@ -35,8 +36,8 @@ public interface Stomp {
AsciiBuffer CONNECT = new AsciiBuffer("CONNECT");
AsciiBuffer SEND = new AsciiBuffer("SEND");
AsciiBuffer DISCONNECT = new AsciiBuffer("DISCONNECT");
- AsciiBuffer SUBSCRIBE = new AsciiBuffer("SUB");
- AsciiBuffer UNSUBSCRIBE = new AsciiBuffer("UNSUB");
+ AsciiBuffer SUBSCRIBE = new AsciiBuffer("SUBSCRIBE");
+ AsciiBuffer UNSUBSCRIBE = new AsciiBuffer("UNSUBSCRIBE");
AsciiBuffer BEGIN_TRANSACTION = new AsciiBuffer("BEGIN");
AsciiBuffer COMMIT_TRANSACTION = new AsciiBuffer("COMMIT");
@@ -57,7 +58,7 @@ public interface Stomp {
public interface Headers {
byte SEPERATOR = ':';
Buffer SEPERATOR_BUFFER = new Buffer(new byte[]{SEPERATOR});
-
+
AsciiBuffer RECEIPT_REQUESTED = new AsciiBuffer("receipt");
AsciiBuffer TRANSACTION = new AsciiBuffer("transaction");
AsciiBuffer CONTENT_LENGTH = new AsciiBuffer("content-length");
@@ -129,16 +130,16 @@ public interface Stomp {
AsciiBuffer MESSAGE_ID = new AsciiBuffer("message-id");
}
}
-
+
public enum Transformations {
JMS_BYTE, JMS_OBJECT_XML, JMS_OBJECT_JSON, JMS_MAP_XML, JMS_MAP_JSON;
-
+
/**
 * Returns the wire name of this transformation: the enum constant name with
 * every '_' replaced by '-' and all letters lower-cased
 * (e.g. JMS_OBJECT_XML becomes "jms-object-xml").
 */
@Override
public String toString() {
    // Locale.ROOT keeps the case mapping independent of the JVM default locale
    // (under a Turkish default locale 'I' would otherwise lower-case to a dotless i).
    return name().replace('_', '-').toLowerCase(java.util.Locale.ROOT);
}
-
+
/**
 * Looks up a transformation by its wire name, reversing the mapping done by
 * toString(): '-' becomes '_' and the text is upper-cased before the enum lookup.
 *
 * @param value wire name such as "jms-map-json"
 * @return the matching constant
 * @throws IllegalArgumentException if no constant has that name
 * @throws NullPointerException if value is null
 */
public static Transformations getValue(String value) {
    // Locale.ROOT keeps the case mapping independent of the JVM default locale
    // (under a Turkish default locale 'i' would otherwise upper-case to a dotted I).
    return valueOf(value.replace('-', '_').toUpperCase(java.util.Locale.ROOT));
}
- }
-}
+ }
+}
\ No newline at end of file
Added: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=961070&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Wed Jul 7 03:42:13 2010
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.stomp
+
+import _root_.java.util.LinkedList
+import _root_.org.apache.activemq.apollo.broker.{BufferConversions, Destination, Message}
+import _root_.org.apache.activemq.filter.{Expression, MessageEvaluationContext}
+import _root_.org.apache.activemq.util.buffer._
+
+/**
+ * Shared definitions used by StompFrame and the STOMP protocol code.
+ *
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */
+object StompFrameConstants {
+  /** Ordered list of (header name, header value) pairs. */
+  type HeaderMap = List[(AsciiBuffer, AsciiBuffer)]
+
+  /**
+   * Zero-length buffer used as the default frame content. Declared as a val
+   * so the shared default cannot be reassigned.
+   */
+  val NO_DATA = new Buffer(0);
+}
+
+import StompFrameConstants._
+import StompConstants._;
+import BufferConversions._
+
+/**
+ * Represents all the data in a STOMP frame: the action, the ordered header
+ * list and the content buffer.
+ *
+ * Constructing a frame also scans `headers` and copies the values of the
+ * Stomp.Headers.Message MESSAGE_ID, PRORITY, DESTINATION and EXPIRATION_TIME
+ * headers into the mutable fields below; all other headers are ignored by
+ * that scan.
+ *
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */
+case class StompFrame(action:AsciiBuffer, headers:HeaderMap=Nil, content:Buffer=NO_DATA) extends Message {
+
+  /**
+   * Size of the header section. Returns 0 when there are no headers.
+   * When the first key and the last value share one backing array the size
+   * is derived from their offsets; otherwise the key and value lengths are
+   * summed, adding 2 per header (presumably the separator and the newline).
+   */
+  def headerSize = {
+    if( headers.isEmpty ) {
+      0
+    } else {
+      // if all the headers were part of the same input buffer.. size can be calculated by
+      // subtracting positions in the buffer.
+      val firstBuffer = headers.head._1
+      val lastBuffer = headers.last._2
+      if( firstBuffer.data eq lastBuffer.data ) {
+        (lastBuffer.offset-firstBuffer.offset)+lastBuffer.length+1
+      } else {
+        // got to do it the hard way
+        var rc = 0;
+        val i = headers.iterator
+        while( i.hasNext ) {
+          val (key, value) = i.next
+          rc += (key.length + value.length+2)
+        }
+        rc
+      }
+    }
+  }
+
+  /**
+   * Size of the whole frame. When action and content share one backing array
+   * the size is the span from the start of the action to the end of the
+   * content; otherwise it is the action length, the header size and the
+   * content length plus one extra byte after the action and after the headers.
+   */
+  def size = {
+    if( action.data eq content.data ) {
+      (content.offset-action.offset)+content.length
+    } else {
+      action.length + 1 +
+      headerSize + 1 + content.length
+    }
+  }
+
+  /**
+   * the globally unique id of the message
+   */
+  var id: AsciiBuffer = null
+
+  /**
+   * the globally unique id of the producer
+   */
+  var producer: AsciiBuffer = null
+
+  /**
+   * the message priority.
+   */
+  var priority:Byte = 4;
+
+  /**
+   * a positive value indicates that the delivery has an expiration
+   * time.
+   */
+  var expiration: Long = -1;
+
+  /**
+   * true if the delivery is persistent
+   */
+  var persistent = false
+
+  /**
+   * where the message was sent to.
+   */
+  var destination: Destination = null
+
+  /**
+   * used to apply a selector against the message.
+   * Every operation currently throws UnsupportedOperationException.
+   */
+  lazy val messageEvaluationContext = new MessageEvaluationContext() {
+
+    def getBodyAs[T](clazz:Class[T]) = {
+      throw new UnsupportedOperationException
+    }
+
+    def getPropertyExpression(name:String):Expression = {
+      throw new UnsupportedOperationException
+    }
+
+    @deprecated("this should go away.")
+    def getLocalConnectionId() = {
+      throw new UnsupportedOperationException
+    }
+
+    def getDestination[T]():T = {
+      throw new UnsupportedOperationException
+    }
+
+    def setDestination(destination:Any):Unit = {
+      throw new UnsupportedOperationException
+    }
+  }
+
+  // Runs as part of construction: copy the delivery attributes out of the headers.
+  for( header <- headers ) {
+    header match {
+      case (Stomp.Headers.Message.MESSAGE_ID, value) =>
+        id = value
+      case (Stomp.Headers.Message.PRORITY, value) =>
+        priority = java.lang.Integer.parseInt(value).toByte
+      case (Stomp.Headers.Message.DESTINATION, value) =>
+        destination = value
+      case (Stomp.Headers.Message.EXPIRATION_TIME, value) =>
+        expiration = java.lang.Long.parseLong(value)
+      case _ =>
+        // Any other header (receipt, content-length, application headers, ...) sets no
+        // field here. Without this case the match throws scala.MatchError for such a frame.
+    }
+  }
+}
Copied: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (from r961069, activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompWireFormat.scala)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?p2=activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala&p1=activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompWireFormat.scala&r1=961069&r2=961070&rev=961070&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-scala/src/main/scala/org/apache/activemq/ng/StompWireFormat.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul 7 03:42:13 2010
@@ -14,20 +14,244 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.ng
+package org.apache.activemq.apollo.stomp
import _root_.java.util.{LinkedList, ArrayList}
-import _root_.org.apache.activemq.ng.Stomp
+import _root_.org.apache.activemq.apollo.broker._
+
+import _root_.org.apache.activemq.wireformat.WireFormat
+import _root_.org.fusesource.hawtdispatch.{DispatchQueue, BaseRetained}
import java.nio.channels.{SocketChannel}
import java.nio.ByteBuffer
import java.io.{EOFException, IOException}
import _root_.org.apache.activemq.util.buffer._
import collection.mutable.{ListBuffer, HashMap}
+import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
import AsciiBuffer._
import Stomp._
import Stomp.Headers._
+import BufferConversions._
+import _root_.scala.collection.JavaConversions._
+import StompFrameConstants._;
+
+
+/**
+ * Signals input that the STOMP layer cannot accept; StompConstants.toDestination
+ * throws it for a destination name with an unrecognized prefix.
+ *
+ * @param msg description of the problem
+ */
+class StompProtocolException(msg:String) extends Exception(msg)
+
+/**
+ * STOMP destination naming helpers. A destination name selects its broker
+ * domain by prefix: "/queue/" maps to Domain.QUEUE_DOMAIN and "/topic/" maps
+ * to Domain.TOPIC_DOMAIN.
+ */
+object StompConstants {
+  /** Prefix of a STOMP destination name that addresses a queue. */
+  val QUEUE_PREFIX = new AsciiBuffer("/queue/")
+  /** Prefix of a STOMP destination name that addresses a topic. */
+  val TOPIC_PREFIX = new AsciiBuffer("/topic/")
+
+  /**
+   * Converts a STOMP destination name into a broker Destination.
+   *
+   * @param value the raw destination header value
+   * @return a SingleDestination in the queue or topic domain, named by the
+   *         slice of `value` taken after the prefix
+   * @throws StompProtocolException if `value` starts with neither prefix
+   */
+  implicit def toDestination(value:AsciiBuffer):Destination = {
+    // NOTE(review): the slice calls assume a negative `high` yields the remainder
+    // after `low` -- confirm against Buffer.slice.
+    if( value.startsWith(QUEUE_PREFIX) ) {
+      new SingleDestination(Domain.QUEUE_DOMAIN, value.slice(QUEUE_PREFIX.length, -QUEUE_PREFIX.length))
+    } else if( value.startsWith(TOPIC_PREFIX) ) {
+      new SingleDestination(Domain.TOPIC_DOMAIN, value.slice(TOPIC_PREFIX.length, -TOPIC_PREFIX.length))
+    } else {
+      throw new StompProtocolException("Invalid stomp destination name: "+value);
+    }
+  }
+
+}
+
+import StompConstants._
+
+/**
+ * Protocol handler for one STOMP connection. Incoming StompFrames are
+ * dispatched by command in onCommand: SEND frames are delivered through a
+ * producer route obtained from the virtual host's router, SUBSCRIBE binds a
+ * single SimpleConsumer to the router, and DISCONNECT stops the connection.
+ */
+class StompProtocolHandler extends ProtocolHandler {
+
+
+ /**
+ * The consumer bound to the router for this connection's subscription.
+ * It retains the connection's dispatch queue on construction and releases
+ * it from its disposer.
+ */
+ class SimpleConsumer(val dest:AsciiBuffer) extends BaseRetained with DeliveryConsumer {
+
+ val queue = StompProtocolHandler.this.dispatchQueue
+ queue.retain
+ setDisposer(^{ queue.release })
+
+ val deliveryQueue = new DeliveryCreditBufferProtocol(outboundChannel, queue)
+
+ // Accepts every delivery; no selector filtering is applied here.
+ def matches(message:Delivery) = true
+
+ // Each session forwards deliveries into deliveryQueue via its own session object.
+ def open_session(producer_queue:DispatchQueue) = new DeliverySession {
+ val session = deliveryQueue.session(producer_queue)
+
+ val consumer = SimpleConsumer.this
+ // NOTE(review): presumably retains the enclosing consumer for the life of the session,
+ // balanced by the release in close -- confirm which object retain/release resolve to.
+ retain
+
+ def deliver(delivery:Delivery) = session.send(delivery)
+
+ def close = {
+ session.close
+ release
+ }
+ }
+ }
+
+ def dispatchQueue = connection.dispatchQueue
+ val outboundChannel = new DeliveryBuffer
+ // Set by stop; guards against running the shutdown steps twice.
+ var closed = false
+ // The single subscription of this connection, or null when not subscribed.
+ var consumer:SimpleConsumer = null
+
+ var connection:BrokerConnection = null
+ var wireformat:WireFormat = null
+ // Route used by the most recent SEND, or null when none is connected.
+ var producerRoute:DeliveryProducerRoute=null
+ // Assigned from the getDefaultVirtualHost callback in setConnection; null until then.
+ var host:VirtualHost = null
+
+ private def queue = connection.dispatchQueue
+
+ /**
+ * Stores the connection and asks its broker for the default virtual host;
+ * the callback (wrapped with the connection's dispatch queue) stores it in `host`.
+ */
+ def setConnection(connection:BrokerConnection) = {
+ this.connection = connection
+
+ // We will be using the default virtual host
+ connection.broker.getDefaultVirtualHost(
+ queue.wrap { (host)=>
+ this.host=host
+ }
+ )
+ }
+
+ def setWireFormat(wireformat:WireFormat) = { this.wireformat = wireformat}
+
+ /**
+ * Dispatches an incoming command, which must be a StompFrame, by its action.
+ * ACK is accepted but not processed yet; any action not listed below
+ * shuts the connection down through die.
+ */
+ def onCommand(command:Any) = {
+ val frame = command.asInstanceOf[StompFrame]
+ frame match {
+ case StompFrame(Commands.CONNECT, headers, _) =>
+ on_stomp_connect(headers)
+ case StompFrame(Commands.SEND, headers, content) =>
+ on_stomp_send(frame)
+ case StompFrame(Commands.SUBSCRIBE, headers, content) =>
+ on_stomp_subscribe(headers)
+ case StompFrame(Commands.ACK, headers, content) =>
+ // TODO:
+ case StompFrame(Commands.DISCONNECT, headers, content) =>
+ stop
+ case StompFrame(unknown, _, _) =>
+ die("Unsupported STOMP command: "+unknown);
+ }
+ }
+
+
+ // Replies to CONNECT with a CONNECTED frame; the headers are not inspected.
+ def on_stomp_connect(headers:HeaderMap) = {
+ println("connected on: "+Thread.currentThread.getName);
+ connection.transport.oneway(StompFrame(Responses.CONNECTED))
+ }
+
+ /**
+ * Returns the value of the first header whose key equals `name`,
+ * or None when no such header exists.
+ */
+ def get(headers:HeaderMap, name:AsciiBuffer):Option[AsciiBuffer] = {
+ val i = headers.iterator
+ while( i.hasNext ) {
+ val entry = i.next
+ if( entry._1 == name ) {
+ return Some(entry._2)
+ }
+ }
+ None
+ }
+
+ /**
+ * Handles a SEND frame: reuses the current producer route or connects a new
+ * one for the frame's destination, then sends the frame over it. Reads are
+ * suspended while a new route is being connected. A frame without a
+ * destination header shuts the connection down through die.
+ */
+ def on_stomp_send(frame:StompFrame) = {
+ get(frame.headers, Headers.Send.DESTINATION) match {
+ case Some(dest)=>
+ // create the producer route...
+ // NOTE(review): dest is an AsciiBuffer here; confirm that producerRoute.destination
+ // can compare equal to it, otherwise a new route is connected on every SEND.
+ if( producerRoute==null || producerRoute.destination!= dest ) {
+
+ // clean up the previous producer..
+ if( producerRoute!=null ) {
+ host.router.disconnect(producerRoute)
+ producerRoute=null
+ }
+
+ val producer = new DeliveryProducer() {
+ override def collocate(value:DispatchQueue):Unit = ^{
+// TODO:
+// if( value.getTargetQueue ne queue.getTargetQueue ) {
+// println("sender on "+queue.getLabel+" co-locating with: "+value.getLabel);
+// queue.setTargetQueue(value.getTargetQueue)
+// write_source.setTargetQueue(queue);
+// read_source.setTargetQueue(queue)
+// }
+
+ } ->: queue
+ }
+
+ // don't process frames until we are connected..
+ connection.transport.suspendRead
+ host.router.connect(dest, queue, producer) {
+ (route) =>
+ connection.transport.resumeRead
+ producerRoute = route
+ send_via_route(producerRoute, frame)
+ }
+ } else {
+ // we can re-use the existing producer route
+ send_via_route(producerRoute, frame)
+ }
+ case None=>
+ die("destination not set.")
+ }
+ }
+
+ /**
+ * Delivers the frame to every target of the route. Reads are suspended
+ * until the delivery's disposer runs. When the route has no targets the
+ * frame is not delivered anywhere.
+ */
+ def send_via_route(route:DeliveryProducerRoute, frame:StompFrame) = {
+ if( !route.targets.isEmpty ) {
+ val delivery = Delivery(frame, frame.size)
+ connection.transport.suspendRead
+ delivery.setDisposer(^{
+ connection.transport.resumeRead
+ })
+ route.targets.foreach(consumer=>{
+ consumer.deliver(delivery)
+ })
+ // Drops this method's own reference after handing the delivery to the targets.
+ delivery.release;
+ }
+ }
+
+ /**
+ * Handles a SUBSCRIBE frame by binding a new SimpleConsumer for the
+ * requested destination. A second subscription, or a frame without a
+ * destination header, shuts the connection down through die.
+ */
+ def on_stomp_subscribe(headers:HeaderMap) = {
+ println("Consumer on "+Thread.currentThread.getName)
+ get(headers, Headers.Subscribe.DESTINATION) match {
+ case Some(dest)=>
+ if( consumer !=null ) {
+ die("Only one subscription supported.")
+
+ } else {
+ consumer = new SimpleConsumer(dest);
+ host.router.bind(dest, consumer :: Nil)
+ consumer.release
+ }
+ case None=>
+ die("destination not set.")
+ }
+
+ }
+
+ /**
+ * Reports a fatal protocol problem: suspends reads, sends an ERROR frame
+ * whose content is `msg`, then hands a task that calls stop to the
+ * connection's dispatch queue.
+ */
+ private def die(msg:String) = {
+ println("Shutting connection down due to: "+msg)
+ connection.transport.suspendRead
+ connection.transport.oneway(StompFrame(Responses.ERROR, Nil, ascii(msg)))
+ ^ {
+ stop
+ } ->: queue
+ }
+
+ def onException(error:Exception) = {
+ println("Shutting connection down due to: "+error)
+ stop
+ }
+
+ def start = {
+ }
+
+ /**
+ * Shuts the handler down once: disconnects the producer route, unbinds the
+ * consumer and stops the connection. Later calls do nothing.
+ */
+ def stop = {
+ if( !closed ) {
+ closed=true;
+ if( producerRoute!=null ) {
+ host.router.disconnect(producerRoute)
+ producerRoute=null
+ }
+ if( consumer!=null ) {
+ host.router.unbind(consumer.dest, consumer::Nil)
+ consumer=null
+ }
+ connection.stop
+ }
+ }
+}
+
object StompWireFormat {
val READ_BUFFFER_SIZE = 1024*64;
val MAX_COMMAND_LENGTH = 1024;
@@ -46,78 +270,6 @@ class StompWireFormat {
ByteBuffer.wrap(Array(x));
}
-// var outbound_pos=0
-// var outbound_limit=0
-// var outbound_buffers: ListBuffer[ByteBuffer] = new ListBuffer[ByteBuffer]()
-//
-// /**
-// * @retruns true if the source has been drained of StompFrame objects and they are fully written to the socket
-// */
-// def drain_source(socket:SocketChannel)(source: =>StompFrame ):Boolean = {
-// while(true) {
-// // if we have a pending frame that is being sent over the socket...
-// if( !outbound_buffers.isEmpty ) {
-//
-// val data = outbound_buffers.toArray
-//
-// socket.write(data)
-//
-// // remove all the written buffers...
-// while( !outbound_buffers.isEmpty && outbound_buffers.head.remaining==0 ) {
-// outbound_buffers.remove(0)
-// }
-//
-// if( !outbound_buffers.isEmpty ) {
-// // non blocking socket returned before the buffers were fully written to disk..
-// // we are not yet fully drained.. but need to quit now.
-// return false
-// }
-//
-// } else {
-//
-// var frame = source
-// while( frame!=null ) {
-// marshall(outbound_buffers, frame)
-// frame = source
-// }
-//
-// if( outbound_buffers.size == 0 ) {
-// // the source is now drained...
-// return true
-// }
-// }
-// }
-// true
-// }
-//
-// implicit def toByteBuffer(data:AsciiBuffer) = ByteBuffer.wrap(data.data, data.offset, data.length)
-// implicit def toByteBuffer(data:Buffer) = ByteBuffer.wrap(data.data, data.offset, data.length)
-//
-// def marshall(buffer:ListBuffer[ByteBuffer], frame:StompFrame) = {
-// buffer.append(frame.action)
-// buffer.append(NEWLINE)
-//
-// // we can optimize a little if the headers and content are in the same buffer..
-// if( !frame.headers.isEmpty && !frame.content.isEmpty &&
-// ( frame.headers.getFirst._1.data eq frame.content.data ) ) {
-// buffer.append( ByteBuffer.wrap(frame.content.data, frame.headers.getFirst._1.offset, (frame.content.offset-frame.headers.getFirst._1.offset)+ frame.content.length) )
-//
-// } else {
-// val i = frame.headers.iterator
-// while( i.hasNext ) {
-// val (key, value) = i.next
-// buffer.append(key)
-// buffer.append(SEPERATOR)
-// buffer.append(value)
-// buffer.append(NEWLINE)
-// }
-//
-// buffer.append(NEWLINE)
-// buffer.append(toByteBuffer(frame.content))
-// }
-// buffer.append(toByteBuffer(END_OF_FRAME_BUFFER))
-// }
-
var outbound_frame: ByteBuffer = null
/**
* @return true if the source has been drained of StompFrame objects and they are fully written to the socket
@@ -163,18 +315,16 @@ class StompWireFormat {
// we can optimize a little if the headers and content are in the same buffer..
if( !frame.headers.isEmpty && !frame.content.isEmpty &&
- ( frame.headers.getFirst._1.data eq frame.content.data ) ) {
+ ( frame.headers.head._1.data eq frame.content.data ) ) {
- val offset = frame.headers.getFirst._1.offset;
- val buffer1 = frame.headers.getFirst._1;
+ val offset = frame.headers.head._1.offset;
+ val buffer1 = frame.headers.head._1;
val buffer2 = frame.content;
val length = (buffer2.offset-buffer1.offset)+buffer2.length
buffer.write( buffer1.data, offset, length)
} else {
- val i = frame.headers.iterator
- while( i.hasNext ) {
- val (key, value) = i.next
+ for( (key, value) <- frame.headers ) {
buffer.write(key)
buffer.write(SEPERATOR)
buffer.write(value)
@@ -275,9 +425,7 @@ class StompWireFormat {
null
}
- type HeaderMap = LinkedList[(AsciiBuffer, AsciiBuffer)]
-
- def read_headers(action:Buffer, headers:HeaderMap=new LinkedList()):FrameReader = ()=> {
+ def read_headers(action:Buffer, headers:HeaderMap=Nil):FrameReader = ()=> {
val line = read_line(MAX_HEADER_LENGTH, "The maximum header length was exceeded")
if( line !=null ) {
if( line.trim().length() > 0 ) {
@@ -340,7 +488,7 @@ class StompWireFormat {
}
None
}
-
+
def read_binary_body(action:Buffer, headers:HeaderMap, contentLength:Int):FrameReader = ()=> {
val content:Buffer=read_content(contentLength)
@@ -394,6 +542,4 @@ class StompWireFormat {
}
}
-
-
}
Copied: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/package.html (from r961069, activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/package.html)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/package.html?p2=activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/package.html&p1=activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/java/org/apache/activemq/apollo/stomp/package.html&r1=961069&r2=961070&rev=961070&view=diff
==============================================================================
(empty)
Added: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala?rev=961070&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala Wed Jul 7 03:42:13 2010
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.stomp.perf
+
+import _root_.java.util.concurrent.TimeUnit
+import _root_.org.apache.activemq.apollo.broker._
+import _root_.org.apache.activemq.apollo.broker.perf._
+import _root_.org.apache.activemq.apollo.stomp._
+
+import _root_.org.apache.activemq.transport.CompletionCallback
+import _root_.org.apache.activemq.util.buffer._
+import collection.mutable.{ListBuffer, HashMap}
+
+import AsciiBuffer._
+import Stomp._
+import _root_.org.apache.activemq.apollo.stomp.StompFrame
+import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
+
+/**
+ * BaseBrokerPerfTest variant whose remote clients are the STOMP producer and
+ * consumer defined in this file, connecting with the "stomp" wire format.
+ */
+class StompBrokerPerfTest extends BaseBrokerPerfTest {
+
+  /** Name of the wire format the remote clients use. */
+  override def getRemoteWireFormat() = {
+    "stomp"
+  }
+
+  /** Creates the STOMP consumer client. */
+  override def createConsumer() = {
+    new StompRemoteConsumer()
+  }
+
+  /** Creates the STOMP producer client. */
+  override def createProducer() = {
+    new StompRemoteProducer()
+  }
+
+}
+
+/**
+ * STOMP consumer client: connects, subscribes to the configured
+ * destination and counts each MESSAGE frame it receives.
+ */
+class StompRemoteConsumer extends RemoteConsumer {
+
+  /**
+   * Sends a CONNECT frame followed by a SUBSCRIBE frame for the configured
+   * destination, using auto acknowledgement and the id "stomp-sub-" + name.
+   */
+  def setupSubscription() = {
+    val prefix = if( destination.getDomain() == Domain.QUEUE_DOMAIN ) "/queue/" else "/topic/"
+    val stompDestination = ascii(prefix+destination.getName().toString())
+
+    transport.oneway(StompFrame(Stomp.Commands.CONNECT))
+
+    // Header order: ack mode, id, destination.
+    val subscribeHeaders:List[(AsciiBuffer, AsciiBuffer)] = List(
+      (Stomp.Headers.Subscribe.ACK_MODE, Stomp.Headers.Subscribe.AckModeValues.AUTO),
+      (Stomp.Headers.Subscribe.ID, ascii("stomp-sub-"+name)),
+      (Stomp.Headers.Subscribe.DESTINATION, stompDestination)
+    )
+    transport.oneway(StompFrame(Stomp.Commands.SUBSCRIBE, subscribeHeaders))
+  }
+
+  /**
+   * Handles a frame from the broker: CONNECTED is ignored, MESSAGE is
+   * counted, anything else is reported through onFailure.
+   */
+  def onCommand(command:Object) = {
+    command.asInstanceOf[StompFrame] match {
+      case StompFrame(Responses.CONNECTED, _, _) =>
+      case StompFrame(Responses.MESSAGE, _, _) =>
+        messageReceived()
+      case unexpected =>
+        onFailure(new Exception("Unexpected stomp command: " + unexpected.action))
+    }
+  }
+
+  /**
+   * Counts one received message. With a positive thinkTime, reads are
+   * suspended and the count (plus the resume, unless stopping) happens
+   * after thinkTime milliseconds on the dispatch queue.
+   */
+  protected def messageReceived():Unit = {
+    if (thinkTime > 0) {
+      transport.suspendRead
+      val afterThinking = ^ {
+        consumerRate.increment()
+        if (!stopping) {
+          transport.resumeRead
+        }
+      }
+      dispatchQueue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, afterThinking)
+    } else {
+      consumerRate.increment()
+    }
+  }
+
+}
+
+/**
+ * STOMP producer client: sends CONNECT, then keeps sending SEND frames to
+ * the configured destination, one per completed write.
+ */
+class StompRemoteProducer extends RemoteProducer {
+
+  /** STOMP name of the destination; assigned in setupProducer. */
+  var stompDestination:AsciiBuffer = null
+
+  /**
+   * Write-completion callback. Each completion bumps the rate counter and,
+   * unless stopping, sends the next SEND frame with this same callback.
+   * A failure is printed and stops the producer.
+   */
+  val send_next:CompletionCallback = new CompletionCallback() {
+
+    def onFailure(error:Throwable) = {
+      println("stopping due to: "+error)
+      stop
+    }
+
+    def onCompletion() = {
+      rate.increment()
+      if( !stopping ) {
+
+        // The optional property header goes in front of the destination header.
+        val destinationHeader:(AsciiBuffer, AsciiBuffer) = (Stomp.Headers.Send.DESTINATION, stompDestination)
+        val sendHeaders:List[(AsciiBuffer, AsciiBuffer)] =
+          if (property == null) {
+            destinationHeader :: Nil
+          } else {
+            (ascii(property), ascii(property)) :: destinationHeader :: Nil
+          }
+//      var p = this.priority;
+//      if (priorityMod > 0) {
+//          p = if ((counter % priorityMod) == 0) { 0 } else { priority }
+//      }
+
+        val payload = ascii(createPayload())
+        transport.oneway(StompFrame(Stomp.Commands.SEND, sendHeaders, payload), send_next)
+      }
+    }
+  }
+
+  /**
+   * Computes the STOMP destination name and sends CONNECT, with send_next
+   * as its completion callback.
+   */
+  override def setupProducer() = {
+    val prefix = if( destination.getDomain() == Domain.QUEUE_DOMAIN ) "/queue/" else "/topic/"
+    stompDestination = ascii(prefix+destination.getName().toString())
+    transport.oneway(StompFrame(Stomp.Commands.CONNECT), send_next)
+  }
+
+  /**
+   * Handles a frame from the broker: CONNECTED is ignored, anything else is
+   * reported through onFailure.
+   */
+  def onCommand(command:Object) = {
+    command.asInstanceOf[StompFrame] match {
+      case StompFrame(Responses.CONNECTED, _, _) =>
+      case unexpected =>
+        onFailure(new Exception("Unexpected stomp command: " + unexpected.action))
+    }
+  }
+
+}
+
Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=961070&r1=961069&r2=961070&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Wed Jul 7 03:42:13 2010
@@ -327,11 +327,11 @@ public class TcpTransport implements Tra
return null;
}
- public void suspend() {
+ public void suspendRead() {
readSource.suspend();
}
- public void resume() {
+ public void resumeRead() {
readSource.resume();
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java?rev=961070&r1=961069&r2=961070&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java Wed Jul 7 03:42:13 2010
@@ -74,12 +74,12 @@ public interface Transport extends Servi
/**
* suspend delivery of commands.
*/
- void suspend();
+ void suspendRead();
/**
* resume delivery of commands.
*/
- void resume();
+ void resumeRead();
/**
* @param target
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java?rev=961070&r1=961069&r2=961070&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java Wed Jul 7 03:42:13 2010
@@ -66,12 +66,12 @@ public class TransportFilter implements
next.setDispatchQueue(queue);
}
- public void suspend() {
- next.suspend();
+ public void suspendRead() {
+ next.suspendRead();
}
- public void resume() {
- next.resume();
+ public void resumeRead() {
+ next.resumeRead();
}
/**
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java?rev=961070&r1=961069&r2=961070&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java Wed Jul 7 03:42:13 2010
@@ -193,11 +193,11 @@ public class PipeTransport implements Tr
return null;
}
- public void suspend() {
+ public void suspendRead() {
dispatchSource.suspend();
}
- public void resume() {
+ public void resumeRead() {
dispatchSource.resume();
}
public void reconnect(URI uri, CompletionCallback callback) {