You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/02/12 16:25:18 UTC

svn commit: r506483 - in /incubator/qpid/branches/perftesting_persistent: ./ qpid/java/client/src/main/java/org/apache/qpid/client/ qpid/java/client/src/main/java/org/apache/qpid/client/message/ qpid/java/perftests/src/main/java/org/apache/qpid/topic/

Author: rgreig
Date: Mon Feb 12 07:25:17 2007
New Revision: 506483

URL: http://svn.apache.org/viewvc?view=rev&rev=506483
Log:
Merged revisions 506439 via svnmerge from 
https://svn.apache.org/repos/asf/incubator/qpid/trunk

........
  r506439 | rgreig | 2007-02-12 13:25:36 +0000 (Mon, 12 Feb 2007) | 3 lines
  
  (Patch submitted by Rupert Smith) Qpid-360 fixes.
  Message type defaults to ByteMessage when not specified.
  Unknown destination type is used as default when not specified.
........

Modified:
    incubator/qpid/branches/perftesting_persistent/   (props changed)
    incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java
    incubator/qpid/branches/perftesting_persistent/qpid/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java

Propchange: incubator/qpid/branches/perftesting_persistent/
------------------------------------------------------------------------------
--- svnmerge-integrated (original)
+++ svnmerge-integrated Mon Feb 12 07:25:17 2007
@@ -1 +1 @@
-/incubator/qpid/trunk:1-504056,504915-505241,505243-505256,505892
+/incubator/qpid/trunk:1-504056,504915-505241,505243-505256,505892,506439

Modified: incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?view=diff&rev=506483&r1=506482&r2=506483
==============================================================================
--- incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Mon Feb 12 07:25:17 2007
@@ -20,17 +20,20 @@
  */
 package org.apache.qpid.client;
 
+import java.io.UnsupportedEncodingException;
+
+import javax.jms.*;
+
 import org.apache.log4j.Logger;
+
 import org.apache.mina.common.ByteBuffer;
+
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.message.AbstractJMSMessage;
 import org.apache.qpid.client.message.MessageConverter;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.framing.*;
 
-import javax.jms.*;
-import java.io.UnsupportedEncodingException;
-
 public class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
 {
     protected final Logger _logger = Logger.getLogger(getClass());
@@ -101,9 +104,9 @@
     private final boolean _waitUntilSent;
     private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0];
 
-    protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted,
-                                   int channelId, AMQSession session, AMQProtocolHandler protocolHandler,
-                                   long producerId, boolean immediate, boolean mandatory, boolean waitUntilSent)
+    protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
+                                   AMQSession session, AMQProtocolHandler protocolHandler, long producerId,
+                                   boolean immediate, boolean mandatory, boolean waitUntilSent)
     {
         _connection = connection;
         _destination = destination;
@@ -116,6 +119,7 @@
         {
             declareDestination(destination);
         }
+
         _immediate = immediate;
         _mandatory = mandatory;
         _waitUntilSent = waitUntilSent;
@@ -134,18 +138,18 @@
         // Declare the exchange
         // Note that the durable and internal arguments are ignored since passive is set to false
         // TODO: Be aware of possible changes to parameter order as versions change.
-        AMQFrame declare = ExchangeDeclareBody.createAMQFrame(_channelId,
-            _protocolHandler.getProtocolMajorVersion(),
-            _protocolHandler.getProtocolMinorVersion(),
-            null,	// arguments
-            false,	// autoDelete
-            false,	// durable
-            destination.getExchangeName(),	// exchange
-            false,	// internal
-            true,	// nowait
-            false,	// passive
-            0,	// ticket
-            destination.getExchangeClass());	// type
+        AMQFrame declare =
+            ExchangeDeclareBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(),
+                                               _protocolHandler.getProtocolMinorVersion(), null, // arguments
+                                               false, // autoDelete
+                                               false, // durable
+                                               destination.getExchangeName(), // exchange
+                                               false, // internal
+                                               true, // nowait
+                                               false, // passive
+                                               _session.getTicket(), // ticket
+                                               destination.getExchangeClass()); // type
+
         _protocolHandler.writeFrame(declare);
     }
 
@@ -159,6 +163,7 @@
     public boolean getDisableMessageID() throws JMSException
     {
         checkNotClosed();
+
         // Always false for AMQP
         return false;
     }
@@ -172,39 +177,44 @@
     public boolean getDisableMessageTimestamp() throws JMSException
     {
         checkNotClosed();
+
         return _disableTimestamps;
     }
 
     public void setDeliveryMode(int i) throws JMSException
     {
         checkPreConditions();
-        if (i != DeliveryMode.NON_PERSISTENT && i != DeliveryMode.PERSISTENT)
+        if ((i != DeliveryMode.NON_PERSISTENT) && (i != DeliveryMode.PERSISTENT))
         {
-            throw new JMSException("DeliveryMode must be either NON_PERSISTENT or PERSISTENT. Value of " + i +
-                    " is illegal");
+            throw new JMSException("DeliveryMode must be either NON_PERSISTENT or PERSISTENT. Value of " + i
+                                   + " is illegal");
         }
+
         _deliveryMode = i;
     }
 
     public int getDeliveryMode() throws JMSException
     {
         checkNotClosed();
+
         return _deliveryMode;
     }
 
     public void setPriority(int i) throws JMSException
     {
         checkPreConditions();
-        if (i < 0 || i > 9)
+        if ((i < 0) || (i > 9))
         {
             throw new IllegalArgumentException("Priority of " + i + " is illegal. Value must be in range 0 to 9");
         }
+
         _messagePriority = i;
     }
 
     public int getPriority() throws JMSException
     {
         checkNotClosed();
+
         return _messagePriority;
     }
 
@@ -215,18 +225,21 @@
         {
             throw new IllegalArgumentException("Time to live must be non-negative - supplied value was " + l);
         }
+
         _timeToLive = l;
     }
 
     public long getTimeToLive() throws JMSException
     {
         checkNotClosed();
+
         return _timeToLive;
     }
 
     public Destination getDestination() throws JMSException
     {
         checkNotClosed();
+
         return _destination;
     }
 
@@ -241,11 +254,9 @@
         checkPreConditions();
         checkInitialDestination();
 
-
         synchronized (_connection.getFailoverMutex())
         {
-            sendImpl(_destination, message, _deliveryMode, _messagePriority, _timeToLive,
-                     _mandatory, _immediate);
+            sendImpl(_destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate);
         }
     }
 
@@ -256,8 +267,7 @@
 
         synchronized (_connection.getFailoverMutex())
         {
-            sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive,
-                     _mandatory, _immediate);
+            sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate);
         }
     }
 
@@ -267,20 +277,17 @@
         checkInitialDestination();
         synchronized (_connection.getFailoverMutex())
         {
-            sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive,
-                     _mandatory, immediate);
+            sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive, _mandatory, immediate);
         }
     }
 
-    public void send(Message message, int deliveryMode, int priority,
-                     long timeToLive) throws JMSException
+    public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
     {
         checkPreConditions();
         checkInitialDestination();
         synchronized (_connection.getFailoverMutex())
         {
-            sendImpl(_destination, message, deliveryMode, priority, timeToLive, _mandatory,
-                     _immediate);
+            sendImpl(_destination, message, deliveryMode, priority, timeToLive, _mandatory, _immediate);
         }
     }
 
@@ -291,69 +298,60 @@
         synchronized (_connection.getFailoverMutex())
         {
             validateDestination(destination);
-            sendImpl((AMQDestination) destination, message, _deliveryMode, _messagePriority, _timeToLive,
-                     _mandatory, _immediate);
+            sendImpl((AMQDestination) destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory,
+                     _immediate);
         }
     }
 
-    public void send(Destination destination, Message message, int deliveryMode,
-                     int priority, long timeToLive)
-            throws JMSException
+    public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive)
+              throws JMSException
     {
         checkPreConditions();
         checkDestination(destination);
         synchronized (_connection.getFailoverMutex())
         {
             validateDestination(destination);
-            sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive,
-                     _mandatory, _immediate);
+            sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, _mandatory, _immediate);
         }
     }
 
-    public void send(Destination destination, Message message, int deliveryMode,
-                     int priority, long timeToLive, boolean mandatory)
-            throws JMSException
+    public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
+                     boolean mandatory) throws JMSException
     {
         checkPreConditions();
         checkDestination(destination);
         synchronized (_connection.getFailoverMutex())
         {
             validateDestination(destination);
-            sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive,
-                     mandatory, _immediate);
+            sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, _immediate);
         }
     }
 
-    public void send(Destination destination, Message message, int deliveryMode,
-                     int priority, long timeToLive, boolean mandatory, boolean immediate)
-            throws JMSException
+    public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
+                     boolean mandatory, boolean immediate) throws JMSException
     {
         checkPreConditions();
         checkDestination(destination);
         synchronized (_connection.getFailoverMutex())
         {
             validateDestination(destination);
-            sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive,
-                     mandatory, immediate);
+            sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, immediate);
         }
     }
 
-    public void send(Destination destination, Message message, int deliveryMode,
-                     int priority, long timeToLive, boolean mandatory,
-                     boolean immediate, boolean waitUntilSent)
-            throws JMSException
+    public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
+                     boolean mandatory, boolean immediate, boolean waitUntilSent) throws JMSException
     {
         checkPreConditions();
         checkDestination(destination);
         synchronized (_connection.getFailoverMutex())
         {
             validateDestination(destination);
-            sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive,
-                     mandatory, immediate, waitUntilSent);
+            sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, immediate,
+                     waitUntilSent);
         }
     }
 
-
     private AbstractJMSMessage convertToNativeMessage(Message message) throws JMSException
     {
         if (message instanceof AbstractJMSMessage)
@@ -366,23 +364,23 @@
 
             if (message instanceof BytesMessage)
             {
-                newMessage = new MessageConverter((BytesMessage)message).getConvertedMessage();
+                newMessage = new MessageConverter((BytesMessage) message).getConvertedMessage();
             }
             else if (message instanceof MapMessage)
             {
-                newMessage = new MessageConverter((MapMessage)message).getConvertedMessage();
+                newMessage = new MessageConverter((MapMessage) message).getConvertedMessage();
             }
             else if (message instanceof ObjectMessage)
             {
-                newMessage = new MessageConverter((ObjectMessage)message).getConvertedMessage();
+                newMessage = new MessageConverter((ObjectMessage) message).getConvertedMessage();
             }
             else if (message instanceof TextMessage)
             {
-               newMessage = new MessageConverter((TextMessage)message).getConvertedMessage();
+                newMessage = new MessageConverter((TextMessage) message).getConvertedMessage();
             }
             else if (message instanceof StreamMessage)
             {
-                newMessage = new MessageConverter((StreamMessage)message).getConvertedMessage();
+                newMessage = new MessageConverter((StreamMessage) message).getConvertedMessage();
             }
             else
             {
@@ -395,24 +393,25 @@
             }
             else
             {
-                throw new JMSException("Unable to send message, due to class conversion error: " + message.getClass().getName());
+                throw new JMSException("Unable to send message, due to class conversion error: "
+                                       + message.getClass().getName());
             }
         }
     }
 
-
     private void validateDestination(Destination destination) throws JMSException
     {
         if (!(destination instanceof AMQDestination))
         {
-            throw new JMSException("Unsupported destination class: " +
-                    (destination != null ? destination.getClass() : null));
+            throw new JMSException("Unsupported destination class: "
+                                   + ((destination != null) ? destination.getClass() : null));
         }
+
         declareDestination((AMQDestination) destination);
     }
 
-    protected void sendImpl(AMQDestination destination, Message message, int deliveryMode, int priority,
-                            long timeToLive, boolean mandatory, boolean immediate) throws JMSException
+    protected void sendImpl(AMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
+                            boolean mandatory, boolean immediate) throws JMSException
     {
         sendImpl(destination, message, deliveryMode, priority, timeToLive, mandatory, immediate, _waitUntilSent);
     }
@@ -429,21 +428,20 @@
      * @param immediate
      * @throws JMSException
      */
-    protected void sendImpl(AMQDestination destination, Message origMessage, int deliveryMode, int priority,
-                            long timeToLive, boolean mandatory, boolean immediate, boolean wait) throws JMSException
+    protected void sendImpl(AMQDestination destination, Message origMessage, int deliveryMode, int priority, long timeToLive,
+                            boolean mandatory, boolean immediate, boolean wait) throws JMSException
     {
         checkTemporaryDestination(destination);
         origMessage.setJMSDestination(destination);
 
-
         AbstractJMSMessage message = convertToNativeMessage(origMessage);
 
         int type;
-        if(destination instanceof Topic)
+        if (destination instanceof Topic)
         {
             type = AMQDestination.TOPIC_TYPE;
         }
-        else if(destination instanceof Queue)
+        else if (destination instanceof Queue)
         {
             type = AMQDestination.QUEUE_TYPE;
         }
@@ -452,22 +450,19 @@
             type = AMQDestination.UNKNOWN_TYPE;
         }
 
-        message.getJmsHeaders().setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(),
-                                               type);
+        message.getJmsHeaders().setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(), type);
 
         // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
         // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
         // Be aware of possible changes to parameter order as versions change.
-        AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId,
-            _protocolHandler.getProtocolMajorVersion(),
-            _protocolHandler.getProtocolMinorVersion(),	
-            destination.getExchangeName(),	// exchange
-            immediate,	// immediate
-            mandatory,	// mandatory
-            destination.getRoutingKey(),	// routingKey
-            0);	// ticket
-
-
+        AMQFrame publishFrame =
+            BasicPublishBody.createAMQFrame(
+                _channelId, _protocolHandler.getProtocolMajorVersion(), _protocolHandler.getProtocolMinorVersion(),
+                destination.getExchangeName(), // exchange
+                immediate, // immediate
+                mandatory, // mandatory
+                destination.getRoutingKey(), // routingKey
+                _session.getTicket()); // ticket
 
         message.prepareForSending();
         ByteBuffer payload = message.getData();
@@ -487,6 +482,7 @@
                 contentHeaderProperties.setExpiration(0);
             }
         }
+
         contentHeaderProperties.setDeliveryMode((byte) deliveryMode);
         contentHeaderProperties.setPriority((byte) priority);
 
@@ -494,12 +490,12 @@
         final int contentBodyFrameCount = calculateContentBodyFrameCount(payload);
         final AMQFrame[] frames = new AMQFrame[2 + contentBodyFrameCount];
 
-        if(payload != null)
+        if (payload != null)
         {
             createContentBodies(payload, frames, 2, _channelId);
         }
 
-        if (contentBodyFrameCount != 0 && _logger.isDebugEnabled())
+        if ((contentBodyFrameCount != 0) && _logger.isDebugEnabled())
         {
             _logger.debug("Sending content body frames to " + destination);
         }
@@ -508,12 +504,10 @@
         // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
         // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
         AMQFrame contentHeaderFrame =
-                ContentHeaderBody.createAMQFrame(_channelId,
-                                                 BasicConsumeBody.getClazz(_protocolHandler.getProtocolMajorVersion(),
-                                                                           _protocolHandler.getProtocolMinorVersion()), 
-                                                 0,
-                                                 contentHeaderProperties,
-                                                 size);
+            ContentHeaderBody.createAMQFrame(_channelId,
+                                             BasicConsumeBody.getClazz(_protocolHandler.getProtocolMajorVersion(),
+                                                                       _protocolHandler.getProtocolMinorVersion()), 0,
+                                             contentHeaderProperties, size);
         if (_logger.isDebugEnabled())
         {
             _logger.debug("Sending content header frame to " + destination);
@@ -524,7 +518,6 @@
         CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
         _protocolHandler.writeFrame(compositeFrame, wait);
 
-
         if (message != origMessage)
         {
             _logger.debug("Updating original message");
@@ -538,16 +531,17 @@
 
     private void checkTemporaryDestination(AMQDestination destination) throws JMSException
     {
-        if(destination instanceof TemporaryDestination)
+        if (destination instanceof TemporaryDestination)
         {
             _logger.debug("destination is temporary destination");
             TemporaryDestination tempDest = (TemporaryDestination) destination;
-            if(tempDest.getSession().isClosed())
+            if (tempDest.getSession().isClosed())
             {
                 _logger.debug("session is closed");
                 throw new JMSException("Session for temporary destination has been closed");
             }
-            if(tempDest.isDeleted())
+
+            if (tempDest.isDeleted())
             {
                 _logger.debug("destination is deleted");
                 throw new JMSException("Cannot send to a deleted temporary destination");
@@ -567,9 +561,9 @@
     private void createContentBodies(ByteBuffer payload, AMQFrame[] frames, int offset, int channelId)
     {
 
-        if (frames.length == offset + 1)
+        if (frames.length == (offset + 1))
         {
-            frames[offset] = ContentBody.createAMQFrame(channelId,new ContentBody(payload));
+            frames[offset] = ContentBody.createAMQFrame(channelId, new ContentBody(payload));
         }
         else
         {
@@ -578,10 +572,10 @@
             long remaining = payload.remaining();
             for (int i = offset; i < frames.length; i++)
             {
-                payload.position((int) framePayloadMax * (i-offset));
+                payload.position((int) framePayloadMax * (i - offset));
                 int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining;
                 payload.limit(payload.position() + length);
-                frames[i] = ContentBody.createAMQFrame(channelId,new ContentBody(payload.slice()));
+                frames[i] = ContentBody.createAMQFrame(channelId, new ContentBody(payload.slice()));
 
                 remaining -= length;
             }
@@ -594,7 +588,7 @@
         // we substract one from the total frame maximum size to account for the end of frame marker in a body frame
         // (0xCE byte).
         int frameCount;
-        if(payload == null || payload.remaining() == 0)
+        if ((payload == null) || (payload.remaining() == 0))
         {
             frameCount = 0;
         }
@@ -602,9 +596,10 @@
         {
             int dataLength = payload.remaining();
             final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
-            int lastFrame = (dataLength % framePayloadMax) > 0 ? 1 : 0;
+            int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0;
             frameCount = (int) (dataLength / framePayloadMax) + lastFrame;
         }
+
         return frameCount;
     }
 
@@ -624,7 +619,7 @@
     {
         checkNotClosed();
 
-        if (_session == null || _session.isClosed())
+        if ((_session == null) || _session.isClosed())
         {
             throw new javax.jms.IllegalStateException("Invalid Session");
         }
@@ -640,9 +635,10 @@
 
     private void checkDestination(Destination suppliedDestination) throws InvalidDestinationException
     {
-        if (_destination != null && suppliedDestination != null)
+        if ((_destination != null) && (suppliedDestination != null))
         {
-            throw new UnsupportedOperationException("This message producer was created with a Destination, therefore you cannot use an unidentified Destination");
+            throw new UnsupportedOperationException(
+                "This message producer was created with a Destination, therefore you cannot use an unidentified Destination");
         }
 
         if (suppliedDestination == null)
@@ -650,9 +646,7 @@
             throw new InvalidDestinationException("Supplied Destination was invalid");
         }
 
-
     }
-
 
     public AMQSession getSession()
     {

Modified: incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?view=diff&rev=506483&r1=506482&r2=506483
==============================================================================
--- incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Mon Feb 12 07:25:17 2007
@@ -20,27 +20,29 @@
  */
 package org.apache.qpid.client.message;
 
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.Map;
+
+import javax.jms.*;
+
 import org.apache.commons.collections.map.ReferenceMap;
+
 import org.apache.mina.common.ByteBuffer;
+
 import org.apache.qpid.AMQException;
-import org.apache.qpid.url.BindingURL;
-import org.apache.qpid.url.AMQBindingURL;
-import org.apache.qpid.url.URLSyntaxException;
-import org.apache.qpid.client.AMQUndefinedDestination;
 import org.apache.qpid.client.*;
+import org.apache.qpid.client.AMQUndefinedDestination;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.AMQShortString;
-
-import javax.jms.*;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.Map;
+import org.apache.qpid.url.AMQBindingURL;
+import org.apache.qpid.url.BindingURL;
+import org.apache.qpid.url.URLSyntaxException;
 
 public abstract class AbstractJMSMessage extends AMQMessage implements org.apache.qpid.jms.Message
 {
     private static final Map _destinationCache = Collections.synchronizedMap(new ReferenceMap());
-                                                                  
 
     protected boolean _redelivered;
 
@@ -60,10 +62,11 @@
         {
             _data.acquire();
         }
+
         _readableProperties = false;
         _readableMessage = (data != null);
         _changedData = (data == null);
-        _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties)_contentHeaderProperties).getHeaders());
+        _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders());
     }
 
     protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
@@ -71,28 +74,29 @@
     {
         this(contentHeader, deliveryTag);
 
-
-        int type = contentHeader.getHeaders().getInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName());
+        Integer type = contentHeader.getHeaders().getInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName());
+        int contentType = (type == null) ? AMQDestination.UNKNOWN_TYPE : type.intValue();
 
         AMQDestination dest;
 
-        switch(type)
+        switch (contentType)
         {
-            case AMQDestination.QUEUE_TYPE:
-                dest = new AMQQueue(exchange, routingKey, routingKey);
-                break;
-            case AMQDestination.TOPIC_TYPE:
-                dest = new AMQTopic(exchange, routingKey, null);
-                break;
-            default:
-                dest = new AMQUndefinedDestination(exchange, routingKey, null);
-                break;
+
+        case AMQDestination.QUEUE_TYPE:
+            dest = new AMQQueue(exchange, routingKey, routingKey);
+            break;
+
+        case AMQDestination.TOPIC_TYPE:
+            dest = new AMQTopic(exchange, routingKey, null);
+            break;
+
+        default:
+            dest = new AMQUndefinedDestination(exchange, routingKey, null);
+            break;
         }
         //Destination dest = AMQDestination.createDestination(url);
         setJMSDestination(dest);
 
-
-
         _data = data;
         if (_data != null)
         {
@@ -107,7 +111,7 @@
     {
         super(contentHeader, deliveryTag);
         _readableProperties = (_contentHeaderProperties != null);
-        _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties)_contentHeaderProperties).getHeaders());
+        _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders());
     }
 
     public String getJMSMessageID() throws JMSException
@@ -116,6 +120,7 @@
         {
             getContentHeaderProperties().setMessageId("ID:" + _deliveryTag);
         }
+
         return getContentHeaderProperties().getMessageId();
     }
 
@@ -178,6 +183,7 @@
 
                 _destinationCache.put(replyToEncoding, dest);
             }
+
             return dest;
         }
     }
@@ -188,11 +194,13 @@
         {
             throw new IllegalArgumentException("Null destination not allowed");
         }
+
         if (!(destination instanceof AMQDestination))
         {
-            throw new IllegalArgumentException("ReplyTo destination may only be an AMQDestination - passed argument was type " +
-                    destination.getClass());
+            throw new IllegalArgumentException(
+                "ReplyTo destination may only be an AMQDestination - passed argument was type " + destination.getClass());
         }
+
         final AMQDestination amqd = (AMQDestination) destination;
 
         final AMQShortString encodedDestination = amqd.getEncodedName();
@@ -278,17 +286,17 @@
         _readableMessage = false;
     }
 
-
     public boolean propertyExists(AMQShortString propertyName) throws JMSException
     {
         checkPropertyName(propertyName);
+
         return getJmsHeaders().propertyExists(propertyName);
     }
 
-
     public boolean propertyExists(String propertyName) throws JMSException
     {
         checkPropertyName(propertyName);
+
         return getJmsHeaders().propertyExists(propertyName);
     }
 
@@ -299,7 +307,6 @@
         return getJmsHeaders().getBoolean(propertyName);
     }
 
-
     public boolean getBooleanProperty(String propertyName) throws JMSException
     {
         checkPropertyName(propertyName);
@@ -310,48 +317,56 @@
     public byte getByteProperty(String propertyName) throws JMSException
     {
         checkPropertyName(propertyName);
+
         return getJmsHeaders().getByte(propertyName);
     }
 
     public short getShortProperty(String propertyName) throws JMSException
     {
         checkPropertyName(propertyName);
+
         return getJmsHeaders().getShort(propertyName);
     }
 
     public int getIntProperty(String propertyName) throws JMSException
     {
         checkPropertyName(propertyName);
+
         return getJmsHeaders().getInteger(propertyName);
     }
 
     public long getLongProperty(String propertyName) throws JMSException
     {
         checkPropertyName(propertyName);
+
         return getJmsHeaders().getLong(propertyName);
     }
 
     public float getFloatProperty(String propertyName) throws JMSException
     {
         checkPropertyName(propertyName);
+
         return getJmsHeaders().getFloat(propertyName);
     }
 
     public double getDoubleProperty(String propertyName) throws JMSException
     {
         checkPropertyName(propertyName);
+
         return getJmsHeaders().getDouble(propertyName);
     }
 
     public String getStringProperty(String propertyName) throws JMSException
     {
         checkPropertyName(propertyName);
+
         return getJmsHeaders().getString(propertyName);
     }
 
     public Object getObjectProperty(String propertyName) throws JMSException
     {
         checkPropertyName(propertyName);
+
         return getJmsHeaders().getObject(propertyName);
     }
 
@@ -436,7 +451,6 @@
         getJmsHeaders().remove(propertyName);
     }
 
-
     protected void removeProperty(String propertyName) throws JMSException
     {
         checkPropertyName(propertyName);
@@ -468,7 +482,6 @@
         }
     }
 
-
     /**
      * This forces concrete classes to implement clearBody()
      *
@@ -511,6 +524,7 @@
             {
                 buf.append('\n').append(getJmsHeaders().getHeaders());
             }
+
             return buf.toString();
         }
         catch (JMSException e)
@@ -519,7 +533,6 @@
         }
     }
 
-
     public void setUnderlyingMessagePropertiesMap(FieldTable messageProperties)
     {
         getContentHeaderProperties().setHeaders(messageProperties);
@@ -550,6 +563,7 @@
         {
             reset();
         }
+
         return _data;
     }
 
@@ -608,6 +622,7 @@
     public byte[] getBytesProperty(AMQShortString propertyName) throws JMSException
     {
         checkPropertyName(propertyName);
+
         return getJmsHeaders().getBytes(propertyName);
 
     }

Modified: incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java?view=diff&rev=506483&r1=506482&r2=506483
==============================================================================
--- incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java (original)
+++ incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java Mon Feb 12 07:25:17 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,20 +20,40 @@
  */
 package org.apache.qpid.client.message;
 
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.jms.JMSException;
+
 import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.AMQShortString;
-
-import javax.jms.JMSException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.List;
 
 public class MessageFactoryRegistry
 {
     private final Map<String, MessageFactory> _mimeStringToFactoryMap = new HashMap<String, MessageFactory>();
-    private final Map<AMQShortString, MessageFactory> _mimeShortStringToFactoryMap = new HashMap<AMQShortString, MessageFactory>();
+    private final Map<AMQShortString, MessageFactory> _mimeShortStringToFactoryMap =
+        new HashMap<AMQShortString, MessageFactory>();
+
+    /**
+     * Construct a new registry with the default message factories registered
+     * @return a message factory registry
+     */
+    public static MessageFactoryRegistry newDefaultRegistry()
+    {
+        MessageFactoryRegistry mf = new MessageFactoryRegistry();
+        mf.registerFactory(JMSMapMessage.MIME_TYPE, new JMSMapMessageFactory());
+        mf.registerFactory("text/plain", new JMSTextMessageFactory());
+        mf.registerFactory("text/xml", new JMSTextMessageFactory());
+        mf.registerFactory(JMSBytesMessage.MIME_TYPE, new JMSBytesMessageFactory());
+        mf.registerFactory(JMSObjectMessage.MIME_TYPE, new JMSObjectMessageFactory());
+        mf.registerFactory(JMSStreamMessage.MIME_TYPE, new JMSStreamMessageFactory());
+        mf.registerFactory(null, new JMSBytesMessageFactory());
+
+        return mf;
+    }
 
     public void registerFactory(String mimeType, MessageFactory mf)
     {
@@ -41,6 +61,7 @@
         {
             throw new IllegalArgumentException("Message factory must not be null");
         }
+
         _mimeStringToFactoryMap.put(mimeType, mf);
         _mimeShortStringToFactoryMap.put(new AMQShortString(mimeType), mf);
     }
@@ -48,6 +69,7 @@
     public MessageFactory deregisterFactory(String mimeType)
     {
         _mimeShortStringToFactoryMap.remove(new AMQShortString(mimeType));
+
         return _mimeStringToFactoryMap.remove(mimeType);
     }
 
@@ -62,14 +84,19 @@
      * @throws AMQException
      * @throws JMSException
      */
-    public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered,
-                                            AMQShortString exchange,
-                                            AMQShortString routingKey,
-                                            ContentHeaderBody contentHeader,
-                                            List bodies) throws AMQException, JMSException
+    public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, AMQShortString exchange,
+                                            AMQShortString routingKey, ContentHeaderBody contentHeader, List bodies)
+                                     throws AMQException, JMSException
     {
-        BasicContentHeaderProperties properties =  (BasicContentHeaderProperties) contentHeader.properties;
-        MessageFactory mf =  _mimeShortStringToFactoryMap.get(properties.getContentTypeShortString());
+        BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.properties;
+
+        // Get the message content type. This may be null for pure AMQP messages, but will always be set for JMS over
+        // AMQP. When the type is null, it can only be assumed that the message is a byte message.
+        AMQShortString contentTypeShortString = properties.getContentTypeShortString();
+        contentTypeShortString = (contentTypeShortString == null) ? new AMQShortString(JMSBytesMessage.MIME_TYPE)
+                                                                  : contentTypeShortString;
+
+        MessageFactory mf = _mimeShortStringToFactoryMap.get(contentTypeShortString);
         if (mf == null)
         {
             throw new AMQException("Unsupport MIME type of " + properties.getContentType());
@@ -86,6 +113,7 @@
         {
             throw new IllegalArgumentException("Mime type must not be null");
         }
+
         MessageFactory mf = _mimeStringToFactoryMap.get(mimeType);
         if (mf == null)
         {
@@ -95,22 +123,5 @@
         {
             return mf.createMessage();
         }
-    }
-
-    /**
-     * Construct a new registry with the default message factories registered
-     * @return a message factory registry
-     */
-    public static MessageFactoryRegistry newDefaultRegistry()
-    {
-        MessageFactoryRegistry mf = new MessageFactoryRegistry();
-        mf.registerFactory(JMSMapMessage.MIME_TYPE, new JMSMapMessageFactory());
-        mf.registerFactory("text/plain", new JMSTextMessageFactory());
-        mf.registerFactory("text/xml", new JMSTextMessageFactory());
-        mf.registerFactory(JMSBytesMessage.MIME_TYPE, new JMSBytesMessageFactory());
-        mf.registerFactory(JMSObjectMessage.MIME_TYPE, new JMSObjectMessageFactory());
-        mf.registerFactory(JMSStreamMessage.MIME_TYPE, new JMSStreamMessageFactory());
-        mf.registerFactory(null, new JMSBytesMessageFactory());
-        return mf;
     }
 }

Modified: incubator/qpid/branches/perftesting_persistent/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting_persistent/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java?view=diff&rev=506483&r1=506482&r2=506483
==============================================================================
--- incubator/qpid/branches/perftesting_persistent/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java (original)
+++ incubator/qpid/branches/perftesting_persistent/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java Mon Feb 12 07:25:17 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,122 +20,277 @@
  */
 package org.apache.qpid.topic;
 
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
+import java.util.Random;
+
+import javax.jms.*;
+
+import org.apache.log4j.Logger;
+import org.apache.log4j.NDC;
 
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+
+/**
+ * This class has not kept up to date with the topic_listener in the cpp tests. It should provide identical behaviour for
+ * cross testing the java and cpp clients.
+ *
+ * <p/>How the cpp topic_publisher operates:
+ * It publishes text messages to the default topic exchange, on virtual host "/test", on the topic "topic_control", for
+ * the specified number of test messages to be sent.
+ * It publishes a report request message (on same topic), with the header text field "TYPE", value "REPORT_REQUEST",
+ * optionally within a transaction, and waits for the specified number of consumers to reply to this request. The
+ * listeners should reply to this message on a queue named "response", on virtual host "/test", with some sort of message
+ * about the number of messages received and how long it took, although the publisher never looks at the message content.
+ * The publisher then send a message (on the same topic), with the header text field "TYPE", value "TERMINATION_REQUEST",
+ * which the listener should close its connection and terminate upon receipt of.
+ *
+ * @deprecated Use PingPongBouncer instead once the below todo is completed.
+ *
+ * @todo Make the functionality of this class available through PingPongBouncer. Rename PingPongBouncer to
+ *       PingListener and make its bouncing functionality optional, either through a switch or as an extending class
+ *       called PingBouncer. Want to have as few ping classes as possible with configurable behaviour, re-using code
+ *       accross p2p and topic style tests in almost all cases.
+ */
 public class Listener implements MessageListener
 {
+    private static Logger log = Logger.getLogger(Listener.class);
+
+    private static final char[] DATA = "abcdefghijklmnopqrstuvwxyz".toCharArray();
+    public static final String CONTROL_TOPIC = "topic_control";
+    public static final String RESPONSE_QUEUE = "response";
+
+    private final Topic _topic;
+    //private final Topic _control;
+
+    private final Queue _response;
+
+    private final byte[] _payload;
+
+    /** Holds the connection to listen on. */
     private final Connection _connection;
+
+    /** Holds the producer to send control messages on. */
     private final MessageProducer _controller;
+
+    /** Holds the JMS session. */
     private final javax.jms.Session _session;
-    private final MessageFactory _factory;
+
+    /** Holds a flag to indicate that a timer has begun on the first message. Reset when report is sent. */
     private boolean init;
+
+    /** Holds the count of messages received by this listener. */
     private int count;
-    private long start;
 
-    Listener(Connection connection, int ackMode) throws Exception
-    {
-        this(connection, ackMode, null);
-    }
+    /** Used to hold the start time of the first message. */
+    private long start;
+    private static String clientId;
 
     Listener(Connection connection, int ackMode, String name) throws Exception
     {
+        log.debug("Listener(Connection connection = " + connection + ", int ackMode = " + ackMode + ", String name = " + name
+                  + "): called");
+
         _connection = connection;
         _session = connection.createSession(false, ackMode);
-        _factory = new MessageFactory(_session);
 
-        //register for events
-        if(name == null)
+        if (_session instanceof AMQSession)
         {
-            _factory.createTopicConsumer().setMessageListener(this);
+            _topic = new AMQTopic(CONTROL_TOPIC);
+            //_control = new AMQTopic(CONTROL_TOPIC);
+            _response = new AMQQueue(RESPONSE_QUEUE);
         }
         else
         {
-            _factory.createDurableTopicConsumer(name).setMessageListener(this);
+            _topic = _session.createTopic(CONTROL_TOPIC);
+            //_control = _session.createTopic(CONTROL_TOPIC);
+            _response = _session.createQueue(RESPONSE_QUEUE);
         }
 
-        _connection.start();
+        int size = 256;
 
-        _controller = _factory.createControlPublisher();
-        System.out.println("Waiting for messages " +
-                Config.getAckModeDescription(ackMode)
-                + (name == null ? "" : " (subscribed with name " + name + " and client id " + connection.getClientID() + ")")
-                + "...");
+        _payload = new byte[size];
 
-    }
+        for (int i = 0; i < size; i++)
+        {
+            _payload[i] = (byte) DATA[i % DATA.length];
+        }
 
-    private void shutdown()
-    {
-        try
+        //register for events
+        if (name == null)
         {
-            _session.close();
-            _connection.stop();
-            _connection.close();
+            log.debug("Calling _factory.createTopicConsumer().setMessageListener(this)");
+            createTopicConsumer().setMessageListener(this);
         }
-        catch(Exception e)
+        else
         {
-            e.printStackTrace(System.out);
+            log.debug("Calling createDurableTopicConsumer(name).setMessageListener(this)");
+            createDurableTopicConsumer(name).setMessageListener(this);
         }
+
+        _connection.start();
+
+        _controller = createControlPublisher();
+        System.out.println("Waiting for messages " + Config.getAckModeDescription(ackMode)
+                           +
+                           ((name == null)
+                            ? "" : (" (subscribed with name " + name + " and client id " + connection.getClientID() + ")"))
+                           + "...");
     }
 
-    private void report()
+    public static void main(String[] argv) throws Exception
     {
-        try
-        {
-            String msg = getReport();
-            _controller.send(_factory.createReportResponseMessage(msg));
-            System.out.println("Sent report: " + msg);
-        }
-        catch(Exception e)
+        clientId = "Listener-" + System.currentTimeMillis();
+
+        NDC.push(clientId);
+
+        Config config = new Config();
+        config.setOptions(argv);
+
+        //Connection con = config.createConnection();
+        Connection con =
+            new AMQConnection("amqp://guest:guest@testid/test?brokerlist='" + config.getHost() + ":" + config.getPort()
+                              + "'");
+
+        if (config.getClientId() != null)
         {
-            e.printStackTrace(System.out);
+            con.setClientID(config.getClientId());
         }
+
+        new Listener(con, config.getAckMode(), config.getSubscriptionId());
+
+        NDC.pop();
+        NDC.remove();
     }
 
-    private String getReport()
+    /**
+     * Checks whether or not a text field on a message has the specified value.
+     *
+     * @param m         The message to check.
+     * @param fieldName The name of the field to check.
+     * @param value     The expected value of the field to compare with.
+     *
+     * @return <tt>true</tt>If the specified field has the specified value, <tt>fals</tt> otherwise.
+     *
+     * @throws JMSException Any JMSExceptions are allowed to fall through.
+     */
+    private static boolean checkTextField(Message m, String fieldName, String value) throws JMSException
     {
-        long time = (System.currentTimeMillis() - start);
-        return "Received " + count + " in " + time + "ms";
+        log.debug("private static boolean checkTextField(Message m = " + m + ", String fieldName = " + fieldName
+                  + ", String value = " + value + "): called");
+
+        String comp = m.getStringProperty(fieldName);
+
+        return (comp != null) && comp.equals(value);
     }
 
     public void onMessage(Message message)
     {
-        if(!init)
+        NDC.push(clientId);
+
+        log.debug("public void onMessage(Message message): called");
+
+        if (!init)
         {
-            start = System.currentTimeMillis();
+            start = System.nanoTime() / 1000000;
             count = 0;
             init = true;
         }
 
-        if(_factory.isShutdown(message))
+        try
         {
-            shutdown();
+            if (isShutdown(message))
+            {
+                shutdown();
+            }
+            else if (isReport(message))
+            {
+                //send a report:
+                report();
+                init = false;
+            }
         }
-        else if(_factory.isReport(message))
+        catch (JMSException e)
         {
-            //send a report:
-            report();
-            init = false;
+            log.warn("There was a JMSException during onMessage.", e);
         }
-        else if (++count % 100 == 0)
+        finally
         {
-            System.out.println("Received " + count + " messages.");
+            NDC.pop();
         }
     }
 
-    public static void main(String[] argv) throws Exception
+    Message createReportResponseMessage(String msg) throws JMSException
     {
-        Config config = new Config();
-        config.setOptions(argv);
+        return _session.createTextMessage(msg);
+    }
+
+    boolean isShutdown(Message m) throws JMSException
+    {
+        boolean result = checkTextField(m, "TYPE", "TERMINATION_REQUEST");
+
+        log.debug("isShutdown = " + result);
+
+        return result;
+    }
+
+    boolean isReport(Message m) throws JMSException
+    {
+        boolean result = checkTextField(m, "TYPE", "REPORT_REQUEST");
+
+        log.debug("isReport = " + result);
+
+        return result;
+    }
+
+    MessageConsumer createTopicConsumer() throws Exception
+    {
+        return _session.createConsumer(_topic);
+    }
+
+    MessageConsumer createDurableTopicConsumer(String name) throws Exception
+    {
+        return _session.createDurableSubscriber(_topic, name);
+    }
+
+    MessageProducer createControlPublisher() throws Exception
+    {
+        return _session.createProducer(_response);
+    }
 
-        Connection con = config.createConnection();
-        if(config.getClientId() != null)
+    private void shutdown()
+    {
+        try
         {
-            con.setClientID(config.getClientId());
+            _session.close();
+            _connection.stop();
+            _connection.close();
         }
-        new Listener(con, config.getAckMode(), config.getSubscriptionId());
+        catch (Exception e)
+        {
+            e.printStackTrace(System.out);
+        }
+    }
+
+    private void report()
+    {
+        try
+        {
+            String msg = getReport();
+            _controller.send(createReportResponseMessage(msg));
+            System.out.println("Sent report: " + msg);
+        }
+        catch (Exception e)
+        {
+            e.printStackTrace(System.out);
+        }
+    }
+
+    private String getReport()
+    {
+        long time = ((System.nanoTime() / 1000000) - start);
+
+        return "Received " + count + " in " + time + "ms";
     }
 }

Modified: incubator/qpid/branches/perftesting_persistent/qpid/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting_persistent/qpid/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java?view=diff&rev=506483&r1=506482&r2=506483
==============================================================================
--- incubator/qpid/branches/perftesting_persistent/qpid/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java (original)
+++ incubator/qpid/branches/perftesting_persistent/qpid/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java Mon Feb 12 07:25:17 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,11 +20,11 @@
  */
 package org.apache.qpid.topic;
 
+import javax.jms.*;
+
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.AMQTopic;
 
-import javax.jms.*;
-
 /**
  */
 class MessageFactory
@@ -36,7 +36,6 @@
     private final Topic _control;
     private final byte[] _payload;
 
-
     MessageFactory(Session session) throws JMSException
     {
         this(session, 256);
@@ -45,24 +44,39 @@
     MessageFactory(Session session, int size) throws JMSException
     {
         _session = session;
-        if(session instanceof AMQSession)
+        if (session instanceof AMQSession)
         {
-            _topic = new AMQTopic("topictest.messages");
+            _topic = new AMQTopic("topic_control");
             _control = new AMQTopic("topictest.control");
         }
         else
         {
-            _topic = session.createTopic("topictest.messages");
+            _topic = session.createTopic("topic_control");
             _control = session.createTopic("topictest.control");
         }
+
         _payload = new byte[size];
 
-        for(int i = 0; i < size; i++)
+        for (int i = 0; i < size; i++)
         {
             _payload[i] = (byte) DATA[i % DATA.length];
         }
     }
 
+    private static boolean checkText(Message m, String s)
+    {
+        try
+        {
+            return (m instanceof TextMessage) && ((TextMessage) m).getText().equals(s);
+        }
+        catch (JMSException e)
+        {
+            e.printStackTrace(System.out);
+
+            return false;
+        }
+    }
+
     Topic getTopic()
     {
         return _topic;
@@ -72,6 +86,7 @@
     {
         BytesMessage msg = _session.createBytesMessage();
         msg.writeBytes(_payload);
+
         return msg;
     }
 
@@ -109,6 +124,7 @@
         catch (JMSException e)
         {
             e.printStackTrace(System.out);
+
             return e.toString();
         }
     }
@@ -136,18 +152,5 @@
     MessageProducer createControlPublisher() throws Exception
     {
         return _session.createProducer(_control);
-    }
-
-    private static boolean checkText(Message m, String s)
-    {
-        try
-        {
-            return m instanceof TextMessage && ((TextMessage) m).getText().equals(s);
-        }
-        catch (JMSException e)
-        {
-            e.printStackTrace(System.out);
-            return false;
-        }
     }
 }