You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/01/23 23:32:52 UTC
svn commit: r499165 - in
/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client:
AMQSession.java BasicMessageConsumer.java
Author: ritchiem
Date: Tue Jan 23 14:32:51 2007
New Revision: 499165
URL: http://svn.apache.org/viewvc?view=rev&rev=499165
Log:
QPID-103 Implemented support for MessageListener in AMQSession.
Required configuring an Asynchronous performance test.
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=499165&r1=499164&r2=499165
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Jan 23 14:32:51 2007
@@ -67,6 +67,8 @@
private int _defaultPrefetchHighMark = DEFAULT_PREFETCH_HIGH_MARK;
private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK;
+ private MessageListener _messageListener = null;
+
/**
* Used to reference durable subscribers so their requests for unsubscribe can be handled
* correctly. Note this only keeps a record of subscriptions which have been created
@@ -852,13 +854,37 @@
public MessageListener getMessageListener() throws JMSException
{
checkNotClosed();
- throw new java.lang.UnsupportedOperationException("MessageListener interface not supported");
+ return _messageListener;
}
public void setMessageListener(MessageListener listener) throws JMSException
{
checkNotClosed();
- throw new java.lang.UnsupportedOperationException("MessageListener interface not supported");
+
+ if (!isStopped())
+ {
+ throw new javax.jms.IllegalStateException("Attempt to set listener while session is started.");
+ }
+
+ // We are stopped
+ for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
+ {
+ BasicMessageConsumer consumer = i.next();
+
+ if (consumer.isReceiving())
+ {
+ throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously.");
+ }
+ }
+
+ _messageListener = listener;
+
+ for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
+ {
+ i.next().setMessageListener(_messageListener);
+ }
+
+
}
public void run()
@@ -1067,6 +1093,7 @@
{
checkTemporaryDestination(destination);
+
return (org.apache.qpid.jms.MessageConsumer) new FailoverSupport()
{
public Object operation() throws JMSException
@@ -1089,6 +1116,11 @@
protocolHandler, ft, prefetchHigh, prefetchLow, exclusive,
_acknowledgeMode, noConsume, autoClose);
+ if (_messageListener != null)
+ {
+ consumer.setMessageListener(_messageListener);
+ }
+
try
{
registerConsumer(consumer, false);
@@ -1736,19 +1768,21 @@
*/
void deregisterConsumer(BasicMessageConsumer consumer)
{
- _consumers.remove(consumer.getConsumerTag());
- String subscriptionName = _reverseSubscriptionMap.remove(consumer);
- if (subscriptionName != null)
+ if (_consumers.remove(consumer.getConsumerTag()) != null)
{
- _subscriptions.remove(subscriptionName);
- }
+ String subscriptionName = _reverseSubscriptionMap.remove(consumer);
+ if (subscriptionName != null)
+ {
+ _subscriptions.remove(subscriptionName);
+ }
- Destination dest = consumer.getDestination();
- synchronized (dest)
- {
- if (_destinationConsumerCount.get(dest).decrementAndGet() == 0)
+ Destination dest = consumer.getDestination();
+ synchronized (dest)
{
- _destinationConsumerCount.remove(dest);
+ if (_destinationConsumerCount.get(dest).decrementAndGet() == 0)
+ {
+ _destinationConsumerCount.remove(dest);
+ }
}
}
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=499165&r1=499164&r2=499165
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Tue Jan 23 14:32:51 2007
@@ -221,7 +221,10 @@
if (_session.isStopped())
{
_messageListener.set(messageListener);
- _logger.debug("Session stopped : Message listener set for destination " + _destination);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Session stopped : Message listener(" + messageListener + ") set for destination " + _destination);
+ }
}
else
{
@@ -258,10 +261,10 @@
// Set Message Listener
_logger.debug("Set Message Listener");
- _messageListener.set(messageListener);
+ _messageListener.set(messageListener);
}
}
- );
+ );
}
}
}
@@ -328,6 +331,11 @@
public boolean isExclusive()
{
return _exclusive;
+ }
+
+ public boolean isReceiving()
+ {
+ return _receiving.get();
}
public Message receive() throws JMSException