You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/02/12 16:25:18 UTC
svn commit: r506483 - in /incubator/qpid/branches/perftesting_persistent: ./
qpid/java/client/src/main/java/org/apache/qpid/client/
qpid/java/client/src/main/java/org/apache/qpid/client/message/
qpid/java/perftests/src/main/java/org/apache/qpid/topic/
Author: rgreig
Date: Mon Feb 12 07:25:17 2007
New Revision: 506483
URL: http://svn.apache.org/viewvc?view=rev&rev=506483
Log:
Merged revisions 506439 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/trunk
........
r506439 | rgreig | 2007-02-12 13:25:36 +0000 (Mon, 12 Feb 2007) | 3 lines
(Patch submitted by Rupert Smith) QPID-360 fixes.
The message type defaults to BytesMessage when it is not specified.
The unknown destination type is used as the default when the destination type is not specified.
........
Modified:
incubator/qpid/branches/perftesting_persistent/ (props changed)
incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
incubator/qpid/branches/perftesting_persistent/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java
incubator/qpid/branches/perftesting_persistent/qpid/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java
Propchange: incubator/qpid/branches/perftesting_persistent/
------------------------------------------------------------------------------
--- svnmerge-integrated (original)
+++ svnmerge-integrated Mon Feb 12 07:25:17 2007
@@ -1 +1 @@
-/incubator/qpid/trunk:1-504056,504915-505241,505243-505256,505892
+/incubator/qpid/trunk:1-504056,504915-505241,505243-505256,505892,506439
Modified: incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?view=diff&rev=506483&r1=506482&r2=506483
==============================================================================
--- incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Mon Feb 12 07:25:17 2007
@@ -20,17 +20,20 @@
*/
package org.apache.qpid.client;
+import java.io.UnsupportedEncodingException;
+
+import javax.jms.*;
+
import org.apache.log4j.Logger;
+
import org.apache.mina.common.ByteBuffer;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageConverter;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.*;
-import javax.jms.*;
-import java.io.UnsupportedEncodingException;
-
public class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
{
protected final Logger _logger = Logger.getLogger(getClass());
@@ -101,9 +104,9 @@
private final boolean _waitUntilSent;
private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0];
- protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted,
- int channelId, AMQSession session, AMQProtocolHandler protocolHandler,
- long producerId, boolean immediate, boolean mandatory, boolean waitUntilSent)
+ protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
+ AMQSession session, AMQProtocolHandler protocolHandler, long producerId,
+ boolean immediate, boolean mandatory, boolean waitUntilSent)
{
_connection = connection;
_destination = destination;
@@ -116,6 +119,7 @@
{
declareDestination(destination);
}
+
_immediate = immediate;
_mandatory = mandatory;
_waitUntilSent = waitUntilSent;
@@ -134,18 +138,18 @@
// Declare the exchange
// Note that the durable and internal arguments are ignored since passive is set to false
// TODO: Be aware of possible changes to parameter order as versions change.
- AMQFrame declare = ExchangeDeclareBody.createAMQFrame(_channelId,
- _protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion(),
- null, // arguments
- false, // autoDelete
- false, // durable
- destination.getExchangeName(), // exchange
- false, // internal
- true, // nowait
- false, // passive
- 0, // ticket
- destination.getExchangeClass()); // type
+ AMQFrame declare =
+ ExchangeDeclareBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(),
+ _protocolHandler.getProtocolMinorVersion(), null, // arguments
+ false, // autoDelete
+ false, // durable
+ destination.getExchangeName(), // exchange
+ false, // internal
+ true, // nowait
+ false, // passive
+ _session.getTicket(), // ticket
+ destination.getExchangeClass()); // type
+
_protocolHandler.writeFrame(declare);
}
@@ -159,6 +163,7 @@
public boolean getDisableMessageID() throws JMSException
{
checkNotClosed();
+
// Always false for AMQP
return false;
}
@@ -172,39 +177,44 @@
public boolean getDisableMessageTimestamp() throws JMSException
{
checkNotClosed();
+
return _disableTimestamps;
}
public void setDeliveryMode(int i) throws JMSException
{
checkPreConditions();
- if (i != DeliveryMode.NON_PERSISTENT && i != DeliveryMode.PERSISTENT)
+ if ((i != DeliveryMode.NON_PERSISTENT) && (i != DeliveryMode.PERSISTENT))
{
- throw new JMSException("DeliveryMode must be either NON_PERSISTENT or PERSISTENT. Value of " + i +
- " is illegal");
+ throw new JMSException("DeliveryMode must be either NON_PERSISTENT or PERSISTENT. Value of " + i
+ + " is illegal");
}
+
_deliveryMode = i;
}
public int getDeliveryMode() throws JMSException
{
checkNotClosed();
+
return _deliveryMode;
}
public void setPriority(int i) throws JMSException
{
checkPreConditions();
- if (i < 0 || i > 9)
+ if ((i < 0) || (i > 9))
{
throw new IllegalArgumentException("Priority of " + i + " is illegal. Value must be in range 0 to 9");
}
+
_messagePriority = i;
}
public int getPriority() throws JMSException
{
checkNotClosed();
+
return _messagePriority;
}
@@ -215,18 +225,21 @@
{
throw new IllegalArgumentException("Time to live must be non-negative - supplied value was " + l);
}
+
_timeToLive = l;
}
public long getTimeToLive() throws JMSException
{
checkNotClosed();
+
return _timeToLive;
}
public Destination getDestination() throws JMSException
{
checkNotClosed();
+
return _destination;
}
@@ -241,11 +254,9 @@
checkPreConditions();
checkInitialDestination();
-
synchronized (_connection.getFailoverMutex())
{
- sendImpl(_destination, message, _deliveryMode, _messagePriority, _timeToLive,
- _mandatory, _immediate);
+ sendImpl(_destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate);
}
}
@@ -256,8 +267,7 @@
synchronized (_connection.getFailoverMutex())
{
- sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive,
- _mandatory, _immediate);
+ sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate);
}
}
@@ -267,20 +277,17 @@
checkInitialDestination();
synchronized (_connection.getFailoverMutex())
{
- sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive,
- _mandatory, immediate);
+ sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive, _mandatory, immediate);
}
}
- public void send(Message message, int deliveryMode, int priority,
- long timeToLive) throws JMSException
+ public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
{
checkPreConditions();
checkInitialDestination();
synchronized (_connection.getFailoverMutex())
{
- sendImpl(_destination, message, deliveryMode, priority, timeToLive, _mandatory,
- _immediate);
+ sendImpl(_destination, message, deliveryMode, priority, timeToLive, _mandatory, _immediate);
}
}
@@ -291,69 +298,60 @@
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
- sendImpl((AMQDestination) destination, message, _deliveryMode, _messagePriority, _timeToLive,
- _mandatory, _immediate);
+ sendImpl((AMQDestination) destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory,
+ _immediate);
}
}
- public void send(Destination destination, Message message, int deliveryMode,
- int priority, long timeToLive)
- throws JMSException
+ public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive)
+ throws JMSException
{
checkPreConditions();
checkDestination(destination);
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
- sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive,
- _mandatory, _immediate);
+ sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, _mandatory, _immediate);
}
}
- public void send(Destination destination, Message message, int deliveryMode,
- int priority, long timeToLive, boolean mandatory)
- throws JMSException
+ public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
+ boolean mandatory) throws JMSException
{
checkPreConditions();
checkDestination(destination);
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
- sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive,
- mandatory, _immediate);
+ sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, _immediate);
}
}
- public void send(Destination destination, Message message, int deliveryMode,
- int priority, long timeToLive, boolean mandatory, boolean immediate)
- throws JMSException
+ public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
+ boolean mandatory, boolean immediate) throws JMSException
{
checkPreConditions();
checkDestination(destination);
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
- sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive,
- mandatory, immediate);
+ sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, immediate);
}
}
- public void send(Destination destination, Message message, int deliveryMode,
- int priority, long timeToLive, boolean mandatory,
- boolean immediate, boolean waitUntilSent)
- throws JMSException
+ public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
+ boolean mandatory, boolean immediate, boolean waitUntilSent) throws JMSException
{
checkPreConditions();
checkDestination(destination);
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
- sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive,
- mandatory, immediate, waitUntilSent);
+ sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, immediate,
+ waitUntilSent);
}
}
-
private AbstractJMSMessage convertToNativeMessage(Message message) throws JMSException
{
if (message instanceof AbstractJMSMessage)
@@ -366,23 +364,23 @@
if (message instanceof BytesMessage)
{
- newMessage = new MessageConverter((BytesMessage)message).getConvertedMessage();
+ newMessage = new MessageConverter((BytesMessage) message).getConvertedMessage();
}
else if (message instanceof MapMessage)
{
- newMessage = new MessageConverter((MapMessage)message).getConvertedMessage();
+ newMessage = new MessageConverter((MapMessage) message).getConvertedMessage();
}
else if (message instanceof ObjectMessage)
{
- newMessage = new MessageConverter((ObjectMessage)message).getConvertedMessage();
+ newMessage = new MessageConverter((ObjectMessage) message).getConvertedMessage();
}
else if (message instanceof TextMessage)
{
- newMessage = new MessageConverter((TextMessage)message).getConvertedMessage();
+ newMessage = new MessageConverter((TextMessage) message).getConvertedMessage();
}
else if (message instanceof StreamMessage)
{
- newMessage = new MessageConverter((StreamMessage)message).getConvertedMessage();
+ newMessage = new MessageConverter((StreamMessage) message).getConvertedMessage();
}
else
{
@@ -395,24 +393,25 @@
}
else
{
- throw new JMSException("Unable to send message, due to class conversion error: " + message.getClass().getName());
+ throw new JMSException("Unable to send message, due to class conversion error: "
+ + message.getClass().getName());
}
}
}
-
private void validateDestination(Destination destination) throws JMSException
{
if (!(destination instanceof AMQDestination))
{
- throw new JMSException("Unsupported destination class: " +
- (destination != null ? destination.getClass() : null));
+ throw new JMSException("Unsupported destination class: "
+ + ((destination != null) ? destination.getClass() : null));
}
+
declareDestination((AMQDestination) destination);
}
- protected void sendImpl(AMQDestination destination, Message message, int deliveryMode, int priority,
- long timeToLive, boolean mandatory, boolean immediate) throws JMSException
+ protected void sendImpl(AMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
+ boolean mandatory, boolean immediate) throws JMSException
{
sendImpl(destination, message, deliveryMode, priority, timeToLive, mandatory, immediate, _waitUntilSent);
}
@@ -429,21 +428,20 @@
* @param immediate
* @throws JMSException
*/
- protected void sendImpl(AMQDestination destination, Message origMessage, int deliveryMode, int priority,
- long timeToLive, boolean mandatory, boolean immediate, boolean wait) throws JMSException
+ protected void sendImpl(AMQDestination destination, Message origMessage, int deliveryMode, int priority, long timeToLive,
+ boolean mandatory, boolean immediate, boolean wait) throws JMSException
{
checkTemporaryDestination(destination);
origMessage.setJMSDestination(destination);
-
AbstractJMSMessage message = convertToNativeMessage(origMessage);
int type;
- if(destination instanceof Topic)
+ if (destination instanceof Topic)
{
type = AMQDestination.TOPIC_TYPE;
}
- else if(destination instanceof Queue)
+ else if (destination instanceof Queue)
{
type = AMQDestination.QUEUE_TYPE;
}
@@ -452,22 +450,19 @@
type = AMQDestination.UNKNOWN_TYPE;
}
- message.getJmsHeaders().setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(),
- type);
+ message.getJmsHeaders().setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(), type);
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId,
- _protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion(),
- destination.getExchangeName(), // exchange
- immediate, // immediate
- mandatory, // mandatory
- destination.getRoutingKey(), // routingKey
- 0); // ticket
-
-
+ AMQFrame publishFrame =
+ BasicPublishBody.createAMQFrame(
+ _channelId, _protocolHandler.getProtocolMajorVersion(), _protocolHandler.getProtocolMinorVersion(),
+ destination.getExchangeName(), // exchange
+ immediate, // immediate
+ mandatory, // mandatory
+ destination.getRoutingKey(), // routingKey
+ _session.getTicket()); // ticket
message.prepareForSending();
ByteBuffer payload = message.getData();
@@ -487,6 +482,7 @@
contentHeaderProperties.setExpiration(0);
}
}
+
contentHeaderProperties.setDeliveryMode((byte) deliveryMode);
contentHeaderProperties.setPriority((byte) priority);
@@ -494,12 +490,12 @@
final int contentBodyFrameCount = calculateContentBodyFrameCount(payload);
final AMQFrame[] frames = new AMQFrame[2 + contentBodyFrameCount];
- if(payload != null)
+ if (payload != null)
{
createContentBodies(payload, frames, 2, _channelId);
}
- if (contentBodyFrameCount != 0 && _logger.isDebugEnabled())
+ if ((contentBodyFrameCount != 0) && _logger.isDebugEnabled())
{
_logger.debug("Sending content body frames to " + destination);
}
@@ -508,12 +504,10 @@
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
AMQFrame contentHeaderFrame =
- ContentHeaderBody.createAMQFrame(_channelId,
- BasicConsumeBody.getClazz(_protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion()),
- 0,
- contentHeaderProperties,
- size);
+ ContentHeaderBody.createAMQFrame(_channelId,
+ BasicConsumeBody.getClazz(_protocolHandler.getProtocolMajorVersion(),
+ _protocolHandler.getProtocolMinorVersion()), 0,
+ contentHeaderProperties, size);
if (_logger.isDebugEnabled())
{
_logger.debug("Sending content header frame to " + destination);
@@ -524,7 +518,6 @@
CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
_protocolHandler.writeFrame(compositeFrame, wait);
-
if (message != origMessage)
{
_logger.debug("Updating original message");
@@ -538,16 +531,17 @@
private void checkTemporaryDestination(AMQDestination destination) throws JMSException
{
- if(destination instanceof TemporaryDestination)
+ if (destination instanceof TemporaryDestination)
{
_logger.debug("destination is temporary destination");
TemporaryDestination tempDest = (TemporaryDestination) destination;
- if(tempDest.getSession().isClosed())
+ if (tempDest.getSession().isClosed())
{
_logger.debug("session is closed");
throw new JMSException("Session for temporary destination has been closed");
}
- if(tempDest.isDeleted())
+
+ if (tempDest.isDeleted())
{
_logger.debug("destination is deleted");
throw new JMSException("Cannot send to a deleted temporary destination");
@@ -567,9 +561,9 @@
private void createContentBodies(ByteBuffer payload, AMQFrame[] frames, int offset, int channelId)
{
- if (frames.length == offset + 1)
+ if (frames.length == (offset + 1))
{
- frames[offset] = ContentBody.createAMQFrame(channelId,new ContentBody(payload));
+ frames[offset] = ContentBody.createAMQFrame(channelId, new ContentBody(payload));
}
else
{
@@ -578,10 +572,10 @@
long remaining = payload.remaining();
for (int i = offset; i < frames.length; i++)
{
- payload.position((int) framePayloadMax * (i-offset));
+ payload.position((int) framePayloadMax * (i - offset));
int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining;
payload.limit(payload.position() + length);
- frames[i] = ContentBody.createAMQFrame(channelId,new ContentBody(payload.slice()));
+ frames[i] = ContentBody.createAMQFrame(channelId, new ContentBody(payload.slice()));
remaining -= length;
}
@@ -594,7 +588,7 @@
// we substract one from the total frame maximum size to account for the end of frame marker in a body frame
// (0xCE byte).
int frameCount;
- if(payload == null || payload.remaining() == 0)
+ if ((payload == null) || (payload.remaining() == 0))
{
frameCount = 0;
}
@@ -602,9 +596,10 @@
{
int dataLength = payload.remaining();
final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
- int lastFrame = (dataLength % framePayloadMax) > 0 ? 1 : 0;
+ int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0;
frameCount = (int) (dataLength / framePayloadMax) + lastFrame;
}
+
return frameCount;
}
@@ -624,7 +619,7 @@
{
checkNotClosed();
- if (_session == null || _session.isClosed())
+ if ((_session == null) || _session.isClosed())
{
throw new javax.jms.IllegalStateException("Invalid Session");
}
@@ -640,9 +635,10 @@
private void checkDestination(Destination suppliedDestination) throws InvalidDestinationException
{
- if (_destination != null && suppliedDestination != null)
+ if ((_destination != null) && (suppliedDestination != null))
{
- throw new UnsupportedOperationException("This message producer was created with a Destination, therefore you cannot use an unidentified Destination");
+ throw new UnsupportedOperationException(
+ "This message producer was created with a Destination, therefore you cannot use an unidentified Destination");
}
if (suppliedDestination == null)
@@ -650,9 +646,7 @@
throw new InvalidDestinationException("Supplied Destination was invalid");
}
-
}
-
public AMQSession getSession()
{
Modified: incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?view=diff&rev=506483&r1=506482&r2=506483
==============================================================================
--- incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Mon Feb 12 07:25:17 2007
@@ -20,27 +20,29 @@
*/
package org.apache.qpid.client.message;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.Map;
+
+import javax.jms.*;
+
import org.apache.commons.collections.map.ReferenceMap;
+
import org.apache.mina.common.ByteBuffer;
+
import org.apache.qpid.AMQException;
-import org.apache.qpid.url.BindingURL;
-import org.apache.qpid.url.AMQBindingURL;
-import org.apache.qpid.url.URLSyntaxException;
-import org.apache.qpid.client.AMQUndefinedDestination;
import org.apache.qpid.client.*;
+import org.apache.qpid.client.AMQUndefinedDestination;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.AMQShortString;
-
-import javax.jms.*;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.Map;
+import org.apache.qpid.url.AMQBindingURL;
+import org.apache.qpid.url.BindingURL;
+import org.apache.qpid.url.URLSyntaxException;
public abstract class AbstractJMSMessage extends AMQMessage implements org.apache.qpid.jms.Message
{
private static final Map _destinationCache = Collections.synchronizedMap(new ReferenceMap());
-
protected boolean _redelivered;
@@ -60,10 +62,11 @@
{
_data.acquire();
}
+
_readableProperties = false;
_readableMessage = (data != null);
_changedData = (data == null);
- _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties)_contentHeaderProperties).getHeaders());
+ _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders());
}
protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
@@ -71,28 +74,29 @@
{
this(contentHeader, deliveryTag);
-
- int type = contentHeader.getHeaders().getInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName());
+ Integer type = contentHeader.getHeaders().getInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName());
+ int contentType = (type == null) ? AMQDestination.UNKNOWN_TYPE : type.intValue();
AMQDestination dest;
- switch(type)
+ switch (contentType)
{
- case AMQDestination.QUEUE_TYPE:
- dest = new AMQQueue(exchange, routingKey, routingKey);
- break;
- case AMQDestination.TOPIC_TYPE:
- dest = new AMQTopic(exchange, routingKey, null);
- break;
- default:
- dest = new AMQUndefinedDestination(exchange, routingKey, null);
- break;
+
+ case AMQDestination.QUEUE_TYPE:
+ dest = new AMQQueue(exchange, routingKey, routingKey);
+ break;
+
+ case AMQDestination.TOPIC_TYPE:
+ dest = new AMQTopic(exchange, routingKey, null);
+ break;
+
+ default:
+ dest = new AMQUndefinedDestination(exchange, routingKey, null);
+ break;
}
//Destination dest = AMQDestination.createDestination(url);
setJMSDestination(dest);
-
-
_data = data;
if (_data != null)
{
@@ -107,7 +111,7 @@
{
super(contentHeader, deliveryTag);
_readableProperties = (_contentHeaderProperties != null);
- _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties)_contentHeaderProperties).getHeaders());
+ _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders());
}
public String getJMSMessageID() throws JMSException
@@ -116,6 +120,7 @@
{
getContentHeaderProperties().setMessageId("ID:" + _deliveryTag);
}
+
return getContentHeaderProperties().getMessageId();
}
@@ -178,6 +183,7 @@
_destinationCache.put(replyToEncoding, dest);
}
+
return dest;
}
}
@@ -188,11 +194,13 @@
{
throw new IllegalArgumentException("Null destination not allowed");
}
+
if (!(destination instanceof AMQDestination))
{
- throw new IllegalArgumentException("ReplyTo destination may only be an AMQDestination - passed argument was type " +
- destination.getClass());
+ throw new IllegalArgumentException(
+ "ReplyTo destination may only be an AMQDestination - passed argument was type " + destination.getClass());
}
+
final AMQDestination amqd = (AMQDestination) destination;
final AMQShortString encodedDestination = amqd.getEncodedName();
@@ -278,17 +286,17 @@
_readableMessage = false;
}
-
public boolean propertyExists(AMQShortString propertyName) throws JMSException
{
checkPropertyName(propertyName);
+
return getJmsHeaders().propertyExists(propertyName);
}
-
public boolean propertyExists(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
+
return getJmsHeaders().propertyExists(propertyName);
}
@@ -299,7 +307,6 @@
return getJmsHeaders().getBoolean(propertyName);
}
-
public boolean getBooleanProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
@@ -310,48 +317,56 @@
public byte getByteProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
+
return getJmsHeaders().getByte(propertyName);
}
public short getShortProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
+
return getJmsHeaders().getShort(propertyName);
}
public int getIntProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
+
return getJmsHeaders().getInteger(propertyName);
}
public long getLongProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
+
return getJmsHeaders().getLong(propertyName);
}
public float getFloatProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
+
return getJmsHeaders().getFloat(propertyName);
}
public double getDoubleProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
+
return getJmsHeaders().getDouble(propertyName);
}
public String getStringProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
+
return getJmsHeaders().getString(propertyName);
}
public Object getObjectProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
+
return getJmsHeaders().getObject(propertyName);
}
@@ -436,7 +451,6 @@
getJmsHeaders().remove(propertyName);
}
-
protected void removeProperty(String propertyName) throws JMSException
{
checkPropertyName(propertyName);
@@ -468,7 +482,6 @@
}
}
-
/**
* This forces concrete classes to implement clearBody()
*
@@ -511,6 +524,7 @@
{
buf.append('\n').append(getJmsHeaders().getHeaders());
}
+
return buf.toString();
}
catch (JMSException e)
@@ -519,7 +533,6 @@
}
}
-
public void setUnderlyingMessagePropertiesMap(FieldTable messageProperties)
{
getContentHeaderProperties().setHeaders(messageProperties);
@@ -550,6 +563,7 @@
{
reset();
}
+
return _data;
}
@@ -608,6 +622,7 @@
public byte[] getBytesProperty(AMQShortString propertyName) throws JMSException
{
checkPropertyName(propertyName);
+
return getJmsHeaders().getBytes(propertyName);
}
Modified: incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java?view=diff&rev=506483&r1=506482&r2=506483
==============================================================================
--- incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java (original)
+++ incubator/qpid/branches/perftesting_persistent/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java Mon Feb 12 07:25:17 2007
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,20 +20,40 @@
*/
package org.apache.qpid.client.message;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.jms.JMSException;
+
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.AMQShortString;
-
-import javax.jms.JMSException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.List;
public class MessageFactoryRegistry
{
private final Map<String, MessageFactory> _mimeStringToFactoryMap = new HashMap<String, MessageFactory>();
- private final Map<AMQShortString, MessageFactory> _mimeShortStringToFactoryMap = new HashMap<AMQShortString, MessageFactory>();
+ private final Map<AMQShortString, MessageFactory> _mimeShortStringToFactoryMap =
+ new HashMap<AMQShortString, MessageFactory>();
+
+ /**
+ * Construct a new registry with the default message factories registered
+ * @return a message factory registry
+ */
+ public static MessageFactoryRegistry newDefaultRegistry()
+ {
+ MessageFactoryRegistry mf = new MessageFactoryRegistry();
+ mf.registerFactory(JMSMapMessage.MIME_TYPE, new JMSMapMessageFactory());
+ mf.registerFactory("text/plain", new JMSTextMessageFactory());
+ mf.registerFactory("text/xml", new JMSTextMessageFactory());
+ mf.registerFactory(JMSBytesMessage.MIME_TYPE, new JMSBytesMessageFactory());
+ mf.registerFactory(JMSObjectMessage.MIME_TYPE, new JMSObjectMessageFactory());
+ mf.registerFactory(JMSStreamMessage.MIME_TYPE, new JMSStreamMessageFactory());
+ mf.registerFactory(null, new JMSBytesMessageFactory());
+
+ return mf;
+ }
public void registerFactory(String mimeType, MessageFactory mf)
{
@@ -41,6 +61,7 @@
{
throw new IllegalArgumentException("Message factory must not be null");
}
+
_mimeStringToFactoryMap.put(mimeType, mf);
_mimeShortStringToFactoryMap.put(new AMQShortString(mimeType), mf);
}
@@ -48,6 +69,7 @@
public MessageFactory deregisterFactory(String mimeType)
{
_mimeShortStringToFactoryMap.remove(new AMQShortString(mimeType));
+
return _mimeStringToFactoryMap.remove(mimeType);
}
@@ -62,14 +84,19 @@
* @throws AMQException
* @throws JMSException
*/
- public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered,
- AMQShortString exchange,
- AMQShortString routingKey,
- ContentHeaderBody contentHeader,
- List bodies) throws AMQException, JMSException
+ public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, AMQShortString exchange,
+ AMQShortString routingKey, ContentHeaderBody contentHeader, List bodies)
+ throws AMQException, JMSException
{
- BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.properties;
- MessageFactory mf = _mimeShortStringToFactoryMap.get(properties.getContentTypeShortString());
+ BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.properties;
+
+ // Get the message content type. This may be null for pure AMQP messages, but will always be set for JMS over
+ // AMQP. When the type is null, it can only be assumed that the message is a byte message.
+ AMQShortString contentTypeShortString = properties.getContentTypeShortString();
+ contentTypeShortString = (contentTypeShortString == null) ? new AMQShortString(JMSBytesMessage.MIME_TYPE)
+ : contentTypeShortString;
+
+ MessageFactory mf = _mimeShortStringToFactoryMap.get(contentTypeShortString);
if (mf == null)
{
throw new AMQException("Unsupport MIME type of " + properties.getContentType());
@@ -86,6 +113,7 @@
{
throw new IllegalArgumentException("Mime type must not be null");
}
+
MessageFactory mf = _mimeStringToFactoryMap.get(mimeType);
if (mf == null)
{
@@ -95,22 +123,5 @@
{
return mf.createMessage();
}
- }
-
- /**
- * Construct a new registry with the default message factories registered
- * @return a message factory registry
- */
- public static MessageFactoryRegistry newDefaultRegistry()
- {
- MessageFactoryRegistry mf = new MessageFactoryRegistry();
- mf.registerFactory(JMSMapMessage.MIME_TYPE, new JMSMapMessageFactory());
- mf.registerFactory("text/plain", new JMSTextMessageFactory());
- mf.registerFactory("text/xml", new JMSTextMessageFactory());
- mf.registerFactory(JMSBytesMessage.MIME_TYPE, new JMSBytesMessageFactory());
- mf.registerFactory(JMSObjectMessage.MIME_TYPE, new JMSObjectMessageFactory());
- mf.registerFactory(JMSStreamMessage.MIME_TYPE, new JMSStreamMessageFactory());
- mf.registerFactory(null, new JMSBytesMessageFactory());
- return mf;
}
}
Modified: incubator/qpid/branches/perftesting_persistent/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting_persistent/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java?view=diff&rev=506483&r1=506482&r2=506483
==============================================================================
--- incubator/qpid/branches/perftesting_persistent/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java (original)
+++ incubator/qpid/branches/perftesting_persistent/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java Mon Feb 12 07:25:17 2007
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,122 +20,277 @@
*/
package org.apache.qpid.topic;
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
+import java.util.Random;
+
+import javax.jms.*;
+
+import org.apache.log4j.Logger;
+import org.apache.log4j.NDC;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+
+/**
+ * This class has not been kept up to date with the topic_listener in the cpp tests. It should provide identical behaviour for
+ * cross testing the java and cpp clients.
+ *
+ * <p/>How the cpp topic_publisher operates:
+ * It publishes text messages to the default topic exchange, on virtual host "/test", on the topic "topic_control", for
+ * the specified number of test messages to be sent.
+ * It publishes a report request message (on same topic), with the header text field "TYPE", value "REPORT_REQUEST",
+ * optionally within a transaction, and waits for the specified number of consumers to reply to this request. The
+ * listeners should reply to this message on a queue named "response", on virtual host "/test", with some sort of message
+ * about the number of messages received and how long it took, although the publisher never looks at the message content.
+ * The publisher then sends a message (on the same topic), with the header text field "TYPE", value "TERMINATION_REQUEST",
+ * which the listener should close its connection and terminate upon receipt of.
+ *
+ * @deprecated Use PingPongBouncer instead once the below todo is completed.
+ *
+ * @todo Make the functionality of this class available through PingPongBouncer. Rename PingPongBouncer to
+ * PingListener and make its bouncing functionality optional, either through a switch or as an extending class
+ * called PingBouncer. Want to have as few ping classes as possible with configurable behaviour, re-using code
+ * across p2p and topic style tests in almost all cases.
+ */
public class Listener implements MessageListener
{
+ private static Logger log = Logger.getLogger(Listener.class);
+
+ private static final char[] DATA = "abcdefghijklmnopqrstuvwxyz".toCharArray();
+ public static final String CONTROL_TOPIC = "topic_control";
+ public static final String RESPONSE_QUEUE = "response";
+
+ private final Topic _topic;
+ //private final Topic _control;
+
+ private final Queue _response;
+
+ private final byte[] _payload;
+
+ /** Holds the connection to listen on. */
private final Connection _connection;
+
+ /** Holds the producer to send control messages on. */
private final MessageProducer _controller;
+
+ /** Holds the JMS session. */
private final javax.jms.Session _session;
- private final MessageFactory _factory;
+
+ /** Holds a flag to indicate that a timer has begun on the first message. Reset when report is sent. */
private boolean init;
+
+ /** Holds the count of messages received by this listener. */
private int count;
- private long start;
- Listener(Connection connection, int ackMode) throws Exception
- {
- this(connection, ackMode, null);
- }
+ /** Used to hold the start time of the first message. */
+ private long start;
+ private static String clientId;
Listener(Connection connection, int ackMode, String name) throws Exception
{
+ log.debug("Listener(Connection connection = " + connection + ", int ackMode = " + ackMode + ", String name = " + name
+ + "): called");
+
_connection = connection;
_session = connection.createSession(false, ackMode);
- _factory = new MessageFactory(_session);
- //register for events
- if(name == null)
+ if (_session instanceof AMQSession)
{
- _factory.createTopicConsumer().setMessageListener(this);
+ _topic = new AMQTopic(CONTROL_TOPIC);
+ //_control = new AMQTopic(CONTROL_TOPIC);
+ _response = new AMQQueue(RESPONSE_QUEUE);
}
else
{
- _factory.createDurableTopicConsumer(name).setMessageListener(this);
+ _topic = _session.createTopic(CONTROL_TOPIC);
+ //_control = _session.createTopic(CONTROL_TOPIC);
+ _response = _session.createQueue(RESPONSE_QUEUE);
}
- _connection.start();
+ int size = 256;
- _controller = _factory.createControlPublisher();
- System.out.println("Waiting for messages " +
- Config.getAckModeDescription(ackMode)
- + (name == null ? "" : " (subscribed with name " + name + " and client id " + connection.getClientID() + ")")
- + "...");
+ _payload = new byte[size];
- }
+ for (int i = 0; i < size; i++)
+ {
+ _payload[i] = (byte) DATA[i % DATA.length];
+ }
- private void shutdown()
- {
- try
+ //register for events
+ if (name == null)
{
- _session.close();
- _connection.stop();
- _connection.close();
+ log.debug("Calling _factory.createTopicConsumer().setMessageListener(this)");
+ createTopicConsumer().setMessageListener(this);
}
- catch(Exception e)
+ else
{
- e.printStackTrace(System.out);
+ log.debug("Calling createDurableTopicConsumer(name).setMessageListener(this)");
+ createDurableTopicConsumer(name).setMessageListener(this);
}
+
+ _connection.start();
+
+ _controller = createControlPublisher();
+ System.out.println("Waiting for messages " + Config.getAckModeDescription(ackMode)
+ +
+ ((name == null)
+ ? "" : (" (subscribed with name " + name + " and client id " + connection.getClientID() + ")"))
+ + "...");
}
- private void report()
+ public static void main(String[] argv) throws Exception
{
- try
- {
- String msg = getReport();
- _controller.send(_factory.createReportResponseMessage(msg));
- System.out.println("Sent report: " + msg);
- }
- catch(Exception e)
+ clientId = "Listener-" + System.currentTimeMillis();
+
+ NDC.push(clientId);
+
+ Config config = new Config();
+ config.setOptions(argv);
+
+ //Connection con = config.createConnection();
+ Connection con =
+ new AMQConnection("amqp://guest:guest@testid/test?brokerlist='" + config.getHost() + ":" + config.getPort()
+ + "'");
+
+ if (config.getClientId() != null)
{
- e.printStackTrace(System.out);
+ con.setClientID(config.getClientId());
}
+
+ new Listener(con, config.getAckMode(), config.getSubscriptionId());
+
+ NDC.pop();
+ NDC.remove();
}
- private String getReport()
+ /**
+ * Checks whether or not a text field on a message has the specified value.
+ *
+ * @param m The message to check.
+ * @param fieldName The name of the field to check.
+ * @param value The expected value of the field to compare with.
+ *
+ * @return <tt>true</tt> if the specified field has the specified value, <tt>false</tt> otherwise.
+ *
+ * @throws JMSException Any JMSExceptions are allowed to fall through.
+ */
+ private static boolean checkTextField(Message m, String fieldName, String value) throws JMSException
{
- long time = (System.currentTimeMillis() - start);
- return "Received " + count + " in " + time + "ms";
+ log.debug("private static boolean checkTextField(Message m = " + m + ", String fieldName = " + fieldName
+ + ", String value = " + value + "): called");
+
+ String comp = m.getStringProperty(fieldName);
+
+ return (comp != null) && comp.equals(value);
}
public void onMessage(Message message)
{
- if(!init)
+ NDC.push(clientId);
+
+ log.debug("public void onMessage(Message message): called");
+
+ if (!init)
{
- start = System.currentTimeMillis();
+ start = System.nanoTime() / 1000000;
count = 0;
init = true;
}
- if(_factory.isShutdown(message))
+ try
{
- shutdown();
+ if (isShutdown(message))
+ {
+ shutdown();
+ }
+ else if (isReport(message))
+ {
+ //send a report:
+ report();
+ init = false;
+ }
}
- else if(_factory.isReport(message))
+ catch (JMSException e)
{
- //send a report:
- report();
- init = false;
+ log.warn("There was a JMSException during onMessage.", e);
}
- else if (++count % 100 == 0)
+ finally
{
- System.out.println("Received " + count + " messages.");
+ NDC.pop();
}
}
- public static void main(String[] argv) throws Exception
+ Message createReportResponseMessage(String msg) throws JMSException
{
- Config config = new Config();
- config.setOptions(argv);
+ return _session.createTextMessage(msg);
+ }
+
+ boolean isShutdown(Message m) throws JMSException
+ {
+ boolean result = checkTextField(m, "TYPE", "TERMINATION_REQUEST");
+
+ log.debug("isShutdown = " + result);
+
+ return result;
+ }
+
+ boolean isReport(Message m) throws JMSException
+ {
+ boolean result = checkTextField(m, "TYPE", "REPORT_REQUEST");
+
+ log.debug("isReport = " + result);
+
+ return result;
+ }
+
+ MessageConsumer createTopicConsumer() throws Exception
+ {
+ return _session.createConsumer(_topic);
+ }
+
+ MessageConsumer createDurableTopicConsumer(String name) throws Exception
+ {
+ return _session.createDurableSubscriber(_topic, name);
+ }
+
+ MessageProducer createControlPublisher() throws Exception
+ {
+ return _session.createProducer(_response);
+ }
- Connection con = config.createConnection();
- if(config.getClientId() != null)
+ private void shutdown()
+ {
+ try
{
- con.setClientID(config.getClientId());
+ _session.close();
+ _connection.stop();
+ _connection.close();
}
- new Listener(con, config.getAckMode(), config.getSubscriptionId());
+ catch (Exception e)
+ {
+ e.printStackTrace(System.out);
+ }
+ }
+
+ private void report()
+ {
+ try
+ {
+ String msg = getReport();
+ _controller.send(createReportResponseMessage(msg));
+ System.out.println("Sent report: " + msg);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace(System.out);
+ }
+ }
+
+ private String getReport()
+ {
+ long time = ((System.nanoTime() / 1000000) - start);
+
+ return "Received " + count + " in " + time + "ms";
}
}
Modified: incubator/qpid/branches/perftesting_persistent/qpid/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting_persistent/qpid/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java?view=diff&rev=506483&r1=506482&r2=506483
==============================================================================
--- incubator/qpid/branches/perftesting_persistent/qpid/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java (original)
+++ incubator/qpid/branches/perftesting_persistent/qpid/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java Mon Feb 12 07:25:17 2007
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,11 +20,11 @@
*/
package org.apache.qpid.topic;
+import javax.jms.*;
+
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
-import javax.jms.*;
-
/**
*/
class MessageFactory
@@ -36,7 +36,6 @@
private final Topic _control;
private final byte[] _payload;
-
MessageFactory(Session session) throws JMSException
{
this(session, 256);
@@ -45,24 +44,39 @@
MessageFactory(Session session, int size) throws JMSException
{
_session = session;
- if(session instanceof AMQSession)
+ if (session instanceof AMQSession)
{
- _topic = new AMQTopic("topictest.messages");
+ _topic = new AMQTopic("topic_control");
_control = new AMQTopic("topictest.control");
}
else
{
- _topic = session.createTopic("topictest.messages");
+ _topic = session.createTopic("topic_control");
_control = session.createTopic("topictest.control");
}
+
_payload = new byte[size];
- for(int i = 0; i < size; i++)
+ for (int i = 0; i < size; i++)
{
_payload[i] = (byte) DATA[i % DATA.length];
}
}
+ private static boolean checkText(Message m, String s)
+ {
+ try
+ {
+ return (m instanceof TextMessage) && ((TextMessage) m).getText().equals(s);
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace(System.out);
+
+ return false;
+ }
+ }
+
Topic getTopic()
{
return _topic;
@@ -72,6 +86,7 @@
{
BytesMessage msg = _session.createBytesMessage();
msg.writeBytes(_payload);
+
return msg;
}
@@ -109,6 +124,7 @@
catch (JMSException e)
{
e.printStackTrace(System.out);
+
return e.toString();
}
}
@@ -136,18 +152,5 @@
MessageProducer createControlPublisher() throws Exception
{
return _session.createProducer(_control);
- }
-
- private static boolean checkText(Message m, String s)
- {
- try
- {
- return m instanceof TextMessage && ((TextMessage) m).getText().equals(s);
- }
- catch (JMSException e)
- {
- e.printStackTrace(System.out);
- return false;
- }
}
}