You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ar...@apache.org on 2008/03/07 15:42:06 UTC
svn commit: r634696 - in
/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client:
AMQSession_0_10.java BasicMessageConsumer_0_10.java
Author: arnaudsimon
Date: Fri Mar 7 06:42:02 2008
New Revision: 634696
URL: http://svn.apache.org/viewvc?rev=634696&view=rev
Log:
Removed redundant code (see QPID-838). As this is a major 0.10 change, the TCK was run prior to committing it.
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=634696&r1=634695&r2=634696&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Fri Mar 7 06:42:02 2008
@@ -249,24 +249,6 @@
getCurrentException();
}
- /**
- * We need to release message that may be pre-fetched in the local queue
- *
- * @throws JMSException
- */
- public void close() throws JMSException
- {
- super.close();
- // We need to release pre-fetched messages
- Iterator messages=_queue.iterator();
- while (messages.hasNext())
- {
- UnprocessedMessage message=(UnprocessedMessage) messages.next();
- messages.remove();
- rejectMessage(message, true);
- }
- }
-
/**
* Commit the receipt and the delivery of all messages exchanged by this session resources.
@@ -426,7 +408,8 @@
getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_WINDOW);
getQpidSession().messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
// We need to sync so that we get notify of an error.
- if(consumer.isStrated())
+ // only if not immediat prefetch
+ if(consumer.isStrated() || _immediatePrefetch)
{
// set the flow
getQpidSession().messageFlow(consumer.getConsumerTag().toString(),
@@ -654,21 +637,14 @@
void start() throws AMQException
{
- suspendChannel(false);
+ super.start();
for(BasicMessageConsumer c: _consumers.values())
{
c.start();
}
- // If the event dispatcher is not running then start it too.
- if (hasMessageListeners())
- {
- startDistpatcherIfNecessary();
- }
}
-
-
void stop() throws AMQException
{
super.stop();
@@ -678,27 +654,7 @@
}
}
- synchronized void startDistpatcherIfNecessary()
- {
- // If IMMEDIATE_PREFETCH is not set then we need to start fetching
- if (!_immediatePrefetch)
- {
- // We do this now if this is the first call on a started connection
- if (isSuspended() && _firstDispatcher.getAndSet(false))
- {
- try
- {
- suspendChannel(false);
- }
- catch (AMQException e)
- {
- _logger.info("Unsuspending channel threw an exception:" + e);
- }
- }
- }
-
- startDistpatcherIfNecessary(false);
- }
+
public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=634696&r1=634695&r2=634696&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Fri Mar 7 06:42:02 2008
@@ -429,40 +429,14 @@
public void setMessageListener(final MessageListener messageListener) throws JMSException
{
super.setMessageListener(messageListener);
- if (messageListener == null)
+ if (messageListener != null && !_synchronousQueue.isEmpty())
{
- /* _0_10session.getQpidSession().messageStop(getConsumerTag().toString());
- _0_10session.getQpidSession()
- .messageFlowMode(getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_CREDIT);
- _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
- org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE,
- 0xFFFFFFFF);
- _0_10session.getQpidSession().sync();
- */
- }
- else
- {
- if(! _synchronousQueue.isEmpty())
- {
- Iterator messages=_synchronousQueue.iterator();
- while (messages.hasNext())
- {
- AbstractJMSMessage message=(AbstractJMSMessage) messages.next();
- messages.remove();
- _session.rejectMessage(message, true);
- }
- }
- if (_connection.started())
+ Iterator messages=_synchronousQueue.iterator();
+ while (messages.hasNext())
{
- _0_10session.getQpidSession()
- .messageFlowMode(getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_WINDOW);
- _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
- org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,
- AMQSession_0_10.MAX_PREFETCH);
- _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
- org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE,
- 0xFFFFFFFF);
- _0_10session.getQpidSession().sync();
+ AbstractJMSMessage message=(AbstractJMSMessage) messages.next();
+ messages.remove();
+ _session.rejectMessage(message, true);
}
}
}
@@ -482,16 +456,4 @@
_isStarted = false;
}
- public void close() throws JMSException
- {
- super.close();
- // release message that may be staged
- Iterator messages=_synchronousQueue.iterator();
- while (messages.hasNext())
- {
- AbstractJMSMessage message=(AbstractJMSMessage) messages.next();
- messages.remove();
- _session.rejectMessage(message, true);
- }
- }
}