You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:40:20 UTC
svn commit: r961068 [4/4] - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/ activemq-broker/src/main/java/org/apache/activemq/apollo/
activemq-broker/src/main/java/org/apache/activemq/apollo/broker/
activemq-broker/src/main/java/org/apache...
Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/SimplePathFilter.java (from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/SimplePathFilter.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/SimplePathFilter.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/SimplePathFilter.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/SimplePathFilter.java&r1=961067&r2=961068&rev=961068&view=diff
==============================================================================
(empty)
Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/WildcardPathFilter.java (from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/WildcardPathFilter.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/WildcardPathFilter.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/WildcardPathFilter.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/WildcardPathFilter.java&r1=961067&r2=961068&rev=961068&view=diff
==============================================================================
(empty)
Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala?rev=961068&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala Wed Jul 7 03:40:18 2010
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.transport.vm
+
+import _root_.java.io.IOException
+import _root_.java.net.URI
+import _root_.java.util.concurrent.atomic.AtomicBoolean
+import _root_.java.util.concurrent.atomic.AtomicInteger
+
+import _root_.org.apache.activemq.apollo.broker._
+import _root_.org.apache.activemq.transport.Transport
+import _root_.org.apache.activemq.transport.TransportFactory
+import _root_.org.apache.activemq.transport.TransportServer
+import _root_.org.apache.activemq.transport.pipe.PipeTransport
+import _root_.org.apache.activemq.transport.pipe.PipeTransportFactory
+import _root_.org.apache.activemq.transport.pipe.PipeTransportServer
+import _root_.org.apache.activemq.util.IOExceptionSupport
+import _root_.org.apache.activemq.util.URISupport
+import _root_.org.apache.activemq.transport.TransportFactorySupport.configure
+import _root_.org.apache.activemq.transport.TransportFactorySupport.verify
+
+import _root_.scala.collection.JavaConversions._
+
+object VMTransportFactory extends Log {
+ val DEFAULT_PIPE_NAME = BrokerConstants.DEFAULT_VIRTUAL_HOST_NAME.toString();
+}
+
+/**
+ * Implements the vm transport which behaves like the pipe transport except that
+ * it can start embedded brokers up on demand.
+ *
+ * @author chirino
+ *
+ */
+class VMTransportFactory extends PipeTransportFactory with Logging {
+
+ import PipeTransportFactory._
+ import VMTransportFactory._
+ override protected def log = VMTransportFactory
+
+ /**
+ * This extension of the PipeTransportServer shuts down the broker
+ * when all the connections are disconnected.
+ *
+ * @author chirino
+ */
+ class VmTransportServer extends PipeTransportServer {
+
+ val refs = new AtomicInteger()
+ var broker:Broker = null
+
+ override def createClientTransport():PipeTransport = {
+ refs.incrementAndGet();
+ new PipeTransport(this) {
+
+ val stopped = new AtomicBoolean()
+
+ override def stop() = {
+ if( stopped.compareAndSet(false, true) ) {
+ super.stop();
+ if( refs.decrementAndGet() == 0 ) {
+ stopBroker();
+ }
+ }
+ }
+ };
+ }
+
+ def setBroker(broker:Broker) = {
+ this.broker = broker;
+ }
+
+ def stopBroker() = {
+ try {
+ this.broker.stop();
+ unbind(this);
+ } catch {
+ case e:Exception=>
+ error("Failed to stop the broker gracefully: "+e);
+ debug("Failed to stop the broker gracefully: ", e);
+ }
+ }
+ }
+
+ override def bind(uri:URI):TransportServer = {
+ new VmTransportServer();
+ }
+
+ override def connect(location:URI):Transport = {
+ try {
+
+ var brokerURI:String = null;
+ var create = true;
+ var name = location.getHost();
+ if (name == null) {
+ name = DEFAULT_PIPE_NAME;
+ }
+
+ var options = URISupport.parseParamters(location);
+ var config = options.remove("broker").asInstanceOf[String]
+ if (config != null) {
+ brokerURI = config;
+ }
+ if ("false".equals(options.remove("create"))) {
+ create = false;
+ }
+
+
+ var server = servers.get(name);
+ if (server == null && create) {
+
+ // Create the broker on demand.
+ var broker = if( brokerURI == null ) {
+ new Broker()
+ } else {
+ BrokerFactory.createBroker(brokerURI);
+ }
+
+ // Remove the existing pipe severs if the broker is configured with one... we want to make sure it
+ // uses the one we explicitly configure here.
+ for (s <- broker.transportServers ) {
+ if (s.isInstanceOf[PipeTransportServer] && name == s.asInstanceOf[PipeTransportServer].getName()) {
+ broker.transportServers.remove(s);
+ }
+ }
+
+ // We want to use a vm transport server impl.
+ var vmTransportServer = TransportFactory.bind(new URI("vm://" + name+"?wireFormat=null")).asInstanceOf[VmTransportServer]
+ vmTransportServer.setBroker(broker);
+ broker.transportServers.add(vmTransportServer);
+ broker.start();
+
+ server = servers.get(name);
+ }
+
+ if (server == null) {
+ throw new IOException("Server is not bound: " + name);
+ }
+
+ var transport = server.connect();
+ verify( configure(transport, options), options);
+
+ } catch {
+// case e:URISyntaxException=>
+// throw IOExceptionSupport.create(e);
+ case e:Exception=>
+ throw IOExceptionSupport.create(e);
+ }
+ }
+
+}
Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/package.html (from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/package.html)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/package.html?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/package.html&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/package.html&r1=961067&r2=961068&rev=961068&view=diff
==============================================================================
(empty)
Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/path/PathMapMemoryTest.java (from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/apollo/broker/path/PathMapMemoryTest.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/path/PathMapMemoryTest.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/path/PathMapMemoryTest.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/apollo/broker/path/PathMapMemoryTest.java&r1=961067&r2=961068&rev=961068&view=diff
==============================================================================
(empty)
Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/path/PathMapTest.java (from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/apollo/broker/path/PathMapTest.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/path/PathMapTest.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/path/PathMapTest.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/apollo/broker/path/PathMapTest.java&r1=961067&r2=961068&rev=961068&view=diff
==============================================================================
(empty)
Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/transport/vm/VMTransportTest.java (from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/apollo/transport/vm/VMTransportTest.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/transport/vm/VMTransportTest.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/transport/vm/VMTransportTest.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/apollo/transport/vm/VMTransportTest.java&r1=961067&r2=961068&rev=961068&view=diff
==============================================================================
(empty)
Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/BrokerTest.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/BrokerTest.scala?rev=961068&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/BrokerTest.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/BrokerTest.scala Wed Jul 7 03:40:18 2010
@@ -0,0 +1,698 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker
+
+import _root_.java.beans.ExceptionListener
+import _root_.java.io.{File}
+import _root_.java.net.URI
+import _root_.java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+import _root_.java.util.{LinkedHashMap, ArrayList, HashMap}
+import _root_.org.apache.activemq.apollo.broker._
+import _root_.org.apache.activemq.broker.store.{StoreFactory, Store}
+import _root_.org.apache.activemq.metric.{Period, MetricAggregator, MetricCounter}
+import _root_.java.lang.{String}
+import _root_.org.apache.activemq.util.buffer.{AsciiBuffer}
+import _root_.org.junit.{Test, Before}
+
+import org.apache.activemq.transport.TransportFactory
+
+import _root_.scala.collection.JavaConversions._
+
+
+abstract class RemoteConsumer extends Connection {
+
+ val consumerRate = new MetricCounter();
+ var totalConsumerRate : MetricAggregator = null
+ var thinkTime:Long = 0
+ var destination:Destination = null
+ var selector:String = null;
+ var durable = false;
+ var uri:URI = null
+
+ private var schedualWait = false;
+
+ override def start() = {
+ consumerRate.name("Consumer " + name + " Rate");
+ totalConsumerRate.add(consumerRate);
+ transport = TransportFactory.connect(uri);
+ schedualWait = true;
+// initialize();
+ super.start();
+ setupSubscription();
+ }
+
+
+ protected def setupSubscription()
+
+ protected def messageReceived( elem:MessageDelivery ) {
+// TODO:
+// if( schedualWait ) {
+// if (thinkTime > 0) {
+// dispatchQueue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, new Runnable(){
+// public void run() {
+// consumerRate.increment();
+// controller.elementDispatched(elem);
+// }
+// });
+//
+// }
+// else
+// {
+// consumerRate.increment();
+// controller.elementDispatched(elem);
+// }
+//
+// } else {
+// if( thinkTime>0 ) {
+// try {
+// Thread.sleep(thinkTime);
+// } catch (InterruptedException e) {
+// }
+// }
+// consumerRate.increment();
+// controller.elementDispatched(elem);
+// }
+ }
+
+}
+
+
+abstract class RemoteProducer extends Connection {
+
+ val rate = new MetricCounter();
+
+ var messageIdGenerator:AtomicLong = null
+ var priority = 0
+ var persistentDelivery = false
+ var priorityMod = 0
+ var counter = 0
+ var producerId = 0
+ var destination:Destination =null
+ var property:String = null
+ var totalProducerRate:MetricAggregator = null
+ var next:MessageDelivery = null
+
+ var filler:String = null
+ var payloadSize = 20
+ var uri:URI = null
+
+// TODO:
+// protected IFlowController<MessageDelivery> outboundController;
+// protected IFlowSink<MessageDelivery> outboundQueue;
+
+
+ override def start() = {
+
+ if( payloadSize>0 ) {
+ var sb = new StringBuilder(payloadSize);
+ for( i <- 0 until payloadSize) {
+ sb.append(('a'+(i%26)).toChar);
+ }
+ filler = sb.toString();
+ }
+
+ rate.name("Producer " + name + " Rate");
+ totalProducerRate.add(rate);
+
+
+ transport = TransportFactory.connect(uri);
+// initialize();
+ super.start();
+
+ setupProducer();
+
+ }
+
+ def dispatch() = {
+// TODO:
+// while(true)
+// {
+//
+// if(next == null)
+// {
+// createNextMessage();
+// }
+//
+// //If flow controlled stop until flow control is lifted.
+// if(outboundController.isSinkBlocked())
+// {
+// if(outboundController.addUnblockListener(this))
+// {
+// return;
+// }
+// }
+//
+// outboundQueue.add(next, null);
+// rate.increment();
+// next = null;
+// }
+ }
+
+ def setupProducer()
+
+ def createNextMessage()
+
+// public void onFlowUnblocked(ISinkController<MessageDelivery> controller) {
+// dispatchQueue.dispatchAsync(dispatchTask);
+// }
+
+ def createPayload():String = {
+ if( payloadSize>=0 ) {
+ var sb = new StringBuilder(payloadSize);
+ sb.append(name);
+ sb.append(':');
+ counter += 1
+ sb.append(counter);
+ sb.append(':');
+ var length = sb.length;
+ if( length <= payloadSize ) {
+ sb.append(filler.subSequence(0, payloadSize-length));
+ return sb.toString();
+ } else {
+ return sb.substring(0, payloadSize);
+ }
+ } else {
+ counter += 1
+ return name+":"+(counter);
+ }
+ }
+
+}
+
+object BrokerTestBase {
+
+ var PERFORMANCE_SAMPLES = Integer.parseInt(System.getProperty("PERFORMANCE_SAMPLES", "3"))
+ var IO_WORK_AMOUNT = 0
+ var FANIN_COUNT = 10
+ var FANOUT_COUNT = 10
+ var PRIORITY_LEVELS = 10
+ var USE_INPUT_QUEUES = true
+
+ var USE_KAHA_DB = true;
+ var PURGE_STORE = true;
+ var PERSISTENT = false;
+ var DURABLE = false;
+
+}
+abstract class BrokerTestBase {
+ import BrokerTestBase._
+
+ // Set to put senders and consumers on separate brokers.
+ protected var multibroker = false;
+
+ // Set to mockup up ptp:
+ protected var ptp = false;
+
+ // Set to use tcp IO
+ protected var tcp = true;
+ // set to force marshalling even in the NON tcp case.
+ protected var forceMarshalling = true;
+
+ protected var sendBrokerBindURI:String=null
+ protected var receiveBrokerBindURI:String=null
+ protected var sendBrokerConnectURI:String=null
+ protected var receiveBrokerConnectURI:String=null
+
+ protected var producerCount=0
+ protected var consumerCount=0
+ protected var destCount=0
+
+ protected val totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items")
+ protected val totalConsumerRate = new MetricAggregator().name("Aggregate Consumer Rate").unit("items")
+
+ protected var sendBroker:Broker=null
+ protected var rcvBroker:Broker=null
+ protected val brokers = new ArrayList[Broker]()
+ protected val msgIdGenerator = new AtomicLong()
+ protected val stopping = new AtomicBoolean()
+
+ val producers = new ArrayList[RemoteProducer]()
+ val consumers = new ArrayList[RemoteConsumer]()
+ var name:String=null;
+
+ @Before
+ def setUp() = {
+ if (tcp) {
+ sendBrokerBindURI = "tcp://localhost:10000?wireFormat=" + getBrokerWireFormat();
+ receiveBrokerBindURI = "tcp://localhost:20000?wireFormat=" + getBrokerWireFormat();
+
+ sendBrokerConnectURI = "tcp://localhost:10000?wireFormat=" + getRemoteWireFormat();
+ receiveBrokerConnectURI = "tcp://localhost:20000?wireFormat=" + getRemoteWireFormat();
+ } else {
+ sendBrokerConnectURI = "pipe://SendBroker";
+ receiveBrokerConnectURI = "pipe://ReceiveBroker";
+ if (forceMarshalling) {
+ sendBrokerBindURI = sendBrokerConnectURI + "?wireFormat=" + getBrokerWireFormat();
+ receiveBrokerBindURI = receiveBrokerConnectURI + "?wireFormat=" + getBrokerWireFormat();
+ } else {
+ sendBrokerBindURI = sendBrokerConnectURI;
+ receiveBrokerBindURI = receiveBrokerConnectURI;
+ }
+ }
+ }
+
+ def setName(name:String) = {
+ if( this.name==null ) {
+ this.name = name;
+ }
+ }
+
+ def getName() = name
+
+ def getBrokerWireFormat() = "multi"
+
+ def getRemoteWireFormat():String
+
+ @Test
+ def benchmark_1_1_0():Unit = {
+ setName("1 producer -> 1 destination -> 0 consumers");
+ if (ptp) {
+ return;
+ }
+ producerCount = 1;
+ destCount = 1;
+
+ createConnections();
+
+ // Start 'em up.
+ startClients();
+ try {
+ reportRates();
+ } finally {
+ stopServices();
+ }
+ }
+
+ @Test
+ def benchmark_1_1_1() = {
+ setName("1 producer -> 1 destination -> 1 consumers");
+ producerCount = 1;
+ destCount = 1;
+ consumerCount = 1;
+
+ createConnections();
+
+ // Start 'em up.
+ startClients();
+ try {
+ reportRates();
+ } finally {
+ stopServices();
+ }
+ }
+
+ @Test
+ def benchmark_10_1_10() = {
+ setName(format("%d producers -> 1 destination -> %d consumers", FANIN_COUNT, FANOUT_COUNT));
+ producerCount = FANIN_COUNT;
+ consumerCount = FANOUT_COUNT;
+ destCount = 1;
+
+ createConnections();
+
+ // Start 'em up.
+ startClients();
+ try {
+ reportRates();
+ } finally {
+ stopServices();
+ }
+ }
+
+ @Test
+ def benchmark_10_1_1() = {
+ setName(format("%d producers -> 1 destination -> 1 consumer", FANIN_COUNT));
+ producerCount = FANIN_COUNT;
+ destCount = 1;
+ consumerCount = 1;
+
+ createConnections();
+
+ // Start 'em up.
+ startClients();
+ try {
+ reportRates();
+ } finally {
+ stopServices();
+ }
+ }
+
+ @Test
+ def benchmark_1_1_10() = {
+ setName(format("1 producer -> 1 destination -> %d consumers", FANOUT_COUNT));
+ producerCount = 1;
+ destCount = 1;
+ consumerCount = FANOUT_COUNT;
+
+ createConnections();
+
+ // Start 'em up.
+ startClients();
+ try {
+ reportRates();
+ } finally {
+ stopServices();
+ }
+ }
+
+ @Test
+ def benchmark_2_2_2() = {
+ setName(format("2 producer -> 2 destination -> 2 consumers"));
+ producerCount = 2;
+ destCount = 2;
+ consumerCount = 2;
+
+ createConnections();
+
+ // Start 'em up.
+ startClients();
+ try {
+ reportRates();
+ } finally {
+ stopServices();
+ }
+ }
+
+ @Test
+ def benchmark_10_10_10() = {
+ setName(format("10 producers -> 10 destinations -> 10 consumers"));
+ producerCount = 10;
+ destCount = 10;
+ consumerCount = 10;
+
+ createConnections();
+
+ // Start 'em up.
+ startClients();
+ try {
+ reportRates();
+ } finally {
+ stopServices();
+ }
+ }
+
+ /**
+ * Tests 2 producers sending to 1 destination with 2 consumres, but with
+ * consumers set to select only messages from each producer. 1 consumers is
+ * set to slow, the other producer should be able to send quickly.
+ *
+ * @throws Exception
+ */
+ @Test
+ def benchmark_2_2_2_SlowConsumer() = {
+ setName(format("2 producer -> 2 destination -> 2 slow consumers"));
+ producerCount = 2;
+ destCount = 2;
+ consumerCount = 2;
+
+ createConnections();
+ consumers.get(0).thinkTime = 50;
+
+ // Start 'em up.
+ startClients();
+ try {
+ reportRates();
+ } finally {
+ stopServices();
+ }
+ }
+
+ @Test
+ def benchmark_2_2_2_Selector()= {
+ setName(format("2 producer -> 2 destination -> 2 selector consumers"));
+ producerCount = 2;
+ destCount = 2;
+ consumerCount = 2;
+
+ createConnections();
+
+ // Add properties to match producers to their consumers
+ for (i <- 0 until consumerCount) {
+ var property = "match" + i;
+ consumers.get(i).selector = property;
+ producers.get(i).property = property;
+ }
+
+ // Start 'em up.
+ startClients();
+ try {
+ reportRates();
+ } finally {
+ stopServices();
+ }
+ }
+
+ /**
+ * Test sending with 1 high priority sender. The high priority sender should
+ * have higher throughput than the other low priority senders.
+ *
+ * @throws Exception
+ */
+ @Test
+ def benchmark_2_1_1_HighPriorityProducer() = {
+
+ setName(format("1 high and 1 normal priority producer -> 1 destination -> 1 consumer"));
+ producerCount = 2;
+ destCount = 1;
+ consumerCount = 1;
+
+ createConnections();
+ var producer = producers.get(0);
+ producer.priority = 1
+ producer.rate.setName("High Priority Producer Rate");
+
+ consumers.get(0).thinkTime = 1;
+
+ // Start 'em up.
+ startClients();
+ try {
+
+ System.out.println("Checking rates for test: " + getName());
+ for (i <- 0 until PERFORMANCE_SAMPLES) {
+ var p = new Period();
+ Thread.sleep(1000 * 5);
+ System.out.println(producer.rate.getRateSummary(p));
+ System.out.println(totalProducerRate.getRateSummary(p));
+ System.out.println(totalConsumerRate.getRateSummary(p));
+ totalProducerRate.reset();
+ totalConsumerRate.reset();
+ }
+
+ } finally {
+ stopServices();
+ }
+ }
+
+ /**
+ * Test sending with 1 high priority sender. The high priority sender should
+ * have higher throughput than the other low priority senders.
+ *
+ * @throws Exception
+ */
+ @Test
+ def benchmark_2_1_1_MixedHighPriorityProducer() = {
+
+ setName(format("1 high/mixed and 1 normal priority producer -> 1 destination -> 1 consumer"));
+ producerCount = 2;
+ destCount = 1;
+ consumerCount = 1;
+
+ createConnections();
+ var producer = producers.get(0);
+ producer.priority = 1;
+ producer.priorityMod = 3;
+ producer.rate.setName("High Priority Producer Rate");
+
+ consumers.get(0).thinkTime = 1
+
+ // Start 'em up.
+ startClients();
+ try {
+
+ System.out.println("Checking rates for test: " + getName());
+ for (i <- 0 until PERFORMANCE_SAMPLES) {
+ var p = new Period();
+ Thread.sleep(1000 * 5);
+ System.out.println(producer.rate.getRateSummary(p));
+ System.out.println(totalProducerRate.getRateSummary(p));
+ System.out.println(totalConsumerRate.getRateSummary(p));
+ totalProducerRate.reset();
+ totalConsumerRate.reset();
+ }
+
+ } finally {
+ stopServices();
+ }
+ }
+
+ def reportRates() = {
+ System.out.println("Checking rates for test: " + getName() + ", " +(if(ptp){"ptp"}else{"topic"}) );
+ for (i <- 0 until PERFORMANCE_SAMPLES) {
+ var p = new Period();
+ Thread.sleep(1000 * 5);
+ System.out.println(totalProducerRate.getRateSummary(p));
+ System.out.println(totalConsumerRate.getRateSummary(p));
+ totalProducerRate.reset();
+ totalConsumerRate.reset();
+ }
+ }
+
+ def createConnections() = {
+
+ if (multibroker) {
+ sendBroker = createBroker("SendBroker", sendBrokerBindURI, sendBrokerConnectURI);
+ rcvBroker = createBroker("RcvBroker", receiveBrokerBindURI, receiveBrokerConnectURI);
+ brokers.add(sendBroker);
+ brokers.add(rcvBroker);
+ } else {
+ sendBroker = createBroker("Broker", sendBrokerBindURI, sendBrokerConnectURI);
+ rcvBroker = sendBroker
+ brokers.add(sendBroker);
+ }
+
+ startBrokers();
+
+ var dests = new Array[Destination](destCount);
+
+ for (i <-0 until destCount) {
+ val domain = if (ptp) { Domain.QUEUE_DOMAIN} else {Domain.TOPIC_DOMAIN}
+ val name = new AsciiBuffer("dest" + (i + 1))
+ var bean = new SingleDestination(domain, name)
+ dests(i) = bean;
+ if (ptp) {
+ sendBroker.defaultVirtualHost.createQueue(dests(i));
+ if (multibroker) {
+ rcvBroker.defaultVirtualHost.createQueue(dests(i));
+ }
+ }
+ }
+
+ for (i <- 0 until producerCount) {
+ var destination = dests(i % destCount);
+ var producer = createProducer(i, destination);
+ producer.persistentDelivery = PERSISTENT;
+ producers.add(producer);
+ }
+
+ for (i <- 0 until consumerCount) {
+ var destination = dests(i % destCount);
+ var consumer = createConsumer(i, destination);
+ consumer.durable = DURABLE;
+ consumers.add(consumer);
+ }
+
+ // Create MultiBroker connections:
+ // if (multibroker) {
+ // Pipe<Message> pipe = new Pipe<Message>();
+ // sendBroker.createBrokerConnection(rcvBroker, pipe);
+ // rcvBroker.createBrokerConnection(sendBroker, pipe.connect());
+ // }
+ }
+
+ def createConsumer(i:Int, destination:Destination):RemoteConsumer = {
+
+ var consumer = createConsumer();
+ consumer.exceptionListener= new ExceptionListener() {
+ def exceptionThrown(error:Exception ) = {
+ if (!stopping.get()) {
+ System.err.println("Consumer Async Error:");
+ error.printStackTrace();
+ }
+ }
+ }
+
+ consumer.uri = new URI(rcvBroker.connectUris.head)
+ consumer.destination = destination
+ consumer.name = "consumer" + (i + 1)
+ consumer.totalConsumerRate = totalConsumerRate
+ return consumer;
+ }
+
+ protected def createConsumer():RemoteConsumer
+
+ private def createProducer(id:Int, destination:Destination):RemoteProducer = {
+ var producer = createProducer();
+ producer.exceptionListener = new ExceptionListener() {
+ def exceptionThrown(error:Exception ) = {
+ if (!stopping.get()) {
+ System.err.println("Producer Async Error:");
+ error.printStackTrace();
+ }
+ }
+ }
+ producer.uri = new URI(sendBroker.connectUris.head)
+ producer.producerId = id + 1
+ producer.name = "producer" + (id + 1)
+ producer.destination = destination
+ producer.messageIdGenerator = msgIdGenerator
+ producer.totalProducerRate = totalProducerRate
+ producer
+ }
+
+ protected def createProducer():RemoteProducer
+
+ private def createBroker(name:String , bindURI:String , connectUri:String ):Broker = {
+ val broker = new Broker()
+ broker.transportServers.add(TransportFactory.bind(new URI(bindURI)))
+ broker.connectUris.add(connectUri)
+// TODO:
+// broker.defaultVirtualHost.setStore(createStore(broker))
+ broker
+ }
+
+ protected def createStore(broker:Broker):Store = {
+ val store = if (USE_KAHA_DB) {
+ StoreFactory.createStore("kaha-db");
+ } else {
+ StoreFactory.createStore("memory");
+ }
+ store.setStoreDirectory(new File("target/test-data/broker-test/" + broker.name));
+ store.setDeleteAllMessages(PURGE_STORE);
+ store
+ }
+
+ private def stopServices() = {
+ stopping.set(true);
+ for (broker <- brokers) {
+ broker.stop();
+ }
+ for (connection <- producers) {
+ connection.stop();
+ }
+ for (connection <- consumers) {
+ connection.stop();
+ }
+ }
+
+ private def startBrokers() = {
+ for (broker <- brokers) {
+ broker.start();
+ }
+ }
+
+ private def startClients() = {
+
+ for (connection <- consumers) {
+ connection.start();
+ }
+
+ for (connection <- producers) {
+ connection.start();
+ }
+ }
+
+}
Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/BrokerTestBase.java (from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Domain.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/BrokerTestBase.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/BrokerTestBase.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Domain.java&r1=961067&r2=961068&rev=961068&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Domain.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/BrokerTestBase.java Wed Jul 7 03:40:18 2010
@@ -14,27 +14,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.apollo.broker;
+package org.apache.activemq.broker;
-import java.util.Collection;
-
-import org.apache.activemq.apollo.broker.path.PathMap;
+import java.beans.ExceptionListener;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.apollo.broker.Broker;
+import org.apache.activemq.apollo.broker.Destination;
+import org.apache.activemq.apollo.broker.Router;
+import org.apache.activemq.broker.store.Store;
+import org.apache.activemq.broker.store.StoreFactory;
+import org.apache.activemq.metric.MetricAggregator;
+import org.apache.activemq.metric.Period;
+import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.util.buffer.AsciiBuffer;
+import org.junit.Before;
+import org.junit.Test;
-public class Domain {
-
- private final PathMap<DeliveryTarget> targets = new PathMap<DeliveryTarget>();
-
- synchronized public void bind(AsciiBuffer name, DeliveryTarget queue) {
- targets.put(name, queue);
- }
-
- synchronized public void unbind(AsciiBuffer name, DeliveryTarget queue) {
- targets.remove(name, queue);
- }
-
- synchronized public Collection<DeliveryTarget> route(AsciiBuffer name, MessageDelivery delivery) {
- return targets.get(name);
- }
+import static java.lang.String.*;
-}
Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/SharedQueueTest.java (from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/SharedQueueTest.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/SharedQueueTest.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java&r1=961067&r2=961068&rev=961068&view=diff
==============================================================================
(empty)
Modified: activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/JAXBBrokerFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/JAXBBrokerFactory.java?rev=961068&r1=961067&r2=961068&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/JAXBBrokerFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/JAXBBrokerFactory.java Wed Jul 7 03:40:18 2010
@@ -32,31 +32,32 @@ import org.apache.activemq.util.URISuppo
public class JAXBBrokerFactory implements BrokerFactory.Handler {
- public Broker createBroker(URI brokerURI) throws Exception {
- JAXBContext context = JAXBContext.newInstance("org.apache.activemq.apollo.jaxb");
- Unmarshaller unmarshaller = context.createUnmarshaller();
+ public Broker createBroker(String value) {
+ try {
+ URI brokerURI = new URI(value);
+ JAXBContext context = JAXBContext.newInstance("org.apache.activemq.apollo.jaxb");
+ Unmarshaller unmarshaller = context.createUnmarshaller();
- URL configURL;
- brokerURI = URISupport.stripScheme(brokerURI);
- String scheme = brokerURI.getScheme();
- if( scheme==null || "file".equals(scheme) ) {
- configURL = URISupport.changeScheme(URISupport.stripScheme(brokerURI), "file").toURL();
- } else if( "classpath".equals(scheme) ) {
- configURL = Thread.currentThread().getContextClassLoader().getResource(brokerURI.getSchemeSpecificPart());
- } else {
- configURL = URISupport.changeScheme(brokerURI, scheme).toURL();
- }
- if (configURL == null) {
- throw new IOException("Cannot create broker from non-existent URI: " + brokerURI);
- }
- XMLInputFactory factory = XMLInputFactory.newInstance();
- XMLStreamReader reader = factory.createXMLStreamReader(configURL.openStream());
- XMLStreamReader properties = new PropertiesReader(reader);
- try {
+ URL configURL;
+ brokerURI = URISupport.stripScheme(brokerURI);
+ String scheme = brokerURI.getScheme();
+ if( scheme==null || "file".equals(scheme) ) {
+ configURL = URISupport.changeScheme(URISupport.stripScheme(brokerURI), "file").toURL();
+ } else if( "classpath".equals(scheme) ) {
+ configURL = Thread.currentThread().getContextClassLoader().getResource(brokerURI.getSchemeSpecificPart());
+ } else {
+ configURL = URISupport.changeScheme(brokerURI, scheme).toURL();
+ }
+ if (configURL == null) {
+ throw new IOException("Cannot create broker from non-existent URI: " + brokerURI);
+ }
+ XMLInputFactory factory = XMLInputFactory.newInstance();
+ XMLStreamReader reader = factory.createXMLStreamReader(configURL.openStream());
+ XMLStreamReader properties = new PropertiesReader(reader);
BrokerXml xml = (BrokerXml) unmarshaller.unmarshal(properties);
return xml.createMessageBroker();
- } catch (UnmarshalException e) {
- throw new IOException("Cannot create broker from URI: " + brokerURI + ", reason: " + e.getCause());
+ } catch (Exception e) {
+ throw new RuntimeException("Cannot create broker from URI: " + value, e);
}
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/MemoryStoreXml.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/MemoryStoreXml.java?rev=961068&r1=961067&r2=961068&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/MemoryStoreXml.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/MemoryStoreXml.java Wed Jul 7 03:40:18 2010
@@ -28,8 +28,7 @@ import org.apache.activemq.broker.store.
public class MemoryStoreXml extends StoreXml {
public Store createStore() {
- MemoryStore rc = new MemoryStore();
- return rc;
+ return new MemoryStore();
}
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/VirtualHostXml.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/VirtualHostXml.java?rev=961068&r1=961067&r2=961068&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/VirtualHostXml.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/VirtualHostXml.java Wed Jul 7 03:40:18 2010
@@ -27,6 +27,7 @@ import javax.xml.bind.annotation.XmlTran
import javax.xml.bind.annotation.adapters.XmlAdapter;
import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+import org.apache.activemq.apollo.broker.BrokerDatabase;
import org.apache.activemq.apollo.broker.VirtualHost;
import org.apache.activemq.util.buffer.AsciiBuffer;
@@ -34,19 +35,20 @@ import org.apache.activemq.util.buffer.A
@XmlAccessorType(XmlAccessType.FIELD)
public class VirtualHostXml {
- @XmlJavaTypeAdapter(AsciiBufferAdapter.class)
@XmlElement(name="host-name", required=true)
- private ArrayList<AsciiBuffer> hostNames = new ArrayList<AsciiBuffer>();
+ private ArrayList<String> hostNames = new ArrayList<String>();
@XmlElementRef
private StoreXml store;
public VirtualHost createVirtualHost(BrokerXml brokerXml) throws Exception {
VirtualHost rc = new VirtualHost();
- rc.setHostNames(hostNames);
-
+ rc.setNamesArray(hostNames);
if( store != null ) {
- rc.setStore(store.createStore());
+ BrokerDatabase database = new BrokerDatabase();
+ database.setVirtualHost(rc);
+ database.setStore(store.createStore());
+ rc.setDatabase(database);
}
return rc;
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/test/java/org/apache/activemq/apollo/jaxb/JAXBConfigTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/test/java/org/apache/activemq/apollo/jaxb/JAXBConfigTest.java?rev=961068&r1=961067&r2=961068&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/test/java/org/apache/activemq/apollo/jaxb/JAXBConfigTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/test/java/org/apache/activemq/apollo/jaxb/JAXBConfigTest.java Wed Jul 7 03:40:18 2010
@@ -39,66 +39,59 @@ public class JAXBConfigTest extends Test
@Test()
public void testSimpleConfig() throws Exception {
- URI uri = new URI("jaxb:classpath:org/apache/activemq/apollo/jaxb/testSimpleConfig.xml");
+ String uri = "jaxb:classpath:org/apache/activemq/apollo/jaxb/testSimpleConfig.xml";
LOG.info("Loading broker configuration from the classpath with URI: " + uri);
- Broker broker = BrokerFactory.createBroker(uri);
+ Broker broker = BrokerFactory.createBroker(uri, false);
// assertEquals(4, p.getSize());
// assertEquals("test dispatcher", p.getName());
- assertEquals(1, broker.getTransportServers().size());
+ assertEquals(1, broker.transportServers().size());
ArrayList<String> expected = new ArrayList<String>();
expected.add("pipe://test1");
expected.add("tcp://127.0.0.1:61616");
- assertEquals(expected, broker.getConnectUris());
+ assertEquals(expected, broker.connectUris() );
- assertEquals(2, broker.getVirtualHosts().size());
+ assertEquals(2, broker.virtualHosts().size());
- assertNotNull(broker.getDefaultVirtualHost().getDatabase());
- assertNotNull(broker.getDefaultVirtualHost().getDatabase().getStore());
- assertTrue((broker.getDefaultVirtualHost().getDatabase().getStore() instanceof MemoryStore));
+ assertNotNull(broker.defaultVirtualHost().getDatabase());
+ assertNotNull(broker.defaultVirtualHost().getDatabase().getStore());
+ assertTrue((broker.defaultVirtualHost().getDatabase().getStore() instanceof MemoryStore));
}
@Test()
public void testUris() throws Exception {
- boolean failed = false;
- // non-existent classpath
+
+ // non-existent classpath
try {
- URI uri = new URI("jaxb:classpath:org/apache/activemq/apollo/jaxb/testUris-fail.xml");
- BrokerFactory.createBroker(uri);
- } catch (IOException e) {
- failed = true;
+ String uri = "jaxb:classpath:org/apache/activemq/apollo/jaxb/testUris-fail.xml";
+ BrokerFactory.createBroker(uri, false);
+ fail("Creating broker from non-existing url does not throw an exception!");
+ } catch (RuntimeException e) {
}
- if (!failed) {
- fail("Creating broker from non-existing url does not throw an exception!");
- }
- failed = false;
- //non-existent file
+
+ //non-existent file
try {
- URI uri = new URI("jaxb:file:/org/apache/activemq/apollo/jaxb/testUris-fail.xml");
- BrokerFactory.createBroker(uri);
- } catch (IOException e) {
- failed = true;
- }
- if (!failed) {
- fail("Creating broker from non-existing url does not throw an exception!");
+ String uri ="jaxb:file:/org/apache/activemq/apollo/jaxb/testUris-fail.xml";
+ BrokerFactory.createBroker(uri, false);
+ fail("Creating broker from non-existing url does not throw an exception!");
+ } catch (RuntimeException e) {
}
- //non-existent url
+
+ //non-existent url
try {
- URI uri = new URI("jaxb:http://localhost/testUris.xml");
- BrokerFactory.createBroker(uri);
- } catch (IOException e) {
- failed = true;
+ String uri = "jaxb:http://localhost/testUris.xml";
+ BrokerFactory.createBroker(uri, false);
+ fail("Creating broker from non-existing url does not throw an exception!");
+ } catch (RuntimeException e) {
}
- if (!failed) {
- fail("Creating broker from non-existing url does not throw an exception!");
- }
+
// regular file
- URI uri = new URI("jaxb:" + Thread.currentThread().getContextClassLoader().getResource("org/apache/activemq/apollo/jaxb/testUris.xml"));
- BrokerFactory.createBroker(uri);
+ String uri = "jaxb:" + Thread.currentThread().getContextClassLoader().getResource("org/apache/activemq/apollo/jaxb/testUris.xml");
+ BrokerFactory.createBroker(uri, false);
}
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java?rev=961068&r1=961067&r2=961068&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java Wed Jul 7 03:40:18 2010
@@ -33,7 +33,7 @@ import org.apache.activemq.util.URISuppo
*/
public class PipeTransportFactory implements TransportFactory.TransportFactorySPI {
- static protected final HashMap<String, PipeTransportServer> servers = new HashMap<String, PipeTransportServer>();
+ public static final HashMap<String, PipeTransportServer> servers = new HashMap<String, PipeTransportServer>();
public TransportServer bind(URI uri) throws URISyntaxException, IOException {