You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/12/18 05:28:52 UTC
svn commit: r892128 [2/2] - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/src/main/java/org/apache/activemq/apollo/broker/
activemq-broker/src/test/java/org/apache/activemq/broker/
activemq-dispatcher/src/main/java/org/apache/activemq/dis...
Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/pipe/PipeTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/pipe/PipeTransportFactory.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/pipe/PipeTransportFactory.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/actor/transport/pipe/PipeTransportFactory.java Fri Dec 18 04:28:49 2009
@@ -21,12 +21,14 @@
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.actor.ActorProxy;
-import org.apache.activemq.dispatch.DispatchObject;
import org.apache.activemq.dispatch.DispatchQueue;
import org.apache.activemq.dispatch.Dispatcher;
+import org.apache.activemq.queue.actor.perf.DispatchObjectFilter;
import org.apache.activemq.queue.actor.transport.Transport;
import org.apache.activemq.queue.actor.transport.TransportFactory;
import org.apache.activemq.queue.actor.transport.TransportHandler;
@@ -45,7 +47,8 @@
public class PipeTransportFactory implements TransportFactory {
static protected final HashMap<String, PipeTransportServer> servers = new HashMap<String, PipeTransportServer>();
-
+ static final AtomicLong CONNECTION_IDS = new AtomicLong();
+
static void perform_unbind(PipeTransportServer server) {
synchronized(servers) {
servers.remove(server.name);
@@ -69,7 +72,10 @@
throw new IOException("Server not bound: " + clientTransport.name);
}
}
- PipeTransport serverTransport = new PipeTransport(server.dispatcher);
+
+ long connectionId = clientTransport.connectionId;
+ String remoteAddress = clientTransport.connnectAddress+":server:"+connectionId;
+ PipeTransport serverTransport = new PipeTransport(server.dispatcher, connectionId, remoteAddress);
clientTransport.peer = serverTransport.actor;
serverTransport.peer = clientTransport.actor;
server.actor.onConnect(serverTransport);
@@ -87,13 +93,12 @@
throw new IllegalArgumentException("Invalid bind uri: "+e, e);
}
- PipeTransportServer rc = new PipeTransportServer(dispatcher);
+ PipeTransportServer rc = new PipeTransportServer(dispatcher, name);
rc.connectURI = bindUri;
IntrospectionSupport.setProperties(rc, options);
if (!options.isEmpty()) {
throw new IllegalArgumentException("Invalid bind uri parameters: " + options);
}
- rc.name = name;
return rc;
}
@@ -109,7 +114,9 @@
throw new IllegalArgumentException("Invalid connect uri: "+e, e);
}
- PipeTransport rc = new PipeTransport(dispatcher);
+ long connectionId = CONNECTION_IDS.incrementAndGet();
+ String remoteAddress = connectUri+":client:"+connectionId;
+ PipeTransport rc = new PipeTransport(dispatcher, connectionId, remoteAddress);
IntrospectionSupport.setProperties(rc, options);
if (!options.isEmpty()) {
throw new IllegalArgumentException("Invalid connect uri parameters: " + options);
@@ -120,45 +127,6 @@
}
- public static class DispatchObjectFilter implements DispatchObject {
- protected DispatchObject next;
-
- public DispatchObjectFilter() {
- }
-
- public DispatchObjectFilter(DispatchObject next) {
- this.next = next;
- }
-
- public void addShutdownWatcher(Runnable shutdownWatcher) {
- next.addShutdownWatcher(shutdownWatcher);
- }
- public <Context> Context getContext() {
- return next.getContext();
- }
- public DispatchQueue getTargetQueue() {
- return next.getTargetQueue();
- }
- public void release() {
- next.release();
- }
- public void resume() {
- next.resume();
- }
- public void retain() {
- next.retain();
- }
- public <Context> void setContext(Context context) {
- next.setContext(context);
- }
- public void setTargetQueue(DispatchQueue queue) {
- next.setTargetQueue(queue);
- }
- public void suspend() {
- next.suspend();
- }
- }
-
interface PipeTransportServerActor {
public void onBind();
public void onConnect(PipeTransport serverSide);
@@ -179,11 +147,11 @@
protected boolean marshal;
protected final AtomicInteger suspendCounter = new AtomicInteger();
- protected long connectionCounter;
- public PipeTransportServer(Dispatcher dispatcher) {
- super( dispatcher.createSerialQueue(null) );
+ public PipeTransportServer(Dispatcher dispatcher, String name) {
+ super( dispatcher.createSerialQueue(name) );
this.dispatcher = dispatcher;
+ this.name=name;
dispatchQueue = (DispatchQueue) next;
dispatchQueue.suspend();
dispatchQueue.addShutdownWatcher(new Runnable() {
@@ -214,8 +182,6 @@
}
public void onConnect(PipeTransport serverSide) {
- long connectionId = connectionCounter++;
- serverSide.remoteAddress = connectURI.toString() + "#" + connectionId;
handler.onAccept(serverSide);
}
@@ -266,21 +232,18 @@
private String wireFormat;
private boolean marshal;
+ protected final AtomicBoolean connected = new AtomicBoolean();
protected final AtomicInteger suspendCounter = new AtomicInteger();
-
- public PipeTransport(Dispatcher dispatcher) {
- super( dispatcher.createSerialQueue(null) );
+ private final long connectionId;
+
+ final protected AtomicInteger retained = new AtomicInteger(1);
+
+ public PipeTransport(Dispatcher dispatcher, long connectionId, String remoteAddress) {
+ super( dispatcher.createSerialQueue(remoteAddress) );
+ this.connectionId = connectionId;
this.dispatchQueue = (DispatchQueue) next;
this.dispatchQueue.suspend();
- this.dispatchQueue.addShutdownWatcher(new Runnable() {
- public void run() {
- PipeTransportActor peer = PipeTransport.this.peer;
- if( peer!=null ) {
- peer.onDisconnect();
- peer = null;
- }
- }
- });
+ this.remoteAddress = remoteAddress;
// Queue up the connect event so it's the first thing that gets executed when
// this object gets resumed..
@@ -288,28 +251,65 @@
this.actor.onConnect();
}
+ public void retain() {
+ retained.getAndIncrement();
+ }
+
+ public void release() {
+ if (retained.decrementAndGet() == 0) {
+ PipeTransportActor peer = PipeTransport.this.peer;
+ if( peer!=null ) {
+ peer.onDisconnect();
+ }
+ dispatchQueue.dispatchAsync(new Runnable(){
+ public void run() {
+ handler = null;
+ }
+ });
+ dispatchQueue.release();
+ }
+ }
+
public void setHandler(TransportHandler hanlder) {
this.handler = hanlder;
}
public void onConnect() {
try {
+ if( !connected.compareAndSet(false, true) ) {
+ throw new IOException("allready connected.");
+ }
+
if (connnectAddress != null) {
// Client side connect case...
perform_connect(this);
- remoteAddress = connnectAddress;
- handler.onConnect();
+ if( handler!=null ) {
+ handler.onConnect();
+ }
} else {
// Server side connect case...
- if( peer==null || remoteAddress==null ) {
+ if( peer==null ) {
throw new IOException("Server transport not properly initialized.");
}
- handler.onConnect();
+ if( handler!=null ) {
+ handler.onConnect();
+ }
}
+ dispatchQueue.retain();
} catch (IOException e) {
- handler.onFailure(e);
+ onFailure(e);
}
}
+
+ public void onDisconnect() {
+ if( !connected.compareAndSet(true, false) ) {
+ throw new AssertionError("Was not connected.");
+ }
+ if( handler!=null ) {
+ handler.onDisconnect();
+ }
+ dispatchQueue.release();
+ }
public void send(Object message) {
send(message, null, null);
@@ -328,21 +328,26 @@
complete(onCompleted, queue);
return;
}
+ if( queue!=null ) {
+ queue.retain();
+ }
peer.onDispatch(message, onCompleted, queue);
}
public void onDispatch(Object message, Runnable onCompleted, DispatchQueue queue) {
try {
- Object m = message;
- if (wf != null && marshal) {
- try {
- m = wf.unmarshal((Buffer) m);
- } catch (IOException e) {
- handler.onFailure(e);
- return;
+ if( handler!=null ) {
+ Object m = message;
+ if (wf != null && marshal) {
+ try {
+ m = wf.unmarshal((Buffer) m);
+ } catch (IOException e) {
+ handler.onFailure(e);
+ return;
+ }
}
+ handler.onRecevie(m);
}
- handler.onRecevie(m);
} finally {
complete(onCompleted, queue);
}
@@ -352,17 +357,19 @@
if( onCompleted!=null ) {
if(queue!=null) {
queue.dispatchAsync(onCompleted);
+ if( queue!=null ) {
+ queue.release();
+ }
} else {
onCompleted.run();
}
}
}
- public void onDisconnect() {
- handler.onDisconnect();
- }
public void onFailure(Exception e) {
- handler.onFailure(e);
+ if( handler!=null ) {
+ handler.onFailure(e);
+ }
}
public String getRemoteAddress() {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBroker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBroker.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBroker.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBroker.java Fri Dec 18 04:28:49 2009
@@ -210,7 +210,7 @@
broker.setName("Broker");
broker.createDispatcher();
try {
- broker.getDispatcher().retain();
+ broker.getDispatcher().resume();
broker.startServices();
} catch (Exception e) {
// TODO Auto-generated catch block
Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBrokerTest.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBrokerTest.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBrokerTest.java Fri Dec 18 04:28:49 2009
@@ -255,7 +255,7 @@
private void createConnections(int destCount) throws Exception {
dispatcher = createDispatcher("BrokerDispatcher");
- dispatcher.retain();
+ dispatcher.resume();
if (multibroker) {
sendBroker = createBroker("SendBroker", sendBrokerURI);
@@ -287,7 +287,7 @@
Dispatcher clientDispatcher = null;
if (SEPARATE_CLIENT_DISPATCHER) {
clientDispatcher = createDispatcher("ClientDispatcher");
- clientDispatcher.retain();
+ clientDispatcher.resume();
} else {
clientDispatcher = dispatcher;
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockClient.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockClient.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockClient.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockClient.java Fri Dec 18 04:28:49 2009
@@ -219,7 +219,7 @@
}
public void runTest() throws Exception {
- getDispatcher().retain();
+ getDispatcher().resume();
// Start 'em up.
startServices();
@@ -304,7 +304,7 @@
System.out.println(IntrospectionSupport.toString(test));
try
{
- test.getDispatcher().retain();
+ test.getDispatcher().resume();
test.createConnections();
test.runTest();
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/IntrospectionSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/IntrospectionSupport.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/IntrospectionSupport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/IntrospectionSupport.java Fri Dec 18 04:28:49 2009
@@ -254,14 +254,22 @@
}
public static String toString(Object target) {
- return toString(target, Object.class, null);
+ return toString(target, Object.class, null, null);
+ }
+
+ public static String toString(Object target, String...fields) {
+ return toString(target, Object.class, null, fields);
}
public static String toString(Object target, Class<?> stopClass) {
- return toString(target, stopClass, null);
+ return toString(target, stopClass, null, null);
+ }
+
+ public static String toString(Object target, Map<String, Object> overrideFields, String...fields) {
+ return toString(target, Object.class, overrideFields, fields);
}
- public static String toString(Object target, Class<?> stopClass, Map<String, Object> overrideFields) {
+ public static String toString(Object target, Class<?> stopClass, Map<String, Object> overrideFields, String fields[]) {
try {
LinkedHashMap<String, Object> map = new LinkedHashMap<String, Object>();
addFields(target, target.getClass(), stopClass, map);
@@ -271,6 +279,10 @@
map.put(key, value);
}
}
+
+ if( fields!=null ) {
+ map.keySet().retainAll(Arrays.asList(fields));
+ }
boolean useMultiLine=false;
LinkedHashMap<String, String> props = new LinkedHashMap<String, String>();
@@ -313,7 +325,8 @@
}
buffer.append(entry.getKey());
buffer.append(": ");
- buffer.append(StringSupport.indent(entry.getValue(), 2));
+ String value = entry.getValue();
+ buffer.append(StringSupport.indent(value, 2));
}
buffer.append("}");
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/StringSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/StringSupport.java?rev=892128&r1=892127&r2=892128&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/StringSupport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/java/org/apache/activemq/util/StringSupport.java Fri Dec 18 04:28:49 2009
@@ -26,6 +26,9 @@
public class StringSupport {
public static String indent(String value, int spaces) {
+ if( value == null ) {
+ return null;
+ }
String indent = fillString(spaces, ' ');
return value.replaceAll("(\\r?\\n)", "$1"+indent);
}