You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/15 20:32:48 UTC

svn commit: r784902 - in /activemq/sandbox/activemq-flow: activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/ activemq-broker/src/test/java/org/apache/activemq/apollo/t...

Author: chirino
Date: Mon Jun 15 18:32:48 2009
New Revision: 784902

URL: http://svn.apache.org/viewvc?rev=784902&view=rev
Log:
Added a couple more test cases for the VMTransport.  Enhanced the Pipe transport so that an EOFException is raised on the peer when the transport is closed.

Modified:
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerConnection.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/VMTransportFactory.java
    activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/apollo/transport/vm/VMTransportTest.java
    activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerConnection.java?rev=784902&r1=784901&r2=784902&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerConnection.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerConnection.java Mon Jun 15 18:32:48 2009
@@ -21,16 +21,20 @@
 import org.apache.activemq.apollo.Connection;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.wireformat.WireFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 public class BrokerConnection extends Connection {
     
+	private static final Log LOG = LogFactory.getLog(BrokerConnection.class);
+	
     protected Broker broker;
     private ProtocolHandler protocolHandler;
 
     public BrokerConnection() {
         setExceptionListener(new ExceptionListener(){
             public void exceptionThrown(Exception error) {
-                error.printStackTrace();
+            	LOG.debug("Transport failed before messaging protocol was initialized.", error);
                 try {
                     stop();
                 } catch (Exception ignore) {

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/VMTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/VMTransportFactory.java?rev=784902&r1=784901&r2=784902&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/VMTransportFactory.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/VMTransportFactory.java Mon Jun 15 18:32:48 2009
@@ -77,6 +77,7 @@
 		private void stopBroker() {
 			try {
 				this.broker.stop();
+				unbind(this);
 			} catch (Exception e) {
 				LOG.error("Failed to stop the broker gracefully: "+e);
 				LOG.debug("Failed to stop the broker gracefully: ", e);

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/apollo/transport/vm/VMTransportTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/apollo/transport/vm/VMTransportTest.java?rev=784902&r1=784901&r2=784902&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/apollo/transport/vm/VMTransportTest.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/apollo/transport/vm/VMTransportTest.java Mon Jun 15 18:32:48 2009
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.apollo.transport.vm;
 
+import java.io.IOException;
 import java.net.URI;
 
 import junit.framework.TestCase;
@@ -33,12 +34,27 @@
 		System.setProperty("org.apache.activemq.default.directory.prefix", "target/test-data/");
 	}
 	
-	
 	public void testAutoCreateBroker() throws Exception {
-		
 		Transport connect = TransportFactory.compositeConnect(new URI("vm://test"));
 		assertNotNull(connect);
-		
+		connect.stop();
+		System.out.println("done");
+	}
+	
+	public void testNoAutoCreateBroker() throws Exception {
+		try {
+			TransportFactory.compositeConnect(new URI("vm://test?create=false"));
+			fail("Expected a IOException");
+		} catch (IOException e) {
+		}
+	}
+	
+	public void testBadOptions() throws Exception {
+		try {
+			TransportFactory.compositeConnect(new URI("vm://test?crazy-option=false"));
+			fail("Expected a IllegalArgumentException");
+		} catch (IllegalArgumentException e) {
+		}
 	}
 	
 }

Modified: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java?rev=784902&r1=784901&r2=784902&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java (original)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java Mon Jun 15 18:32:48 2009
@@ -1,5 +1,6 @@
 package org.apache.activemq.transport.pipe;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.net.InetSocketAddress;
@@ -30,6 +31,7 @@
 import org.apache.activemq.wireformat.WireFormatFactory;
 
 public class PipeTransportFactory extends TransportFactory {
+    static private final Object EOF_TOKEN = new Object();
 
     protected final HashMap<String, PipeTransportServer> servers = new HashMap<String, PipeTransportServer>();
 
@@ -59,11 +61,14 @@
         }
 
         public void stop() throws Exception {
+        	pipe.write(EOF_TOKEN);
             if (readContext != null) {
                 readContext.close(true);
             } else {
                 stopping.set(true);
-                thread.join();
+                if( thread!=null ) {
+                	thread.join();
+                }
             }
         }
 
@@ -107,6 +112,9 @@
                         pipe.setReadReadyListener(this);
                         return true;
                     } else {
+                    	if(o == EOF_TOKEN) {
+                    		throw new EOFException();
+                    	}                    	
                         if (wireFormat != null) {
                             listener.onCommand(wireFormat.unmarshal((ByteSequence) o));
                         } else {
@@ -126,9 +134,15 @@
                 while (!stopping.get()) {
                     Object value = pipe.poll(500, TimeUnit.MILLISECONDS);
                     if (value != null) {
-                        listener.onCommand(value);
+                    	if(value == EOF_TOKEN) {
+                    		throw new EOFException();
+                    	} else {
+                    		listener.onCommand(value);
+                    	}
                     }
                 }
+            } catch (IOException e) {
+                listener.onException(e);
             } catch (InterruptedException e) {
             }
         }
@@ -302,7 +316,7 @@
 		return new PipeTransportServer();
 	}
 
-    private synchronized void unbind(PipeTransportServer server) {
+	protected synchronized void unbind(PipeTransportServer server) {
         servers.remove(server.getName());
     }