You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2014/08/28 18:34:46 UTC
svn commit: r1621164 - in
/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/temp: ./ Drain.java
Spout.java
Author: robbie
Date: Thu Aug 28 16:34:45 2014
New Revision: 1621164
URL: http://svn.apache.org/r1621164
Log:
QPIDJMS-28: add initial utils drain and spout, temp location
Added:
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/temp/
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/temp/Drain.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/temp/Spout.java
Added: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/temp/Drain.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/temp/Drain.java?rev=1621164&view=auto
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/temp/Drain.java (added)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/temp/Drain.java Thu Aug 28 16:34:45 2014
@@ -0,0 +1,129 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.jms.impl.temp;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.jms.impl.ConnectionImpl;
+
+/** Test utility that receives up to a given number of messages from the queue "myQueue" and prints the time taken. */
+public class Drain
+{
+    private static final String DEFAULT_USER = "guest";
+    private static final String DEFAULT_PASSWORD = "guest";
+    private static final int DEFAULT_PORT = 5672;
+    private static final String DEFAULT_HOST = "localhost";
+    private static final int DEFAULT_COUNT = 10000;
+    private static final int RECEIVE_TIMEOUT_MS = 1000; // longest wait for any single message
+
+    private final String _hostname;
+    private final int _port;
+    private final int _count;
+    private final String _username;
+    private final String _password;
+
+    /** Creates a drain that receives at most {@code count} messages using {@code hostname} and {@code port}, with the default credentials. */
+    public Drain(int count, String hostname, int port)
+    {
+        _count = count;
+        _hostname = hostname;
+        _port = port;
+        _username = DEFAULT_USER;
+        _password = DEFAULT_PASSWORD;
+    }
+
+    /**
+     * Receives messages until the requested count is reached or a receive times out, prints the total and
+     * elapsed time, then exits the JVM with status 0; any exception prints a stack trace and exits with status 1.
+     */
+    public void runTest()
+    {
+        try
+        {
+            Connection connection = new ConnectionImpl("drain", _hostname, _port, _username, _password);
+            connection.start();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Destination queue = session.createQueue("myQueue");
+            MessageConsumer messageConsumer = session.createConsumer(queue);
+
+            long start = System.currentTimeMillis();
+            int actualCount = 0;
+            boolean deductTimeout = false;
+            for(int i = 1; i <= _count; i++, actualCount++)
+            {
+                // Held as a plain Message so that a non-text message is counted rather than failing a cast.
+                javax.jms.Message message = messageConsumer.receive(RECEIVE_TIMEOUT_MS);
+                if(message == null)
+                {
+                    System.out.println("Message not received, stopping");
+                    deductTimeout = true;
+                    break;
+                }
+                if(i % 100 == 0)
+                {
+                    String text = (message instanceof TextMessage) ? ((TextMessage) message).getText() : message.toString();
+                    System.out.println("Got message " + i + ":" + text);
+                }
+            }
+
+            long taken = System.currentTimeMillis() - start;
+            if(deductTimeout)
+            {
+                taken -= RECEIVE_TIMEOUT_MS; // leave out the wait of the receive that timed out
+            }
+            System.out.println("Received " + actualCount +" messages in " + taken + "ms");
+
+            connection.close();
+            System.exit(0);//TODO: remove
+        }
+        catch (Exception exp)
+        {
+            exp.printStackTrace();
+            System.exit(1);
+        }
+    }
+
+    /** Usage: {@code Drain [count [hostname [port]]]}; arguments that start with "-" are ignored. */
+    public static void main(String[] argv) throws Exception
+    {
+        List<String> args = new ArrayList<String>();
+        for (String s : argv)
+        {
+            if (!s.startsWith("-"))
+            {
+                args.add(s);
+            }
+        }
+
+        int count = args.isEmpty() ? DEFAULT_COUNT : Integer.parseInt(args.remove(0));
+        String hostname = args.isEmpty() ? DEFAULT_HOST : args.remove(0);
+        int port = args.isEmpty() ? DEFAULT_PORT : Integer.parseInt(args.remove(0));
+
+        new Drain(count, hostname, port).runTest();
+    }
+}
\ No newline at end of file
Added: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/temp/Spout.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/temp/Spout.java?rev=1621164&view=auto
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/temp/Spout.java (added)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/temp/Spout.java Thu Aug 28 16:34:45 2014
@@ -0,0 +1,122 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.jms.impl.temp;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.jms.impl.ConnectionImpl;
+
+public class Spout
+{
+ private static final String DEFAULT_USER = "guest";
+ private static final String DEFAULT_PASSWORD = "guest";
+ private static final int DEFAULT_PORT = 5672;
+ private static final String DEFAULT_HOST = "localhost";
+ private static final int DEFAULT_COUNT = 10000;
+
+ private String _hostname;
+ private int _port;
+ private int _count;
+ private String _username;
+ private String _password;
+ private boolean _persistent;
+
+ public Spout(int count, String hostname, int port, boolean persistent)
+ {
+ _count = count;
+ _hostname = hostname;
+ _port = port;
+ _persistent = persistent;
+ _username = DEFAULT_USER;
+ _password = DEFAULT_PASSWORD;
+ }
+
+ public void runTest()
+ {
+ try
+ {
+ Connection connection = new ConnectionImpl("spout", _hostname, _port, _username, _password);
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Destination queue = session.createQueue("myQueue");
+ MessageProducer messageProducer = session.createProducer(queue);
+
+ int dekiveryMode = _persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
+
+ long start = System.currentTimeMillis();
+ for(int i = 1; i <= _count; i++)
+ {
+ TextMessage message = session.createTextMessage("Hello world!");
+ messageProducer.send(message, dekiveryMode, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
+
+ if(i % 100 == 0)
+ {
+ System.out.println("Sent message " + i + ":" + message.getText());
+ }
+ }
+
+ long finish = System.currentTimeMillis();
+ long taken = finish - start;
+ System.out.println("Sent " + _count +" messages in " + taken + "ms");
+
+ connection.close();
+ System.exit(0);//TODO: remove
+ }
+ catch (Exception exp)
+ {
+ exp.printStackTrace();
+ System.exit(1);
+ }
+ }
+
+ public static void main(String[] argv) throws Exception
+ {
+ List<String> switches = new ArrayList<String>();
+ List<String> args = new ArrayList<String>();
+ for (String s : argv)
+ {
+ if (s.startsWith("-"))
+ {
+ switches.add(s);
+ }
+ else
+ {
+ args.add(s);
+ }
+ }
+
+ int count = args.isEmpty() ? DEFAULT_COUNT : Integer.parseInt(args.remove(0));
+ String hostname = args.isEmpty() ? DEFAULT_HOST : args.remove(0);
+ int port = args.isEmpty() ? DEFAULT_PORT : Integer.parseInt(args.remove(0));
+ boolean persistent = switches.contains("-p");
+
+ new Spout(count, hostname, port, persistent).runTest();
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org