You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/08/07 21:25:14 UTC
svn commit: r683683 [1/3] - in /incubator/qpid/trunk/qpid/java:
broker/src/test/java/org/apache/qpid/server/util/
client/src/main/java/org/apache/qpid/client/
client/src/main/java/org/apache/qpid/client/message/
client/src/main/java/org/apache/qpid/fil...
Author: rhs
Date: Thu Aug 7 12:25:12 2008
New Revision: 683683
URL: http://svn.apache.org/viewvc?rev=683683&view=rev
Log:
QPID-1213: Patch from rgodfrey to refactor AbstractJMSMessage and descendants to move AMQP version-specific code into delegates and remove unnecessary conversion between 0-8 and 0-10 objects
Added:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegateFactory.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java (with props)
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java (with props)
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
- copied, changed from r682852, incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
Removed:
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
Modified:
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/filter/PropertyExpression.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/Message.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/ByteBufferMessage.java
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/message/NonQpidObjectMessage.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
Modified: incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java?rev=683683&r1=683682&r2=683683&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java Thu Aug 7 12:25:12 2008
@@ -24,16 +24,15 @@
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.SimpleAMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.protocol.InternalTestProtocolSession;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.ConsumerTagNotUniqueException;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.BasicContentHeaderProperties;
Modified: incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java?rev=683683&r1=683682&r2=683683&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java Thu Aug 7 12:25:12 2008
@@ -20,20 +20,15 @@
*/
package org.apache.qpid.server.util;
-import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.MapConfiguration;
import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.management.ManagedObjectRegistry;
import org.apache.qpid.server.management.NoopManagedObjectRegistry;
-import org.apache.qpid.server.plugins.PluginManager;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.access.ACLPlugin;
import org.apache.qpid.server.security.access.plugins.AllowAll;
-import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
import org.apache.qpid.server.security.auth.database.PropertiesPrincipalDatabaseManager;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java?rev=683683&r1=683682&r2=683683&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java Thu Aug 7 12:25:12 2008
@@ -65,9 +65,9 @@
private static final int IS_EXCLUSIVE_MASK = 0x2;
private static final int IS_AUTODELETE_MASK = 0x4;
- public static final Integer QUEUE_TYPE = Integer.valueOf(1);
- public static final Integer TOPIC_TYPE = Integer.valueOf(2);
- public static final Integer UNKNOWN_TYPE = Integer.valueOf(3);
+ public static final int QUEUE_TYPE = 1;
+ public static final int TOPIC_TYPE = 2;
+ public static final int UNKNOWN_TYPE = 3;
protected AMQDestination(String url) throws URISyntaxException
{
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=683683&r1=683682&r2=683683&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Thu Aug 7 12:25:12 2008
@@ -68,15 +68,7 @@
import org.apache.qpid.client.failover.FailoverNoopSupport;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.failover.FailoverRetrySupport;
-import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.client.message.JMSBytesMessage;
-import org.apache.qpid.client.message.JMSMapMessage;
-import org.apache.qpid.client.message.JMSObjectMessage;
-import org.apache.qpid.client.message.JMSStreamMessage;
-import org.apache.qpid.client.message.JMSTextMessage;
-import org.apache.qpid.client.message.MessageFactoryRegistry;
-import org.apache.qpid.client.message.ReturnMessage;
-import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.client.state.AMQStateManager;
@@ -109,6 +101,8 @@
*/
public abstract class AMQSession extends Closeable implements Session, QueueSession, TopicSession
{
+
+
public static final class IdToConsumerMap
{
private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16];
@@ -401,7 +395,7 @@
* @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session.
* @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session.
*/
- AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
+ protected AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark)
{
@@ -515,7 +509,7 @@
public BytesMessage createBytesMessage() throws JMSException
{
checkNotClosed();
- return new JMSBytesMessage();
+ return new JMSBytesMessage(getMessageDelegateFactory());
}
/**
@@ -629,7 +623,7 @@
// Ensure we only try and close an open session.
if (!_closed.getAndSet(true))
{
- synchronized (_connection.getFailoverMutex())
+ synchronized (getFailoverMutex())
{
// We must close down all producers and consumers in an orderly fashion. This is the only method
// that can be called from a different thread of control from the one controlling the session.
@@ -691,7 +685,7 @@
if (!_closed.getAndSet(true))
{
- synchronized (_connection.getFailoverMutex())
+ synchronized (getFailoverMutex())
{
synchronized (_messageDeliveryLock)
{
@@ -944,7 +938,7 @@
public MapMessage createMapMessage() throws JMSException
{
checkNotClosed();
- return new JMSMapMessage();
+ return new JMSMapMessage(getMessageDelegateFactory());
}
public javax.jms.Message createMessage() throws JMSException
@@ -955,7 +949,7 @@
public ObjectMessage createObjectMessage() throws JMSException
{
checkNotClosed();
- return (ObjectMessage) new JMSObjectMessage();
+ return (ObjectMessage) new JMSObjectMessage(getMessageDelegateFactory());
}
public ObjectMessage createObjectMessage(Serializable object) throws JMSException
@@ -1158,11 +1152,11 @@
// with the correct error code and text this is cleary WRONG as the instanceof check below will fail.
// We need to determin here if the connection should be
- synchronized (_connection.getFailoverMutex())
+ synchronized (getFailoverMutex())
{
checkNotClosed();
- return new JMSStreamMessage();
+ return new JMSStreamMessage(getMessageDelegateFactory());
}
}
@@ -1215,14 +1209,19 @@
public TextMessage createTextMessage() throws JMSException
{
- synchronized (_connection.getFailoverMutex())
+ synchronized (getFailoverMutex())
{
checkNotClosed();
- return new JMSTextMessage();
+ return new JMSTextMessage(getMessageDelegateFactory());
}
}
+ protected Object getFailoverMutex()
+ {
+ return _connection.getFailoverMutex();
+ }
+
public TextMessage createTextMessage(String text) throws JMSException
{
@@ -1459,7 +1458,7 @@
}
}
- abstract void sendRecover() throws AMQException, FailoverException;
+ protected abstract void sendRecover() throws AMQException, FailoverException;
public void rejectMessage(UnprocessedMessage message, boolean requeue)
{
@@ -2181,7 +2180,7 @@
}
- abstract Long requestQueueDepth(AMQDestination amqd) throws AMQException, FailoverException;
+ protected abstract Long requestQueueDepth(AMQDestination amqd) throws AMQException, FailoverException;
/**
* Declares the named exchange and type of exchange.
@@ -2887,9 +2886,11 @@
}
}
- abstract boolean tagLE(long tag1, long tag2);
+ protected abstract boolean tagLE(long tag1, long tag2);
+
+ protected abstract boolean updateRollbackMark(long current, long deliveryTag);
- abstract boolean updateRollbackMark(long current, long deliveryTag);
+ public abstract AMQMessageDelegateFactory getMessageDelegateFactory();
/*public void requestAccess(AMQShortString realm, boolean exclusive, boolean passive, boolean active, boolean write,
boolean read) throws AMQException
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=683683&r1=683682&r2=683683&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Thu Aug 7 12:25:12 2008
@@ -27,6 +27,7 @@
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.FiledTableSupport;
+import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.util.Serial;
import org.apache.qpid.nclient.Session;
import org.apache.qpid.nclient.util.MessagePartListenerAdapter;
@@ -773,7 +774,7 @@
return subscriber;
}
- Long requestQueueDepth(AMQDestination amqd)
+ protected Long requestQueueDepth(AMQDestination amqd)
{
return getQpidSession().queueQuery(amqd.getQueueName()).get().getMessageCount();
}
@@ -821,14 +822,19 @@
}
}
- final boolean tagLE(long tag1, long tag2)
+ protected final boolean tagLE(long tag1, long tag2)
{
return Serial.le((int) tag1, (int) tag2);
}
- final boolean updateRollbackMark(long currentMark, long deliveryTag)
+ protected final boolean updateRollbackMark(long currentMark, long deliveryTag)
{
return Serial.lt((int) currentMark, (int) deliveryTag);
}
+ public AMQMessageDelegateFactory getMessageDelegateFactory()
+ {
+ return AMQMessageDelegateFactory.FACTORY_0_10;
+ }
+
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=683683&r1=683682&r2=683683&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Thu Aug 7 12:25:12 2008
@@ -29,6 +29,7 @@
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.failover.FailoverRetrySupport;
import org.apache.qpid.client.message.MessageFactoryRegistry;
+import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.common.AMQPFilterTypes;
@@ -470,7 +471,7 @@
}
- Long requestQueueDepth(AMQDestination amqd) throws AMQException, FailoverException
+ protected Long requestQueueDepth(AMQDestination amqd) throws AMQException, FailoverException
{
AMQFrame queueDeclare =
getMethodRegistry().createQueueDeclareBody(getTicket(),
@@ -486,14 +487,19 @@
return okHandler._messageCount;
}
- final boolean tagLE(long tag1, long tag2)
+ protected final boolean tagLE(long tag1, long tag2)
{
return tag1 <= tag2;
}
- final boolean updateRollbackMark(long currentMark, long deliveryTag)
+ protected final boolean updateRollbackMark(long currentMark, long deliveryTag)
{
return false;
}
+ public AMQMessageDelegateFactory getMessageDelegateFactory()
+ {
+ return AMQMessageDelegateFactory.FACTORY_0_8;
+ }
+
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=683683&r1=683682&r2=683683&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Thu Aug 7 12:25:12 2008
@@ -25,6 +25,7 @@
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.*;
import org.apache.qpid.jms.MessageConsumer;
@@ -683,7 +684,7 @@
try
{
- AbstractJMSMessage jmsMessage = createJMSMessageFromUnprocessedMessage(messageFrame);
+ AbstractJMSMessage jmsMessage = createJMSMessageFromUnprocessedMessage(_session.getMessageDelegateFactory(), messageFrame);
if (debug)
{
@@ -720,7 +721,7 @@
}
}
- public abstract AbstractJMSMessage createJMSMessageFromUnprocessedMessage(UnprocessedMessage<H, B> messageFrame)
+ public abstract AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage<H, B> messageFrame)
throws Exception;
/** @param jmsMessage this message has already been processed so can't redo preDeliver */
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=683683&r1=683682&r2=683683&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Thu Aug 7 12:25:12 2008
@@ -24,12 +24,10 @@
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.AMQException;
-import org.apache.qpid.jms.*;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.api.Message;
import org.apache.qpid.transport.*;
-import org.apache.qpid.transport.Session;
import org.apache.qpid.QpidException;
import org.apache.qpid.filter.MessageFilter;
import org.apache.qpid.filter.JMSSelectorFilter;
@@ -269,12 +267,11 @@
}
@Override public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(
- UnprocessedMessage<Struct[], ByteBuffer> messageFrame) throws Exception
+ AMQMessageDelegateFactory delegateFactory, UnprocessedMessage<Struct[], ByteBuffer> messageFrame) throws Exception
{
return _messageFactory.createMessage(messageFrame.getDeliveryTag(), messageFrame.isRedelivered(),
- messageFrame.getExchange(), messageFrame.getRoutingKey(),
- messageFrame.getContentHeader(), messageFrame.getBodies(),
- messageFrame.getReplyToURL());
+ messageFrame.getContentHeader(), messageFrame.getBodies()
+ );
}
// private methods
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java?rev=683683&r1=683682&r2=683683&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java Thu Aug 7 12:25:12 2008
@@ -20,8 +20,6 @@
*/
package org.apache.qpid.client;
-import java.util.concurrent.TimeUnit;
-
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
@@ -31,6 +29,7 @@
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.filter.JMSSelectorFilter;
import org.apache.qpid.framing.AMQFrame;
@@ -82,7 +81,7 @@
}
}
- public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(UnprocessedMessage<ContentHeaderBody, ContentBody> messageFrame)throws Exception
+ public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage<ContentHeaderBody, ContentBody> messageFrame)throws Exception
{
return _messageFactory.createMessage(messageFrame.getDeliveryTag(),
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?rev=683683&r1=683682&r2=683683&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Thu Aug 7 12:25:12 2008
@@ -31,23 +31,14 @@
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.ObjectMessage;
-import javax.jms.Queue;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
-import javax.jms.Topic;
-import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageConverter;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.CompositeAMQDataBlock;
import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.ExchangeDeclareBody;
import org.apache.qpid.util.UUIDGen;
import org.apache.qpid.util.UUIDs;
import org.slf4j.Logger;
@@ -369,27 +360,27 @@
if (message instanceof BytesMessage)
{
- newMessage = new MessageConverter((BytesMessage) message).getConvertedMessage();
+ newMessage = new MessageConverter(_session, (BytesMessage) message).getConvertedMessage();
}
else if (message instanceof MapMessage)
{
- newMessage = new MessageConverter((MapMessage) message).getConvertedMessage();
+ newMessage = new MessageConverter(_session, (MapMessage) message).getConvertedMessage();
}
else if (message instanceof ObjectMessage)
{
- newMessage = new MessageConverter((ObjectMessage) message).getConvertedMessage();
+ newMessage = new MessageConverter(_session, (ObjectMessage) message).getConvertedMessage();
}
else if (message instanceof TextMessage)
{
- newMessage = new MessageConverter((TextMessage) message).getConvertedMessage();
+ newMessage = new MessageConverter(_session, (TextMessage) message).getConvertedMessage();
}
else if (message instanceof StreamMessage)
{
- newMessage = new MessageConverter((StreamMessage) message).getConvertedMessage();
+ newMessage = new MessageConverter(_session, (StreamMessage) message).getConvertedMessage();
}
else
{
- newMessage = new MessageConverter(message).getConvertedMessage();
+ newMessage = new MessageConverter(_session, message).getConvertedMessage();
}
if (newMessage != null)
@@ -460,7 +451,7 @@
UUID messageId = null;
if (_disableMessageId)
{
- message.setJMSMessageID(null);
+ message.setJMSMessageID((UUID)null);
}
else
{
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=683683&r1=683682&r2=683683&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java Thu Aug 7 12:25:12 2008
@@ -29,6 +29,7 @@
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.FiledTableSupport;
+import org.apache.qpid.client.message.AMQMessageDelegate_0_10;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -72,9 +73,15 @@
boolean immediate, boolean wait) throws JMSException
{
message.prepareForSending();
- if (message.get010Message() == null)
+
+ AMQMessageDelegate_0_10 delegate = (AMQMessageDelegate_0_10) message.getDelegate();
+
+ org.apache.qpid.api.Message underlyingMessage = message.get010Message();
+ if (underlyingMessage == null)
{
- message.set010Message(new ByteBufferMessage());
+ underlyingMessage = new ByteBufferMessage(delegate.getMessageProperties(), delegate.getDeliveryProperties());
+ message.set010Message(underlyingMessage);
+
}
// force a rebuild of the 0-10 message if data has changed
if (message.getData() == null)
@@ -82,8 +89,8 @@
message.dataChanged();
}
- DeliveryProperties deliveryProp = message.get010Message().getDeliveryProperties();
- MessageProperties messageProps = message.get010Message().getMessageProperties();
+ DeliveryProperties deliveryProp = underlyingMessage.getDeliveryProperties();
+ MessageProperties messageProps = underlyingMessage.getMessageProperties();
if (messageId != null)
{
@@ -144,20 +151,10 @@
deliveryProp.setRoutingKey(routingKey);
}
- BasicContentHeaderProperties contentHeaderProperties = message.getContentHeaderProperties();
- if (contentHeaderProperties.reset())
- {
- // set the application properties
- messageProps.setContentType(contentHeaderProperties.getContentType().toString());
- messageProps.setContentLength(message.getContentLength());
+ messageProps.setContentLength(message.getContentLength());
- AMQShortString correlationID = contentHeaderProperties.getCorrelationId();
- if (correlationID != null)
- {
- messageProps.setCorrelationId(correlationID.getBytes());
- }
- String replyToURL = contentHeaderProperties.getReplyToAsString();
+ /* String replyToURL = contentHeaderProperties.getReplyToAsString();
if (replyToURL != null)
{
if(_logger.isDebugEnabled())
@@ -179,31 +176,8 @@
}
messageProps.setReplyTo(new ReplyTo(dest.getExchangeName().toString(), dest.getRoutingKey().toString()));
}
+*/
- Map<String,Object> map = null;
-
- if (contentHeaderProperties.getHeaders() != null)
- {
- //JMS_QPID_DESTTYPE is always set but useles so this is a temporary fix
- contentHeaderProperties.getHeaders().remove(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName());
- map = FiledTableSupport.convertToMap(contentHeaderProperties.getHeaders());
- }
-
- AMQShortString type = contentHeaderProperties.getType();
- if (type != null)
- {
- if (map == null)
- {
- map = new HashMap<String,Object>();
- }
- map.put(AbstractJMSMessage.JMS_TYPE, type.toString());
- }
-
- if (map != null)
- {
- messageProps.setApplicationHeaders(map);
- }
- }
// send the message
try
@@ -221,7 +195,7 @@
try
{
ssn.messageTransfer(destination.getExchangeName().toString(),
- message.get010Message(),
+ underlyingMessage,
ssn.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
ssn.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE);
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java?rev=683683&r1=683682&r2=683683&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java Thu Aug 7 12:25:12 2008
@@ -27,6 +27,8 @@
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.AMQMessageDelegate;
+import org.apache.qpid.client.message.AMQMessageDelegate_0_8;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.BasicConsumeBody;
@@ -81,7 +83,8 @@
message.prepareForSending();
ByteBuffer payload = message.getData();
- BasicContentHeaderProperties contentHeaderProperties = message.getContentHeaderProperties();
+ AMQMessageDelegate_0_8 delegate = (AMQMessageDelegate_0_8) message.getDelegate();
+ BasicContentHeaderProperties contentHeaderProperties = delegate.getContentHeaderProperties();
if (!_disableTimestamps)
{
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java?rev=683683&r1=683682&r2=683683&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessage.java Thu Aug 7 12:25:12 2008
@@ -1,135 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.message;
-
-import javax.jms.JMSException;
-
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.framing.ContentHeaderProperties;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-
-import java.math.BigDecimal;
-
-public class AMQMessage
-{
- protected ContentHeaderProperties _contentHeaderProperties;
-
- /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */
- protected AMQSession _session;
-
- protected final long _deliveryTag;
-
- public AMQMessage(ContentHeaderProperties properties, long deliveryTag)
- {
- _contentHeaderProperties = properties;
- _deliveryTag = deliveryTag;
- }
-
- public AMQMessage(ContentHeaderProperties properties)
- {
- this(properties, -1);
- }
-
- /**
- * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls
- * acknowledge()
- *
- * @param s the AMQ session that delivered this message
- */
- public void setAMQSession(AMQSession s)
- {
- _session = s;
- }
-
- public AMQSession getAMQSession()
- {
- return _session;
- }
-
- /**
- * Get the AMQ message number assigned to this message
- *
- * @return the message number
- */
- public long getDeliveryTag()
- {
- return _deliveryTag;
- }
-
- /** Invoked prior to sending the message. Allows the message to be modified if necessary before sending. */
- public void prepareForSending() throws JMSException
- {
- }
-
- public FieldTable getPropertyHeaders()
- {
- return ((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders();
- }
-
- public void setDecimalProperty(AMQShortString propertyName, BigDecimal bd) throws JMSException
- {
- getPropertyHeaders().setDecimal(propertyName, bd);
- }
-
- public void setIntProperty(AMQShortString propertyName, int i) throws JMSException
- {
- getPropertyHeaders().setInteger(propertyName, new Integer(i));
- }
-
- public void setLongStringProperty(AMQShortString propertyName, String value)
- {
- getPropertyHeaders().setString(propertyName, value);
- }
-
- public void setTimestampProperty(AMQShortString propertyName, long value)
- {
- getPropertyHeaders().setTimestamp(propertyName, value);
- }
-
- public void setVoidProperty(AMQShortString propertyName)
- {
- getPropertyHeaders().setVoid(propertyName);
- }
-
- //** Getters
-
- public BigDecimal getDecimalProperty(AMQShortString propertyName) throws JMSException
- {
- return getPropertyHeaders().getDecimal(propertyName);
- }
-
- public int getIntegerProperty(AMQShortString propertyName) throws JMSException
- {
- return getPropertyHeaders().getInteger(propertyName);
- }
-
- public String getLongStringProperty(AMQShortString propertyName)
- {
- return getPropertyHeaders().getString(propertyName);
- }
-
- public Long getTimestampProperty(AMQShortString propertyName)
- {
- return getPropertyHeaders().getTimestamp(propertyName);
- }
-}
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java?rev=683683&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java Thu Aug 7 12:25:12 2008
@@ -0,0 +1,138 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.client.message;
+
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import java.util.Enumeration;
+import java.util.UUID;
+
+/**
+ * Holder of the JMS header and property state of a message. AMQP version specific
+ * handling lives in the implementations of this interface (one per protocol version).
+ */
+public interface AMQMessageDelegate
+{
+    // ---- acknowledgement ------------------------------------------------------------
+
+    void acknowledgeThis() throws JMSException;
+
+    void acknowledge() throws JMSException;
+
+    // ---- JMS headers ----------------------------------------------------------------
+
+    String getJMSMessageID() throws JMSException;
+
+    void setJMSMessageID(String string) throws JMSException;
+
+    void setJMSMessageID(UUID messageId) throws JMSException;
+
+    long getJMSTimestamp() throws JMSException;
+
+    void setJMSTimestamp(long l) throws JMSException;
+
+    byte[] getJMSCorrelationIDAsBytes() throws JMSException;
+
+    void setJMSCorrelationIDAsBytes(byte[] bytes) throws JMSException;
+
+    String getJMSCorrelationID() throws JMSException;
+
+    void setJMSCorrelationID(String string) throws JMSException;
+
+    Destination getJMSReplyTo() throws JMSException;
+
+    void setJMSReplyTo(Destination destination) throws JMSException;
+
+    /** @return the reply-to address in string form, as produced by the implementation */
+    String getReplyToString();
+
+    Destination getJMSDestination() throws JMSException;
+
+    void setJMSDestination(Destination destination);
+
+    int getJMSDeliveryMode() throws JMSException;
+
+    void setJMSDeliveryMode(int i) throws JMSException;
+
+    String getJMSType() throws JMSException;
+
+    void setJMSType(String string) throws JMSException;
+
+    long getJMSExpiration() throws JMSException;
+
+    void setJMSExpiration(long l) throws JMSException;
+
+    int getJMSPriority() throws JMSException;
+
+    void setJMSPriority(int i) throws JMSException;
+
+    // ---- content description --------------------------------------------------------
+
+    void setContentType(String contentType);
+
+    String getContentType();
+
+    void setEncoding(String encoding);
+
+    String getEncoding();
+
+    // ---- application properties: queries --------------------------------------------
+
+    boolean propertyExists(String string) throws JMSException;
+
+    Enumeration getPropertyNames() throws JMSException;
+
+    boolean getBooleanProperty(String string) throws JMSException;
+
+    byte getByteProperty(String string) throws JMSException;
+
+    short getShortProperty(String string) throws JMSException;
+
+    int getIntProperty(String string) throws JMSException;
+
+    long getLongProperty(String string) throws JMSException;
+
+    float getFloatProperty(String string) throws JMSException;
+
+    double getDoubleProperty(String string) throws JMSException;
+
+    String getStringProperty(String string) throws JMSException;
+
+    Object getObjectProperty(String string) throws JMSException;
+
+    // ---- application properties: updates --------------------------------------------
+
+    void setBooleanProperty(String string, boolean b) throws JMSException;
+
+    void setByteProperty(String string, byte b) throws JMSException;
+
+    void setShortProperty(String string, short i) throws JMSException;
+
+    void setIntProperty(String string, int i) throws JMSException;
+
+    void setLongProperty(String string, long l) throws JMSException;
+
+    void setFloatProperty(String string, float v) throws JMSException;
+
+    void setDoubleProperty(String string, double v) throws JMSException;
+
+    void setStringProperty(String string, String string1) throws JMSException;
+
+    void setObjectProperty(String string, Object object) throws JMSException;
+
+    void removeProperty(String propertyName) throws JMSException;
+
+    void clearProperties() throws JMSException;
+
+    // ---- session / delivery bookkeeping ---------------------------------------------
+
+    void setAMQSession(AMQSession s);
+
+    AMQSession getAMQSession();
+
+    long getDeliveryTag();
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegateFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegateFactory.java?rev=683683&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegateFactory.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegateFactory.java Thu Aug 7 12:25:12 2008
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.client.message;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.AMQException;
+
+/**
+ * Creates {@link AMQMessageDelegate} instances of one concrete type.
+ *
+ * @param <D> the delegate type this factory produces
+ */
+public interface AMQMessageDelegateFactory<D extends AMQMessageDelegate>
+{
+    /** No default is chosen here; the constant is deliberately left as null. */
+    AMQMessageDelegateFactory DEFAULT_FACTORY = null;
+
+    /** Factory yielding a fresh {@link AMQMessageDelegate_0_8} per call. */
+    AMQMessageDelegateFactory<AMQMessageDelegate_0_8> FACTORY_0_8 =
+            new AMQMessageDelegateFactory<AMQMessageDelegate_0_8>()
+            {
+                public AMQMessageDelegate_0_8 createDelegate()
+                {
+                    return new AMQMessageDelegate_0_8();
+                }
+            };
+
+    /** Factory yielding a fresh {@link AMQMessageDelegate_0_10} per call. */
+    AMQMessageDelegateFactory<AMQMessageDelegate_0_10> FACTORY_0_10 =
+            new AMQMessageDelegateFactory<AMQMessageDelegate_0_10>()
+            {
+                public AMQMessageDelegate_0_10 createDelegate()
+                {
+                    return new AMQMessageDelegate_0_10();
+                }
+            };
+
+    /** @return a newly created delegate */
+    D createDelegate();
+
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegateFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java?rev=683683&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java Thu Aug 7 12:25:12 2008
@@ -0,0 +1,900 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.client.message;
+
+import org.apache.commons.collections.map.ReferenceMap;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.CustomJMSXProperty;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.AMQUndefinedDestination;
+import org.apache.qpid.client.JMSAMQException;
+import org.apache.qpid.framing.ContentHeaderProperties;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQPInvalidClassException;
+import org.apache.qpid.jms.Message;
+import org.apache.qpid.url.BindingURL;
+import org.apache.qpid.url.AMQBindingURL;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.transport.*;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageNotWriteableException;
+import javax.jms.MessageFormatException;
+import javax.jms.DeliveryMode;
+import java.util.*;
+import java.net.URISyntaxException;
+import java.nio.charset.Charset;
+
+public class AMQMessageDelegate_0_10 implements AMQMessageDelegate
+{
+ private static final Map<ReplyTo, Destination> _destinationCache = Collections.synchronizedMap(new ReferenceMap());
+
+ public static final String JMS_TYPE = "x-jms-type";
+
+
+ private boolean _readableProperties = false;
+
+ private Destination _destination;
+
+
+ private MessageProperties _messageProps;
+ private DeliveryProperties _deliveryProps;
+ /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */
+ private AMQSession _session;
+ private final long _deliveryTag;
+
+ // Maps an exchange name to an AMQDestination type code. getJMSReplyTo() consults it to
+ // decide whether a reply-to address becomes a queue or a topic destination.
+ private static Map<String,Integer> _exchangeTypeMap = new HashMap<String, Integer>();
+
+ static
+ {
+ // TODO - XXX - Need to add to this map when we find an exchange we don't know about
+
+ // A null or empty exchange name and amq.direct are treated as queue destinations.
+ _exchangeTypeMap.put(null, AMQDestination.QUEUE_TYPE);
+ _exchangeTypeMap.put("amq.direct", AMQDestination.QUEUE_TYPE);
+ _exchangeTypeMap.put("", AMQDestination.QUEUE_TYPE);
+ // amq.topic and amq.fanout are treated as topic destinations.
+ _exchangeTypeMap.put("amq.topic", AMQDestination.TOPIC_TYPE);
+ _exchangeTypeMap.put("amq.fanout", AMQDestination.TOPIC_TYPE);
+
+
+ }
+
+ /**
+ * Creates a delegate with empty message and delivery properties and a delivery tag of -1.
+ * The properties start out writable.
+ */
+ protected AMQMessageDelegate_0_10()
+ {
+ this(new MessageProperties(), new DeliveryProperties(), -1);
+ // The three-argument constructor marks properties read-only whenever message
+ // properties are supplied; undo that here.
+ _readableProperties = false;
+
+
+ }
+
+    /**
+     * Creates a delegate from the given properties and records the exchange and routing key
+     * as this message's JMS destination, wrapped in an {@link AMQUndefinedDestination}.
+     *
+     * @param deliveryTag   delivery tag assigned to the message
+     * @param messageProps  message properties backing this delegate
+     * @param deliveryProps delivery properties backing this delegate
+     * @param exchange      exchange name used for the destination
+     * @param routingKey    routing key used for the destination
+     * @throws AMQException declared by the signature
+     */
+    protected AMQMessageDelegate_0_10(long deliveryTag, MessageProperties messageProps, DeliveryProperties deliveryProps, AMQShortString exchange,
+                                      AMQShortString routingKey) throws AMQException
+    {
+        this(messageProps, deliveryProps, deliveryTag);
+        setJMSDestination(new AMQUndefinedDestination(exchange, routingKey, null));
+    }
+
+ /**
+ * Creates a delegate backed by the given property objects.
+ *
+ * @param messageProps message properties to read from and write to
+ * @param deliveryProps delivery properties to read from and write to
+ * @param deliveryTag delivery tag reported by getDeliveryTag()
+ */
+ protected AMQMessageDelegate_0_10(MessageProperties messageProps, DeliveryProperties deliveryProps, long deliveryTag)
+ {
+ _messageProps = messageProps;
+ _deliveryProps = deliveryProps;
+ _deliveryTag = deliveryTag;
+ // Properties are treated as read-only whenever message properties were supplied.
+ _readableProperties = (_messageProps != null);
+
+ }
+
+
+    /**
+     * @return the message id rendered as "ID:" followed by the UUID, or null when no id is set
+     */
+    public String getJMSMessageID() throws JMSException
+    {
+        final UUID uuid = _messageProps.getMessageId();
+        if (uuid == null)
+        {
+            return null;
+        }
+        return "ID:" + uuid;
+    }
+
+    /**
+     * Sets the message id from its JMS string form.
+     *
+     * @param messageId null to clear the id, otherwise "ID:" followed by a UUID
+     * @throws JMSException if the string does not start with "ID:" or the remainder is not a
+     *                      valid UUID; in the latter case the parse failure is attached as the
+     *                      linked exception
+     */
+    public void setJMSMessageID(String messageId) throws JMSException
+    {
+        if(messageId == null)
+        {
+            _messageProps.clearMessageId();
+            return;
+        }
+
+        final String badFormat =
+                "MessageId '"+messageId+"' is not of the correct format, it must be ID: followed by a UUID";
+
+        if(!messageId.startsWith("ID:"))
+        {
+            throw new JMSException(badFormat);
+        }
+
+        try
+        {
+            _messageProps.setMessageId(UUID.fromString(messageId.substring(3)));
+        }
+        catch(IllegalArgumentException ex)
+        {
+            // Keep the underlying parse failure reachable for diagnostics.
+            final JMSException jmsEx = new JMSException(badFormat);
+            jmsEx.setLinkedException(ex);
+            throw jmsEx;
+        }
+    }
+
+    /**
+     * Sets the message id directly from a UUID.
+     *
+     * @param messageId the id to store, or null to clear any existing id
+     */
+    public void setJMSMessageID(UUID messageId) throws JMSException
+    {
+        if(messageId != null)
+        {
+            _messageProps.setMessageId(messageId);
+            return;
+        }
+        _messageProps.clearMessageId();
+    }
+
+
+ /** @return the timestamp held in the delivery properties */
+ public long getJMSTimestamp() throws JMSException
+ {
+ return _deliveryProps.getTimestamp();
+ }
+
+ /** Stores the given timestamp in the delivery properties. */
+ public void setJMSTimestamp(long timestamp) throws JMSException
+ {
+ _deliveryProps.setTimestamp(timestamp);
+ }
+
+ /** @return the correlation id bytes held in the message properties (not copied here) */
+ public byte[] getJMSCorrelationIDAsBytes() throws JMSException
+ {
+ return _messageProps.getCorrelationId();
+ }
+
+ /** Stores the given correlation id bytes in the message properties. */
+ public void setJMSCorrelationIDAsBytes(byte[] bytes) throws JMSException
+ {
+ _messageProps.setCorrelationId(bytes);
+ }
+
+    /**
+     * Stores the correlation id as bytes, encoded with the platform default charset.
+     *
+     * @param correlationId the id to store; null is passed through as a null byte array
+     */
+    public void setJMSCorrelationID(String correlationId) throws JMSException
+    {
+        byte[] encoded = null;
+        if (correlationId != null)
+        {
+            encoded = correlationId.getBytes();
+        }
+        setJMSCorrelationIDAsBytes(encoded);
+    }
+
+    /**
+     * @return the correlation id bytes decoded with the platform default charset, or null
+     *         when no correlation id is set
+     */
+    public String getJMSCorrelationID() throws JMSException
+    {
+        final byte[] raw = getJMSCorrelationIDAsBytes();
+        if (raw == null)
+        {
+            return null;
+        }
+        return new String(raw);
+    }
+
+    /**
+     * Resolves the reply-to address in the message properties to a JMS destination.
+     * Resolved destinations are cached per reply-to address.
+     *
+     * The exchange name selects the destination kind via the exchange type map. An exchange
+     * that is not in the map yields an {@link AMQUndefinedDestination}; previously the missing
+     * map entry was unboxed straight into an int and caused a NullPointerException.
+     *
+     * @return the reply-to destination, or null when the message carries no reply-to
+     */
+    public Destination getJMSReplyTo()
+    {
+        final ReplyTo replyTo = _messageProps.getReplyTo();
+        if (replyTo == null)
+        {
+            return null;
+        }
+
+        Destination dest = _destinationCache.get(replyTo);
+        if (dest == null)
+        {
+            final String exchange = replyTo.getExchange();
+            final String routingKey = replyTo.getRoutingKey();
+
+            // Keep the boxed value: the map has no entry for exchanges we do not know about.
+            final Integer type = _exchangeTypeMap.get(exchange);
+
+            if (type == null)
+            {
+                dest = new AMQUndefinedDestination(new AMQShortString(exchange), new AMQShortString(routingKey), null);
+            }
+            else
+            {
+                switch(type.intValue())
+                {
+                    case AMQDestination.QUEUE_TYPE:
+                        dest = new AMQQueue(new AMQShortString(exchange), new AMQShortString(routingKey), new AMQShortString(routingKey));
+                        break;
+                    case AMQDestination.TOPIC_TYPE:
+                        dest = new AMQTopic(new AMQShortString(exchange), new AMQShortString(routingKey), null);
+                        break;
+                    default:
+                        dest = new AMQUndefinedDestination(new AMQShortString(exchange), new AMQShortString(routingKey), null);
+                }
+            }
+
+            _destinationCache.put(replyTo, dest);
+        }
+
+        return dest;
+    }
+
+    /**
+     * Stores the reply-to address derived from the destination's exchange name and routing
+     * key, and caches the destination under that address.
+     *
+     * @param destination the reply-to destination; must be a non-null AMQDestination
+     * @throws IllegalArgumentException if the destination is null or not an AMQDestination
+     */
+    public void setJMSReplyTo(Destination destination) throws JMSException
+    {
+        if (destination == null)
+        {
+            throw new IllegalArgumentException("Null destination not allowed");
+        }
+        if (!(destination instanceof AMQDestination))
+        {
+            throw new IllegalArgumentException(
+                "ReplyTo destination may only be an AMQDestination - passed argument was type " + destination.getClass());
+        }
+
+        final AMQDestination amqDestination = (AMQDestination) destination;
+        final String exchangeName = amqDestination.getExchangeName().toString();
+        final String routingKey = amqDestination.getRoutingKey().toString();
+
+        final ReplyTo address = new ReplyTo(exchangeName, routingKey);
+        _destinationCache.put(address, destination);
+        _messageProps.setReplyTo(address);
+    }
+
+ /** @return the destination last stored via setJMSDestination, possibly null */
+ public Destination getJMSDestination() throws JMSException
+ {
+ return _destination;
+ }
+
+ /** Records the destination on this delegate; it is held in a field, not in the AMQP properties. */
+ public void setJMSDestination(Destination destination)
+ {
+ _destination = destination;
+ }
+
+ /** Stores the content type in the message properties. */
+ public void setContentType(String contentType)
+ {
+ _messageProps.setContentType(contentType);
+ }
+
+ /** @return the content type held in the message properties */
+ public String getContentType()
+ {
+ return _messageProps.getContentType();
+ }
+
+    /**
+     * Sets the content encoding; a null or empty value clears it instead.
+     *
+     * @param encoding the encoding name, or null/empty to clear
+     */
+    public void setEncoding(String encoding)
+    {
+        if(encoding != null && encoding.length() != 0)
+        {
+            _messageProps.setContentEncoding(encoding);
+        }
+        else
+        {
+            _messageProps.clearContentEncoding();
+        }
+    }
+
+ /** @return the content encoding held in the message properties */
+ public String getEncoding()
+ {
+ return _messageProps.getContentEncoding();
+ }
+
+    /**
+     * @return the URL form of the reply-to destination, or null when there is no reply-to
+     */
+    public String getReplyToString()
+    {
+        final Destination replyTo = getJMSReplyTo();
+        return (replyTo == null) ? null : ((AMQDestination)replyTo).toURL();
+    }
+
+    /**
+     * Translates the AMQP delivery mode into the JMS constant.
+     *
+     * @return DeliveryMode.PERSISTENT or DeliveryMode.NON_PERSISTENT, or
+     *         Message.DEFAULT_DELIVERY_MODE when no delivery mode is set
+     * @throws JMSException if the stored delivery mode is neither of the two known values
+     */
+    public int getJMSDeliveryMode() throws JMSException
+    {
+        final MessageDeliveryMode mode = _deliveryProps.getDeliveryMode();
+        if(mode == null)
+        {
+            return Message.DEFAULT_DELIVERY_MODE;
+        }
+        if(mode == MessageDeliveryMode.PERSISTENT)
+        {
+            return DeliveryMode.PERSISTENT;
+        }
+        if(mode == MessageDeliveryMode.NON_PERSISTENT)
+        {
+            return DeliveryMode.NON_PERSISTENT;
+        }
+        throw new JMSException("Unknown Message Delivery Mode: " + _deliveryProps.getDeliveryMode());
+    }
+
+    /**
+     * Translates the JMS delivery mode constant into the AMQP delivery mode and stores it.
+     *
+     * @param deliveryMode DeliveryMode.PERSISTENT or DeliveryMode.NON_PERSISTENT
+     * @throws JMSException for any other value
+     */
+    public void setJMSDeliveryMode(int deliveryMode) throws JMSException
+    {
+        final MessageDeliveryMode mapped;
+        if(deliveryMode == DeliveryMode.PERSISTENT)
+        {
+            mapped = MessageDeliveryMode.PERSISTENT;
+        }
+        else if(deliveryMode == DeliveryMode.NON_PERSISTENT)
+        {
+            mapped = MessageDeliveryMode.NON_PERSISTENT;
+        }
+        else
+        {
+            throw new JMSException("Unknown JMS Delivery Mode: " + deliveryMode);
+        }
+        _deliveryProps.setDeliveryMode(mapped);
+    }
+
+
+    /**
+     * @return the JMS type stored under the "x-jms-type" application header, or null when
+     *         that header is absent
+     */
+    public String getJMSType() throws JMSException
+    {
+        if(!getApplicationHeaders().containsKey(JMS_TYPE))
+        {
+            return null;
+        }
+        return getStringProperty(JMS_TYPE);
+    }
+
+    /**
+     * @return the application headers of the message properties, or the shared immutable
+     *         empty map when none have been set (never null)
+     */
+    private Map<String, Object> getApplicationHeaders()
+    {
+        final Map<String, Object> headers = _messageProps.getApplicationHeaders();
+        if (headers == null)
+        {
+            return Collections.EMPTY_MAP;
+        }
+        return headers;
+    }
+
+    /**
+     * Stores the JMS type under the "x-jms-type" application header.
+     *
+     * @param type the type to store; null removes the header if a header map exists
+     */
+    public void setJMSType(String type) throws JMSException
+    {
+        Map<String, Object> headers = _messageProps.getApplicationHeaders();
+
+        if(type != null)
+        {
+            if(headers == null)
+            {
+                // Create the header map lazily and attach it before writing to it.
+                headers = new HashMap<String,Object>();
+                _messageProps.setApplicationHeaders(headers);
+            }
+            headers.put(JMS_TYPE, type);
+            return;
+        }
+
+        if(headers != null)
+        {
+            headers.remove(JMS_TYPE);
+        }
+    }
+
+ /** @return the expiration held in the delivery properties */
+ public long getJMSExpiration() throws JMSException
+ {
+ return _deliveryProps.getExpiration();
+ }
+
+ /** Stores the given expiration in the delivery properties. */
+ public void setJMSExpiration(long l) throws JMSException
+ {
+ _deliveryProps.setExpiration(l);
+ }
+
+
+
+ /** @return true if the application headers contain the given key; the name is not validated here */
+ public boolean propertyExists(String propertyName) throws JMSException
+ {
+ return getApplicationHeaders().containsKey(propertyName);
+ }
+
+    /**
+     * Reads a property as a boolean. Boolean values are returned as-is and String values are
+     * converted with Boolean.valueOf; an absent property reads as false.
+     *
+     * @throws MessageFormatException if the property holds any other type, or is present
+     *                                with a null value
+     */
+    public boolean getBooleanProperty(String propertyName) throws JMSException
+    {
+        checkPropertyName(propertyName);
+
+        final Map<String, Object> headers = getApplicationHeaders();
+        final Object value = headers.get(propertyName);
+
+        if(value instanceof Boolean)
+        {
+            return ((Boolean)value).booleanValue();
+        }
+        if(value instanceof String)
+        {
+            return Boolean.valueOf((String) value).booleanValue();
+        }
+        if(headers.containsKey(propertyName))
+        {
+            throw new MessageFormatException("getBooleanProperty(\""+propertyName+"\") failed as value is not boolean: " + value);
+        }
+        // Absent property: same result as converting a null String.
+        return Boolean.valueOf((String) null);
+    }
+
+    /**
+     * Reads a property as a byte. Byte values are returned as-is and String values are
+     * parsed with Byte.valueOf.
+     *
+     * @throws MessageFormatException if the property holds any other type, or is present
+     *                                with a null value
+     * @throws NumberFormatException  if a String value is not a valid byte, or the property
+     *                                is absent (a null String is parsed)
+     */
+    public byte getByteProperty(String propertyName) throws JMSException
+    {
+        checkPropertyName(propertyName);
+
+        final Map<String, Object> headers = getApplicationHeaders();
+        final Object value = headers.get(propertyName);
+
+        if(value instanceof Byte)
+        {
+            return ((Byte)value).byteValue();
+        }
+        if(value instanceof String)
+        {
+            return Byte.valueOf((String) value).byteValue();
+        }
+        if(headers.containsKey(propertyName))
+        {
+            throw new MessageFormatException("getByteProperty(\""+propertyName+"\") failed as value is not a byte: " + value);
+        }
+        // Absent property: parse a null String, which raises NumberFormatException.
+        return Byte.valueOf((String) null);
+    }
+
+    /**
+     * Reads a property as a short. Short values are returned as-is, String values are parsed
+     * with Short.valueOf, and anything else is read through getByteProperty and widened.
+     *
+     * String values are parsed here rather than through the byte getter, so that any string
+     * within short range is accepted, not only those within byte range.
+     *
+     * @throws MessageFormatException if the value cannot be read as a short
+     * @throws NumberFormatException  if a String value is not a valid short, or the property
+     *                                is absent
+     */
+    public short getShortProperty(String propertyName) throws JMSException
+    {
+        checkPropertyName(propertyName);
+
+        final Object value = getApplicationHeaders().get(propertyName);
+
+        if(value instanceof Short)
+        {
+            return ((Short)value).shortValue();
+        }
+        if(value instanceof String)
+        {
+            return Short.valueOf((String) value).shortValue();
+        }
+
+        try
+        {
+            return getByteProperty(propertyName);
+        }
+        catch(MessageFormatException e)
+        {
+            final MessageFormatException failure =
+                    new MessageFormatException("getShortProperty(\""+propertyName+"\") failed as value is not a short: " + value);
+            failure.setLinkedException(e);
+            throw failure;
+        }
+    }
+
+    /**
+     * Reads a property as an int. Integer values are returned as-is, String values are parsed
+     * with Integer.valueOf, and anything else is read through getShortProperty and widened.
+     *
+     * String values are parsed here rather than through the narrower getters, so that any
+     * string within int range is accepted.
+     *
+     * @throws MessageFormatException if the value cannot be read as an int
+     * @throws NumberFormatException  if a String value is not a valid int, or the property
+     *                                is absent
+     */
+    public int getIntProperty(String propertyName) throws JMSException
+    {
+        checkPropertyName(propertyName);
+
+        final Object value = getApplicationHeaders().get(propertyName);
+
+        if(value instanceof Integer)
+        {
+            return ((Integer)value).intValue();
+        }
+        if(value instanceof String)
+        {
+            return Integer.valueOf((String) value).intValue();
+        }
+
+        try
+        {
+            return getShortProperty(propertyName);
+        }
+        catch(MessageFormatException e)
+        {
+            final MessageFormatException failure =
+                    new MessageFormatException("getIntProperty(\""+propertyName+"\") failed as value is not an int: " + value);
+            failure.setLinkedException(e);
+            throw failure;
+        }
+    }
+
+    /**
+     * Reads a property as a long. Long values are returned as-is, String values are parsed
+     * with Long.valueOf, and anything else is read through getIntProperty and widened.
+     *
+     * String values are parsed here rather than through the narrower getters, so that any
+     * string within long range is accepted.
+     *
+     * @throws MessageFormatException if the value cannot be read as a long
+     * @throws NumberFormatException  if a String value is not a valid long, or the property
+     *                                is absent
+     */
+    public long getLongProperty(String propertyName) throws JMSException
+    {
+        checkPropertyName(propertyName);
+
+        final Object value = getApplicationHeaders().get(propertyName);
+
+        if(value instanceof Long)
+        {
+            return ((Long)value).longValue();
+        }
+        if(value instanceof String)
+        {
+            return Long.valueOf((String) value).longValue();
+        }
+
+        try
+        {
+            return getIntProperty(propertyName);
+        }
+        catch(MessageFormatException e)
+        {
+            final MessageFormatException failure =
+                    new MessageFormatException("getLongProperty(\""+propertyName+"\") failed as value is not a long: " + value);
+            failure.setLinkedException(e);
+            throw failure;
+        }
+    }
+
+    /**
+     * Reads a property as a float. Float values are returned as-is and String values are
+     * parsed with Float.valueOf.
+     *
+     * @throws MessageFormatException if the property holds any other type, or is present
+     *                                with a null value
+     * @throws NullPointerException   if the property is absent (a null String is parsed)
+     */
+    public float getFloatProperty(String propertyName) throws JMSException
+    {
+        checkPropertyName(propertyName);
+
+        final Map<String, Object> headers = getApplicationHeaders();
+        final Object value = headers.get(propertyName);
+
+        if(value instanceof Float)
+        {
+            return ((Float)value).floatValue();
+        }
+        if(value instanceof String)
+        {
+            return Float.valueOf((String) value).floatValue();
+        }
+        if(headers.containsKey(propertyName))
+        {
+            throw new MessageFormatException("getFloatProperty(\""+propertyName+"\") failed as value is not a float: " + value);
+        }
+        // Absent property: parse a null String, exactly as the unqualified valueOf(null) did.
+        return Float.valueOf((String) null);
+    }
+
+    /**
+     * Reads a property as a double. Double values are returned as-is, String values are
+     * parsed with Double.valueOf, and anything else is read through getFloatProperty and
+     * widened.
+     *
+     * String values are parsed here rather than through the float getter, which would round
+     * them to float precision and turn large magnitudes into infinity.
+     *
+     * @throws MessageFormatException if the value cannot be read as a double
+     * @throws NumberFormatException  if a String value is not a valid double
+     * @throws NullPointerException   if the property is absent
+     */
+    public double getDoubleProperty(String propertyName) throws JMSException
+    {
+        checkPropertyName(propertyName);
+
+        final Object value = getApplicationHeaders().get(propertyName);
+
+        if(value instanceof Double)
+        {
+            return ((Double)value).doubleValue();
+        }
+        if(value instanceof String)
+        {
+            return Double.valueOf((String) value).doubleValue();
+        }
+
+        try
+        {
+            return getFloatProperty(propertyName);
+        }
+        catch(MessageFormatException e)
+        {
+            final MessageFormatException failure =
+                    new MessageFormatException("getDoubleProperty(\""+propertyName+"\") failed as value is not a double: " + value);
+            failure.setLinkedException(e);
+            throw failure;
+        }
+    }
+
+    /**
+     * Reads a property as a String.
+     *
+     * The JMSXUserID property is served from the message properties' user id rather than
+     * from the application headers. Any other property is looked up in the application
+     * headers: String values are returned as-is, arrays are rejected, and other values are
+     * converted with String.valueOf.
+     *
+     * An absent property, a null value and an unset user id all yield null. Previously the
+     * absent case evaluated String.valueOf(null), which binds to the char[] overload and
+     * throws NullPointerException.
+     *
+     * @throws IllegalArgumentException if the property name is null or empty
+     * @throws MessageFormatException   if the value is an array
+     */
+    public String getStringProperty(String propertyName) throws JMSException
+    {
+        // Constant on the left, so a null name reaches checkPropertyName below.
+        if (CustomJMSXProperty.JMSXUserID.toString().equals(propertyName))
+        {
+            return _messageProps.getUserId() == null ? null : new String(_messageProps.getUserId());
+        }
+
+        checkPropertyName(propertyName);
+
+        final Object value = getApplicationHeaders().get(propertyName);
+
+        if(value == null)
+        {
+            // Covers both "no such property" and "property explicitly set to null".
+            return null;
+        }
+        if(value instanceof String)
+        {
+            return (String) value;
+        }
+        if(value.getClass().isArray())
+        {
+            throw new MessageFormatException("getString(\""+propertyName+"\") failed as value of type " + value.getClass()+ " is an array.");
+        }
+        return String.valueOf(value);
+    }
+
+    /**
+     * @return the raw value stored under the given name in the application headers, or null
+     *         when there is none
+     */
+    public Object getObjectProperty(String propertyName) throws JMSException
+    {
+        checkPropertyName(propertyName);
+        return getApplicationHeaders().get(propertyName);
+    }
+
+ /** @return an enumeration over the key set of the application headers */
+ public Enumeration getPropertyNames() throws JMSException
+ {
+ return java.util.Collections.enumeration(getApplicationHeaders().keySet());
+ }
+
+ // Typed property setters. Each one first checks that the properties are writable and then
+ // stores the value (boxed where primitive) under the given name in the application headers.
+ // NOTE(review): unlike the getters, none of these calls checkPropertyName, and
+ // setObjectProperty does not restrict the value type - confirm whether that is intended.
+ public void setBooleanProperty(String propertyName, boolean b) throws JMSException
+ {
+ checkWritableProperties();
+ setApplicationHeader(propertyName, b);
+ }
+
+ public void setByteProperty(String propertyName, byte b) throws JMSException
+ {
+ checkWritableProperties();
+ setApplicationHeader(propertyName, b);
+ }
+
+ public void setShortProperty(String propertyName, short i) throws JMSException
+ {
+ checkWritableProperties();
+ setApplicationHeader(propertyName, i);
+ }
+
+ public void setIntProperty(String propertyName, int i) throws JMSException
+ {
+ checkWritableProperties();
+ setApplicationHeader(propertyName, i);
+ }
+
+ public void setLongProperty(String propertyName, long l) throws JMSException
+ {
+ checkWritableProperties();
+ setApplicationHeader(propertyName, l);
+ }
+
+ public void setFloatProperty(String propertyName, float f) throws JMSException
+ {
+ checkWritableProperties();
+ setApplicationHeader(propertyName, f);
+ }
+
+ public void setDoubleProperty(String propertyName, double v) throws JMSException
+ {
+ checkWritableProperties();
+ setApplicationHeader(propertyName, v);
+ }
+
+ public void setStringProperty(String propertyName, String value) throws JMSException
+ {
+ checkWritableProperties();
+ setApplicationHeader(propertyName, value);
+ }
+
+ public void setObjectProperty(String propertyName, Object object) throws JMSException
+ {
+ checkWritableProperties();
+ setApplicationHeader(propertyName, object);
+ }
+
+    /**
+     * Stores a value in the application headers, creating and attaching the header map
+     * first when the message properties do not have one yet.
+     */
+    private void setApplicationHeader(String propertyName, Object object)
+    {
+        final Map<String, Object> existing = _messageProps.getApplicationHeaders();
+        if(existing != null)
+        {
+            existing.put(propertyName, object);
+            return;
+        }
+
+        final Map<String, Object> created = new HashMap<String,Object>();
+        _messageProps.setApplicationHeaders(created);
+        created.put(propertyName, object);
+    }
+
+    /**
+     * Removes the named entry from the application headers; does nothing when no header map
+     * has been set.
+     */
+    public void removeProperty(String propertyName) throws JMSException
+    {
+        final Map<String, Object> headers = _messageProps.getApplicationHeaders();
+        if(headers == null)
+        {
+            return;
+        }
+        headers.remove(propertyName);
+    }
+
+
+
+ /**
+ * Guards property writes.
+ *
+ * @throws MessageNotWriteableException while the properties are flagged read-only, i.e.
+ * until clearProperties() has reset the flag
+ */
+ protected void checkWritableProperties() throws MessageNotWriteableException
+ {
+ if (_readableProperties)
+ {
+ throw new MessageNotWriteableException("You need to call clearProperties() to make the message writable");
+ }
+ }
+
+
+    /**
+     * @return the numeric value of the delivery priority, or Message.DEFAULT_PRIORITY when
+     *         no priority is set
+     */
+    public int getJMSPriority() throws JMSException
+    {
+        final MessageDeliveryPriority priority = _deliveryProps.getPriority();
+        if (priority == null)
+        {
+            return Message.DEFAULT_PRIORITY;
+        }
+        return priority.getValue();
+    }
+
+    /**
+     * Stores the given priority in the delivery properties.
+     * The value is narrowed to a {@code short} before it is mapped to a
+     * {@code MessageDeliveryPriority}.
+     *
+     * @param i the priority to set
+     */
+    public void setJMSPriority(int i) throws JMSException
+    {
+        final short priorityValue = (short) i;
+        final MessageDeliveryPriority priority = MessageDeliveryPriority.get(priorityValue);
+        _deliveryProps.setPriority(priority);
+    }
+
+    /**
+     * Removes all application headers and makes the properties writable again.
+     */
+    public void clearProperties() throws JMSException
+    {
+        // Only clear when there is something to drop.
+        final boolean hasHeaders = !getApplicationHeaders().isEmpty();
+        if (hasHeaders)
+        {
+            getApplicationHeaders().clear();
+        }
+
+        // From here on the property setters are allowed again.
+        _readableProperties = false;
+    }
+
+
+    /**
+     * Acknowledges this message, and implicitly all earlier messages, on the
+     * session that delivered it.
+     *
+     * @throws javax.jms.IllegalStateException if the session's connection is closed
+     */
+    public void acknowledgeThis() throws JMSException
+    {
+        // JMS 1.1 section 3.6: calls to acknowledge are ignored unless client acknowledge
+        // is specified. The session field is only set in client acknowledge mode, so a
+        // missing session means there is nothing to do.
+        if (_session == null)
+        {
+            return;
+        }
+
+        if (_session.getAMQConnection().isClosed())
+        {
+            throw new javax.jms.IllegalStateException("Connection is already closed");
+        }
+
+        // "multiple" is true: acknowledging this message implies acknowledging every
+        // message previously received on the session.
+        _session.acknowledgeMessage(_deliveryTag, true);
+    }
+
+    /**
+     * Delegates acknowledgement to the owning session.
+     * Does nothing when no session has been associated with this message.
+     */
+    public void acknowledge() throws JMSException
+    {
+        if (_session == null)
+        {
+            return;
+        }
+        _session.acknowledge();
+    }
+
+
+    /**
+     * Associates this message with the session that delivered it. The session is
+     * set when CLIENT_ACKNOWLEDGE mode is in use, so that the acknowledgement can
+     * be sent when the user calls acknowledge().
+     *
+     * @param s the AMQ session that delivered this message
+     */
+    public void setAMQSession(AMQSession s)
+    {
+        this._session = s;
+    }
+
+    /**
+     * Returns the session associated with this message.
+     *
+     * @return the AMQ session previously set on this message, or null if none was set
+     */
+    public AMQSession getAMQSession()
+    {
+        return this._session;
+    }
+
+    /**
+     * Returns the AMQ message number (delivery tag) assigned to this message.
+     *
+     * @return the delivery tag
+     */
+    public long getDeliveryTag()
+    {
+        return this._deliveryTag;
+    }
+
+
+
+
+
+
+    /**
+     * Validates a property name: it must be non-null and non-empty, and must
+     * pass the identifier format check.
+     *
+     * @param propertyName the name to validate
+     * @throws IllegalArgumentException if the name is null, empty, or rejected by
+     *                                  the identifier format check
+     */
+    protected void checkPropertyName(CharSequence propertyName)
+    {
+        if (propertyName == null)
+        {
+            throw new IllegalArgumentException("Property name must not be null");
+        }
+
+        if (propertyName.length() == 0)
+        {
+            throw new IllegalArgumentException("Property name must not be the empty string");
+        }
+
+        checkIdentiferFormat(propertyName);
+    }
+
+ protected void checkIdentiferFormat(CharSequence propertyName)
+ {
+// JMS requirements 3.5.1 Property Names
+// Identifiers:
+// - An identifier is an unlimited-length character sequence that must begin
+// with a Java identifier start character; all following characters must be Java
+// identifier part characters. An identifier start character is any character for
+// which the method Character.isJavaIdentifierStart returns true. This includes
+// '_' and '$'. An identifier part character is any character for which the
+// method Character.isJavaIdentifierPart returns true.
+// - Identifiers cannot be the names NULL, TRUE, or FALSE.
+// Identifiers cannot be NOT, AND, OR, BETWEEN, LIKE, IN, IS, or
+// ESCAPE.
+// Identifiers are either header field references or property references. The
+// type of a property value in a message selector corresponds to the type
+// used to set the property. If a property that does not exist in a message is
+// referenced, its value is NULL. The semantics of evaluating NULL values
+// in a selector are described in Section 3.8.1.2, Null Values.
+// The conversions that apply to the get methods for properties do not
+// apply when a property is used in a message selector expression. For
+// example, suppose you set a property as a string value, as in the
+// following:
+// myMessage.setStringProperty("NumberOfOrders", "2");
+// The following expression in a message selector would evaluate to false,
+// because a string cannot be used in an arithmetic expression:
+// "NumberOfOrders > 1"
+// Identifiers are case sensitive.
+// Message header field references are restricted to JMSDeliveryMode,
+// JMSPriority, JMSMessageID, JMSTimestamp, JMSCorrelationID, and
+// JMSType. JMSMessageID, JMSCorrelationID, and JMSType values may be
+// null and if so are treated as a NULL value.
+
+ if (Boolean.getBoolean("strict-jms"))
+ {
+ // JMS start character
+ if (!(Character.isJavaIdentifierStart(propertyName.charAt(0))))
+ {
+ throw new IllegalArgumentException("Identifier '" + propertyName + "' does not start with a valid JMS identifier start character");
+ }
+
+ // JMS part character
+ int length = propertyName.length();
+ for (int c = 1; c < length; c++)
+ {
+ if (!(Character.isJavaIdentifierPart(propertyName.charAt(c))))
+ {
+ throw new IllegalArgumentException("Identifier '" + propertyName + "' contains an invalid JMS identifier character");
+ }
+ }
+
+ // JMS invalid names
+ if ((propertyName.equals("NULL")
+ || propertyName.equals("TRUE")
+ || propertyName.equals("FALSE")
+ || propertyName.equals("NOT")
+ || propertyName.equals("AND")
+ || propertyName.equals("OR")
+ || propertyName.equals("BETWEEN")
+ || propertyName.equals("LIKE")
+ || propertyName.equals("IN")
+ || propertyName.equals("IS")
+ || propertyName.equals("ESCAPE")))
+ {
+ throw new IllegalArgumentException("Identifier '" + propertyName + "' is not allowed in JMS");
+ }
+ }
+
+ }
+
+
+    /**
+     * Returns the message properties object backing this delegate.
+     *
+     * @return the message properties held by this delegate
+     */
+    public MessageProperties getMessageProperties()
+    {
+        return this._messageProps;
+    }
+
+
+    /**
+     * Returns the delivery properties object backing this delegate.
+     *
+     * @return the delivery properties held by this delegate
+     */
+    public DeliveryProperties getDeliveryProperties()
+    {
+        return this._deliveryProps;
+    }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
------------------------------------------------------------------------------
svn:eol-style = native