You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/07/07 02:54:47 UTC

svn commit: r419754 - in /incubator/activemq/branches/activemq-4.0/assembly/src/release/example: build.xml src/ConsumerTool.java src/RequesterTool.java

Author: chirino
Date: Thu Jul  6 17:54:47 2006
New Revision: 419754

URL: http://svn.apache.org/viewvc?rev=419754&view=rev
Log:
http://issues.apache.org/activemq/browse/AMQ-800

Added:
    incubator/activemq/branches/activemq-4.0/assembly/src/release/example/src/RequesterTool.java
Modified:
    incubator/activemq/branches/activemq-4.0/assembly/src/release/example/build.xml
    incubator/activemq/branches/activemq-4.0/assembly/src/release/example/src/ConsumerTool.java

Modified: incubator/activemq/branches/activemq-4.0/assembly/src/release/example/build.xml
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/assembly/src/release/example/build.xml?rev=419754&r1=419753&r2=419754&view=diff
==============================================================================
--- incubator/activemq/branches/activemq-4.0/assembly/src/release/example/build.xml (original)
+++ incubator/activemq/branches/activemq-4.0/assembly/src/release/example/build.xml Thu Jul  6 17:54:47 2006
@@ -166,6 +166,25 @@
 		</java>
 	</target>
 
+	<target name="requester" depends="compile" description="Runs a simple requester">
+
+		<echo>Running requester against server at $$url = ${url} for subject $$subject = ${subject}</echo>
+		<java classname="RequesterTool" fork="yes" maxmemory="100M"> <!-- args below are positional: their order must match the index-based parsing in RequesterTool.runTool -->
+			<classpath refid="javac.classpath" />
+			<jvmarg value="-server" />
+			<arg value="${url}" />
+			<arg value="${topic}" />
+			<arg value="${subject}" />
+			<arg value="${durable}" />
+			<arg value="${max}" /> <!-- read by RequesterTool as the message count -->
+			<arg value="${messageSize}" />
+			<arg value="${producerClientId}" /> <!-- RequesterTool treats the literal string "null" as "no client ID" -->
+			<arg value="${timeToLive}" />
+			<arg value="${sleepTime}" />
+			<arg value="${transacted}" />
+		</java>
+	</target>
+
 	<target name="embedBroker" depends="compile" description="Runs a simple producer">
 
 		<echo>Running an embedded broker example</echo>

Modified: incubator/activemq/branches/activemq-4.0/assembly/src/release/example/src/ConsumerTool.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/assembly/src/release/example/src/ConsumerTool.java?rev=419754&r1=419753&r2=419754&view=diff
==============================================================================
--- incubator/activemq/branches/activemq-4.0/assembly/src/release/example/src/ConsumerTool.java (original)
+++ incubator/activemq/branches/activemq-4.0/assembly/src/release/example/src/ConsumerTool.java Thu Jul  6 17:54:47 2006
@@ -16,11 +16,13 @@
  */
 
 import javax.jms.Connection;
+import javax.jms.DeliveryMode;
 import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
@@ -41,8 +43,10 @@
     private boolean pauseBeforeShutdown;
     private boolean running;
     private Session session;
+
     private long sleepTime=0;
     private long receiveTimeOut=0;
+	private MessageProducer replyProducer;
 
     public static void main(String[] args) {
         ConsumerTool tool = new ConsumerTool();
@@ -88,6 +92,10 @@
             Connection connection = createConnection();
             connection.setExceptionListener(this);
             session = createSession(connection);
+            
+            replyProducer = session.createProducer(null);
+            replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+            
             MessageConsumer consumer = null;
             if (durable && topic) {
                 consumer = session.createDurableSubscriber((Topic) destination, consumerName);
@@ -133,6 +141,14 @@
             if(transacted) {
                 session.commit();
             }
+            
+            if ( message.getJMSReplyTo() !=null ) {            	
+            	replyProducer.send(message.getJMSReplyTo(), session.createTextMessage("Reply: "+message.getJMSMessageID()));
+                if(transacted) {
+                    session.commit();
+                }
+            }
+            
             /*
             if (++count % dumpCount == 0) {
                 dumpStats(connection);

Added: incubator/activemq/branches/activemq-4.0/assembly/src/release/example/src/RequesterTool.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/assembly/src/release/example/src/RequesterTool.java?rev=419754&view=auto
==============================================================================
--- incubator/activemq/branches/activemq-4.0/assembly/src/release/example/src/RequesterTool.java (added)
+++ incubator/activemq/branches/activemq-4.0/assembly/src/release/example/src/RequesterTool.java Thu Jul  6 17:54:47 2006
@@ -0,0 +1,182 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.Date;
+
+/**
+ * A simple tool that sends request messages and waits for a reply to each one.
+ * Every request carries a temporary queue or topic as its JMSReplyTo destination.
+ * @version $Revision: 1.2 $
+ */
+public class RequesterTool extends ToolSupport {
+
+    protected int messageCount = 10; // number of requests to send; 0 makes requestLoop run indefinitely
+    protected long sleepTime = 0L; // pause after each request/reply round trip, in ms
+    protected boolean verbose = true; // when true, requestLoop prints (a prefix of) each outgoing message
+    protected int messageSize = 255; // length, in characters, of each generated message body
+    private long timeToLive; // producer time-to-live in ms; 0 means setTimeToLive is never called
+
+    public static void main(String[] args) { // entry point: runs a fresh RequesterTool with the command-line args
+        runTool(args, new RequesterTool());
+    }
+
+    protected static void runTool(String[] args, RequesterTool tool) { // applies the positional args that are present, then runs the tool
+        tool.clientID = null; // no client ID unless args[6] supplies one
+        if (args.length > 0) {
+            tool.url = args[0];
+        }
+        if (args.length > 1) {
+            tool.topic = args[1].equalsIgnoreCase("true");
+        }
+        if (args.length > 2) {
+            tool.subject = args[2];
+        }
+        if (args.length > 3) {
+            tool.durable = args[3].equalsIgnoreCase("true");
+        }
+        if (args.length > 4) {
+            tool.messageCount = Integer.parseInt(args[4]);
+        }
+        if (args.length > 5) {
+            tool.messageSize = Integer.parseInt(args[5]);
+        }
+        if (args.length > 6) {
+            if( ! "null".equals(args[6]) ) { // the literal string "null" means "leave the client ID unset"
+                tool.clientID = args[6];
+            }
+        }
+        if (args.length > 7) {
+            tool.timeToLive = Long.parseLong(args[7]);
+        }
+        if (args.length > 8) {
+            tool.sleepTime = Long.parseLong(args[8]);
+        }
+        if (args.length > 9) {
+            tool.transacted = "true".equals(args[9]); // NOTE(review): case-sensitive, unlike the equalsIgnoreCase checks for topic/durable
+        }
+        tool.run();
+    }
+
+    public void run() { // connects, creates a temporary reply destination, runs requestLoop, then closes
+        try {
+            System.out.println("Connecting to URL: " + url);
+            System.out.println("Publishing a Message with size " + messageSize + " to " + (topic ? "topic" : "queue") + ": " + subject);
+            System.out.println("Using " + (durable ? "durable" : "non-durable") + " publishing");
+            System.out.println("Sleeping between publish "+sleepTime+" ms");                
+            if( timeToLive!=0 ) {
+                System.out.println("Messages time to live "+timeToLive+" ms");                
+            }
+            Connection connection = createConnection();
+            Session session = createSession(connection);
+            MessageProducer producer = createProducer(session);
+            
+            Destination replyDest = null;
+            if( this.topic ) { // temporary topic when the topic flag is set, temporary queue otherwise
+            	replyDest = session.createTemporaryTopic();
+            } else {
+            	replyDest = session.createTemporaryQueue();
+            }
+            
+            System.out.println("Reply Destination: "+replyDest);                
+            MessageConsumer consumer = session.createConsumer(replyDest);
+            
+            requestLoop(session, producer, consumer, replyDest);
+
+            System.out.println("Done.");
+            close(connection, session); // NOTE(review): only reached on success; an exception thrown above skips this close
+        }
+        catch (Exception e) {
+            System.out.println("Caught: " + e);
+            e.printStackTrace();
+        }
+    }
+
+    protected MessageProducer createProducer(Session session) throws JMSException { // request producer: delivery mode follows 'durable', TTL is optional
+        MessageProducer producer = session.createProducer(destination);
+        if (durable) {
+            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+        }
+        else {
+            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+        }
+        if( timeToLive!=0 ) // 0 is treated as "not configured"
+            producer.setTimeToLive(timeToLive);
+        return producer;
+    }
+
+    protected void requestLoop(Session session, MessageProducer producer, MessageConsumer consumer, Destination replyDest) throws Exception { // one send-then-receive round trip per iteration
+
+        for (int i = 0; i < messageCount || messageCount==0 ; i++) { // messageCount == 0 keeps the loop running without a count limit
+
+
+            TextMessage message = session.createTextMessage(createMessageText(i));
+            message.setJMSReplyTo(replyDest); // tells the responder where to send its reply
+            
+            if (verbose) {
+                String msg = message.getText();
+                if (msg.length() > 50) { // console output is capped at the first 50 characters
+                    msg = msg.substring(0, 50) + "...";
+                }
+                System.out.println("Sending message: " + msg);
+            }
+            
+            producer.send(message);
+            if(transacted) { // commit so the request is actually sent before waiting for the reply
+                session.commit();
+            }
+            
+            System.out.println("Waiting for reponse message...");
+            Message message2 = consumer.receive(); // NOTE(review): no timeout is passed; presumably blocks until a reply arrives (confirm against the JMS API)
+            if( message2 instanceof TextMessage ) {
+            	System.out.println("Reponse message: "+((TextMessage)message2).getText());
+            } else {
+            	System.out.println("Reponse message: "+message2);
+            }
+            if(transacted) {
+                session.commit();
+            }
+            
+            Thread.sleep(sleepTime);
+            
+        }
+        
+    }
+
+    /**
+     * @param index sequence number embedded in the message text
+     * @return text of messageSize characters: truncated if the header is longer, space-padded if shorter
+     */
+    private String createMessageText(int index) {
+        StringBuffer buffer = new StringBuffer(messageSize);
+        buffer.append("Message: " + index + " sent at: " + new Date());
+        if (buffer.length() > messageSize) { // header alone already exceeds the requested size: truncate
+            return buffer.substring(0, messageSize);
+        }
+        for (int i = buffer.length(); i < messageSize; i++) { // otherwise pad with spaces up to messageSize
+            buffer.append(' ');
+        }
+        return buffer.toString();
+    }
+}