You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ar...@apache.org on 2007/11/14 10:38:06 UTC

svn commit: r594814 - in /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf: JMSAsyncConsumer.java JMSConsumer.java JMSSyncConsumer.java MessageConsumerTest.java Options.java

Author: arnaudsimon
Date: Wed Nov 14 01:38:05 2007
New Revision: 594814

URL: http://svn.apache.org/viewvc?rev=594814&view=rev
Log:
Added distinction between sync and async consumers

Added:
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSAsyncConsumer.java
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSConsumer.java
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSSyncConsumer.java
      - copied, changed from r594797, incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSConsumer.java
Modified:
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/Options.java

Added: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSAsyncConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSAsyncConsumer.java?rev=594814&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSAsyncConsumer.java (added)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSAsyncConsumer.java Wed Nov 14 01:38:05 2007
@@ -0,0 +1,91 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.client.perf;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ *
+ *
+ */
+public class JMSAsyncConsumer implements MessageListener, JMSConsumer
+{
+    private static final Logger _logger = LoggerFactory.getLogger(JMSSyncConsumer.class);
+
+    private String _id;
+    private Connection _connection;
+    private Session _session;
+    private MessageConsumer _consumer;
+    private Destination _destination;
+    private boolean _transacted;
+    private int _ackMode = Session.AUTO_ACKNOWLEDGE;
+    private AtomicBoolean _run = new AtomicBoolean(true);
+    private long _currentMsgCount;
+
+    /* Not implementing transactions for first phase */
+    public JMSAsyncConsumer(String id,Connection connection, Destination destination,boolean transacted,int ackMode) throws Exception
+    {
+        _id = id;
+        _connection = connection;
+        _destination = destination;
+        _transacted = transacted;
+        _ackMode = ackMode;
+        _session = _connection.createSession(_transacted, _ackMode);
+        _consumer = _session.createConsumer(_destination);
+        _consumer.setMessageListener(this);
+    }
+
+
+
+    public void onMessage(Message message)
+    {
+        _currentMsgCount ++;
+    }
+
+    public void stopConsuming()
+    {
+        System.out.println("Producer received notification to stop");
+        try
+        {
+            _session.close();
+            _connection.close();
+        }
+        catch(Exception e)
+        {
+            _logger.error("Error Closing JMSSyncConsumer:"+ _id, e);
+        }
+    }
+
+    public String getId()
+    {
+        return _id;
+    }
+
+    /* Not worried about synchronizing as accuracy here is not that important.
+     * So if this method is invoked the count maybe off by a few digits.
+     * But when the test stops, this will always return the proper count.
+     */
+    public long getCurrentMessageCount()
+    {
+        return _currentMsgCount;
+    }
+}

Added: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSConsumer.java?rev=594814&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSConsumer.java (added)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSConsumer.java Wed Nov 14 01:38:05 2007
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.client.perf;
+
+/**
+ *
+ *
+ */
+public interface JMSConsumer
+{
+     public String getId();
+     public void stopConsuming();
+        public long getCurrentMessageCount();
+}

Copied: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSSyncConsumer.java (from r594797, incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSConsumer.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSSyncConsumer.java?p2=incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSSyncConsumer.java&p1=incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSConsumer.java&r1=594797&r2=594814&rev=594814&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSConsumer.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSSyncConsumer.java Wed Nov 14 01:38:05 2007
@@ -11,9 +11,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class JMSConsumer implements Runnable
+public class JMSSyncConsumer implements Runnable, JMSConsumer
 {
-    private static final Logger _logger = LoggerFactory.getLogger(JMSConsumer.class);
+    private static final Logger _logger = LoggerFactory.getLogger(JMSSyncConsumer.class);
 
     private String _id;
     private Connection _connection;
@@ -26,7 +26,7 @@
     private long _currentMsgCount;
 
     /* Not implementing transactions for first phase */
-    public JMSConsumer(String id,Connection connection, Destination destination,boolean transacted,int ackMode) throws Exception
+    public JMSSyncConsumer(String id,Connection connection, Destination destination,boolean transacted,int ackMode) throws Exception
     {
         _id = id;
         _connection = connection;
@@ -56,17 +56,17 @@
                 BytesMessage msg = (BytesMessage)_consumer.receive();
                 if (msg != null)
                 {
-                    long msgId = Integer.parseInt(msg.getJMSCorrelationID());
+                   // long msgId = Integer.parseInt(msg.getJMSCorrelationID());
                     /*if (_currentMsgCount+1 != msgId)
                     {
-                        _logger.error("Error : Message received out of order in JMSConsumer:" + _id + " message id was " + msgId + " expected: " + _currentMsgCount+1);
+                        _logger.error("Error : Message received out of order in JMSSyncConsumer:" + _id + " message id was " + msgId + " expected: " + _currentMsgCount+1);
                     }*/
                     _currentMsgCount ++;
                 }
             }
             catch(Exception e)
             {
-                _logger.error("Error Receiving message from JMSConsumer:" + _id, e);
+                _logger.error("Error Receiving message from JMSSyncConsumer:" + _id, e);
             }
         }
         try
@@ -76,7 +76,7 @@
         }
         catch(Exception e)
         {
-            _logger.error("Error Closing JMSConsumer:"+ _id, e);
+            _logger.error("Error Closing JMSSyncConsumer:"+ _id, e);
         }
     }
 

Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java?rev=594814&r1=594813&r2=594814&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java Wed Nov 14 01:38:05 2007
@@ -1,13 +1,11 @@
 package org.apache.qpid.client.perf;
 
 import java.io.FileWriter;
-import java.io.RandomAccessFile;
 import java.sql.Date;
 import java.text.SimpleDateFormat;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
-import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.Session;
 
@@ -21,7 +19,7 @@
     private static final Logger _logger = LoggerFactory.getLogger(MessageConsumerTest.class);
     private SimpleDateFormat df = new SimpleDateFormat("h:mm a");
 
-    private Map<Integer,JMSConsumer> _consumers = new ConcurrentHashMap<Integer,JMSConsumer>();
+    private Map<Integer, JMSConsumer> _consumers = new ConcurrentHashMap<Integer, JMSConsumer>();
     private int _count;
     String _logFileName;
     private long _gracePeriod = 5 * 60 * 1000;
@@ -30,30 +28,38 @@
 
     public void start() throws Exception
     {
-       this.parseOptions();
-       boolean useSameDest = true;
-       _logFileName = _logFilePath + "/MessageConsumerTest_" + System.currentTimeMillis();
-
-       // use each destination with a different producer
-       if (_producerCount == destArray.length)
-       {
-           useSameDest = false;
-       }
-       for (;_count < _producerCount;_count++)
-       {
-           createAndStartProducer(useSameDest?destArray[0]:destArray[_count]);
-       }
+        this.parseOptions();
+        boolean useSameDest = true;
+        _logFileName = _logFilePath + "/MessageConsumerTest_" + System.currentTimeMillis();
+
+        // use each destination with a different producer
+        if (_producerCount == destArray.length)
+        {
+            useSameDest = false;
+        }
+        for (; _count < _producerCount; _count++)
+        {
+            createAndStartConsumer(useSameDest ? destArray[0] : destArray[_count]);
+        }
     }
 
-    private void createAndStartProducer(String routingKey)throws Exception
+    private void createAndStartConsumer(String routingKey) throws Exception
     {
         AMQConnection con = ConnectionUtility.getInstance().getConnection();
         con.start();
-        Destination dest = new AMQTopic(con,routingKey);
-        JMSConsumer prod = new JMSConsumer(String.valueOf(_count),(Connection)con, dest, _transacted,Session.AUTO_ACKNOWLEDGE);
-        Thread t = new Thread(prod);
-        t.setName("JMSConsumer-"+_count);
-        t.start();
+        Destination dest = new AMQTopic(con, routingKey);
+        JMSConsumer prod;
+        if (_synchronous)
+        {
+            prod = new JMSSyncConsumer(String.valueOf(_count), con, dest, _transacted, Session.AUTO_ACKNOWLEDGE);
+            Thread t = new Thread((JMSSyncConsumer) prod);
+            t.setName("JMSSyncConsumer-" + _count);
+            t.start();
+        }
+        else
+        {
+            prod = new JMSAsyncConsumer(String.valueOf(_count), con, dest, _transacted, Session.AUTO_ACKNOWLEDGE);
+        }
         _consumers.put(_count, prod);
     }
 
@@ -71,12 +77,12 @@
         runReaper(false);
         try
         {
-            while(run)
+            while (run)
             {
                 Thread.sleep(_logDuration);
                 runReaper(false);
 
-                if(System.currentTimeMillis() + _gracePeriod - _startTime > _expiry )
+                if (System.currentTimeMillis() + _gracePeriod - _startTime > _expiry)
                 {
                     // time to stop the test.
                     for (Integer id : _consumers.keySet())
@@ -91,7 +97,7 @@
         }
         catch (InterruptedException e)
         {
-            _logger.error("The timer thread exited",e);
+            _logger.error("The timer thread exited", e);
         }
     }
 
@@ -99,11 +105,11 @@
     {
         try
         {
-            FileWriter _logFile = new FileWriter(_logFileName + ".csv",true);
+            FileWriter _logFile = new FileWriter(_logFileName + ".csv", true);
             for (Integer id : _consumers.keySet())
             {
                 JMSConsumer prod = _consumers.get(id);
-                StringBuffer buf = new StringBuffer("JMSConsumer(").append(prod.getId()).append("),");
+                StringBuffer buf = new StringBuffer("JMSSyncConsumer(").append(prod.getId()).append("),");
                 Date d = new Date(System.currentTimeMillis());
                 buf.append(df.format(d)).append(",");
                 buf.append(d.getTime()).append(",");
@@ -113,21 +119,21 @@
             }
             _logFile.close();
 
-            FileWriter _memoryLog = new FileWriter(_logFileName + "_memory.csv",true);
-            StringBuffer buf = new StringBuffer("JMSConsumer,");
+            FileWriter _memoryLog = new FileWriter(_logFileName + "_memory.csv", true);
+            StringBuffer buf = new StringBuffer("JMSSyncConsumer,");
             Date d = new Date(System.currentTimeMillis());
             buf.append(df.format(d)).append(",");
             buf.append(d.getTime()).append(",");
             buf.append(totalMsgCount).append(",");
-            buf.append(Runtime.getRuntime().totalMemory() -Runtime.getRuntime().freeMemory()).append("\n");
+            buf.append(Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()).append("\n");
             _memoryLog.write(buf.toString());
             _memoryLog.close();
             if (printSummary)
             {
                 double totaltime = d.getTime() - _startTime;
                 double dCount = totalMsgCount;
-                double ratio = (dCount/totaltime)*1000;
-                FileWriter _summaryLog = new FileWriter(_logFileName + "_Summary",true);
+                double ratio = (dCount / totaltime) * 1000;
+                FileWriter _summaryLog = new FileWriter(_logFileName + "_Summary", true);
                 buf = new StringBuffer("MessageConsumerTest \n Test started at : ");
                 buf.append(df.format(new Date(_startTime))).append("\n Test finished at : ");
                 d = new Date(System.currentTimeMillis());
@@ -140,9 +146,9 @@
                 _summaryLog.close();
             }
         }
-        catch(Exception e)
+        catch (Exception e)
         {
-            _logger.error("Error printing info to the log file",e);
+            _logger.error("Error printing info to the log file", e);
         }
     }
 
@@ -154,7 +160,7 @@
             test.start();
             test.startTimerThread();
         }
-        catch(Exception e)
+        catch (Exception e)
         {
             e.printStackTrace();
         }

Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/Options.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/Options.java?rev=594814&r1=594813&r2=594814&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/Options.java (original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/Options.java Wed Nov 14 01:38:05 2007
@@ -4,6 +4,7 @@
 {
     int _messageSize;
     boolean _transacted;
+    boolean _synchronous;
     String[] destArray;
     int _producerCount;
     int _consumerCount;
@@ -25,6 +26,7 @@
     public void parseOptions()
     {
         _messageSize = Integer.parseInt(System.getProperty("messageSize","100"));
+        _synchronous =Boolean.parseBoolean( System.getProperty("synchronous", "false"));
         _transacted = false;
         String destinations = System.getProperty("destinations");
         destArray = destinations.split(",");
@@ -37,7 +39,7 @@
 
         System.out.println("============= Test Data ===================");
         System.out.println("Total no of producers  : " + _producerCount);
-        System.out.println("Total no of consumer   : " + _consumerCount);
+        System.out.println(_synchronous? "Total no of synchronous consumers   : " : "Total no of asynchronous consumers   :" + _consumerCount);
         System.out.println("Log Frequency in mins  : " + _logDuration/(1000*60));
         System.out.println("Log file path          : " + _logFilePath);
         System.out.println("Test Duration          : " + printTestDuration());