You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/15 21:06:22 UTC

svn commit: r784920 - in /activemq/sandbox/activemq-flow: activemq-client/src/test/java/org/apache/activemq/legacy/ActiveMQConnectionFactoryTest.java activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java

Author: chirino
Date: Mon Jun 15 19:06:21 2009
New Revision: 784920

URL: http://svn.apache.org/viewvc?rev=784920&view=rev
Log:
Adapting the ActiveMQConnectionFactoryTest to use the new APIs.

Modified:
    activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/ActiveMQConnectionFactoryTest.java
    activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java

Modified: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/ActiveMQConnectionFactoryTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/ActiveMQConnectionFactoryTest.java?rev=784920&r1=784919&r2=784920&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/ActiveMQConnectionFactoryTest.java (original)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/ActiveMQConnectionFactoryTest.java Mon Jun 15 19:06:21 2009
@@ -31,9 +31,12 @@
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ActiveMQMessageConsumer;
 import org.apache.activemq.apollo.CombinationTestSupport;
-import org.apache.activemq.legacy.broker.BrokerRegistry;
-import org.apache.activemq.legacy.broker.BrokerService;
-import org.apache.activemq.legacy.broker.TransportConnector;
+import org.apache.activemq.apollo.broker.Broker;
+import org.apache.activemq.apollo.transport.vm.VMTransportFactory;
+import org.apache.activemq.broker.store.memory.MemoryStore;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportServer;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -41,7 +44,7 @@
     private static final Log LOG = LogFactory.getLog(ActiveMQConnectionFactoryTest.class);
 
     private ActiveMQConnection connection;
-    private BrokerService broker;
+    private Broker broker;
 
     public void testUseURIToSetUseClientIDPrefixOnConnectionFactory() throws URISyntaxException, JMSException {
         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(
@@ -109,17 +112,17 @@
         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
         // Make sure the broker is not created until the connection is
         // instantiated.
-        assertNull(BrokerRegistry.getInstance().lookup("localhost"));
+        assertNull(VMTransportFactory.lookup("localhost"));
         connection = (ActiveMQConnection)cf.createConnection();
         // This should create the connection.
         assertNotNull(connection);
         // Verify the broker was created.
-        assertNotNull(BrokerRegistry.getInstance().lookup("localhost"));
+        assertNotNull(VMTransportFactory.lookup("localhost"));
 
         connection.close();
 
         // Verify the broker was destroyed.
-        assertNull(BrokerRegistry.getInstance().lookup("localhost"));
+        assertNull(VMTransportFactory.lookup("localhost"));
     }
 
     public void testGetBrokerName() throws URISyntaxException, JMSException {
@@ -189,15 +192,16 @@
 
     protected void assertCreateConnection(String uri) throws Exception {
         // Start up a broker with a tcp connector.
-        broker = new BrokerService();
-        broker.setPersistent(false);
-        TransportConnector connector = broker.addConnector(uri);
+        broker = new Broker();
+        broker.getDefaultVirtualHost().setStore(new MemoryStore());
+        TransportServer server = TransportFactory.bind(new URI(uri));
+		broker.addTransportServer(server);
         broker.start();
 
         URI temp = new URI(uri);
         // URI connectURI = connector.getServer().getConnectURI();
         // TODO this sometimes fails when using the actual local host name
-        URI currentURI = connector.getServer().getConnectURI();
+        URI currentURI = server.getConnectURI();
 
         // sometimes the actual host name doesn't work in this test case
         // e.g. on OS X so lets use the original details but just use the actual

Modified: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java?rev=784920&r1=784919&r2=784920&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java (original)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java Mon Jun 15 19:06:21 2009
@@ -33,7 +33,7 @@
 public class PipeTransportFactory extends TransportFactory {
     static private final Object EOF_TOKEN = new Object();
 
-    protected final HashMap<String, PipeTransportServer> servers = new HashMap<String, PipeTransportServer>();
+    static protected final HashMap<String, PipeTransportServer> servers = new HashMap<String, PipeTransportServer>();
 
     protected static class PipeTransport implements DispatchableTransport, Dispatchable, Runnable, ReadReadyListener<Object> {
 
@@ -290,23 +290,24 @@
     }
 
     @Override
-    public synchronized TransportServer doBind(URI uri) throws IOException {
+    public TransportServer doBind(URI uri) throws IOException {
         try {
             Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(uri));
-
             String node = uri.getHost();
-            if (servers.containsKey(node)) {
-                throw new IOException("Server already bound: " + node);
-            }
-            PipeTransportServer server = createTransportServer();
-            server.setConnectURI(uri);
-            server.setName(node);
-            if (options.containsKey("wireFormat")) {
-                server.setWireFormatFactory(createWireFormatFactory(options));
-            }
-                
-            servers.put(node, server);
-            return server;
+    		synchronized(servers) {
+	            if (servers.containsKey(node)) {
+	                throw new IOException("Server already bound: " + node);
+	            }
+	            PipeTransportServer server = createTransportServer();
+	            server.setConnectURI(uri);
+	            server.setName(node);
+	            if (options.containsKey("wireFormat")) {
+	                server.setWireFormatFactory(createWireFormatFactory(options));
+	            }
+	                
+	            servers.put(node, server);
+	            return server;
+    		}
         } catch (URISyntaxException e) {
             throw IOExceptionSupport.create(e);
         }
@@ -316,18 +317,34 @@
 		return new PipeTransportServer();
 	}
 
-	protected synchronized void unbind(PipeTransportServer server) {
-        servers.remove(server.getName());
-    }
-
     @Override
-    public synchronized Transport doCompositeConnect(URI location) throws Exception {
+    public Transport doCompositeConnect(URI location) throws Exception {
         String name = location.getHost();
-        PipeTransportServer server = servers.get(name);
-        if (server == null) {
-            throw new IOException("Server is not bound: " + name);
-        }
-        return server.connect();
+		synchronized(servers) {
+	        PipeTransportServer server = lookup(name);
+	        if (server == null) {
+	            throw new IOException("Server is not bound: " + name);
+	        }
+	        return server.connect();
+		}
+    }
+
+	static public PipeTransportServer lookup(String name) {
+		synchronized(servers) {
+			return servers.get(name);
+    	}
+	}
+    
+    static public Map<String, PipeTransportServer> getServers() {
+    	synchronized(servers) {
+    		return new HashMap<String, PipeTransportServer>(servers);
+    	}
+    }
+
+	static public void unbind(PipeTransportServer server) {
+		synchronized(servers) {
+			servers.remove(server.getName());
+		}
     }
 
 }