You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/08/08 20:31:19 UTC
svn commit: r684036 [1/2] - in /incubator/qpid/trunk/qpid/java:
client/src/main/java/org/apache/qpid/client/
client/src/main/java/org/apache/qpid/client/handler/
client/src/main/java/org/apache/qpid/client/message/
client/src/main/java/org/apache/qpid/...
Author: rhs
Date: Fri Aug 8 11:31:18 2008
New Revision: 684036
URL: http://svn.apache.org/viewvc?rev=684036&view=rev
Log:
QPID-1213: simplified unprocessed message and moved version specific code into the _0_8 and _0_10 variants
Added:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/CloseConsumerMessage.java (with props)
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/MessagePartListener.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/ByteBufferMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Logger.java
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Fri Aug 8 11:31:18 2008
@@ -63,7 +63,6 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInvalidArgumentException;
import org.apache.qpid.AMQInvalidRoutingKeyException;
-import org.apache.qpid.AMQUndeliveredException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverNoopSupport;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
@@ -78,7 +77,6 @@
import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.jms.Session;
-import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.url.AMQBindingURL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -99,20 +97,20 @@
* @todo Two new objects created on every failover supported method call. Consider more efficient ways of doing this,
* after looking at worse bottlenecks first.
*/
-public abstract class AMQSession extends Closeable implements Session, QueueSession, TopicSession
+public abstract class AMQSession<C extends BasicMessageConsumer, P extends BasicMessageProducer> extends Closeable implements Session, QueueSession, TopicSession
{
- public static final class IdToConsumerMap
+ public static final class IdToConsumerMap<C extends BasicMessageConsumer>
{
private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16];
- private final ConcurrentHashMap<Integer, BasicMessageConsumer> _slowAccessConsumers = new ConcurrentHashMap<Integer, BasicMessageConsumer>();
+ private final ConcurrentHashMap<Integer, C> _slowAccessConsumers = new ConcurrentHashMap<Integer, C>();
- public BasicMessageConsumer get(int id)
+ public C get(int id)
{
if ((id & 0xFFFFFFF0) == 0)
{
- return _fastAccessConsumers[id];
+ return (C) _fastAccessConsumers[id];
}
else
{
@@ -120,12 +118,12 @@
}
}
- public BasicMessageConsumer put(int id, BasicMessageConsumer consumer)
+ public C put(int id, C consumer)
{
- BasicMessageConsumer oldVal;
+ C oldVal;
if ((id & 0xFFFFFFF0) == 0)
{
- oldVal = _fastAccessConsumers[id];
+ oldVal = (C) _fastAccessConsumers[id];
_fastAccessConsumers[id] = consumer;
}
else
@@ -137,12 +135,12 @@
}
- public BasicMessageConsumer remove(int id)
+ public C remove(int id)
{
- BasicMessageConsumer consumer;
+ C consumer;
if ((id & 0xFFFFFFF0) == 0)
{
- consumer = _fastAccessConsumers[id];
+ consumer = (C) _fastAccessConsumers[id];
_fastAccessConsumers[id] = null;
}
else
@@ -154,15 +152,15 @@
}
- public Collection<BasicMessageConsumer> values()
+ public Collection<C> values()
{
- ArrayList<BasicMessageConsumer> values = new ArrayList<BasicMessageConsumer>();
+ ArrayList<C> values = new ArrayList<C>();
for (int i = 0; i < 16; i++)
{
if (_fastAccessConsumers[i] != null)
{
- values.add(_fastAccessConsumers[i]);
+ values.add((C) _fastAccessConsumers[i]);
}
}
values.addAll(_slowAccessConsumers.values());
@@ -183,8 +181,6 @@
/** Used for debugging. */
private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
- /** Used for debugging in the dispatcher. */
- private static final Logger _dispatcherLogger = LoggerFactory.getLogger(Dispatcher.class);
/** The default maximum number of prefetched message at which to suspend the channel. */
public static final int DEFAULT_PREFETCH_HIGH_MARK = 5000;
@@ -234,7 +230,6 @@
/** Holds this session unique identifier, used to distinguish it from other sessions. */
protected int _channelId;
- /** @todo This does not appear to be set? */
private int _ticket;
/** Holds the high mark for prefetched message, at which the session is suspended. */
@@ -261,8 +256,8 @@
* Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked
* up in the {@link #_subscriptions} map.
*/
- protected final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap =
- new ConcurrentHashMap<BasicMessageConsumer, String>();
+ protected final ConcurrentHashMap<C, String> _reverseSubscriptionMap =
+ new ConcurrentHashMap<C, String>();
/**
* Used to hold incoming messages.
@@ -299,7 +294,7 @@
* Maps from identifying tags to message consumers, in order to dispatch incoming messages to the right
* consumer.
*/
- protected final IdToConsumerMap _consumers = new IdToConsumerMap();
+ protected final IdToConsumerMap<C> _consumers = new IdToConsumerMap<C>();
//Map<AMQShortString, BasicMessageConsumer> _consumers =
//new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
@@ -308,7 +303,7 @@
* Contains a list of consumers which have been removed but which might still have
* messages to acknowledge, eg in client ack or transacted modes
*/
- private CopyOnWriteArrayList<BasicMessageConsumer> _removedConsumers = new CopyOnWriteArrayList<BasicMessageConsumer>();
+ private CopyOnWriteArrayList<C> _removedConsumers = new CopyOnWriteArrayList<C>();
/** Provides a count of consumers on destinations, in order to be able to know if a destination has consumers. */
private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount =
@@ -584,7 +579,7 @@
}, _connection).execute();
}
- public void addBindingKey(BasicMessageConsumer consumer, AMQDestination amqd, String routingKey) throws AMQException
+ public void addBindingKey(C consumer, AMQDestination amqd, String routingKey) throws AMQException
{
if (consumer.getQueuename() != null)
{
@@ -755,11 +750,12 @@
public abstract void sendCommit() throws AMQException, FailoverException;
- public void confirmConsumerCancelled(AMQShortString consumerTag)
+
+ public void confirmConsumerCancelled(int consumerTag)
{
// Remove the consumer from the map
- BasicMessageConsumer consumer = _consumers.get(consumerTag.toIntValue());
+ C consumer = _consumers.get(consumerTag);
if (consumer != null)
{
if (!consumer.isNoConsume()) // Normal Consumer
@@ -801,7 +797,7 @@
}
else
{
- _queue.add(new UnprocessedMessage.CloseConsumerMessage(consumer));
+ _queue.add(new CloseConsumerMessage(consumer));
}
}
}
@@ -848,7 +844,7 @@
false, false);
}
- public MessageConsumer createExclusiveConsumer(Destination destination) throws JMSException
+ public C createExclusiveConsumer(Destination destination) throws JMSException
{
checkValidDestination(destination);
@@ -927,8 +923,8 @@
_subscriptions.get(name).close();
}
AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection);
- BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal);
- TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer);
+ C consumer = (C) createConsumer(dest, messageSelector, noLocal);
+ TopicSubscriberAdaptor<C> subscriber = new TopicSubscriberAdaptor(dest, consumer);
_subscriptions.put(name, subscriber);
_reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
@@ -960,23 +956,23 @@
return msg;
}
- public BasicMessageProducer createProducer(Destination destination) throws JMSException
+ public P createProducer(Destination destination) throws JMSException
{
return createProducerImpl(destination, DEFAULT_MANDATORY, DEFAULT_IMMEDIATE);
}
- public BasicMessageProducer createProducer(Destination destination, boolean immediate) throws JMSException
+ public P createProducer(Destination destination, boolean immediate) throws JMSException
{
return createProducerImpl(destination, DEFAULT_MANDATORY, immediate);
}
- public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate)
+ public P createProducer(Destination destination, boolean mandatory, boolean immediate)
throws JMSException
{
return createProducerImpl(destination, mandatory, immediate);
}
- public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate,
+ public P createProducer(Destination destination, boolean mandatory, boolean immediate,
boolean waitUntilSent) throws JMSException
{
return createProducerImpl(destination, mandatory, immediate, waitUntilSent);
@@ -986,7 +982,7 @@
{
checkNotClosed();
- return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic, false, false), topic);
+ return new TopicPublisherAdapter((P) createProducer(topic, false, false), topic);
}
public Queue createQueue(String queueName) throws JMSException
@@ -1074,7 +1070,7 @@
{
checkValidDestination(destination);
AMQQueue dest = (AMQQueue) destination;
- BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination);
+ C consumer = (C) createConsumer(destination);
return new QueueReceiverAdaptor(dest, consumer);
}
@@ -1093,7 +1089,7 @@
{
checkValidDestination(destination);
AMQQueue dest = (AMQQueue) destination;
- BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination, messageSelector);
+ C consumer = (C) createConsumer(destination, messageSelector);
return new QueueReceiverAdaptor(dest, consumer);
}
@@ -1111,7 +1107,7 @@
{
checkNotClosed();
AMQQueue dest = (AMQQueue) queue;
- BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest);
+ C consumer = (C) createConsumer(dest);
return new QueueReceiverAdaptor(dest, consumer);
}
@@ -1130,7 +1126,7 @@
{
checkNotClosed();
AMQQueue dest = (AMQQueue) queue;
- BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector);
+ C consumer = (C) createConsumer(dest, messageSelector);
return new QueueReceiverAdaptor(dest, consumer);
}
@@ -1175,7 +1171,7 @@
AMQTopic dest = checkValidTopic(topic);
// AMQTopic dest = new AMQTopic(topic.getTopicName());
- return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest));
+ return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest));
}
/**
@@ -1195,7 +1191,7 @@
AMQTopic dest = checkValidTopic(topic);
// AMQTopic dest = new AMQTopic(topic.getTopicName());
- return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest, messageSelector, noLocal));
+ return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest, messageSelector, noLocal));
}
public abstract TemporaryQueue createTemporaryQueue() throws JMSException;
@@ -1369,17 +1365,8 @@
{
_logger.debug("Message[" + message.toString() + "] received in session");
}
-
- if (message instanceof ReturnMessage)
- {
- // Return of the bounced message.
- returnBouncedMessage((ReturnMessage) message);
- }
- else
- {
- _highestDeliveryTag.set(message.getDeliveryTag());
- _queue.add(message);
- }
+ _highestDeliveryTag.set(message.getDeliveryTag());
+ _queue.add(message);
}
public void declareAndBind(AMQDestination amqd)
@@ -1618,7 +1605,7 @@
}
}
- protected MessageConsumer createConsumerImpl(final Destination destination, final int prefetchHigh,
+ protected C createConsumerImpl(final Destination destination, final int prefetchHigh,
final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector,
final boolean noConsume, final boolean autoClose) throws JMSException
{
@@ -1642,10 +1629,10 @@
messageSelector = selector;
}
- return new FailoverRetrySupport<MessageConsumer, JMSException>(
- new FailoverProtectedOperation<MessageConsumer, JMSException>()
+ return new FailoverRetrySupport<C, JMSException>(
+ new FailoverProtectedOperation<C, JMSException>()
{
- public MessageConsumer execute() throws JMSException, FailoverException
+ public C execute() throws JMSException, FailoverException
{
checkNotClosed();
@@ -1668,7 +1655,7 @@
ft.put(new AMQShortString("x-filter-jms-selector"), messageSelector);
}
- BasicMessageConsumer consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow,
+ C consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow,
noLocal, exclusive, messageSelector, ft, noConsume, autoClose);
if (_messageListener != null)
@@ -1712,7 +1699,7 @@
}, _connection).execute();
}
- public abstract BasicMessageConsumer createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
+ public abstract C createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable arguments,
final boolean noConsume, final boolean autoClose) throws JMSException;
@@ -1722,9 +1709,9 @@
*
* @param consumer the consumer
*/
- void deregisterConsumer(BasicMessageConsumer consumer)
+ void deregisterConsumer(C consumer)
{
- if (_consumers.remove(consumer.getConsumerTag().toIntValue()) != null)
+ if (_consumers.remove(consumer.getConsumerTag()) != null)
{
String subscriptionName = _reverseSubscriptionMap.remove(consumer);
if (subscriptionName != null)
@@ -2012,12 +1999,12 @@
{
// we need to clone the list of consumers since the close() method updates the _consumers collection
// which would result in a concurrent modification exception
- final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList<BasicMessageConsumer>(_consumers.values());
+ final ArrayList<C> clonedConsumers = new ArrayList<C>(_consumers.values());
- final Iterator<BasicMessageConsumer> it = clonedConsumers.iterator();
+ final Iterator<C> it = clonedConsumers.iterator();
while (it.hasNext())
{
- final BasicMessageConsumer con = it.next();
+ final C con = it.next();
if (error != null)
{
con.notifyError(error);
@@ -2048,7 +2035,7 @@
final Iterator it = clonedProducers.iterator();
while (it.hasNext())
{
- final BasicMessageProducer prod = (BasicMessageProducer) it.next();
+ final P prod = (P) it.next();
prod.close();
}
// at this point the _producers map is empty
@@ -2096,20 +2083,18 @@
*
* @param queueName
*/
- private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName,
+ private void consumeFromQueue(C consumer, AMQShortString queueName,
AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector) throws AMQException, FailoverException
{
int tagId = _nextTag++;
- // need to generate a consumer tag on the client so we can exploit the nowait flag
- AMQShortString tag = new AMQShortString(Integer.toString(tagId));
- consumer.setConsumerTag(tag);
+ consumer.setConsumerTag(tagId);
// we must register the consumer in the map before we actually start listening
_consumers.put(tagId, consumer);
try
{
- sendConsume(consumer, queueName, protocolHandler, nowait, messageSelector, tag);
+ sendConsume(consumer, queueName, protocolHandler, nowait, messageSelector, tagId);
}
catch (AMQException e)
{
@@ -2119,26 +2104,26 @@
}
}
- public abstract void sendConsume(BasicMessageConsumer consumer, AMQShortString queueName,
- AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, AMQShortString tag) throws AMQException, FailoverException;
+ public abstract void sendConsume(C consumer, AMQShortString queueName,
+ AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, int tag) throws AMQException, FailoverException;
- private BasicMessageProducer createProducerImpl(Destination destination, boolean mandatory, boolean immediate)
+ private P createProducerImpl(Destination destination, boolean mandatory, boolean immediate)
throws JMSException
{
return createProducerImpl(destination, mandatory, immediate, false);
}
- private BasicMessageProducer createProducerImpl(final Destination destination, final boolean mandatory,
+ private P createProducerImpl(final Destination destination, final boolean mandatory,
final boolean immediate, final boolean waitUntilSent) throws JMSException
{
- return new FailoverRetrySupport<BasicMessageProducer, JMSException>(
- new FailoverProtectedOperation<BasicMessageProducer, JMSException>()
+ return new FailoverRetrySupport<P, JMSException>(
+ new FailoverProtectedOperation<P, JMSException>()
{
- public BasicMessageProducer execute() throws JMSException, FailoverException
+ public P execute() throws JMSException, FailoverException
{
checkNotClosed();
long producerId = getNextProducerId();
- BasicMessageProducer producer = createMessageProducer(destination, mandatory,
+ P producer = createMessageProducer(destination, mandatory,
immediate, waitUntilSent, producerId);
registerProducer(producerId, producer);
@@ -2147,7 +2132,7 @@
}, _connection).execute();
}
- public abstract BasicMessageProducer createMessageProducer(final Destination destination, final boolean mandatory,
+ public abstract P createMessageProducer(final Destination destination, final boolean mandatory,
final boolean immediate, final boolean waitUntilSent, long producerId);
private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException
@@ -2320,12 +2305,12 @@
}
// we need to clone the list of consumers since the close() method updates the _consumers collection
// which would result in a concurrent modification exception
- final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList<BasicMessageConsumer>(_consumers.values());
+ final ArrayList<C> clonedConsumers = new ArrayList<C>(_consumers.values());
- final Iterator<BasicMessageConsumer> it = clonedConsumers.iterator();
+ final Iterator<C> it = clonedConsumers.iterator();
while (it.hasNext())
{
- final BasicMessageConsumer con = it.next();
+ final C con = it.next();
con.markClosed();
}
// at this point the _consumers map will be empty
@@ -2360,7 +2345,7 @@
*
* @throws AMQException
*/
- private void registerConsumer(BasicMessageConsumer consumer, boolean nowait) throws AMQException // , FailoverException
+ private void registerConsumer(C consumer, boolean nowait) throws AMQException // , FailoverException
{
AMQDestination amqd = consumer.getDestination();
@@ -2424,15 +2409,16 @@
private void rejectAllMessages(boolean requeue)
{
- rejectMessagesForConsumerTag(null, requeue);
+ rejectMessagesForConsumerTag(0, requeue, true);
}
/**
* @param consumerTag The consumerTag to prune from queue; ignored when rejectAllConsumers is true
* @param requeue Should the removed messages be requeued (or discarded. Possibly to DLQ)
+ * @param rejectAllConsumers If true, prune messages for every consumer regardless of consumerTag
*/
- private void rejectMessagesForConsumerTag(AMQShortString consumerTag, boolean requeue)
+ private void rejectMessagesForConsumerTag(int consumerTag, boolean requeue, boolean rejectAllConsumers)
{
Iterator messages = _queue.iterator();
if (_logger.isInfoEnabled())
@@ -2453,7 +2439,7 @@
{
UnprocessedMessage message = (UnprocessedMessage) messages.next();
- if ((consumerTag == null) || message.getConsumerTag().equals(consumerTag.toString()))
+ if (rejectAllConsumers || (message.getConsumerTag() == consumerTag))
{
if (_logger.isDebugEnabled())
{
@@ -2475,12 +2461,11 @@
private void resubscribeConsumers() throws AMQException
{
- ArrayList consumers = new ArrayList(_consumers.values());
+ ArrayList<C> consumers = new ArrayList<C>(_consumers.values());
_consumers.clear();
- for (Iterator it = consumers.iterator(); it.hasNext();)
- {
- BasicMessageConsumer consumer = (BasicMessageConsumer) it.next();
+ for (C consumer : consumers)
+ {
consumer.failedOver();
registerConsumer(consumer, true);
}
@@ -2492,53 +2477,11 @@
_logger.info(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", producers, producers.size())); // FIXME: removeKey
for (Iterator it = producers.iterator(); it.hasNext();)
{
- BasicMessageProducer producer = (BasicMessageProducer) it.next();
+ P producer = (P) it.next();
producer.resubscribe();
}
}
- private void returnBouncedMessage(final ReturnMessage msg)
- {
- _connection.performConnectionTask(new Runnable()
- {
- public void run()
- {
- try
- {
- // Bounced message is processed here, away from the mina thread
- AbstractJMSMessage bouncedMessage =
- _messageFactoryRegistry.createMessage(0, false, msg.getExchange(),
- msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies());
- AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode());
- AMQShortString reason = msg.getReplyText();
- _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
-
- // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
- if (errorCode == AMQConstant.NO_CONSUMERS)
- {
- _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage, null));
- }
- else if (errorCode == AMQConstant.NO_ROUTE)
- {
- _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage, null));
- }
- else
- {
- _connection.exceptionReceived(
- new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage, null));
- }
-
- }
- catch (Exception e)
- {
- _logger.error(
- "Caught exception trying to raise undelivered message exception (dump follows) - ignoring...",
- e);
- }
- }
- });
- }
-
/**
* Suspends or unsuspends this session.
*
@@ -2641,6 +2584,10 @@
}
+ /** Used for debugging in the dispatcher. */
+ private static final Logger _dispatcherLogger = LoggerFactory.getLogger("org.apache.qpid.client.AMQSession.Dispatcher");
+
+
/** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
class Dispatcher extends Thread
{
@@ -2652,6 +2599,8 @@
private final AtomicLong _rollbackMark = new AtomicLong(-1);
private String dispatcherID = "" + System.identityHashCode(this);
+
+
public Dispatcher()
{
super("Dispatcher-Channel-" + _channelId);
@@ -2670,7 +2619,7 @@
}
- public void rejectPending(BasicMessageConsumer consumer)
+ public void rejectPending(C consumer)
{
synchronized (_lock)
{
@@ -2685,7 +2634,7 @@
consumer.rollbackPendingMessages();
// Reject messages on pre-dispatch queue
- rejectMessagesForConsumerTag(consumer.getConsumerTag(), true);
+ rejectMessagesForConsumerTag(consumer.getConsumerTag(), true, false);
//Let the dispatcher deal with this when it gets to them.
// closeConsumer
@@ -2712,7 +2661,7 @@
_dispatcherLogger.debug("Session Pre Dispatch Queue cleared");
- for (BasicMessageConsumer consumer : _consumers.values())
+ for (C consumer : _consumers.values())
{
if (!consumer.isNoConsume())
{
@@ -2778,7 +2727,7 @@
_lock.wait();
}
- if (!(message instanceof UnprocessedMessage.CloseConsumerMessage)
+ if (!(message instanceof CloseConsumerMessage)
&& tagLE(deliveryTag, _rollbackMark.get()))
{
rejectMessage(message, true);
@@ -2840,8 +2789,8 @@
//This if block is not needed anymore as bounce messages are handled separately
//if (message.getDeliverBody() != null)
//{
- final BasicMessageConsumer consumer =
- _consumers.get(message.getConsumerTag().toIntValue());
+ final C consumer =
+ _consumers.get(message.getConsumerTag());
if ((consumer == null) || consumer.isClosed())
{
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Fri Aug 8 11:31:18 2008
@@ -52,7 +52,7 @@
/**
* This is a 0.10 Session
*/
-public class AMQSession_0_10 extends AMQSession
+public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, BasicMessageProducer_0_10>
{
/**
@@ -345,7 +345,7 @@
/**
* Create an 0_10 message consumer
*/
- public BasicMessageConsumer createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
+ public BasicMessageConsumer_0_10 createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
final int prefetchLow, final boolean noLocal,
final boolean exclusive, String messageSelector,
final FieldTable ft, final boolean noConsume,
@@ -406,8 +406,8 @@
* This method is invoked when a consumer is created
* Registers the consumer with the broker
*/
- public void sendConsume(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler,
- boolean nowait, String messageSelector, AMQShortString tag)
+ public void sendConsume(BasicMessageConsumer_0_10 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler,
+ boolean nowait, String messageSelector, int tag)
throws AMQException, FailoverException
{
boolean preAcquire;
@@ -416,10 +416,10 @@
preAcquire = ( ! consumer.isNoConsume() &&
(consumer.getMessageSelector() == null || consumer.getMessageSelector().equals("")) )
|| !(consumer.getDestination() instanceof AMQQueue);
- getQpidSession().messageSubscribe(queueName.toString(), tag.toString(),
+ getQpidSession().messageSubscribe(queueName.toString(), String.valueOf(tag),
getAcknowledgeMode() == NO_ACKNOWLEDGE ? Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED:Session.TRANSFER_CONFIRM_MODE_REQUIRED,
preAcquire ? Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE : Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE,
- new MessagePartListenerAdapter((BasicMessageConsumer_0_10) consumer), null,
+ (BasicMessageConsumer_0_10) consumer, null,
consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
}
catch (JMSException e)
@@ -427,21 +427,23 @@
throw new AMQException(AMQConstant.INTERNAL_ERROR, "problem when registering consumer", e);
}
+ String consumerTag = ((BasicMessageConsumer_0_10)consumer).getConsumerTagString();
+
if (! prefetch())
{
- getQpidSession().messageSetFlowMode(consumer.getConsumerTag().toString(), MessageFlowMode.CREDIT);
+ getQpidSession().messageSetFlowMode(consumerTag, MessageFlowMode.CREDIT);
}
else
{
- getQpidSession().messageSetFlowMode(consumer.getConsumerTag().toString(), MessageFlowMode.WINDOW);
+ getQpidSession().messageSetFlowMode(consumerTag, MessageFlowMode.WINDOW);
}
- getQpidSession().messageFlow(consumer.getConsumerTag().toString(), MessageCreditUnit.BYTE, 0xFFFFFFFF);
+ getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF);
// We need to sync so that we get notified of an error.
// only if not immediate prefetch
if(prefetch() && (consumer.isStrated() || _immediatePrefetch))
{
// set the flow
- getQpidSession().messageFlow(consumer.getConsumerTag().toString(),
+ getQpidSession().messageFlow(consumerTag,
MessageCreditUnit.MESSAGE,
getAMQConnection().getMaxPrefetch());
}
@@ -452,7 +454,7 @@
/**
* Create an 0_10 message producer
*/
- public BasicMessageProducer createMessageProducer(final Destination destination, final boolean mandatory,
+ public BasicMessageProducer_0_10 createMessageProducer(final Destination destination, final boolean mandatory,
final boolean immediate, final boolean waitUntilSent,
long producerId)
{
@@ -542,13 +544,14 @@
{
for (BasicMessageConsumer consumer : _consumers.values())
{
- getQpidSession().messageStop(consumer.getConsumerTag().toString());
+ getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()));
}
}
else
{
- for (BasicMessageConsumer consumer : _consumers.values())
+ for (BasicMessageConsumer_0_10 consumer : _consumers.values())
{
+ String consumerTag = String.valueOf(consumer.getConsumerTag());
//only set if msg list is null
try
{
@@ -556,18 +559,18 @@
{
if (consumer.getMessageListener() != null)
{
- getQpidSession().messageFlow(consumer.getConsumerTag().toString(),
+ getQpidSession().messageFlow(consumerTag,
MessageCreditUnit.MESSAGE, 1);
}
}
else
{
getQpidSession()
- .messageFlow(consumer.getConsumerTag().toString(), MessageCreditUnit.MESSAGE,
+ .messageFlow(consumerTag, MessageCreditUnit.MESSAGE,
getAMQConnection().getMaxPrefetch());
}
getQpidSession()
- .messageFlow(consumer.getConsumerTag().toString(), MessageCreditUnit.BYTE, 0xFFFFFFFF);
+ .messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF);
}
catch (Exception e)
{
@@ -715,7 +718,7 @@
AMQTopic origTopic=checkValidTopic(topic);
AMQTopic dest=AMQTopic.createDurable010Topic(origTopic, name, _connection);
- TopicSubscriberAdaptor subscriber=_subscriptions.get(name);
+ TopicSubscriberAdaptor<BasicMessageConsumer_0_10> subscriber=_subscriptions.get(name);
if (subscriber != null)
{
if (subscriber.getTopic().equals(topic))
@@ -766,7 +769,7 @@
}
}
- subscriber=new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest));
+ subscriber=new TopicSubscriberAdaptor(dest, createExclusiveConsumer(dest));
_subscriptions.put(name, subscriber);
_reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Fri Aug 8 11:31:18 2008
@@ -25,10 +25,11 @@
import javax.jms.IllegalStateException;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQUndeliveredException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.failover.FailoverRetrySupport;
-import org.apache.qpid.client.message.MessageFactoryRegistry;
+import org.apache.qpid.client.message.*;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
@@ -43,7 +44,7 @@
import java.util.Map;
-public final class AMQSession_0_8 extends AMQSession
+public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
{
/** Used for debugging. */
@@ -218,6 +219,7 @@
return isQueueBound(destination.getExchangeName(),destination.getAMQQueueName(),destination.getAMQQueueName());
}
+
public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
throws JMSException
{
@@ -245,10 +247,14 @@
{
throw new JMSAMQException("Queue bound query failed: " + e.getMessage(), e);
}
- }
+ }
- public void sendConsume(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait,
- String messageSelector, AMQShortString tag) throws AMQException, FailoverException
+ @Override public void sendConsume(BasicMessageConsumer_0_8 consumer,
+ AMQShortString queueName,
+ AMQProtocolHandler protocolHandler,
+ boolean nowait,
+ String messageSelector,
+ int tag) throws AMQException, FailoverException
{
FieldTable arguments = FieldTableFactory.newFieldTable();
if ((messageSelector != null) && !messageSelector.equals(""))
@@ -268,7 +274,7 @@
BasicConsumeBody body = getMethodRegistry().createBasicConsumeBody(getTicket(),
queueName,
- tag,
+ new AMQShortString(String.valueOf(tag)),
consumer.isNoLocal(),
consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,
consumer.isExclusive(),
@@ -337,7 +343,7 @@
}
- public BasicMessageProducer createMessageProducer(final Destination destination, final boolean mandatory,
+ public BasicMessageProducer_0_8 createMessageProducer(final Destination destination, final boolean mandatory,
final boolean immediate, final boolean waitUntilSent, long producerId)
{
@@ -345,6 +351,66 @@
this, getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent);
}
+
+ @Override public void messageReceived(UnprocessedMessage message)
+ {
+
+ if (message instanceof ReturnMessage)
+ {
+ // Return of the bounced message.
+ returnBouncedMessage((ReturnMessage) message);
+ }
+ else
+ {
+ super.messageReceived(message);
+ }
+ }
+
+ private void returnBouncedMessage(final ReturnMessage msg)
+ {
+ _connection.performConnectionTask(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ // Bounced message is processed here, away from the mina thread
+ AbstractJMSMessage bouncedMessage =
+ _messageFactoryRegistry.createMessage(0, false, msg.getExchange(),
+ msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies());
+ AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode());
+ AMQShortString reason = msg.getReplyText();
+ _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
+
+ // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
+ if (errorCode == AMQConstant.NO_CONSUMERS)
+ {
+ _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage, null));
+ }
+ else if (errorCode == AMQConstant.NO_ROUTE)
+ {
+ _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage, null));
+ }
+ else
+ {
+ _connection.exceptionReceived(
+ new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage, null));
+ }
+
+ }
+ catch (Exception e)
+ {
+ _logger.error(
+ "Caught exception trying to raise undelivered message exception (dump follows) - ignoring...",
+ e);
+ }
+ }
+ });
+ }
+
+
+
+
public void sendRollback() throws AMQException, FailoverException
{
TxRollbackBody body = getMethodRegistry().createTxRollbackBody();
@@ -365,7 +431,7 @@
checkNotClosed();
AMQTopic origTopic = checkValidTopic(topic);
AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection);
- TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
+ TopicSubscriberAdaptor<BasicMessageConsumer_0_8> subscriber = _subscriptions.get(name);
if (subscriber != null)
{
if (subscriber.getTopic().equals(topic))
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Fri Aug 8 11:31:18 2008
@@ -22,10 +22,7 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
-import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.client.message.MessageFactoryRegistry;
-import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.qpid.client.message.AMQMessageDelegateFactory;
+import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.*;
import org.apache.qpid.jms.MessageConsumer;
@@ -49,7 +46,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-public abstract class BasicMessageConsumer<H, B> extends Closeable implements MessageConsumer
+public abstract class BasicMessageConsumer<U> extends Closeable implements MessageConsumer
{
private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class);
@@ -72,7 +69,7 @@
private final AtomicReference<MessageListener> _messageListener = new AtomicReference<MessageListener>();
/** The consumer tag allows us to close the consumer by sending a jmsCancel method to the broker */
- protected AMQShortString _consumerTag;
+ protected int _consumerTag;
/** We need to know the channel id when constructing frames */
protected final int _channelId;
@@ -517,7 +514,7 @@
throw e;
}
- else if (o instanceof UnprocessedMessage.CloseConsumerMessage)
+ else if (o instanceof CloseConsumerMessage)
{
_closed.set(true);
deregisterConsumer();
@@ -635,7 +632,7 @@
* @param closeMessage
* this message signals that we should close the browser
*/
- public void notifyCloseMessage(UnprocessedMessage.CloseConsumerMessage closeMessage)
+ public void notifyCloseMessage(CloseConsumerMessage closeMessage)
{
if (isMessageListenerSet())
{
@@ -667,26 +664,21 @@
*
* @param messageFrame the raw unprocessed mesage
*/
- void notifyMessage(UnprocessedMessage messageFrame)
+ void notifyMessage(U messageFrame)
{
- if (messageFrame instanceof UnprocessedMessage.CloseConsumerMessage)
+ if (messageFrame instanceof CloseConsumerMessage)
{
- notifyCloseMessage((UnprocessedMessage.CloseConsumerMessage) messageFrame);
+ notifyCloseMessage((CloseConsumerMessage) messageFrame);
return;
}
- final boolean debug = _logger.isDebugEnabled();
- if (debug)
- {
- _logger.debug("notifyMessage called with message number " + messageFrame.getDeliveryTag());
- }
try
{
AbstractJMSMessage jmsMessage = createJMSMessageFromUnprocessedMessage(_session.getMessageDelegateFactory(), messageFrame);
- if (debug)
+ if (_logger.isDebugEnabled())
{
_logger.debug("Message is of type: " + jmsMessage.getClass().getName());
}
@@ -721,7 +713,7 @@
}
}
- public abstract AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage<H, B> messageFrame)
+ public abstract AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, U messageFrame)
throws Exception;
/** @param jmsMessage this message has already been processed so can't redo preDeliver */
@@ -936,12 +928,12 @@
_session.deregisterConsumer(this);
}
- public AMQShortString getConsumerTag()
+ public int getConsumerTag()
{
return _consumerTag;
}
- public void setConsumerTag(AMQShortString consumerTag)
+ public void setConsumerTag(int consumerTag)
{
_consumerTag = consumerTag;
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Fri Aug 8 11:31:18 2008
@@ -22,11 +22,8 @@
import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.AMQException;
-import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.api.Message;
import org.apache.qpid.transport.*;
import org.apache.qpid.QpidException;
import org.apache.qpid.filter.MessageFilter;
@@ -35,16 +32,14 @@
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.MessageListener;
-import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* This is a 0.10 message consumer.
*/
-public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], ByteBuffer>
- implements org.apache.qpid.nclient.util.MessageListener
+public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedMessage_0_10>
+ implements org.apache.qpid.nclient.MessagePartListener
{
/**
@@ -76,6 +71,7 @@
* Specify whether this consumer is performing a sync receive
*/
private final AtomicBoolean _syncReceive = new AtomicBoolean(false);
+ private String _consumerTagString;
//--- constructor
protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
@@ -106,6 +102,19 @@
_isStarted = connection.started();
}
+
+ @Override public void setConsumerTag(int consumerTag)
+ {
+ super.setConsumerTag(consumerTag);
+ _consumerTagString = String.valueOf(consumerTag);
+ }
+
+ public String getConsumerTagString()
+ {
+ return _consumerTagString;
+ }
+
+
// ----- Interface org.apache.qpid.client.util.MessageListener
/**
@@ -142,7 +151,7 @@
{
if (isMessageListenerSet() && ! getSession().prefetch())
{
- _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+ _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
MessageCreditUnit.MESSAGE, 1);
}
_logger.debug("messageOk, trying to notify");
@@ -155,52 +164,19 @@
/**
* This method is invoked by the transport layer when a message is delivered for this
* consumer. The message is transformed and pass to the session.
- * @param message an 0.10 message
+ * @param xfr an 0.10 message transfer
*/
- public void onMessage(Message message)
+ public void messageTransfer(MessageTransfer xfr)
+
+ //public void onMessage(Message message)
{
int channelId = getSession().getChannelId();
- long deliveryId = message.getMessageTransferId();
- AMQShortString consumerTag = getConsumerTag();
- AMQShortString exchange;
- AMQShortString routingKey;
- boolean redelivered = false;
- Struct[] headers = {message.getMessageProperties(), message.getDeliveryProperties()};
- if (headers[0] == null) {
- headers[0] = new MessageProperties();
- }
- if( message.getDeliveryProperties() != null )
- {
- exchange = new AMQShortString(message.getDeliveryProperties().getExchange());
- routingKey = new AMQShortString(message.getDeliveryProperties().getRoutingKey());
- redelivered = message.getDeliveryProperties().getRedelivered();
- }
- else
- {
- exchange = new AMQShortString("");
- routingKey = new AMQShortString("");
- headers[1] = new DeliveryProperties();
- }
+ int consumerTag = getConsumerTag();
+
UnprocessedMessage_0_10 newMessage =
- new UnprocessedMessage_0_10(channelId, deliveryId, consumerTag, exchange, routingKey, redelivered);
- try
- {
- newMessage.receiveBody(message.readData());
- }
- catch (IOException e)
- {
- getSession().getAMQConnection().exceptionReceived(e);
- }
- // if there is a replyto destination then we need to request the exchange info
- ReplyTo replyTo = ((MessageProperties) headers[0]).getReplyTo();
- if (replyTo != null && replyTo.getExchange() != null && !replyTo.getExchange().equals(""))
- {
- // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
- // the exchnage class will be set later from within the sesion thread
- String replyToUrl = replyTo.getExchange() + "/" + replyTo.getRoutingKey() + "/" + replyTo.getRoutingKey();
- newMessage.setReplyToURL(replyToUrl);
- }
- newMessage.setContentHeader(headers);
+ new UnprocessedMessage_0_10(consumerTag, xfr);
+
+
getSession().messageReceived(newMessage);
// else ignore this message
}
@@ -213,47 +189,16 @@
*/
@Override void sendCancel() throws AMQException
{
- ((AMQSession_0_10) getSession()).getQpidSession().messageCancel(getConsumerTag().toString());
+ ((AMQSession_0_10) getSession()).getQpidSession().messageCancel(getConsumerTagString());
((AMQSession_0_10) getSession()).getQpidSession().sync();
// confirm cancel
getSession().confirmConsumerCancelled(getConsumerTag());
((AMQSession_0_10) getSession()).getCurrentException();
}
- @Override void notifyMessage(UnprocessedMessage messageFrame)
+ @Override void notifyMessage(UnprocessedMessage_0_10 messageFrame)
{
- // if there is a replyto destination then we need to request the exchange info
- String replyToURL = messageFrame.getReplyToURL();
- if (replyToURL != null && !replyToURL.equals(""))
- {
- AMQShortString shortExchangeName = new AMQShortString( replyToURL.substring(0, replyToURL.indexOf('/')));
- String replyToUrl = "://" + replyToURL;
- if (shortExchangeName.equals(ExchangeDefaults.TOPIC_EXCHANGE_NAME))
- {
- replyToUrl = ExchangeDefaults.TOPIC_EXCHANGE_CLASS + replyToUrl;
- }
- else if (shortExchangeName.equals(ExchangeDefaults.DIRECT_EXCHANGE_NAME))
- {
- replyToUrl = ExchangeDefaults.DIRECT_EXCHANGE_CLASS + replyToUrl;
- }
- else if (shortExchangeName.equals(ExchangeDefaults.HEADERS_EXCHANGE_NAME))
- {
- replyToUrl = ExchangeDefaults.HEADERS_EXCHANGE_CLASS + replyToUrl;
- }
- else if (shortExchangeName.equals(ExchangeDefaults.FANOUT_EXCHANGE_NAME))
- {
- replyToUrl = ExchangeDefaults.FANOUT_EXCHANGE_CLASS + replyToUrl;
- }
- else
- {
- Future<ExchangeQueryResult> future =
- ((AMQSession_0_10) getSession()).getQpidSession().exchangeQuery(shortExchangeName.toString());
- ExchangeQueryResult res = future.get();
- // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
- replyToUrl = res.getType() + replyToUrl;
- }
- ((UnprocessedMessage_0_10) messageFrame).setReplyToURL(replyToUrl);
- }
+
super.notifyMessage(messageFrame);
}
@@ -267,11 +212,10 @@
}
@Override public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(
- AMQMessageDelegateFactory delegateFactory, UnprocessedMessage<Struct[], ByteBuffer> messageFrame) throws Exception
+ AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_10 msg) throws Exception
{
- return _messageFactory.createMessage(messageFrame.getDeliveryTag(), messageFrame.isRedelivered(),
- messageFrame.getContentHeader(), messageFrame.getBodies()
- );
+ AMQMessageDelegate_0_10.updateExchangeTypeMapping(msg.getMessageTransfer().getHeader(), ((AMQSession_0_10)getSession()).getQpidSession());
+ return _messageFactory.createMessage(msg.getMessageTransfer());
}
// private methods
@@ -327,7 +271,7 @@
// and messages are not prefetched we then need to request another one
if(! getSession().prefetch())
{
- _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+ _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
MessageCreditUnit.MESSAGE, 1);
}
}
@@ -415,7 +359,7 @@
super.setMessageListener(messageListener);
if (messageListener != null && ! getSession().prefetch())
{
- _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+ _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
MessageCreditUnit.MESSAGE, 1);
}
if (messageListener != null && !_synchronousQueue.isEmpty())
@@ -440,7 +384,7 @@
_isStarted = true;
if (_syncReceive.get())
{
- _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+ _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
MessageCreditUnit.MESSAGE, 1);
}
}
@@ -463,7 +407,7 @@
{
if (isStrated() && ! getSession().prefetch() && _synchronousQueue.isEmpty())
{
- _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+ _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
MessageCreditUnit.MESSAGE, 1);
}
if (! getSession().prefetch())
@@ -486,4 +430,5 @@
_session.acknowledgeMessage(msg.getDeliveryTag(), false);
}
}
+
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java Fri Aug 8 11:31:18 2008
@@ -26,22 +26,14 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.QpidException;
import org.apache.qpid.client.failover.FailoverException;
-import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.client.message.MessageFactoryRegistry;
-import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.qpid.client.message.AMQMessageDelegateFactory;
+import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.filter.JMSSelectorFilter;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.BasicCancelBody;
-import org.apache.qpid.framing.BasicCancelOkBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<ContentHeaderBody,ContentBody>
+public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMessage_0_8>
{
protected final Logger _logger = LoggerFactory.getLogger(getClass());
@@ -69,7 +61,7 @@
void sendCancel() throws AMQException, FailoverException
{
- BasicCancelBody body = getSession().getMethodRegistry().createBasicCancelBody(_consumerTag, false);
+ BasicCancelBody body = getSession().getMethodRegistry().createBasicCancelBody(new AMQShortString(String.valueOf(_consumerTag)), false);
final AMQFrame cancelFrame = body.generateFrame(_channelId);
@@ -81,7 +73,7 @@
}
}
- public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage<ContentHeaderBody, ContentBody> messageFrame)throws Exception
+ public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_8 messageFrame)throws Exception
{
return _messageFactory.createMessage(messageFrame.getDeliveryTag(),
@@ -90,4 +82,4 @@
}
-}
\ No newline at end of file
+}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java Fri Aug 8 11:31:18 2008
@@ -22,6 +22,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import java.nio.ByteBuffer;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -36,11 +37,8 @@
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.nclient.util.ByteBufferMessage;
import org.apache.qpid.njms.ExceptionHelper;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.transport.MessageDeliveryMode;
-import org.apache.qpid.transport.MessageDeliveryPriority;
-import org.apache.qpid.transport.MessageProperties;
-import org.apache.qpid.transport.ReplyTo;
+import org.apache.qpid.transport.*;
+import static org.apache.qpid.transport.Option.*;
/**
* This is a 0_10 message producer.
@@ -76,21 +74,8 @@
AMQMessageDelegate_0_10 delegate = (AMQMessageDelegate_0_10) message.getDelegate();
- org.apache.qpid.api.Message underlyingMessage = message.get010Message();
- if (underlyingMessage == null)
- {
- underlyingMessage = new ByteBufferMessage(delegate.getMessageProperties(), delegate.getDeliveryProperties());
- message.set010Message(underlyingMessage);
-
- }
- // force a rebuild of the 0-10 message if data has changed
- if (message.getData() == null)
- {
- message.dataChanged();
- }
-
- DeliveryProperties deliveryProp = underlyingMessage.getDeliveryProperties();
- MessageProperties messageProps = underlyingMessage.getMessageProperties();
+ DeliveryProperties deliveryProp = delegate.getDeliveryProperties();
+ MessageProperties messageProps = delegate.getMessageProperties();
if (messageId != null)
{
@@ -140,10 +125,10 @@
deliveryProp.setPriority(MessageDeliveryPriority.get((short) priority));
message.setJMSPriority(priority);
}
- String excahngeName = destination.getExchangeName().toString();
- if ( deliveryProp.getExchange() == null || ! deliveryProp.getExchange().equals(excahngeName))
+ String exchangeName = destination.getExchangeName().toString();
+ if ( deliveryProp.getExchange() == null || ! deliveryProp.getExchange().equals(exchangeName))
{
- deliveryProp.setExchange(excahngeName);
+ deliveryProp.setExchange(exchangeName);
}
String routingKey = destination.getRoutingKey().toString();
if (deliveryProp.getRoutingKey() == null || ! deliveryProp.getRoutingKey().equals(routingKey))
@@ -153,63 +138,27 @@
messageProps.setContentLength(message.getContentLength());
-
- /* String replyToURL = contentHeaderProperties.getReplyToAsString();
- if (replyToURL != null)
- {
- if(_logger.isDebugEnabled())
- {
- StringBuffer b = new StringBuffer();
- b.append("\n==========================");
- b.append("\nReplyTo : " + replyToURL);
- b.append("\n==========================");
- _logger.debug(b.toString());
- }
- AMQBindingURL dest;
- try
- {
- dest = new AMQBindingURL(replyToURL);
- }
- catch (URISyntaxException e)
- {
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
- messageProps.setReplyTo(new ReplyTo(dest.getExchangeName().toString(), dest.getRoutingKey().toString()));
- }
-*/
-
-
// send the message
try
{
- org.apache.qpid.nclient.Session ssn = ((AMQSession_0_10) getSession()).getQpidSession();
+ org.apache.qpid.transport.Session ssn = (org.apache.qpid.transport.Session)
+ ((AMQSession_0_10) getSession()).getQpidSession();
// if true, we need to sync the delivery of this message
boolean sync = (deliveryMode == DeliveryMode.PERSISTENT &&
getSession().getAMQConnection().getSyncPersistence());
+ org.apache.mina.common.ByteBuffer data = message.getData();
+ ByteBuffer buffer = data == null ? ByteBuffer.allocate(0) : data.buf().slice();
+
+ ssn.messageTransfer(destination.getExchangeName().toString(), MessageAcceptMode.NONE,
+ MessageAcquireMode.PRE_ACQUIRED,
+ new Header(deliveryProp, messageProps),
+ buffer, sync ? SYNC : NONE);
if (sync)
{
- ssn.setAutoSync(true);
- }
- try
- {
- ssn.messageTransfer(destination.getExchangeName().toString(),
- underlyingMessage,
- ssn.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
- ssn.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE);
+ ssn.sync();
}
- finally
- {
- if (sync)
- {
- ssn.setAutoSync(false);
- }
- }
- }
- catch (IOException e)
- {
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
}
catch (RuntimeException rte)
{
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java Fri Aug 8 11:31:18 2008
@@ -32,20 +32,20 @@
* Wraps a MessageConsumer to fulfill the extended TopicSubscriber contract
*
*/
-class TopicSubscriberAdaptor implements TopicSubscriber
+class TopicSubscriberAdaptor<C extends BasicMessageConsumer> implements TopicSubscriber
{
private final Topic _topic;
- private final BasicMessageConsumer _consumer;
+ private final C _consumer;
private final boolean _noLocal;
- TopicSubscriberAdaptor(Topic topic, BasicMessageConsumer consumer, boolean noLocal)
+ TopicSubscriberAdaptor(Topic topic, C consumer, boolean noLocal)
{
_topic = topic;
_consumer = consumer;
_noLocal = noLocal;
}
- TopicSubscriberAdaptor(Topic topic, BasicMessageConsumer consumer)
+ TopicSubscriberAdaptor(Topic topic, C consumer)
{
this(topic, consumer, consumer.isNoLocal());
}
@@ -103,7 +103,7 @@
}
private void checkPreConditions() throws javax.jms.IllegalStateException{
- BasicMessageConsumer msgConsumer = (BasicMessageConsumer)_consumer;
+ C msgConsumer = _consumer;
if (msgConsumer.isClosed() ){
throw new javax.jms.IllegalStateException("Consumer is closed");
@@ -120,7 +120,7 @@
}
}
- BasicMessageConsumer getMessageConsumer()
+ C getMessageConsumer()
{
return _consumer;
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java Fri Aug 8 11:31:18 2008
@@ -43,13 +43,12 @@
throws AMQException
{
final UnprocessedMessage_0_8 msg = new UnprocessedMessage_0_8(
- channelId,
body.getDeliveryTag(),
- body.getConsumerTag(),
+ body.getConsumerTag().toIntValue(),
body.getExchange(),
body.getRoutingKey(),
body.getRedelivered());
_logger.debug("New JmsDeliver method received:" + session);
- session.unprocessedMessageReceived(msg);
+ session.unprocessedMessageReceived(channelId, msg);
}
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java Fri Aug 8 11:31:18 2008
@@ -45,14 +45,14 @@
throws AMQException
{
_logger.debug("New JmsBounce method received");
- final ReturnMessage msg = new ReturnMessage(channelId,
+ final ReturnMessage msg = new ReturnMessage(
body.getExchange(),
body.getRoutingKey(),
body.getReplyText(),
body.getReplyCode()
);
- session.unprocessedMessageReceived(msg);
+ session.unprocessedMessageReceived(channelId, msg);
}
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java Fri Aug 8 11:31:18 2008
@@ -22,19 +22,14 @@
package org.apache.qpid.client.message;
import org.apache.commons.collections.map.ReferenceMap;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.CustomJMSXProperty;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.client.AMQUndefinedDestination;
-import org.apache.qpid.client.JMSAMQException;
+import org.apache.qpid.client.*;
import org.apache.qpid.framing.ContentHeaderProperties;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQPInvalidClassException;
+import org.apache.qpid.nclient.*;
import org.apache.qpid.jms.Message;
import org.apache.qpid.url.BindingURL;
import org.apache.qpid.url.AMQBindingURL;
@@ -47,8 +42,10 @@
import javax.jms.MessageFormatException;
import javax.jms.DeliveryMode;
import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
import java.net.URISyntaxException;
import java.nio.charset.Charset;
+import org.apache.qpid.exchange.ExchangeDefaults;
public class AMQMessageDelegate_0_10 implements AMQMessageDelegate
{
@@ -68,27 +65,32 @@
private AMQSession _session;
private final long _deliveryTag;
- private static Map<String,Integer> _exchangeTypeMap = new HashMap<String, Integer>();
+ // Destination type keyed by exchange name; updateExchangeTypeMapping adds entries at runtime.
+ private static final Map<AMQShortString, Integer> _exchangeTypeMap = new ConcurrentHashMap<AMQShortString, Integer>();
+ // The same mapping keyed by the plain String form of the exchange name.
+ private static final Map<String, Integer> _exchangeTypeStringMap = new ConcurrentHashMap<String, Integer>();
+ // Destination type keyed by exchange type, i.e. the value returned by ExchangeQueryResult.getType().
+ private static final Map<String, Integer> _exchangeTypeToDestinationType = new ConcurrentHashMap<String, Integer>();
static
{
- // TODO - XXX - Need to add to this map when we find an exchange we don't know about
+     // Well-known exchange names (AMQShortString form) -> destination type.
+     _exchangeTypeMap.put(ExchangeDefaults.DIRECT_EXCHANGE_NAME, AMQDestination.QUEUE_TYPE);
+     _exchangeTypeMap.put(AMQShortString.EMPTY_STRING, AMQDestination.QUEUE_TYPE);
+     _exchangeTypeMap.put(ExchangeDefaults.TOPIC_EXCHANGE_NAME, AMQDestination.TOPIC_TYPE);
+     _exchangeTypeMap.put(ExchangeDefaults.FANOUT_EXCHANGE_NAME, AMQDestination.TOPIC_TYPE);
+
+     // The same well-known exchange names in String form.
+     _exchangeTypeStringMap.put(ExchangeDefaults.DIRECT_EXCHANGE_NAME.toString(), AMQDestination.QUEUE_TYPE);
+     _exchangeTypeStringMap.put("", AMQDestination.QUEUE_TYPE);
+     _exchangeTypeStringMap.put(ExchangeDefaults.TOPIC_EXCHANGE_NAME.toString(), AMQDestination.TOPIC_TYPE);
+     _exchangeTypeStringMap.put(ExchangeDefaults.FANOUT_EXCHANGE_NAME.toString(), AMQDestination.TOPIC_TYPE);
- _exchangeTypeMap.put(null, AMQDestination.QUEUE_TYPE);
- _exchangeTypeMap.put("amq.direct", AMQDestination.QUEUE_TYPE);
- _exchangeTypeMap.put("", AMQDestination.QUEUE_TYPE);
- _exchangeTypeMap.put("amq.topic", AMQDestination.TOPIC_TYPE);
- _exchangeTypeMap.put("amq.fanout", AMQDestination.TOPIC_TYPE);
-
+     // Exchange type (class) -> destination type. This map is looked up with the type
+     // reported by an exchange query, so it must be keyed by the *_EXCHANGE_CLASS
+     // constants and not by exchange names.
+     _exchangeTypeToDestinationType.put(ExchangeDefaults.DIRECT_EXCHANGE_CLASS.toString(), AMQDestination.QUEUE_TYPE);
+     _exchangeTypeToDestinationType.put(ExchangeDefaults.TOPIC_EXCHANGE_CLASS.toString(), AMQDestination.TOPIC_TYPE);
+     _exchangeTypeToDestinationType.put(ExchangeDefaults.FANOUT_EXCHANGE_CLASS.toString(), AMQDestination.TOPIC_TYPE);
}
// No-argument constructor: delegates to the (messageProps, deliveryProps, deliveryTag)
// constructor with fresh, empty property objects and a delivery tag of -1.
protected AMQMessageDelegate_0_10()
{
this(new MessageProperties(), new DeliveryProperties(), -1);
// Cleared after delegation, so this assignment determines the final value of the flag.
_readableProperties = false;
-
-
}
protected AMQMessageDelegate_0_10(long deliveryTag, MessageProperties messageProps, DeliveryProperties deliveryProps, AMQShortString exchange,
@@ -99,13 +101,68 @@
AMQDestination dest;
- dest = new AMQUndefinedDestination(exchange, routingKey, null);
-
- // Destination dest = AMQDestination.createDestination(url);
+ dest = generateDestination(exchange, routingKey);
setJMSDestination(dest);
+ }
+
+ /**
+  * Builds the destination object that corresponds to an exchange / routing key pair.
+  * The destination class is selected from the type recorded for the exchange
+  * (see getExchangeType): a queue, a topic, or an undefined destination when the
+  * exchange type is not known.
+  *
+  * @param exchange   name of the exchange; may be null
+  * @param routingKey routing key; for queues it is also passed as the queue name
+  * @return the destination matching the exchange type
+  */
+ private AMQDestination generateDestination(AMQShortString exchange, AMQShortString routingKey)
+ {
+     final int exchangeType = getExchangeType(exchange);
+
+     if (exchangeType == AMQDestination.QUEUE_TYPE)
+     {
+         return new AMQQueue(exchange, routingKey, routingKey);
+     }
+
+     if (exchangeType == AMQDestination.TOPIC_TYPE)
+     {
+         return new AMQTopic(exchange, routingKey, null);
+     }
+
+     // Any other type, including UNKNOWN_TYPE, yields an undefined destination.
+     return new AMQUndefinedDestination(exchange, routingKey, null);
+ }
+
+ /**
+  * Looks up the destination type recorded for an exchange name.
+  *
+  * @param exchange exchange name; null is treated as the empty exchange name
+  * @return the recorded destination type, or AMQDestination.UNKNOWN_TYPE when the
+  *         exchange has no entry in the map
+  */
+ private int getExchangeType(AMQShortString exchange)
+ {
+     final AMQShortString key = (exchange != null) ? exchange : AMQShortString.EMPTY_STRING;
+     final Integer recordedType = _exchangeTypeMap.get(key);
+
+     return (recordedType == null) ? AMQDestination.UNKNOWN_TYPE : recordedType.intValue();
+ }
+ /**
+  * Records the destination type of the exchange named in a message's delivery
+  * properties, if that exchange is not already known.
+  * <p>
+  * For an exchange without an entry, the session is queried for the exchange and
+  * the reported exchange type is translated to a destination type. Exchange types
+  * with no translation are recorded as AMQDestination.UNKNOWN_TYPE. This call
+  * waits on the query result (future.get()).
+  *
+  * @param header  message header; nothing happens if it has no DeliveryProperties
+  *                or the delivery properties carry no exchange name
+  * @param session session used to issue the exchange query
+  */
+ public static void updateExchangeTypeMapping(Header header, org.apache.qpid.nclient.Session session)
+ {
+     DeliveryProperties deliveryProps = header.get(DeliveryProperties.class);
+     if (deliveryProps == null)
+     {
+         return;
+     }
+
+     String exchange = deliveryProps.getExchange();
+     if (exchange == null || _exchangeTypeStringMap.containsKey(exchange))
+     {
+         return;
+     }
+
+     Future<ExchangeQueryResult> future = session.exchangeQuery(exchange);
+     ExchangeQueryResult res = future.get();
+
+     // ConcurrentHashMap rejects null keys with a NullPointerException, so only
+     // look the type up when the query actually reported one.
+     // NOTE(review): whether the result or its type can be null is not visible here - confirm.
+     String exchangeType = (res == null) ? null : res.getType();
+     Integer type = (exchangeType == null) ? null : _exchangeTypeToDestinationType.get(exchangeType);
+     if (type == null)
+     {
+         type = AMQDestination.UNKNOWN_TYPE;
+     }
+
+     // Keep the String-keyed and AMQShortString-keyed maps in step.
+     _exchangeTypeStringMap.put(exchange, type);
+     _exchangeTypeMap.put(new AMQShortString(exchange), type);
}
protected AMQMessageDelegate_0_10(MessageProperties messageProps, DeliveryProperties deliveryProps, long deliveryTag)
@@ -212,19 +269,10 @@
String exchange = replyTo.getExchange();
String routingKey = replyTo.getRoutingKey();
- int type = _exchangeTypeMap.get(exchange);
+ dest = generateDestination(exchange == null ? null : new AMQShortString(exchange),
+ routingKey == null ? null : new AMQShortString(routingKey));
+
- switch(type)
- {
- case AMQDestination.QUEUE_TYPE:
- dest = new AMQQueue(new AMQShortString(exchange), new AMQShortString(routingKey), new AMQShortString(routingKey));
- break;
- case AMQDestination.TOPIC_TYPE:
- dest = new AMQTopic(new AMQShortString(exchange), new AMQShortString(routingKey), null);
- break;
- default:
- dest = new AMQUndefinedDestination(new AMQShortString(exchange), new AMQShortString(routingKey), null);
- }
@@ -897,4 +945,5 @@
{
return _deliveryProps;
}
+
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Fri Aug 8 11:31:18 2008
@@ -44,17 +44,11 @@
protected boolean _readableMessage = false;
protected boolean _changedData = true;
-
- /**
- * This is 0_10 specific
- */
- private org.apache.qpid.api.Message _010message = null;
/** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */
-
private AMQMessageDelegate _delegate;
private boolean _redelivered;
@@ -454,52 +448,10 @@
else
{
_data.flip();
- dataChanged();
_changedData = false;
}
}
- public void set010Message(org.apache.qpid.api.Message m )
- {
- _010message = m;
- }
-
- public void dataChanged()
- {
- if (_010message != null)
- {
- _010message.clearData();
- try
- {
- if (_data != null)
- {
- _010message.appendData(_data.buf().slice());
- }
- else
- {
- _010message.appendData(java.nio.ByteBuffer.allocate(0));
- }
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- }
- }
-
- /**
- * End 010 specific
- */
-
- public org.apache.qpid.api.Message get010Message()
- {
- return _010message;
- }
-
-
-
-
-
public int getContentLength()
{
if(_data != null)
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java Fri Aug 8 11:31:18 2008
@@ -110,17 +110,17 @@
protected AbstractJMSMessage create010MessageWithBody(long messageNbr, Struct[] contentHeader,
- List bodies) throws AMQException
+ java.nio.ByteBuffer body) throws AMQException
{
ByteBuffer data;
final boolean debug = _logger.isDebugEnabled();
- // we optimise the non-fragmented case to avoid copying
- if ((bodies != null))
+
+ if (body != null)
{
- data = ByteBuffer.wrap((java.nio.ByteBuffer) bodies.get(0));
+ data = ByteBuffer.wrap(body);
}
- else // bodies == null
+ else // body == null
{
data = ByteBuffer.allocate(0);
}
@@ -164,11 +164,11 @@
}
public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered, Struct[] contentHeader,
- List bodies)
+ java.nio.ByteBuffer body)
throws JMSException, AMQException
{
final AbstractJMSMessage msg =
- create010MessageWithBody(messageNbr, contentHeader, bodies);
+ create010MessageWithBody(messageNbr, contentHeader, body);
msg.setJMSRedelivered(redelivered);
msg.receivedFromServer();
return msg;
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/CloseConsumerMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/CloseConsumerMessage.java?rev=684036&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/CloseConsumerMessage.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/CloseConsumerMessage.java Fri Aug 8 11:31:18 2008
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import org.apache.qpid.client.BasicMessageConsumer;
+
+/**
+ * An {@link UnprocessedMessage} that carries only the consumer tag of a given
+ * consumer; it reports a delivery tag of 0 and is never marked redelivered.
+ * NOTE(review): judging by the name this is a marker used to signal that the
+ * consumer is closing - confirm against the code that dispatches it.
+ */
+public final class CloseConsumerMessage extends UnprocessedMessage
+{
+    /**
+     * @param consumer the consumer whose tag identifies this message
+     */
+    public CloseConsumerMessage(BasicMessageConsumer consumer)
+    {
+        super(consumer.getConsumerTag());
+    }
+
+    /**
+     * @return always 0
+     */
+    public long getDeliveryTag()
+    {
+        return 0L;
+    }
+
+    /**
+     * @return always false
+     */
+    public boolean isRedelivered()
+    {
+        return false;
+    }
+}