You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/15 20:32:48 UTC
svn commit: r784902 - in /activemq/sandbox/activemq-flow:
activemq-broker/src/main/java/org/apache/activemq/apollo/broker/
activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/
activemq-broker/src/test/java/org/apache/activemq/apollo/t...
Author: chirino
Date: Mon Jun 15 18:32:48 2009
New Revision: 784902
URL: http://svn.apache.org/viewvc?rev=784902&view=rev
Log:
Added a couple more test cases for the VMTransport. Enhanced the Pipe transport so that an EOFException is raised on the peer when the transport is closed.
Modified:
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerConnection.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/VMTransportFactory.java
activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/apollo/transport/vm/VMTransportTest.java
activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerConnection.java?rev=784902&r1=784901&r2=784902&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerConnection.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerConnection.java Mon Jun 15 18:32:48 2009
@@ -21,16 +21,20 @@
import org.apache.activemq.apollo.Connection;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
public class BrokerConnection extends Connection {
+ private static final Log LOG = LogFactory.getLog(BrokerConnection.class);
+
protected Broker broker;
private ProtocolHandler protocolHandler;
public BrokerConnection() {
setExceptionListener(new ExceptionListener(){
public void exceptionThrown(Exception error) {
- error.printStackTrace();
+ LOG.debug("Transport failed before messaging protocol was initialized.", error);
try {
stop();
} catch (Exception ignore) {
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/VMTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/VMTransportFactory.java?rev=784902&r1=784901&r2=784902&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/VMTransportFactory.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/transport/vm/VMTransportFactory.java Mon Jun 15 18:32:48 2009
@@ -77,6 +77,7 @@
private void stopBroker() {
try {
this.broker.stop();
+ unbind(this);
} catch (Exception e) {
LOG.error("Failed to stop the broker gracefully: "+e);
LOG.debug("Failed to stop the broker gracefully: ", e);
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/apollo/transport/vm/VMTransportTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/apollo/transport/vm/VMTransportTest.java?rev=784902&r1=784901&r2=784902&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/apollo/transport/vm/VMTransportTest.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/apollo/transport/vm/VMTransportTest.java Mon Jun 15 18:32:48 2009
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.apollo.transport.vm;
+import java.io.IOException;
import java.net.URI;
import junit.framework.TestCase;
@@ -33,12 +34,27 @@
System.setProperty("org.apache.activemq.default.directory.prefix", "target/test-data/");
}
-
public void testAutoCreateBroker() throws Exception {
-
Transport connect = TransportFactory.compositeConnect(new URI("vm://test"));
assertNotNull(connect);
-
+ connect.stop();
+ System.out.println("done");
+ }
+
+ public void testNoAutoCreateBroker() throws Exception {
+ try {
+ TransportFactory.compositeConnect(new URI("vm://test?create=false"));
+ fail("Expected a IOException");
+ } catch (IOException e) {
+ }
+ }
+
+ public void testBadOptions() throws Exception {
+ try {
+ TransportFactory.compositeConnect(new URI("vm://test?crazy-option=false"));
+ fail("Expected a IllegalArgumentException");
+ } catch (IllegalArgumentException e) {
+ }
}
}
Modified: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java?rev=784902&r1=784901&r2=784902&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java (original)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransportFactory.java Mon Jun 15 18:32:48 2009
@@ -1,5 +1,6 @@
package org.apache.activemq.transport.pipe;
+import java.io.EOFException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
@@ -30,6 +31,7 @@
import org.apache.activemq.wireformat.WireFormatFactory;
public class PipeTransportFactory extends TransportFactory {
+ static private final Object EOF_TOKEN = new Object();
protected final HashMap<String, PipeTransportServer> servers = new HashMap<String, PipeTransportServer>();
@@ -59,11 +61,14 @@
}
public void stop() throws Exception {
+ pipe.write(EOF_TOKEN);
if (readContext != null) {
readContext.close(true);
} else {
stopping.set(true);
- thread.join();
+ if( thread!=null ) {
+ thread.join();
+ }
}
}
@@ -107,6 +112,9 @@
pipe.setReadReadyListener(this);
return true;
} else {
+ if(o == EOF_TOKEN) {
+ throw new EOFException();
+ }
if (wireFormat != null) {
listener.onCommand(wireFormat.unmarshal((ByteSequence) o));
} else {
@@ -126,9 +134,15 @@
while (!stopping.get()) {
Object value = pipe.poll(500, TimeUnit.MILLISECONDS);
if (value != null) {
- listener.onCommand(value);
+ if(value == EOF_TOKEN) {
+ throw new EOFException();
+ } else {
+ listener.onCommand(value);
+ }
}
}
+ } catch (IOException e) {
+ listener.onException(e);
} catch (InterruptedException e) {
}
}
@@ -302,7 +316,7 @@
return new PipeTransportServer();
}
- private synchronized void unbind(PipeTransportServer server) {
+ protected synchronized void unbind(PipeTransportServer server) {
servers.remove(server.getName());
}