You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2007/08/22 01:16:45 UTC

svn commit: r568321 - in /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity: client/Session.java client/impl/ClientSession.java client/impl/ClientSessionDelegate.java jms/MessageConsumerImpl.java jms/QpidMessageListener.java

Author: rajith
Date: Tue Aug 21 16:16:44 2007
New Revision: 568321

URL: http://svn.apache.org/viewvc?rev=568321&view=rev
Log:
I added support in the JMS layer to figure out if it received any messages after calling flush

Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java?rev=568321&r1=568320&r2=568321&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java Tue Aug 21 16:16:44 2007
@@ -233,7 +233,7 @@
      * published them.
      * <li>{@link Option#EXCLUSIVE}: <p> Request exclusive subscription access, meaning only this
      * ubscription can access the queue.
-     * <li>{@link Option#NO_OPTION}: <p> Has no effect as it represents an “empty” option.
+     * <li>{@link Option#NO_OPTION}: <p> Has no effect as it represents an �empty� option.
      * </ul>
      *
      * @param queue       The queue this receiver is receiving messages from.
@@ -423,16 +423,6 @@
      */
     public void messageRelease(RangeSet ranges);
 
-
-    /**
-     * Returns the number of message received for this session since
-     * {@link Session#messageFlow} has bee invoked.
-     *
-     * @return The number of message received for this session since
-     *         {@link Session#messageFlow} has bee invoked.
-     */
-    public int messagesReceived();
-
     // -----------------------------------------------
     //            Local transaction methods
     //  ----------------------------------------------
@@ -478,7 +468,7 @@
      * declaring connection closes.
      * <li> {@link Option#PASSIVE}: <p> If set, the server will not create the queue.
      * This field allows the client to assert the presence of a queue without modifying the server state.
-     * <li>{@link Option#NO_OPTION}: <p> Has no effect as it represents an “empty” option.
+     * <li>{@link Option#NO_OPTION}: <p> Has no effect as it represents an �empty� option.
      * </ul>
      * <p>In the absence of a particular option, the defaul value is false for each option
      *
@@ -540,7 +530,7 @@
      * <li> {@link Option#IF_EMPTY}: <p>  If set, the server will only delete the queue if it has no messages.
      * <li> {@link Option#IF_UNUSED}: <p> If set, the server will only delete the queue if it has no consumers.
      * If the queue has consumers the server does does not delete it but raises a channel exception instead.
-     * <li>{@link Option#NO_OPTION}: <p> Has no effect as it represents an “empty” option.
+     * <li>{@link Option#NO_OPTION}: <p> Has no effect as it represents an �empty� option.
      * </ul>
      * </p>
      * <p/>
@@ -569,7 +559,7 @@
      * exchanges) are purged if/when a server restarts.
      * <li> {@link Option#PASSIVE}: <p> If set, the server will not create the exchange.
      * The client can use this to check whether an exchange exists without modifying the server state.
-     * <li> {@link Option#NO_OPTION}: <p> Has no effect as it represents an “empty” option.
+     * <li> {@link Option#NO_OPTION}: <p> Has no effect as it represents an �empty� option.
      * </ul>
      * <p>In the absence of a particular option, the defaul value is false for each option</p>
      *
@@ -596,7 +586,7 @@
      * <li> {@link Option#IF_UNUSED}: <p> If set, the server will only delete the exchange if it has no queue bindings. If the
      * exchange has queue bindings the server does not delete it but raises a channel exception
      * instead.
-     * <li> {@link Option#NO_OPTION}: <p> Has no effect as it represents an “empty” option.
+     * <li> {@link Option#NO_OPTION}: <p> Has no effect as it represents an �empty� option.
      * </ul>
      * <p>In the absence of a particular option, the defaul value is false for each option
      *

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java?rev=568321&r1=568320&r2=568321&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java Tue Aug 21 16:16:44 2007
@@ -22,19 +22,7 @@
     private ExceptionListener _exceptionListner;
     private RangeSet _acquiredMessages;
     private RangeSet _rejectedMessages;
-
-
-    public int messagesReceived()
-    {
-        // TODO
-        return 1;
-    }
-
-    @Override public void sessionClose()
-    {
-        super.sessionClose();
-    }
-    
+        
     public void messageAcknowledge(RangeSet ranges)
     {
         for (Range range : ranges)
@@ -97,6 +85,10 @@
     
     public void setMessageListener(String destination, MessagePartListener listener)
     {
+        if (listener == null)
+        {
+            throw new IllegalArgumentException("Cannot set message listener to null");
+        }
         _messageListeners.put(destination, listener);       
     }
     
@@ -104,8 +96,6 @@
     {
         _exceptionListner = exceptionListner;        
     }   
-    
-    // ugly but nessacery
     
     void setAccquiredMessages(RangeSet acquiredMessages)
     {

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java?rev=568321&r1=568320&r2=568321&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java Tue Aug 21 16:16:44 2007
@@ -9,7 +9,6 @@
 import org.apache.qpidity.MessageTransfer;
 import org.apache.qpidity.QpidException;
 import org.apache.qpidity.Range;
-import org.apache.qpidity.RangeSet;
 import org.apache.qpidity.Session;
 import org.apache.qpidity.SessionClosed;
 import org.apache.qpidity.SessionDelegate;
@@ -54,16 +53,7 @@
         _currentTransfer = currentTransfer;
         _currentMessageListener = ((ClientSession)session).getMessageListerners().get(currentTransfer.getDestination());
         _currentMessageListener.messageTransfer(currentTransfer.getId());
-        
-        //a better way is to tell the broker to stop the transfer
-        if (_currentMessageListener == null && _currentTransfer.getAcquireMode() == 1)
-        {
-            RangeSet transfers = new RangeSet();
-            transfers.add(_currentTransfer.getId());            
-            session.messageRelease(transfers);
-        }
     }
-    
     
     @Override public void messageReject(Session session, MessageReject struct) 
     {

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java?rev=568321&r1=568320&r2=568321&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java Tue Aug 21 16:16:44 2007
@@ -17,6 +17,8 @@
  */
 package org.apache.qpidity.jms;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.qpidity.jms.message.QpidMessage;
 import org.apache.qpidity.RangeSet;
 import org.apache.qpidity.QpidException;
@@ -94,6 +96,8 @@
      * Nether exceed MAX_MESSAGE_TRANSFERRED
      */
     private int _messageAsyncrhonouslyReceived = 0;
+    
+    private AtomicBoolean _messageReceived = new AtomicBoolean();
 
     //----- Constructors
     /**
@@ -354,7 +358,6 @@
             // This indicate to the delivery thread to deliver the message to this consumer
             // as it can happens that a message is delivered after a receive operation as returned.
             _isReceiving = true;
-            int received = 0;
             if (!_isStopped)
             {
                 // if this consumer is stopped then this will be call when starting
@@ -362,10 +365,13 @@
                         .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE,
                                      1);
                 getSession().getQpidSession().messageFlush(getMessageActorID());
+                _messageReceived.set(false);
+                
+                //When sync() returns we know whether we have received a message or not.
                 getSession().getQpidSession().sync();
-                received = getSession().getQpidSession().messagesReceived();
+                //received = getSession().getQpidSession().messagesReceived();
             }
-            if (received == 0 && timeout < 0)
+            if (_messageReceived.get() && timeout < 0)
             {
                 // this is a nowait and we havent received a message then we must immediatly return
                 result = null;
@@ -501,9 +507,12 @@
                                     .messageFlow(getMessageActorID(),
                                                  org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
                             getSession().getQpidSession().messageFlush(getMessageActorID());
-                            getSession().getQpidSession().sync();
-                            int received = getSession().getQpidSession().messagesReceived();
-                            if (received == 0 && _isNoWaitIsReceiving)
+                            _messageReceived.set(false);
+                            
+                            // When sync() returns we know whether we have received a message or not.
+                            getSession().getQpidSession().sync();                       
+                            
+                            if (_messageReceived.get()  && _isNoWaitIsReceiving)
                             {
                                 // Right a message nowait is waiting for a message
                                 // but no one can be delivered it then need to return
@@ -631,5 +640,10 @@
             getSession().getQpidSession().messageAcknowledge(ranges);
             getSession().testQpidException();
         }
+    }
+    
+    public void notifyMessageReceived()
+    {
+        _messageReceived.set(true);
     }
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java?rev=568321&r1=568320&r2=568321&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java Tue Aug 21 16:16:44 2007
@@ -62,6 +62,9 @@
     {
         try
         {
+            // to be used with flush
+            _consumer.notifyMessageReceived();
+            
             //convert this message into a JMS one
             QpidMessage jmsMessage = MessageFactory.getQpidMessage(message);
             // if consumer is asynchronous then send this message to its session.