You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/08/12 07:27:34 UTC
svn commit: r565003 [16/17] - in /activemq/trunk:
activemq-fileserver/src/main/java/org/apache/activemq/util/
activemq-fileserver/src/test/java/org/apache/activemq/util/
activemq-jaas/src/main/java/org/apache/activemq/jaas/
activemq-jaas/src/test/java/...
Modified: activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/XmppTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/XmppTransport.java?view=diff&rev=565003&r1=565002&r2=565003
==============================================================================
--- activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/XmppTransport.java (original)
+++ activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/XmppTransport.java Sat Aug 11 22:27:21 2007
@@ -16,18 +16,11 @@
*/
package org.apache.activemq.transport.xmpp;
-import ietf.params.xml.ns.xmpp_sasl.Mechanisms;
-import org.apache.activemq.command.BrokerInfo;
-import org.apache.activemq.command.Command;
-import org.apache.activemq.transport.tcp.TcpBufferedInputStream;
-import org.apache.activemq.transport.tcp.TcpBufferedOutputStream;
-import org.apache.activemq.transport.tcp.TcpTransport;
-import org.apache.activemq.util.IOExceptionSupport;
-import org.apache.activemq.util.ServiceStopper;
-import org.apache.activemq.wireformat.WireFormat;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.jabber.etherx.streams.Features;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.net.URI;
import javax.net.SocketFactory;
import javax.xml.bind.JAXBContext;
@@ -45,11 +38,20 @@
import javax.xml.stream.events.Attribute;
import javax.xml.stream.events.StartElement;
import javax.xml.stream.events.XMLEvent;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.net.URI;
+
+import ietf.params.xml.ns.xmpp_sasl.Mechanisms;
+
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.transport.tcp.TcpBufferedInputStream;
+import org.apache.activemq.transport.tcp.TcpBufferedOutputStream;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jabber.etherx.streams.Features;
/**
* @version $Revision$
@@ -57,7 +59,10 @@
public class XmppTransport extends TcpTransport {
protected static final QName ATTRIBUTE_TO = new QName("to");
- private static final transient Log log = LogFactory.getLog(XmppTransport.class);
+ private static final transient Log LOG = LogFactory.getLog(XmppTransport.class);
+
+ protected OutputStream outputStream;
+ protected InputStream inputStream;
private JAXBContext context;
private XMLEventReader xmlReader;
@@ -65,8 +70,6 @@
private Marshaller marshaller;
private XMLStreamWriter xmlWriter;
private String to = "client";
- protected OutputStream outputStream;
- protected InputStream inputStream;
private ProtocolConverter converter;
private String from = "localhost";
private String brokerId = "broker-id-1";
@@ -85,71 +88,62 @@
converter = new ProtocolConverter(this);
}
-
@Override
public void oneway(Object object) throws IOException {
if (object instanceof Command) {
- Command command = (Command) object;
+ Command command = (Command)object;
if (command instanceof BrokerInfo) {
- BrokerInfo brokerInfo = (BrokerInfo) command;
+ BrokerInfo brokerInfo = (BrokerInfo)command;
brokerId = brokerInfo.getBrokerId().toString();
from = brokerInfo.getBrokerName();
try {
writeOpenStream(brokerId, from);
- }
- catch (XMLStreamException e) {
+ } catch (XMLStreamException e) {
throw IOExceptionSupport.create(e);
}
- }
- else {
+ } else {
try {
converter.onActiveMQCommad(command);
- }
- catch (IOException e) {
+ } catch (IOException e) {
throw e;
- }
- catch (Exception e) {
+ } catch (Exception e) {
throw IOExceptionSupport.create(e);
}
}
- }
- else {
- log.warn("Unkown command: " + object);
+ } else {
+ LOG.warn("Unkown command: " + object);
}
}
-
/**
* Marshalls the given POJO to the client
*/
public void marshall(Object command) throws IOException {
if (isStopped() || isStopping()) {
- log.warn("Not marshalling command as shutting down: " + command);
+ LOG.warn("Not marshalling command as shutting down: " + command);
return;
}
try {
marshaller.marshal(command, xmlWriter);
xmlWriter.flush();
outputStream.flush();
- }
- catch (JAXBException e) {
+ } catch (JAXBException e) {
throw IOExceptionSupport.create(e);
- }
- catch (XMLStreamException e) {
+ } catch (XMLStreamException e) {
throw IOExceptionSupport.create(e);
}
}
@Override
public void doRun() throws IOException {
- log.debug("XMPP consumer thread starting");
+ LOG.debug("XMPP consumer thread starting");
try {
XMLInputFactory xif = XMLInputFactory.newInstance();
xif.setXMLReporter(new XMLReporter() {
public void report(String message, String errorType, Object relatedInformation, Location location) throws XMLStreamException {
- log.warn(message + " errorType: " + errorType + " relatedInfo: " + relatedInformation);
+ LOG.warn(message + " errorType: " + errorType + " relatedInfo: " + relatedInformation);
}
});
@@ -160,7 +154,7 @@
XMLEvent rootElement = xmlReader.nextTag();
if (rootElement instanceof StartElement) {
- StartElement startElement = (StartElement) rootElement;
+ StartElement startElement = (StartElement)rootElement;
Attribute toAttribute = startElement.getAttributeByName(ATTRIBUTE_TO);
if (toAttribute != null) {
to = toAttribute.getValue();
@@ -178,28 +172,22 @@
if (object != null) {
converter.onXmppCommand(object);
}
- }
- else {
+ } else {
if (event.getEventType() == XMLEvent.END_ELEMENT) {
break;
- }
- else
- if (event.getEventType() == XMLEvent.END_ELEMENT || event.getEventType() == XMLEvent.END_DOCUMENT) {
+ } else if (event.getEventType() == XMLEvent.END_ELEMENT || event.getEventType() == XMLEvent.END_DOCUMENT) {
break;
- }
- else {
+ } else {
xmlReader.nextEvent();
}
}
}
- }
- catch (Exception e) {
- throw IOExceptionSupport.create(e);
+ } catch (Exception e) {
+ throw IOExceptionSupport.create(e);
}
}
-
public String getFrom() {
return from;
}
@@ -211,19 +199,17 @@
xmlWriter.writeEndElement();
xmlWriter.writeEndDocument();
xmlWriter.close();
- }
- catch (XMLStreamException e) {
+ } catch (XMLStreamException e) {
// the client may have closed first so ignore this
- log.info("Caught trying to close transport: " + e, e);
+ LOG.info("Caught trying to close transport: " + e, e);
}
}
if (xmlReader != null) {
try {
xmlReader.close();
- }
- catch (XMLStreamException e) {
+ } catch (XMLStreamException e) {
// the client may have closed first so ignore this
- log.info("Caught trying to close transport: " + e, e);
+ LOG.info("Caught trying to close transport: " + e, e);
}
}
super.doStop(stopper);
@@ -233,29 +219,17 @@
protected void initializeStreams() throws Exception {
// TODO it would be preferable to use class discovery here!
context = JAXBContext.newInstance("jabber.client"
- /*
- + ":jabber.server"
- + ":jabber.iq.gateway"
- + ":jabber.iq.last"
- + ":jabber.iq.oob"
- + ":jabber.iq.pass"
- + ":jabber.iq.time"
- + ":jabber.iq.version"
- + ":org.jabber.protocol.activity" + ":org.jabber.protocol.address"
- + ":org.jabber.protocol.amp" + ":org.jabber.protocol.amp_errors"
- + ":org.jabber.protocol.muc_admin"
- + ":org.jabber.protocol.muc_unique"
- */
- + ":jabber.iq._private"
- + ":jabber.iq.auth"
- + ":jabber.iq.roster"
- + ":org.jabber.etherx.streams"
- + ":org.jabber.protocol.disco_info" + ":org.jabber.protocol.disco_items"
- + ":org.jabber.protocol.muc"
- + ":org.jabber.protocol.muc_user"
- + ":ietf.params.xml.ns.xmpp_sasl" + ":ietf.params.xml.ns.xmpp_stanzas"
- + ":ietf.params.xml.ns.xmpp_streams" + ":ietf.params.xml.ns.xmpp_tls"
- );
+ /*
+ * + ":jabber.server" + ":jabber.iq.gateway" + ":jabber.iq.last" +
+ * ":jabber.iq.oob" + ":jabber.iq.pass" + ":jabber.iq.time" +
+ * ":jabber.iq.version" + ":org.jabber.protocol.activity" +
+ * ":org.jabber.protocol.address" + ":org.jabber.protocol.amp" +
+ * ":org.jabber.protocol.amp_errors" + ":org.jabber.protocol.muc_admin" +
+ * ":org.jabber.protocol.muc_unique"
+ */
+ + ":jabber.iq._private" + ":jabber.iq.auth" + ":jabber.iq.roster" + ":org.jabber.etherx.streams" + ":org.jabber.protocol.disco_info" + ":org.jabber.protocol.disco_items"
+ + ":org.jabber.protocol.muc" + ":org.jabber.protocol.muc_user" + ":ietf.params.xml.ns.xmpp_sasl" + ":ietf.params.xml.ns.xmpp_stanzas"
+ + ":ietf.params.xml.ns.xmpp_streams" + ":ietf.params.xml.ns.xmpp_tls");
inputStream = new TcpBufferedInputStream(socket.getInputStream(), 8 * 1024);
outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), 16 * 1024);
@@ -266,9 +240,9 @@
}
protected void writeOpenStream(String id, String from) throws IOException, XMLStreamException {
- log.debug("Sending initial stream element");
+ LOG.debug("Sending initial stream element");
XMLOutputFactory factory = XMLOutputFactory.newInstance();
- //factory.setProperty(XMLOutputFactory.IS_REPAIRING_NAMESPACES, true);
+ // factory.setProperty(XMLOutputFactory.IS_REPAIRING_NAMESPACES, true);
xmlWriter = factory.createXMLStreamWriter(outputStream);
// write the dummy start tag
@@ -284,22 +258,21 @@
xmlWriter.writeAttribute("to", to);
xmlWriter.writeAttribute("from", from);
-
// now lets write the features
Features features = new Features();
// TODO support TLS
- //features.getAny().add(new Starttls());
+ // features.getAny().add(new Starttls());
Mechanisms mechanisms = new Mechanisms();
// TODO support SASL
- //mechanisms.getMechanism().add("DIGEST-MD5");
- //mechanisms.getMechanism().add("PLAIN");
+ // mechanisms.getMechanism().add("DIGEST-MD5");
+ // mechanisms.getMechanism().add("PLAIN");
features.getAny().add(mechanisms);
marshall(features);
- log.debug("Initial stream element sent!");
+ LOG.debug("Initial stream element sent!");
}
}
Modified: activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/XmppTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/XmppTransportFactory.java?view=diff&rev=565003&r1=565002&r2=565003
==============================================================================
--- activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/XmppTransportFactory.java (original)
+++ activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/XmppTransportFactory.java Sat Aug 11 22:27:21 2007
@@ -16,18 +16,19 @@
*/
package org.apache.activemq.transport.xmpp;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+
+import javax.net.ServerSocketFactory;
+import javax.net.SocketFactory;
+
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.transport.tcp.TcpTransportFactory;
import org.apache.activemq.transport.tcp.TcpTransportServer;
import org.apache.activemq.wireformat.WireFormat;
-
-import javax.net.SocketFactory;
-import javax.net.ServerSocketFactory;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Map;
/**
* @version $Revision$
Modified: activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/XmppTransportServer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/XmppTransportServer.java?view=diff&rev=565003&r1=565002&r2=565003
==============================================================================
--- activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/XmppTransportServer.java (original)
+++ activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/XmppTransportServer.java Sat Aug 11 22:27:21 2007
@@ -16,16 +16,17 @@
*/
package org.apache.activemq.transport.xmpp;
-import org.apache.activemq.transport.tcp.TcpTransportServer;
-import org.apache.activemq.transport.tcp.TcpTransportFactory;
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.wireformat.WireFormat;
-
-import javax.net.ServerSocketFactory;
+import java.io.IOException;
+import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
-import java.net.Socket;
-import java.io.IOException;
+
+import javax.net.ServerSocketFactory;
+
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.tcp.TcpTransportFactory;
+import org.apache.activemq.transport.tcp.TcpTransportServer;
+import org.apache.activemq.wireformat.WireFormat;
/**
* @version $Revision$
Modified: activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/XmppWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/XmppWireFormat.java?view=diff&rev=565003&r1=565002&r2=565003
==============================================================================
--- activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/XmppWireFormat.java (original)
+++ activemq/trunk/activemq-xmpp/src/main/java/org/apache/activemq/transport/xmpp/XmppWireFormat.java Sat Aug 11 22:27:21 2007
@@ -16,6 +16,12 @@
*/
package org.apache.activemq.transport.xmpp;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
@@ -23,20 +29,13 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
/**
* A wire format which uses XMPP format of messages
*
* @version $Revision$
*/
public class XmppWireFormat implements WireFormat {
- private static final Log log = LogFactory.getLog(XmppWireFormat.class);
-
+
private int version = 1;
public WireFormat copy() {
Modified: activemq/trunk/activemq-xmpp/src/test/java/org/apache/activemq/transport/xmpp/XmppBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-xmpp/src/test/java/org/apache/activemq/transport/xmpp/XmppBroker.java?view=diff&rev=565003&r1=565002&r2=565003
==============================================================================
--- activemq/trunk/activemq-xmpp/src/test/java/org/apache/activemq/transport/xmpp/XmppBroker.java (original)
+++ activemq/trunk/activemq-xmpp/src/test/java/org/apache/activemq/transport/xmpp/XmppBroker.java Sat Aug 11 22:27:21 2007
@@ -32,8 +32,7 @@
System.out.println("Press any key to terminate");
System.in.read();
- }
- catch (Exception e) {
+ } catch (Exception e) {
System.out.println("Caught: " + e);
e.printStackTrace();
}
Modified: activemq/trunk/activemq-xmpp/src/test/java/org/apache/activemq/transport/xmpp/XmppTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-xmpp/src/test/java/org/apache/activemq/transport/xmpp/XmppTest.java?view=diff&rev=565003&r1=565002&r2=565003
==============================================================================
--- activemq/trunk/activemq-xmpp/src/test/java/org/apache/activemq/transport/xmpp/XmppTest.java (original)
+++ activemq/trunk/activemq-xmpp/src/test/java/org/apache/activemq/transport/xmpp/XmppTest.java Sat Aug 11 22:27:21 2007
@@ -27,7 +27,7 @@
*/
public class XmppTest extends TestCase {
- protected static boolean block = false;
+ protected static boolean block;
private XmppBroker broker = new XmppBroker();
@@ -37,12 +37,13 @@
}
public void testConnect() throws Exception {
- //ConnectionConfiguration config = new ConnectionConfiguration("localhost", 61222);
- //config.setDebuggerEnabled(true);
+ // ConnectionConfiguration config = new
+ // ConnectionConfiguration("localhost", 61222);
+ // config.setDebuggerEnabled(true);
try {
- //SmackConfiguration.setPacketReplyTimeout(1000);
- //XMPPConnection con = new XMPPConnection(config);
+ // SmackConfiguration.setPacketReplyTimeout(1000);
+ // XMPPConnection con = new XMPPConnection(config);
XMPPConnection con = new XMPPConnection("localhost", 61222);
con.login("amq-user", "amq-pwd");
Chat chat = con.createChat("test@localhost");
@@ -51,13 +52,11 @@
chat.sendMessage("Hello from Message: " + i);
}
System.out.println("Sent all messages!");
- }
- catch (XMPPException e) {
+ } catch (XMPPException e) {
if (block) {
System.out.println("Caught: " + e);
e.printStackTrace();
- }
- else {
+ } else {
throw e;
}
}
@@ -69,12 +68,10 @@
System.out.println("Done!");
}
-
@Override
protected void setUp() throws Exception {
broker.start();
}
-
@Override
protected void tearDown() throws Exception {
Modified: activemq/trunk/assembly/src/release/example/src/CommandLineSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/assembly/src/release/example/src/CommandLineSupport.java?view=diff&rev=565003&r1=565002&r2=565003
==============================================================================
--- activemq/trunk/assembly/src/release/example/src/CommandLineSupport.java (original)
+++ activemq/trunk/assembly/src/release/example/src/CommandLineSupport.java Sat Aug 11 22:27:21 2007
@@ -20,94 +20,101 @@
import org.apache.activemq.util.IntrospectionSupport;
/**
- * Helper utility that can be used to set the properties on any object
- * using command line arguments.
+ * Helper utility that can be used to set the properties on any object using
+ * command line arguments.
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-public class CommandLineSupport {
-
- /**
- * Sets the properties of an object given the command line args.
- *
- * if args contains: --ack-mode=AUTO --url=tcp://localhost:61616 --persistent
- *
- * then it will try to call the following setters on the target object.
- *
- * target.setAckMode("AUTO");
- * target.setURL(new URI("tcp://localhost:61616") );
- * target.setPersistent(true);
- *
- * Notice the the proper conversion for the argument is determined by examining the
- * setter arguement type.
- *
- * @param target the object that will have it's properties set
- * @param args the commline options
- * @return any arguments that are not valid options for the target
- */
- static public String[] setOptions(Object target, String []args) {
- ArrayList rc = new ArrayList();
-
- for (int i = 0; i < args.length; i++) {
- if( args[i] == null )
- continue;
-
- if( args[i].startsWith("--") ) {
-
- // --options without a specified value are considered boolean flags that are enabled.
- String value="true";
- String name = args[i].substring(2);
-
- // if --option=value case
- int p = name.indexOf("=");
- if( p > 0 ) {
- value = name.substring(p+1);
- name = name.substring(0,p);
- }
-
- // name not set, then it's an unrecognized option
- if( name.length()==0 ) {
- rc.add(args[i]);
- continue;
- }
-
- String propName = convertOptionToPropertyName(name);
- if( !IntrospectionSupport.setProperty(target, propName, value) ) {
- rc.add(args[i]);
- continue;
- }
- }
-
- }
-
- String r[] = new String[rc.size()];
- rc.toArray(r);
- return r;
- }
-
- /**
- * converts strings like: test-enabled to testEnabled
- * @param name
- * @return
- */
- private static String convertOptionToPropertyName(String name) {
- String rc="";
-
- // Look for '-' and strip and then convert the subsequent char to uppercase
- int p = name.indexOf("-");
- while( p > 0 ) {
- // strip
- rc += name.substring(0, p);
- name = name.substring(p+1);
-
- // can I convert the next char to upper?
- if( name.length() >0 ) {
- rc += name.substring(0,1).toUpperCase();
- name = name.substring(1);
- }
-
- p = name.indexOf("-");
- }
- return rc+name;
- }
+public final class CommandLineSupport {
+
+ private CommandLineSupport() {
+ }
+
+ /**
+ * Sets the properties of an object given the command line args.
+ *
+ * if args contains: --ack-mode=AUTO --url=tcp://localhost:61616 --persistent
+ *
+ * then it will try to call the following setters on the target object.
+ *
+ * target.setAckMode("AUTO");
+ * target.setURL(new URI("tcp://localhost:61616") );
+ * target.setPersistent(true);
+ *
+ * Notice that the proper conversion for the argument is determined by examining the
+ * setter argument type.
+ *
+ * @param target the object that will have its properties set
+ * @param args the command line options
+ * @return any arguments that are not valid options for the target
+ */
+ public static String[] setOptions(Object target, String[] args) {
+ ArrayList<String> rc = new ArrayList<String>();
+
+ for (int i = 0; i < args.length; i++) {
+ if (args[i] == null) {
+ continue;
+ }
+
+ if (args[i].startsWith("--")) {
+
+ // --options without a specified value are considered boolean
+ // flags that are enabled.
+ String value = "true";
+ String name = args[i].substring(2);
+
+ // if --option=value case
+ int p = name.indexOf("=");
+ if (p > 0) {
+ value = name.substring(p + 1);
+ name = name.substring(0, p);
+ }
+
+ // name not set, then it's an unrecognized option
+ if (name.length() == 0) {
+ rc.add(args[i]);
+ continue;
+ }
+
+ String propName = convertOptionToPropertyName(name);
+ if (!IntrospectionSupport.setProperty(target, propName, value)) {
+ rc.add(args[i]);
+ continue;
+ }
+ }
+
+ }
+
+ String r[] = new String[rc.size()];
+ rc.toArray(r);
+ return r;
+ }
+
+ /**
+ * converts strings like: test-enabled to testEnabled
+ *
+ * @param name
+ * @return
+ */
+ private static String convertOptionToPropertyName(String name) {
+ String rc = "";
+
+ // Look for '-' and strip and then convert the subsequent char to
+ // uppercase
+ int p = name.indexOf("-");
+ while (p > 0) {
+ // strip
+ rc += name.substring(0, p);
+ name = name.substring(p + 1);
+
+ // can I convert the next char to upper?
+ if (name.length() > 0) {
+ rc += name.substring(0, 1).toUpperCase();
+ name = name.substring(1);
+ }
+
+ p = name.indexOf("-");
+ }
+ return rc + name;
+ }
}
Modified: activemq/trunk/assembly/src/release/example/src/ConsumerTool.java
URL: http://svn.apache.org/viewvc/activemq/trunk/assembly/src/release/example/src/ConsumerTool.java?view=diff&rev=565003&r1=565002&r2=565003
==============================================================================
--- activemq/trunk/assembly/src/release/example/src/ConsumerTool.java (original)
+++ activemq/trunk/assembly/src/release/example/src/ConsumerTool.java Sat Aug 11 22:27:21 2007
@@ -15,6 +15,9 @@
* limitations under the License.
*/
+import java.io.IOException;
+import java.util.Arrays;
+
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
@@ -31,9 +34,6 @@
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
-import java.io.IOException;
-import java.util.Arrays;
-
/**
* A simple tool for consuming messages
*
@@ -41,239 +41,250 @@
*/
public class ConsumerTool implements MessageListener, ExceptionListener {
- private boolean running;
-
- private Session session;
- private Destination destination;
- private MessageProducer replyProducer;
-
- private boolean pauseBeforeShutdown;
- private boolean verbose = true;
- private int maxiumMessages = 0;
- private String subject = "TOOL.DEFAULT";
- private boolean topic = false;
- private String user = ActiveMQConnection.DEFAULT_USER;
- private String password = ActiveMQConnection.DEFAULT_PASSWORD;
- private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
- private boolean transacted = false;
- private boolean durable = false;
- private String clientId;
- private int ackMode = Session.AUTO_ACKNOWLEDGE;
- private String consumerName = "James";
- private long sleepTime = 0;
- private long receiveTimeOut = 0;
-
- public static void main(String[] args) {
- ConsumerTool consumerTool = new ConsumerTool();
- String[] unknown = CommandLineSupport.setOptions(consumerTool, args);
- if (unknown.length > 0) {
- System.out.println("Unknown options: " + Arrays.toString(unknown));
- System.exit(-1);
- }
- consumerTool.run();
- }
-
- public void run() {
- try {
- running = true;
-
- System.out.println("Connecting to URL: " + url);
- System.out.println("Consuming " + (topic ? "topic" : "queue") + ": " + subject);
- System.out.println("Using a " + (durable ? "durable" : "non-durable") + " subscription");
-
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
- Connection connection = connectionFactory.createConnection();
- if (durable && clientId != null && clientId.length()>0 && !"null".equals(clientId) ) {
- connection.setClientID(clientId);
- }
- connection.setExceptionListener(this);
- connection.start();
-
- session = connection.createSession(transacted, ackMode);
- if (topic) {
- destination = session.createTopic(subject);
- } else {
- destination = session.createQueue(subject);
- }
-
- replyProducer = session.createProducer(null);
- replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-
- MessageConsumer consumer = null;
- if (durable && topic) {
- consumer = session.createDurableSubscriber((Topic) destination, consumerName);
- } else {
- consumer = session.createConsumer(destination);
- }
-
- if (maxiumMessages > 0) {
- consumeMessagesAndClose(connection, session, consumer);
- } else {
- if (receiveTimeOut == 0) {
- consumer.setMessageListener(this);
- } else {
- consumeMessagesAndClose(connection, session, consumer, receiveTimeOut);
- }
- }
-
- } catch (Exception e) {
- System.out.println("Caught: " + e);
- e.printStackTrace();
- }
- }
-
- public void onMessage(Message message) {
- try {
-
- if (message instanceof TextMessage) {
- TextMessage txtMsg = (TextMessage) message;
- if (verbose) {
-
- String msg = txtMsg.getText();
- if (msg.length() > 50) {
- msg = msg.substring(0, 50) + "...";
- }
-
- System.out.println("Received: " + msg);
- }
- } else {
- if (verbose) {
- System.out.println("Received: " + message);
- }
- }
-
- if (message.getJMSReplyTo() != null) {
- replyProducer.send(message.getJMSReplyTo(), session.createTextMessage("Reply: " + message.getJMSMessageID()));
- }
-
- if (transacted) {
- session.commit();
- } else if ( ackMode == Session.CLIENT_ACKNOWLEDGE ) {
- message.acknowledge();
- }
-
- } catch (JMSException e) {
- System.out.println("Caught: " + e);
- e.printStackTrace();
- } finally {
- if (sleepTime > 0) {
- try {
- Thread.sleep(sleepTime);
- } catch (InterruptedException e) {
- }
- }
- }
- }
-
- synchronized public void onException(JMSException ex) {
- System.out.println("JMS Exception occured. Shutting down client.");
- running = false;
- }
-
- synchronized boolean isRunning() {
- return running;
- }
-
- protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer) throws JMSException,
- IOException {
- System.out.println("We are about to wait until we consume: " + maxiumMessages + " message(s) then we will shutdown");
-
- for (int i = 0; i < maxiumMessages && isRunning();) {
- Message message = consumer.receive(1000);
- if (message != null) {
- i++;
- onMessage(message);
- }
- }
- System.out.println("Closing connection");
- consumer.close();
- session.close();
- connection.close();
- if (pauseBeforeShutdown) {
- System.out.println("Press return to shut down");
- System.in.read();
- }
- }
-
- protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer, long timeout)
- throws JMSException, IOException {
- System.out.println("We will consume messages while they continue to be delivered within: " + timeout
- + " ms, and then we will shutdown");
-
- Message message;
- while ((message = consumer.receive(timeout)) != null) {
- onMessage(message);
- }
-
- System.out.println("Closing connection");
- consumer.close();
- session.close();
- connection.close();
- if (pauseBeforeShutdown) {
- System.out.println("Press return to shut down");
- System.in.read();
- }
- }
-
- public void setAckMode(String ackMode) {
- if( "CLIENT_ACKNOWLEDGE".equals(ackMode) ) {
- this.ackMode = Session.CLIENT_ACKNOWLEDGE;
- }
- if( "AUTO_ACKNOWLEDGE".equals(ackMode) ) {
- this.ackMode = Session.AUTO_ACKNOWLEDGE;
- }
- if( "DUPS_OK_ACKNOWLEDGE".equals(ackMode) ) {
- this.ackMode = Session.DUPS_OK_ACKNOWLEDGE;
- }
- if( "SESSION_TRANSACTED".equals(ackMode) ) {
- this.ackMode = Session.SESSION_TRANSACTED;
- }
- }
-
- public void setClientId(String clientID) {
- this.clientId = clientID;
- }
- public void setConsumerName(String consumerName) {
- this.consumerName = consumerName;
- }
- public void setDurable(boolean durable) {
- this.durable = durable;
- }
- public void setMaxiumMessages(int maxiumMessages) {
- this.maxiumMessages = maxiumMessages;
- }
- public void setPauseBeforeShutdown(boolean pauseBeforeShutdown) {
- this.pauseBeforeShutdown = pauseBeforeShutdown;
- }
- public void setPassword(String pwd) {
- this.password = pwd;
- }
- public void setReceiveTimeOut(long receiveTimeOut) {
- this.receiveTimeOut = receiveTimeOut;
- }
- public void setSleepTime(long sleepTime) {
- this.sleepTime = sleepTime;
- }
- public void setSubject(String subject) {
- this.subject = subject;
- }
- public void setTopic(boolean topic) {
- this.topic = topic;
- }
- public void setQueue(boolean queue) {
- this.topic = !queue;
- }
- public void setTransacted(boolean transacted) {
- this.transacted = transacted;
- }
- public void setUrl(String url) {
- this.url = url;
- }
- public void setUser(String user) {
- this.user = user;
- }
- public void setVerbose(boolean verbose) {
- this.verbose = verbose;
- }
+ private boolean running;
+
+ private Session session;
+ private Destination destination;
+ private MessageProducer replyProducer;
+
+ private boolean pauseBeforeShutdown;
+ private boolean verbose = true;
+ private int maxiumMessages;
+ private String subject = "TOOL.DEFAULT";
+ private boolean topic;
+ private String user = ActiveMQConnection.DEFAULT_USER;
+ private String password = ActiveMQConnection.DEFAULT_PASSWORD;
+ private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
+ private boolean transacted;
+ private boolean durable;
+ private String clientId;
+ private int ackMode = Session.AUTO_ACKNOWLEDGE;
+ private String consumerName = "James";
+ private long sleepTime;
+ private long receiveTimeOut;
+
+ public static void main(String[] args) {
+ ConsumerTool consumerTool = new ConsumerTool();
+ String[] unknown = CommandLineSupport.setOptions(consumerTool, args);
+ if (unknown.length > 0) {
+ System.out.println("Unknown options: " + Arrays.toString(unknown));
+ System.exit(-1);
+ }
+ consumerTool.run();
+ }
+
+ public void run() {
+ try {
+ running = true;
+
+ System.out.println("Connecting to URL: " + url);
+ System.out.println("Consuming " + (topic ? "topic" : "queue") + ": " + subject);
+ System.out.println("Using a " + (durable ? "durable" : "non-durable") + " subscription");
+
+ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
+ Connection connection = connectionFactory.createConnection();
+ if (durable && clientId != null && clientId.length() > 0 && !"null".equals(clientId)) {
+ connection.setClientID(clientId);
+ }
+ connection.setExceptionListener(this);
+ connection.start();
+
+ session = connection.createSession(transacted, ackMode);
+ if (topic) {
+ destination = session.createTopic(subject);
+ } else {
+ destination = session.createQueue(subject);
+ }
+
+ replyProducer = session.createProducer(null);
+ replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ MessageConsumer consumer = null;
+ if (durable && topic) {
+ consumer = session.createDurableSubscriber((Topic)destination, consumerName);
+ } else {
+ consumer = session.createConsumer(destination);
+ }
+
+ if (maxiumMessages > 0) {
+ consumeMessagesAndClose(connection, session, consumer);
+ } else {
+ if (receiveTimeOut == 0) {
+ consumer.setMessageListener(this);
+ } else {
+ consumeMessagesAndClose(connection, session, consumer, receiveTimeOut);
+ }
+ }
+
+ } catch (Exception e) {
+ System.out.println("Caught: " + e);
+ e.printStackTrace();
+ }
+ }
+
+ public void onMessage(Message message) {
+ try {
+
+ if (message instanceof TextMessage) {
+ TextMessage txtMsg = (TextMessage)message;
+ if (verbose) {
+
+ String msg = txtMsg.getText();
+ if (msg.length() > 50) {
+ msg = msg.substring(0, 50) + "...";
+ }
+
+ System.out.println("Received: " + msg);
+ }
+ } else {
+ if (verbose) {
+ System.out.println("Received: " + message);
+ }
+ }
+
+ if (message.getJMSReplyTo() != null) {
+ replyProducer.send(message.getJMSReplyTo(), session.createTextMessage("Reply: " + message.getJMSMessageID()));
+ }
+
+ if (transacted) {
+ session.commit();
+ } else if (ackMode == Session.CLIENT_ACKNOWLEDGE) {
+ message.acknowledge();
+ }
+
+ } catch (JMSException e) {
+ System.out.println("Caught: " + e);
+ e.printStackTrace();
+ } finally {
+ if (sleepTime > 0) {
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ }
+
+ public synchronized void onException(JMSException ex) {
+ System.out.println("JMS Exception occured. Shutting down client.");
+ running = false;
+ }
+
+ synchronized boolean isRunning() {
+ return running;
+ }
+
+ protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer) throws JMSException, IOException {
+ System.out.println("We are about to wait until we consume: " + maxiumMessages + " message(s) then we will shutdown");
+
+ for (int i = 0; i < maxiumMessages && isRunning();) {
+ Message message = consumer.receive(1000);
+ if (message != null) {
+ i++;
+ onMessage(message);
+ }
+ }
+ System.out.println("Closing connection");
+ consumer.close();
+ session.close();
+ connection.close();
+ if (pauseBeforeShutdown) {
+ System.out.println("Press return to shut down");
+ System.in.read();
+ }
+ }
+
+ protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer, long timeout) throws JMSException, IOException {
+ System.out.println("We will consume messages while they continue to be delivered within: " + timeout + " ms, and then we will shutdown");
+
+ Message message;
+ while ((message = consumer.receive(timeout)) != null) {
+ onMessage(message);
+ }
+
+ System.out.println("Closing connection");
+ consumer.close();
+ session.close();
+ connection.close();
+ if (pauseBeforeShutdown) {
+ System.out.println("Press return to shut down");
+ System.in.read();
+ }
+ }
+
+ public void setAckMode(String ackMode) {
+ if ("CLIENT_ACKNOWLEDGE".equals(ackMode)) {
+ this.ackMode = Session.CLIENT_ACKNOWLEDGE;
+ }
+ if ("AUTO_ACKNOWLEDGE".equals(ackMode)) {
+ this.ackMode = Session.AUTO_ACKNOWLEDGE;
+ }
+ if ("DUPS_OK_ACKNOWLEDGE".equals(ackMode)) {
+ this.ackMode = Session.DUPS_OK_ACKNOWLEDGE;
+ }
+ if ("SESSION_TRANSACTED".equals(ackMode)) {
+ this.ackMode = Session.SESSION_TRANSACTED;
+ }
+ }
+
+ public void setClientId(String clientID) {
+ this.clientId = clientID;
+ }
+
+ public void setConsumerName(String consumerName) {
+ this.consumerName = consumerName;
+ }
+
+ public void setDurable(boolean durable) {
+ this.durable = durable;
+ }
+
+ public void setMaxiumMessages(int maxiumMessages) {
+ this.maxiumMessages = maxiumMessages;
+ }
+
+ public void setPauseBeforeShutdown(boolean pauseBeforeShutdown) {
+ this.pauseBeforeShutdown = pauseBeforeShutdown;
+ }
+
+ public void setPassword(String pwd) {
+ this.password = pwd;
+ }
+
+ public void setReceiveTimeOut(long receiveTimeOut) {
+ this.receiveTimeOut = receiveTimeOut;
+ }
+
+ public void setSleepTime(long sleepTime) {
+ this.sleepTime = sleepTime;
+ }
+
+ public void setSubject(String subject) {
+ this.subject = subject;
+ }
+
+ public void setTopic(boolean topic) {
+ this.topic = topic;
+ }
+
+ public void setQueue(boolean queue) {
+ this.topic = !queue;
+ }
+
+ public void setTransacted(boolean transacted) {
+ this.transacted = transacted;
+ }
+
+ public void setUrl(String url) {
+ this.url = url;
+ }
+
+ public void setUser(String user) {
+ this.user = user;
+ }
+
+ public void setVerbose(boolean verbose) {
+ this.verbose = verbose;
+ }
}
Modified: activemq/trunk/assembly/src/release/example/src/EmbeddedBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/assembly/src/release/example/src/EmbeddedBroker.java?view=diff&rev=565003&r1=565002&r2=565003
==============================================================================
--- activemq/trunk/assembly/src/release/example/src/EmbeddedBroker.java (original)
+++ activemq/trunk/assembly/src/release/example/src/EmbeddedBroker.java Sat Aug 11 22:27:21 2007
@@ -21,7 +21,11 @@
*
* @version $Revision$
*/
-public class EmbeddedBroker {
+public final class EmbeddedBroker {
+
+ private EmbeddedBroker() {
+ }
+
public static void main(String[] args) throws Exception {
BrokerService broker = new BrokerService();
broker.setUseJmx(true);
Modified: activemq/trunk/assembly/src/release/example/src/ProducerAndConsumerTool.java
URL: http://svn.apache.org/viewvc/activemq/trunk/assembly/src/release/example/src/ProducerAndConsumerTool.java?view=diff&rev=565003&r1=565002&r2=565003
==============================================================================
--- activemq/trunk/assembly/src/release/example/src/ProducerAndConsumerTool.java (original)
+++ activemq/trunk/assembly/src/release/example/src/ProducerAndConsumerTool.java Sat Aug 11 22:27:21 2007
@@ -22,30 +22,30 @@
/**
* A simple tool for producing and consuming messages
- *
+ *
* @version $Revision: 1.1.1.1 $
*/
public class ProducerAndConsumerTool extends ConsumerTool implements MessageListener {
public static void main(String[] args) {
-
- ConsumerTool consumerTool = new ConsumerTool();
- String[] unknown = CommandLineSupport.setOptions(consumerTool, args);
- HashSet set1 = new HashSet(Arrays.asList(unknown));
-
- ProducerTool producerTool = new ProducerTool();
+
+ ConsumerTool consumerTool = new ConsumerTool();
+ String[] unknown = CommandLineSupport.setOptions(consumerTool, args);
+ HashSet<String> set1 = new HashSet<String>(Arrays.asList(unknown));
+
+ ProducerTool producerTool = new ProducerTool();
unknown = CommandLineSupport.setOptions(producerTool, args);
- HashSet set2 = new HashSet(Arrays.asList(unknown));
+ HashSet<String> set2 = new HashSet<String>(Arrays.asList(unknown));
+
+ set1.retainAll(set2);
+ if (set1.size() > 0) {
+ System.out.println("Unknown options: " + set1);
+ System.exit(-1);
+ }
+
+ consumerTool.run();
+ producerTool.run();
- set1.retainAll(set2);
- if( set1.size() > 0 ) {
- System.out.println("Unknown options: "+set1);
- System.exit(-1);
- }
-
- consumerTool.run();
- producerTool.run();
-
}
}
Modified: activemq/trunk/assembly/src/release/example/src/ProducerTool.java
URL: http://svn.apache.org/viewvc/activemq/trunk/assembly/src/release/example/src/ProducerTool.java?view=diff&rev=565003&r1=565002&r2=565003
==============================================================================
--- activemq/trunk/assembly/src/release/example/src/ProducerTool.java (original)
+++ activemq/trunk/assembly/src/release/example/src/ProducerTool.java Sat Aug 11 22:27:21 2007
@@ -35,161 +35,172 @@
*/
public class ProducerTool {
- private Destination destination;
- private int messageCount = 10;
- private long sleepTime = 0L;
- private boolean verbose = true;
- private int messageSize = 255;
- private long timeToLive;
- private String user = ActiveMQConnection.DEFAULT_USER;
- private String password = ActiveMQConnection.DEFAULT_PASSWORD;
- private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
- private String subject = "TOOL.DEFAULT";
- private boolean topic = false;
- private boolean transacted = false;
- private boolean persistent = false;
-
- public static void main(String[] args) {
- ProducerTool producerTool = new ProducerTool();
- String[] unknown = CommandLineSupport.setOptions(producerTool, args);
- if( unknown.length > 0 ) {
- System.out.println("Unknown options: " + Arrays.toString(unknown));
- System.exit(-1);
- }
- producerTool.run();
- }
-
- public void run() {
- Connection connection=null;
- try {
- System.out.println("Connecting to URL: " + url);
- System.out.println("Publishing a Message with size " + messageSize+ " to " + (topic ? "topic" : "queue") + ": " + subject);
- System.out.println("Using " + (persistent ? "persistent" : "non-persistent") + " messages");
- System.out.println("Sleeping between publish " + sleepTime + " ms");
- if (timeToLive != 0) {
- System.out.println("Messages time to live " + timeToLive + " ms");
- }
-
- // Create the connection.
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
- connection = connectionFactory.createConnection();
- connection.start();
-
- // Create the session
- Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
- if (topic) {
- destination = session.createTopic(subject);
- } else {
- destination = session.createQueue(subject);
- }
-
- // Create the producer.
- MessageProducer producer = session.createProducer(destination);
- if (persistent) {
- producer.setDeliveryMode(DeliveryMode.PERSISTENT);
- } else {
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- }
- if (timeToLive != 0)
- producer.setTimeToLive(timeToLive);
-
- // Start sending messages
- sendLoop(session, producer);
-
- System.out.println("Done.");
-
- // Use the ActiveMQConnection interface to dump the connection stats.
- ActiveMQConnection c = (ActiveMQConnection) connection;
- c.getConnectionStats().dump(new IndentPrinter());
-
- } catch (Exception e) {
- System.out.println("Caught: " + e);
- e.printStackTrace();
- } finally {
- try {
- connection.close();
- } catch (Throwable ignore) {
- }
- }
- }
-
- protected void sendLoop(Session session, MessageProducer producer)
- throws Exception {
-
- for (int i = 0; i < messageCount || messageCount == 0; i++) {
-
- TextMessage message = session
- .createTextMessage(createMessageText(i));
-
- if (verbose) {
- String msg = message.getText();
- if (msg.length() > 50) {
- msg = msg.substring(0, 50) + "...";
- }
- System.out.println("Sending message: " + msg);
- }
-
- producer.send(message);
- if (transacted) {
- session.commit();
- }
-
- Thread.sleep(sleepTime);
-
- }
-
- }
-
- private String createMessageText(int index) {
- StringBuffer buffer = new StringBuffer(messageSize);
- buffer.append("Message: " + index + " sent at: " + new Date());
- if (buffer.length() > messageSize) {
- return buffer.substring(0, messageSize);
- }
- for (int i = buffer.length(); i < messageSize; i++) {
- buffer.append(' ');
- }
- return buffer.toString();
- }
-
-
- public void setPersistent(boolean durable) {
- this.persistent = durable;
- }
- public void setMessageCount(int messageCount) {
- this.messageCount = messageCount;
- }
- public void setMessageSize(int messageSize) {
- this.messageSize = messageSize;
- }
- public void setPassword(String pwd) {
- this.password = pwd;
- }
- public void setSleepTime(long sleepTime) {
- this.sleepTime = sleepTime;
- }
- public void setSubject(String subject) {
- this.subject = subject;
- }
- public void setTimeToLive(long timeToLive) {
- this.timeToLive = timeToLive;
- }
- public void setTopic(boolean topic) {
- this.topic = topic;
- }
- public void setQueue(boolean queue) {
- this.topic = !queue;
- }
- public void setTransacted(boolean transacted) {
- this.transacted = transacted;
- }
- public void setUrl(String url) {
- this.url = url;
- }
- public void setUser(String user) {
- this.user = user;
- }
- public void setVerbose(boolean verbose) {
- this.verbose = verbose;
- }
+ private Destination destination;
+ private int messageCount = 10;
+ private long sleepTime;
+ private boolean verbose = true;
+ private int messageSize = 255;
+ private long timeToLive;
+ private String user = ActiveMQConnection.DEFAULT_USER;
+ private String password = ActiveMQConnection.DEFAULT_PASSWORD;
+ private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
+ private String subject = "TOOL.DEFAULT";
+ private boolean topic;
+ private boolean transacted;
+ private boolean persistent;
+
+ public static void main(String[] args) {
+ ProducerTool producerTool = new ProducerTool();
+ String[] unknown = CommandLineSupport.setOptions(producerTool, args);
+ if (unknown.length > 0) {
+ System.out.println("Unknown options: " + Arrays.toString(unknown));
+ System.exit(-1);
+ }
+ producerTool.run();
+ }
+
+ public void run() {
+ Connection connection = null;
+ try {
+ System.out.println("Connecting to URL: " + url);
+ System.out.println("Publishing a Message with size " + messageSize + " to " + (topic ? "topic" : "queue") + ": " + subject);
+ System.out.println("Using " + (persistent ? "persistent" : "non-persistent") + " messages");
+ System.out.println("Sleeping between publish " + sleepTime + " ms");
+ if (timeToLive != 0) {
+ System.out.println("Messages time to live " + timeToLive + " ms");
+ }
+
+ // Create the connection.
+ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
+ connection = connectionFactory.createConnection();
+ connection.start();
+
+ // Create the session
+ Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+ if (topic) {
+ destination = session.createTopic(subject);
+ } else {
+ destination = session.createQueue(subject);
+ }
+
+ // Create the producer.
+ MessageProducer producer = session.createProducer(destination);
+ if (persistent) {
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+ } else {
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ }
+ if (timeToLive != 0) {
+ producer.setTimeToLive(timeToLive);
+ }
+
+ // Start sending messages
+ sendLoop(session, producer);
+
+ System.out.println("Done.");
+
+ // Use the ActiveMQConnection interface to dump the connection
+ // stats.
+ ActiveMQConnection c = (ActiveMQConnection)connection;
+ c.getConnectionStats().dump(new IndentPrinter());
+
+ } catch (Exception e) {
+ System.out.println("Caught: " + e);
+ e.printStackTrace();
+ } finally {
+ try {
+ connection.close();
+ } catch (Throwable ignore) {
+ }
+ }
+ }
+
+ protected void sendLoop(Session session, MessageProducer producer) throws Exception {
+
+ for (int i = 0; i < messageCount || messageCount == 0; i++) {
+
+ TextMessage message = session.createTextMessage(createMessageText(i));
+
+ if (verbose) {
+ String msg = message.getText();
+ if (msg.length() > 50) {
+ msg = msg.substring(0, 50) + "...";
+ }
+ System.out.println("Sending message: " + msg);
+ }
+
+ producer.send(message);
+ if (transacted) {
+ session.commit();
+ }
+
+ Thread.sleep(sleepTime);
+
+ }
+
+ }
+
+ private String createMessageText(int index) {
+ StringBuffer buffer = new StringBuffer(messageSize);
+ buffer.append("Message: " + index + " sent at: " + new Date());
+ if (buffer.length() > messageSize) {
+ return buffer.substring(0, messageSize);
+ }
+ for (int i = buffer.length(); i < messageSize; i++) {
+ buffer.append(' ');
+ }
+ return buffer.toString();
+ }
+
+ public void setPersistent(boolean durable) {
+ this.persistent = durable;
+ }
+
+ public void setMessageCount(int messageCount) {
+ this.messageCount = messageCount;
+ }
+
+ public void setMessageSize(int messageSize) {
+ this.messageSize = messageSize;
+ }
+
+ public void setPassword(String pwd) {
+ this.password = pwd;
+ }
+
+ public void setSleepTime(long sleepTime) {
+ this.sleepTime = sleepTime;
+ }
+
+ public void setSubject(String subject) {
+ this.subject = subject;
+ }
+
+ public void setTimeToLive(long timeToLive) {
+ this.timeToLive = timeToLive;
+ }
+
+ public void setTopic(boolean topic) {
+ this.topic = topic;
+ }
+
+ public void setQueue(boolean queue) {
+ this.topic = !queue;
+ }
+
+ public void setTransacted(boolean transacted) {
+ this.transacted = transacted;
+ }
+
+ public void setUrl(String url) {
+ this.url = url;
+ }
+
+ public void setUser(String user) {
+ this.user = user;
+ }
+
+ public void setVerbose(boolean verbose) {
+ this.verbose = verbose;
+ }
}
Modified: activemq/trunk/assembly/src/release/example/src/RequesterTool.java
URL: http://svn.apache.org/viewvc/activemq/trunk/assembly/src/release/example/src/RequesterTool.java?view=diff&rev=565003&r1=565002&r2=565003
==============================================================================
--- activemq/trunk/assembly/src/release/example/src/RequesterTool.java (original)
+++ activemq/trunk/assembly/src/release/example/src/RequesterTool.java Sat Aug 11 22:27:21 2007
@@ -37,205 +37,221 @@
*/
public class RequesterTool {
- private int messageCount = 10;
- private long sleepTime = 0L;
- private boolean verbose = true;
- private int messageSize = 255;
- private long timeToLive;
- private String subject = "TOOL.DEFAULT";
- private String replySubject;
- private boolean topic = false;
- private String user = ActiveMQConnection.DEFAULT_USER;
- private String password = ActiveMQConnection.DEFAULT_PASSWORD;
- private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
- private boolean transacted = false;
- private boolean persistent = false;
- private String clientId;
-
- private Destination destination;
- private Destination replyDest;
- private MessageProducer producer;
- private MessageConsumer consumer;
- private Session session;
-
- public static void main(String[] args) {
- RequesterTool requesterTool = new RequesterTool();
- String[] unknown = CommandLineSupport.setOptions(requesterTool, args);
- if (unknown.length > 0) {
- System.out.println("Unknown options: " + Arrays.toString(unknown));
- System.exit(-1);
- }
- requesterTool.run();
- }
-
- public void run() {
-
- Connection connection=null;
- try {
-
- System.out.println("Connecting to URL: " + url);
- System.out.println("Publishing a Message with size " + messageSize + " to " + (topic ? "topic" : "queue") + ": " + subject);
- System.out.println("Using " + (persistent ? "persistent" : "non-persistent") + " messages");
- System.out.println("Sleeping between publish " + sleepTime + " ms");
-
- // Create the connection
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
- connection = connectionFactory.createConnection();
- if (persistent && clientId != null && clientId.length() > 0 && !"null".equals(clientId)) {
- connection.setClientID(clientId);
- }
- connection.start();
-
- // Create the Session
- session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
-
- // And the Destinations..
- if (topic) {
- destination = session.createTopic(subject);
- if( replySubject==null || replySubject.equals("") )
- replyDest = session.createTemporaryTopic();
- else
- replyDest = session.createTopic(replySubject);
- } else {
- destination = session.createQueue(subject);
- if( replySubject==null || replySubject.equals("") )
- replyDest = session.createTemporaryQueue();
- else
- replyDest = session.createQueue(replySubject);
- }
- System.out.println("Reply Destination: " + replyDest);
-
- // Create the producer
- producer = session.createProducer(destination);
- if (persistent) {
- producer.setDeliveryMode(DeliveryMode.PERSISTENT);
- } else {
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- }
- if (timeToLive != 0) {
- System.out.println("Messages time to live " + timeToLive + " ms");
- producer.setTimeToLive(timeToLive);
- }
-
- // Create the reply consumer
- consumer = session.createConsumer(replyDest);
-
- // Start sending reqests.
- requestLoop();
-
- System.out.println("Done.");
-
- // Use the ActiveMQConnection interface to dump the connection stats.
- ActiveMQConnection c = (ActiveMQConnection) connection;
- c.getConnectionStats().dump(new IndentPrinter());
-
- } catch (Exception e) {
- System.out.println("Caught: " + e);
- e.printStackTrace();
- } finally {
- try {
- connection.close();
- } catch (Throwable ignore) {
- }
- }
- }
-
- protected void requestLoop() throws Exception {
-
- for (int i = 0; i < messageCount || messageCount == 0; i++) {
-
- TextMessage message = session.createTextMessage(createMessageText(i));
- message.setJMSReplyTo(replyDest);
-
- if (verbose) {
- String msg = message.getText();
- if (msg.length() > 50) {
- msg = msg.substring(0, 50) + "...";
- }
- System.out.println("Sending message: " + msg);
- }
-
- producer.send(message);
- if (transacted) {
- session.commit();
- }
-
- System.out.println("Waiting for reponse message...");
- Message message2 = consumer.receive();
- if (message2 instanceof TextMessage) {
- System.out.println("Reponse message: " + ((TextMessage) message2).getText());
- } else {
- System.out.println("Reponse message: " + message2);
- }
- if (transacted) {
- session.commit();
- }
-
- Thread.sleep(sleepTime);
-
- }
- }
-
- /**
- * @param i
- * @return
- */
- private String createMessageText(int index) {
- StringBuffer buffer = new StringBuffer(messageSize);
- buffer.append("Message: " + index + " sent at: " + new Date());
- if (buffer.length() > messageSize) {
- return buffer.substring(0, messageSize);
- }
- for (int i = buffer.length(); i < messageSize; i++) {
- buffer.append(' ');
- }
- return buffer.toString();
- }
-
-
- public void setClientId(String clientId) {
- this.clientId = clientId;
- }
- public void setPersistent(boolean durable) {
- this.persistent = durable;
- }
- public void setMessageCount(int messageCount) {
- this.messageCount = messageCount;
- }
- public void setMessageSize(int messageSize) {
- this.messageSize = messageSize;
- }
- public void setPassword(String password) {
- this.password = password;
- }
- public void setSleepTime(long sleepTime) {
- this.sleepTime = sleepTime;
- }
- public void setSubject(String subject) {
- this.subject = subject;
- }
- public void setTimeToLive(long timeToLive) {
- this.timeToLive = timeToLive;
- }
- public void setTopic(boolean topic) {
- this.topic = topic;
- }
- public void setQueue(boolean queue) {
- this.topic = !queue;
- }
- public void setTransacted(boolean transacted) {
- this.transacted = transacted;
- }
- public void setUrl(String url) {
- this.url = url;
- }
- public void setUser(String user) {
- this.user = user;
- }
- public void setVerbose(boolean verbose) {
- this.verbose = verbose;
- }
- public void setReplySubject(String replySubject) {
- this.replySubject = replySubject;
- }
+ private int messageCount = 10;
+ private long sleepTime;
+ private boolean verbose = true;
+ private int messageSize = 255;
+ private long timeToLive;
+ private String subject = "TOOL.DEFAULT";
+ private String replySubject;
+ private boolean topic;
+ private String user = ActiveMQConnection.DEFAULT_USER;
+ private String password = ActiveMQConnection.DEFAULT_PASSWORD;
+ private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
+ private boolean transacted;
+ private boolean persistent;
+ private String clientId;
+
+ private Destination destination;
+ private Destination replyDest;
+ private MessageProducer producer;
+ private MessageConsumer consumer;
+ private Session session;
+
+ public static void main(String[] args) {
+ RequesterTool requesterTool = new RequesterTool();
+ String[] unknown = CommandLineSupport.setOptions(requesterTool, args);
+ if (unknown.length > 0) {
+ System.out.println("Unknown options: " + Arrays.toString(unknown));
+ System.exit(-1);
+ }
+ requesterTool.run();
+ }
+
+ public void run() {
+
+ Connection connection = null;
+ try {
+
+ System.out.println("Connecting to URL: " + url);
+ System.out.println("Publishing a Message with size " + messageSize + " to " + (topic ? "topic" : "queue") + ": " + subject);
+ System.out.println("Using " + (persistent ? "persistent" : "non-persistent") + " messages");
+ System.out.println("Sleeping between publish " + sleepTime + " ms");
+
+ // Create the connection
+ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
+ connection = connectionFactory.createConnection();
+ if (persistent && clientId != null && clientId.length() > 0 && !"null".equals(clientId)) {
+ connection.setClientID(clientId);
+ }
+ connection.start();
+
+ // Create the Session
+ session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+
+ // And the Destinations..
+ if (topic) {
+ destination = session.createTopic(subject);
+ if (replySubject == null || replySubject.equals("")) {
+ replyDest = session.createTemporaryTopic();
+ } else {
+ replyDest = session.createTopic(replySubject);
+ }
+ } else {
+ destination = session.createQueue(subject);
+ if (replySubject == null || replySubject.equals("")) {
+ replyDest = session.createTemporaryQueue();
+ } else {
+ replyDest = session.createQueue(replySubject);
+ }
+ }
+ System.out.println("Reply Destination: " + replyDest);
+
+ // Create the producer
+ producer = session.createProducer(destination);
+ if (persistent) {
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+ } else {
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ }
+ if (timeToLive != 0) {
+ System.out.println("Messages time to live " + timeToLive + " ms");
+ producer.setTimeToLive(timeToLive);
+ }
+
+ // Create the reply consumer
+ consumer = session.createConsumer(replyDest);
+
+ // Start sending reqests.
+ requestLoop();
+
+ System.out.println("Done.");
+
+ // Use the ActiveMQConnection interface to dump the connection
+ // stats.
+ ActiveMQConnection c = (ActiveMQConnection)connection;
+ c.getConnectionStats().dump(new IndentPrinter());
+
+ } catch (Exception e) {
+ System.out.println("Caught: " + e);
+ e.printStackTrace();
+ } finally {
+ try {
+ connection.close();
+ } catch (Throwable ignore) {
+ }
+ }
+ }
+
+ protected void requestLoop() throws Exception {
+
+ for (int i = 0; i < messageCount || messageCount == 0; i++) {
+
+ TextMessage message = session.createTextMessage(createMessageText(i));
+ message.setJMSReplyTo(replyDest);
+
+ if (verbose) {
+ String msg = message.getText();
+ if (msg.length() > 50) {
+ msg = msg.substring(0, 50) + "...";
+ }
+ System.out.println("Sending message: " + msg);
+ }
+
+ producer.send(message);
+ if (transacted) {
+ session.commit();
+ }
+
+ System.out.println("Waiting for reponse message...");
+ Message message2 = consumer.receive();
+ if (message2 instanceof TextMessage) {
+ System.out.println("Reponse message: " + ((TextMessage)message2).getText());
+ } else {
+ System.out.println("Reponse message: " + message2);
+ }
+ if (transacted) {
+ session.commit();
+ }
+
+ Thread.sleep(sleepTime);
+
+ }
+ }
+
+ /**
+ * @param i
+ * @return
+ */
+ private String createMessageText(int index) {
+ StringBuffer buffer = new StringBuffer(messageSize);
+ buffer.append("Message: " + index + " sent at: " + new Date());
+ if (buffer.length() > messageSize) {
+ return buffer.substring(0, messageSize);
+ }
+ for (int i = buffer.length(); i < messageSize; i++) {
+ buffer.append(' ');
+ }
+ return buffer.toString();
+ }
+
+ public void setClientId(String clientId) {
+ this.clientId = clientId;
+ }
+
+ public void setPersistent(boolean durable) {
+ this.persistent = durable;
+ }
+
+ public void setMessageCount(int messageCount) {
+ this.messageCount = messageCount;
+ }
+
+ public void setMessageSize(int messageSize) {
+ this.messageSize = messageSize;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public void setSleepTime(long sleepTime) {
+ this.sleepTime = sleepTime;
+ }
+
+ public void setSubject(String subject) {
+ this.subject = subject;
+ }
+
+ public void setTimeToLive(long timeToLive) {
+ this.timeToLive = timeToLive;
+ }
+
+ public void setTopic(boolean topic) {
+ this.topic = topic;
+ }
+
+ public void setQueue(boolean queue) {
+ this.topic = !queue;
+ }
+
+ public void setTransacted(boolean transacted) {
+ this.transacted = transacted;
+ }
+
+ public void setUrl(String url) {
+ this.url = url;
+ }
+
+ public void setUser(String user) {
+ this.user = user;
+ }
+
+ public void setVerbose(boolean verbose) {
+ this.verbose = verbose;
+ }
+
+ public void setReplySubject(String replySubject) {
+ this.replySubject = replySubject;
+ }
}
Modified: activemq/trunk/assembly/src/release/example/src/TopicListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/assembly/src/release/example/src/TopicListener.java?view=diff&rev=565003&r1=565002&r2=565003
==============================================================================
--- activemq/trunk/assembly/src/release/example/src/TopicListener.java (original)
+++ activemq/trunk/assembly/src/release/example/src/TopicListener.java Sat Aug 11 22:27:21 2007
@@ -29,94 +29,90 @@
import org.apache.activemq.ActiveMQConnectionFactory;
/**
- * Use in conjunction with TopicPublisher to test the performance of ActiveMQ Topics.
+ * Use in conjunction with TopicPublisher to test the performance of ActiveMQ
+ * Topics.
*/
public class TopicListener implements MessageListener {
-
- private Connection connection;
- private MessageProducer producer;
- private Session session;
- private int count;
- private long start;
- private Topic topic;
- private Topic control;
-
-// private String url="tcp://localhost:61616?jms.dispatchAsync=true&jms.useAsyncSend=true&jms.optimizeAcknowledge=true&jms.disableTimeStampsByDefault=true&jms.optimizedMessageDispatch=true&wireFormat.cacheEnabled=false&wireFormat.tightEncodingEnabled=false";
- private String url="tcp://localhost:61616";
-
- public static void main(String[] argv) throws Exception {
- TopicListener l = new TopicListener();
- String[] unknown = CommandLineSupport.setOptions(l, argv);
- if (unknown.length > 0) {
- System.out.println("Unknown options: " + Arrays.toString(unknown));
- System.exit(-1);
- }
- l.run();
- }
-
- public void run() throws JMSException {
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
- connection = factory.createConnection();
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- topic = session.createTopic("topictest.messages");
- control = session.createTopic("topictest.control");
-
- MessageConsumer consumer = session.createConsumer(topic);
- consumer.setMessageListener(this);
-
- connection.start();
-
- producer = session.createProducer(control);
- System.out.println("Waiting for messages...");
- }
-
- private static boolean checkText(Message m, String s)
- {
- try
- {
- return m instanceof TextMessage && ((TextMessage) m).getText().equals(s);
+
+ private Connection connection;
+ private MessageProducer producer;
+ private Session session;
+ private int count;
+ private long start;
+ private Topic topic;
+ private Topic control;
+
+ private String url = "tcp://localhost:61616";
+
+ public static void main(String[] argv) throws Exception {
+ TopicListener l = new TopicListener();
+ String[] unknown = CommandLineSupport.setOptions(l, argv);
+ if (unknown.length > 0) {
+ System.out.println("Unknown options: " + Arrays.toString(unknown));
+ System.exit(-1);
}
- catch (JMSException e)
- {
+ l.run();
+ }
+
+ public void run() throws JMSException {
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
+ connection = factory.createConnection();
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ topic = session.createTopic("topictest.messages");
+ control = session.createTopic("topictest.control");
+
+ MessageConsumer consumer = session.createConsumer(topic);
+ consumer.setMessageListener(this);
+
+ connection.start();
+
+ producer = session.createProducer(control);
+ System.out.println("Waiting for messages...");
+ }
+
+ private static boolean checkText(Message m, String s) {
+ try {
+ return m instanceof TextMessage && ((TextMessage)m).getText().equals(s);
+ } catch (JMSException e) {
e.printStackTrace(System.out);
return false;
}
}
+ public void onMessage(Message message) {
+ if (checkText(message, "SHUTDOWN")) {
+
+ try {
+ connection.close();
+ } catch (Exception e) {
+ e.printStackTrace(System.out);
+ }
+
+ } else if (checkText(message, "REPORT")) {
+ // send a report:
+ try {
+ long time = System.currentTimeMillis() - start;
+ String msg = "Received " + count + " in " + time + "ms";
+ producer.send(session.createTextMessage(msg));
+ } catch (Exception e) {
+ e.printStackTrace(System.out);
+ }
+ count = 0;
+
+ } else {
+
+ if (count == 0) {
+ start = System.currentTimeMillis();
+ }
+
+ if (++count % 1000 == 0) {
+ System.out.println("Received " + count + " messages.");
+ }
+ }
+ }
- public void onMessage(Message message) {
- if ( checkText(message, "SHUTDOWN") ) {
-
- try {
- connection.close();
- } catch (Exception e) {
- e.printStackTrace(System.out);
- }
-
- } else if (checkText(message, "REPORT")) {
- // send a report:
- try {
- long time = (System.currentTimeMillis() - start);
- String msg = "Received " + count + " in " + time + "ms";
- producer.send(session.createTextMessage(msg));
- } catch (Exception e) {
- e.printStackTrace(System.out);
- }
- count = 0;
-
- } else {
-
- if (count==0) {
- start = System.currentTimeMillis();
- }
-
- if (++count % 1000 == 0)
- System.out.println("Received " + count + " messages.");
- }
- }
-
- public void setUrl(String url) {
- this.url = url;
- }
+ public void setUrl(String url) {
+ this.url = url;
+ }
}
Modified: activemq/trunk/assembly/src/release/example/src/TopicPublisher.java
URL: http://svn.apache.org/viewvc/activemq/trunk/assembly/src/release/example/src/TopicPublisher.java?view=diff&rev=565003&r1=565002&r2=565003
==============================================================================
--- activemq/trunk/assembly/src/release/example/src/TopicPublisher.java (original)
+++ activemq/trunk/assembly/src/release/example/src/TopicPublisher.java Sat Aug 11 22:27:21 2007
@@ -16,198 +16,190 @@
*/
import java.util.Arrays;
-import javax.jms.*;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
- * Use in conjunction with TopicListener to test the performance of ActiveMQ Topics.
+ * Use in conjunction with TopicListener to test the performance of ActiveMQ
+ * Topics.
*/
-public class TopicPublisher implements MessageListener
-{
+public class TopicPublisher implements MessageListener {
+
+ private static final char[] DATA = "abcdefghijklmnopqrstuvwxyz".toCharArray();
+
private final Object mutex = new Object();
private Connection connection;
private Session session;
private MessageProducer publisher;
- private Topic topic;
- private Topic control;
-
-// private String url="tcp://localhost:61616?jms.dispatchAsync=true&jms.useAsyncSend=true&jms.optimizeAcknowledge=true&jms.disableTimeStampsByDefault=true&jms.optimizedMessageDispatch=true&wireFormat.cacheEnabled=false&wireFormat.tightEncodingEnabled=false";
- private String url="tcp://localhost:61616";
- private int size=256;
- private int subscribers=1;
- private int remaining;
- private int messages=10000;
- private long delay;
- private int batch=40;
-
- private byte[] payload;
- private static final char[] DATA = "abcdefghijklmnopqrstuvwxyz".toCharArray();
-
- public static void main(String[] argv) throws Exception
- {
- TopicPublisher p = new TopicPublisher();
- String[] unknown = CommandLineSupport.setOptions(p, argv);
- if (unknown.length > 0) {
- System.out.println("Unknown options: " + Arrays.toString(unknown));
- System.exit(-1);
- }
- p.run();
- }
+ private Topic topic;
+ private Topic control;
- private void run() throws Exception
- {
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
- connection = factory.createConnection();
+ private String url = "tcp://localhost:61616";
+ private int size = 256;
+ private int subscribers = 1;
+ private int remaining;
+ private int messages = 10000;
+ private long delay;
+ private int batch = 40;
+
+ private byte[] payload;
+
+ /**
+ * Command-line entry point: applies the arguments to a new publisher through
+ * {@code CommandLineSupport.setOptions} and runs it. If any arguments are
+ * returned as unknown they are printed and the JVM exits with status -1.
+ *
+ * @param argv the command-line arguments
+ * @throws Exception if option handling or the run fails
+ */
+ public static void main(String[] argv) throws Exception {
+ TopicPublisher instance = new TopicPublisher();
+ String[] unrecognized = CommandLineSupport.setOptions(instance, argv);
+ if (unrecognized.length != 0) {
+ String listing = Arrays.toString(unrecognized);
+ System.out.println("Unknown options: " + listing);
+ System.exit(-1);
+ }
+ instance.run();
+ }
+
+ /**
+ * Connects to the broker at {@code url}, publishes {@code batch} batches of
+ * messages to the "topictest.messages" topic, prints the time each batch took
+ * together with the min, max and avg of those times, then sends a "SHUTDOWN"
+ * text message and closes the connection.
+ *
+ * @throws Exception if any step fails; NOTE(review): the connection is not
+ * closed on that path
+ */
+ private void run() throws Exception {
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
+ connection = factory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- topic = session.createTopic("topictest.messages");
- control = session.createTopic("topictest.control");
-
+ topic = session.createTopic("topictest.messages");
+ control = session.createTopic("topictest.control");
+
publisher = session.createProducer(topic);
publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-
+
+ // Fill the payload with the letters a-z, repeated as often as needed.
payload = new byte[size];
- for(int i = 0; i < size; i++)
- {
- payload[i] = (byte) DATA[i % DATA.length];
+ for (int i = 0; i < size; i++) {
+ payload[i] = (byte)DATA[i % DATA.length];
}
+ // Messages on the control topic are delivered to this object's onMessage.
session.createConsumer(control).setMessageListener(this);
connection.start();
long[] times = new long[batch];
- for(int i = 0; i < batch; i++)
- {
- if(i > 0) Thread.sleep(delay*1000);
+ for (int i = 0; i < batch; i++) {
+ // Pause for delay seconds between batches, but not before the first.
+ if (i > 0) {
+ Thread.sleep(delay * 1000);
+ }
times[i] = batch(messages);
- System.out.println("Batch " + (i+1) + " of " + batch + " completed in " + times[i] + " ms.");
+ System.out.println("Batch " + (i + 1) + " of " + batch + " completed in " + times[i] + " ms.");
}
long min = min(times);
long max = max(times);
System.out.println("min: " + min + ", max: " + max + " avg: " + avg(times, min, max));
- //request shutdown
+ // request shutdown
publisher.send(session.createTextMessage("SHUTDOWN"));
connection.stop();
connection.close();
}
- private long batch(int msgCount) throws Exception
- {
+ /**
+ * Runs one batch: resets {@code remaining} to {@code subscribers}, calls
+ * {@code publish()} and then {@code waitForCompletion()}.
+ *
+ * @param msgCount NOTE(review): not read by this method; {@code publish()}
+ * uses the {@code messages} field instead
+ * @return the elapsed time of the batch in milliseconds
+ * @throws Exception if publishing or waiting fails
+ */
+ private long batch(int msgCount) throws Exception {
long start = System.currentTimeMillis();
- remaining=subscribers;
+ remaining = subscribers;
publish();
waitForCompletion();
return System.currentTimeMillis() - start;
}
- private void publish() throws Exception
- {
+ /**
+ * Sends one bytes message holding {@code payload} through {@code publisher}
+ * {@code messages} times, printing a progress line after every 1000 sends,
+ * and then sends a "REPORT" text message through the same producer.
+ *
+ * @throws Exception if creating or sending a message fails
+ */
+ private void publish() throws Exception {
- //send events
+ // send events
+ // The same BytesMessage instance is reused for every send.
BytesMessage msg = session.createBytesMessage();
msg.writeBytes(payload);
- for (int i = 0; i < messages; i++)
- {
+ for (int i = 0; i < messages; i++) {
publisher.send(msg);
- if ((i + 1) % 1000 == 0)
- {
+ if ((i + 1) % 1000 == 0) {
System.out.println("Sent " + (i + 1) + " messages");
}
}
- //request report
+ // request report
publisher.send(session.createTextMessage("REPORT"));
}
- private void waitForCompletion() throws Exception
- {
+ /**
+ * Blocks until {@code remaining} has dropped to zero, waiting on
+ * {@code mutex} between checks.
+ *
+ * @throws Exception if the wait is interrupted
+ */
+ private void waitForCompletion() throws Exception {
System.out.println("Waiting for completion...");
- synchronized (mutex)
- {
- while (remaining > 0)
- {
+ synchronized (mutex) {
+ // Re-check the condition after every wake-up.
+ while (remaining > 0) {
mutex.wait();
}
}
}
-
- public void onMessage(Message message)
- {
- synchronized (mutex)
- {
+ /**
+ * Handles a message received on the control topic: decrements
+ * {@code remaining}, prints the report text with the new value, and notifies
+ * a thread waiting on {@code mutex} once the value reaches zero.
+ *
+ * @param message the received message
+ */
+ public void onMessage(Message message) {
+ synchronized (mutex) {
System.out.println("Received report " + getReport(message) + " " + --remaining + " remaining");
- if (remaining == 0)
- {
- mutex.notify();
+ if (remaining == 0) {
+ mutex.notify();
}
}
}
-
- Object getReport(Message m)
- {
- try
- {
- return ((TextMessage) m).getText();
- }
- catch (JMSException e)
- {
+
+ /**
+ * Returns the text of {@code m}, which is cast to {@code TextMessage}
+ * without a type check. If reading the text throws a {@code JMSException},
+ * the stack trace is printed to standard output and the exception's string
+ * form is returned instead.
+ *
+ * @param m the message to read
+ * @return the message text, or the exception's string form on failure
+ */
+ Object getReport(Message m) {
+ try {
+ return ((TextMessage)m).getText();
+ } catch (JMSException e) {
e.printStackTrace(System.out);
return e.toString();
}
- }
+ }
- static long min(long[] times)
- {
- long min = times.length > 0 ? times[0] : 0;
- for(int i = 0; i < times.length; i++)
- {
- min = Math.min(min, times[i]);
- }
- return min;
- }
+ /**
+ * Returns the smallest value in {@code times}, or 0 when the array is empty.
+ *
+ * @param times the values to scan
+ * @return the minimum value, or 0 for an empty array
+ */
+ static long min(long[] times) {
+ if (times.length == 0) {
+ return 0;
+ }
+ long smallest = times[0];
+ for (long sample : times) {
+ if (sample < smallest) {
+ smallest = sample;
+ }
+ }
+ return smallest;
+ }
- static long max(long[] times)
- {
- long max = times.length > 0 ? times[0] : 0;
- for(int i = 0; i < times.length; i++)
- {
- max = Math.max(max, times[i]);
- }
- return max;
- }
+ /**
+ * Returns the largest value in {@code times}, or 0 when the array is empty.
+ *
+ * @param times the values to scan
+ * @return the maximum value, or 0 for an empty array
+ */
+ static long max(long[] times) {
+ if (times.length == 0) {
+ return 0;
+ }
+ long largest = times[0];
+ for (long sample : times) {
+ if (sample > largest) {
+ largest = sample;
+ }
+ }
+ return largest;
+ }
- static long avg(long[] times, long min, long max)
- {
+ /**
+ * Returns the average of {@code times} after discarding one occurrence of
+ * {@code min} and one occurrence of {@code max}.
+ *
+ * @param times the samples
+ * @param min the value removed from the sum as the smallest sample
+ * @param max the value removed from the sum as the largest sample
+ * @return the integer average of the remaining {@code times.length - 2}
+ * samples, or 0 when there are two samples or fewer
+ */
+ static long avg(long[] times, long min, long max) {
+ // With two samples or fewer nothing remains once the extremes are
+ // discarded; returning 0 also avoids a division by zero.
+ if (times.length <= 2) {
+ return 0;
+ }
long sum = 0;
- for(int i = 0; i < times.length; i++)
- {
+ for (int i = 0; i < times.length; i++) {
sum += times[i];
}
sum -= min;
sum -= max;
- return (sum / times.length - 2);
+ // Divide by the number of samples that remain. The previous expression
+ // parsed as (sum / times.length) - 2.
+ return sum / (times.length - 2);
+ }
+
+ /**
+ * Sets the number of batches that {@code run()} executes.
+ *
+ * @param batch the number of batches
+ */
+ public void setBatch(int batch) {
+ this.batch = batch;
+ }
+
+ /**
+ * Sets the pause between batches; {@code run()} sleeps for
+ * {@code delay * 1000} milliseconds before every batch except the first.
+ *
+ * @param delay the pause in seconds
+ */
+ public void setDelay(long delay) {
+ this.delay = delay;
+ }
+
+ /**
+ * Sets how many messages {@code publish()} sends before it requests a
+ * report.
+ *
+ * @param messages the number of messages per batch
+ */
+ public void setMessages(int messages) {
+ this.messages = messages;
+ }
+
+ /**
+ * Sets the length in bytes of the payload array that {@code run()} builds.
+ *
+ * @param size the payload length in bytes
+ */
+ public void setSize(int size) {
+ this.size = size;
}
- public void setBatch(int batch) {
- this.batch = batch;
- }
- public void setDelay(long delay) {
- this.delay = delay;
- }
- public void setMessages(int messages) {
- this.messages = messages;
- }
- public void setSize(int size) {
- this.size = size;
- }
- public void setSubscribers(int subscribers) {
- this.subscribers = subscribers;
- }
- public void setUrl(String url) {
- this.url = url;
- }
+ /**
+ * Sets the value that {@code remaining} is reset to at the start of each
+ * batch, i.e. how many control-topic messages a batch waits for.
+ *
+ * @param subscribers the number of reports to wait for per batch
+ */
+ public void setSubscribers(int subscribers) {
+ this.subscribers = subscribers;
+ }
+
+ /**
+ * Sets the URL that {@code run()} passes to the
+ * {@code ActiveMQConnectionFactory} constructor.
+ *
+ * @param url the broker URL
+ */
+ public void setUrl(String url) {
+ this.url = url;
+ }
}
Modified: activemq/trunk/assembly/src/test/java/org/apache/activemq/benchmark/BenchmarkSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/assembly/src/test/java/org/apache/activemq/benchmark/BenchmarkSupport.java?view=diff&rev=565003&r1=565002&r2=565003
==============================================================================
--- activemq/trunk/assembly/src/test/java/org/apache/activemq/benchmark/BenchmarkSupport.java (original)
+++ activemq/trunk/assembly/src/test/java/org/apache/activemq/benchmark/BenchmarkSupport.java Sat Aug 11 22:27:21 2007
@@ -16,23 +16,22 @@
*/
package org.apache.activemq.benchmark;
+import java.text.NumberFormat;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.util.IdGenerator;
-
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Session;
-import java.text.NumberFormat;
-import java.util.ArrayList;
-import java.util.List;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.util.IdGenerator;
/**
* Abstract base class for some simple benchmark tools
- *
+ *
* @author James Strachan
* @version $Revision$
*/
@@ -41,15 +40,14 @@
protected int connectionCount = 1;
protected int batch = 1000;
protected Destination destination;
- private boolean topic = true;
- private boolean durable = false;
+ protected String[] subjects;
+ private boolean topic = true;
+ private boolean durable;
private ActiveMQConnectionFactory factory;
private String url;
- protected String[] subjects;
- private long time = System.currentTimeMillis();
private int counter;
- private List resources = new ArrayList();
+ private List<Object> resources = new ArrayList<Object>();
private NumberFormat formatter = NumberFormat.getInstance();
private AtomicInteger connectionCounter = new AtomicInteger(0);
private IdGenerator idGenerator = new IdGenerator();
@@ -99,7 +97,9 @@
+ /**
+ * Sets {@code subjects} to an array holding only {@code subject} and resets
+ * {@code connectionCount} to 1.
+ *
+ * @param subject the single subject to use
+ */
public void setSubject(String subject) {
connectionCount = 1;
- subjects = new String[]{subject};
+ subjects = new String[] {
+ subject
+ };
}
public boolean isDurable() {
@@ -144,16 +144,11 @@
+ /**
+ * Adds {@code count} to the running {@code counter}. The remainder of the
+ * body is commented-out code.
+ *
+ * @param count the amount to add
+ */
protected synchronized void count(int count) {
counter += count;
/*
- if (counter > batch) {
- counter = 0;
- long current = System.currentTimeMillis();
- double end = current - time;
- end /= 1000;
- time = current;
-
- System.out.println("Processed " + batch + " messages in " + end + " (secs)");
- }
- */
+ * if (counter > batch) { counter = 0; long current =
+ * System.currentTimeMillis(); double end = current - time; end /= 1000;
+ * time = current; System.out.println("Processed " + batch + " messages
+ * in " + end + " (secs)"); }
+ */
}
protected synchronized int resetCount() {
@@ -162,7 +157,6 @@
return answer;
}
-
protected void timerLoop() {
int times = 0;
int total = 0;
@@ -172,8 +166,7 @@
while (true) {
try {
Thread.sleep(1000);
- }
- catch (InterruptedException e) {
+ } catch (InterruptedException e) {
e.printStackTrace();
}
int processed = resetCount();
@@ -183,21 +176,14 @@
times++;
}
if (times > 0) {
- average = total / (double) times;
+ average = total / (double)times;
}
- long oldtime = time;
- time = System.currentTimeMillis();
-
- double diff = time - oldtime;
-
System.out.println(getClass().getName() + " Processed: " + processed + " messages this second. Average: " + average);
if ((times % dumpVmStatsFrequency) == 0 && times != 0) {
- System.out.println("Used memory: " + asMemoryString(runtime.totalMemory() - runtime.freeMemory())
- + " Free memory: " + asMemoryString(runtime.freeMemory())
- + " Total memory: " + asMemoryString(runtime.totalMemory())
- + " Max memory: " + asMemoryString(runtime.maxMemory()));
+ System.out.println("Used memory: " + asMemoryString(runtime.totalMemory() - runtime.freeMemory()) + " Free memory: " + asMemoryString(runtime.freeMemory()) + " Total memory: "
+ + asMemoryString(runtime.totalMemory()) + " Max memory: " + asMemoryString(runtime.maxMemory()));
}
}
@@ -214,8 +200,7 @@
+ /**
+ * Creates the destination for {@code subject} on the given session: a topic
+ * when the {@code topic} flag is set, otherwise a queue.
+ *
+ * @param session the session used to create the destination
+ * @param subject the destination name
+ * @return the created topic or queue
+ * @throws JMSException if the session fails to create the destination
+ */
protected Destination createDestination(Session session, String subject) throws JMSException {
- if (topic) {
- return session.createTopic(subject);
- }
- else {
- return session.createQueue(subject);
- }
+ return topic ? session.createTopic(subject) : session.createQueue(subject);
}