You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/03/16 18:25:24 UTC

svn commit: r754964 [1/2] - in /activemq/sandbox/activemq-flow: ./ src/main/java/org/apache/activemq/ src/main/java/org/apache/activemq/broker/ src/main/java/org/apache/activemq/broker/openwire/ src/main/java/org/apache/activemq/broker/stomp/ src/main/...

Author: chirino
Date: Mon Mar 16 17:25:23 2009
New Revision: 754964

URL: http://svn.apache.org/viewvc?rev=754964&view=rev
Log:
Added in initial bits needed to support the STOMP protocol better.


Added:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/FrameTranslator.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/JmsFrameTranslator.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/LegacyFrameTranslator.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteConsumer.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteProducer.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompBrokerTest.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteConsumer.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteProducer.java
Removed:
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java
Modified:
    activemq/sandbox/activemq-flow/pom.xml
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableStompWireFormatFactory.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java

Modified: activemq/sandbox/activemq-flow/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/pom.xml?rev=754964&r1=754963&r2=754964&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/pom.xml (original)
+++ activemq/sandbox/activemq-flow/pom.xml Mon Mar 16 17:25:23 2009
@@ -37,6 +37,11 @@
       <groupId>org.apache.activemq</groupId>
       <artifactId>activemq-core</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.thoughtworks.xstream</groupId>
+      <artifactId>xstream</artifactId>
+      <optional>true</optional>
+    </dependency>        
     
     <dependency>
       <groupId>junit</groupId>

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java?rev=754964&r1=754963&r2=754964&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java Mon Mar 16 17:25:23 2009
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq;
 
+import java.beans.ExceptionListener;
 import java.io.IOException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -41,7 +42,9 @@
     private IDispatcher dispatcher;
     private final AtomicBoolean stopping = new AtomicBoolean();
     private  ExecutorService blockingWriter;
-
+    private ExceptionListener exceptionListener;
+    
+    
     public void setTransport(Transport transport) {
         this.transport = transport;
     }
@@ -99,13 +102,16 @@
         }
     }
     
-    public void onException(IOException error) {
+    final public void onException(IOException error) {
         if (!isStopping()) {
             onException((Exception) error);
         }
     }
 
-    public void onException(Exception error) {
+    final public void onException(Exception error) {
+        if( exceptionListener!=null ) {
+            exceptionListener.exceptionThrown(error);
+        }
     }
     
     public boolean isStopping(){ 
@@ -158,4 +164,12 @@
         return transport;
     }
 
+    public ExceptionListener getExceptionListener() {
+        return exceptionListener;
+    }
+
+    public void setExceptionListener(ExceptionListener exceptionListener) {
+        this.exceptionListener = exceptionListener;
+    }
+
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java?rev=754964&r1=754963&r2=754964&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java Mon Mar 16 17:25:23 2009
@@ -16,10 +16,15 @@
  */
 package org.apache.activemq.broker;
 
+import java.beans.ExceptionListener;
+import java.io.IOException;
+
 import org.apache.activemq.Connection;
 import org.apache.activemq.Service;
 import org.apache.activemq.broker.openwire.OpenwireProtocolHandler;
+import org.apache.activemq.broker.stomp.StompProtocolHandler;
 import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.transport.stomp.StompWireFormat;
 import org.apache.activemq.wireformat.WireFormat;
 
 public class BrokerConnection extends Connection {
@@ -34,6 +39,19 @@
         public void setWireFormat(WireFormat wf);
     }
 
+    
+    public BrokerConnection() {
+        setExceptionListener(new ExceptionListener(){
+            public void exceptionThrown(Exception error) {
+                error.printStackTrace();
+                try {
+                    stop();
+                } catch (Exception ignore) {
+                }
+            }
+        });
+    }
+    
     public Broker getBroker() {
         return broker;
     }
@@ -53,13 +71,27 @@
             protocolHandler.onCommand(command);
         } else {
             try {
+                
+                // TODO: need to make this more extensible and dynamic.  Perhaps 
+                // we should lookup the ProtocolHandler via a FactoryFinder
                 WireFormat wf = (WireFormat) command;
                 if( wf.getClass() == OpenWireFormat.class ) {
                     protocolHandler = new OpenwireProtocolHandler();
-                    protocolHandler.setConnection(this);
-                    protocolHandler.setWireFormat(wf);
-                    protocolHandler.start();
+                } else if( wf.getClass() == StompWireFormat.class ) {
+                    protocolHandler = new StompProtocolHandler();
+                } else {
+                    throw new IOException("No protocol handler available for: "+wf.getClass());
                 }
+                
+                protocolHandler.setConnection(this);
+                protocolHandler.setWireFormat(wf);
+                protocolHandler.start();
+                
+                setExceptionListener(new ExceptionListener(){
+                    public void exceptionThrown(Exception error) {
+                        protocolHandler.onException(error);
+                    }
+                });
             } catch (Exception e) {
                 onException(e);
             }
@@ -67,19 +99,6 @@
     }
     
     @Override
-    public void onException(Exception error) {
-        if( protocolHandler!=null ) {
-            protocolHandler.onException(error);
-        } else {
-            error.printStackTrace();
-            try {
-                stop();
-            } catch (Exception ignore) {
-            }
-        }
-    }
-    
-    @Override
     public void stop() throws Exception {
         super.stop();
         if( protocolHandler!=null ) {

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java?rev=754964&r1=754963&r2=754964&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java Mon Mar 16 17:25:23 2009
@@ -35,4 +35,6 @@
 
     public <T> T asType(Class<T> type);
 
+    public boolean isPersistent();
+
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java?rev=754964&r1=754963&r2=754964&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java Mon Mar 16 17:25:23 2009
@@ -81,4 +81,8 @@
         return null;
     }
 
+    public boolean isPersistent() {
+        return message.isPersistent();
+    }
+
 }

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/FrameTranslator.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/FrameTranslator.java?rev=754964&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/FrameTranslator.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/FrameTranslator.java Mon Mar 16 17:25:23 2009
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.stomp;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.broker.Destination;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.transport.stomp.ProtocolException;
+import org.apache.activemq.transport.stomp.Stomp;
+import org.apache.activemq.transport.stomp.StompFrame;
+
+/**
+ * Implementations of this interface are used to map back and forth between
+ * Stomp and ActiveMQ. Several standard headers are semantically the same on
+ * both sides; the nested class, Helper, provides functions to copy those
+ * properties from one to the other.
+ */
+public interface FrameTranslator {
+    
+    ActiveMQMessage convertToOpenwireMessage(StompProtocolHandler converter, StompFrame frame) throws JMSException, ProtocolException;
+    StompFrame convertFromOpenwireMessage(StompProtocolHandler converter, ActiveMQMessage message) throws IOException, JMSException;
+    
+    String convertFromOpenwireDestination(StompProtocolHandler converter, ActiveMQDestination d);
+    ActiveMQDestination convertToOpenwireDestination(StompProtocolHandler converter, String name) throws ProtocolException;
+
+    String convertFromDestination(StompProtocolHandler converter, Destination d) throws ProtocolException;
+    Destination convertToDestination(StompProtocolHandler converter, String name) throws ProtocolException;
+
+    /**
+     * Helper class which holds commonly needed functions used when implementing
+     * FrameTranslators
+     */
+    static final class Helper {
+
+        private Helper() {
+        }
+
+        public static void copyStandardHeadersFromMessageToFrame(StompProtocolHandler converter, ActiveMQMessage message, StompFrame command, FrameTranslator ft) throws IOException {
+            final Map<String, String> headers = command.getHeaders();
+            headers.put(Stomp.Headers.Message.DESTINATION, ft.convertFromOpenwireDestination(converter, message.getDestination()));
+            headers.put(Stomp.Headers.Message.MESSAGE_ID, message.getJMSMessageID());
+
+            if (message.getJMSCorrelationID() != null) {
+                headers.put(Stomp.Headers.Message.CORRELATION_ID, message.getJMSCorrelationID());
+            }
+            headers.put(Stomp.Headers.Message.EXPIRATION_TIME, "" + message.getJMSExpiration());
+
+            if (message.getJMSRedelivered()) {
+                headers.put(Stomp.Headers.Message.REDELIVERED, "true");
+            }
+            headers.put(Stomp.Headers.Message.PRORITY, "" + message.getJMSPriority());
+
+            if (message.getJMSReplyTo() != null) {
+                headers.put(Stomp.Headers.Message.REPLY_TO, ft.convertFromOpenwireDestination(converter, (ActiveMQDestination) message.getJMSReplyTo()));
+            }
+            headers.put(Stomp.Headers.Message.TIMESTAMP, "" + message.getJMSTimestamp());
+
+            if (message.getJMSType() != null) {
+                headers.put(Stomp.Headers.Message.TYPE, message.getJMSType());
+            }
+
+            // now lets add all the message headers
+            final Map<String, Object> properties = message.getProperties();
+            if (properties != null) {
+                for (Map.Entry<String, Object> prop : properties.entrySet()) {
+                    headers.put(prop.getKey(), "" + prop.getValue());
+                }
+            }
+        }
+
+        public static void copyStandardHeadersFromFrameToMessage(StompProtocolHandler converter, StompFrame command, ActiveMQMessage msg, FrameTranslator ft) throws ProtocolException, JMSException {
+            final Map<String, String> headers = new HashMap<String, String>(command.getHeaders());
+            final String destination = headers.remove(Stomp.Headers.Send.DESTINATION);
+            msg.setDestination(ft.convertToOpenwireDestination(converter, destination));
+
+            // the standard JMS headers
+            msg.setJMSCorrelationID(headers.remove(Stomp.Headers.Send.CORRELATION_ID));
+
+            Object o = headers.remove(Stomp.Headers.Send.EXPIRATION_TIME);
+            if (o != null) {
+                msg.setJMSExpiration(Long.parseLong((String)o));
+            }
+
+            o = headers.remove(Stomp.Headers.Send.PRIORITY);
+            if (o != null) {
+                msg.setJMSPriority(Integer.parseInt((String)o));
+            }
+
+            o = headers.remove(Stomp.Headers.Send.TYPE);
+            if (o != null) {
+                msg.setJMSType((String)o);
+            }
+
+            o = headers.remove(Stomp.Headers.Send.REPLY_TO);
+            if (o != null) {
+                msg.setJMSReplyTo(ft.convertToOpenwireDestination(converter, (String)o));
+            }
+
+            o = headers.remove(Stomp.Headers.Send.PERSISTENT);
+            if (o != null) {
+                msg.setPersistent("true".equals(o));
+            }
+
+            // now the general headers
+            msg.setProperties(headers);
+        }
+    }
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/JmsFrameTranslator.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/JmsFrameTranslator.java?rev=754964&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/JmsFrameTranslator.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/JmsFrameTranslator.java Mon Mar 16 17:25:23 2009
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.stomp;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.transport.stomp.ProtocolException;
+import org.apache.activemq.transport.stomp.Stomp;
+import org.apache.activemq.transport.stomp.StompFrame;
+
+import com.thoughtworks.xstream.XStream;
+import com.thoughtworks.xstream.io.HierarchicalStreamReader;
+import com.thoughtworks.xstream.io.HierarchicalStreamWriter;
+import com.thoughtworks.xstream.io.json.JettisonMappedXmlDriver;
+import com.thoughtworks.xstream.io.xml.PrettyPrintWriter;
+import com.thoughtworks.xstream.io.xml.XppReader;
+
+/**
+ * Frame translator implementation that uses XStream to convert messages to and
+ * from XML and JSON
+ * 
+ * @author <a href="mailto:dejan@nighttale.net">Dejan Bosanac</a>
+ */
+public class JmsFrameTranslator extends LegacyFrameTranslator {
+
+	XStream xStream = null;
+
+	/**
+	 * Converts a STOMP frame into a message. Frames with a content-length header, the JMS_BYTE
+	 * transformation, or no transformation header at all use the superclass translation.
+	 * Otherwise the body is decoded as UTF-8 and unmarshalled with XStream from XML or JSON; if
+	 * that fails, the error text is stored in the TRANSFORMATION_ERROR header and the superclass
+	 * translation is used instead. The standard headers are then copied onto the message.
+	 */
+	public ActiveMQMessage convertToOpenwireMessage(StompProtocolHandler converter, StompFrame command) throws JMSException, ProtocolException {
+		Map<String, String> headers = command.getHeaders();
+		ActiveMQMessage msg;
+		String transformation = headers.get(Stomp.Headers.TRANSFORMATION);
+		if (transformation == null || headers.containsKey(Stomp.Headers.CONTENT_LENGTH) || transformation.equals(Stomp.Transformations.JMS_BYTE.toString())) {
+			msg = super.convertToOpenwireMessage(converter, command);
+		} else {
+			try {
+				String text = new String(command.getContent(), "UTF-8");
+				switch (Stomp.Transformations.getValue(transformation)) {
+				case JMS_OBJECT_XML:
+					msg = createObjectMessage(new XppReader(new StringReader(text)));
+					break;
+				case JMS_OBJECT_JSON:
+					msg = createObjectMessage(new JettisonMappedXmlDriver().createReader(new StringReader(text)));
+					break;
+				case JMS_MAP_XML:
+					msg = createMapMessage(new XppReader(new StringReader(text)));
+					break;
+				case JMS_MAP_JSON:
+					msg = createMapMessage(new JettisonMappedXmlDriver().createReader(new StringReader(text)));
+					break;
+				default:
+					throw new Exception("Unkown transformation: " + transformation);
+				}
+			} catch (Throwable e) {
+				command.getHeaders().put(Stomp.Headers.TRANSFORMATION_ERROR, e.getMessage());
+				msg = super.convertToOpenwireMessage(converter, command);
+			}
+		}
+		FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this);
+		return msg;
+	}
+
+	public StompFrame convertFromOpenwireMessage(StompProtocolHandler converter,
+			ActiveMQMessage message) throws IOException, JMSException {
+		if (message.getDataStructureType() == ActiveMQObjectMessage.DATA_STRUCTURE_TYPE) {
+			StompFrame command = new StompFrame();
+			command.setAction(Stomp.Responses.MESSAGE);
+			Map<String, String> headers = new HashMap<String, String>(25);
+			command.setHeaders(headers);
+
+			FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
+					converter, message, command, this);
+			ActiveMQObjectMessage msg = (ActiveMQObjectMessage) message.copy();
+			command.setContent(marshall(msg.getObject(),
+					headers.get(Stomp.Headers.TRANSFORMATION))
+					.getBytes("UTF-8"));
+			return command;
+
+		} else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) { 
+			StompFrame command = new StompFrame();
+			command.setAction(Stomp.Responses.MESSAGE);
+			Map<String, String> headers = new HashMap<String, String>(25);
+			command.setHeaders(headers);
+
+			FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
+					converter, message, command, this);
+			ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy();
+			command.setContent(marshall((Serializable)msg.getContentMap(),
+					headers.get(Stomp.Headers.TRANSFORMATION))
+					.getBytes("UTF-8"));
+			return command;		
+		} else {
+			return super.convertFromOpenwireMessage(converter, message);
+		}
+	}
+
+	/**
+	 * Marshalls the object to a string. JSON is used when the transformation name ends with
+	 * "json" (case-insensitive); XML is used otherwise, including when no transformation is given.
+	 */
+	protected String marshall(Serializable object, String transformation)
+			throws JMSException {
+		StringWriter buffer = new StringWriter();
+		// Callers pass headers.get(TRANSFORMATION), which is null when the header is absent.
+		boolean json = transformation != null && transformation.toLowerCase().endsWith("json");
+		HierarchicalStreamWriter out = json
+				? new JettisonMappedXmlDriver().createWriter(buffer)
+				: new PrettyPrintWriter(buffer);
+		getXStream().marshal(object, out);
+		return buffer.toString();
+	}
+
+	protected ActiveMQObjectMessage createObjectMessage(HierarchicalStreamReader in) throws JMSException {
+		ActiveMQObjectMessage objMsg = new ActiveMQObjectMessage();
+		Object obj = getXStream().unmarshal(in);
+		objMsg.setObject((Serializable) obj);
+		return objMsg;
+	}
+	
+	protected ActiveMQMapMessage createMapMessage(HierarchicalStreamReader in) throws JMSException {
+		ActiveMQMapMessage mapMsg = new ActiveMQMapMessage();
+		Map<String, Object> map = (Map<String, Object>)getXStream().unmarshal(in);
+		for (String key : map.keySet()) {
+			mapMsg.setObject(key, map.get(key));
+		}
+		return mapMsg;
+	}
+	
+	
+
+	// Properties
+	// -------------------------------------------------------------------------
+	public XStream getXStream() {
+		if (xStream == null) {
+			xStream = new XStream();
+		}
+		return xStream;
+	}
+
+	public void setXStream(XStream xStream) {
+		this.xStream = xStream;
+	}
+
+
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/LegacyFrameTranslator.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/LegacyFrameTranslator.java?rev=754964&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/LegacyFrameTranslator.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/LegacyFrameTranslator.java Mon Mar 16 17:25:23 2009
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.stomp;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.broker.Destination;
+import org.apache.activemq.broker.Router;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.transport.stomp.ProtocolException;
+import org.apache.activemq.transport.stomp.Stomp;
+import org.apache.activemq.transport.stomp.StompFrame;
+
+/**
+ * Implements ActiveMQ 4.0 translations
+ */
+public class LegacyFrameTranslator implements FrameTranslator {
+	
+	
+    public ActiveMQMessage convertToOpenwireMessage(StompProtocolHandler converter, StompFrame command) throws JMSException, ProtocolException {
+        final Map<String, String> headers = command.getHeaders();
+        final ActiveMQMessage msg;
+        if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH)) {
+            headers.remove(Stomp.Headers.CONTENT_LENGTH);
+            ActiveMQBytesMessage bm = new ActiveMQBytesMessage();
+            bm.writeBytes(command.getContent());
+            msg = bm;
+        } else {
+            ActiveMQTextMessage text = new ActiveMQTextMessage();
+            try {
+                text.setText(new String(command.getContent(), "UTF-8"));
+            } catch (Throwable e) {
+                throw new ProtocolException("Text could not bet set: " + e, false, e);
+            }
+            msg = text;
+        }
+        FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this);
+        return msg;
+    }
+
+    public StompFrame convertFromOpenwireMessage(StompProtocolHandler converter, ActiveMQMessage message) throws IOException, JMSException {
+        StompFrame command = new StompFrame();
+        command.setAction(Stomp.Responses.MESSAGE);
+        Map<String, String> headers = new HashMap<String, String>(25);
+        command.setHeaders(headers);
+
+        FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(converter, message, command, this);
+
+        if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) {
+
+            ActiveMQTextMessage msg = (ActiveMQTextMessage)message.copy();
+            command.setContent(msg.getText().getBytes("UTF-8"));
+
+        } else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) {
+
+            ActiveMQBytesMessage msg = (ActiveMQBytesMessage)message.copy();
+            msg.setReadOnlyBody(true);
+            byte[] data = new byte[(int)msg.getBodyLength()];
+            msg.readBytes(data);
+
+            headers.put(Stomp.Headers.CONTENT_LENGTH, "" + data.length);
+            command.setContent(data);
+        }
+        return command;
+    }
+
+    public String convertFromOpenwireDestination(StompProtocolHandler converter, ActiveMQDestination activeMQDestination) {
+        if (activeMQDestination == null) {
+            return null;
+        }
+        String physicalName = activeMQDestination.getPhysicalName();
+
+        String rc = converter.getCreatedTempDestinationName(activeMQDestination);
+        if( rc!=null ) {
+        	return rc;
+        }
+        
+        StringBuffer buffer = new StringBuffer();
+        if (activeMQDestination.isQueue()) {
+            if (activeMQDestination.isTemporary()) {
+                buffer.append("/remote-temp-queue/");
+            } else {
+                buffer.append("/queue/");
+            }
+        } else {
+            if (activeMQDestination.isTemporary()) {
+                buffer.append("/remote-temp-topic/");
+            } else {
+                buffer.append("/topic/");
+            }
+        }
+        buffer.append(physicalName);
+        return buffer.toString();
+    }
+
+    public ActiveMQDestination convertToOpenwireDestination(StompProtocolHandler converter, String name) throws ProtocolException {
+        if (name == null) {
+            return null;
+        } else if (name.startsWith("/queue/")) {
+            String qName = name.substring("/queue/".length(), name.length());
+            return ActiveMQDestination.createDestination(qName, ActiveMQDestination.QUEUE_TYPE);
+        } else if (name.startsWith("/topic/")) {
+            String tName = name.substring("/topic/".length(), name.length());
+            return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TOPIC_TYPE);
+        } else if (name.startsWith("/remote-temp-queue/")) {
+            String tName = name.substring("/remote-temp-queue/".length(), name.length());
+            return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TEMP_QUEUE_TYPE);
+        } else if (name.startsWith("/remote-temp-topic/")) {
+            String tName = name.substring("/remote-temp-topic/".length(), name.length());
+            return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TEMP_TOPIC_TYPE);
+        } else if (name.startsWith("/temp-queue/")) {
+            return converter.createTempQueue(name);
+        } else if (name.startsWith("/temp-topic/")) {
+            return converter.createTempTopic(name);
+        } else {
+            throw new ProtocolException("Illegal destination name: [" + name + "] -- ActiveMQ STOMP destinations "
+                                        + "must begine with one of: /queue/ /topic/ /temp-queue/ /temp-topic/");
+        }
+    }
+
+    /**
+     * Maps a broker Destination to its STOMP name: "/queue/" or "/topic/" followed by the
+     * destination name. Returns null for a null destination.
+     * @throws ProtocolException if the destination's domain is neither queue nor topic
+     */
+    public String convertFromDestination(StompProtocolHandler converter, Destination d) throws ProtocolException {
+        if (d == null) {
+            return null;
+        }
+        // The second test used to repeat QUEUE_DOMAIN, so topics could never be converted.
+        if (d.getDomain().equals(Router.QUEUE_DOMAIN)) {
+            return "/queue/" + d.getName().toString();
+        } else if (d.getDomain().equals(Router.TOPIC_DOMAIN)) {
+            return "/topic/" + d.getName().toString();
+        }
+        throw new ProtocolException("Illegal destination: Stomp can only handle queue or topic Domains");
+    }
+
+    public Destination convertToDestination(StompProtocolHandler converter, String name) throws ProtocolException {
+        if (name == null) {
+            return null;
+        } else if (name.startsWith("/queue/")) {
+            String qName = name.substring("/queue/".length(), name.length());
+            return new Destination.SingleDestination(Router.QUEUE_DOMAIN, new AsciiBuffer(qName));
+        } else if (name.startsWith("/topic/")) {
+            String tName = name.substring("/topic/".length(), name.length());
+            return new Destination.SingleDestination(Router.TOPIC_DOMAIN, new AsciiBuffer(tName));
+        } else if (name.startsWith("/remote-temp-queue/")) {
+            throw new UnsupportedOperationException();
+        } else if (name.startsWith("/remote-temp-topic/")) {
+            throw new UnsupportedOperationException();
+        } else if (name.startsWith("/temp-queue/")) {
+            throw new UnsupportedOperationException();
+        } else if (name.startsWith("/temp-topic/")) {
+            throw new UnsupportedOperationException();
+        } else {
+            throw new ProtocolException("Illegal destination name: [" + name + "] -- ActiveMQ STOMP destinations "
+                                        + "must begine with one of: /queue/ /topic/ /temp-queue/ /temp-topic/");
+        }
+    }
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java?rev=754964&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java Mon Mar 16 17:25:23 2009
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.stomp;
+
+import org.apache.activemq.broker.Destination;
+import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.transport.stomp.Stomp;
+import org.apache.activemq.transport.stomp.StompFrame;
+
+public class StompMessageDelivery implements MessageDelivery {
+
+    private final StompFrame frame;
+    private Destination destination;
+    private Runnable completionCallback;
+    private String receiptId;
+    private int priority = Integer.MIN_VALUE;
+    private AsciiBuffer msgId;
+
+    public StompMessageDelivery(StompFrame frame, Destination destiantion) {
+        this.frame = frame;
+        this.destination = destiantion;
+        this.frame.setAction(Stomp.Responses.MESSAGE);
+        this.receiptId = frame.getHeaders().remove(Stomp.Headers.RECEIPT_REQUESTED);
+    }
+
+    public Destination getDestination() {
+        return destination;
+    }
+
+    public int getFlowLimiterSize() {
+        return frame.getContent().length;
+    }
+
+    public int getPriority() {
+        if( priority == Integer.MIN_VALUE ) {
+            String p = frame.getHeaders().get(Stomp.Headers.Message.PRORITY);
+            try {
+                priority = (p == null) ? 4 : Integer.parseInt(p);
+            } catch (NumberFormatException e) {
+                priority = 4;
+            } 
+        }
+        return priority;
+    }
+
+    public AsciiBuffer getMsgId() {
+        if( msgId == null ) {
+            String p = frame.getHeaders().get(Stomp.Headers.Message.MESSAGE_ID);
+            if( p!=null ) {
+                msgId = new AsciiBuffer(p);
+            }
+        }
+        return msgId;
+    }
+
+    public AsciiBuffer getProducerId() {
+        return null;
+    }
+
+    public Runnable getCompletionCallback() {
+        return completionCallback;
+    }
+
+    public void setCompletionCallback(Runnable completionCallback) {
+        this.completionCallback = completionCallback;
+    }
+
+    public <T> T asType(Class<T> type) {
+        if( type == StompFrame.class ) {
+            return type.cast(frame);
+        }
+        return null;
+    }
+
+    public StompFrame getStomeFame() {
+        return frame;
+    }
+
+    public String getReceiptId() {
+        return receiptId;
+    }
+
+    public boolean isPersistent() {
+        String p = frame.getHeaders().get(Stomp.Headers.Send.PERSISTENT);
+        return "true".equals(p);
+    }
+
+}

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java?rev=754964&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java Mon Mar 16 17:25:23 2009
@@ -0,0 +1,496 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.stomp;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+
+import org.apache.activemq.WindowLimiter;
+import org.apache.activemq.broker.BrokerConnection;
+import org.apache.activemq.broker.DeliveryTarget;
+import org.apache.activemq.broker.Destination;
+import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.broker.Router;
+import org.apache.activemq.broker.BrokerConnection.ProtocolHandler;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.filter.BooleanExpression;
+import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.flow.Flow;
+import org.apache.activemq.flow.FlowController;
+import org.apache.activemq.flow.IFlowController;
+import org.apache.activemq.flow.IFlowDrain;
+import org.apache.activemq.flow.IFlowSink;
+import org.apache.activemq.flow.IFlowSource;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.flow.SizeLimiter;
+import org.apache.activemq.flow.ISinkController.FlowControllable;
+import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.queue.SingleFlowRelay;
+import org.apache.activemq.selector.SelectorParser;
+import org.apache.activemq.transport.stomp.Stomp;
+import org.apache.activemq.transport.stomp.StompFrame;
+import org.apache.activemq.transport.stomp.StompSubscription;
+import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.FactoryFinder;
+import org.apache.activemq.wireformat.WireFormat;
+
+
+public class StompProtocolHandler implements ProtocolHandler {
+
+    /**
+     * Callback invoked for a received frame; one instance is registered per STOMP command.
+     */
+    interface ActionHander {
+        /**
+         * @param frame the received frame
+         * @throws Exception if the frame cannot be processed
+         */
+        public void onStompFrame(StompFrame frame) throws Exception;
+    }
+
+    protected final HashMap<String, ActionHander> actionHandlers = new HashMap<String, ActionHander>();
+    protected final HashMap<String, ConsumerContext> consumers = new HashMap<String, ConsumerContext>();
+
+    protected final Object inboundMutex = new Object();
+    protected IFlowController<MessageDelivery> inboundController;
+    
+    protected BrokerConnection connection;
+    
+    // TODO: need to update the FrameTranslator to normalize to new broker API objects instead of to the openwire command set.
+    private final FrameTranslator translator = new LegacyFrameTranslator();
+    private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/broker/stomp/frametranslator/");
+    private SingleFlowRelay<MessageDelivery> outboundQueue;
+
+    private HashMap<AsciiBuffer, ConsumerContext> allSentMessageIds = new HashMap<AsciiBuffer, ConsumerContext>();
+
+    /**
+     * Chooses the frame translator for a frame.
+     * <p>
+     * When the frame carries a transformation header, a translator is looked up by
+     * that name through the factory finder. If the header is absent, or the lookup
+     * fails for any reason, the default translator is used.
+     *
+     * @param frame the frame whose headers are inspected
+     * @return the translator to use for the frame
+     */
+    protected FrameTranslator translator(StompFrame frame) {
+        FrameTranslator selected = translator;
+        try {
+            String transformation = frame.getHeaders().get(Stomp.Headers.TRANSFORMATION);
+            if (transformation != null) {
+                selected = (FrameTranslator) FRAME_TRANSLATOR_FINDER.newInstance(transformation);
+            }
+        } catch (Exception ignored) {
+            // Best effort: any failure leaves the default translator selected.
+        }
+        return selected;
+    }
+    
+    /**
+     * Registers one handler per supported STOMP command in {@link #actionHandlers}.
+     * Several handlers are still empty placeholders: their frames are accepted and ignored.
+     */
+    public StompProtocolHandler() {
+        // CONNECT: answer with a CONNECTED frame.
+        actionHandlers.put(Stomp.Commands.CONNECT, new ActionHander(){
+            public void onStompFrame(StompFrame frame) throws Exception {
+                StompFrame response = new StompFrame(Stomp.Responses.CONNECTED);
+                connection.write(response);
+            }
+        });
+        // SEND: resolve the destination, wrap the frame and hand it to the inbound
+        // controller, waiting for the flow to unblock for as long as the offer is refused.
+        actionHandlers.put(Stomp.Commands.SEND, new ActionHander(){
+            public void onStompFrame(StompFrame frame) throws Exception {
+                String dest = frame.getHeaders().get(Stomp.Headers.Send.DESTINATION);
+                Destination destination = translator(frame).convertToDestination(StompProtocolHandler.this, dest);
+                
+                StompMessageDelivery md = new StompMessageDelivery(frame, destination);
+                while (!inboundController.offer(md, null)) {
+                    inboundController.waitForFlowUnblock();
+                }
+            }
+        });
+        // SUBSCRIBE: build the consumer context, remember it under its STOMP destination
+        // name, bind it with the router and send a receipt if one was requested.
+        actionHandlers.put(Stomp.Commands.SUBSCRIBE, new ActionHander(){
+            public void onStompFrame(StompFrame frame) throws Exception {
+                ConsumerContext ctx = new ConsumerContext(frame);
+                consumers.put(ctx.stompDestination, ctx);
+                connection.getBroker().getRouter().bind(ctx.destination, ctx);
+                ack(frame);
+            }
+        });
+        // UNSUBSCRIBE: not implemented yet.
+        actionHandlers.put(Stomp.Commands.UNSUBSCRIBE, new ActionHander(){
+            public void onStompFrame(StompFrame frame) throws Exception {
+            }
+        });
+        // ACK: the message-id header is read but nothing is done with it yet.
+        // NOTE(review): presumably this should be forwarded to ConsumerContext.ack -- confirm.
+        actionHandlers.put(Stomp.Commands.ACK, new ActionHander(){
+            public void onStompFrame(StompFrame frame) throws Exception {
+                frame.getHeaders().get(Stomp.Headers.Ack.MESSAGE_ID);
+            }
+        });
+        // DISCONNECT: not implemented yet.
+        actionHandlers.put(Stomp.Commands.DISCONNECT, new ActionHander(){
+            public void onStompFrame(StompFrame frame) throws Exception {
+            }
+        });
+        
+        // Transaction commands: not implemented yet.
+        actionHandlers.put(Stomp.Commands.ABORT_TRANSACTION, new ActionHander(){
+            public void onStompFrame(StompFrame frame) throws Exception {
+            }
+        });
+        actionHandlers.put(Stomp.Commands.BEGIN_TRANSACTION, new ActionHander(){
+            public void onStompFrame(StompFrame frame) throws Exception {
+            }
+        });
+        actionHandlers.put(Stomp.Commands.COMMIT_TRANSACTION, new ActionHander(){
+            public void onStompFrame(StompFrame frame) throws Exception {
+            }
+        });
+    }
+    
+    /**
+     * Creates the inbound flow controller and the shared outbound queue.
+     * <p>
+     * Inbound elements accepted by the controller are passed to
+     * {@link #route(ISourceController, MessageDelivery)}; elements drained from the
+     * outbound queue are written to the connection as STOMP frames.
+     *
+     * @throws Exception declared by the protocol handler contract
+     */
+    public void start() throws Exception {
+        // Inbound side: frames received from the client.
+        final Flow inboundFlow = new Flow("broker-"+connection.getName()+"-inbound", false);
+        SizeLimiter<MessageDelivery> inboundLimiter =
+            new SizeLimiter<MessageDelivery>(connection.getInputWindowSize(), connection.getInputResumeThreshold());
+        FlowControllableAdapter inboundTarget = new FlowControllableAdapter() {
+            public void flowElemAccepted(ISourceController<MessageDelivery> controller, MessageDelivery elem) {
+                route(controller, elem);
+            }
+
+            public String toString() {
+                return inboundFlow.getFlowName();
+            }
+        };
+        inboundController = new FlowController<MessageDelivery>(inboundTarget, inboundFlow, inboundLimiter, inboundMutex);
+
+        // Outbound side: frames sent to the client.
+        Flow outboundFlow = new Flow("broker-"+connection.getName()+"-outbound", false);
+        // NOTE(review): the output window size is passed as both limiter arguments, whereas
+        // the inbound limiter uses a separate resume threshold -- confirm this is intended.
+        SizeLimiter<MessageDelivery> outboundLimiter =
+            new SizeLimiter<MessageDelivery>(connection.getOutputWindowSize(), connection.getOutputWindowSize());
+        outboundQueue = new SingleFlowRelay<MessageDelivery>(outboundFlow, outboundFlow.getFlowName(), outboundLimiter);
+        outboundQueue.setDrain(new IFlowDrain<MessageDelivery>() {
+            public void drain(final MessageDelivery message, ISourceController<MessageDelivery> controller) {
+                connection.write(message.asType(StompFrame.class));
+            }
+        });
+    }
+
+    /**
+     * Stops the handler. Currently a no-op.
+     */
+    public void stop() throws Exception {
+    }
+
+    public void onCommand(Object o) {
+        StompFrame command = (StompFrame)o;
+        try {
+            String action = command.getAction();
+            ActionHander actionHander = actionHandlers.get(action);
+            if( actionHander == null ) {
+                throw new IOException("Unsupported command: "+action);
+            }
+            actionHander.onStompFrame(command);
+        } catch (Exception error) {
+            try {
+                ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
+                error.printStackTrace(stream);
+                stream.close();
+
+                HashMap<String, String> headers = new HashMap<String, String>();
+                headers.put(Stomp.Headers.Error.MESSAGE, error.getMessage());
+
+                if (command != null) {
+                    final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
+                    if (receiptId != null) {
+                        headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
+                    }
+                }
+
+                StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
+                connection.write(errorMessage);
+                connection.stop();
+            } catch (Exception ignore) {
+            }
+        }
+    }
+    
+    public void onException(Exception error) {
+        if( !connection.isStopping() ) {
+            try {
+                
+                ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
+                error.printStackTrace(stream);
+                stream.close();
+
+                sendError(error.getMessage(), baos.toByteArray());
+                connection.stop();
+                
+            } catch (Exception ignore) {
+            }
+        }
+    }
+
+    // /////////////////////////////////////////////////////////////////
+    // Internal Support Methods
+    // /////////////////////////////////////////////////////////////////
+    /**
+     * Do-nothing {@link FlowControllable} base: accepted elements are ignored and both
+     * the sink and the source are <code>null</code>. Subclasses override what they need.
+     */
+    static class FlowControllableAdapter implements FlowControllable<MessageDelivery> {
+        public void flowElemAccepted(ISourceController<MessageDelivery> controller, MessageDelivery elem) {
+        }
+
+        public IFlowSink<MessageDelivery> getFlowSink() {
+            return null;
+        }
+
+        public IFlowSource<MessageDelivery> getFlowSource() {
+            return null;
+        }
+    }
+
+    /**
+     * State of one STOMP subscription, registered with the router as a delivery target.
+     * <p>
+     * Auto-acknowledged subscriptions share the connection's outbound queue. Other
+     * subscriptions get their own queue behind a window limiter; the ids of the
+     * messages written to the client are remembered until they are acknowledged.
+     */
+    class ConsumerContext implements DeliveryTarget {
+
+        private BooleanExpression selector;
+
+        private SingleFlowRelay<MessageDelivery> queue;
+        public WindowLimiter<MessageDelivery> limiter;
+        private FrameTranslator translator;
+        private String subscriptionId;
+        private String stompDestination;
+        private Destination destination;
+        private String ackMode;
+
+        // Ids of the messages written to the client and not acknowledged yet, in send order.
+        private LinkedHashMap<AsciiBuffer, AsciiBuffer> sentMessageIds = new LinkedHashMap<AsciiBuffer, AsciiBuffer>();
+
+        /**
+         * Builds the subscription state from a SUBSCRIBE frame.
+         *
+         * @param subscribe the SUBSCRIBE frame
+         * @throws Exception if the destination or selector is invalid, or if the
+         *         unsupported individual acknowledgement mode is requested
+         */
+        public ConsumerContext(final StompFrame subscribe) throws Exception {
+            translator = translator(subscribe);
+
+            Map<String, String> headers = subscribe.getHeaders();
+            stompDestination = headers.get(Stomp.Headers.Subscribe.DESTINATION);
+            destination = translator.convertToDestination(StompProtocolHandler.this, stompDestination);
+            subscriptionId = headers.get(Stomp.Headers.Subscribe.ID);
+
+            String requestedAckMode = headers.get(Stomp.Headers.Subscribe.ACK_MODE);
+            if (Stomp.Headers.Subscribe.AckModeValues.CLIENT.equals(requestedAckMode)) {
+                ackMode = StompSubscription.CLIENT_ACK;
+            } else if (Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL.equals(requestedAckMode)) {
+                // Fail the construction instead of returning a context without a queue:
+                // the caller would otherwise go on and bind the unusable context.
+                throw new IOException(StompSubscription.INDIVIDUAL_ACK+" not supported.");
+            } else {
+                ackMode = StompSubscription.AUTO_ACK;
+            }
+
+            selector = parseSelector(subscribe);
+
+            if( !StompSubscription.AUTO_ACK.equals(ackMode) ) {
+                Flow flow = new Flow("broker-"+subscriptionId+"-outbound", false);
+                limiter = new WindowLimiter<MessageDelivery>(true, flow, 1000, 500) {
+                    // Every message counts as one unit, whatever its size.
+                    public int getElementSize(MessageDelivery m) {
+                        return 1;
+                    }
+                };
+                queue = new SingleFlowRelay<MessageDelivery>(flow, flow.getFlowName(), limiter);
+                queue.setDrain(new IFlowDrain<MessageDelivery>() {
+                    public void drain(final MessageDelivery message, ISourceController<MessageDelivery> controller) {
+                        StompFrame frame = message.asType(StompFrame.class);
+                        if (expectsAcks()) {
+                            // Remember what was sent so that a later ACK can be matched against it.
+                            synchronized(allSentMessageIds) {
+                                AsciiBuffer msgId = message.getMsgId();
+                                sentMessageIds.put(msgId, msgId);
+                                allSentMessageIds.put(msgId, ConsumerContext.this);
+                            }
+                        }
+                        connection.write(frame);
+                    }
+                });
+            } else {
+                queue = outboundQueue;
+            }
+
+        }
+
+        /** Tells whether this subscription's acknowledgement mode requires ACK frames from the client. */
+        private boolean expectsAcks() {
+            return StompSubscription.CLIENT_ACK.equals(ackMode) || StompSubscription.INDIVIDUAL_ACK.equals(ackMode);
+        }
+
+        /**
+         * Processes an ACK frame for this subscription.
+         * <p>
+         * Remembered message ids are released in send order up to and including the
+         * acknowledged id, and one credit per released id is handed to the limiter.
+         * If the subscription does not expect acknowledgements, an error is sent and
+         * the connection is stopped.
+         *
+         * @param info the ACK frame
+         * @throws Exception if the frame has no message id header
+         */
+        public void ack(StompFrame info) throws Exception {
+            if (!expectsAcks()) {
+                // We should not be getting an ACK.
+                sendError("ACK not expected.");
+                connection.stop();
+                return;
+            }
+
+            String messageId = info.getHeaders().get(Stomp.Headers.Ack.MESSAGE_ID);
+            if (messageId == null) {
+                throw new IOException("ACK frame is missing the " + Stomp.Headers.Ack.MESSAGE_ID + " header");
+            }
+
+            int credits = 0;
+            synchronized(allSentMessageIds) {
+                AsciiBuffer mid = new AsciiBuffer(messageId);
+                for (Iterator<AsciiBuffer> iterator = sentMessageIds.keySet().iterator(); iterator.hasNext();) {
+                    AsciiBuffer next = iterator.next();
+                    iterator.remove();
+                    allSentMessageIds.remove(next);
+                    credits++;
+                    if( next.equals(mid) ) {
+                        break;
+                    }
+                }
+            }
+            synchronized(queue) {
+                limiter.onProtocolCredit(credits);
+            }
+        }
+
+        public IFlowSink<MessageDelivery> getSink() {
+            return queue;
+        }
+
+        /**
+         * Tells whether a delivery should go to this subscription.
+         *
+         * @param message the candidate delivery
+         * @return <code>true</code> when the delivery can be viewed both as a STOMP frame and
+         *         as an openwire message and the selector, if any, accepts it
+         */
+        public boolean match(MessageDelivery message) {
+            StompFrame stompMessage = message.asType(StompFrame.class);
+            if (stompMessage == null) {
+                return false;
+            }
+
+            // NOTE(review): StompMessageDelivery.asType only answers StompFrame requests, so
+            // deliveries of that type are rejected here -- confirm that is intended.
+            Message msg = message.asType(Message.class);
+            if (msg == null) {
+                return false;
+            }
+
+            // TODO: abstract the Selector bits so that it is not openwire specific.
+            MessageEvaluationContext selectorContext = new MessageEvaluationContext();
+            selectorContext.setMessageReference(msg);
+            selectorContext.setDestination(msg.getDestination());
+            try {
+                return (selector == null || selector.matches(selectorContext));
+            } catch (JMSException e) {
+                // A selector that cannot be evaluated is treated as a non-match.
+                e.printStackTrace();
+                return false;
+            }
+        }
+
+    }
+    
+    /**
+     * Sends an ERROR frame with the given message header and an empty body.
+     *
+     * @param message value of the error message header
+     */
+    private void sendError(String message) {
+        sendError(message, StompFrame.NO_DATA);
+    }
+    
+    /**
+     * Sends an ERROR frame whose body is the UTF-8 encoding of <code>details</code>.
+     *
+     * @param message value of the error message header
+     * @param details text placed in the frame body
+     */
+    private void sendError(String message, String details) {
+        try {
+            sendError(message, details.getBytes("UTF-8"));
+        } catch (UnsupportedEncodingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+    /**
+     * Builds an ERROR frame and writes it to the connection.
+     *
+     * @param message value of the error message header
+     * @param details bytes placed in the frame body
+     */
+    private void sendError(String message, byte[] details) {
+        HashMap<String, String> headers = new HashMap<String, String>();
+        headers.put(Stomp.Headers.Error.MESSAGE, message);
+        StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, details);
+        connection.write(errorMessage);
+    }
+
+    /**
+     * Sends a RECEIPT for a frame, if the frame asked for one.
+     *
+     * @param frame the frame whose receipt-request header is consulted
+     */
+    private void ack(StompFrame frame) {
+        String requestedReceipt = frame.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
+        ack(requestedReceipt);
+    }
+
+    /**
+     * Writes a RECEIPT frame carrying the given receipt id to the connection.
+     *
+     * @param receiptId the id to acknowledge; nothing is sent when it is <code>null</code>
+     */
+    private void ack(String receiptId) {
+        if (receiptId == null) {
+            return;
+        }
+        HashMap<String, String> receiptHeaders = new HashMap<String, String>(1);
+        receiptHeaders.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
+
+        StompFrame receipt = new StompFrame();
+        receipt.setAction(Stomp.Responses.RECEIPT);
+        receipt.setHeaders(receiptHeaders);
+        connection.write(receipt);
+    }
+
+    /**
+     * Routes an inbound delivery to the matching targets and sends the receipt the
+     * producer asked for, if any.
+     * <p>
+     * The receipt id is taken from the delivery and not from the frame headers:
+     * {@link StompMessageDelivery} removes the receipt-request header from the frame
+     * when it is constructed, so looking it up on the frame again finds nothing.
+     *
+     * @param controller the source controller to notify once the element is dispatched
+     * @param messageDelivery the delivery to route; must be a {@link StompMessageDelivery}
+     */
+    protected void route(ISourceController<MessageDelivery> controller, MessageDelivery messageDelivery) {
+        // TODO:
+        // Consider doing some caching of this target list. Most producers
+        // always send to
+        // the same destination.
+        Collection<DeliveryTarget> targets = connection.getBroker().getRouter().route(messageDelivery);
+        final StompMessageDelivery smd = ((StompMessageDelivery) messageDelivery);
+        final String receiptId = smd.getReceiptId();
+        if (targets != null) {
+            if (receiptId!=null) {
+                // We need to ack the message once we ensure we won't lose it.
+                // We know we won't lose it once it's persisted or delivered to
+                // a consumer.
+                // Setup a callback to get notified once one of those happens.
+                if (messageDelivery.isPersistent()) {
+                    messageDelivery.setCompletionCallback(new Runnable() {
+                        public void run() {
+                            ack(receiptId);
+                        }
+                    });
+                } else {
+                    // Let the client know the broker got the message.
+                    ack(receiptId);
+                }
+            }
+
+            // Deliver the message to all the targets..
+            for (DeliveryTarget dt : targets) {
+                if (dt.match(messageDelivery)) {
+                    dt.getSink().add(messageDelivery, controller);
+                }
+            }
+
+        } else {
+            // Let the client know we got the message even though there
+            // were no valid targets to deliver the message to.
+            if (receiptId!=null) {
+                ack(receiptId);
+            }
+        }
+        controller.elementDispatched(messageDelivery);
+    }
+
+    /**
+     * Converts an openwire destination into a broker {@link Destination}.
+     * <p>
+     * Composite destinations are converted member by member into a multi
+     * destination; queues and topics map to a single destination in the matching
+     * domain.
+     *
+     * @param dest the openwire destination
+     * @return the equivalent broker destination
+     * @throws IllegalArgumentException if the destination is neither composite, a queue nor a topic
+     */
+    static public Destination convert(ActiveMQDestination dest) {
+        if (dest.isComposite()) {
+            ActiveMQDestination[] compositeDestinations = dest.getCompositeDestinations();
+            ArrayList<Destination> d = new ArrayList<Destination>();
+            for (int i = 0; i < compositeDestinations.length; i++) {
+                d.add(convert(compositeDestinations[i]));
+            }
+            return new Destination.MultiDestination(d);
+        }
+        AsciiBuffer domain;
+        if (dest.isQueue()) {
+            domain = Router.QUEUE_DOMAIN;
+        } else if (dest.isTopic()) {
+            // The "else" matters: without it a queue falls through to the exception below.
+            domain = Router.TOPIC_DOMAIN;
+        } else {
+            throw new IllegalArgumentException("Unsupported domain type: " + dest);
+        }
+        return new Destination.SingleDestination(domain, new AsciiBuffer(dest.getPhysicalName()));
+    }
+    
+    /**
+     * Parses the selector header of a SUBSCRIBE frame.
+     *
+     * @param frame the frame whose selector header is read
+     * @return the parsed selector, or <code>null</code> when the frame has no selector header
+     * @throws InvalidSelectorException if the selector text cannot be parsed
+     */
+    private static BooleanExpression parseSelector(StompFrame frame) throws InvalidSelectorException {
+        String expression = frame.getHeaders().get(Stomp.Headers.Subscribe.SELECTOR);
+        if (expression == null) {
+            return null;
+        }
+        return SelectorParser.parse(expression);
+    }
+
+    /** @return the broker connection this handler reads from and writes to */
+    public BrokerConnection getConnection() {
+        return connection;
+    }
+
+    /** @param connection the broker connection this handler reads from and writes to */
+    public void setConnection(BrokerConnection connection) {
+        this.connection = connection;
+    }
+
+    /** Ignores the wire format; this handler keeps no reference to it. */
+    public void setWireFormat(WireFormat wireFormat) {
+    }
+
+    /** Not implemented yet: always returns <code>null</code>. */
+    public String getCreatedTempDestinationName(ActiveMQDestination activeMQDestination) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    /** Not implemented yet: always returns <code>null</code>. */
+    public ActiveMQDestination createTempQueue(String name) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    /** Not implemented yet: always returns <code>null</code>. */
+    public ActiveMQDestination createTempTopic(String name) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+}

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableStompWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableStompWireFormatFactory.java?rev=754964&r1=754963&r2=754964&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableStompWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableStompWireFormatFactory.java Mon Mar 16 17:25:23 2009
@@ -16,17 +16,48 @@
  */
 package org.apache.activemq.wireformat;
 
+import java.io.UnsupportedEncodingException;
+
+import org.apache.activemq.transport.stomp.Stomp;
 import org.apache.activemq.transport.stomp.StompWireFormatFactory;
 import org.apache.activemq.util.ByteSequence;
 
 public class DiscriminatableStompWireFormatFactory extends StompWireFormatFactory implements DiscriminatableWireFormatFactory {
 
+    static byte MAGIC[] = toBytes(Stomp.Commands.CONNECT);
+    
+    /**
+     * Encodes a string as UTF-8 bytes.
+     *
+     * @param value the text to encode
+     * @return the UTF-8 bytes of <code>value</code>
+     * @throws RuntimeException wrapping the UnsupportedEncodingException if UTF-8 is not available
+     */
+    static private byte[] toBytes(String value) {
+        try {
+            return value.getBytes("UTF-8");
+        } catch (UnsupportedEncodingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+    
     public boolean matchesWireformatHeader(ByteSequence byteSequence) {
+        byte[] data = byteSequence.data;
+        if( byteSequence.length >= MAGIC.length) {
+            int offset = byteSequence.length - MAGIC.length;
+            for(int i=0; i < byteSequence.length; i++) {
+                // Newlines are allowed to precede the STOMP connect command.
+                if( i < offset ) {
+                    if( data[i]!='\n' && data[i]!='\r' ) {
+                        return false;
+                    }
+                } else {
+                    if( data[i]!=MAGIC[i-offset] ) {
+                        return false;
+                    }
+                }
+                return true;
+            }
+        }
         return false;
     }
 
+
     public int maxWireformatHeaderLength() {
-        return 100;
+        return MAGIC.length+10;
     }
 
 }

Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java?rev=754964&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java (added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java Mon Mar 16 17:25:23 2009
@@ -0,0 +1,512 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import java.beans.ExceptionListener;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.dispatch.IDispatcher;
+import org.apache.activemq.dispatch.PriorityDispatcher;
+import org.apache.activemq.metric.MetricAggregator;
+import org.apache.activemq.metric.Period;
+import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.queue.Mapper;
+
+public abstract class BrokerTestBase extends TestCase {
+
+    protected static final int PERFORMANCE_SAMPLES = 3;
+
+    protected static final int IO_WORK_AMOUNT = 0;
+    protected static final int FANIN_COUNT = 10;
+    protected static final int FANOUT_COUNT = 10;
+
+    protected static final int PRIORITY_LEVELS = 10;
+    protected static final boolean USE_INPUT_QUEUES = true;
+
+    // Set to put senders and consumers on separate brokers.
+    protected boolean multibroker = false;
+
+    // Set to mock up ptp (point-to-point, i.e. queue) destinations:
+    protected boolean ptp = false;
+
+    // Set to use tcp IO
+    protected boolean tcp = true;
+    // set to force marshalling even in the NON tcp case.
+    protected boolean forceMarshalling = false;
+
+    protected String sendBrokerBindURI;
+    protected String receiveBrokerBindURI;
+    protected String sendBrokerConnectURI;
+    protected String receiveBrokerConnectURI;
+
+    // Sets the number of threads to use:
+    protected final int asyncThreadPoolSize = Runtime.getRuntime().availableProcessors();
+    protected boolean usePartitionedQueue = false;
+
+    protected int producerCount;
+    protected int consumerCount;
+    protected int destCount;
+
+    protected MetricAggregator totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items");
+    protected MetricAggregator totalConsumerRate = new MetricAggregator().name("Aggregate Consumer Rate").unit("items");
+
+    protected Broker sendBroker;
+    protected Broker rcvBroker;
+    protected ArrayList<Broker> brokers = new ArrayList<Broker>();
+    protected IDispatcher dispatcher;
+    protected final AtomicLong msgIdGenerator = new AtomicLong();
+    protected final AtomicBoolean stopping = new AtomicBoolean();
+    
+    final ArrayList<RemoteProducer> producers = new ArrayList<RemoteProducer>();
+    final ArrayList<RemoteConsumer> consumers = new ArrayList<RemoteConsumer>();
+
+    /** Maps a delivery to its message id; installed as the key extractor of queues built by createQueue(). */
+    static public final Mapper<AsciiBuffer, MessageDelivery> KEY_MAPPER = new Mapper<AsciiBuffer, MessageDelivery>() {
+        public AsciiBuffer map(MessageDelivery element) {
+            return element.getMsgId();
+        }
+    };
+    /**
+     * Maps a delivery to a partition number in the range 0..9 derived from the
+     * hash code of its producer id. Installed on queues by createQueue() when
+     * usePartitionedQueue is set.
+     */
+    static public final Mapper<Integer, MessageDelivery> PARTITION_MAPPER = new Mapper<Integer, MessageDelivery>() {
+        public Integer map(MessageDelivery element) {
+            // we modulo 10 to have at most 10 partitions which the producers
+            // gets split across. hashCode() may be negative and Java's %
+            // keeps the sign of the dividend, so fold the remainder back into
+            // 0..9 instead of producing partitions -9..-1 as well.
+            int remainder = element.getProducerId().hashCode() % 10;
+            return (remainder + 10) % 10;
+        }
+    };
+
+    /**
+     * Creates and starts the dispatcher, then selects the bind and connect
+     * URIs for the send and receive brokers: TCP on localhost ports 10000 and
+     * 20000 when {@code tcp} is set, otherwise in-VM pipe URIs that are used
+     * for both binding and connecting.
+     */
+    @Override
+    protected void setUp() throws Exception {
+        dispatcher = createDispatcher();
+        dispatcher.start();
+
+        if (!tcp) {
+            // NOTE(review): forceMarshalling currently has no effect here; the
+            // same pipe URIs are used whether or not it is set.
+            sendBrokerBindURI = "pipe://SendBroker";
+            receiveBrokerBindURI = "pipe://ReceiveBroker";
+            sendBrokerConnectURI = sendBrokerBindURI;
+            receiveBrokerConnectURI = receiveBrokerBindURI;
+            return;
+        }
+
+        sendBrokerBindURI = "tcp://localhost:10000?wireFormat=multi";
+        receiveBrokerBindURI = "tcp://localhost:20000?wireFormat=multi";
+        sendBrokerConnectURI = "tcp://localhost:10000";
+        receiveBrokerConnectURI = "tcp://localhost:20000";
+    }
+
+    /**
+     * Creates the dispatcher that setUp() starts and that is handed to every
+     * broker, producer and consumer built by this test. Subclasses may
+     * override to supply a different dispatcher.
+     *
+     * @return a priority dispatch pool named "BrokerDispatcher", created with
+     *         Broker.MAX_PRIORITY and asyncThreadPoolSize
+     */
+    protected IDispatcher createDispatcher() {
+        return PriorityDispatcher.createPriorityDispatchPool("BrokerDispatcher", Broker.MAX_PRIORITY, asyncThreadPoolSize);
+    }
+    
+    /**
+     * Rate run with 1 producer, 1 destination and no consumers
+     * (consumerCount is left at its default of 0). Rates are printed, not
+     * asserted.
+     */
+    public void test_1_1_0() throws Exception {
+        producerCount = 1;
+        destCount = 1;
+
+        createConnections();
+
+        // Start 'em up.
+        startServices();
+        try {
+            reportRates();
+        } finally {
+            stopServices();
+        }
+    }
+
+    /** Rate run with 1 producer, 1 destination and 1 consumer. Rates are printed, not asserted. */
+    public void test_1_1_1() throws Exception {
+        producerCount = 1;
+        destCount = 1;
+        consumerCount = 1;
+
+        createConnections();
+
+        // Start 'em up.
+        startServices();
+        try {
+            reportRates();
+        } finally {
+            stopServices();
+        }
+    }
+
+    /**
+     * Rate run with FANIN_COUNT producers, 1 destination and FANOUT_COUNT
+     * consumers. Rates are printed, not asserted.
+     */
+    public void test_10_1_10() throws Exception {
+        producerCount = FANIN_COUNT;
+        consumerCount = FANOUT_COUNT;
+        destCount = 1;
+
+        createConnections();
+
+        // Start 'em up.
+        startServices();
+        try {
+            reportRates();
+        } finally {
+            stopServices();
+        }
+    }
+
+    /**
+     * Fan-in rate run: FANIN_COUNT producers, 1 destination and 1 consumer.
+     * Rates are printed, not asserted.
+     */
+    public void test_10_1_1() throws Exception {
+        producerCount = FANIN_COUNT;
+        destCount = 1;
+        consumerCount = 1;
+
+        createConnections();
+
+        // Start 'em up.
+        startServices();
+        try {
+            reportRates();
+        } finally {
+            stopServices();
+        }
+    }
+
+    /**
+     * Fan-out rate run: 1 producer, 1 destination and FANOUT_COUNT consumers.
+     * Rates are printed, not asserted.
+     */
+    public void test_1_1_10() throws Exception {
+        producerCount = 1;
+        destCount = 1;
+        consumerCount = FANOUT_COUNT;
+
+        createConnections();
+
+        // Start 'em up.
+        startServices();
+        try {
+            reportRates();
+        } finally {
+            stopServices();
+        }
+    }
+
+    /**
+     * Rate run with 2 producers, 2 destinations and 2 consumers; producer and
+     * consumer i are both assigned destination i by createConnections().
+     * Rates are printed, not asserted.
+     */
+    public void test_2_2_2() throws Exception {
+        producerCount = 2;
+        destCount = 2;
+        consumerCount = 2;
+
+        createConnections();
+
+        // Start 'em up.
+        startServices();
+        try {
+            reportRates();
+        } finally {
+            stopServices();
+        }
+    }
+
+    /** Rate run with 10 producers, 10 destinations and 10 consumers. Rates are printed, not asserted. */
+    public void test_10_10_10() throws Exception {
+        producerCount = 10;
+        destCount = 10;
+        consumerCount = 10;
+
+        createConnections();
+
+        // Start 'em up.
+        startServices();
+        try {
+            reportRates();
+        } finally {
+            stopServices();
+        }
+    }
+
+
+    /**
+     * Runs 2 producers, 2 destinations and 2 consumers, with the first
+     * consumer slowed down by a 50 ms think time per message. The intent is
+     * that the producer paired with the fast consumer can still send quickly;
+     * the rates are only printed, not asserted.
+     * 
+     * @throws Exception
+     */
+    public void test_2_2_2_SlowConsumer() throws Exception {
+        producerCount = 2;
+        destCount = 2;
+        consumerCount = 2;
+
+        createConnections();
+        // Only consumer 0 is slow; consumer 1 keeps the default think time of 0.
+        consumers.get(0).setThinkTime(50);
+
+        // Start 'em up.
+        startServices();
+        try {
+            reportRates();
+        } finally {
+            stopServices();
+        }
+    }
+
+    /**
+     * Runs 2 producers, 2 destinations and 2 consumers where consumer i is
+     * given the selector "match" + i and producer i the property of the same
+     * name. Rates are printed, not asserted.
+     */
+    public void test_2_2_2_Selector() throws Exception {
+        producerCount = 2;
+        destCount = 2;
+        consumerCount = 2;
+
+        createConnections();
+
+        // Add properties to match producers to their consumers
+        // (the loop indexes both lists, so it relies on producerCount >= consumerCount).
+        for (int i = 0; i < consumerCount; i++) {
+            String property = "match" + i;
+            consumers.get(i).setSelector(property);
+            producers.get(i).setProperty(property);
+        }
+
+        // Start 'em up.
+        startServices();
+        try {
+            reportRates();
+        } finally {
+            stopServices();
+        }
+    }
+
+    /**
+     * Runs 2 producers against 1 destination and 1 consumer (1 ms think time)
+     * with the first producer set to priority 1. The expectation is that this
+     * producer achieves a higher rate than the other one; its rate is printed
+     * next to the aggregate rates but nothing is asserted.
+     * 
+     * @throws Exception
+     */
+    public void test_2_1_1_HighPriorityProducer() throws Exception {
+        producerCount = 2;
+        destCount = 1;
+        consumerCount = 1;
+        createConnections();
+
+        final RemoteProducer highPriority = producers.get(0);
+        highPriority.setPriority(1);
+        highPriority.getRate().setName("High Priority Producer Rate");
+
+        consumers.get(0).setThinkTime(1);
+
+        // Bring everything up, sample the rates, and always tear down again.
+        startServices();
+        try {
+            System.out.println("Checking rates for test: " + getName());
+            int sample = 0;
+            while (sample < PERFORMANCE_SAMPLES) {
+                Period period = new Period();
+                Thread.sleep(1000 * 5);
+                System.out.println(highPriority.getRate().getRateSummary(period));
+                System.out.println(totalProducerRate.getRateSummary(period));
+                System.out.println(totalConsumerRate.getRateSummary(period));
+                totalProducerRate.reset();
+                totalConsumerRate.reset();
+                sample++;
+            }
+        } finally {
+            stopServices();
+        }
+    }
+
+    /**
+     * Variant of the high priority producer run: 2 producers, 1 destination
+     * and 1 consumer (1 ms think time), with the first producer set to
+     * priority 1 and a priority modulus of 3. Its rate is printed next to the
+     * aggregate rates; nothing is asserted.
+     * 
+     * @throws Exception
+     */
+    public void test_2_1_1_MixedHighPriorityProducer() throws Exception {
+        producerCount = 2;
+        destCount = 1;
+        consumerCount = 1;
+        createConnections();
+
+        final RemoteProducer mixedPriority = producers.get(0);
+        mixedPriority.setPriority(1);
+        // NOTE(review): presumably only some messages get the raised priority; confirm in the producer subclasses.
+        mixedPriority.setPriorityMod(3);
+        mixedPriority.getRate().setName("High Priority Producer Rate");
+
+        consumers.get(0).setThinkTime(1);
+
+        // Bring everything up, sample the rates, and always tear down again.
+        startServices();
+        try {
+            System.out.println("Checking rates for test: " + getName());
+            int sample = 0;
+            while (sample < PERFORMANCE_SAMPLES) {
+                Period period = new Period();
+                Thread.sleep(1000 * 5);
+                System.out.println(mixedPriority.getRate().getRateSummary(period));
+                System.out.println(totalProducerRate.getRateSummary(period));
+                System.out.println(totalConsumerRate.getRateSummary(period));
+                totalProducerRate.reset();
+                totalConsumerRate.reset();
+                sample++;
+            }
+        } finally {
+            stopServices();
+        }
+    }
+
+    /**
+     * Prints the aggregate producer and consumer rates PERFORMANCE_SAMPLES
+     * times, sleeping five seconds before each sample and resetting both
+     * aggregates afterwards.
+     *
+     * @throws InterruptedException if the sampling sleep is interrupted
+     */
+    private void reportRates() throws InterruptedException {
+        final String domain = ptp ? "ptp" : "topic";
+        System.out.println("Checking rates for test: " + getName() + ", " + domain);
+
+        int remaining = PERFORMANCE_SAMPLES;
+        while (remaining-- > 0) {
+            Period period = new Period();
+            Thread.sleep(1000 * 5);
+            System.out.println(totalProducerRate.getRateSummary(period));
+            System.out.println(totalConsumerRate.getRateSummary(period));
+            totalProducerRate.reset();
+            totalConsumerRate.reset();
+        }
+    }
+
+    /**
+     * Builds the broker(s), destinations, producers and consumers for a run
+     * from producerCount, consumerCount and destCount. Nothing is started here.
+     * Producers and consumers are assigned to destinations round-robin, so
+     * destCount must be at least 1 whenever either count is non-zero.
+     */
+    private void createConnections() throws IOException, URISyntaxException {
+
+        if (!multibroker) {
+            // A single broker plays both the sending and the receiving role.
+            sendBroker = createBroker("Broker", sendBrokerBindURI, sendBrokerConnectURI);
+            rcvBroker = sendBroker;
+            brokers.add(sendBroker);
+        } else {
+            sendBroker = createBroker("SendBroker", sendBrokerBindURI, sendBrokerConnectURI);
+            rcvBroker = createBroker("RcvBroker", receiveBrokerBindURI, receiveBrokerConnectURI);
+            brokers.add(sendBroker);
+            brokers.add(rcvBroker);
+        }
+
+        final Destination[] dests = new Destination[destCount];
+
+        for (int d = 0; d < dests.length; d++) {
+            Destination.SingleDestination single = new Destination.SingleDestination();
+            single.setName(new AsciiBuffer("dest" + (d + 1)));
+            single.setDomain(ptp ? Router.QUEUE_DOMAIN : Router.TOPIC_DOMAIN);
+            dests[d] = single;
+            if (!ptp) {
+                continue;
+            }
+            // Queue destinations get an explicit queue on each broker.
+            sendBroker.addQueue(createQueue(sendBroker, single));
+            if (multibroker) {
+                rcvBroker.addQueue(createQueue(rcvBroker, single));
+            }
+        }
+
+        for (int p = 0; p < producerCount; p++) {
+            producers.add(createProducer(p, dests[p % destCount]));
+        }
+
+        for (int c = 0; c < consumerCount; c++) {
+            consumers.add(createConsumer(c, dests[c % destCount]));
+        }
+
+        // Create MultiBroker connections:
+        // if (multibroker) {
+        // Pipe<Message> pipe = new Pipe<Message>();
+        // sendBroker.createBrokerConnection(rcvBroker, pipe);
+        // rcvBroker.createBrokerConnection(sendBroker, pipe.connect());
+        // }
+    }
+
+    /**
+     * Creates and configures (but does not start) a consumer connected to the
+     * receive broker.
+     *
+     * @param i zero-based consumer index; the consumer is named "consumer" + (i + 1)
+     * @param destination the destination the consumer is pointed at
+     * @return the configured consumer
+     * @throws URISyntaxException if the receive broker's connect URI is not a valid URI
+     */
+    private RemoteConsumer createConsumer(int i, Destination destination) throws URISyntaxException {
+        RemoteConsumer consumer = createConsumer();
+        consumer.setExceptionListener(new ExceptionListener(){
+            public void exceptionThrown(Exception error) {
+                // Errors reported once shutdown has begun are not logged.
+                if( !stopping.get() ) {
+                    System.err.println("Consumer Async Error:");
+                    error.printStackTrace();
+                }
+            }
+        });
+        consumer.setUri(new URI(rcvBroker.getConnectUri()));
+        consumer.setDestination(destination);
+        consumer.setName("consumer" + (i + 1));
+        consumer.setTotalConsumerRate(totalConsumerRate);
+        consumer.setDispatcher(dispatcher);
+        return consumer;
+    }
+
+    /** Factory hook: returns a new, unconfigured consumer implementation supplied by the subclass. */
+    abstract protected RemoteConsumer createConsumer();
+
+    /**
+     * Creates and configures (but does not start) a producer connected to the
+     * send broker.
+     *
+     * @param id zero-based producer index; the producer id is id + 1 and the
+     *        name is "producer" + (id + 1)
+     * @param destination the destination the producer is pointed at
+     * @return the configured producer
+     * @throws URISyntaxException if the send broker's connect URI is not a valid URI
+     */
+    private RemoteProducer createProducer(int id, Destination destination) throws URISyntaxException {
+        RemoteProducer producer = cerateProducer();
+        producer.setExceptionListener(new ExceptionListener(){
+            public void exceptionThrown(Exception error) {
+                // Errors reported once shutdown has begun are not logged.
+                if( !stopping.get() ) {
+                    System.err.println("Producer Async Error:");
+                    error.printStackTrace();
+                }
+            }
+        });
+        producer.setUri(new URI(sendBroker.getConnectUri()));
+        producer.setProducerId(id + 1);
+        producer.setName("producer" + (id + 1));
+        producer.setDestination(destination);
+        // All producers share one id generator.
+        producer.setMessageIdGenerator(msgIdGenerator);
+        producer.setTotalProducerRate(totalProducerRate);
+        producer.setDispatcher(dispatcher);
+        return producer;
+    }
+
+    /**
+     * Factory hook: returns a new, unconfigured producer implementation
+     * supplied by the subclass. NOTE(review): the name is a misspelling of
+     * "createProducer"; renaming it requires updating every subclass.
+     */
+    abstract protected RemoteProducer cerateProducer();
+
+    /**
+     * Builds a queue for the given destination bound to the given broker. The
+     * queue is keyed by message id and, when usePartitionedQueue is set,
+     * partitioned with PARTITION_MAPPER. The caller registers it with the broker.
+     */
+    private Queue createQueue(Broker broker, Destination destination) {
+        Queue queue = new Queue();
+        queue.setBroker(broker);
+        queue.setDestination(destination);
+        queue.setKeyExtractor(KEY_MAPPER);
+        if (usePartitionedQueue) {
+            queue.setPartitionMapper(PARTITION_MAPPER);
+        }
+        return queue;
+    }
+
+    /**
+     * Builds (but does not start) a broker with the given name and bind and
+     * connect URIs, sharing this test's dispatcher.
+     */
+    private Broker createBroker(String name, String bindURI, String connectUri) {
+        Broker broker = new Broker();
+        broker.setName(name);
+        broker.setBindUri(bindURI);
+        broker.setConnectUri(connectUri);
+        broker.setDispatcher(dispatcher);
+        return broker;
+    }
+
+    /**
+     * Stops the brokers, then the producers, then the consumers, and finally
+     * shuts down the dispatcher. The stopping flag is raised first so the
+     * exception listeners stay quiet about errors caused by the shutdown.
+     * <p>
+     * Every service is given the chance to stop even if an earlier one fails,
+     * so a single failure cannot leave the remaining services or the
+     * dispatcher running.
+     *
+     * @throws Exception the first failure seen while stopping, rethrown once
+     *         all services have been processed
+     */
+    private void stopServices() throws Exception {
+        stopping.set(true);
+        Exception firstFailure = null;
+        for (Broker broker : brokers) {
+            try {
+                broker.stop();
+            } catch (Exception e) {
+                if (firstFailure == null) {
+                    firstFailure = e;
+                }
+            }
+        }
+        for (RemoteProducer connection : producers) {
+            try {
+                connection.stop();
+            } catch (Exception e) {
+                if (firstFailure == null) {
+                    firstFailure = e;
+                }
+            }
+        }
+        for (RemoteConsumer connection : consumers) {
+            try {
+                connection.stop();
+            } catch (Exception e) {
+                if (firstFailure == null) {
+                    firstFailure = e;
+                }
+            }
+        }
+        if (dispatcher != null) {
+            try {
+                dispatcher.shutdown();
+            } catch (Exception e) {
+                if (firstFailure == null) {
+                    firstFailure = e;
+                }
+            }
+        }
+        if (firstFailure != null) {
+            throw firstFailure;
+        }
+    }
+
+    /**
+     * Starts the brokers, then the consumers, then the producers, so that
+     * consumers are started before any producer begins sending.
+     */
+    private void startServices() throws Exception {
+        for (Broker broker : brokers) {
+            broker.start();
+        }
+        for (RemoteConsumer connection : consumers) {
+            connection.start();
+        }
+
+        for (RemoteProducer connection : producers) {
+            connection.start();
+        }
+    }
+
+}

Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteConsumer.java?rev=754964&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteConsumer.java (added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteConsumer.java Mon Mar 16 17:25:23 2009
@@ -0,0 +1,118 @@
+package org.apache.activemq.broker;
+
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.Connection;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.metric.MetricAggregator;
+import org.apache.activemq.metric.MetricCounter;
+import org.apache.activemq.transport.DispatchableTransport;
+import org.apache.activemq.transport.TransportFactory;
+
+abstract public class RemoteConsumer extends Connection {
+
+    protected final MetricCounter consumerRate = new MetricCounter();
+
+    protected MetricAggregator totalConsumerRate;
+    protected long thinkTime;
+    protected Destination destination;
+    protected String selector;
+    protected URI uri;
+
+    private boolean schedualWait;
+
+    /**
+     * Registers this consumer's rate counter with the aggregate, connects the
+     * transport to the configured URI, starts the connection and finally lets
+     * the subclass set up its subscription. The URI and the total consumer
+     * rate must have been set beforehand.
+     */
+    public void start() throws Exception {
+        consumerRate.name("Consumer " + name + " Rate");
+        totalConsumerRate.add(consumerRate);
+
+        transport = TransportFactory.compositeConnect(uri);
+        // With a dispatchable transport, think time is simulated by scheduling
+        // on the dispatcher rather than by sleeping (see messageReceived).
+        if(transport instanceof DispatchableTransport)
+        {
+            schedualWait = true;
+        }
+        initialize();
+        super.start();
+        setupSubscription();
+        
+    }
+
+    
+    /** Protocol-specific hook called at the end of start(), after the connection has been started. */
+    abstract protected void setupSubscription() throws Exception;
+
+    /**
+     * Handles a delivered message: waits for the configured think time, if
+     * any, then counts the message and reports it to the controller through
+     * {@code elementDispatched}.
+     * <p>
+     * With a dispatchable transport the wait is scheduled on the dispatcher;
+     * otherwise the calling thread sleeps.
+     *
+     * @param controller the source controller to notify once the message is consumed
+     * @param elem the delivered message
+     */
+    protected void messageReceived(final ISourceController<MessageDelivery> controller, final MessageDelivery elem) {
+        if( schedualWait ) {
+            if (thinkTime > 0) {
+                getDispatcher().schedule(new Runnable(){
+                    public void run() {
+                        consumerRate.increment();
+                        controller.elementDispatched(elem);
+                    }
+                }, thinkTime, TimeUnit.MILLISECONDS);
+                
+            }
+            else
+            {
+                consumerRate.increment();
+                controller.elementDispatched(elem);
+            }
+
+        } else {
+            if( thinkTime>0 ) {
+                try {
+                    Thread.sleep(thinkTime);
+                } catch (InterruptedException e) {
+                    // Cut the think time short but keep the interrupt status so
+                    // the owning thread can still observe the interruption.
+                    Thread.currentThread().interrupt();
+                }
+            }
+            consumerRate.increment();
+            controller.elementDispatched(elem);
+        }
+    }
+
+    /** Sets the name used to label this consumer's rate counter in start(). */
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /** Returns the aggregate this consumer's rate counter is added to in start(). */
+    public MetricAggregator getTotalConsumerRate() {
+        return totalConsumerRate;
+    }
+
+    /** Sets the aggregate to add this consumer's rate counter to; must be set before start(). */
+    public void setTotalConsumerRate(MetricAggregator totalConsumerRate) {
+        this.totalConsumerRate = totalConsumerRate;
+    }
+
+    /** Returns the configured destination. */
+    public Destination getDestination() {
+        return destination;
+    }
+
+    /** Sets the destination; it is not used in this class and is available to subclasses. */
+    public void setDestination(Destination destination) {
+        this.destination = destination;
+    }
+
+    /** Returns the per-message think time in milliseconds. */
+    public long getThinkTime() {
+        return thinkTime;
+    }
+
+    /** Sets the delay, in milliseconds, applied to each received message before it is counted; 0 disables it. */
+    public void setThinkTime(long thinkTime) {
+        this.thinkTime = thinkTime;
+    }
+
+    /** Returns this consumer's own rate counter. */
+    public MetricCounter getConsumerRate() {
+        return consumerRate;
+    }
+
+    /** Returns the configured selector, or null if none was set. */
+    public String getSelector() {
+        return selector;
+    }
+
+    /** Sets the selector; it is not used in this class and is available to subclasses. */
+    public void setSelector(String selector) {
+        this.selector = selector;
+    }
+
+    /** Returns the URI that start() connects to. */
+    public URI getUri() {
+        return uri;
+    }
+
+    /** Sets the URI that start() connects to; must be set before start(). */
+    public void setUri(URI uri) {
+        this.uri = uri;
+    }}

Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteProducer.java?rev=754964&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteProducer.java (added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteProducer.java Mon Mar 16 17:25:23 2009
@@ -0,0 +1,200 @@
+package org.apache.activemq.broker;
+
+import java.net.URI;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.Connection;
+import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
+import org.apache.activemq.dispatch.IDispatcher.Dispatchable;
+import org.apache.activemq.flow.IFlowController;
+import org.apache.activemq.flow.IFlowSink;
+import org.apache.activemq.flow.ISinkController;
+import org.apache.activemq.flow.ISinkController.FlowUnblockListener;
+import org.apache.activemq.metric.MetricAggregator;
+import org.apache.activemq.metric.MetricCounter;
+import org.apache.activemq.transport.TransportFactory;
+
+abstract public class RemoteProducer extends Connection implements Dispatchable, FlowUnblockListener<MessageDelivery> {
+
+    protected final MetricCounter rate = new MetricCounter();
+
+    protected AtomicLong messageIdGenerator;
+    protected int priority;
+    protected int priorityMod;
+    protected int counter;
+    protected int producerId;
+    protected Destination destination;
+    protected String property;
+    protected MetricAggregator totalProducerRate;
+    protected MessageDelivery next;
+    protected DispatchContext dispatchContext;
+    protected String filler;
+    protected int payloadSize = 20;
+    protected URI uri;
+
+    protected IFlowController<MessageDelivery> outboundController;
+    protected IFlowSink<MessageDelivery> outboundQueue;
+    
+    /**
+     * Prepares the payload filler, registers this producer's rate counter
+     * with the aggregate, connects the transport to the configured URI, starts
+     * the connection, lets the subclass set up the producer and finally
+     * registers with the dispatcher and requests the first dispatch.
+     */
+    public void start() throws Exception {
+
+        // Filler is payloadSize characters cycling through 'a'..'z'; it is
+        // left untouched when payloadSize is not positive.
+        if( payloadSize>0 ) {
+            char[] letters = new char[payloadSize];
+            for( int pos=0; pos < letters.length; pos++ ) {
+                letters[pos] = (char)('a'+(pos%26));
+            }
+            filler = new String(letters);
+        }
+
+        rate.name("Producer " + name + " Rate");
+        totalProducerRate.add(rate);
+
+        transport = TransportFactory.compositeConnect(uri);
+        initialize();
+        super.start();
+
+        setupProducer();
+
+        dispatchContext = getDispatcher().register(this, name + "-client");
+        dispatchContext.requestDispatch();
+    }
+
+    /** Protocol-specific hook called by start() after the connection has been started and before dispatching begins. */
+    abstract protected void setupProducer() throws Exception;
+    
+    /**
+     * Called by dispatch() whenever {@code next} is null; implementations are
+     * expected to store the message to send in {@code next}.
+     */
+    abstract protected void createNextMessage();
+
+    /**
+     * Closes this producer's dispatch context, if one was registered, and then
+     * stops the underlying connection.
+     * <p>
+     * The dispatch context is null when start() never got as far as
+     * registering with the dispatcher, so it is checked first, and the
+     * connection is stopped even if closing the context fails.
+     */
+    public void stop() throws Exception
+    {
+        try {
+            if (dispatchContext != null) {
+                dispatchContext.close(false);
+            }
+        } finally {
+            super.stop();
+        }
+    }
+    
+	/** Flow-control callback registered by dispatch(): asks the dispatcher to run dispatch() again. */
+	public void onFlowUnblocked(ISinkController<MessageDelivery> controller) {
+		dispatchContext.requestDispatch();
+	}
+
+	/**
+	 * Sends messages in a loop until the outbound sink reports that it is
+	 * blocked and this producer has been registered as an unblock listener.
+	 * Dispatching is then requested again from onFlowUnblocked(). A message
+	 * that could not be sent stays in {@code next} and is sent first on the
+	 * next call.
+	 *
+	 * @return true once sending has been suspended by flow control; the
+	 *         method does not return in any other way
+	 */
+	public boolean dispatch() {
+		for (;;) {
+			if (next == null) {
+				createNextMessage();
+			}
+
+			// If flow controlled stop until flow control is lifted. When the
+			// listener cannot be registered, fall through and send anyway.
+			if (outboundController.isSinkBlocked() && outboundController.addUnblockListener(this)) {
+				return true;
+			}
+
+			outboundQueue.add(next, null);
+			rate.increment();
+			next = null;
+		}
+	}
+
+    /**
+     * Builds the text payload for the next message and advances the message
+     * counter.
+     * <p>
+     * With a non-negative payloadSize the result is exactly payloadSize
+     * characters long: the prefix "name:counter:" padded with filler
+     * characters, or truncated when the prefix alone is longer. With a
+     * negative payloadSize the result is just "name:counter".
+     *
+     * @return the payload string
+     */
+    protected String createPayload() {
+        if( payloadSize < 0 ) {
+            return name+":"+(++counter);
+        }
+
+        StringBuilder payload = new StringBuilder(payloadSize);
+        payload.append(name).append(':').append(++counter).append(':');
+
+        int prefixLength = payload.length();
+        if( prefixLength > payloadSize ) {
+            return payload.substring(0, payloadSize);
+        }
+        // Pad with the filler prepared in start().
+        payload.append(filler.subSequence(0, payloadSize-prefixLength));
+        return payload.toString();
+    }
+	
+	/** Sets the name used in the rate counter label, the dispatch context name and the payload prefix. */
+	public void setName(String name) {
+        this.name = name;
+    }
+
+    /** Returns the configured message id generator. */
+    public AtomicLong getMessageIdGenerator() {
+        return messageIdGenerator;
+    }
+
+    /** Sets the message id generator; it is not used in this class and is available to subclasses. */
+    public void setMessageIdGenerator(AtomicLong msgIdGenerator) {
+        this.messageIdGenerator = msgIdGenerator;
+    }
+
+    /** Returns the configured message priority. */
+    public int getPriority() {
+        return priority;
+    }
+
+    /** Sets the message priority; it is not used in this class and is available to subclasses. */
+    public void setPriority(int msgPriority) {
+        this.priority = msgPriority;
+    }
+
+    /** Returns the configured priority modulus. */
+    public int getPriorityMod() {
+        return priorityMod;
+    }
+
+    /** Sets the priority modulus; it is not used in this class and is available to subclasses. */
+    public void setPriorityMod(int priorityMod) {
+        this.priorityMod = priorityMod;
+    }
+
+    /** Returns the numeric producer id. */
+    public int getProducerId() {
+        return producerId;
+    }
+
+    /** Sets the numeric producer id. */
+    public void setProducerId(int producerId) {
+        this.producerId = producerId;
+    }
+
+    /** Returns the configured destination. */
+    public Destination getDestination() {
+        return destination;
+    }
+
+    /** Sets the destination; it is not used in this class and is available to subclasses. */
+    public void setDestination(Destination destination) {
+        this.destination = destination;
+    }
+
+    /** Returns the configured property name, or null if none was set. */
+    public String getProperty() {
+        return property;
+    }
+
+    /** Sets the property name; it is not used in this class and is available to subclasses. */
+    public void setProperty(String property) {
+        this.property = property;
+    }
+
+    /** Returns the aggregate this producer's rate counter is added to in start(). */
+    public MetricAggregator getTotalProducerRate() {
+        return totalProducerRate;
+    }
+
+    /** Sets the aggregate to add this producer's rate counter to; must be set before start(). */
+    public void setTotalProducerRate(MetricAggregator totalProducerRate) {
+        this.totalProducerRate = totalProducerRate;
+    }
+
+    /** Returns this producer's own rate counter. */
+    public MetricCounter getRate() {
+        return rate;
+    }
+
+    /** Returns the target payload length in characters. */
+    public int getPayloadSize() {
+        return payloadSize;
+    }
+
+    /**
+     * Sets the target payload length in characters (default 20). The filler
+     * is built in start(), so set this before starting the producer.
+     */
+    public void setPayloadSize(int messageSize) {
+        this.payloadSize = messageSize;
+    }
+
+    /** Returns the URI that start() connects to. */
+    public URI getUri() {
+        return uri;
+    }
+
+    /** Sets the URI that start() connects to; must be set before start(). */
+    public void setUri(URI uri) {
+        this.uri = uri;
+    }
+}
+