You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:40:20 UTC

svn commit: r961068 [4/4] - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/ activemq-broker/src/main/java/org/apache/activemq/apollo/ activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ activemq-broker/src/main/java/org/apache...

Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/SimplePathFilter.java (from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/SimplePathFilter.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/SimplePathFilter.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/SimplePathFilter.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/SimplePathFilter.java&r1=961067&r2=961068&rev=961068&view=diff
==============================================================================
    (empty)

Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/WildcardPathFilter.java (from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/WildcardPathFilter.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/WildcardPathFilter.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/WildcardPathFilter.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/WildcardPathFilter.java&r1=961067&r2=961068&rev=961068&view=diff
==============================================================================
    (empty)

Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala?rev=961068&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/VMTransport.scala Wed Jul  7 03:40:18 2010
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.transport.vm
+
+import _root_.java.io.IOException
+import _root_.java.net.URI
+import _root_.java.util.concurrent.atomic.AtomicBoolean
+import _root_.java.util.concurrent.atomic.AtomicInteger
+
+import _root_.org.apache.activemq.apollo.broker._
+import _root_.org.apache.activemq.transport.Transport
+import _root_.org.apache.activemq.transport.TransportFactory
+import _root_.org.apache.activemq.transport.TransportServer
+import _root_.org.apache.activemq.transport.pipe.PipeTransport
+import _root_.org.apache.activemq.transport.pipe.PipeTransportFactory
+import _root_.org.apache.activemq.transport.pipe.PipeTransportServer
+import _root_.org.apache.activemq.util.IOExceptionSupport
+import _root_.org.apache.activemq.util.URISupport
+import _root_.org.apache.activemq.transport.TransportFactorySupport.configure
+import _root_.org.apache.activemq.transport.TransportFactorySupport.verify
+
+import _root_.scala.collection.JavaConversions._
+
+object VMTransportFactory extends Log {
+  val DEFAULT_PIPE_NAME = BrokerConstants.DEFAULT_VIRTUAL_HOST_NAME.toString();
+}
+
+/**
+ * Implements the vm transport which behaves like the pipe transport except that
+ * it can start embedded brokers up on demand.
+ *
+ * @author chirino
+ *
+ */
+class VMTransportFactory extends PipeTransportFactory with Logging {
+
+  import PipeTransportFactory._
+  import VMTransportFactory._
+  override protected def log = VMTransportFactory
+
+	/**
+	 * This extension of the PipeTransportServer shuts down the broker
+	 * when all the connections are disconnected.
+	 *
+	 * @author chirino
+	 */
+	class VmTransportServer extends PipeTransportServer {
+
+		val refs = new AtomicInteger()
+		var broker:Broker = null
+
+		override def createClientTransport():PipeTransport = {
+			refs.incrementAndGet();
+			new PipeTransport(this) {
+
+				val stopped = new AtomicBoolean()
+
+				override def stop() = {
+					if( stopped.compareAndSet(false, true) ) {
+						super.stop();
+						if( refs.decrementAndGet() == 0 ) {
+							stopBroker();
+						}
+					}
+				}
+			};
+		}
+
+		def setBroker(broker:Broker) = {
+			this.broker = broker;
+		}
+
+		def stopBroker() = {
+			try {
+				this.broker.stop();
+				unbind(this);
+			} catch {
+        case e:Exception=>
+				error("Failed to stop the broker gracefully: "+e);
+				debug("Failed to stop the broker gracefully: ", e);
+			}
+		}
+	}
+
+  override def bind(uri:URI):TransportServer = {
+    new VmTransportServer();
+  }
+
+  override def connect(location:URI):Transport = {
+		try {
+
+			var brokerURI:String = null;
+			var create = true;
+			var name = location.getHost();
+			if (name == null) {
+				name = DEFAULT_PIPE_NAME;
+			}
+
+			var options = URISupport.parseParamters(location);
+			var config = options.remove("broker").asInstanceOf[String]
+			if (config != null) {
+				brokerURI = config;
+			}
+			if ("false".equals(options.remove("create"))) {
+				create = false;
+			}
+
+
+			var server = servers.get(name);
+			if (server == null && create) {
+
+				// Create the broker on demand.
+				var broker = if( brokerURI == null ) {
+					new Broker()
+				} else {
+					BrokerFactory.createBroker(brokerURI);
+				}
+
+				// Remove the existing pipe severs if the broker is configured with one...  we want to make sure it
+				// uses the one we explicitly configure here.
+				for (s <- broker.transportServers ) {
+					if (s.isInstanceOf[PipeTransportServer] && name == s.asInstanceOf[PipeTransportServer].getName()) {
+						broker.transportServers.remove(s);
+					}
+				}
+
+				// We want to use a vm transport server impl.
+				var vmTransportServer = TransportFactory.bind(new URI("vm://" + name+"?wireFormat=null")).asInstanceOf[VmTransportServer]
+				vmTransportServer.setBroker(broker);
+				broker.transportServers.add(vmTransportServer);
+				broker.start();
+
+				server = servers.get(name);
+			}
+
+			if (server == null) {
+				throw new IOException("Server is not bound: " + name);
+			}
+
+      var transport = server.connect();
+      verify( configure(transport, options), options);
+
+		} catch {
+//      case e:URISyntaxException=>
+//  			throw IOExceptionSupport.create(e);
+      case e:Exception=>
+  			throw IOExceptionSupport.create(e);
+		}
+	}
+
+}

Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/package.html (from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/package.html)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/package.html?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/package.html&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/package.html&r1=961067&r2=961068&rev=961068&view=diff
==============================================================================
    (empty)

Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/path/PathMapMemoryTest.java (from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/apollo/broker/path/PathMapMemoryTest.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/path/PathMapMemoryTest.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/path/PathMapMemoryTest.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/apollo/broker/path/PathMapMemoryTest.java&r1=961067&r2=961068&rev=961068&view=diff
==============================================================================
    (empty)

Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/path/PathMapTest.java (from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/apollo/broker/path/PathMapTest.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/path/PathMapTest.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/path/PathMapTest.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/apollo/broker/path/PathMapTest.java&r1=961067&r2=961068&rev=961068&view=diff
==============================================================================
    (empty)

Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/transport/vm/VMTransportTest.java (from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/apollo/transport/vm/VMTransportTest.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/transport/vm/VMTransportTest.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/transport/vm/VMTransportTest.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/apollo/transport/vm/VMTransportTest.java&r1=961067&r2=961068&rev=961068&view=diff
==============================================================================
    (empty)

Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/BrokerTest.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/BrokerTest.scala?rev=961068&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/BrokerTest.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/BrokerTest.scala Wed Jul  7 03:40:18 2010
@@ -0,0 +1,698 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker
+
+import _root_.java.beans.ExceptionListener
+import _root_.java.io.{File}
+import _root_.java.net.URI
+import _root_.java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+import _root_.java.util.{LinkedHashMap, ArrayList, HashMap}
+import _root_.org.apache.activemq.apollo.broker._
+import _root_.org.apache.activemq.broker.store.{StoreFactory, Store}
+import _root_.org.apache.activemq.metric.{Period, MetricAggregator, MetricCounter}
+import _root_.java.lang.{String}
+import _root_.org.apache.activemq.util.buffer.{AsciiBuffer}
+import _root_.org.junit.{Test, Before}
+
+import org.apache.activemq.transport.TransportFactory
+
+import _root_.scala.collection.JavaConversions._
+
+
+abstract class RemoteConsumer extends Connection {
+
+    val consumerRate = new MetricCounter();
+    var totalConsumerRate :  MetricAggregator = null
+    var thinkTime:Long = 0
+    var destination:Destination = null
+    var selector:String = null;
+    var durable = false;
+    var uri:URI = null
+
+    private var schedualWait = false;
+
+    override def start() = {
+        consumerRate.name("Consumer " + name + " Rate");
+        totalConsumerRate.add(consumerRate);
+        transport = TransportFactory.connect(uri);
+        schedualWait = true;
+//        initialize();
+        super.start();
+        setupSubscription();
+    }
+
+
+    protected def setupSubscription()
+
+    protected def messageReceived( elem:MessageDelivery ) {
+//        TODO:
+//        if( schedualWait ) {
+//            if (thinkTime > 0) {
+//                dispatchQueue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, new Runnable(){
+//                    public void run() {
+//                        consumerRate.increment();
+//                        controller.elementDispatched(elem);
+//                    }
+//                });
+//
+//            }
+//            else
+//            {
+//                consumerRate.increment();
+//                controller.elementDispatched(elem);
+//            }
+//
+//        } else {
+//            if( thinkTime>0 ) {
+//                try {
+//                    Thread.sleep(thinkTime);
+//                } catch (InterruptedException e) {
+//                }
+//            }
+//            consumerRate.increment();
+//            controller.elementDispatched(elem);
+//        }
+    }
+
+}
+
+
+abstract class RemoteProducer extends Connection {
+
+    val rate = new MetricCounter();
+
+    var messageIdGenerator:AtomicLong = null
+    var priority = 0
+    var persistentDelivery = false
+    var priorityMod = 0
+    var counter = 0
+    var producerId = 0
+    var destination:Destination =null
+    var property:String = null
+    var totalProducerRate:MetricAggregator = null
+    var next:MessageDelivery = null
+
+    var filler:String = null
+    var payloadSize = 20
+    var uri:URI = null
+
+//    TODO:
+//    protected IFlowController<MessageDelivery> outboundController;
+//    protected IFlowSink<MessageDelivery> outboundQueue;
+
+
+    override def start() = {
+
+        if( payloadSize>0 ) {
+            var sb = new StringBuilder(payloadSize);
+            for( i <- 0 until payloadSize) {
+                sb.append(('a'+(i%26)).toChar);
+            }
+            filler = sb.toString();
+        }
+
+        rate.name("Producer " + name + " Rate");
+        totalProducerRate.add(rate);
+
+
+        transport = TransportFactory.connect(uri);
+//        initialize();
+        super.start();
+
+        setupProducer();
+
+    }
+
+    def dispatch() = {
+//       TODO:
+//        while(true)
+//        {
+//
+//            if(next == null)
+//            {
+//                createNextMessage();
+//            }
+//
+//            //If flow controlled stop until flow control is lifted.
+//            if(outboundController.isSinkBlocked())
+//            {
+//                if(outboundController.addUnblockListener(this))
+//                {
+//                    return;
+//                }
+//            }
+//
+//            outboundQueue.add(next, null);
+//            rate.increment();
+//            next = null;
+//        }
+    }
+
+    def setupProducer()
+
+    def createNextMessage()
+
+//	public void onFlowUnblocked(ISinkController<MessageDelivery> controller) {
+//        dispatchQueue.dispatchAsync(dispatchTask);
+//	}
+
+    def createPayload():String = {
+        if( payloadSize>=0 ) {
+            var sb = new StringBuilder(payloadSize);
+            sb.append(name);
+            sb.append(':');
+            counter += 1
+            sb.append(counter);
+            sb.append(':');
+            var length = sb.length;
+            if( length <= payloadSize ) {
+                sb.append(filler.subSequence(0, payloadSize-length));
+                return sb.toString();
+            } else {
+               return sb.substring(0, payloadSize);
+            }
+        } else {
+            counter += 1
+            return name+":"+(counter);
+        }
+    }
+
+}
+
+object BrokerTestBase {
+
+  var PERFORMANCE_SAMPLES = Integer.parseInt(System.getProperty("PERFORMANCE_SAMPLES", "3"))
+  var IO_WORK_AMOUNT = 0
+  var FANIN_COUNT = 10
+  var FANOUT_COUNT = 10
+  var PRIORITY_LEVELS = 10
+  var USE_INPUT_QUEUES = true
+
+  var USE_KAHA_DB = true;
+  var PURGE_STORE = true;
+  var PERSISTENT = false;
+  var DURABLE = false;
+
+}
+abstract class BrokerTestBase {
+  import BrokerTestBase._
+
+  // Set to put senders and consumers on separate brokers.
+  protected var multibroker = false;
+
+  // Set to mockup up ptp:
+  protected var ptp = false;
+
+  // Set to use tcp IO
+  protected var tcp = true;
+  // set to force marshalling even in the NON tcp case.
+  protected var forceMarshalling = true;
+
+  protected var sendBrokerBindURI:String=null
+  protected var receiveBrokerBindURI:String=null
+  protected var sendBrokerConnectURI:String=null
+  protected var receiveBrokerConnectURI:String=null
+
+  protected var producerCount=0
+  protected var consumerCount=0
+  protected var destCount=0
+
+  protected val totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items")
+  protected val totalConsumerRate = new MetricAggregator().name("Aggregate Consumer Rate").unit("items")
+
+  protected var sendBroker:Broker=null
+  protected var rcvBroker:Broker=null
+  protected val brokers = new ArrayList[Broker]()
+  protected val msgIdGenerator = new AtomicLong()
+  protected val stopping = new AtomicBoolean()
+
+  val producers = new ArrayList[RemoteProducer]()
+  val consumers = new ArrayList[RemoteConsumer]()
+  var name:String=null;
+
+  @Before
+  def setUp() = {
+    if (tcp) {
+        sendBrokerBindURI = "tcp://localhost:10000?wireFormat=" + getBrokerWireFormat();
+        receiveBrokerBindURI = "tcp://localhost:20000?wireFormat=" + getBrokerWireFormat();
+
+        sendBrokerConnectURI = "tcp://localhost:10000?wireFormat=" + getRemoteWireFormat();
+        receiveBrokerConnectURI = "tcp://localhost:20000?wireFormat=" + getRemoteWireFormat();
+    } else {
+        sendBrokerConnectURI = "pipe://SendBroker";
+        receiveBrokerConnectURI = "pipe://ReceiveBroker";
+        if (forceMarshalling) {
+            sendBrokerBindURI = sendBrokerConnectURI + "?wireFormat=" + getBrokerWireFormat();
+            receiveBrokerBindURI = receiveBrokerConnectURI + "?wireFormat=" + getBrokerWireFormat();
+        } else {
+            sendBrokerBindURI = sendBrokerConnectURI;
+            receiveBrokerBindURI = receiveBrokerConnectURI;
+        }
+    }
+  }
+
+  def setName(name:String) = {
+      if( this.name==null ) {
+          this.name = name;
+      }
+  }
+
+  def getName() = name
+
+  def getBrokerWireFormat() = "multi"
+
+  def getRemoteWireFormat():String
+
+  @Test
+  def benchmark_1_1_0():Unit = {
+      setName("1 producer -> 1 destination -> 0 consumers");
+      if (ptp) {
+          return;
+      }
+      producerCount = 1;
+      destCount = 1;
+
+      createConnections();
+
+      // Start 'em up.
+      startClients();
+      try {
+          reportRates();
+      } finally {
+          stopServices();
+      }
+  }
+
+  @Test
+  def benchmark_1_1_1() = {
+      setName("1 producer -> 1 destination -> 1 consumers");
+      producerCount = 1;
+      destCount = 1;
+      consumerCount = 1;
+
+      createConnections();
+
+      // Start 'em up.
+      startClients();
+      try {
+          reportRates();
+      } finally {
+          stopServices();
+      }
+  }
+
+  @Test
+  def benchmark_10_1_10() = {
+      setName(format("%d producers -> 1 destination -> %d consumers", FANIN_COUNT, FANOUT_COUNT));
+      producerCount = FANIN_COUNT;
+      consumerCount = FANOUT_COUNT;
+      destCount = 1;
+
+      createConnections();
+
+      // Start 'em up.
+      startClients();
+      try {
+          reportRates();
+      } finally {
+          stopServices();
+      }
+  }
+
+  @Test
+  def benchmark_10_1_1() = {
+      setName(format("%d producers -> 1 destination -> 1 consumer", FANIN_COUNT));
+      producerCount = FANIN_COUNT;
+      destCount = 1;
+      consumerCount = 1;
+
+      createConnections();
+
+      // Start 'em up.
+      startClients();
+      try {
+          reportRates();
+      } finally {
+          stopServices();
+      }
+  }
+
+  @Test
+  def benchmark_1_1_10() = {
+      setName(format("1 producer -> 1 destination -> %d consumers", FANOUT_COUNT));
+      producerCount = 1;
+      destCount = 1;
+      consumerCount = FANOUT_COUNT;
+
+      createConnections();
+
+      // Start 'em up.
+      startClients();
+      try {
+          reportRates();
+      } finally {
+          stopServices();
+      }
+  }
+
+  @Test
+  def benchmark_2_2_2() = {
+      setName(format("2 producer -> 2 destination -> 2 consumers"));
+      producerCount = 2;
+      destCount = 2;
+      consumerCount = 2;
+
+      createConnections();
+
+      // Start 'em up.
+      startClients();
+      try {
+          reportRates();
+      } finally {
+          stopServices();
+      }
+  }
+
+  @Test
+  def benchmark_10_10_10() = {
+      setName(format("10 producers -> 10 destinations -> 10 consumers"));
+      producerCount = 10;
+      destCount = 10;
+      consumerCount = 10;
+
+      createConnections();
+
+      // Start 'em up.
+      startClients();
+      try {
+          reportRates();
+      } finally {
+          stopServices();
+      }
+  }
+
+  /**
+   * Tests 2 producers sending to 1 destination with 2 consumres, but with
+   * consumers set to select only messages from each producer. 1 consumers is
+   * set to slow, the other producer should be able to send quickly.
+   *
+   * @throws Exception
+   */
+  @Test
+  def benchmark_2_2_2_SlowConsumer() = {
+      setName(format("2 producer -> 2 destination -> 2 slow consumers"));
+      producerCount = 2;
+      destCount = 2;
+      consumerCount = 2;
+
+      createConnections();
+      consumers.get(0).thinkTime = 50;
+
+      // Start 'em up.
+      startClients();
+      try {
+          reportRates();
+      } finally {
+          stopServices();
+      }
+  }
+
+  @Test
+  def benchmark_2_2_2_Selector()= {
+      setName(format("2 producer -> 2 destination -> 2 selector consumers"));
+      producerCount = 2;
+      destCount = 2;
+      consumerCount = 2;
+
+      createConnections();
+
+      // Add properties to match producers to their consumers
+      for (i <- 0 until consumerCount) {
+          var property = "match" + i;
+          consumers.get(i).selector = property;
+          producers.get(i).property = property;
+      }
+
+      // Start 'em up.
+      startClients();
+      try {
+          reportRates();
+      } finally {
+          stopServices();
+      }
+  }
+
+  /**
+   * Test sending with 1 high priority sender. The high priority sender should
+   * have higher throughput than the other low priority senders.
+   *
+   * @throws Exception
+   */
+  @Test
+  def benchmark_2_1_1_HighPriorityProducer() = {
+
+      setName(format("1 high and 1 normal priority producer -> 1 destination -> 1 consumer"));
+      producerCount = 2;
+      destCount = 1;
+      consumerCount = 1;
+
+      createConnections();
+      var producer = producers.get(0);
+      producer.priority = 1
+      producer.rate.setName("High Priority Producer Rate");
+
+      consumers.get(0).thinkTime = 1;
+
+      // Start 'em up.
+      startClients();
+      try {
+
+          System.out.println("Checking rates for test: " + getName());
+          for (i <- 0 until PERFORMANCE_SAMPLES) {
+              var p = new Period();
+              Thread.sleep(1000 * 5);
+              System.out.println(producer.rate.getRateSummary(p));
+              System.out.println(totalProducerRate.getRateSummary(p));
+              System.out.println(totalConsumerRate.getRateSummary(p));
+              totalProducerRate.reset();
+              totalConsumerRate.reset();
+          }
+
+      } finally {
+          stopServices();
+      }
+  }
+
+  /**
+   * Test sending with 1 high priority sender. The high priority sender should
+   * have higher throughput than the other low priority senders.
+   *
+   * @throws Exception
+   */
+  @Test
+  def benchmark_2_1_1_MixedHighPriorityProducer() = {
+    
+      setName(format("1 high/mixed and 1 normal priority producer -> 1 destination -> 1 consumer"));
+      producerCount = 2;
+      destCount = 1;
+      consumerCount = 1;
+
+      createConnections();
+      var producer = producers.get(0);
+      producer.priority = 1;
+      producer.priorityMod = 3;
+      producer.rate.setName("High Priority Producer Rate");
+
+      consumers.get(0).thinkTime = 1
+
+      // Start 'em up.
+      startClients();
+      try {
+
+          System.out.println("Checking rates for test: " + getName());
+          for (i <- 0 until PERFORMANCE_SAMPLES) {
+              var p = new Period();
+              Thread.sleep(1000 * 5);
+              System.out.println(producer.rate.getRateSummary(p));
+              System.out.println(totalProducerRate.getRateSummary(p));
+              System.out.println(totalConsumerRate.getRateSummary(p));
+              totalProducerRate.reset();
+              totalConsumerRate.reset();
+          }
+
+      } finally {
+          stopServices();
+      }
+  }
+
+  def reportRates() = {
+      System.out.println("Checking rates for test: " + getName() + ", " +(if(ptp){"ptp"}else{"topic"}) );
+      for (i <- 0 until PERFORMANCE_SAMPLES) {
+          var p = new Period();
+          Thread.sleep(1000 * 5);
+          System.out.println(totalProducerRate.getRateSummary(p));
+          System.out.println(totalConsumerRate.getRateSummary(p));
+          totalProducerRate.reset();
+          totalConsumerRate.reset();
+      }
+  }
+
+  def createConnections() = {
+
+      if (multibroker) {
+          sendBroker = createBroker("SendBroker", sendBrokerBindURI, sendBrokerConnectURI);
+          rcvBroker = createBroker("RcvBroker", receiveBrokerBindURI, receiveBrokerConnectURI);
+          brokers.add(sendBroker);
+          brokers.add(rcvBroker);
+      } else {
+          sendBroker = createBroker("Broker", sendBrokerBindURI, sendBrokerConnectURI);
+          rcvBroker = sendBroker
+          brokers.add(sendBroker);
+      }
+
+      startBrokers();
+
+      var dests = new Array[Destination](destCount);
+
+      for (i <-0 until destCount) {
+        val domain = if (ptp) { Domain.QUEUE_DOMAIN} else {Domain.TOPIC_DOMAIN}
+        val name = new AsciiBuffer("dest" + (i + 1))
+        var bean = new SingleDestination(domain, name)
+        dests(i) = bean;
+        if (ptp) {
+            sendBroker.defaultVirtualHost.createQueue(dests(i));
+            if (multibroker) {
+                rcvBroker.defaultVirtualHost.createQueue(dests(i));
+            }
+        }
+      }
+
+      for (i <- 0 until producerCount) {
+          var destination = dests(i % destCount);
+          var producer = createProducer(i, destination);
+          producer.persistentDelivery = PERSISTENT;
+          producers.add(producer);
+      }
+
+      for (i <- 0 until consumerCount) {
+          var destination = dests(i % destCount);
+          var consumer = createConsumer(i, destination);
+          consumer.durable = DURABLE;
+          consumers.add(consumer);
+      }
+
+      // Create MultiBroker connections:
+      // if (multibroker) {
+      // Pipe<Message> pipe = new Pipe<Message>();
+      // sendBroker.createBrokerConnection(rcvBroker, pipe);
+      // rcvBroker.createBrokerConnection(sendBroker, pipe.connect());
+      // }
+  }
+
+  def createConsumer(i:Int, destination:Destination):RemoteConsumer = {
+
+      var consumer = createConsumer();
+      consumer.exceptionListener= new ExceptionListener() {
+          def exceptionThrown(error:Exception ) = {
+              if (!stopping.get()) {
+                  System.err.println("Consumer Async Error:");
+                  error.printStackTrace();
+              }
+          }
+      }
+
+      consumer.uri = new URI(rcvBroker.connectUris.head)
+      consumer.destination = destination
+      consumer.name = "consumer" + (i + 1)
+      consumer.totalConsumerRate = totalConsumerRate
+      return consumer;
+  }
+
+  protected def createConsumer():RemoteConsumer
+
+  private def createProducer(id:Int, destination:Destination):RemoteProducer = {
+      var producer = createProducer();
+      producer.exceptionListener = new ExceptionListener() {
+          def exceptionThrown(error:Exception ) = {
+              if (!stopping.get()) {
+                  System.err.println("Producer Async Error:");
+                  error.printStackTrace();
+              }
+          }
+      }
+      producer.uri = new URI(sendBroker.connectUris.head)
+      producer.producerId = id + 1
+      producer.name = "producer" + (id + 1)
+      producer.destination = destination
+      producer.messageIdGenerator = msgIdGenerator
+      producer.totalProducerRate = totalProducerRate
+      producer
+  }
+
+  protected def createProducer():RemoteProducer
+
+  private def createBroker(name:String , bindURI:String , connectUri:String ):Broker = {
+    val broker = new Broker()
+    broker.transportServers.add(TransportFactory.bind(new URI(bindURI)))
+    broker.connectUris.add(connectUri)
+//     TODO:
+//    broker.defaultVirtualHost.setStore(createStore(broker))
+    broker
+  }
+
+  protected def createStore(broker:Broker):Store = {
+      val store = if (USE_KAHA_DB) {
+          StoreFactory.createStore("kaha-db");
+      } else {
+          StoreFactory.createStore("memory");
+      }
+      store.setStoreDirectory(new File("target/test-data/broker-test/" + broker.name));
+      store.setDeleteAllMessages(PURGE_STORE);
+      store
+  }
+
+  private def stopServices() = {
+      stopping.set(true);
+      for (broker <- brokers) {
+          broker.stop();
+      }
+      for (connection <- producers) {
+          connection.stop();
+      }
+      for (connection <- consumers) {
+          connection.stop();
+      }
+  }
+
+  private def startBrokers() = {
+      for (broker <- brokers) {
+          broker.start();
+      }
+  }
+
+  private def startClients() = {
+
+      for (connection <- consumers) {
+          connection.start();
+      }
+
+      for (connection <- producers) {
+          connection.start();
+      }
+  }
+
+}

Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/BrokerTestBase.java (from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Domain.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/BrokerTestBase.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/BrokerTestBase.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Domain.java&r1=961067&r2=961068&rev=961068&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Domain.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/BrokerTestBase.java Wed Jul  7 03:40:18 2010
@@ -14,27 +14,28 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.apollo.broker;
+package org.apache.activemq.broker;
 
-import java.util.Collection;
-
-import org.apache.activemq.apollo.broker.path.PathMap;
+import java.beans.ExceptionListener;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.apollo.broker.Broker;
+import org.apache.activemq.apollo.broker.Destination;
+import org.apache.activemq.apollo.broker.Router;
+import org.apache.activemq.broker.store.Store;
+import org.apache.activemq.broker.store.StoreFactory;
+import org.apache.activemq.metric.MetricAggregator;
+import org.apache.activemq.metric.Period;
+import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.util.buffer.AsciiBuffer;
+import org.junit.Before;
+import org.junit.Test;
 
-public class Domain {
-
-    private final PathMap<DeliveryTarget> targets = new PathMap<DeliveryTarget>();
-
-    synchronized public void bind(AsciiBuffer name, DeliveryTarget queue) {
-        targets.put(name, queue);
-    }
-    
-    synchronized public void unbind(AsciiBuffer name, DeliveryTarget queue) {
-        targets.remove(name, queue);
-    }
-
-    synchronized public Collection<DeliveryTarget> route(AsciiBuffer name, MessageDelivery delivery) {
-        return targets.get(name);
-    }
+import static java.lang.String.*;
 
-}

Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/SharedQueueTest.java (from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/SharedQueueTest.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/broker/SharedQueueTest.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java&r1=961067&r2=961068&rev=961068&view=diff
==============================================================================
    (empty)

Modified: activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/JAXBBrokerFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/JAXBBrokerFactory.java?rev=961068&r1=961067&r2=961068&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/JAXBBrokerFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/JAXBBrokerFactory.java Wed Jul  7 03:40:18 2010
@@ -32,31 +32,32 @@ import org.apache.activemq.util.URISuppo
 
 public class JAXBBrokerFactory implements BrokerFactory.Handler {
 
-	public Broker createBroker(URI brokerURI) throws Exception {
-		JAXBContext context = JAXBContext.newInstance("org.apache.activemq.apollo.jaxb");
-		Unmarshaller unmarshaller = context.createUnmarshaller();
+	public Broker createBroker(String value) {
+        try {
+            URI brokerURI = new URI(value);
+            JAXBContext context = JAXBContext.newInstance("org.apache.activemq.apollo.jaxb");
+            Unmarshaller unmarshaller = context.createUnmarshaller();
 
-		URL configURL;
-		brokerURI = URISupport.stripScheme(brokerURI);
-		String scheme = brokerURI.getScheme();
-		if( scheme==null || "file".equals(scheme) ) {
-			configURL = URISupport.changeScheme(URISupport.stripScheme(brokerURI), "file").toURL();
-		} else if( "classpath".equals(scheme) ) {
-			configURL = Thread.currentThread().getContextClassLoader().getResource(brokerURI.getSchemeSpecificPart());
-		} else {
-			configURL = URISupport.changeScheme(brokerURI, scheme).toURL();
-		}
-		if (configURL == null) {
-			throw new IOException("Cannot create broker from non-existent URI: " + brokerURI);
-		}
-		XMLInputFactory factory = XMLInputFactory.newInstance();
-		XMLStreamReader reader = factory.createXMLStreamReader(configURL.openStream());
-		XMLStreamReader properties = new PropertiesReader(reader);
-		try {
+            URL configURL;
+            brokerURI = URISupport.stripScheme(brokerURI);
+            String scheme = brokerURI.getScheme();
+            if( scheme==null || "file".equals(scheme) ) {
+                configURL = URISupport.changeScheme(URISupport.stripScheme(brokerURI), "file").toURL();
+            } else if( "classpath".equals(scheme) ) {
+                configURL = Thread.currentThread().getContextClassLoader().getResource(brokerURI.getSchemeSpecificPart());
+            } else {
+                configURL = URISupport.changeScheme(brokerURI, scheme).toURL();
+            }
+            if (configURL == null) {
+                throw new IOException("Cannot create broker from non-existent URI: " + brokerURI);
+            }
+            XMLInputFactory factory = XMLInputFactory.newInstance();
+            XMLStreamReader reader = factory.createXMLStreamReader(configURL.openStream());
+            XMLStreamReader properties = new PropertiesReader(reader);
 			BrokerXml xml = (BrokerXml) unmarshaller.unmarshal(properties);
 			return xml.createMessageBroker();
-		} catch (UnmarshalException e) {
-			throw new IOException("Cannot create broker from URI: " + brokerURI + ", reason: " + e.getCause());
+		} catch (Exception e) {
+			throw new RuntimeException("Cannot create broker from URI: " + value, e);
 		}	
 	}
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/MemoryStoreXml.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/MemoryStoreXml.java?rev=961068&r1=961067&r2=961068&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/MemoryStoreXml.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/MemoryStoreXml.java Wed Jul  7 03:40:18 2010
@@ -28,8 +28,7 @@ import org.apache.activemq.broker.store.
 public class MemoryStoreXml extends StoreXml {
 
 	public Store createStore() {
-		MemoryStore rc = new MemoryStore();
-		return rc;
+		return new MemoryStore();
 	}
 
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/VirtualHostXml.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/VirtualHostXml.java?rev=961068&r1=961067&r2=961068&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/VirtualHostXml.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/VirtualHostXml.java Wed Jul  7 03:40:18 2010
@@ -27,6 +27,7 @@ import javax.xml.bind.annotation.XmlTran
 import javax.xml.bind.annotation.adapters.XmlAdapter;
 import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
 
+import org.apache.activemq.apollo.broker.BrokerDatabase;
 import org.apache.activemq.apollo.broker.VirtualHost;
 import org.apache.activemq.util.buffer.AsciiBuffer;
 
@@ -34,19 +35,20 @@ import org.apache.activemq.util.buffer.A
 @XmlAccessorType(XmlAccessType.FIELD)
 public class VirtualHostXml {
 	
-    @XmlJavaTypeAdapter(AsciiBufferAdapter.class)
     @XmlElement(name="host-name", required=true)
-    private ArrayList<AsciiBuffer> hostNames = new ArrayList<AsciiBuffer>();
+    private ArrayList<String> hostNames = new ArrayList<String>();
 
     @XmlElementRef   
     private StoreXml store;
     
 	public VirtualHost createVirtualHost(BrokerXml brokerXml) throws Exception {
 		VirtualHost rc = new VirtualHost();
-		rc.setHostNames(hostNames);
-		
+		rc.setNamesArray(hostNames);
 		if( store != null ) {
-			rc.setStore(store.createStore());
+            BrokerDatabase database = new BrokerDatabase();
+            database.setVirtualHost(rc);
+            database.setStore(store.createStore());
+            rc.setDatabase(database);
 		}
 		return rc;
 	}

Modified: activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/test/java/org/apache/activemq/apollo/jaxb/JAXBConfigTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/test/java/org/apache/activemq/apollo/jaxb/JAXBConfigTest.java?rev=961068&r1=961067&r2=961068&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/test/java/org/apache/activemq/apollo/jaxb/JAXBConfigTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/test/java/org/apache/activemq/apollo/jaxb/JAXBConfigTest.java Wed Jul  7 03:40:18 2010
@@ -39,66 +39,59 @@ public class JAXBConfigTest extends Test
 	
 	@Test()
 	public void testSimpleConfig() throws Exception {
-		URI uri = new URI("jaxb:classpath:org/apache/activemq/apollo/jaxb/testSimpleConfig.xml");
+		String uri = "jaxb:classpath:org/apache/activemq/apollo/jaxb/testSimpleConfig.xml";
 		LOG.info("Loading broker configuration from the classpath with URI: " + uri);
-		Broker broker = BrokerFactory.createBroker(uri);
+		Broker broker = BrokerFactory.createBroker(uri, false);
 		
 //		assertEquals(4, p.getSize());
 //		assertEquals("test dispatcher", p.getName());
 		
 		
-		assertEquals(1, broker.getTransportServers().size());
+		assertEquals(1, broker.transportServers().size());
 		
 		ArrayList<String> expected = new ArrayList<String>();
 		expected.add("pipe://test1");
 		expected.add("tcp://127.0.0.1:61616");
-		assertEquals(expected, broker.getConnectUris());
+		assertEquals(expected, broker.connectUris() );
 		
-		assertEquals(2, broker.getVirtualHosts().size());
+		assertEquals(2, broker.virtualHosts().size());
 		
-		assertNotNull(broker.getDefaultVirtualHost().getDatabase());
-		assertNotNull(broker.getDefaultVirtualHost().getDatabase().getStore());
-		assertTrue((broker.getDefaultVirtualHost().getDatabase().getStore() instanceof MemoryStore));
+		assertNotNull(broker.defaultVirtualHost().getDatabase());
+		assertNotNull(broker.defaultVirtualHost().getDatabase().getStore());
+		assertTrue((broker.defaultVirtualHost().getDatabase().getStore() instanceof MemoryStore));
 		
 	}
 	
 	@Test()
 	public void testUris() throws Exception {
-		boolean failed = false;
-		// non-existent classpath
+
+        // non-existent classpath
 		try {
-			URI uri = new URI("jaxb:classpath:org/apache/activemq/apollo/jaxb/testUris-fail.xml");
-			BrokerFactory.createBroker(uri);
-		} catch (IOException e) {
-			failed = true;
+			String uri = "jaxb:classpath:org/apache/activemq/apollo/jaxb/testUris-fail.xml";
+			BrokerFactory.createBroker(uri, false);
+            fail("Creating broker from non-existing url does not throw an exception!");
+		} catch (RuntimeException e) {
 		}
-		if (!failed) {
-			fail("Creating broker from non-existing url does not throw an exception!");
-		}
-		failed = false;
-		//non-existent file
+
+        //non-existent file
 		try {
-			URI uri = new URI("jaxb:file:/org/apache/activemq/apollo/jaxb/testUris-fail.xml");
-			BrokerFactory.createBroker(uri);
-		} catch (IOException e) {
-			failed = true;
-		}
-		if (!failed) {
-			fail("Creating broker from non-existing url does not throw an exception!");
+			String uri ="jaxb:file:/org/apache/activemq/apollo/jaxb/testUris-fail.xml";
+			BrokerFactory.createBroker(uri, false);
+            fail("Creating broker from non-existing url does not throw an exception!");
+		} catch (RuntimeException e) {
 		}
-		//non-existent url
+
+        //non-existent url
 		try {
-			URI uri = new URI("jaxb:http://localhost/testUris.xml");
-			BrokerFactory.createBroker(uri);
-		} catch (IOException e) {
-			failed = true;
+			String uri = "jaxb:http://localhost/testUris.xml";
+			BrokerFactory.createBroker(uri, false);
+            fail("Creating broker from non-existing url does not throw an exception!");
+		} catch (RuntimeException e) {
 		}
-		if (!failed) {
-			fail("Creating broker from non-existing url does not throw an exception!");
-		}		
+
 		// regular file
-		URI uri = new URI("jaxb:" + Thread.currentThread().getContextClassLoader().getResource("org/apache/activemq/apollo/jaxb/testUris.xml"));
-		BrokerFactory.createBroker(uri);
+		String uri = "jaxb:" + Thread.currentThread().getContextClassLoader().getResource("org/apache/activemq/apollo/jaxb/testUris.xml");
+		BrokerFactory.createBroker(uri, false);
 	}
 	
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java?rev=961068&r1=961067&r2=961068&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java Wed Jul  7 03:40:18 2010
@@ -33,7 +33,7 @@ import org.apache.activemq.util.URISuppo
  */
 public class PipeTransportFactory implements TransportFactory.TransportFactorySPI {
 
-    static protected final HashMap<String, PipeTransportServer> servers = new HashMap<String, PipeTransportServer>();
+    public static final HashMap<String, PipeTransportServer> servers = new HashMap<String, PipeTransportServer>();
 
     public TransportServer bind(URI uri) throws URISyntaxException, IOException {