You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:44:42 UTC
svn commit: r961075 - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/
activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/protocol/
a...
Author: chirino
Date: Wed Jul 7 03:44:41 2010
New Revision: 961075
URL: http://svn.apache.org/viewvc?rev=961075&view=rev
Log:
still working on the simple stomp case.
Added:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/protocol/
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/protocol/multi
- copied, changed from r961074, activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/broker/protocol/stomp
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/apollo/
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/protocol/
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/protocol/stomp
- copied, changed from r961074, activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/broker/protocol/stomp
Removed:
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/broker/protocol/stomp
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/transport/vm/VMTransportTest.java
activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/BrokerXml.java
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DefaultTransportListener.java
activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java
activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportListener.java
activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java
activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java
activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/java/org/apache/activemq/legacy/transport/failover/ReconnectTest.java
activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/java/org/apache/activemq/legacy/transport/tcp/SslBrokerServiceTest.java
activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/java/org/apache/activemq/legacy/transport/tcp/SslTransportServerTest.java
activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/resources/org/apache/activemq/transport/stomp/sslstomp-auth-broker.xml
activemq/sandbox/activemq-apollo-actor/sandbox/activemq-network/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
activemq/sandbox/activemq-apollo-actor/sandbox/activemq-openwire/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
activemq/sandbox/activemq-apollo-actor/sandbox/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/StubConnection.java
activemq/sandbox/activemq-apollo-actor/sandbox/activemq-openwire/src/test/java/org/apache/activemq/openwire/BrokerTestSupport.java
activemq/sandbox/activemq-apollo-actor/sandbox/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBroker.java
Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/protocol/multi (from r961074, activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/broker/protocol/stomp)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/protocol/multi?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/protocol/multi&p1=activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/broker/protocol/stomp&r1=961074&r2=961075&rev=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/broker/protocol/stomp (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/protocol/multi Wed Jul 7 03:44:41 2010
@@ -1,17 +1,17 @@
-## ---------------------------------------------------------------------------
-## Licensed to the Apache Software Foundation (ASF) under one or more
-## contributor license agreements. See the NOTICE file distributed with
-## this work for additional information regarding copyright ownership.
-## The ASF licenses this file to You under the Apache License, Version 2.0
-## (the "License"); you may not use this file except in compliance with
-## the License. You may obtain a copy of the License at
-##
-## http://www.apache.org/licenses/LICENSE-2.0
-##
-## Unless required by applicable law or agreed to in writing, software
-## distributed under the License is distributed on an "AS IS" BASIS,
-## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-## See the License for the specific language governing permissions and
-## limitations under the License.
-## ---------------------------------------------------------------------------
-class=org.apache.activemq.apollo.stomp.StompProtocolHandler
\ No newline at end of file
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.apollo.broker.MultiProtocolHandler
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Wed Jul 7 03:44:41 2010
@@ -24,10 +24,9 @@ import _root_.java.lang.{String}
import _root_.org.apache.activemq.util.buffer.{Buffer, UTF8Buffer, AsciiBuffer}
import _root_.org.apache.activemq.util.{FactoryFinder, IOHelper}
import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
-import _root_.org.fusesource.hawtdispatch.{DispatchQueue, BaseRetained}
-
import _root_.scala.collection.JavaConversions._
import _root_.scala.reflect.BeanProperty
+import org.fusesource.hawtdispatch.{Dispatch, DispatchQueue, BaseRetained}
object BrokerFactory {
@@ -90,7 +89,7 @@ object BrokerConstants extends Log {
val DEFAULT_VIRTUAL_HOST_NAME = new AsciiBuffer("default")
}
-class Broker() extends Service with Logging {
+class Broker() extends Service with DispatchLogging {
import BrokerConstants._
override protected def log = BrokerConstants
@@ -113,6 +112,9 @@ class Broker() extends Service with Logg
def start = runtime.start
def stop = runtime.stop
+ val dispatchQueue = createQueue("broker");
+ dispatchQueue.setTargetQueue(Dispatch.getRandomThreadQueue)
+
def addVirtualHost(host: VirtualHost) = {
if (host.names.isEmpty) {
throw new IllegalArgumentException("Virtual host must be configured with at least one host name.")
@@ -159,7 +161,6 @@ class Broker() extends Service with Logg
}
var state = CONFIGURATION
- val dispatchQueue = createQueue("broker");
val clientConnections: ArrayList[Connection] = new ArrayList[Connection]
def removeConnectUri(uri: String): Unit = ^ {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Wed Jul 7 03:44:41 2010
@@ -27,221 +27,149 @@ import _root_.org.apache.activemq.util.{
import _root_.org.apache.activemq.wireformat.WireFormat
import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
import java.util.concurrent.atomic.AtomicLong
+import org.fusesource.hawtdispatch.Dispatch
-abstract class Connection() extends TransportListener with Service {
+object Connection extends Log {
+ val id_generator = new AtomicLong()
+ def next_id = "connection:"+id_generator.incrementAndGet
+}
+
+abstract class Connection() extends TransportListener with Service with DispatchLogging {
- val dispatchQueue = createQueue("connection")
+ override protected def log = Connection
+
+ import Connection._
+ val id = next_id
+ val dispatchQueue = createQueue(id)
+ dispatchQueue.setTargetQueue(Dispatch.getRandomThreadQueue)
+
var name = "connection"
var stopping = false;
var transport:Transport = null
- var exceptionListener:ExceptionListener = null;
- def start() = ^{
+ def start() = {
transport.setDispatchQueue(dispatchQueue);
transport.setTransportListener(Connection.this);
transport.start()
- } ->: dispatchQueue
+ }
- def stop() = ^{
+ def stop() = {
stopping=true
transport.stop()
dispatchQueue.release
- } ->: dispatchQueue
+ }
- def onException(error:IOException) = {
+ def onTransportFailure(error:IOException) = {
if (!stopping) {
onFailure(error);
}
}
def onFailure(error:Exception) = {
- if (exceptionListener != null) {
- exceptionListener.exceptionThrown(error);
- }
+ warn(error)
+ transport.stop
}
- def onDisconnected() = {
+ def onTransportDisconnected() = {
}
- def onConnected() = {
+ def onTransportConnected() = {
}
}
-object BrokerConnection extends Log {
- val id_generator = new AtomicLong()
-}
+class BrokerConnection(val broker: Broker) extends Connection {
-class BrokerConnection(val broker: Broker) extends Connection with Logging {
+ var protocol = "stomp"
+ var protocolHandler: ProtocolHandler = null;
- override protected def log = BrokerConnection
- override protected def log_map(message:String) = "connection:"+id+" | "+message
+ override def start() = {
+ protocolHandler = ProtocolHandlerFactory.createProtocolHandler(protocol)
+ protocolHandler.setConnection(this);
+ super.start
+ }
- import BrokerConnection._
+ override def onTransportConnected() = protocolHandler.onTransportConnected
- var protocolHandler: ProtocolHandler = null;
- val id = id_generator.incrementAndGet
+ override def onTransportDisconnected() = protocolHandler.onTransportDisconnected
- exceptionListener = new ExceptionListener() {
- def exceptionThrown(error:Exception) = {
- info("Transport failed before messaging protocol was initialized.", error);
- stop()
+ def onTransportCommand(command: Object) = {
+ try {
+ protocolHandler.onTransportCommand(command);
+ } catch {
+ case e:Exception =>
+ onFailure(e)
}
}
+ override def onTransportFailure(error: IOException) = protocolHandler.onTransportFailure(error)
+}
+
+class ProtocolException(message:String, e:Throwable=null) extends Exception(message, e)
+
+class MultiProtocolHandler extends ProtocolHandler {
+
+ var connected = false
+
+ def onTransportCommand(command:Any) = {
- def onCommand(command: Object) = {
- if (protocolHandler != null) {
- protocolHandler.onCommand(command);
- } else {
- try {
- var wireformat:WireFormat = null;
-
- if (command.isInstanceOf[WireFormat]) {
-
- // First command might be from the wire format decriminator, letting
- // us know what the actually wireformat is.
- wireformat = command.asInstanceOf[WireFormat];
-
- try {
- protocolHandler = ProtocolHandlerFactory.createProtocolHandler(wireformat.getName());
- } catch {
- case e:Exception=>
- throw IOExceptionSupport.create("No protocol handler available for: " + wireformat.getName(), e);
- }
-
- protocolHandler.setConnection(this);
- protocolHandler.setWireFormat(wireformat);
- protocolHandler.start();
-
- exceptionListener = new ExceptionListener() {
- def exceptionThrown(error:Exception) {
- protocolHandler.onException(error);
- }
- }
- protocolHandler.onCommand(command);
-
- } else {
- throw new IOException("First command should be a WireFormat");
- }
-
- } catch {
- case e:Exception =>
- onFailure(e);
- }
+ if (!command.isInstanceOf[WireFormat]) {
+ throw new ProtocolException("First command should be a WireFormat");
}
- }
- override def stop() = {
- super.stop();
- if (protocolHandler != null) {
- protocolHandler.stop();
+ var wireformat:WireFormat = command.asInstanceOf[WireFormat];
+ val protocol = wireformat.getName()
+ val protocolHandler = try {
+ // Create the new protocol handler..
+ ProtocolHandlerFactory.createProtocolHandler(protocol);
+ } catch {
+ case e:Exception=>
+ throw new ProtocolException("No protocol handler available for protocol: " + protocol, e);
}
+ protocolHandler.setConnection(connection);
+
+ // replace the current handler with the new one.
+ connection.protocol = protocol
+ connection.protocolHandler = protocolHandler
+ connection.transport.suspendRead
+ protocolHandler.onTransportConnected
+ }
+
+ override def onTransportConnected = {
+ connection.transport.resumeRead
}
-}
+}
object ProtocolHandlerFactory {
- val PROTOCOL_HANDLER_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/broker/protocol/");
+ val PROTOCOL_HANDLER_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/apollo/broker/protocol/");
def createProtocolHandler(protocol:String) = {
PROTOCOL_HANDLER_FINDER.newInstance(protocol).asInstanceOf[ProtocolHandler]
}
}
-trait ProtocolHandler extends Service {
+trait ProtocolHandler extends TransportListener {
+
+ var connection:BrokerConnection = null;
+
+ def setConnection(brokerConnection:BrokerConnection) = {
+ this.connection = brokerConnection
+ }
+
+ def onTransportCommand(command:Any);
+
+ def onTransportFailure(error:IOException) = {
+ connection.stop()
+ }
+
+ def onTransportDisconnected() = {
+ }
+
+ def onTransportConnected() = {
+ }
- def onCommand(command:Any);
- def setConnection(brokerConnection:BrokerConnection);
- def setWireFormat(wireformat:WireFormat);
- def onException(error:Exception);
-
-// TODO:
-// public void setConnection(BrokerConnection connection);
-//
-// public BrokerConnection getConnection();
-//
-// public void onCommand(Object command);
-//
-// public void onException(Exception error);
-//
-// public void setWireFormat(WireFormat wf);
-//
-// public BrokerMessageDelivery createMessageDelivery(MessageRecord record) throws IOException;
-//
-// /**
-// * ClientContext
-// * <p>
-// * Description: Base interface describing a channel on a physical
-// * connection.
-// * </p>
-// *
-// * @author cmacnaug
-// * @version 1.0
-// */
-// public interface ClientContext {
-// public ClientContext getParent();
-//
-// public Collection<ClientContext> getChildren();
-//
-// public void addChild(ClientContext context);
-//
-// public void removeChild(ClientContext context);
-//
-// public void close();
-//
-// }
-//
-// public abstract class AbstractClientContext<E extends MessageDelivery> extends AbstractLimitedFlowResource<E> implements ClientContext {
-// protected final HashSet<ClientContext> children = new HashSet<ClientContext>();
-// protected final ClientContext parent;
-// protected boolean closed = false;
-//
-// public AbstractClientContext(String name, ClientContext parent) {
-// super(name);
-// this.parent = parent;
-// if (parent != null) {
-// parent.addChild(this);
-// }
-// }
-//
-// public ClientContext getParent() {
-// return parent;
-// }
-//
-// public void addChild(ClientContext child) {
-// if (!closed) {
-// children.add(child);
-// }
-// }
-//
-// public void removeChild(ClientContext child) {
-// if (!closed) {
-// children.remove(child);
-// }
-// }
-//
-// public Collection<ClientContext> getChildren() {
-// return children;
-// }
-//
-// public void close() {
-//
-// closed = true;
-//
-// for (ClientContext c : children) {
-// c.close();
-// }
-//
-// if (parent != null) {
-// parent.removeChild(this);
-// }
-//
-// super.close();
-// }
-// }
-//
}
trait ConsumerContext { // extends ClientContext, Subscription<MessageDelivery>, IFlowSink<MessageDelivery> {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Logging.scala Wed Jul 7 03:44:41 2010
@@ -209,3 +209,17 @@ trait Logging {
}
}
+
+trait DispatchLogging extends Logging {
+ import org.fusesource.hawtdispatch.ScalaDispatch._
+
+ override protected def log_map(message:String) = {
+ val d = getCurrentQueue
+ if( d!=null ) {
+ d.getLabel+" | "+message
+ } else {
+ message
+ }
+ }
+
+}
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfTest.scala Wed Jul 7 03:44:41 2010
@@ -17,7 +17,6 @@
package org.apache.activemq.apollo.broker.perf
import _root_.java.beans.ExceptionListener
-import _root_.java.io.{File}
import _root_.java.net.URI
import _root_.java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import _root_.java.util.concurrent.TimeUnit
@@ -33,6 +32,7 @@ import org.apache.activemq.transport.Tra
import _root_.scala.collection.JavaConversions._
import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
+import java.io.{IOException, File}
abstract class RemoteConsumer extends Connection {
@@ -43,6 +43,7 @@ abstract class RemoteConsumer extends Co
var selector: String = null;
var durable = false;
var uri: String = null
+ var brokerPerfTest:BaseBrokerPerfTest = null
override def start() = {
consumerRate.name("Consumer " + name + " Rate");
@@ -52,10 +53,17 @@ abstract class RemoteConsumer extends Co
}
- override def onConnected() = {
+ override def onTransportConnected() = {
setupSubscription();
}
+ override def onTransportFailure(error: IOException) = {
+ if (!brokerPerfTest.stopping.get()) {
+ System.err.println("Client Async Error:");
+ error.printStackTrace();
+ }
+ }
+
protected def setupSubscription()
}
@@ -79,6 +87,14 @@ abstract class RemoteProducer extends Co
var filler: String = null
var payloadSize = 20
var uri: String = null
+ var brokerPerfTest:BaseBrokerPerfTest = null
+
+ override def onTransportFailure(error: IOException) = {
+ if (!brokerPerfTest.stopping.get()) {
+ System.err.println("Client Async Error:");
+ error.printStackTrace();
+ }
+ }
override def start() = {
@@ -98,7 +114,7 @@ abstract class RemoteProducer extends Co
}
- override def onConnected() = {
+ override def onTransportConnected() = {
setupProducer();
}
@@ -171,7 +187,7 @@ abstract class BaseBrokerPerfTest {
protected var rcvBroker: Broker = null
protected val brokers = new ArrayList[Broker]()
protected val msgIdGenerator = new AtomicLong()
- protected val stopping = new AtomicBoolean()
+ val stopping = new AtomicBoolean()
val producers = new ArrayList[RemoteProducer]()
val consumers = new ArrayList[RemoteConsumer]()
@@ -540,14 +556,7 @@ abstract class BaseBrokerPerfTest {
def createConsumer(i: Int, destination: Destination): RemoteConsumer = {
var consumer = createConsumer();
- consumer.exceptionListener = new ExceptionListener() {
- def exceptionThrown(error: Exception) = {
- if (!stopping.get()) {
- System.err.println("Consumer Async Error:");
- error.printStackTrace();
- }
- }
- }
+ consumer.brokerPerfTest = this
consumer.uri = rcvBroker.connectUris.head
consumer.destination = destination
@@ -560,14 +569,7 @@ abstract class BaseBrokerPerfTest {
private def createProducer(id: Int, destination: Destination): RemoteProducer = {
var producer = createProducer();
- producer.exceptionListener = new ExceptionListener() {
- def exceptionThrown(error: Exception) = {
- if (!stopping.get()) {
- System.err.println("Producer Async Error:");
- error.printStackTrace();
- }
- }
- }
+ producer.brokerPerfTest = this
producer.uri = sendBroker.connectUris.head
producer.producerId = id + 1
producer.name = "producer" + (id + 1)
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/transport/vm/VMTransportTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/transport/vm/VMTransportTest.java?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/transport/vm/VMTransportTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/transport/vm/VMTransportTest.java Wed Jul 7 03:44:41 2010
@@ -17,7 +17,6 @@
package org.apache.activemq.apollo.transport.vm;
import java.io.IOException;
-import java.net.URI;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
Modified: activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/BrokerXml.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/BrokerXml.java?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/BrokerXml.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-jaxb/src/main/java/org/apache/activemq/apollo/jaxb/BrokerXml.java Wed Jul 7 03:44:41 2010
@@ -16,7 +16,6 @@
*/
package org.apache.activemq.apollo.jaxb;
-import java.net.URI;
import java.util.ArrayList;
import java.util.List;
Copied: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/protocol/stomp (from r961074, activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/broker/protocol/stomp)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/protocol/stomp?p2=activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/apollo/broker/protocol/stomp&p1=activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/META-INF/services/org/apache/activemq/broker/protocol/stomp&r1=961074&r2=961075&rev=961075&view=diff
==============================================================================
(empty)
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul 7 03:44:41 2010
@@ -28,7 +28,8 @@ import AsciiBuffer._
import Stomp._
import BufferConversions._
import StompFrameConstants._
-import org.apache.activemq.transport.CompletionCallback;
+import org.apache.activemq.transport.CompletionCallback
+import java.io.IOException
class StompProtocolException(msg:String) extends Exception(msg)
@@ -53,16 +54,11 @@ import StompConstants._
object StompProtocolHandler extends Log
-class StompProtocolHandler extends ProtocolHandler with Logging {
+class StompProtocolHandler extends ProtocolHandler with DispatchLogging {
override protected def log = StompProtocolHandler
- override protected def log_map(message:String) = {
- if( connection==null )
- message
- else
- "connection:"+connection.id+" | "+message
- }
-
+
+ protected def dispatchQueue:DispatchQueue = connection.dispatchQueue
class SimpleConsumer(val dest:AsciiBuffer) extends BaseRetained with DeliveryConsumer {
@@ -93,13 +89,10 @@ class StompProtocolHandler extends Proto
}
}
- def dispatchQueue = connection.dispatchQueue
val outboundChannel = new DeliveryBuffer
var closed = false
var consumer:SimpleConsumer = null
- var connection:BrokerConnection = null
- var wireformat:WireFormat = null
var producerRoute:DeliveryProducerRoute=null
var host:VirtualHost = null
@@ -111,7 +104,7 @@ class StompProtocolHandler extends Proto
outboundChannel.ack(delivery)
}
def onFailure(e:Exception) = {
- StompProtocolHandler.this.onException(e)
+ connection.onFailure(e)
}
});
}
@@ -119,15 +112,7 @@ class StompProtocolHandler extends Proto
private def queue = connection.dispatchQueue
- def setConnection(connection:BrokerConnection) = {
- this.connection = connection
- }
-
- def setWireFormat(wireformat:WireFormat) = { this.wireformat = wireformat}
-
- def start = {
- info("start")
- connection.transport.suspendRead
+ override def onTransportConnected() = {
connection.broker.runtime.getDefaultVirtualHost(
queue.wrap { (host)=>
info("got host.. resuming")
@@ -137,7 +122,8 @@ class StompProtocolHandler extends Proto
)
}
- def stop = {
+
+ override def onTransportDisconnected() = {
if( !closed ) {
info("stop")
closed=true;
@@ -149,13 +135,11 @@ class StompProtocolHandler extends Proto
host.router.unbind(consumer.dest, consumer::Nil)
consumer=null
}
- connection.stop
}
}
- def onCommand(command:Any) = {
- info("got command: %s", command)
+ def onTransportCommand(command:Any) = {
try {
command match {
case StompFrame(Commands.SEND, headers, content) =>
@@ -163,11 +147,14 @@ class StompProtocolHandler extends Proto
case StompFrame(Commands.ACK, headers, content) =>
// TODO:
case StompFrame(Commands.SUBSCRIBE, headers, content) =>
+ info("got command: %s", command)
on_stomp_subscribe(headers)
case StompFrame(Commands.CONNECT, headers, _) =>
+ info("got command: %s", command)
on_stomp_connect(headers)
case StompFrame(Commands.DISCONNECT, headers, content) =>
- stop
+ info("got command: %s", command)
+ connection.stop
case s:StompWireFormat =>
// this is passed on to us by the protocol discriminator
// so we know which wire format is being used.
@@ -255,13 +242,13 @@ class StompProtocolHandler extends Proto
}
def on_stomp_subscribe(headers:HeaderMap) = {
- println("Consumer on "+Thread.currentThread.getName)
get(headers, Headers.Subscribe.DESTINATION) match {
case Some(dest)=>
if( consumer !=null ) {
die("Only one subscription supported.")
} else {
+ info("subscribing to: %s", dest)
consumer = new SimpleConsumer(dest);
host.router.bind(dest, consumer :: Nil)
consumer.release
@@ -273,19 +260,17 @@ class StompProtocolHandler extends Proto
}
private def die(msg:String) = {
- println("Shutting connection down due to: "+msg)
+ info("Shutting connection down due to: "+msg)
connection.transport.suspendRead
connection.transport.oneway(StompFrame(Responses.ERROR, Nil, ascii(msg)))
^ {
- stop
+ connection.stop()
} ->: queue
}
- def onException(error:Exception) = {
- println("Shutting connection down due to: "+error)
- error.printStackTrace
- stop
+ override def onTransportFailure(error: IOException) = {
+ info(error, "Shutting connection down due to: %s", error)
+ super.onTransportFailure(error);
}
-
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala Wed Jul 7 03:44:41 2010
@@ -29,7 +29,7 @@ import Stomp.Headers._
import BufferConversions._
import _root_.scala.collection.JavaConversions._
-import StompFrameConstants._;
+import StompFrameConstants._
/**
@@ -54,18 +54,20 @@ class StompWireFormatFactory extends Wir
}
}
-object StompWireFormat {
+object StompWireFormat extends Log {
val READ_BUFFFER_SIZE = 1024*64;
val MAX_COMMAND_LENGTH = 1024;
val MAX_HEADER_LENGTH = 1024 * 10;
val MAX_HEADERS = 1000;
val MAX_DATA_LENGTH = 1024 * 1024 * 100;
- val TRIM=false
+ val TRIM=true
val SIZE_CHECK=false
}
-class StompWireFormat extends WireFormat {
+class StompWireFormat extends WireFormat with DispatchLogging {
+
import StompWireFormat._
+ protected def log: Log = StompWireFormat
implicit def wrap(x: Buffer) = ByteBuffer.wrap(x.data, x.offset, x.length);
implicit def wrap(x: Byte) = {
@@ -147,6 +149,8 @@ class StompWireFormat extends WireFormat
while( rc == null && end!=buffer.position ) {
rc = next_action(buffer)
}
+
+// trace("unmarshalled: "+rc+", start: "+start+", end: "+end+", buffer position: "+buffer.position)
rc
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala Wed Jul 7 03:44:41 2010
@@ -66,7 +66,7 @@ class StompRemoteConsumer extends Remote
transport.oneway(frame);
}
- def onCommand(command:Object) = {
+ def onTransportCommand(command:Object) = {
var frame = command.asInstanceOf[StompFrame]
frame match {
case StompFrame(Responses.CONNECTED, headers, _) =>
@@ -140,7 +140,7 @@ class StompRemoteProducer extends Remote
transport.oneway(StompFrame(Stomp.Commands.CONNECT), send_next);
}
- def onCommand(command:Object) = {
+ def onTransportCommand(command:Object) = {
var frame = command.asInstanceOf[StompFrame]
frame match {
case StompFrame(Responses.CONNECTED, headers, _) =>
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala Wed Jul 7 03:44:41 2010
@@ -36,7 +36,7 @@ object StompLoadClient {
import StompLoadClient._
implicit def toAsciiBuffer(value: String) = new AsciiBuffer(value)
- var producerSleep = 1000*30;
+ var producerSleep = 0;
var consumerSleep = 0;
var producers = 1;
var consumers = 1;
@@ -158,6 +158,7 @@ object StompLoadClient {
""")
client.flush
client.receive("CONNECTED")
+
proc(client)
} catch {
case e: Throwable =>
Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Wed Jul 7 03:44:41 2010
@@ -121,8 +121,6 @@ public class TcpTransport implements Tra
}
public void start() throws Exception {
- assert Dispatch.getCurrentQueue() == dispatchQueue;
-
if (dispatchQueue == null) {
throw new IllegalArgumentException("dispatchQueue is not set");
}
@@ -161,7 +159,7 @@ public class TcpTransport implements Tra
connectSource.release();
fireConnected();
} catch (IOException e) {
- listener.onException(e);
+ listener.onTransportFailure(e);
}
}
}
@@ -197,7 +195,7 @@ public class TcpTransport implements Tra
try {
drainInbound();
} catch (IOException e) {
- listener.onException(e);
+ listener.onTransportFailure(e);
}
}
});
@@ -216,14 +214,11 @@ public class TcpTransport implements Tra
});
remoteAddress = channel.socket().getRemoteSocketAddress().toString();
- listener.onConnected();
- readSource.resume();
+ listener.onTransportConnected();
}
public void stop() throws Exception {
- assert Dispatch.getCurrentQueue() == dispatchQueue;
-
if( readSource!=null ) {
readSource.release();
readSource = null;
@@ -319,7 +314,7 @@ public class TcpTransport implements Tra
}
} catch (IOException e) {
- listener.onException(e);
+ listener.onTransportFailure(e);
}
return outbound.isEmpty() && outbound_frame==null;
@@ -358,7 +353,7 @@ public class TcpTransport implements Tra
int p = readBuffer.position();
int count = channel.read(readBuffer);
if (count == -1) {
- throw new EOFException();
+ throw new EOFException("Peer disconnected");
} else if (count == 0) {
return;
}
@@ -366,7 +361,7 @@ public class TcpTransport implements Tra
Object command=unmarshalSession.unmarshal(readBuffer);
if( command!=null ) {
- listener.onCommand(command);
+ listener.onTransportCommand(command);
// the transport may be suspended after processing a command.
if( transportState==DISPOSED || readSource.isSuspended() ) {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DefaultTransportListener.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DefaultTransportListener.java?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DefaultTransportListener.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DefaultTransportListener.java Wed Jul 7 03:44:41 2010
@@ -29,7 +29,7 @@ public class DefaultTransportListener im
*
* @param command
*/
- public void onCommand(Object command) {
+ public void onTransportCommand(Object command) {
}
/**
@@ -37,20 +37,20 @@ public class DefaultTransportListener im
*
* @param error
*/
- public void onException(IOException error) {
+ public void onTransportFailure(IOException error) {
}
/**
* The transport has been connected.
*/
- public void onConnected() {
+ public void onTransportConnected() {
}
/**
* The transport has suffered a disconnection from
* which it hopes to recover
*/
- public void onDisconnected() {
+ public void onTransportDisconnected() {
}
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java Wed Jul 7 03:44:41 2010
@@ -96,8 +96,8 @@ public class TransportFilter implements
next.stop();
}
- public void onCommand(Object command) {
- transportListener.onCommand(command);
+ public void onTransportCommand(Object command) {
+ transportListener.onTransportCommand(command);
}
@@ -115,16 +115,16 @@ public class TransportFilter implements
}
- public void onException(IOException error) {
- transportListener.onException(error);
+ public void onTransportFailure(IOException error) {
+ transportListener.onTransportFailure(error);
}
- public void onDisconnected() {
- transportListener.onDisconnected();
+ public void onTransportDisconnected() {
+ transportListener.onTransportDisconnected();
}
- public void onConnected() {
- transportListener.onConnected();
+ public void onTransportConnected() {
+ transportListener.onTransportConnected();
}
public <T> T narrow(Class<T> target) {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportListener.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportListener.java?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportListener.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportListener.java Wed Jul 7 03:44:41 2010
@@ -29,22 +29,22 @@ public interface TransportListener {
* called to process a command
* @param command
*/
- void onCommand(Object command);
+ void onTransportCommand(Object command);
/**
* An unrecoverable exception has occured on the transport
* @param error
*/
- void onException(IOException error);
+ void onTransportFailure(IOException error);
/**
* The transport has been connected.
*/
- public void onConnected();
+ public void onTransportConnected();
/**
* The transport has suffered a disconnection from
* which it hopes to recover
*/
- public void onDisconnected();
+ public void onTransportDisconnected();
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java Wed Jul 7 03:44:41 2010
@@ -88,9 +88,9 @@ public class PipeTransport implements Tr
}
if (wireformat != null && marshal) {
- listener.onCommand(wireformat.unmarshal((Buffer) o));
+ listener.onTransportCommand(wireformat.unmarshal((Buffer) o));
} else {
- listener.onCommand(o);
+ listener.onTransportCommand(o);
}
}
@@ -102,7 +102,7 @@ public class PipeTransport implements Tr
}
});
} catch (IOException e) {
- listener.onException(e);
+ listener.onTransportFailure(e);
}
}
@@ -120,7 +120,7 @@ public class PipeTransport implements Tr
public void run() {
connected = true;
dispatchSource.resume();
- listener.onConnected();
+ listener.onTransportConnected();
drainInbound();
}
});
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java Wed Jul 7 03:44:41 2010
@@ -42,7 +42,8 @@ public class MultiWireFormatFactory impl
private static final FactoryFinder WIREFORMAT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/wireformat/");
- private String wireFormats = "openwire, stomp";
+// private String wireFormats = "openwire, stomp";
+ private String wireFormats = "stomp";
private ArrayList<WireFormatFactory> wireFormatFactories;
static class MultiWireFormat implements WireFormat {
Modified: activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/java/org/apache/activemq/legacy/transport/failover/ReconnectTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/java/org/apache/activemq/legacy/transport/failover/ReconnectTest.java?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/java/org/apache/activemq/legacy/transport/failover/ReconnectTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/java/org/apache/activemq/legacy/transport/failover/ReconnectTest.java Wed Jul 7 03:44:41 2010
@@ -38,7 +38,7 @@ import org.apache.activemq.legacy.broker
import org.apache.activemq.legacy.broker.TransportConnector;
import org.apache.activemq.transport.mock.MockTransport;
import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.apollo.transport.TransportListener;
import org.apache.activemq.util.ServiceStopper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Modified: activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/java/org/apache/activemq/legacy/transport/tcp/SslBrokerServiceTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/java/org/apache/activemq/legacy/transport/tcp/SslBrokerServiceTest.java?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/java/org/apache/activemq/legacy/transport/tcp/SslBrokerServiceTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/java/org/apache/activemq/legacy/transport/tcp/SslBrokerServiceTest.java Wed Jul 7 03:44:41 2010
@@ -16,18 +16,6 @@
*/
package org.apache.activemq.legacy.transport.tcp;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.net.URI;
-import java.security.KeyStore;
-
-import javax.net.ssl.KeyManager;
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.TrustManager;
-import javax.net.ssl.TrustManagerFactory;
-
import junit.framework.Test;
import junit.textui.TestRunner;
Modified: activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/java/org/apache/activemq/legacy/transport/tcp/SslTransportServerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/java/org/apache/activemq/legacy/transport/tcp/SslTransportServerTest.java?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/java/org/apache/activemq/legacy/transport/tcp/SslTransportServerTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/java/org/apache/activemq/legacy/transport/tcp/SslTransportServerTest.java Wed Jul 7 03:44:41 2010
@@ -20,7 +20,7 @@ package org.apache.activemq.legacy.trans
import java.io.IOException;
import java.net.URI;
-import org.apache.activemq.transport.tcp.SslTransportServer;
+import org.apache.activemq.apollo.transport.tcp.SslTransportServer;
import junit.framework.TestCase;
Modified: activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/resources/org/apache/activemq/transport/stomp/sslstomp-auth-broker.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/resources/org/apache/activemq/transport/stomp/sslstomp-auth-broker.xml?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/resources/org/apache/activemq/transport/stomp/sslstomp-auth-broker.xml (original)
+++ activemq/sandbox/activemq-apollo-actor/sandbox/activemq-all/src/test/resources/org/apache/activemq/transport/stomp/sslstomp-auth-broker.xml Wed Jul 7 03:44:41 2010
@@ -22,7 +22,7 @@
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
<bean class="org.apache.activemq.util.XStreamFactoryBean" name="xstream">
- <property name="annotatedClass"><value>org.apache.activemq.transport.stomp.SamplePojo</value></property>
+ <property name="annotatedClass"><value>org.apache.activemq.apollo.transport.stomp.SamplePojo</value></property>
</bean>
<broker useJmx="true" persistent="false" xmlns="http://activemq.org/config/1.0" populateJMSXUserID="true">
Modified: activemq/sandbox/activemq-apollo-actor/sandbox/activemq-network/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/sandbox/activemq-network/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/sandbox/activemq-network/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java (original)
+++ activemq/sandbox/activemq-apollo-actor/sandbox/activemq-network/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java Wed Jul 7 03:44:41 2010
@@ -24,7 +24,7 @@ import java.util.concurrent.ConcurrentHa
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
-import org.apache.activemq.transport.discovery.DiscoveryAgent;
+import org.apache.activemq.apollo.transport.discovery.DiscoveryAgent;
import org.apache.activemq.transport.discovery.DiscoveryAgentFactory;
import org.apache.activemq.transport.discovery.DiscoveryEvent;
import org.apache.activemq.transport.discovery.DiscoveryListener;
Modified: activemq/sandbox/activemq-apollo-actor/sandbox/activemq-openwire/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/sandbox/activemq-openwire/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/sandbox/activemq-openwire/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java (original)
+++ activemq/sandbox/activemq-apollo-actor/sandbox/activemq-openwire/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java Wed Jul 7 03:44:41 2010
@@ -20,7 +20,6 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Modified: activemq/sandbox/activemq-apollo-actor/sandbox/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/StubConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/sandbox/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/StubConnection.java?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/sandbox/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/StubConnection.java (original)
+++ activemq/sandbox/activemq-apollo-actor/sandbox/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/StubConnection.java Wed Jul 7 03:44:41 2010
@@ -30,7 +30,7 @@ import org.apache.activemq.command.Shutd
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.ResponseCorrelator;
-import org.apache.activemq.transport.Transport;
+import org.apache.activemq.apollo.transport.Transport;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.activemq.util.ServiceSupport;
Modified: activemq/sandbox/activemq-apollo-actor/sandbox/activemq-openwire/src/test/java/org/apache/activemq/openwire/BrokerTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/sandbox/activemq-openwire/src/test/java/org/apache/activemq/openwire/BrokerTestSupport.java?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/sandbox/activemq-openwire/src/test/java/org/apache/activemq/openwire/BrokerTestSupport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/sandbox/activemq-openwire/src/test/java/org/apache/activemq/openwire/BrokerTestSupport.java Wed Jul 7 03:44:41 2010
@@ -52,7 +52,7 @@ import org.apache.activemq.command.Trans
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.legacy.openwireprotocol.StubConnection;
import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.apollo.transport.TransportFactory;
public class BrokerTestSupport extends CombinationTestSupport {
Modified: activemq/sandbox/activemq-apollo-actor/sandbox/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBroker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/sandbox/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBroker.java?rev=961075&r1=961074&r2=961075&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/sandbox/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBroker.java (original)
+++ activemq/sandbox/activemq-apollo-actor/sandbox/activemq-queue/src/test/java/org/apache/activemq/queue/actor/perf/MockBroker.java Wed Jul 7 03:44:41 2010
@@ -19,7 +19,6 @@ package org.apache.activemq.queue.actor.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.dispatch.DispatchQueue;