You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/03/16 18:25:24 UTC
svn commit: r754964 [1/2] - in /activemq/sandbox/activemq-flow: ./
src/main/java/org/apache/activemq/ src/main/java/org/apache/activemq/broker/
src/main/java/org/apache/activemq/broker/openwire/
src/main/java/org/apache/activemq/broker/stomp/ src/main/...
Author: chirino
Date: Mon Mar 16 17:25:23 2009
New Revision: 754964
URL: http://svn.apache.org/viewvc?rev=754964&view=rev
Log:
Added in initial bits needed to support the STOMP protocol better.
Added:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/FrameTranslator.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/JmsFrameTranslator.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/LegacyFrameTranslator.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteConsumer.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteProducer.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompBrokerTest.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteConsumer.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteProducer.java
Removed:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java
Modified:
activemq/sandbox/activemq-flow/pom.xml
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableStompWireFormatFactory.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java
Modified: activemq/sandbox/activemq-flow/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/pom.xml?rev=754964&r1=754963&r2=754964&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/pom.xml (original)
+++ activemq/sandbox/activemq-flow/pom.xml Mon Mar 16 17:25:23 2009
@@ -37,6 +37,11 @@
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.thoughtworks.xstream</groupId>
+ <artifactId>xstream</artifactId>
+ <optional>true</optional>
+ </dependency>
<dependency>
<groupId>junit</groupId>
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java?rev=754964&r1=754963&r2=754964&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java Mon Mar 16 17:25:23 2009
@@ -16,6 +16,7 @@
*/
package org.apache.activemq;
+import java.beans.ExceptionListener;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -41,7 +42,9 @@
private IDispatcher dispatcher;
private final AtomicBoolean stopping = new AtomicBoolean();
private ExecutorService blockingWriter;
-
+ private ExceptionListener exceptionListener;
+
+
public void setTransport(Transport transport) {
this.transport = transport;
}
@@ -99,13 +102,16 @@
}
}
- public void onException(IOException error) {
+ final public void onException(IOException error) {
if (!isStopping()) {
onException((Exception) error);
}
}
- public void onException(Exception error) {
+ final public void onException(Exception error) {
+ if( exceptionListener!=null ) {
+ exceptionListener.exceptionThrown(error);
+ }
}
public boolean isStopping(){
@@ -158,4 +164,12 @@
return transport;
}
+ public ExceptionListener getExceptionListener() { // listener that onException(Exception) forwards to; null when none is set
+ return exceptionListener;
+ }
+
+ public void setExceptionListener(ExceptionListener exceptionListener) { // replaces whatever listener was installed before
+ this.exceptionListener = exceptionListener;
+ }
+
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java?rev=754964&r1=754963&r2=754964&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java Mon Mar 16 17:25:23 2009
@@ -16,10 +16,15 @@
*/
package org.apache.activemq.broker;
+import java.beans.ExceptionListener;
+import java.io.IOException;
+
import org.apache.activemq.Connection;
import org.apache.activemq.Service;
import org.apache.activemq.broker.openwire.OpenwireProtocolHandler;
+import org.apache.activemq.broker.stomp.StompProtocolHandler;
import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.transport.stomp.StompWireFormat;
import org.apache.activemq.wireformat.WireFormat;
public class BrokerConnection extends Connection {
@@ -34,6 +39,19 @@
public void setWireFormat(WireFormat wf);
}
+
+ public BrokerConnection() { // installs the default listener: print the error, then stop the connection
+ setExceptionListener(new ExceptionListener(){
+ public void exceptionThrown(Exception error) {
+ error.printStackTrace();
+ try {
+ stop();
+ } catch (Exception ignore) { // best effort: a failure while stopping is deliberately dropped
+ }
+ }
+ });
+ }
+
public Broker getBroker() {
return broker;
}
@@ -53,13 +71,27 @@
protocolHandler.onCommand(command);
} else {
try {
+
+ // TODO: need to make this more extensible and dynamic. Perhaps
+ // we should lookup the ProtocolHandler via a FactoryFinder
WireFormat wf = (WireFormat) command;
if( wf.getClass() == OpenWireFormat.class ) {
protocolHandler = new OpenwireProtocolHandler();
- protocolHandler.setConnection(this);
- protocolHandler.setWireFormat(wf);
- protocolHandler.start();
+ } else if( wf.getClass() == StompWireFormat.class ) {
+ protocolHandler = new StompProtocolHandler();
+ } else {
+ throw new IOException("No protocol handler available for: "+wf.getClass());
}
+
+ protocolHandler.setConnection(this);
+ protocolHandler.setWireFormat(wf);
+ protocolHandler.start();
+
+ setExceptionListener(new ExceptionListener(){
+ public void exceptionThrown(Exception error) {
+ protocolHandler.onException(error);
+ }
+ });
} catch (Exception e) {
onException(e);
}
@@ -67,19 +99,6 @@
}
@Override
- public void onException(Exception error) {
- if( protocolHandler!=null ) {
- protocolHandler.onException(error);
- } else {
- error.printStackTrace();
- try {
- stop();
- } catch (Exception ignore) {
- }
- }
- }
-
- @Override
public void stop() throws Exception {
super.stop();
if( protocolHandler!=null ) {
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java?rev=754964&r1=754963&r2=754964&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java Mon Mar 16 17:25:23 2009
@@ -35,4 +35,6 @@
public <T> T asType(Class<T> type);
+ public boolean isPersistent();
+
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java?rev=754964&r1=754963&r2=754964&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java Mon Mar 16 17:25:23 2009
@@ -81,4 +81,8 @@
return null;
}
+ public boolean isPersistent() { // delegates to message.isPersistent()
+ return message.isPersistent();
+ }
+
}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/FrameTranslator.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/FrameTranslator.java?rev=754964&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/FrameTranslator.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/FrameTranslator.java Mon Mar 16 17:25:23 2009
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.stomp;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.broker.Destination;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.transport.stomp.ProtocolException;
+import org.apache.activemq.transport.stomp.Stomp;
+import org.apache.activemq.transport.stomp.StompFrame;
+
+/**
+ * Implementations of this interface are used to map back and forth from Stomp
+ * to ActiveMQ. There are several standard mappings which are semantically the
+ * same, the inner class, Helper, provides functions to copy those properties
+ * from one to the other
+ */
+public interface FrameTranslator {
+
+ ActiveMQMessage convertToOpenwireMessage(StompProtocolHandler converter, StompFrame frame) throws JMSException, ProtocolException; // STOMP frame -> OpenWire message
+ StompFrame convertFromOpenwireMessage(StompProtocolHandler converter, ActiveMQMessage message) throws IOException, JMSException; // OpenWire message -> STOMP frame
+ // Mapping between OpenWire destinations and STOMP destination names.
+ String convertFromOpenwireDestination(StompProtocolHandler converter, ActiveMQDestination d);
+ ActiveMQDestination convertToOpenwireDestination(StompProtocolHandler converter, String name) throws ProtocolException;
+ // Mapping between broker Destination objects and STOMP destination names.
+ String convertFromDestination(StompProtocolHandler converter, Destination d) throws ProtocolException;
+ Destination convertToDestination(StompProtocolHandler converter, String name) throws ProtocolException;
+
+ /**
+ * Helper class which holds commonly needed functions used when implementing
+ * FrameTranslators
+ */
+ static final class Helper {
+
+ private Helper() {
+ }
+
+ /**
+  * Copies the standard JMS headers of {@code message}, followed by all of its
+  * properties, into the header map of {@code command}.
+  * <p>
+  * Correlation id, reply-to and type are written only when set on the message,
+  * and the redelivered header only when the message is flagged as redelivered;
+  * the other standard headers are always written. Properties are written last,
+  * so a property that shares a name with a standard header replaces it.
+  *
+  * @param converter protocol handler handed through to the destination conversion
+  * @param message OpenWire message to read from
+  * @param command STOMP frame whose header map is updated in place
+  * @param ft translator used to render destinations as STOMP names
+  * @throws IOException presumably raised while reading the message properties -- confirm against ActiveMQMessage
+  */
+ public static void copyStandardHeadersFromMessageToFrame(StompProtocolHandler converter, ActiveMQMessage message, StompFrame command, FrameTranslator ft) throws IOException {
+     final Map<String, String> frameHeaders = command.getHeaders();
+
+     final String stompDestination = ft.convertFromOpenwireDestination(converter, message.getDestination());
+     frameHeaders.put(Stomp.Headers.Message.DESTINATION, stompDestination);
+     frameHeaders.put(Stomp.Headers.Message.MESSAGE_ID, message.getJMSMessageID());
+
+     final String correlationId = message.getJMSCorrelationID();
+     if (correlationId != null) {
+         frameHeaders.put(Stomp.Headers.Message.CORRELATION_ID, correlationId);
+     }
+     frameHeaders.put(Stomp.Headers.Message.EXPIRATION_TIME, String.valueOf(message.getJMSExpiration()));
+
+     if (message.getJMSRedelivered()) {
+         frameHeaders.put(Stomp.Headers.Message.REDELIVERED, "true");
+     }
+     frameHeaders.put(Stomp.Headers.Message.PRORITY, String.valueOf(message.getJMSPriority()));
+
+     final Object replyTo = message.getJMSReplyTo();
+     if (replyTo != null) {
+         frameHeaders.put(Stomp.Headers.Message.REPLY_TO, ft.convertFromOpenwireDestination(converter, (ActiveMQDestination) replyTo));
+     }
+     frameHeaders.put(Stomp.Headers.Message.TIMESTAMP, String.valueOf(message.getJMSTimestamp()));
+
+     final String jmsType = message.getJMSType();
+     if (jmsType != null) {
+         frameHeaders.put(Stomp.Headers.Message.TYPE, jmsType);
+     }
+
+     // Application properties go last; every value is rendered as a string.
+     final Map<String, Object> messageProperties = message.getProperties();
+     if (messageProperties == null) {
+         return;
+     }
+     for (Map.Entry<String, Object> entry : messageProperties.entrySet()) {
+         final Object value = entry.getValue();
+         frameHeaders.put(entry.getKey(), String.valueOf(value));
+     }
+ }
+
+ /**
+  * Transfers the headers of a STOMP frame onto an OpenWire message.
+  * <p>
+  * Works on a copy of the frame's header map, so the frame itself is not
+  * modified. The well-known SEND headers (destination, correlation id,
+  * expiration, priority, type, reply-to, persistent) are taken out of the copy
+  * and applied through the matching setters; whatever is left becomes the
+  * message properties.
+  *
+  * @param converter protocol handler handed through to the destination conversion
+  * @param command STOMP frame to read headers from
+  * @param msg OpenWire message to populate
+  * @param ft translator used to resolve STOMP destination names
+  * @throws ProtocolException if a destination name is rejected by {@code ft}
+  * @throws JMSException declared for the JMS setters invoked on {@code msg}
+  */
+ public static void copyStandardHeadersFromFrameToMessage(StompProtocolHandler converter, StompFrame command, ActiveMQMessage msg, FrameTranslator ft) throws ProtocolException, JMSException {
+     final Map<String, String> remaining = new HashMap<String, String>(command.getHeaders());
+
+     final String destinationName = remaining.remove(Stomp.Headers.Send.DESTINATION);
+     msg.setDestination(ft.convertToOpenwireDestination(converter, destinationName));
+
+     // Standard JMS headers: each one is consumed so it does not reappear as a property.
+     msg.setJMSCorrelationID(remaining.remove(Stomp.Headers.Send.CORRELATION_ID));
+
+     final String expiration = remaining.remove(Stomp.Headers.Send.EXPIRATION_TIME);
+     if (expiration != null) {
+         msg.setJMSExpiration(Long.parseLong(expiration));
+     }
+
+     final String priority = remaining.remove(Stomp.Headers.Send.PRIORITY);
+     if (priority != null) {
+         msg.setJMSPriority(Integer.parseInt(priority));
+     }
+
+     final String jmsType = remaining.remove(Stomp.Headers.Send.TYPE);
+     if (jmsType != null) {
+         msg.setJMSType(jmsType);
+     }
+
+     final String replyTo = remaining.remove(Stomp.Headers.Send.REPLY_TO);
+     if (replyTo != null) {
+         msg.setJMSReplyTo(ft.convertToOpenwireDestination(converter, replyTo));
+     }
+
+     final String persistent = remaining.remove(Stomp.Headers.Send.PERSISTENT);
+     if (persistent != null) {
+         msg.setPersistent("true".equals(persistent));
+     }
+
+     // Everything not consumed above travels as a general message property.
+     msg.setProperties(remaining);
+ }
+ }
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/JmsFrameTranslator.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/JmsFrameTranslator.java?rev=754964&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/JmsFrameTranslator.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/JmsFrameTranslator.java Mon Mar 16 17:25:23 2009
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.stomp;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.transport.stomp.ProtocolException;
+import org.apache.activemq.transport.stomp.Stomp;
+import org.apache.activemq.transport.stomp.StompFrame;
+
+import com.thoughtworks.xstream.XStream;
+import com.thoughtworks.xstream.io.HierarchicalStreamReader;
+import com.thoughtworks.xstream.io.HierarchicalStreamWriter;
+import com.thoughtworks.xstream.io.json.JettisonMappedXmlDriver;
+import com.thoughtworks.xstream.io.xml.PrettyPrintWriter;
+import com.thoughtworks.xstream.io.xml.XppReader;
+
+/**
+ * Frame translator implementation that uses XStream to convert messages to and
+ * from XML and JSON
+ *
+ * @author <a href="mailto:dejan@nighttale.net">Dejan Bosanac</a>
+ */
+public class JmsFrameTranslator extends LegacyFrameTranslator {
+
+ XStream xStream = null;
+
+ /**
+  * Converts a STOMP frame into an OpenWire message, honouring the frame's
+  * transformation header.
+  * <p>
+  * The legacy (text/bytes) conversion of the superclass is used when the frame
+  * has no transformation header, carries a content-length header, or asks for
+  * the byte transformation. Otherwise the body is decoded as UTF-8 and
+  * unmarshalled through XStream into an object or map message. If that fails
+  * for any reason, the failure is recorded in the transformation-error header
+  * and the frame is converted with the legacy conversion instead.
+  *
+  * @param converter protocol handler handed through to the header/destination conversion
+  * @param command STOMP frame to convert; its header map may gain a transformation-error entry
+  * @return the converted message with the frame's headers applied
+  * @throws JMSException declared for the JMS setters used during conversion
+  * @throws ProtocolException if the frame cannot be converted
+  */
+ public ActiveMQMessage convertToOpenwireMessage(StompProtocolHandler converter,
+         StompFrame command) throws JMSException, ProtocolException {
+     Map<String, String> headers = command.getHeaders();
+     ActiveMQMessage msg;
+     String transformation = headers.get(Stomp.Headers.TRANSFORMATION);
+     // A missing transformation header used to raise a NullPointerException here;
+     // it now simply means "no transformation requested".
+     if (transformation == null
+             || headers.containsKey(Stomp.Headers.CONTENT_LENGTH)
+             || Stomp.Transformations.JMS_BYTE.toString().equals(transformation)) {
+         msg = super.convertToOpenwireMessage(converter, command);
+     } else {
+         HierarchicalStreamReader in;
+
+         try {
+             String text = new String(command.getContent(), "UTF-8");
+             switch (Stomp.Transformations.getValue(transformation)) {
+             case JMS_OBJECT_XML:
+                 in = new XppReader(new StringReader(text));
+                 msg = createObjectMessage(in);
+                 break;
+             case JMS_OBJECT_JSON:
+                 in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
+                 msg = createObjectMessage(in);
+                 break;
+             case JMS_MAP_XML:
+                 in = new XppReader(new StringReader(text));
+                 msg = createMapMessage(in);
+                 break;
+             case JMS_MAP_JSON:
+                 in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
+                 msg = createMapMessage(in);
+                 break;
+             default:
+                 throw new Exception("Unkown transformation: " + transformation);
+             }
+         } catch (Throwable e) {
+             // Deliberate best effort: any transformation failure degrades to the
+             // legacy conversion. Exceptions without a message (e.g. a
+             // ClassCastException) must not put a null value into the headers.
+             String reason = e.getMessage() != null ? e.getMessage() : e.toString();
+             command.getHeaders().put(Stomp.Headers.TRANSFORMATION_ERROR, reason);
+             msg = super.convertToOpenwireMessage(converter, command);
+         }
+     }
+     FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this);
+     return msg;
+ }
+
+ /**
+  * Converts an OpenWire message into a STOMP MESSAGE frame.
+  * <p>
+  * Object and map messages are marshalled to a string with
+  * {@link #marshall(Serializable, String)}, using the transformation header
+  * copied from the message, and sent as UTF-8 bytes. Every other message type
+  * is handed to the superclass.
+  *
+  * @param converter protocol handler handed through to the header/destination conversion
+  * @param message OpenWire message to convert
+  * @return the STOMP frame representing {@code message}
+  * @throws IOException if headers or content cannot be produced
+  * @throws JMSException if the message payload cannot be read or marshalled
+  */
+ public StompFrame convertFromOpenwireMessage(StompProtocolHandler converter,
+         ActiveMQMessage message) throws IOException, JMSException {
+     final boolean objectMessage = message.getDataStructureType() == ActiveMQObjectMessage.DATA_STRUCTURE_TYPE;
+     final boolean mapMessage = !objectMessage
+             && message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE;
+     if (!objectMessage && !mapMessage) {
+         return super.convertFromOpenwireMessage(converter, message);
+     }
+
+     final StompFrame frame = new StompFrame();
+     frame.setAction(Stomp.Responses.MESSAGE);
+     final Map<String, String> frameHeaders = new HashMap<String, String>(25);
+     frame.setHeaders(frameHeaders);
+     FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(converter, message, frame, this);
+
+     // The payload is read from a copy of the message, never from the original.
+     final Serializable payload;
+     if (objectMessage) {
+         payload = ((ActiveMQObjectMessage) message.copy()).getObject();
+     } else {
+         payload = (Serializable) ((ActiveMQMapMessage) message.copy()).getContentMap();
+     }
+
+     final String encoded = marshall(payload, frameHeaders.get(Stomp.Headers.TRANSFORMATION));
+     frame.setContent(encoded.getBytes("UTF-8"));
+     return frame;
+ }
+
+ /**
+  * Marshalls the object to a string using XML or JSON encoding.
+  * <p>
+  * JSON is used when {@code transformation} ends with "json" (case
+  * insensitive); every other value, including {@code null}, produces XML.
+  *
+  * @param object value to marshal with the configured XStream instance
+  * @param transformation requested transformation name; may be {@code null}
+  * @return the marshalled text
+  * @throws JMSException declared for subclasses; not thrown by this implementation directly
+  */
+ protected String marshall(Serializable object, String transformation)
+         throws JMSException {
+     StringWriter buffer = new StringWriter();
+     HierarchicalStreamWriter out;
+     // The caller passes headers.get(TRANSFORMATION) unchecked, so the value is
+     // null for messages without that property; fall back to XML instead of
+     // failing with a NullPointerException.
+     if (transformation != null && transformation.toLowerCase().endsWith("json")) {
+         out = new JettisonMappedXmlDriver().createWriter(buffer);
+     } else {
+         out = new PrettyPrintWriter(buffer);
+     }
+     getXStream().marshal(object, out);
+     return buffer.toString();
+ }
+
+ protected ActiveMQObjectMessage createObjectMessage(HierarchicalStreamReader in) throws JMSException {
+ ActiveMQObjectMessage objMsg = new ActiveMQObjectMessage();
+ Object obj = getXStream().unmarshal(in);
+ objMsg.setObject((Serializable) obj);
+ return objMsg;
+ }
+
+ protected ActiveMQMapMessage createMapMessage(HierarchicalStreamReader in) throws JMSException {
+ ActiveMQMapMessage mapMsg = new ActiveMQMapMessage();
+ Map<String, Object> map = (Map<String, Object>)getXStream().unmarshal(in);
+ for (String key : map.keySet()) {
+ mapMsg.setObject(key, map.get(key));
+ }
+ return mapMsg;
+ }
+
+
+
+ // Properties
+ // -------------------------------------------------------------------------
+ /**
+  * Returns the XStream instance used for marshalling, creating a default one
+  * on first use when none has been set.
+  */
+ public XStream getXStream() {
+     if (xStream != null) {
+         return xStream;
+     }
+     xStream = new XStream();
+     return xStream;
+ }
+
+ public void setXStream(XStream xStream) { // a null value makes getXStream() create a fresh default instance on its next call
+ this.xStream = xStream;
+ }
+
+
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/LegacyFrameTranslator.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/LegacyFrameTranslator.java?rev=754964&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/LegacyFrameTranslator.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/LegacyFrameTranslator.java Mon Mar 16 17:25:23 2009
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.stomp;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.broker.Destination;
+import org.apache.activemq.broker.Router;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.transport.stomp.ProtocolException;
+import org.apache.activemq.transport.stomp.Stomp;
+import org.apache.activemq.transport.stomp.StompFrame;
+
+/**
+ * Implements ActiveMQ 4.0 translations
+ */
+public class LegacyFrameTranslator implements FrameTranslator {
+
+
+ /**
+  * Converts a STOMP frame into an OpenWire bytes or text message.
+  * <p>
+  * A frame with a content-length header becomes a bytes message, and that
+  * header is removed from the frame. Any other frame becomes a text message
+  * whose text is the frame content decoded as UTF-8. The frame's remaining
+  * headers are then applied to the message.
+  *
+  * @param converter protocol handler handed through to the header/destination conversion
+  * @param command STOMP frame to convert
+  * @return the converted message
+  * @throws JMSException declared for the JMS setters used during conversion
+  * @throws ProtocolException if the text body cannot be set
+  */
+ public ActiveMQMessage convertToOpenwireMessage(StompProtocolHandler converter, StompFrame command) throws JMSException, ProtocolException {
+     final Map<String, String> frameHeaders = command.getHeaders();
+     final boolean binary = frameHeaders.containsKey(Stomp.Headers.CONTENT_LENGTH);
+
+     final ActiveMQMessage converted;
+     if (!binary) {
+         final ActiveMQTextMessage textMessage = new ActiveMQTextMessage();
+         try {
+             final String body = new String(command.getContent(), "UTF-8");
+             textMessage.setText(body);
+         } catch (Throwable failure) {
+             throw new ProtocolException("Text could not bet set: " + failure, false, failure);
+         }
+         converted = textMessage;
+     } else {
+         frameHeaders.remove(Stomp.Headers.CONTENT_LENGTH);
+         final ActiveMQBytesMessage bytesMessage = new ActiveMQBytesMessage();
+         bytesMessage.writeBytes(command.getContent());
+         converted = bytesMessage;
+     }
+
+     FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, converted, this);
+     return converted;
+ }
+
+ /**
+  * Converts an OpenWire message into a STOMP MESSAGE frame.
+  * <p>
+  * Standard headers and properties are always copied. A text message supplies
+  * its text as UTF-8 content (empty content when the message has no text); a
+  * bytes message supplies its body together with a content-length header.
+  * For any other message type no content is set on the frame.
+  *
+  * @param converter protocol handler handed through to the header/destination conversion
+  * @param message OpenWire message to convert
+  * @return the STOMP frame representing {@code message}
+  * @throws IOException if headers or content cannot be produced
+  * @throws JMSException if the message body cannot be read
+  */
+ public StompFrame convertFromOpenwireMessage(StompProtocolHandler converter, ActiveMQMessage message) throws IOException, JMSException {
+     StompFrame command = new StompFrame();
+     command.setAction(Stomp.Responses.MESSAGE);
+     Map<String, String> headers = new HashMap<String, String>(25);
+     command.setHeaders(headers);
+
+     FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(converter, message, command, this);
+
+     if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) {
+
+         ActiveMQTextMessage msg = (ActiveMQTextMessage)message.copy();
+         // A text message without a body yields null text; send an empty
+         // frame body rather than failing with a NullPointerException.
+         String text = msg.getText();
+         command.setContent(text != null ? text.getBytes("UTF-8") : new byte[0]);
+
+     } else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) {
+
+         ActiveMQBytesMessage msg = (ActiveMQBytesMessage)message.copy();
+         // Work on a copy switched to read-only so the body can be read back.
+         msg.setReadOnlyBody(true);
+         byte[] data = new byte[(int)msg.getBodyLength()];
+         msg.readBytes(data);
+
+         headers.put(Stomp.Headers.CONTENT_LENGTH, "" + data.length);
+         command.setContent(data);
+     }
+     return command;
+ }
+
+ /**
+  * Renders an OpenWire destination as a STOMP destination name.
+  * <p>
+  * Returns {@code null} for a {@code null} destination. If the protocol
+  * handler reports a name for the destination through
+  * {@code getCreatedTempDestinationName}, that name is returned as is.
+  * Otherwise the physical name is prefixed with {@code /queue/},
+  * {@code /topic/}, {@code /remote-temp-queue/} or {@code /remote-temp-topic/}.
+  *
+  * @param converter protocol handler consulted for temporary destination names
+  * @param activeMQDestination destination to render; may be {@code null}
+  * @return the STOMP destination name, or {@code null}
+  */
+ public String convertFromOpenwireDestination(StompProtocolHandler converter, ActiveMQDestination activeMQDestination) {
+     if (activeMQDestination == null) {
+         return null;
+     }
+     final String physicalName = activeMQDestination.getPhysicalName();
+
+     final String createdTempName = converter.getCreatedTempDestinationName(activeMQDestination);
+     if (createdTempName != null) {
+         return createdTempName;
+     }
+
+     final String prefix = activeMQDestination.isQueue()
+             ? (activeMQDestination.isTemporary() ? "/remote-temp-queue/" : "/queue/")
+             : (activeMQDestination.isTemporary() ? "/remote-temp-topic/" : "/topic/");
+     return prefix + physicalName;
+ }
+
+ /**
+  * Resolves a STOMP destination name to an OpenWire destination.
+  * <p>
+  * The {@code /queue/}, {@code /topic/}, {@code /remote-temp-queue/} and
+  * {@code /remote-temp-topic/} prefixes are stripped and mapped to the
+  * matching destination type. {@code /temp-queue/} and {@code /temp-topic/}
+  * names are passed unchanged to the protocol handler, which creates the
+  * temporary destination.
+  *
+  * @param converter protocol handler used to create temporary destinations
+  * @param name STOMP destination name; may be {@code null}
+  * @return the resolved destination, or {@code null} when {@code name} is {@code null}
+  * @throws ProtocolException if {@code name} starts with none of the known prefixes
+  */
+ public ActiveMQDestination convertToOpenwireDestination(StompProtocolHandler converter, String name) throws ProtocolException {
+     if (name == null) {
+         return null;
+     }
+
+     final String queuePrefix = "/queue/";
+     if (name.startsWith(queuePrefix)) {
+         return ActiveMQDestination.createDestination(name.substring(queuePrefix.length()), ActiveMQDestination.QUEUE_TYPE);
+     }
+     final String topicPrefix = "/topic/";
+     if (name.startsWith(topicPrefix)) {
+         return ActiveMQDestination.createDestination(name.substring(topicPrefix.length()), ActiveMQDestination.TOPIC_TYPE);
+     }
+     final String remoteTempQueuePrefix = "/remote-temp-queue/";
+     if (name.startsWith(remoteTempQueuePrefix)) {
+         return ActiveMQDestination.createDestination(name.substring(remoteTempQueuePrefix.length()), ActiveMQDestination.TEMP_QUEUE_TYPE);
+     }
+     final String remoteTempTopicPrefix = "/remote-temp-topic/";
+     if (name.startsWith(remoteTempTopicPrefix)) {
+         return ActiveMQDestination.createDestination(name.substring(remoteTempTopicPrefix.length()), ActiveMQDestination.TEMP_TOPIC_TYPE);
+     }
+
+     // Connection-local temporary destinations keep their full STOMP name.
+     if (name.startsWith("/temp-queue/")) {
+         return converter.createTempQueue(name);
+     }
+     if (name.startsWith("/temp-topic/")) {
+         return converter.createTempTopic(name);
+     }
+
+     throw new ProtocolException("Illegal destination name: [" + name + "] -- ActiveMQ STOMP destinations "
+             + "must begine with one of: /queue/ /topic/ /temp-queue/ /temp-topic/");
+ }
+
+ /**
+  * Renders a broker destination as a STOMP destination name.
+  * <p>
+  * Queue-domain destinations are rendered as {@code /queue/<name>} and
+  * topic-domain destinations as {@code /topic/<name>}.
+  *
+  * @param converter protocol handler (not used by this implementation)
+  * @param d destination to render; may be {@code null}
+  * @return the STOMP destination name, or {@code null} when {@code d} is {@code null}
+  * @throws ProtocolException if the destination is in neither the queue nor the topic domain
+  */
+ public String convertFromDestination(StompProtocolHandler converter, Destination d) throws ProtocolException {
+     if (d == null) {
+         return null;
+     }
+
+     StringBuffer buffer = new StringBuffer();
+     if( d.getDomain().equals(Router.QUEUE_DOMAIN) ) {
+         buffer.append("/queue/");
+     } else if( d.getDomain().equals(Router.TOPIC_DOMAIN) ) {
+         // This branch previously re-tested QUEUE_DOMAIN, which made it
+         // unreachable and rejected every topic destination.
+         buffer.append("/topic/");
+     } else {
+         throw new ProtocolException("Illegal destination: Stomp can only handle queue or topic Domains");
+     }
+
+     buffer.append(d.getName().toString());
+     return buffer.toString();
+ }
+
+ /**
+  * Resolves a STOMP destination name to a broker destination.
+  * <p>
+  * Only {@code /queue/} and {@code /topic/} names are supported. The four
+  * temporary-destination prefixes are recognised but not implemented here.
+  *
+  * @param converter protocol handler (not used by this implementation)
+  * @param name STOMP destination name; may be {@code null}
+  * @return the resolved destination, or {@code null} when {@code name} is {@code null}
+  * @throws ProtocolException if {@code name} starts with none of the known prefixes
+  * @throws UnsupportedOperationException for temporary destination names
+  */
+ public Destination convertToDestination(StompProtocolHandler converter, String name) throws ProtocolException {
+     if (name == null) {
+         return null;
+     }
+
+     final String queuePrefix = "/queue/";
+     if (name.startsWith(queuePrefix)) {
+         final String queueName = name.substring(queuePrefix.length());
+         return new Destination.SingleDestination(Router.QUEUE_DOMAIN, new AsciiBuffer(queueName));
+     }
+     final String topicPrefix = "/topic/";
+     if (name.startsWith(topicPrefix)) {
+         final String topicName = name.substring(topicPrefix.length());
+         return new Destination.SingleDestination(Router.TOPIC_DOMAIN, new AsciiBuffer(topicName));
+     }
+
+     final boolean temporary = name.startsWith("/remote-temp-queue/")
+             || name.startsWith("/remote-temp-topic/")
+             || name.startsWith("/temp-queue/")
+             || name.startsWith("/temp-topic/");
+     if (temporary) {
+         throw new UnsupportedOperationException();
+     }
+
+     throw new ProtocolException("Illegal destination name: [" + name + "] -- ActiveMQ STOMP destinations "
+             + "must begine with one of: /queue/ /topic/ /temp-queue/ /temp-topic/");
+ }
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java?rev=754964&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java Mon Mar 16 17:25:23 2009
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.stomp;
+
+import org.apache.activemq.broker.Destination;
+import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.transport.stomp.Stomp;
+import org.apache.activemq.transport.stomp.StompFrame;
+
+public class StompMessageDelivery implements MessageDelivery {
+
+ private final StompFrame frame;
+ private Destination destination;
+ private Runnable completionCallback;
+ private String receiptId;
+ private int priority = Integer.MIN_VALUE;
+ private AsciiBuffer msgId;
+
+ public StompMessageDelivery(StompFrame frame, Destination destiantion) {
+ this.frame = frame;
+ this.destination = destiantion;
+ this.frame.setAction(Stomp.Responses.MESSAGE);
+ this.receiptId = frame.getHeaders().remove(Stomp.Headers.RECEIPT_REQUESTED);
+ }
+
+ public Destination getDestination() {
+ return destination;
+ }
+
+ public int getFlowLimiterSize() {
+ return frame.getContent().length;
+ }
+
+ public int getPriority() {
+ if( priority == Integer.MIN_VALUE ) {
+ String p = frame.getHeaders().get(Stomp.Headers.Message.PRORITY);
+ try {
+ priority = (p == null) ? 4 : Integer.parseInt(p);
+ } catch (NumberFormatException e) {
+ priority = 4;
+ }
+ }
+ return priority;
+ }
+
+ public AsciiBuffer getMsgId() {
+ if( msgId == null ) {
+ String p = frame.getHeaders().get(Stomp.Headers.Message.MESSAGE_ID);
+ if( p!=null ) {
+ msgId = new AsciiBuffer(p);
+ }
+ }
+ return msgId;
+ }
+
+ public AsciiBuffer getProducerId() {
+ return null;
+ }
+
+ public Runnable getCompletionCallback() {
+ return completionCallback;
+ }
+
+ public void setCompletionCallback(Runnable completionCallback) {
+ this.completionCallback = completionCallback;
+ }
+
+ public <T> T asType(Class<T> type) {
+ if( type == StompFrame.class ) {
+ return type.cast(frame);
+ }
+ return null;
+ }
+
+ public StompFrame getStomeFame() {
+ return frame;
+ }
+
+ public String getReceiptId() {
+ return receiptId;
+ }
+
+ public boolean isPersistent() {
+ String p = frame.getHeaders().get(Stomp.Headers.Send.PERSISTENT);
+ return "true".equals(p);
+ }
+
+}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java?rev=754964&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java Mon Mar 16 17:25:23 2009
@@ -0,0 +1,496 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.stomp;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+
+import org.apache.activemq.WindowLimiter;
+import org.apache.activemq.broker.BrokerConnection;
+import org.apache.activemq.broker.DeliveryTarget;
+import org.apache.activemq.broker.Destination;
+import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.broker.Router;
+import org.apache.activemq.broker.BrokerConnection.ProtocolHandler;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.filter.BooleanExpression;
+import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.flow.Flow;
+import org.apache.activemq.flow.FlowController;
+import org.apache.activemq.flow.IFlowController;
+import org.apache.activemq.flow.IFlowDrain;
+import org.apache.activemq.flow.IFlowSink;
+import org.apache.activemq.flow.IFlowSource;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.flow.SizeLimiter;
+import org.apache.activemq.flow.ISinkController.FlowControllable;
+import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.queue.SingleFlowRelay;
+import org.apache.activemq.selector.SelectorParser;
+import org.apache.activemq.transport.stomp.Stomp;
+import org.apache.activemq.transport.stomp.StompFrame;
+import org.apache.activemq.transport.stomp.StompSubscription;
+import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.FactoryFinder;
+import org.apache.activemq.wireformat.WireFormat;
+
+
+public class StompProtocolHandler implements ProtocolHandler {
+
+ interface ActionHander {
+ public void onStompFrame(StompFrame frame) throws Exception;
+ }
+
+ protected final HashMap<String, ActionHander> actionHandlers = new HashMap<String, ActionHander>();
+ protected final HashMap<String, ConsumerContext> consumers = new HashMap<String, ConsumerContext>();
+
+ protected final Object inboundMutex = new Object();
+ protected IFlowController<MessageDelivery> inboundController;
+
+ protected BrokerConnection connection;
+
+ // TODO: need to update the FrameTranslator to normalize to new broker API objects instead of to the openwire command set.
+ private final FrameTranslator translator = new LegacyFrameTranslator();
+ private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/broker/stomp/frametranslator/");
+ private SingleFlowRelay<MessageDelivery> outboundQueue;
+
+ private HashMap<AsciiBuffer, ConsumerContext> allSentMessageIds = new HashMap<AsciiBuffer, ConsumerContext>();
+
+ /**
+  * Picks the frame translator for a frame: the one named by its
+  * transformation header when that can be instantiated, otherwise the
+  * default translator of this handler.
+  */
+ protected FrameTranslator translator(StompFrame frame) {
+     FrameTranslator selected = translator;
+     try {
+         String transformation = frame.getHeaders().get(Stomp.Headers.TRANSFORMATION);
+         if (transformation != null) {
+             selected = (FrameTranslator) FRAME_TRANSLATOR_FINDER.newInstance(transformation);
+         }
+     } catch (Exception ignore) {
+         // Any lookup failure falls back to the default translator.
+     }
+     return selected;
+ }
+
+ /**
+ * Registers one {@link ActionHander} per STOMP command in
+ * <code>actionHandlers</code>. {@link #onCommand(Object)} looks handlers up
+ * by the action of the incoming frame.
+ */
+ public StompProtocolHandler() {
+ // CONNECT: reply with a CONNECTED frame.
+ actionHandlers.put(Stomp.Commands.CONNECT, new ActionHander(){
+ public void onStompFrame(StompFrame frame) throws Exception {
+ StompFrame response = new StompFrame(Stomp.Responses.CONNECTED);
+ connection.write(response);
+ }
+ });
+ // SEND: resolve the destination, wrap the frame and push it into the
+ // inbound flow controller, waiting while the offer is refused.
+ actionHandlers.put(Stomp.Commands.SEND, new ActionHander(){
+ public void onStompFrame(StompFrame frame) throws Exception {
+ String dest = frame.getHeaders().get(Stomp.Headers.Send.DESTINATION);
+ Destination destination = translator(frame).convertToDestination(StompProtocolHandler.this, dest);
+
+ StompMessageDelivery md = new StompMessageDelivery(frame, destination);
+ while (!inboundController.offer(md, null)) {
+ inboundController.waitForFlowUnblock();
+ }
+ }
+ });
+ // SUBSCRIBE: create a consumer context, remember it under its STOMP
+ // destination name (a later subscription to the same name replaces the
+ // map entry), bind it to the router and send a receipt if one was requested.
+ actionHandlers.put(Stomp.Commands.SUBSCRIBE, new ActionHander(){
+ public void onStompFrame(StompFrame frame) throws Exception {
+ ConsumerContext ctx = new ConsumerContext(frame);
+ consumers.put(ctx.stompDestination, ctx);
+ connection.getBroker().getRouter().bind(ctx.destination, ctx);
+ ack(frame);
+ }
+ });
+ // UNSUBSCRIBE: accepted but not implemented yet.
+ actionHandlers.put(Stomp.Commands.UNSUBSCRIBE, new ActionHander(){
+ public void onStompFrame(StompFrame frame) throws Exception {
+ }
+ });
+ // ACK: the message-id header is read but the value is discarded; the
+ // acknowledgement is not forwarded to any ConsumerContext here.
+ actionHandlers.put(Stomp.Commands.ACK, new ActionHander(){
+ public void onStompFrame(StompFrame frame) throws Exception {
+ frame.getHeaders().get(Stomp.Headers.Ack.MESSAGE_ID);
+ }
+ });
+ // DISCONNECT: accepted but not implemented yet.
+ actionHandlers.put(Stomp.Commands.DISCONNECT, new ActionHander(){
+ public void onStompFrame(StompFrame frame) throws Exception {
+ }
+ });
+
+ // Transaction commands are accepted but not implemented yet.
+ actionHandlers.put(Stomp.Commands.ABORT_TRANSACTION, new ActionHander(){
+ public void onStompFrame(StompFrame frame) throws Exception {
+ }
+ });
+ actionHandlers.put(Stomp.Commands.BEGIN_TRANSACTION, new ActionHander(){
+ public void onStompFrame(StompFrame frame) throws Exception {
+ }
+ });
+ actionHandlers.put(Stomp.Commands.COMMIT_TRANSACTION, new ActionHander(){
+ public void onStompFrame(StompFrame frame) throws Exception {
+ }
+ });
+ }
+
+ /**
+ * Builds the inbound flow controller and the shared outbound queue for this
+ * connection. Requires <code>connection</code> to have been set.
+ */
+ public void start() throws Exception {
+ // Setup the inbound processing..
+ final Flow inboundFlow = new Flow("broker-"+connection.getName()+"-inbound", false);
+ SizeLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(connection.getInputWindowSize(), connection.getInputResumeThreshold());
+ // Every element accepted by the inbound controller is handed to route().
+ inboundController = new FlowController<MessageDelivery>(new FlowControllableAdapter() {
+ public void flowElemAccepted(ISourceController<MessageDelivery> controller, MessageDelivery elem) {
+ route(controller, elem);
+ }
+
+ public String toString() {
+ return inboundFlow.getFlowName();
+ }
+ }, inboundFlow, limiter, inboundMutex);
+
+ Flow outboundFlow = new Flow("broker-"+connection.getName()+"-outbound", false);
+ // NOTE(review): the inbound limiter gets (window size, resume threshold) but
+ // this one gets the output window size twice -- confirm that is intended.
+ limiter = new SizeLimiter<MessageDelivery>(connection.getOutputWindowSize(), connection.getOutputWindowSize());
+ outboundQueue = new SingleFlowRelay<MessageDelivery>(outboundFlow, outboundFlow.getFlowName(), limiter);
+ // Draining the outbound queue writes the wrapped STOMP frame to the connection.
+ outboundQueue.setDrain(new IFlowDrain<MessageDelivery>() {
+ public void drain(final MessageDelivery message, ISourceController<MessageDelivery> controller) {
+ StompFrame msg = message.asType(StompFrame.class);
+ connection.write(msg);
+ };
+ });
+
+ }
+
+ /** Does nothing; nothing created in start() is torn down here. */
+ public void stop() throws Exception {
+ }
+
+ public void onCommand(Object o) {
+ StompFrame command = (StompFrame)o;
+ try {
+ String action = command.getAction();
+ ActionHander actionHander = actionHandlers.get(action);
+ if( actionHander == null ) {
+ throw new IOException("Unsupported command: "+action);
+ }
+ actionHander.onStompFrame(command);
+ } catch (Exception error) {
+ try {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
+ error.printStackTrace(stream);
+ stream.close();
+
+ HashMap<String, String> headers = new HashMap<String, String>();
+ headers.put(Stomp.Headers.Error.MESSAGE, error.getMessage());
+
+ if (command != null) {
+ final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
+ if (receiptId != null) {
+ headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
+ }
+ }
+
+ StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
+ connection.write(errorMessage);
+ connection.stop();
+ } catch (Exception ignore) {
+ }
+ }
+ }
+
+ public void onException(Exception error) {
+ if( !connection.isStopping() ) {
+ try {
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
+ error.printStackTrace(stream);
+ stream.close();
+
+ sendError(error.getMessage(), baos.toByteArray());
+ connection.stop();
+
+ } catch (Exception ignore) {
+ }
+ }
+ }
+
+ // /////////////////////////////////////////////////////////////////
+ // Internal Support Methods
+ // /////////////////////////////////////////////////////////////////
+ /**
+ * No-op {@link FlowControllable} base: accepting an element does nothing and
+ * both the sink and the source are null. Subclasses override what they need.
+ */
+ static class FlowControllableAdapter implements FlowControllable<MessageDelivery> {
+ public void flowElemAccepted(ISourceController<MessageDelivery> controller, MessageDelivery elem) {
+ }
+
+ public IFlowSink<MessageDelivery> getFlowSink() {
+ return null;
+ }
+
+ public IFlowSource<MessageDelivery> getFlowSource() {
+ return null;
+ }
+ }
+
+ class ConsumerContext implements DeliveryTarget {
+
+ private BooleanExpression selector;
+
+ private SingleFlowRelay<MessageDelivery> queue;
+ public WindowLimiter<MessageDelivery> limiter;
+ private FrameTranslator translator;
+ private String subscriptionId;
+ private String stompDestination;
+ private Destination destination;
+ private String ackMode;
+
+ private LinkedHashMap<AsciiBuffer, AsciiBuffer> sentMessageIds = new LinkedHashMap<AsciiBuffer, AsciiBuffer>();
+
+ /**
+  * Builds the consumer state for a SUBSCRIBE frame: resolves the
+  * destination, normalises the ack mode, parses the selector and chooses the
+  * queue deliveries are relayed through.
+  * <p>
+  * Auto-ack subscriptions share the connection-wide outbound queue. Client-ack
+  * subscriptions get their own window-limited relay, and every frame written
+  * through it is recorded so a later ACK can release credit.
+  * <p>
+  * Individual-ack is rejected by throwing. Previously an error was sent and
+  * the constructor returned early, leaving a context with a null queue that
+  * the caller went on to bind to the router.
+  *
+  * @param subscribe the SUBSCRIBE frame
+  * @throws UnsupportedOperationException if individual ack mode is requested
+  * @throws Exception if the destination or the selector is invalid
+  */
+ public ConsumerContext(final StompFrame subscribe) throws Exception {
+     translator = translator(subscribe);
+
+     Map<String, String> headers = subscribe.getHeaders();
+     stompDestination = headers.get(Stomp.Headers.Subscribe.DESTINATION);
+     destination = translator.convertToDestination(StompProtocolHandler.this, stompDestination);
+     subscriptionId = headers.get(Stomp.Headers.Subscribe.ID);
+
+     String requestedAckMode = headers.get(Stomp.Headers.Subscribe.ACK_MODE);
+     if (Stomp.Headers.Subscribe.AckModeValues.CLIENT.equals(requestedAckMode)) {
+         ackMode = StompSubscription.CLIENT_ACK;
+     } else if (Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL.equals(requestedAckMode)) {
+         // Propagates to onCommand(), which sends the ERROR frame and stops
+         // the connection, so no half-built context is ever registered.
+         throw new UnsupportedOperationException(StompSubscription.INDIVIDUAL_ACK+" not supported.");
+     } else {
+         ackMode = StompSubscription.AUTO_ACK;
+     }
+
+     selector = parseSelector(subscribe);
+
+     if (StompSubscription.AUTO_ACK.equals(ackMode)) {
+         queue = outboundQueue;
+         return;
+     }
+
+     Flow flow = new Flow("broker-"+subscriptionId+"-outbound", false);
+     // Each message counts as one unit of the window, regardless of its size.
+     limiter = new WindowLimiter<MessageDelivery>(true, flow, 1000, 500) {
+         public int getElementSize(MessageDelivery m) {
+             return 1;
+         }
+     };
+     queue = new SingleFlowRelay<MessageDelivery>(flow, flow.getFlowName(), limiter);
+     queue.setDrain(new IFlowDrain<MessageDelivery>() {
+         public void drain(final MessageDelivery message, ISourceController<MessageDelivery> controller) {
+             StompFrame frame = message.asType(StompFrame.class);
+             // Record the id before writing so an ACK can always be matched.
+             synchronized(allSentMessageIds) {
+                 AsciiBuffer msgId = message.getMsgId();
+                 sentMessageIds.put(msgId, msgId);
+                 allSentMessageIds.put(msgId, ConsumerContext.this);
+             }
+             connection.write(frame);
+         }
+     });
+ }
+
+ /**
+  * Handles a client ACK for this subscription. The acknowledgement is
+  * cumulative: every message sent up to and including the acknowledged id
+  * is forgotten, and one credit per forgotten message is returned to the
+  * window limiter.
+  * <p>
+  * An ACK without a message-id header is answered with an error and the
+  * connection is stopped. An ACK for an id this subscription has not sent
+  * is ignored; previously it drained every outstanding message.
+  * Subscriptions that are not in a client ack mode reject any ACK.
+  *
+  * @param info the ACK frame
+  */
+ public void ack(StompFrame info) throws Exception {
+     boolean clientAcks = StompSubscription.CLIENT_ACK.equals(ackMode)
+             || StompSubscription.INDIVIDUAL_ACK.equals(ackMode);
+     if (!clientAcks) {
+         // We should not be getting an ACK.
+         sendError("ACK not expected.");
+         connection.stop();
+         return;
+     }
+
+     String ackedId = info.getHeaders().get(Stomp.Headers.Ack.MESSAGE_ID);
+     if (ackedId == null) {
+         // Report it explicitly rather than handing null to AsciiBuffer.
+         sendError("ACK is missing the message-id header.");
+         connection.stop();
+         return;
+     }
+
+     int credits = 0;
+     synchronized(allSentMessageIds) {
+         AsciiBuffer mid = new AsciiBuffer(ackedId);
+         if (!sentMessageIds.containsKey(mid)) {
+             // Not one of ours: nothing to release.
+             return;
+         }
+         // sentMessageIds iterates in send order, so this walks oldest-first.
+         for (Iterator<AsciiBuffer> iterator = sentMessageIds.keySet().iterator(); iterator.hasNext();) {
+             AsciiBuffer next = iterator.next();
+             iterator.remove();
+             allSentMessageIds.remove(next);
+             credits++;
+             if (next.equals(mid)) {
+                 break;
+             }
+         }
+     }
+     synchronized(queue) {
+         limiter.onProtocolCredit(credits);
+     }
+ }
+
+ /** @return the queue chosen for this subscription by the constructor */
+ public IFlowSink<MessageDelivery> getSink() {
+ return queue;
+ }
+
+ /**
+  * Decides whether a delivery should go to this subscription. The delivery
+  * must be viewable both as a {@link StompFrame} and as an openwire
+  * {@link Message}; it then matches when there is no selector or the
+  * selector accepts it. A selector evaluation failure counts as no match.
+  */
+ public boolean match(MessageDelivery message) {
+     if (message.asType(StompFrame.class) == null) {
+         return false;
+     }
+
+     final Message openwireView = message.asType(Message.class);
+     if (openwireView == null) {
+         return false;
+     }
+
+     // TODO: abstract the Selector bits so that it is not openwire specific.
+     final MessageEvaluationContext evaluation = new MessageEvaluationContext();
+     evaluation.setMessageReference(openwireView);
+     evaluation.setDestination(openwireView.getDestination());
+     if (selector == null) {
+         return true;
+     }
+     try {
+         return selector.matches(evaluation);
+     } catch (JMSException e) {
+         e.printStackTrace();
+         return false;
+     }
+ }
+
+ }
+
+ /** Sends an ERROR frame with the given message header and an empty body. */
+ private void sendError(String message) {
+ sendError(message, StompFrame.NO_DATA);
+ }
+
+ /** Sends an ERROR frame whose body is <code>details</code> encoded as UTF-8. */
+ private void sendError(String message, String details) {
+ try {
+ sendError(message, details.getBytes("UTF-8"));
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ /**
+ * Writes an ERROR frame to the connection.
+ *
+ * @param message value of the error message header
+ * @param details frame body
+ */
+ private void sendError(String message, byte[] details) {
+ HashMap<String, String> headers = new HashMap<String, String>();
+ headers.put(Stomp.Headers.Error.MESSAGE, message);
+ StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, details);
+ connection.write(errorMessage);
+ }
+
+ /** Sends a RECEIPT for the receipt id requested in the frame headers, if any. */
+ private void ack(StompFrame frame) {
+     String requested = frame.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
+     ack(requested);
+ }
+ /**
+  * Writes a RECEIPT frame carrying <code>receiptId</code> to the connection.
+  * Does nothing when <code>receiptId</code> is null.
+  */
+ private void ack(String receiptId) {
+     if (receiptId == null) {
+         return;
+     }
+     StompFrame receipt = new StompFrame();
+     receipt.setAction(Stomp.Responses.RECEIPT);
+     receipt.setHeaders(new HashMap<String, String>(1));
+     receipt.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
+     connection.write(receipt);
+ }
+
+ /**
+  * Routes an inbound delivery to every matching target and sends the
+  * RECEIPT the producer asked for, if any.
+  * <p>
+  * With targets, a persistent message is receipted from its completion
+  * callback and a non-persistent one immediately. Without targets the
+  * receipt is sent immediately.
+  * <p>
+  * The receipt id is taken from the delivery, not from the frame headers:
+  * {@link StompMessageDelivery} removes the receipt header from the frame at
+  * construction, so looking it up in the frame again never found it and no
+  * receipt was sent.
+  *
+  * @param controller the source controller the delivery arrived through
+  * @param messageDelivery the delivery to route; must be a {@link StompMessageDelivery}
+  */
+ protected void route(ISourceController<MessageDelivery> controller, MessageDelivery messageDelivery) {
+     // TODO:
+     // Consider doing some caching of this target list. Most producers
+     // always send to the same destination.
+     Collection<DeliveryTarget> targets = connection.getBroker().getRouter().route(messageDelivery);
+     final String receiptId = ((StompMessageDelivery) messageDelivery).getReceiptId();
+     if (targets != null) {
+         if (receiptId != null) {
+             // We need to ack the message once we ensure we won't lose it.
+             // We know we won't lose it once it's persisted or delivered to
+             // a consumer. Set up a callback to get notified once one of
+             // those happens.
+             if (messageDelivery.isPersistent()) {
+                 messageDelivery.setCompletionCallback(new Runnable() {
+                     public void run() {
+                         ack(receiptId);
+                     }
+                 });
+             } else {
+                 // Let the client know the broker got the message.
+                 ack(receiptId);
+             }
+         }
+
+         // Deliver the message to all the targets..
+         for (DeliveryTarget dt : targets) {
+             if (dt.match(messageDelivery)) {
+                 dt.getSink().add(messageDelivery, controller);
+             }
+         }
+
+     } else {
+         // Let the client know we got the message even though there
+         // were no valid targets to deliver the message to.
+         if (receiptId != null) {
+             ack(receiptId);
+         }
+     }
+     controller.elementDispatched(messageDelivery);
+ }
+
+ /**
+  * Converts an openwire {@link ActiveMQDestination} into a broker
+  * {@link Destination}. Composite destinations are converted element by
+  * element into a multi-destination.
+  * <p>
+  * The queue and topic checks are one if/else-if chain. Previously the
+  * missing <code>else</code> made every queue destination fall into the
+  * "unsupported" branch of the topic check.
+  *
+  * @param dest the openwire destination
+  * @return the equivalent broker destination
+  * @throws IllegalArgumentException if the destination is neither a queue nor a topic
+  */
+ static public Destination convert(ActiveMQDestination dest) {
+     if (dest.isComposite()) {
+         ActiveMQDestination[] compositeDestinations = dest.getCompositeDestinations();
+         ArrayList<Destination> d = new ArrayList<Destination>();
+         for (int i = 0; i < compositeDestinations.length; i++) {
+             d.add(convert(compositeDestinations[i]));
+         }
+         return new Destination.MultiDestination(d);
+     }
+     AsciiBuffer domain;
+     if (dest.isQueue()) {
+         domain = Router.QUEUE_DOMAIN;
+     } else if (dest.isTopic()) {
+         domain = Router.TOPIC_DOMAIN;
+     } else {
+         throw new IllegalArgumentException("Unsupported domain type: " + dest);
+     }
+     return new Destination.SingleDestination(domain, new AsciiBuffer(dest.getPhysicalName()));
+ }
+
+ /**
+  * Parses the selector header of a SUBSCRIBE frame.
+  *
+  * @return the parsed expression, or null when the frame has no selector header
+  * @throws InvalidSelectorException if the selector cannot be parsed
+  */
+ private static BooleanExpression parseSelector(StompFrame frame) throws InvalidSelectorException {
+     String expression = frame.getHeaders().get(Stomp.Headers.Subscribe.SELECTOR);
+     if (expression == null) {
+         return null;
+     }
+     return SelectorParser.parse(expression);
+ }
+
+ /** @return the broker connection this handler serves */
+ public BrokerConnection getConnection() {
+ return connection;
+ }
+
+ /** Sets the broker connection; start() and all handlers rely on it. */
+ public void setConnection(BrokerConnection connection) {
+ this.connection = connection;
+ }
+
+ /** Ignores the wire format; this handler keeps no reference to it. */
+ public void setWireFormat(WireFormat wireFormat) {
+ }
+
+ /** Not implemented yet; always returns null. */
+ public String getCreatedTempDestinationName(ActiveMQDestination activeMQDestination) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ /** Not implemented yet; always returns null. */
+ public ActiveMQDestination createTempQueue(String name) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ /** Not implemented yet; always returns null. */
+ public ActiveMQDestination createTempTopic(String name) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableStompWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableStompWireFormatFactory.java?rev=754964&r1=754963&r2=754964&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableStompWireFormatFactory.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/DiscriminatableStompWireFormatFactory.java Mon Mar 16 17:25:23 2009
@@ -16,17 +16,48 @@
*/
package org.apache.activemq.wireformat;
+import java.io.UnsupportedEncodingException;
+
+import org.apache.activemq.transport.stomp.Stomp;
import org.apache.activemq.transport.stomp.StompWireFormatFactory;
import org.apache.activemq.util.ByteSequence;
public class DiscriminatableStompWireFormatFactory extends StompWireFormatFactory implements DiscriminatableWireFormatFactory {
+ static byte MAGIC[] = toBytes(Stomp.Commands.CONNECT);
+
+ // Encodes value as UTF-8, turning the checked encoding exception into a
+ // RuntimeException so it can be used in a static initializer.
+ static private byte[] toBytes(String value) {
+ try {
+ return value.getBytes("UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
public boolean matchesWireformatHeader(ByteSequence byteSequence) {
+ // Matches when the sequence consists of optional CR/LF bytes followed by
+ // exactly the STOMP CONNECT command. Every byte is checked before a match
+ // is reported; previously the method returned after the first byte only.
+ byte[] data = byteSequence.data;
+ // The sequence may be a window into a larger array, so index from its offset.
+ int start = byteSequence.offset;
+ if( byteSequence.length >= MAGIC.length) {
+ int offset = byteSequence.length - MAGIC.length;
+ for(int i=0; i < byteSequence.length; i++) {
+ // Newlines are allowed to precede the STOMP connect command.
+ if( i < offset ) {
+ if( data[start+i]!='\n' && data[start+i]!='\r' ) {
+ return false;
+ }
+ } else {
+ if( data[start+i]!=MAGIC[i-offset] ) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
return false;
}
+
public int maxWireformatHeaderLength() {
- return 100;
+ // Length of the CONNECT command plus 10 extra bytes; presumably room for
+ // the leading newlines matchesWireformatHeader tolerates -- confirm.
+ return MAGIC.length+10;
}
}
Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java?rev=754964&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java (added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java Mon Mar 16 17:25:23 2009
@@ -0,0 +1,512 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import java.beans.ExceptionListener;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.dispatch.IDispatcher;
+import org.apache.activemq.dispatch.PriorityDispatcher;
+import org.apache.activemq.metric.MetricAggregator;
+import org.apache.activemq.metric.Period;
+import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.queue.Mapper;
+
+public abstract class BrokerTestBase extends TestCase {
+
+ protected static final int PERFORMANCE_SAMPLES = 3;
+
+ protected static final int IO_WORK_AMOUNT = 0;
+ protected static final int FANIN_COUNT = 10;
+ protected static final int FANOUT_COUNT = 10;
+
+ protected static final int PRIORITY_LEVELS = 10;
+ protected static final boolean USE_INPUT_QUEUES = true;
+
+ // Set to put senders and consumers on separate brokers.
+ protected boolean multibroker = false;
+
+ // Set to mock up ptp (destinations are created in the queue domain instead of the topic domain):
+ protected boolean ptp = false;
+
+ // Set to use tcp IO
+ protected boolean tcp = true;
+ // set to force marshalling even in the NON tcp case.
+ protected boolean forceMarshalling = false;
+
+ protected String sendBrokerBindURI;
+ protected String receiveBrokerBindURI;
+ protected String sendBrokerConnectURI;
+ protected String receiveBrokerConnectURI;
+
+ // Sets the number of threads to use:
+ protected final int asyncThreadPoolSize = Runtime.getRuntime().availableProcessors();
+ protected boolean usePartitionedQueue = false;
+
+ protected int producerCount;
+ protected int consumerCount;
+ protected int destCount;
+
+ protected MetricAggregator totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items");
+ protected MetricAggregator totalConsumerRate = new MetricAggregator().name("Aggregate Consumer Rate").unit("items");
+
+ protected Broker sendBroker;
+ protected Broker rcvBroker;
+ protected ArrayList<Broker> brokers = new ArrayList<Broker>();
+ protected IDispatcher dispatcher;
+ protected final AtomicLong msgIdGenerator = new AtomicLong();
+ protected final AtomicBoolean stopping = new AtomicBoolean();
+
+ final ArrayList<RemoteProducer> producers = new ArrayList<RemoteProducer>();
+ final ArrayList<RemoteConsumer> consumers = new ArrayList<RemoteConsumer>();
+
+ // Maps a delivery to its message id.
+ static public final Mapper<AsciiBuffer, MessageDelivery> KEY_MAPPER = new Mapper<AsciiBuffer, MessageDelivery>() {
+ public AsciiBuffer map(MessageDelivery element) {
+ return element.getMsgId();
+ }
+ };
+ static public final Mapper<Integer, MessageDelivery> PARTITION_MAPPER = new Mapper<Integer, MessageDelivery>() {
+ public Integer map(MessageDelivery element) {
+ // we modulo 10 to have at most 10 partitions which the producers
+ // gets split across.
+ return (int) (element.getProducerId().hashCode() % 10);
+ }
+ };
+
+ /**
+ * Starts the dispatcher and chooses the bind/connect URIs: TCP on ports
+ * 10000 (send) and 20000 (receive) with the "multi" wire format when
+ * <code>tcp</code> is set, in-VM pipes otherwise.
+ */
+ @Override
+ protected void setUp() throws Exception {
+ dispatcher = createDispatcher();
+ dispatcher.start();
+ if (tcp) {
+ sendBrokerBindURI = "tcp://localhost:10000?wireFormat=multi";
+ receiveBrokerBindURI = "tcp://localhost:20000?wireFormat=multi";
+ sendBrokerConnectURI = "tcp://localhost:10000";
+ receiveBrokerConnectURI = "tcp://localhost:20000";
+ } else {
+ // NOTE(review): both branches assign the same URIs, so forceMarshalling
+ // currently has no effect here -- confirm whether a marshalling option
+ // is missing from the first branch.
+ if (forceMarshalling) {
+ sendBrokerBindURI = "pipe://SendBroker";
+ receiveBrokerBindURI = "pipe://ReceiveBroker";
+ } else {
+ sendBrokerBindURI = "pipe://SendBroker";
+ receiveBrokerBindURI = "pipe://ReceiveBroker";
+ }
+ sendBrokerConnectURI = sendBrokerBindURI;
+ receiveBrokerConnectURI = receiveBrokerBindURI;
+ }
+ }
+
+ /**
+ * Creates the priority dispatch pool named "BrokerDispatcher", passing
+ * Broker.MAX_PRIORITY and asyncThreadPoolSize. Subclasses may override.
+ */
+ protected IDispatcher createDispatcher() {
+ return PriorityDispatcher.createPriorityDispatchPool("BrokerDispatcher", Broker.MAX_PRIORITY, asyncThreadPoolSize);
+ }
+
+ /** Reports rates for 1 producer and 1 destination with no consumers. */
+ public void test_1_1_0() throws Exception {
+ producerCount = 1;
+ destCount = 1;
+
+ createConnections();
+
+ // Start 'em up.
+ startServices();
+ try {
+ reportRates();
+ } finally {
+ stopServices();
+ }
+ }
+
+ /** Reports rates for 1 producer, 1 destination and 1 consumer. */
+ public void test_1_1_1() throws Exception {
+ producerCount = 1;
+ destCount = 1;
+ consumerCount = 1;
+
+ createConnections();
+
+ // Start 'em up.
+ startServices();
+ try {
+ reportRates();
+ } finally {
+ stopServices();
+ }
+ }
+
+ /** Reports rates for FANIN_COUNT producers, 1 destination and FANOUT_COUNT consumers. */
+ public void test_10_1_10() throws Exception {
+ producerCount = FANIN_COUNT;
+ consumerCount = FANOUT_COUNT;
+ destCount = 1;
+
+ createConnections();
+
+ // Start 'em up.
+ startServices();
+ try {
+ reportRates();
+ } finally {
+ stopServices();
+ }
+ }
+
+ /** Reports rates for FANIN_COUNT producers, 1 destination and 1 consumer. */
+ public void test_10_1_1() throws Exception {
+ producerCount = FANIN_COUNT;
+ destCount = 1;
+ consumerCount = 1;
+
+ createConnections();
+
+ // Start 'em up.
+ startServices();
+ try {
+ reportRates();
+ } finally {
+ stopServices();
+ }
+ }
+
+ /** Reports rates for 1 producer, 1 destination and FANOUT_COUNT consumers. */
+ public void test_1_1_10() throws Exception {
+ producerCount = 1;
+ destCount = 1;
+ consumerCount = FANOUT_COUNT;
+
+ createConnections();
+
+ // Start 'em up.
+ startServices();
+ try {
+ reportRates();
+ } finally {
+ stopServices();
+ }
+ }
+
+ /** Reports rates for 2 producers, 2 destinations and 2 consumers. */
+ public void test_2_2_2() throws Exception {
+ producerCount = 2;
+ destCount = 2;
+ consumerCount = 2;
+
+ createConnections();
+
+ // Start 'em up.
+ startServices();
+ try {
+ reportRates();
+ } finally {
+ stopServices();
+ }
+ }
+
+ /** Reports rates for 10 producers, 10 destinations and 10 consumers. */
+ public void test_10_10_10() throws Exception {
+ producerCount = 10;
+ destCount = 10;
+ consumerCount = 10;
+
+ createConnections();
+
+ // Start 'em up.
+ startServices();
+ try {
+ reportRates();
+ } finally {
+ stopServices();
+ }
+ }
+
+
+ /**
+ * Reports rates for 2 producers, 2 destinations and 2 consumers where the
+ * first consumer is slowed down with a think time of 50. The intent is
+ * that the producer not feeding the slow consumer can still send quickly;
+ * the test only prints the rates and asserts nothing.
+ *
+ * @throws Exception
+ */
+ public void test_2_2_2_SlowConsumer() throws Exception {
+ producerCount = 2;
+ destCount = 2;
+ consumerCount = 2;
+
+ createConnections();
+ consumers.get(0).setThinkTime(50);
+
+ // Start 'em up.
+ startServices();
+ try {
+ reportRates();
+ } finally {
+ stopServices();
+ }
+ }
+
+ /**
+ * Reports rates for 2 producers, 2 destinations and 2 consumers where
+ * consumer i selects on the property "match" + i and producer i sets it.
+ */
+ public void test_2_2_2_Selector() throws Exception {
+ producerCount = 2;
+ destCount = 2;
+ consumerCount = 2;
+
+ createConnections();
+
+ // Add properties to match producers to their consumers
+ for (int i = 0; i < consumerCount; i++) {
+ String property = "match" + i;
+ consumers.get(i).setSelector(property);
+ producers.get(i).setProperty(property);
+ }
+
+ // Start 'em up.
+ startServices();
+ try {
+ reportRates();
+ } finally {
+ stopServices();
+ }
+ }
+
+ /**
+ * Test sending with 1 high priority sender (producer 0, priority 1) out of
+ * 2 producers on 1 destination with 1 consumer. The high priority sender
+ * should have higher throughput than the other low priority sender; the
+ * test only prints the rates for inspection and asserts nothing.
+ *
+ * @throws Exception
+ */
+ public void test_2_1_1_HighPriorityProducer() throws Exception {
+
+ producerCount = 2;
+ destCount = 1;
+ consumerCount = 1;
+
+ createConnections();
+ RemoteProducer producer = producers.get(0);
+ producer.setPriority(1);
+ producer.getRate().setName("High Priority Producer Rate");
+
+ consumers.get(0).setThinkTime(1);
+
+ // Start 'em up.
+ startServices();
+ try {
+
+ System.out.println("Checking rates for test: " + getName());
+ // Sample every 5 seconds, printing the high priority producer's rate
+ // next to the aggregate rates, then reset the aggregates.
+ for (int i = 0; i < PERFORMANCE_SAMPLES; i++) {
+ Period p = new Period();
+ Thread.sleep(1000 * 5);
+ System.out.println(producer.getRate().getRateSummary(p));
+ System.out.println(totalProducerRate.getRateSummary(p));
+ System.out.println(totalConsumerRate.getRateSummary(p));
+ totalProducerRate.reset();
+ totalConsumerRate.reset();
+ }
+
+ } finally {
+ stopServices();
+ }
+ }
+
+ /**
+ * Same setup as the high priority producer test, except that producer 0
+ * also gets a priority mod of 3 (presumably so that only some of its
+ * messages are sent at high priority -- confirm against RemoteProducer).
+ * The test only prints the rates for inspection and asserts nothing.
+ *
+ * @throws Exception
+ */
+ public void test_2_1_1_MixedHighPriorityProducer() throws Exception {
+ producerCount = 2;
+ destCount = 1;
+ consumerCount = 1;
+
+ createConnections();
+ RemoteProducer producer = producers.get(0);
+ producer.setPriority(1);
+ producer.setPriorityMod(3);
+ producer.getRate().setName("High Priority Producer Rate");
+
+ consumers.get(0).setThinkTime(1);
+
+ // Start 'em up.
+ startServices();
+ try {
+
+ System.out.println("Checking rates for test: " + getName());
+ // Sample every 5 seconds, printing the producer's rate next to the
+ // aggregate rates, then reset the aggregates.
+ for (int i = 0; i < PERFORMANCE_SAMPLES; i++) {
+ Period p = new Period();
+ Thread.sleep(1000 * 5);
+ System.out.println(producer.getRate().getRateSummary(p));
+ System.out.println(totalProducerRate.getRateSummary(p));
+ System.out.println(totalConsumerRate.getRateSummary(p));
+ totalProducerRate.reset();
+ totalConsumerRate.reset();
+ }
+
+ } finally {
+ stopServices();
+ }
+ }
+
+ /**
+  * Prints the aggregate producer and consumer rate summaries every five
+  * seconds, PERFORMANCE_SAMPLES times, and resets both aggregates after each
+  * sample.
+  *
+  * @throws InterruptedException if the sleep between samples is interrupted
+  */
+ private void reportRates() throws InterruptedException {
+     String mode = ptp ? "ptp" : "topic";
+     System.out.println("Checking rates for test: " + getName() + ", " + mode);
+
+     int sample = 0;
+     while (sample < PERFORMANCE_SAMPLES) {
+         // The period must be opened before the sleep so that it spans the sample.
+         Period window = new Period();
+         Thread.sleep(5 * 1000);
+         System.out.println(totalProducerRate.getRateSummary(window));
+         System.out.println(totalConsumerRate.getRateSummary(window));
+         totalProducerRate.reset();
+         totalConsumerRate.reset();
+         sample++;
+     }
+ }
+
+ /**
+  * Builds the test topology from the current field settings: the broker(s),
+  * destCount destinations (plus a queue per destination on each broker when
+  * ptp is set), producerCount producers and consumerCount consumers.
+  * Producers and consumers are assigned to destinations round-robin.
+  *
+  * @throws IOException declared by the original signature
+  * @throws URISyntaxException if a broker connect URI cannot be parsed
+  */
+ private void createConnections() throws IOException, URISyntaxException {
+
+     // Either one shared broker, or a dedicated sending and receiving broker.
+     if (!multibroker) {
+         sendBroker = rcvBroker = createBroker("Broker", sendBrokerBindURI, sendBrokerConnectURI);
+         brokers.add(sendBroker);
+     } else {
+         sendBroker = createBroker("SendBroker", sendBrokerBindURI, sendBrokerConnectURI);
+         rcvBroker = createBroker("RcvBroker", receiveBrokerBindURI, receiveBrokerConnectURI);
+         brokers.add(sendBroker);
+         brokers.add(rcvBroker);
+     }
+
+     Destination[] dests = new Destination[destCount];
+
+     for (int d = 0; d < destCount; d++) {
+         Destination.SingleDestination single = new Destination.SingleDestination();
+         single.setName(new AsciiBuffer("dest" + (d + 1)));
+         single.setDomain(ptp ? Router.QUEUE_DOMAIN : Router.TOPIC_DOMAIN);
+         dests[d] = single;
+
+         if (!ptp) {
+             continue;
+         }
+         // Point-to-point destinations need a queue on every broker involved.
+         sendBroker.addQueue(createQueue(sendBroker, dests[d]));
+         if (multibroker) {
+             rcvBroker.addQueue(createQueue(rcvBroker, dests[d]));
+         }
+     }
+
+     for (int p = 0; p < producerCount; p++) {
+         producers.add(createProducer(p, dests[p % destCount]));
+     }
+
+     for (int c = 0; c < consumerCount; c++) {
+         consumers.add(createConsumer(c, dests[c % destCount]));
+     }
+
+     // Create MultiBroker connections:
+     // if (multibroker) {
+     // Pipe<Message> pipe = new Pipe<Message>();
+     // sendBroker.createBrokerConnection(rcvBroker, pipe);
+     // rcvBroker.createBrokerConnection(sendBroker, pipe.connect());
+     // }
+ }
+
+ /**
+  * Creates a consumer through the abstract factory method and configures it
+  * for this test: error reporting, the receiving broker's connect URI, the
+  * destination, a 1-based name, the shared consumer rate aggregate and the
+  * dispatcher.
+  *
+  * @param i zero-based consumer index; the consumer is named "consumer" + (i + 1)
+  * @param destination destination the consumer is assigned to
+  * @return the configured consumer; it is not started here
+  * @throws URISyntaxException if the receiving broker's connect URI cannot be parsed
+  */
+ private RemoteConsumer createConsumer(int i, Destination destination) throws URISyntaxException {
+     RemoteConsumer result = createConsumer();
+
+     // Asynchronous errors are only reported while the test is not shutting down.
+     ExceptionListener asyncErrorReporter = new ExceptionListener() {
+         public void exceptionThrown(Exception error) {
+             if (stopping.get()) {
+                 return;
+             }
+             System.err.println("Consumer Async Error:");
+             error.printStackTrace();
+         }
+     };
+     result.setExceptionListener(asyncErrorReporter);
+
+     URI connectUri = new URI(rcvBroker.getConnectUri());
+     result.setUri(connectUri);
+     result.setDestination(destination);
+     result.setName("consumer" + (i + 1));
+     result.setTotalConsumerRate(totalConsumerRate);
+     result.setDispatcher(dispatcher);
+     return result;
+ }
+
+ /**
+  * Factory method implemented by subclasses to supply a new consumer
+  * instance; the caller configures it before use.
+  *
+  * @return a new consumer
+  */
+ protected abstract RemoteConsumer createConsumer();
+
+ /**
+  * Creates a producer through the abstract factory method and configures it
+  * for this test: error reporting, the sending broker's connect URI, a
+  * 1-based id and name, the destination, the shared message id generator,
+  * the shared producer rate aggregate and the dispatcher.
+  *
+  * @param id zero-based producer index; id and name are derived from id + 1
+  * @param destination destination the producer is assigned to
+  * @return the configured producer; it is not started here
+  * @throws URISyntaxException if the sending broker's connect URI cannot be parsed
+  */
+ private RemoteProducer createProducer(int id, Destination destination) throws URISyntaxException {
+     RemoteProducer result = cerateProducer();
+
+     // Asynchronous errors are only reported while the test is not shutting down.
+     ExceptionListener asyncErrorReporter = new ExceptionListener() {
+         public void exceptionThrown(Exception error) {
+             if (stopping.get()) {
+                 return;
+             }
+             System.err.println("Producer Async Error:");
+             error.printStackTrace();
+         }
+     };
+     result.setExceptionListener(asyncErrorReporter);
+
+     URI connectUri = new URI(sendBroker.getConnectUri());
+     result.setUri(connectUri);
+     int oneBasedId = id + 1;
+     result.setProducerId(oneBasedId);
+     result.setName("producer" + oneBasedId);
+     result.setDestination(destination);
+     result.setMessageIdGenerator(msgIdGenerator);
+     result.setTotalProducerRate(totalProducerRate);
+     result.setDispatcher(dispatcher);
+     return result;
+ }
+
+ /**
+  * Factory method implemented by subclasses to supply a new producer
+  * instance; the caller configures it before use.
+  *
+  * NOTE(review): the name is a misspelling of "createProducer". It is kept
+  * as is because subclasses implement the method under this name.
+  *
+  * @return a new producer
+  */
+ protected abstract RemoteProducer cerateProducer();
+
+ /**
+  * Creates a queue bound to the given broker and destination, using
+  * KEY_MAPPER as key extractor and, when usePartitionedQueue is set,
+  * PARTITION_MAPPER as partition mapper.
+  *
+  * @param broker broker that owns the queue
+  * @param destination destination served by the queue
+  * @return the configured queue; it is not added to the broker here
+  */
+ private Queue createQueue(Broker broker, Destination destination) {
+     Queue created = new Queue();
+     created.setBroker(broker);
+     created.setDestination(destination);
+     created.setKeyExtractor(KEY_MAPPER);
+     if (!usePartitionedQueue) {
+         return created;
+     }
+     created.setPartitionMapper(PARTITION_MAPPER);
+     return created;
+ }
+
+ private Broker createBroker(String name, String bindURI, String connectUri) {
+ Broker broker = new Broker();
+ broker.setName(name);
+ broker.setBindUri(bindURI);
+ broker.setConnectUri(connectUri);
+ broker.setDispatcher(dispatcher);
+ return broker;
+ }
+
+ /**
+  * Stops all brokers, producers and consumers and shuts the dispatcher down.
+  *
+  * Every service is given the chance to stop even if an earlier one fails,
+  * so a single failing stop() no longer leaves the remaining connections and
+  * the dispatcher running. The first failure is rethrown once everything
+  * has been attempted; later failures are printed to System.err.
+  *
+  * @throws Exception the first exception raised while stopping, if any
+  */
+ private void stopServices() throws Exception {
+     // Set first so the async exception listeners stay quiet during shutdown.
+     stopping.set(true);
+
+     Exception firstFailure = null;
+     for (Broker broker : brokers) {
+         try {
+             broker.stop();
+         } catch (Exception e) {
+             firstFailure = keepFirstFailure(firstFailure, e);
+         }
+     }
+     for (RemoteProducer connection : producers) {
+         try {
+             connection.stop();
+         } catch (Exception e) {
+             firstFailure = keepFirstFailure(firstFailure, e);
+         }
+     }
+     for (RemoteConsumer connection : consumers) {
+         try {
+             connection.stop();
+         } catch (Exception e) {
+             firstFailure = keepFirstFailure(firstFailure, e);
+         }
+     }
+     if (dispatcher != null) {
+         try {
+             dispatcher.shutdown();
+         } catch (Exception e) {
+             firstFailure = keepFirstFailure(firstFailure, e);
+         }
+     }
+
+     if (firstFailure != null) {
+         throw firstFailure;
+     }
+ }
+
+ /**
+  * Returns the failure to rethrow at the end of shutdown: the first one
+  * seen. Any later failure is printed so that it is not lost.
+  */
+ private static Exception keepFirstFailure(Exception first, Exception current) {
+     if (first == null) {
+         return current;
+     }
+     System.err.println("Additional error while stopping services:");
+     current.printStackTrace();
+     return first;
+ }
+
+ /**
+  * Starts the brokers first, then the consumers, then the producers.
+  *
+  * @throws Exception if any broker or connection fails to start
+  */
+ private void startServices() throws Exception {
+     for (Broker each : brokers) {
+         each.start();
+     }
+
+     // Consumers are started before the producers.
+     for (RemoteConsumer receiver : consumers) {
+         receiver.start();
+     }
+     for (RemoteProducer sender : producers) {
+         sender.start();
+     }
+ }
+
+}
Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteConsumer.java?rev=754964&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteConsumer.java (added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteConsumer.java Mon Mar 16 17:25:23 2009
@@ -0,0 +1,118 @@
+package org.apache.activemq.broker;
+
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.Connection;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.metric.MetricAggregator;
+import org.apache.activemq.metric.MetricCounter;
+import org.apache.activemq.transport.DispatchableTransport;
+import org.apache.activemq.transport.TransportFactory;
+
+/**
+ * Base class for test consumers. It connects to the configured URI, lets the
+ * subclass set up its subscription, and counts every received message in
+ * {@link #getConsumerRate()}, which is also added to the shared total rate.
+ * An optional think time (milliseconds) delays the handling of each message.
+ */
+public abstract class RemoteConsumer extends Connection {
+
+    protected final MetricCounter consumerRate = new MetricCounter();
+
+    protected MetricAggregator totalConsumerRate;
+    protected long thinkTime;
+    protected Destination destination;
+    protected String selector;
+    protected URI uri;
+
+    // True when the transport is a DispatchableTransport; the think time is
+    // then applied through the dispatcher's scheduler instead of sleeping.
+    private boolean scheduleWait;
+
+    /**
+     * Names the rate counter, registers it with the total, connects the
+     * transport, starts the connection and sets up the subscription.
+     *
+     * @throws Exception if connecting, starting or subscribing fails
+     */
+    public void start() throws Exception {
+        consumerRate.name("Consumer " + name + " Rate");
+        totalConsumerRate.add(consumerRate);
+
+        transport = TransportFactory.compositeConnect(uri);
+        scheduleWait = transport instanceof DispatchableTransport;
+        initialize();
+        super.start();
+        setupSubscription();
+    }
+
+    /**
+     * Sets up the subscription for {@link #getDestination()}; called at the
+     * end of {@link #start()}.
+     *
+     * @throws Exception if the subscription cannot be set up
+     */
+    abstract protected void setupSubscription() throws Exception;
+
+    /**
+     * Handles one received message: waits for the think time, if any, then
+     * counts the message and reports it as dispatched to the controller.
+     *
+     * @param controller source controller to notify once the message is handled
+     * @param elem the received message
+     */
+    protected void messageReceived(final ISourceController<MessageDelivery> controller, final MessageDelivery elem) {
+        final long delay = thinkTime;
+        if (delay <= 0) {
+            countAndRelease(controller, elem);
+            return;
+        }
+
+        if (scheduleWait) {
+            // Let the dispatcher run the completion later instead of sleeping.
+            getDispatcher().schedule(new Runnable() {
+                public void run() {
+                    countAndRelease(controller, elem);
+                }
+            }, delay, TimeUnit.MILLISECONDS);
+            return;
+        }
+
+        try {
+            Thread.sleep(delay);
+        } catch (InterruptedException e) {
+            // Keep the interrupt visible to the caller; the message is still
+            // completed below, as before.
+            Thread.currentThread().interrupt();
+        }
+        countAndRelease(controller, elem);
+    }
+
+    /** Counts the message and tells the controller it has been dispatched. */
+    private void countAndRelease(ISourceController<MessageDelivery> controller, MessageDelivery elem) {
+        consumerRate.increment();
+        controller.elementDispatched(elem);
+    }
+
+    /** Sets the name used in the rate counter's label. */
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public MetricAggregator getTotalConsumerRate() {
+        return totalConsumerRate;
+    }
+
+    /** Sets the aggregate this consumer's rate is added to in {@link #start()}. */
+    public void setTotalConsumerRate(MetricAggregator totalConsumerRate) {
+        this.totalConsumerRate = totalConsumerRate;
+    }
+
+    public Destination getDestination() {
+        return destination;
+    }
+
+    public void setDestination(Destination destination) {
+        this.destination = destination;
+    }
+
+    public long getThinkTime() {
+        return thinkTime;
+    }
+
+    /** Sets the per-message delay in milliseconds; values of 0 or less disable it. */
+    public void setThinkTime(long thinkTime) {
+        this.thinkTime = thinkTime;
+    }
+
+    public MetricCounter getConsumerRate() {
+        return consumerRate;
+    }
+
+    public String getSelector() {
+        return selector;
+    }
+
+    public void setSelector(String selector) {
+        this.selector = selector;
+    }
+
+    public URI getUri() {
+        return uri;
+    }
+
+    /** Sets the URI the transport connects to in {@link #start()}. */
+    public void setUri(URI uri) {
+        this.uri = uri;
+    }
+}
Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteProducer.java?rev=754964&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteProducer.java (added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteProducer.java Mon Mar 16 17:25:23 2009
@@ -0,0 +1,200 @@
+package org.apache.activemq.broker;
+
+import java.net.URI;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.Connection;
+import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
+import org.apache.activemq.dispatch.IDispatcher.Dispatchable;
+import org.apache.activemq.flow.IFlowController;
+import org.apache.activemq.flow.IFlowSink;
+import org.apache.activemq.flow.ISinkController;
+import org.apache.activemq.flow.ISinkController.FlowUnblockListener;
+import org.apache.activemq.metric.MetricAggregator;
+import org.apache.activemq.metric.MetricCounter;
+import org.apache.activemq.transport.TransportFactory;
+
+/**
+ * Base class for test producers. It connects to the configured URI, lets the
+ * subclass set up the producer, registers itself with the dispatcher and then
+ * sends messages created by the subclass for as long as the outbound sink
+ * accepts them. Every sent message is counted in {@link #getRate()}, which is
+ * also added to the shared total rate.
+ */
+public abstract class RemoteProducer extends Connection implements Dispatchable, FlowUnblockListener<MessageDelivery> {
+
+    protected final MetricCounter rate = new MetricCounter();
+
+    protected AtomicLong messageIdGenerator;
+    protected int priority;
+    protected int priorityMod;
+    protected int counter;
+    protected int producerId;
+    protected Destination destination;
+    protected String property;
+    protected MetricAggregator totalProducerRate;
+    protected MessageDelivery next;
+    protected DispatchContext dispatchContext;
+    protected String filler;
+    protected int payloadSize = 20;
+    protected URI uri;
+
+    protected IFlowController<MessageDelivery> outboundController;
+    protected IFlowSink<MessageDelivery> outboundQueue;
+
+    /**
+     * Prepares the payload filler, registers the rate counter with the total,
+     * connects the transport, starts the connection, sets up the producer and
+     * requests the first dispatch.
+     *
+     * @throws Exception if connecting, starting or the producer setup fails
+     */
+    public void start() throws Exception {
+
+        if (payloadSize > 0) {
+            filler = buildFiller(payloadSize);
+        }
+
+        rate.name("Producer " + name + " Rate");
+        totalProducerRate.add(rate);
+
+        transport = TransportFactory.compositeConnect(uri);
+        initialize();
+        super.start();
+
+        setupProducer();
+
+        dispatchContext = getDispatcher().register(this, name + "-client");
+        dispatchContext.requestDispatch();
+    }
+
+    /**
+     * Protocol-specific producer setup; called from {@link #start()} after the
+     * connection has been started and before dispatching begins.
+     *
+     * @throws Exception if the setup fails
+     */
+    abstract protected void setupProducer() throws Exception;
+
+    /**
+     * Called from {@link #dispatch()} when {@code next} is null.
+     * NOTE(review): implementations are expected to assign {@code next};
+     * dispatch() passes it to the outbound queue without a null check.
+     */
+    abstract protected void createNextMessage();
+
+    /**
+     * Closes the dispatch context, if one was registered, and stops the
+     * connection.
+     *
+     * The dispatch context only exists once {@link #start()} has completed, so
+     * it is checked for null to allow stopping a producer whose start failed
+     * or never happened. The connection is stopped even if closing the
+     * context throws.
+     *
+     * @throws Exception if closing the context or stopping the connection fails
+     */
+    public void stop() throws Exception {
+        try {
+            if (dispatchContext != null) {
+                dispatchContext.close(false);
+            }
+        } finally {
+            super.stop();
+        }
+    }
+
+    /** Requests another dispatch once the outbound sink is unblocked. */
+    public void onFlowUnblocked(ISinkController<MessageDelivery> controller) {
+        dispatchContext.requestDispatch();
+    }
+
+    /**
+     * Sends messages in a loop until the outbound sink reports that it is
+     * blocked and this producer has been registered as unblock listener;
+     * dispatching resumes from {@link #onFlowUnblocked}.
+     *
+     * @return true once the producer is waiting for the sink to unblock; the
+     *         loop has no other exit
+     */
+    public boolean dispatch() {
+        while (true) {
+
+            if (next == null) {
+                createNextMessage();
+            }
+
+            // If flow controlled stop until flow control is lifted.
+            if (outboundController.isSinkBlocked()) {
+                // When registration is refused the message is sent anyway;
+                // presumably the sink unblocked in the meantime (to confirm).
+                if (outboundController.addUnblockListener(this)) {
+                    return true;
+                }
+            }
+
+            outboundQueue.add(next, null);
+            rate.increment();
+            next = null;
+        }
+    }
+
+    /**
+     * Builds the next payload string and advances the message counter.
+     *
+     * With a non-negative payload size the result is exactly that many
+     * characters long: "name:counter:" padded with the repeating a-z filler,
+     * or cut off when the prefix alone is too long. With a negative payload
+     * size the result is "name:counter" at its natural length.
+     *
+     * The padding no longer depends on {@link #start()} having built a filler
+     * for the current payload size, so this also works before start() and
+     * after the payload size has been increased.
+     *
+     * @return the payload for the next message
+     */
+    protected String createPayload() {
+        if (payloadSize < 0) {
+            return name + ":" + (++counter);
+        }
+
+        StringBuilder sb = new StringBuilder(payloadSize);
+        sb.append(name);
+        sb.append(':');
+        sb.append(++counter);
+        sb.append(':');
+
+        int length = sb.length();
+        if (length > payloadSize) {
+            return sb.substring(0, payloadSize);
+        }
+
+        int padding = payloadSize - length;
+        if (filler != null && filler.length() >= padding) {
+            sb.append(filler, 0, padding);
+        } else {
+            // Same character sequence as the cached filler, generated on demand.
+            sb.append(buildFiller(padding));
+        }
+        return sb.toString();
+    }
+
+    /** Returns {@code length} characters cycling through 'a'..'z'. */
+    private static String buildFiller(int length) {
+        StringBuilder sb = new StringBuilder(length);
+        for (int i = 0; i < length; ++i) {
+            sb.append((char) ('a' + (i % 26)));
+        }
+        return sb.toString();
+    }
+
+    /** Sets the name used in the rate label, the dispatch context name and the payload. */
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public AtomicLong getMessageIdGenerator() {
+        return messageIdGenerator;
+    }
+
+    public void setMessageIdGenerator(AtomicLong msgIdGenerator) {
+        this.messageIdGenerator = msgIdGenerator;
+    }
+
+    public int getPriority() {
+        return priority;
+    }
+
+    public void setPriority(int msgPriority) {
+        this.priority = msgPriority;
+    }
+
+    public int getPriorityMod() {
+        return priorityMod;
+    }
+
+    public void setPriorityMod(int priorityMod) {
+        this.priorityMod = priorityMod;
+    }
+
+    public int getProducerId() {
+        return producerId;
+    }
+
+    public void setProducerId(int producerId) {
+        this.producerId = producerId;
+    }
+
+    public Destination getDestination() {
+        return destination;
+    }
+
+    public void setDestination(Destination destination) {
+        this.destination = destination;
+    }
+
+    public String getProperty() {
+        return property;
+    }
+
+    public void setProperty(String property) {
+        this.property = property;
+    }
+
+    public MetricAggregator getTotalProducerRate() {
+        return totalProducerRate;
+    }
+
+    /** Sets the aggregate this producer's rate is added to in {@link #start()}. */
+    public void setTotalProducerRate(MetricAggregator totalProducerRate) {
+        this.totalProducerRate = totalProducerRate;
+    }
+
+    public MetricCounter getRate() {
+        return rate;
+    }
+
+    public int getPayloadSize() {
+        return payloadSize;
+    }
+
+    /** Sets the payload length in characters; a negative value disables padding and truncation. */
+    public void setPayloadSize(int messageSize) {
+        this.payloadSize = messageSize;
+    }
+
+    public URI getUri() {
+        return uri;
+    }
+
+    /** Sets the URI the transport connects to in {@link #start()}. */
+    public void setUri(URI uri) {
+        this.uri = uri;
+    }
+}
+