You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:44:42 UTC

svn commit: r961075 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/ activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/protocol/ a...

Author: chirino
Date: Wed Jul  7 03:44:41 2010
New Revision: 961075

URL: http://svn.apache.org/viewvc?rev=961075&view=rev
Log:
Still working on the simple STOMP case.

Added:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/protocol/
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/protocol/multi
      - copied, changed from r961074, activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/broker/protocol/stomp
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/apollo/
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/protocol/
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/protocol/stomp
      - copied, changed from r961074, activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/broker/protocol/stomp
Removed:
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/broker/protocol/stomp
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/transport/vm/VMTransportTest.java
    activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/BrokerXml.java
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
    activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DefaultTransportListener.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportListener.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java
    activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/java/org/apache/activemq/legacy/transport/failover/ReconnectTest.java
    activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/java/org/apache/activemq/legacy/transport/tcp/SslBrokerServiceTest.java
    activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/java/org/apache/activemq/legacy/transport/tcp/SslTransportServerTest.java
    activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/resources/org/apache/activemq/transport/stomp/sslstomp-auth-broker.xml
    activemq/sandbox/activemq-apollo-actor/sandbox/activemq-network/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
    activemq/sandbox/activemq-apollo-actor/sandbox/activemq-openwire/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
    activemq/sandbox/activemq-apollo-actor/sandbox/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/StubConnection.java
    activemq/sandbox/activemq-apollo-actor/sandbox/activemq-openwire/src/test/java/org/apache/activemq/openwire/BrokerTestSupport.java
    activemq/sandbox/activemq-apollo-actor/sandbox/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBroker.java

Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/protocol/multi (from r961074, activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/broker/protocol/stomp)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/protocol/multi?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/protocol/multi&p1=activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/broker/protocol/stomp&r1=961074&r2=961075&rev=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/broker/protocol/stomp (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/protocol/multi Wed Jul  7 03:44:41 2010
@@ -1,17 +1,17 @@
-## ---------------------------------------------------------------------------
-## Licensed to the Apache Software Foundation (ASF) under one or more
-## contributor license agreements.  See the NOTICE file distributed with
-## this work for additional information regarding copyright ownership.
-## The ASF licenses this file to You under the Apache License, Version 2.0
-## (the "License"); you may not use this file except in compliance with
-## the License.  You may obtain a copy of the License at
-## 
-## http://www.apache.org/licenses/LICENSE-2.0
-## 
-## Unless required by applicable law or agreed to in writing, software
-## distributed under the License is distributed on an "AS IS" BASIS,
-## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-## See the License for the specific language governing permissions and
-## limitations under the License.
-## ---------------------------------------------------------------------------
-class=org.apache.activemq.apollo.stomp.StompProtocolHandler
\ No newline at end of file
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.apollo.broker.MultiProtocolHandler

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Wed Jul  7 03:44:41 2010
@@ -24,10 +24,9 @@ import _root_.java.lang.{String}
 import _root_.org.apache.activemq.util.buffer.{Buffer, UTF8Buffer, AsciiBuffer}
 import _root_.org.apache.activemq.util.{FactoryFinder, IOHelper}
 import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
-import _root_.org.fusesource.hawtdispatch.{DispatchQueue, BaseRetained}
-
 import _root_.scala.collection.JavaConversions._
 import _root_.scala.reflect.BeanProperty
+import org.fusesource.hawtdispatch.{Dispatch, DispatchQueue, BaseRetained}
 
 object BrokerFactory {
 
@@ -90,7 +89,7 @@ object BrokerConstants extends Log {
   val DEFAULT_VIRTUAL_HOST_NAME = new AsciiBuffer("default")
 }
 
-class Broker() extends Service with Logging {
+class Broker() extends Service with DispatchLogging {
   
   import BrokerConstants._
   override protected def log = BrokerConstants
@@ -113,6 +112,9 @@ class Broker() extends Service with Logg
   def start = runtime.start
   def stop = runtime.stop
 
+  val dispatchQueue = createQueue("broker");
+  dispatchQueue.setTargetQueue(Dispatch.getRandomThreadQueue)
+
   def addVirtualHost(host: VirtualHost) = {
     if (host.names.isEmpty) {
       throw new IllegalArgumentException("Virtual host must be configured with at least one host name.")
@@ -159,7 +161,6 @@ class Broker() extends Service with Logg
     }
 
     var state = CONFIGURATION
-    val dispatchQueue = createQueue("broker");
     val clientConnections: ArrayList[Connection] = new ArrayList[Connection]
 
     def removeConnectUri(uri: String): Unit = ^ {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Wed Jul  7 03:44:41 2010
@@ -27,221 +27,149 @@ import _root_.org.apache.activemq.util.{
 import _root_.org.apache.activemq.wireformat.WireFormat
 import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
 import java.util.concurrent.atomic.AtomicLong
+import org.fusesource.hawtdispatch.Dispatch
 
-abstract class Connection() extends TransportListener with Service {
+object Connection extends Log {
+  val id_generator = new AtomicLong()
+  def next_id = "connection:"+id_generator.incrementAndGet
+}
+
+abstract class Connection() extends TransportListener with Service  with DispatchLogging {
 
-  val dispatchQueue = createQueue("connection")
+  override protected def log = Connection
+
+  import Connection._
+  val id = next_id
+  val dispatchQueue = createQueue(id)
+  dispatchQueue.setTargetQueue(Dispatch.getRandomThreadQueue)
+  
   var name = "connection"
   var stopping = false;
 
   var transport:Transport = null
-  var exceptionListener:ExceptionListener = null;
 
-  def start() = ^{
+  def start() = {
     transport.setDispatchQueue(dispatchQueue);
     transport.setTransportListener(Connection.this);
     transport.start()
-  } ->: dispatchQueue
+  }
 
-  def stop() = ^{
+  def stop() = {
     stopping=true
     transport.stop()
     dispatchQueue.release
-  } ->: dispatchQueue
+  }
 
-  def onException(error:IOException) = {
+  def onTransportFailure(error:IOException) = {
     if (!stopping) {
         onFailure(error);
     }
   }
 
   def onFailure(error:Exception) = {
-    if (exceptionListener != null) {
-        exceptionListener.exceptionThrown(error);
-    }
+    warn(error)
+    transport.stop
   }
 
-  def onDisconnected() = {
+  def onTransportDisconnected() = {
   }
 
-  def onConnected() = {
+  def onTransportConnected() = {
   }
 
 }
 
-object BrokerConnection extends Log {
-  val id_generator = new AtomicLong()
-}
+class BrokerConnection(val broker: Broker) extends Connection {
 
-class BrokerConnection(val broker: Broker) extends Connection with Logging {
+  var protocol = "stomp"
+  var protocolHandler: ProtocolHandler = null;
 
-  override protected def log = BrokerConnection
-  override protected def log_map(message:String) = "connection:"+id+" | "+message
+  override def start() = {
+    protocolHandler = ProtocolHandlerFactory.createProtocolHandler(protocol)
+    protocolHandler.setConnection(this);
+    super.start
+  }
 
-  import BrokerConnection._
+  override def onTransportConnected() = protocolHandler.onTransportConnected
 
-  var protocolHandler: ProtocolHandler = null;
-  val id = id_generator.incrementAndGet
+  override def onTransportDisconnected() = protocolHandler.onTransportDisconnected
 
-  exceptionListener = new ExceptionListener() {
-    def exceptionThrown(error:Exception) = {
-      info("Transport failed before messaging protocol was initialized.", error);
-      stop()
+  def onTransportCommand(command: Object) = {
+    try {
+      protocolHandler.onTransportCommand(command);
+    } catch {
+      case e:Exception =>
+        onFailure(e)
     }
   }
 
+  override def onTransportFailure(error: IOException) = protocolHandler.onTransportFailure(error)
+}
+
+class ProtocolException(message:String, e:Throwable=null) extends Exception(message, e) 
+
+class MultiProtocolHandler extends ProtocolHandler {
+
+  var connected = false
+
+  def onTransportCommand(command:Any) = {
 
-  def onCommand(command: Object) = {
-    if (protocolHandler != null) {
-      protocolHandler.onCommand(command);
-    } else {
-      try {
-        var wireformat:WireFormat = null;
-
-        if (command.isInstanceOf[WireFormat]) {
-
-          // First command might be from the wire format decriminator, letting
-          // us know what the actually wireformat is.
-          wireformat = command.asInstanceOf[WireFormat];
-
-          try {
-            protocolHandler = ProtocolHandlerFactory.createProtocolHandler(wireformat.getName());
-          } catch {
-            case e:Exception=>
-            throw IOExceptionSupport.create("No protocol handler available for: " + wireformat.getName(), e);
-          }
-
-          protocolHandler.setConnection(this);
-          protocolHandler.setWireFormat(wireformat);
-          protocolHandler.start();
-
-          exceptionListener = new ExceptionListener() {
-            def exceptionThrown(error:Exception) {
-              protocolHandler.onException(error);
-            }
-          }
-          protocolHandler.onCommand(command);
-
-        } else {
-          throw new IOException("First command should be a WireFormat");
-        }
-
-      } catch {
-        case e:Exception =>
-        onFailure(e);
-      }
+    if (!command.isInstanceOf[WireFormat]) {
+      throw new ProtocolException("First command should be a WireFormat");
     }
-  }
 
-  override def stop() = {
-    super.stop();
-    if (protocolHandler != null) {
-      protocolHandler.stop();
+    var wireformat:WireFormat = command.asInstanceOf[WireFormat];
+    val protocol = wireformat.getName()
+    val protocolHandler = try {
+      // Create the new protocol handler..
+       ProtocolHandlerFactory.createProtocolHandler(protocol);
+    } catch {
+      case e:Exception=>
+      throw new ProtocolException("No protocol handler available for protocol: " + protocol, e);
     }
+    protocolHandler.setConnection(connection);
+
+    // replace the current handler with the new one.
+    connection.protocol = protocol
+    connection.protocolHandler = protocolHandler
+    connection.transport.suspendRead
+    protocolHandler.onTransportConnected
+  }
+
+  override def onTransportConnected = {
+    connection.transport.resumeRead
   }
-}
 
+}
 
 object ProtocolHandlerFactory {
-    val PROTOCOL_HANDLER_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/broker/protocol/");
+    val PROTOCOL_HANDLER_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/apollo/broker/protocol/");
 
     def createProtocolHandler(protocol:String) = {
         PROTOCOL_HANDLER_FINDER.newInstance(protocol).asInstanceOf[ProtocolHandler]
     }
 }
 
-trait ProtocolHandler extends Service {
+trait ProtocolHandler extends TransportListener {
+
+  var connection:BrokerConnection = null;
+
+  def setConnection(brokerConnection:BrokerConnection) = {
+    this.connection = brokerConnection
+  }
+
+  def onTransportCommand(command:Any);
+
+  def onTransportFailure(error:IOException) = {
+    connection.stop()
+  }
+
+  def onTransportDisconnected() = {
+  }
+
+  def onTransportConnected() = {
+  }
 
-    def onCommand(command:Any);
-    def setConnection(brokerConnection:BrokerConnection);
-    def setWireFormat(wireformat:WireFormat);
-    def onException(error:Exception);
-
-// TODO:
-//    public void setConnection(BrokerConnection connection);
-//
-//    public BrokerConnection getConnection();
-//
-//    public void onCommand(Object command);
-//
-//    public void onException(Exception error);
-//
-//    public void setWireFormat(WireFormat wf);
-//
-//    public BrokerMessageDelivery createMessageDelivery(MessageRecord record) throws IOException;
-//
-//    /**
-//     * ClientContext
-//     * <p>
-//     * Description: Base interface describing a channel on a physical
-//     * connection.
-//     * </p>
-//     *
-//     * @author cmacnaug
-//     * @version 1.0
-//     */
-//    public interface ClientContext {
-//        public ClientContext getParent();
-//
-//        public Collection<ClientContext> getChildren();
-//
-//        public void addChild(ClientContext context);
-//
-//        public void removeChild(ClientContext context);
-//
-//        public void close();
-//
-//    }
-//
-//    public abstract class AbstractClientContext<E extends MessageDelivery> extends AbstractLimitedFlowResource<E> implements ClientContext {
-//        protected final HashSet<ClientContext> children = new HashSet<ClientContext>();
-//        protected final ClientContext parent;
-//        protected boolean closed = false;
-//
-//        public AbstractClientContext(String name, ClientContext parent) {
-//            super(name);
-//            this.parent = parent;
-//            if (parent != null) {
-//                parent.addChild(this);
-//            }
-//        }
-//
-//        public ClientContext getParent() {
-//            return parent;
-//        }
-//
-//        public void addChild(ClientContext child) {
-//            if (!closed) {
-//                children.add(child);
-//            }
-//        }
-//
-//        public void removeChild(ClientContext child) {
-//            if (!closed) {
-//                children.remove(child);
-//            }
-//        }
-//
-//        public Collection<ClientContext> getChildren() {
-//            return children;
-//        }
-//
-//        public void close() {
-//
-//            closed = true;
-//
-//            for (ClientContext c : children) {
-//                c.close();
-//            }
-//
-//            if (parent != null) {
-//                parent.removeChild(this);
-//            }
-//
-//            super.close();
-//        }
-//    }
-//
 }
 
 trait ConsumerContext { // extends ClientContext, Subscription<MessageDelivery>, IFlowSink<MessageDelivery> {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala Wed Jul  7 03:44:41 2010
@@ -209,3 +209,17 @@ trait Logging {
   }
 
 }
+
+trait DispatchLogging extends Logging {
+  import org.fusesource.hawtdispatch.ScalaDispatch._
+
+  override protected def log_map(message:String) = {
+    val d = getCurrentQueue
+    if( d!=null ) {
+      d.getLabel+" | "+message
+    } else {
+      message
+    }
+  }
+
+}
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala Wed Jul  7 03:44:41 2010
@@ -17,7 +17,6 @@
 package org.apache.activemq.apollo.broker.perf
 
 import _root_.java.beans.ExceptionListener
-import _root_.java.io.{File}
 import _root_.java.net.URI
 import _root_.java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 import _root_.java.util.concurrent.TimeUnit
@@ -33,6 +32,7 @@ import org.apache.activemq.transport.Tra
 
 import _root_.scala.collection.JavaConversions._
 import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
+import java.io.{IOException, File}
 
 
 abstract class RemoteConsumer extends Connection {
@@ -43,6 +43,7 @@ abstract class RemoteConsumer extends Co
   var selector: String = null;
   var durable = false;
   var uri: String = null
+  var brokerPerfTest:BaseBrokerPerfTest = null
 
   override def start() = {
     consumerRate.name("Consumer " + name + " Rate");
@@ -52,10 +53,17 @@ abstract class RemoteConsumer extends Co
   }
 
 
-  override def onConnected() = {
+  override def onTransportConnected() = {
     setupSubscription();
   }
 
+  override def onTransportFailure(error: IOException) = {
+    if (!brokerPerfTest.stopping.get()) {
+      System.err.println("Client Async Error:");
+      error.printStackTrace();
+    }
+  }
+
   protected def setupSubscription()
 
 }
@@ -79,6 +87,14 @@ abstract class RemoteProducer extends Co
   var filler: String = null
   var payloadSize = 20
   var uri: String = null
+  var brokerPerfTest:BaseBrokerPerfTest = null
+
+  override def onTransportFailure(error: IOException) = {
+    if (!brokerPerfTest.stopping.get()) {
+      System.err.println("Client Async Error:");
+      error.printStackTrace();
+    }
+  }
 
   override def start() = {
 
@@ -98,7 +114,7 @@ abstract class RemoteProducer extends Co
 
   }
 
-  override def onConnected() = {
+  override def onTransportConnected() = {
     setupProducer();
   }
 
@@ -171,7 +187,7 @@ abstract class BaseBrokerPerfTest {
   protected var rcvBroker: Broker = null
   protected val brokers = new ArrayList[Broker]()
   protected val msgIdGenerator = new AtomicLong()
-  protected val stopping = new AtomicBoolean()
+  val stopping = new AtomicBoolean()
 
   val producers = new ArrayList[RemoteProducer]()
   val consumers = new ArrayList[RemoteConsumer]()
@@ -540,14 +556,7 @@ abstract class BaseBrokerPerfTest {
   def createConsumer(i: Int, destination: Destination): RemoteConsumer = {
 
     var consumer = createConsumer();
-    consumer.exceptionListener = new ExceptionListener() {
-      def exceptionThrown(error: Exception) = {
-        if (!stopping.get()) {
-          System.err.println("Consumer Async Error:");
-          error.printStackTrace();
-        }
-      }
-    }
+    consumer.brokerPerfTest = this
 
     consumer.uri = rcvBroker.connectUris.head
     consumer.destination = destination
@@ -560,14 +569,7 @@ abstract class BaseBrokerPerfTest {
 
   private def createProducer(id: Int, destination: Destination): RemoteProducer = {
     var producer = createProducer();
-    producer.exceptionListener = new ExceptionListener() {
-      def exceptionThrown(error: Exception) = {
-        if (!stopping.get()) {
-          System.err.println("Producer Async Error:");
-          error.printStackTrace();
-        }
-      }
-    }
+    producer.brokerPerfTest = this
     producer.uri = sendBroker.connectUris.head
     producer.producerId = id + 1
     producer.name = "producer" + (id + 1)

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/transport/vm/VMTransportTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/transport/vm/VMTransportTest.java?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/transport/vm/VMTransportTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/transport/vm/VMTransportTest.java Wed Jul  7 03:44:41 2010
@@ -17,7 +17,6 @@
 package org.apache.activemq.apollo.transport.vm;
 
 import java.io.IOException;
-import java.net.URI;
 
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFactory;

Modified: activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/BrokerXml.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/BrokerXml.java?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/BrokerXml.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/BrokerXml.java Wed Jul  7 03:44:41 2010
@@ -16,7 +16,6 @@
  */
 package org.apache.activemq.apollo.jaxb;
 
-import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
 

Copied: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/protocol/stomp (from r961074, activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/broker/protocol/stomp)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/protocol/stomp?p2=activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/protocol/stomp&p1=activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/broker/protocol/stomp&r1=961074&r2=961075&rev=961075&view=diff
==============================================================================
    (empty)

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul  7 03:44:41 2010
@@ -28,7 +28,8 @@ import AsciiBuffer._
 import Stomp._
 import BufferConversions._
 import StompFrameConstants._
-import org.apache.activemq.transport.CompletionCallback;
+import org.apache.activemq.transport.CompletionCallback
+import java.io.IOException
 
 
 class StompProtocolException(msg:String) extends Exception(msg)
@@ -53,16 +54,11 @@ import StompConstants._
 
 object StompProtocolHandler extends Log
 
-class StompProtocolHandler extends ProtocolHandler with Logging {
+class StompProtocolHandler extends ProtocolHandler with DispatchLogging {
 
   override protected def log = StompProtocolHandler
-  override protected def log_map(message:String) = {
-    if( connection==null )
-      message
-    else
-      "connection:"+connection.id+" | "+message
-  }
-
+  
+  protected def dispatchQueue:DispatchQueue = connection.dispatchQueue
 
   class SimpleConsumer(val dest:AsciiBuffer) extends BaseRetained with DeliveryConsumer {
 
@@ -93,13 +89,10 @@ class StompProtocolHandler extends Proto
     }
   }
 
-  def dispatchQueue = connection.dispatchQueue
   val outboundChannel = new DeliveryBuffer
   var closed = false
   var consumer:SimpleConsumer = null
 
-  var connection:BrokerConnection = null
-  var wireformat:WireFormat = null
   var producerRoute:DeliveryProducerRoute=null
   var host:VirtualHost = null
 
@@ -111,7 +104,7 @@ class StompProtocolHandler extends Proto
           outboundChannel.ack(delivery)
         }
         def onFailure(e:Exception) = {
-          StompProtocolHandler.this.onException(e)
+          connection.onFailure(e)
         }
       });
     }
@@ -119,15 +112,7 @@ class StompProtocolHandler extends Proto
 
   private def queue = connection.dispatchQueue
 
-  def setConnection(connection:BrokerConnection) = {
-    this.connection = connection
-  }
-
-  def setWireFormat(wireformat:WireFormat) = { this.wireformat = wireformat}
-
-  def start = {
-    info("start")
-    connection.transport.suspendRead
+  override def onTransportConnected() = {
     connection.broker.runtime.getDefaultVirtualHost(
       queue.wrap { (host)=>
         info("got host.. resuming")
@@ -137,7 +122,8 @@ class StompProtocolHandler extends Proto
     )
   }
 
-  def stop = {
+
+  override def onTransportDisconnected() = {
     if( !closed ) {
       info("stop")
       closed=true;
@@ -149,13 +135,11 @@ class StompProtocolHandler extends Proto
         host.router.unbind(consumer.dest, consumer::Nil)
         consumer=null
       }
-      connection.stop
     }
   }
 
 
-  def onCommand(command:Any) = {
-    info("got command: %s", command)
+  def onTransportCommand(command:Any) = {
     try {
       command match {
         case StompFrame(Commands.SEND, headers, content) =>
@@ -163,11 +147,14 @@ class StompProtocolHandler extends Proto
         case StompFrame(Commands.ACK, headers, content) =>
           // TODO:
         case StompFrame(Commands.SUBSCRIBE, headers, content) =>
+          info("got command: %s", command)
           on_stomp_subscribe(headers)
         case StompFrame(Commands.CONNECT, headers, _) =>
+          info("got command: %s", command)
           on_stomp_connect(headers)
         case StompFrame(Commands.DISCONNECT, headers, content) =>
-          stop
+          info("got command: %s", command)
+          connection.stop
         case s:StompWireFormat =>
           // this is passed on to us by the protocol discriminator
           // so we know which wire format is being used.
@@ -255,13 +242,13 @@ class StompProtocolHandler extends Proto
   }
 
   def on_stomp_subscribe(headers:HeaderMap) = {
-    println("Consumer on "+Thread.currentThread.getName)
     get(headers, Headers.Subscribe.DESTINATION) match {
       case Some(dest)=>
         if( consumer !=null ) {
           die("Only one subscription supported.")
 
         } else {
+          info("subscribing to: %s", dest)
           consumer = new SimpleConsumer(dest);
           host.router.bind(dest, consumer :: Nil)
           consumer.release
@@ -273,19 +260,17 @@ class StompProtocolHandler extends Proto
   }
 
   private def die(msg:String) = {
-    println("Shutting connection down due to: "+msg)
+    info("Shutting connection down due to: "+msg)
     connection.transport.suspendRead
     connection.transport.oneway(StompFrame(Responses.ERROR, Nil, ascii(msg)))
     ^ {
-      stop
+      connection.stop()
     } ->: queue
   }
 
-  def onException(error:Exception) = {
-    println("Shutting connection down due to: "+error)
-    error.printStackTrace
-    stop
+  override def onTransportFailure(error: IOException) = {
+    info(error, "Shutting connection down due to: %s", error)
+    super.onTransportFailure(error);
   }
-
 }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala Wed Jul  7 03:44:41 2010
@@ -29,7 +29,7 @@ import Stomp.Headers._
 
 import BufferConversions._
 import _root_.scala.collection.JavaConversions._
-import StompFrameConstants._;
+import StompFrameConstants._
 
 
 /**
@@ -54,18 +54,20 @@ class StompWireFormatFactory extends Wir
     }
 }
 
-object StompWireFormat {
+object StompWireFormat extends Log {
     val READ_BUFFFER_SIZE = 1024*64;
     val MAX_COMMAND_LENGTH = 1024;
     val MAX_HEADER_LENGTH = 1024 * 10;
     val MAX_HEADERS = 1000;
     val MAX_DATA_LENGTH = 1024 * 1024 * 100;
-    val TRIM=false
+    val TRIM=true
     val SIZE_CHECK=false
   }
 
-class StompWireFormat extends WireFormat {
+class StompWireFormat extends WireFormat with DispatchLogging {
+
   import StompWireFormat._
+  protected def log: Log = StompWireFormat
 
   implicit def wrap(x: Buffer) = ByteBuffer.wrap(x.data, x.offset, x.length);
   implicit def wrap(x: Byte) = {
@@ -147,6 +149,8 @@ class StompWireFormat extends WireFormat
       while( rc == null && end!=buffer.position ) {
         rc = next_action(buffer)
       }
+
+//      trace("unmarshalled: "+rc+", start: "+start+", end: "+end+", buffer position: "+buffer.position)
       rc
     }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala Wed Jul  7 03:44:41 2010
@@ -66,7 +66,7 @@ class StompRemoteConsumer extends Remote
         transport.oneway(frame);
     }
 
-    def onCommand(command:Object) = {
+    def onTransportCommand(command:Object) = {
       var frame = command.asInstanceOf[StompFrame]
       frame match {
         case StompFrame(Responses.CONNECTED, headers, _) =>
@@ -140,7 +140,7 @@ class StompRemoteProducer extends Remote
       transport.oneway(StompFrame(Stomp.Commands.CONNECT), send_next);
     }
 
-    def onCommand(command:Object) = {
+    def onTransportCommand(command:Object) = {
       var frame = command.asInstanceOf[StompFrame]
       frame match {
         case StompFrame(Responses.CONNECTED, headers, _) =>

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala Wed Jul  7 03:44:41 2010
@@ -36,7 +36,7 @@ object StompLoadClient {
   import StompLoadClient._
   implicit def toAsciiBuffer(value: String) = new AsciiBuffer(value)
 
-  var producerSleep = 1000*30;
+  var producerSleep = 0;
   var consumerSleep = 0;
   var producers = 1;
   var consumers = 1;
@@ -158,6 +158,7 @@ object StompLoadClient {
 """)
         client.flush
         client.receive("CONNECTED")
+
         proc(client)
       } catch {
         case e: Throwable =>

Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Wed Jul  7 03:44:41 2010
@@ -121,8 +121,6 @@ public class TcpTransport implements Tra
     }
 
     public void start() throws Exception {
-        assert Dispatch.getCurrentQueue() == dispatchQueue;
-
         if (dispatchQueue == null) {
             throw new IllegalArgumentException("dispatchQueue is not set");
         }
@@ -161,7 +159,7 @@ public class TcpTransport implements Tra
                             connectSource.release();
                             fireConnected();
                         } catch (IOException e) {
-                            listener.onException(e);
+                            listener.onTransportFailure(e);
                         }
                     }
                 }
@@ -197,7 +195,7 @@ public class TcpTransport implements Tra
                 try {
                     drainInbound();
                 } catch (IOException e) {
-                    listener.onException(e);
+                    listener.onTransportFailure(e);
                 }
             }
         });
@@ -216,14 +214,11 @@ public class TcpTransport implements Tra
         });
 
         remoteAddress = channel.socket().getRemoteSocketAddress().toString();
-        listener.onConnected();
-        readSource.resume();
+        listener.onTransportConnected();
     }
 
 
     public void stop() throws Exception {
-        assert Dispatch.getCurrentQueue() == dispatchQueue;
-
         if( readSource!=null ) {
             readSource.release();
             readSource = null;
@@ -319,7 +314,7 @@ public class TcpTransport implements Tra
             }
 
         } catch (IOException e) {
-            listener.onException(e);
+            listener.onTransportFailure(e);
         }
         
         return outbound.isEmpty() && outbound_frame==null;
@@ -358,7 +353,7 @@ public class TcpTransport implements Tra
                 int p = readBuffer.position();
                 int count = channel.read(readBuffer);
                 if (count == -1) {
-                    throw new EOFException();
+                    throw new EOFException("Peer disconnected");
                 } else if (count == 0) {
                     return;
                 }
@@ -366,7 +361,7 @@ public class TcpTransport implements Tra
 
             Object command=unmarshalSession.unmarshal(readBuffer);
             if( command!=null ) {
-                listener.onCommand(command);
+                listener.onTransportCommand(command);
 
                 // the transport may be suspended after processing a command.
                 if( transportState==DISPOSED || readSource.isSuspended() ) {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DefaultTransportListener.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DefaultTransportListener.java?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DefaultTransportListener.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DefaultTransportListener.java Wed Jul  7 03:44:41 2010
@@ -29,7 +29,7 @@ public class DefaultTransportListener im
      * 
      * @param command
      */
-    public void onCommand(Object command) {
+    public void onTransportCommand(Object command) {
     }
 
     /**
@@ -37,20 +37,20 @@ public class DefaultTransportListener im
      * 
      * @param error
      */
-    public void onException(IOException error) {
+    public void onTransportFailure(IOException error) {
     }
 
     /**
      * The transport has been connected.
      */
-    public void onConnected() {
+    public void onTransportConnected() {
     }
 
     /**
      * The transport has suffered a disconnection from
      * which it hopes to recover
      */
-    public void onDisconnected() {
+    public void onTransportDisconnected() {
     }
 
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java Wed Jul  7 03:44:41 2010
@@ -96,8 +96,8 @@ public class TransportFilter implements 
         next.stop();
     }
 
-    public void onCommand(Object command) {
-        transportListener.onCommand(command);
+    public void onTransportCommand(Object command) {
+        transportListener.onTransportCommand(command);
     }
 
 
@@ -115,16 +115,16 @@ public class TransportFilter implements 
     }
 
 
-    public void onException(IOException error) {
-        transportListener.onException(error);
+    public void onTransportFailure(IOException error) {
+        transportListener.onTransportFailure(error);
     }
 
-    public void onDisconnected() {
-        transportListener.onDisconnected();
+    public void onTransportDisconnected() {
+        transportListener.onTransportDisconnected();
     }
 
-    public void onConnected() {
-        transportListener.onConnected();
+    public void onTransportConnected() {
+        transportListener.onTransportConnected();
     }
 
     public <T> T narrow(Class<T> target) {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportListener.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportListener.java?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportListener.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportListener.java Wed Jul  7 03:44:41 2010
@@ -29,22 +29,22 @@ public interface TransportListener {
      * called to process a command
      * @param command
      */
-    void onCommand(Object command);
+    void onTransportCommand(Object command);
     /**
      * An unrecoverable exception has occured on the transport
      * @param error
      */
-    void onException(IOException error);
+    void onTransportFailure(IOException error);
     
     /**
      * The transport has been connected.
      */
-    public void onConnected();
+    public void onTransportConnected();
 
     /**
      * The transport has suffered a disconnection from
      * which it hopes to recover
      */
-    public void onDisconnected();
+    public void onTransportDisconnected();
 
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java Wed Jul  7 03:44:41 2010
@@ -88,9 +88,9 @@ public class PipeTransport implements Tr
                                 }
 
                                 if (wireformat != null && marshal) {
-                                    listener.onCommand(wireformat.unmarshal((Buffer) o));
+                                    listener.onTransportCommand(wireformat.unmarshal((Buffer) o));
                                 } else {
-                                    listener.onCommand(o);
+                                    listener.onTransportCommand(o);
                                 }
                             }
 
@@ -102,7 +102,7 @@ public class PipeTransport implements Tr
                                 }
                             });
                         } catch (IOException e) {
-                            listener.onException(e);
+                            listener.onTransportFailure(e);
                         }
 
                     }
@@ -120,7 +120,7 @@ public class PipeTransport implements Tr
             public void run() {
                 connected = true;
                 dispatchSource.resume();
-                listener.onConnected();
+                listener.onTransportConnected();
                 drainInbound();
             }
         });

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java Wed Jul  7 03:44:41 2010
@@ -42,7 +42,8 @@ public class MultiWireFormatFactory impl
 
     private static final FactoryFinder WIREFORMAT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/wireformat/");
 
-    private String wireFormats = "openwire, stomp";
+//    private String wireFormats = "openwire, stomp";
+    private String wireFormats = "stomp";
     private ArrayList<WireFormatFactory> wireFormatFactories;
 
     static class MultiWireFormat implements WireFormat {

Modified: activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/java/org/apache/activemq/legacy/transport/failover/ReconnectTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/java/org/apache/activemq/legacy/transport/failover/ReconnectTest.java?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/java/org/apache/activemq/legacy/transport/failover/ReconnectTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/java/org/apache/activemq/legacy/transport/failover/ReconnectTest.java Wed Jul  7 03:44:41 2010
@@ -38,7 +38,7 @@ import org.apache.activemq.legacy.broker
 import org.apache.activemq.legacy.broker.TransportConnector;
 import org.apache.activemq.transport.mock.MockTransport;
 import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.apollo.transport.TransportListener;
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;

Modified: activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/java/org/apache/activemq/legacy/transport/tcp/SslBrokerServiceTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/java/org/apache/activemq/legacy/transport/tcp/SslBrokerServiceTest.java?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/java/org/apache/activemq/legacy/transport/tcp/SslBrokerServiceTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/java/org/apache/activemq/legacy/transport/tcp/SslBrokerServiceTest.java Wed Jul  7 03:44:41 2010
@@ -16,18 +16,6 @@
  */
 package org.apache.activemq.legacy.transport.tcp;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.net.URI;
-import java.security.KeyStore;
-
-import javax.net.ssl.KeyManager;
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.TrustManager;
-import javax.net.ssl.TrustManagerFactory;
-
 import junit.framework.Test;
 import junit.textui.TestRunner;
 

Modified: activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/java/org/apache/activemq/legacy/transport/tcp/SslTransportServerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/java/org/apache/activemq/legacy/transport/tcp/SslTransportServerTest.java?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/java/org/apache/activemq/legacy/transport/tcp/SslTransportServerTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/java/org/apache/activemq/legacy/transport/tcp/SslTransportServerTest.java Wed Jul  7 03:44:41 2010
@@ -20,7 +20,7 @@ package org.apache.activemq.legacy.trans
 import java.io.IOException;
 import java.net.URI;
 
-import org.apache.activemq.transport.tcp.SslTransportServer;
+import org.apache.activemq.apollo.transport.tcp.SslTransportServer;
 
 import junit.framework.TestCase;
 

Modified: activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/resources/org/apache/activemq/transport/stomp/sslstomp-auth-broker.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/resources/org/apache/activemq/transport/stomp/sslstomp-auth-broker.xml?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/resources/org/apache/activemq/transport/stomp/sslstomp-auth-broker.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/resources/org/apache/activemq/transport/stomp/sslstomp-auth-broker.xml Wed Jul  7 03:44:41 2010
@@ -22,7 +22,7 @@
   <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
   
   <bean class="org.apache.activemq.util.XStreamFactoryBean" name="xstream">
-        <property name="annotatedClass"><value>org.apache.activemq.transport.stomp.SamplePojo</value></property>
+        <property name="annotatedClass"><value>org.apache.activemq.apollo.transport.stomp.SamplePojo</value></property>
   </bean>
 
   <broker useJmx="true" persistent="false" xmlns="http://activemq.org/config/1.0" populateJMSXUserID="true">

Modified: activemq/sandbox/activemq-apollo-actor/sandbox/activemq-network/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/sandbox/activemq-network/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/sandbox/activemq-network/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java (original)
+++ activemq/sandbox/activemq-apollo-actor/sandbox/activemq-network/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java Wed Jul  7 03:44:41 2010
@@ -24,7 +24,7 @@ import java.util.concurrent.ConcurrentHa
 
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFactory;
-import org.apache.activemq.transport.discovery.DiscoveryAgent;
+import org.apache.activemq.apollo.transport.discovery.DiscoveryAgent;
 import org.apache.activemq.transport.discovery.DiscoveryAgentFactory;
 import org.apache.activemq.transport.discovery.DiscoveryEvent;
 import org.apache.activemq.transport.discovery.DiscoveryListener;

Modified: activemq/sandbox/activemq-apollo-actor/sandbox/activemq-openwire/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/sandbox/activemq-openwire/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/sandbox/activemq-openwire/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java (original)
+++ activemq/sandbox/activemq-apollo-actor/sandbox/activemq-openwire/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java Wed Jul  7 03:44:41 2010
@@ -20,7 +20,6 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;

Modified: activemq/sandbox/activemq-apollo-actor/sandbox/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/StubConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/sandbox/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/StubConnection.java?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/sandbox/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/StubConnection.java (original)
+++ activemq/sandbox/activemq-apollo-actor/sandbox/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/StubConnection.java Wed Jul  7 03:44:41 2010
@@ -30,7 +30,7 @@ import org.apache.activemq.command.Shutd
 
 import org.apache.activemq.transport.DefaultTransportListener;
 import org.apache.activemq.transport.ResponseCorrelator;
-import org.apache.activemq.transport.Transport;
+import org.apache.activemq.apollo.transport.Transport;
 import org.apache.activemq.transport.TransportListener;
 import org.apache.activemq.util.JMSExceptionSupport;
 import org.apache.activemq.util.ServiceSupport;

Modified: activemq/sandbox/activemq-apollo-actor/sandbox/activemq-openwire/src/test/java/org/apache/activemq/openwire/BrokerTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/sandbox/activemq-openwire/src/test/java/org/apache/activemq/openwire/BrokerTestSupport.java?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/sandbox/activemq-openwire/src/test/java/org/apache/activemq/openwire/BrokerTestSupport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/sandbox/activemq-openwire/src/test/java/org/apache/activemq/openwire/BrokerTestSupport.java Wed Jul  7 03:44:41 2010
@@ -52,7 +52,7 @@ import org.apache.activemq.command.Trans
 import org.apache.activemq.command.XATransactionId;
 import org.apache.activemq.legacy.openwireprotocol.StubConnection;
 import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.apollo.transport.TransportFactory;
 
 public class BrokerTestSupport extends CombinationTestSupport {
 

Modified: activemq/sandbox/activemq-apollo-actor/sandbox/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBroker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/sandbox/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBroker.java?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/sandbox/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBroker.java (original)
+++ activemq/sandbox/activemq-apollo-actor/sandbox/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBroker.java Wed Jul  7 03:44:41 2010
@@ -19,7 +19,6 @@ package org.apache.activemq.queue.actor.
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.dispatch.DispatchQueue;