You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ar...@apache.org on 2008/03/07 15:42:06 UTC

svn commit: r634696 - in /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client: AMQSession_0_10.java BasicMessageConsumer_0_10.java

Author: arnaudsimon
Date: Fri Mar  7 06:42:02 2008
New Revision: 634696

URL: http://svn.apache.org/viewvc?rev=634696&view=rev
Log:
Removed redundant code (see QPID-838). As this is a major 0.10 change tck has been run prior to committing it.

Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=634696&r1=634695&r2=634696&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Fri Mar  7 06:42:02 2008
@@ -249,24 +249,6 @@
         getCurrentException();
     }
 
-    /**
-     * We need to release message that may be pre-fetched in the local queue
-     *
-     * @throws JMSException
-     */
-    public void close() throws JMSException
-    {
-        super.close();
-        // We need to release pre-fetched messages
-        Iterator messages=_queue.iterator();
-        while (messages.hasNext())
-        {
-            UnprocessedMessage message=(UnprocessedMessage) messages.next();
-            messages.remove();
-            rejectMessage(message, true);
-        }
-    }
-
 
     /**
      * Commit the receipt and the delivery of all messages exchanged by this session resources.
@@ -426,7 +408,8 @@
         getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_WINDOW);
         getQpidSession().messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
         // We need to sync so that we get notify of an error.
-        if(consumer.isStrated())
+        // only if not immediat prefetch
+        if(consumer.isStrated() || _immediatePrefetch)
         {
             // set the flow
             getQpidSession().messageFlow(consumer.getConsumerTag().toString(),
@@ -654,21 +637,14 @@
 
     void start() throws AMQException
     {
-        suspendChannel(false);
+        super.start();
         for(BasicMessageConsumer  c:  _consumers.values())
         {
               c.start();
         }
-        // If the event dispatcher is not running then start it too.
-        if (hasMessageListeners())
-        {
-            startDistpatcherIfNecessary();
-        }
     }
 
 
-
-
     void stop() throws AMQException
     {
         super.stop();
@@ -678,27 +654,7 @@
         }
     }
 
-   synchronized void startDistpatcherIfNecessary()
-    {
-        // If IMMEDIATE_PREFETCH is not set then we need to start fetching
-        if (!_immediatePrefetch)
-        {
-            // We do this now if this is the first call on a started connection
-            if (isSuspended() &&  _firstDispatcher.getAndSet(false))
-            {
-                try
-                {
-                    suspendChannel(false);
-                }
-                catch (AMQException e)
-                {
-                    _logger.info("Unsuspending channel threw an exception:" + e);
-                }
-            }
-        }
-
-        startDistpatcherIfNecessary(false);
-    }
+ 
 
 
     public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=634696&r1=634695&r2=634696&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Fri Mar  7 06:42:02 2008
@@ -429,40 +429,14 @@
     public void setMessageListener(final MessageListener messageListener) throws JMSException
     {
         super.setMessageListener(messageListener);
-        if (messageListener == null)
+        if (messageListener != null && !_synchronousQueue.isEmpty())
         {
-           /* _0_10session.getQpidSession().messageStop(getConsumerTag().toString());
-            _0_10session.getQpidSession()
-                    .messageFlowMode(getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_CREDIT);
-            _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
-                                                      org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE,
-                                                      0xFFFFFFFF);
-            _0_10session.getQpidSession().sync();
-            */
-        }
-        else
-        {
-            if(! _synchronousQueue.isEmpty())
-            {
-                Iterator messages=_synchronousQueue.iterator();
-                while (messages.hasNext())
-                {
-                    AbstractJMSMessage message=(AbstractJMSMessage) messages.next();
-                    messages.remove();
-                    _session.rejectMessage(message, true);
-                }
-            }
-            if (_connection.started())
+            Iterator messages=_synchronousQueue.iterator();
+            while (messages.hasNext())
             {
-                _0_10session.getQpidSession()
-                        .messageFlowMode(getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_WINDOW);
-                _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
-                                                          org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,
-                                                          AMQSession_0_10.MAX_PREFETCH);
-                _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
-                                                          org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE,
-                                                          0xFFFFFFFF);
-                _0_10session.getQpidSession().sync();
+                AbstractJMSMessage message=(AbstractJMSMessage) messages.next();
+                messages.remove();
+                _session.rejectMessage(message, true);
             }
         }
     }
@@ -482,16 +456,4 @@
         _isStarted = false;
     }
 
-    public void close() throws JMSException
-    {
-        super.close();
-        // release message that may be staged
-        Iterator messages=_synchronousQueue.iterator();
-        while (messages.hasNext())
-        {
-            AbstractJMSMessage message=(AbstractJMSMessage) messages.next();
-            messages.remove();
-            _session.rejectMessage(message, true);
-        }
-    }
 }