You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2011/06/28 21:22:27 UTC

svn commit: r1140793 - in /qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools: PerfProducer.java TestParams.java

Author: rajith
Date: Tue Jun 28 19:22:27 2011
New Revision: 1140793

URL: http://svn.apache.org/viewvc?rev=1140793&view=rev
Log:
NO-JIRA Added the ability to specify a message type. Currently only bytes
and text messages are supported. Hoping to add map and object message
support in the near future. You can use -Dmsg_type=TEXT|BYTE (matched
case-insensitively) to specify the type. The default is BYTE.

Modified:
    qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
    qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java

Modified: qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java?rev=1140793&r1=1140792&r2=1140793&view=diff
==============================================================================
--- qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java (original)
+++ qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java Tue Jun 28 19:22:27 2011
@@ -56,14 +56,44 @@ public class PerfProducer extends PerfBa
 {
     MessageProducer producer;
     Message msg;
-    byte[] payload;
-    List<byte[]> payloads;
+    Object payload;
+    List<Object> payloads;
     boolean cacheMsg = false;
     boolean randomMsgSize = false;
     boolean durable = false;
     Random random;
     int msgSizeRange = 1024;
-    
+
+    enum MessageType {
+        BYTE, TEXT, MAP, OBJECT;
+
+        public static MessageType getType(String s) throws Exception
+        {
+            if ("text".equalsIgnoreCase(s))
+            {
+                return TEXT;
+            }
+            else if ("byte".equalsIgnoreCase(s))
+            {
+                return BYTE;
+            }
+            /*else if ("map".equalsIgnoreCase(s))
+            {
+                return MAP;
+            }
+            else if ("object".equalsIgnoreCase(s))
+            {
+                return OBJECT;
+            }*/
+            else
+            {
+                throw new Exception("Unsupported message type");
+            }
+        }
+    };
+
+    MessageType msgType = MessageType.BYTE;
+
     public PerfProducer()
     {
         super();
@@ -75,14 +105,16 @@ public class PerfProducer extends PerfBa
         feedbackDest = session.createTemporaryQueue();
 
         durable = params.isDurable();
-        
+        msgType = MessageType.getType(params.getMessageType());
+
+        System.out.println("Using " + msgType + " messages");
+
         // if message caching is enabled we pre create the message
         // else we pre create the payload
         if (params.isCacheMessage())
         {
             cacheMsg = true;
-            
-            msg = MessageFactory.createBytesMessage(session, params.getMsgSize());
+            msg = createMessage(createPayload(params.getMsgSize()));
             msg.setJMSDeliveryMode(durable?
                                    DeliveryMode.PERSISTENT :
                                    DeliveryMode.NON_PERSISTENT
@@ -93,16 +125,16 @@ public class PerfProducer extends PerfBa
             random = new Random(20080921);
             randomMsgSize = true;
             msgSizeRange = params.getMsgSize();
-            payloads = new ArrayList<byte[]>(msgSizeRange);
-            
+            payloads = new ArrayList<Object>(msgSizeRange);
+
             for (int i=0; i < msgSizeRange; i++)
             {
-                payloads.add(MessageFactory.createMessagePayload(i).getBytes());
+                payloads.add(createPayload(i));
             }
-        }        
+        }
         else
         {
-            payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes();
+            payload = createPayload(params.getMsgSize());
         }
 
         producer = session.createProducer(dest);
@@ -110,6 +142,33 @@ public class PerfProducer extends PerfBa
         producer.setDisableMessageTimestamp(params.isDisableTimestamp());
     }
 
+    Object createPayload(int size)
+    {
+        if (msgType == MessageType.TEXT)
+        {
+           return MessageFactory.createMessagePayload(size);
+        }
+        else
+        {
+            return MessageFactory.createMessagePayload(size).getBytes();
+        }
+    }
+
+    Message createMessage(Object payload) throws Exception
+    {
+        if (msgType == MessageType.TEXT)
+        {
+            return session.createTextMessage((String)payload);
+        }
+        else
+        {
+            BytesMessage m = session.createBytesMessage();
+            m.writeBytes((byte[])payload);
+            return m;
+        }
+    }
+
+
     protected Message getNextMessage() throws Exception
     {
         if (cacheMsg)
@@ -117,22 +176,22 @@ public class PerfProducer extends PerfBa
             return msg;
         }
         else
-        {            
-            msg = session.createBytesMessage();
-            
+        {
+            Message m;
+
             if (!randomMsgSize)
             {
-                ((BytesMessage)msg).writeBytes(payload);
+                m = createMessage(payload);
             }
             else
             {
-                ((BytesMessage)msg).writeBytes(payloads.get(random.nextInt(msgSizeRange)));
+                m = createMessage(payloads.get(random.nextInt(msgSizeRange)));
             }
-            msg.setJMSDeliveryMode(durable?
+            m.setJMSDeliveryMode(durable?
                     DeliveryMode.PERSISTENT :
                     DeliveryMode.NON_PERSISTENT
                    );
-            return msg;
+            return m;
         }
     }
 
@@ -247,16 +306,16 @@ public class PerfProducer extends PerfBa
                 prod.test();
             }
         };
-        
+
         Thread t;
         try
         {
-            t = Threading.getThreadFactory().createThread(r);                      
+            t = Threading.getThreadFactory().createThread(r);
         }
         catch(Exception e)
         {
             throw new Error("Error creating producer thread",e);
         }
-        t.start();            
+        t.start();
     }
 }
\ No newline at end of file

Modified: qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java?rev=1140793&r1=1140792&r2=1140793&view=diff
==============================================================================
--- qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java (original)
+++ qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java Tue Jun 28 19:22:27 2011
@@ -62,12 +62,14 @@ public class TestParams
     private int msg_count = 10;
 
     private int warmup_count = 1;
-    
+
     private boolean random_msg_size = false;
 
+    private String msgType = "byte";
+
     public TestParams()
     {
-     
+
         url = System.getProperty("url",url);
         host = System.getProperty("host","");
         port = Integer.getInteger("port", -1);
@@ -85,6 +87,7 @@ public class TestParams
         msg_count = Integer.getInteger("msg_count",msg_count);
         warmup_count = Integer.getInteger("warmup_count",warmup_count);
         random_msg_size = Boolean.getBoolean("random_msg_size");
+        msgType = System.getProperty("msg_type","byte");
     }
 
     public String getUrl()
@@ -161,10 +164,14 @@ public class TestParams
     {
         return disableTimestamp;
     }
-    
+
     public boolean isRandomMsgSize()
     {
         return random_msg_size;
     }
 
+    public String getMessageType()
+    {
+        return msgType;
+    }
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org