You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ar...@apache.org on 2007/08/20 14:50:20 UTC

svn commit: r567674 - in /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity: client/Session.java client/impl/ClientSession.java jms/MessageConsumerImpl.java jms/SessionImpl.java jms/message/QpidMessage.java

Author: arnaudsimon
Date: Mon Aug 20 05:50:19 2007
New Revision: 567674

URL: http://svn.apache.org/viewvc?rev=567674&view=rev
Log:
added sync

Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java?rev=567674&r1=567673&r2=567674&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java Mon Aug 20 05:50:19 2007
@@ -326,8 +326,7 @@
     /**
      * Forces the broker to exhaust its credit supply.
      * <p> The broker's credit will always be zero when
-     * this method completes. This method does not complete until all the message transfers occur.
-     * <p> This method returns the number of flushed messages.
+     * this method completes.
      *
      * @param destination The destination to call flush on.
      */
@@ -424,6 +423,16 @@
      */
     public void messageRelease(RangeSet ranges);
 
+
+    /**
+     * Returns the number of message received for this session since
+     * {@link Session#messageFlow} has bee invoked.
+     *
+     * @return The number of message received for this session since
+     *         {@link Session#messageFlow} has bee invoked.
+     */
+    public int messagesReceived();
+
     // -----------------------------------------------
     //            Local transaction methods
     //  ----------------------------------------------
@@ -568,7 +577,7 @@
      * @param type              Each exchange belongs to one of a set of exchange types implemented by the server. The
      *                          exchange types define the functionality of the exchange - i.e. how messages are routed
      *                          through it. It is not valid or meaningful to attempt to change the type of an existing
-     *                          exchange. Default exchange types are: direct, topic, headers and fanout.    
+     *                          exchange. Default exchange types are: direct, topic, headers and fanout.
      * @param alternateExchange In the event that a message cannot be routed, this is the name of the exchange to which
      *                          the message will be sent.
      * @param options           Set of options (valid options are: {@link Option#AUTO_DELETE}, {@link Option#DURABLE},

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java?rev=567674&r1=567673&r2=567674&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java Mon Aug 20 05:50:19 2007
@@ -25,7 +25,14 @@
     private ExceptionListener _exceptionListner;
     private RangeSet _acquiredMessages;
     private RangeSet _rejectedMessages;
-    
+
+
+    public int messagesReceived()
+    {
+        // TODO
+        return 1;
+    }
+
     @Override public void sessionClose()
     {
         super.sessionClose();

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java?rev=567674&r1=567673&r2=567674&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java Mon Aug 20 05:50:19 2007
@@ -130,7 +130,8 @@
         {
             // this is a queue we expect that this queue exists
             getSession().getQpidSession()
-                    .messageSubscribe(destination.getQpidQueueName(), getMessageActorID(),
+                    .messageSubscribe(destination.getQpidQueueName(), // queue
+                                      getMessageActorID(), // destination
                                       org.apache.qpidity.client.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
                                       // When the message selctor is set we do not acquire the messages
                                       _messageSelector != null ? org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE : org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
@@ -156,8 +157,7 @@
             else
             {
                 // this is a non durable subscriber
-                // create a temporary queue
-                queueName = "topic-" + getMessageActorID();
+                queueName = destination.getQpidQueueName();
                 getSession().getQpidSession()
                         .queueDeclare(queueName, null, null, Option.AUTO_DELETE, Option.EXCLUSIVE);
             }
@@ -169,8 +169,8 @@
                     .messageSubscribe(queueName, getMessageActorID(),
                                       org.apache.qpidity.client.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
                                       // We always acquire the messages
-                                      org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE, messageAssembler, null,
-                                      _noLocal ? Option.NO_LOCAL : Option.NO_OPTION,
+                                      org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
+                                      messageAssembler, null, _noLocal ? Option.NO_LOCAL : Option.NO_OPTION,
                                       // Request exclusive subscription access, meaning only this subscription
                                       // can access the queue.
                                       Option.EXCLUSIVE);
@@ -179,6 +179,12 @@
         // set the flow mode
         getSession().getQpidSession()
                 .messageFlowMode(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_MODE_CREDIT);
+        getSession().getQpidSession().sync();
+        // check for an exception
+        if (getSession().getCurrentException() != null)
+        {
+            throw getSession().getCurrentException();
+        }
     }
 
     //----- Message consumer API
@@ -353,11 +359,13 @@
             {
                 // if this consumer is stopped then this will be call when starting
                 getSession().getQpidSession()
-                        .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
+                        .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE,
+                                     1);
                 getSession().getQpidSession().messageFlush(getMessageActorID());
-               // received = getSession().getQpidSession().
+                getSession().getQpidSession().sync();
+                received = getSession().getQpidSession().messagesReceived();
             }
-            if ( received == 0 && timeout < 0)
+            if (received == 0 && timeout < 0)
             {
                 // this is a nowait and we havent received a message then we must immediatly return
                 result = null;
@@ -425,7 +433,8 @@
                 // there is a synch call waiting for a message to be delivered
                 // so tell the broker to deliver a message
                 getSession().getQpidSession()
-                        .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
+                        .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE,
+                                     1);
                 getSession().getQpidSession().messageFlush(getMessageActorID());
             }
         }
@@ -490,8 +499,10 @@
                             getSession().getQpidSession()
                                     .messageFlow(getMessageActorID(),
                                                  org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
-                            int received = 0; //getSession().getQpidSession().messageFlush(getMessageActorID());
-                            if ( received == 0  && _isNoWaitIsReceiving)
+                            getSession().getQpidSession().messageFlush(getMessageActorID());
+                            getSession().getQpidSession().sync();
+                            int received = getSession().getQpidSession().messagesReceived();
+                            if (received == 0 && _isNoWaitIsReceiving)
                             {
                                 // Right a message nowait is waiting for a message
                                 // but no one can be delivered it then need to return
@@ -570,9 +581,10 @@
         if (_preAcquire)
         {
             RangeSet ranges = new RangeSet();
-            // TODO: messageID is a string but range need a long???
-            //ranges.add(message.getMessageID());
+            ranges.add(message.getMessageTransferId());
             getSession().getQpidSession().messageRelease(ranges);
+            getSession().getQpidSession().sync();
+            testQpidException();
         }
     }
 
@@ -589,15 +601,17 @@
         if (!_preAcquire)
         {
             RangeSet ranges = new RangeSet();
-            // TODO: messageID is a string but range need a long???
-            // ranges.add(message.getMessageID());
+            ranges.add(message.getMessageTransferId());
 
-            getSession().getQpidSession().messageAcquire(ranges, org.apache.qpidity.client.Session.MESSAGE_ACQUIRE_ANY_AVAILABLE_MESSAGE);
+            getSession().getQpidSession()
+                    .messageAcquire(ranges, org.apache.qpidity.client.Session.MESSAGE_ACQUIRE_ANY_AVAILABLE_MESSAGE);
+            getSession().getQpidSession().sync();
             RangeSet acquired = getSession().getQpidSession().getAccquiredMessages();
             if (acquired.size() > 0)
             {
-                result = true; // todo acquired.iterator().next().getLower() == message.getMessageID();
+                result = true;
             }
+            testQpidException();
         }
         return result;
     }
@@ -613,9 +627,19 @@
         if (!_preAcquire)
         {
             RangeSet ranges = new RangeSet();
-            // TODO: messageID is a string but range need a long???
-            // ranges.add(message.getMessageID());
+            ranges.add(message.getMessageTransferId());
             getSession().getQpidSession().messageAcknowledge(ranges);
+            getSession().getQpidSession().sync();
+            testQpidException();
+        }
+    }
+
+    private void testQpidException() throws QpidException
+    {
+        QpidException qe = getSession().getCurrentException();
+        if (qe != null)
+        {
+            throw qe;
         }
     }
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java?rev=567674&r1=567673&r2=567674&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java Mon Aug 20 05:50:19 2007
@@ -25,6 +25,8 @@
 
 import javax.jms.*;
 import javax.jms.IllegalStateException;
+import javax.jms.MessageListener;
+import javax.jms.Session;
 import java.io.Serializable;
 import java.util.LinkedList;
 import java.util.HashMap;
@@ -108,6 +110,11 @@
     private org.apache.qpidity.client.Session _qpidSession;
 
     /**
+     * The latest qpid Exception that has been reaised.
+     */
+    private QpidException _currentException;
+
+    /**
      * Indicates whether this session is recovering
      */
     private boolean _inRecovery = false;
@@ -142,6 +149,8 @@
 
         // create the qpid session with an expiry  <= 0 so that the session does not expire
         _qpidSession = _connection.getQpidConnection().createSession(0);
+        // set the exception listnere for this session
+        _qpidSession.setExceptionListener(new QpidSessionExceptionListener());
         // set transacted if required
         if (_transacted && !isXA)
         {
@@ -431,8 +440,7 @@
         {
             // release this message
             RangeSet ranges = new RangeSet();
-            // TODO: messageID is a string but range need a long???
-            // ranges.add(message.getMessageID());
+            ranges.add(message.getMessageTransferId());
             getQpidSession().messageRelease(ranges);
         }
     }
@@ -817,6 +825,17 @@
         checkNotClosed();
     }
 
+    /**
+     * Get the latest thrown exception.
+     *
+     * @return The latest thrown exception.
+     */
+    public synchronized QpidException getCurrentException()
+    {
+        QpidException result = _currentException;
+        _currentException = null;
+        return result;
+    }
     //----- Protected methods
 
     /**
@@ -1006,8 +1025,7 @@
         {
             // acknowledge this message
             RangeSet ranges = new RangeSet();
-            // TODO: messageID is a string but range need a long???
-            // ranges.add(message.getMessageID());
+            ranges.add(message.getMessageTransferId());
             getQpidSession().messageAcknowledge(ranges);
         }
         //tobedone: Implement DUPS OK heuristic
@@ -1035,8 +1053,7 @@
                 {
                     // acknowledge this message
                     RangeSet ranges = new RangeSet();
-                    // TODO: messageID is a string but range need a long???
-                    // ranges.add(message.getMessageID()); 
+                    ranges.add(message.getMessageTransferId());
                     getQpidSession().messageAcknowledge(ranges);
                 }
                 //empty the list of unack messages
@@ -1092,6 +1109,22 @@
     }
 
     //------ Inner classes
+
+    /**
+     * Lstener for qpid protocol exceptions
+     */
+    private class QpidSessionExceptionListener implements org.apache.qpidity.client.ExceptionListener
+    {
+        public void onException(QpidException exception)
+        {
+            synchronized (this)
+            {
+                //todo check the error code for finding out if we need to notify the
+                // JMS connection exception listener
+                _currentException = exception;
+            }
+        }
+    }
 
     /**
      * Convenient class for storing incoming messages associated with a consumer ID.

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java?rev=567674&r1=567673&r2=567674&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java Mon Aug 20 05:50:19 2007
@@ -369,6 +369,7 @@
 
     /**
      * Get this message excahgne name
+     *
      * @return this message excahgne name
      */
     public String getExchangeName()
@@ -406,6 +407,16 @@
     public org.apache.qpidity.api.Message getQpidityMessage()
     {
         return _qpidityMessage;
+    }
+
+    /**
+     * Get this message transfer ID.
+     *
+     * @return This message transfer ID.
+     */
+    public long getMessageTransferId()
+    {
+        return _qpidityMessage.getMessageTransferId();
     }
 }