You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gr...@apache.org on 2010/08/13 18:19:28 UTC

svn commit: r985262 - in /qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/transport/ client/src/main/java/org/apache/qpid/client/ common/src/main/java/org/apache/qpid/transport/ common/src/test/java/org/apache/qpid/transport/

Author: grkvlt
Date: Fri Aug 13 16:19:28 2010
New Revision: 985262

URL: http://svn.apache.org/viewvc?rev=985262&view=rev
Log:
QPID-2657: Correct handling of sync on 0-10 client session for exceptions

AMQSession_0_10 is modified to contain a pair of get/set methods for the current
exception, using the set method to post the exception to the listener. The sync
method will now throw an exception if one has occurred and all other methods
that used to call sync()/getCurrentException() can just call sync() and get the
expected behaviour.

Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
    qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java?rev=985262&r1=985261&r2=985262&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java Fri Aug 13 16:19:28 2010
@@ -105,11 +105,19 @@ public class ServerConnection extends Co
     public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException
     {
         ExecutionException ex = new ExecutionException();
-        ex.setErrorCode(ExecutionErrorCode.get(cause.getCode()));
+        ExecutionErrorCode code = ExecutionErrorCode.INTERNAL_ERROR;
+        try
+        {
+	        code = ExecutionErrorCode.get(cause.getCode());
+        }
+        catch (IllegalArgumentException iae)
+        {
+            // Ignore, already set to INTERNAL_ERROR
+        }
+        ex.setErrorCode(code);
         ex.setDescription(message);
         ((ServerSession)session).invoke(ex);
 
         ((ServerSession)session).close();
     }
-   
 }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=985262&r1=985261&r2=985262&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Fri Aug 13 16:19:28 2010
@@ -361,8 +361,7 @@ public class AMQSession_0_10 extends AMQ
         if (!nowait)
         {
             // We need to sync so that we get notify of an error.
-            getQpidSession().sync();
-            getCurrentException();
+            sync();
         }
     }
 
@@ -382,9 +381,21 @@ public class AMQSession_0_10 extends AMQ
             flushTask = null;
         }
         flushAcknowledgments();
-        getQpidSession().sync();
-        getQpidSession().close();
-        getCurrentException();
+        try
+        {
+	        getQpidSession().sync();
+	        getQpidSession().close();
+        }
+        catch (SessionException se)
+        {
+            setCurrentException(se);
+        }
+
+        AMQException amqe = getCurrentException();
+        if (amqe != null)
+        {
+            throw amqe;
+        }
     }
 
 
@@ -403,7 +414,7 @@ public class AMQSession_0_10 extends AMQ
             getQpidSession().setAutoSync(false);
         }
         // We need to sync so that we get notify of an error.
-        getCurrentException();
+        sync();
     }
 
     /**
@@ -426,8 +437,7 @@ public class AMQSession_0_10 extends AMQ
                                       autoDelete ? Option.AUTO_DELETE : Option.NONE,
                                       exclusive ? Option.EXCLUSIVE : Option.NONE);
         // We need to sync so that we get notify of an error.
-        getQpidSession().sync();
-        getCurrentException();
+        sync();
     }
 
     /**
@@ -451,8 +461,7 @@ public class AMQSession_0_10 extends AMQ
         }
         getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
         // We need to sync so that we get notify of an error.
-        getQpidSession().sync();
-        getCurrentException();
+        sync();
     }
 
 
@@ -566,7 +575,7 @@ public class AMQSession_0_10 extends AMQ
         try
         {
             boolean isTopic;
-            
+
             if (consumer.getDestination().getDestSyntax() == AMQDestination.DestSyntax.BURL)
             {
                 isTopic = consumer.getDestination() instanceof AMQTopic ||
@@ -583,7 +592,7 @@ public class AMQSession_0_10 extends AMQ
                              (isTopic || consumer.getMessageSelector() == null || 
                               consumer.getMessageSelector().equals(""));
             }
-                
+
             getQpidSession().messageSubscribe
                 (queueName.toString(), String.valueOf(tag),
                  getAcknowledgeMode() == NO_ACKNOWLEDGE ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT,
@@ -607,7 +616,7 @@ public class AMQSession_0_10 extends AMQ
         }
         getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF,
                                      Option.UNRELIABLE);
-                
+
         if(capacity > 0 && _dispatcher != null && (isStarted() || _immediatePrefetch))
         {
             // set the flow
@@ -619,11 +628,10 @@ public class AMQSession_0_10 extends AMQ
 
         if (!nowait)
         {
-            getQpidSession().sync();
-            getCurrentException();
+            sync();
         }
     }
-    
+
     private long getCapacity(AMQDestination destination)
     {
         long capacity = 0;
@@ -677,8 +685,7 @@ public class AMQSession_0_10 extends AMQ
         // We need to sync so that we get notify of an error.
         if (!nowait)
         {
-            getQpidSession().sync();
-            getCurrentException();
+            sync();
         }
     }
 
@@ -710,7 +717,7 @@ public class AMQSession_0_10 extends AMQ
         {
             queueName = amqd.getAMQQueueName();
         }
-        
+
         if (amqd.getDestSyntax() == DestSyntax.BURL)
         {        
             Map<String,Object> arguments = new HashMap<String,Object>();
@@ -718,7 +725,7 @@ public class AMQSession_0_10 extends AMQ
             {            
                 arguments.put("no-local", true);
             } 
-           
+
             getQpidSession().queueDeclare(queueName.toString(), "" , arguments,
                                           amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
                                           amqd.isDurable() ? Option.DURABLE : Option.NONE,
@@ -733,13 +740,12 @@ public class AMQSession_0_10 extends AMQ
                     node.isDurable() ? Option.DURABLE : Option.NONE,
                     node.isExclusive() ? Option.EXCLUSIVE : Option.NONE);   
         }
-        
+
         // passive --> false
         if (!nowait)
         {
             // We need to sync so that we get notify of an error.
-            getQpidSession().sync();
-            getCurrentException();
+            sync();
         }
         return queueName;
     }
@@ -753,8 +759,7 @@ public class AMQSession_0_10 extends AMQ
         // ifEmpty --> false
         // ifUnused --> false
         // We need to sync so that we get notify of an error.
-        getQpidSession().sync();
-        getCurrentException();
+        sync();
     }
 
     /**
@@ -807,8 +812,7 @@ public class AMQSession_0_10 extends AMQ
             }
         }
         // We need to sync so that we get notify of an error.
-        getQpidSession().sync();
-        getCurrentException();
+        sync();
     }
 
 
@@ -816,8 +820,7 @@ public class AMQSession_0_10 extends AMQ
     {
         getQpidSession().txRollback();
         // We need to sync so that we get notify of an error.
-        getQpidSession().sync();
-        getCurrentException();
+        sync();
     }
 
     //------ Private methods
@@ -835,19 +838,20 @@ public class AMQSession_0_10 extends AMQ
     /**
      * Get the latest thrown exception.
      *
-     * @throws org.apache.qpid.AMQException get the latest thrown error.
+     * @throws SessionException get the latest thrown error.
      */
-    public void getCurrentException() throws AMQException
+    public AMQException getCurrentException()
     {
+        AMQException amqe = null;
         synchronized (_currentExceptionLock)
         {
             if (_currentException != null)
             {
-                AMQException amqe = _currentException;
+                amqe = _currentException;
                 _currentException = null;
-                throw amqe;
             }
         }
+        return amqe;
     }
 
     public void opened(Session ssn) {}
@@ -872,22 +876,7 @@ public class AMQSession_0_10 extends AMQ
 
     public void exception(Session ssn, SessionException exc)
     {
-        synchronized (_currentExceptionLock)
-        {
-            ExecutionException ee = exc.getException();
-            int code;
-            if (ee == null)
-            {
-                code = AMQConstant.INTERNAL_ERROR.getCode();
-            }
-            else
-            {
-                code = ee.getErrorCode().getValue();
-            }
-            AMQException amqe = new AMQException(AMQConstant.getConstant(code), exc.getMessage(), exc.getCause());
-            _connection.exceptionReceived(amqe);
-            _currentException = amqe;
-        }
+        setCurrentException(exc);
     }
 
     public void closed(Session ssn) {}
@@ -1041,11 +1030,41 @@ public class AMQSession_0_10 extends AMQ
     {
         return Serial.lt((int) currentMark, (int) deliveryTag);
     }
-    
+
     public void sync() throws AMQException
     {
-        _qpidSession.sync();
-        getCurrentException();
+        try
+        {
+            getQpidSession().sync();
+        }
+        catch (SessionException se)
+        {
+            setCurrentException(se);
+        }
+
+        AMQException amqe = getCurrentException();
+        if (amqe != null)
+        {
+            throw amqe;
+        }
+    }
+
+    public void setCurrentException(SessionException se)
+    {
+        synchronized (_currentExceptionLock)
+        {
+            ExecutionException ee = se.getException();
+            int code = AMQConstant.INTERNAL_ERROR.getCode();
+            if (ee != null)
+            {
+                code = ee.getErrorCode().getValue();
+            }
+            AMQException amqe = new AMQException(AMQConstant.getConstant(code), se.getMessage(), se.getCause());
+
+            _connection.exceptionReceived(amqe);
+
+            _currentException = amqe;
+        }
     }
 
     public AMQMessageDelegateFactory getMessageDelegateFactory()

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=985262&r1=985261&r2=985262&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Fri Aug 13 16:19:28 2010
@@ -168,16 +168,26 @@ public class BasicMessageConsumer_0_10 e
      */
     @Override void sendCancel() throws AMQException
     {
-        ((AMQSession_0_10) getSession()).getQpidSession().messageCancel(getConsumerTagString());
-        ((AMQSession_0_10) getSession()).getQpidSession().sync();
-        // confirm cancel
-        getSession().confirmConsumerCancelled(getConsumerTag());
-        ((AMQSession_0_10) getSession()).getCurrentException();
+        _0_10session.getQpidSession().messageCancel(getConsumerTagString());
+        try
+        {
+            _0_10session.getQpidSession().sync();
+            getSession().confirmConsumerCancelled(getConsumerTag()); // confirm cancel
+        }
+        catch (SessionException se)
+        {
+            _0_10session.setCurrentException(se);
+        }
+
+        AMQException amqe = _0_10session.getCurrentException();
+        if (amqe != null)
+        {
+            throw amqe;
+        }
     }
 
     @Override void notifyMessage(UnprocessedMessage_0_10 messageFrame)
     {
-
         super.notifyMessage(messageFrame);
     }
 
@@ -285,7 +295,12 @@ public class BasicMessageConsumer_0_10 e
             _0_10session.messageAcknowledge
                 (ranges,
                  _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
-            _0_10session.getCurrentException();
+
+            AMQException amqe = _0_10session.getCurrentException();
+            if (amqe != null)
+            {
+                throw amqe;
+            }
         }
     }
 
@@ -302,7 +317,7 @@ public class BasicMessageConsumer_0_10 e
             RangeSet ranges = new RangeSet();
             ranges.add((int) message.getDeliveryTag());
             _0_10session.getQpidSession().messageRelease(ranges);
-            _0_10session.getCurrentException();
+            _0_10session.sync();
         }
     }
 

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?rev=985262&r1=985261&r2=985262&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Fri Aug 13 16:19:28 2010
@@ -266,7 +266,7 @@ public abstract class BasicMessageProduc
         return _destination;
     }
 
-    public void close() throws JMSException
+    public void close()
     {
         _closed.set(true);
         _session.deregisterProducer(_producerId);

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=985262&r1=985261&r2=985262&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java Fri Aug 13 16:19:28 2010
@@ -56,7 +56,7 @@ public class Connection extends Connecti
     implements Receiver<ProtocolEvent>, Sender<ProtocolEvent>
 {
 
-    private static final Logger log = Logger.get(Connection.class);
+    protected static final Logger log = Logger.get(Connection.class);
 
 
     public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java?rev=985262&r1=985261&r2=985262&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java Fri Aug 13 16:19:28 2010
@@ -99,5 +99,4 @@ public abstract class ConnectionDelegate
             ssn.closed();
         }
     }
-
 }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=985262&r1=985261&r2=985262&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java Fri Aug 13 16:19:28 2010
@@ -280,7 +280,7 @@ public class Session extends SessionInvo
             {
                 if (m != null)
                 {
-                    System.out.println(m);
+                    log.debug("%s", m);
                 }
             }
         }
@@ -732,8 +732,7 @@ public class Session extends SessionInvo
             Waiter w = new Waiter(commands, timeout);
             while (w.hasTime() && state != CLOSED && lt(maxComplete, point))
             {
-                log.debug("%s   waiting for[%d]: %d, %s", this, point,
-                          maxComplete, commands);
+                log.debug("%s   waiting for[%d]: %d, %s", this, point, maxComplete, commands);
                 w.await();
             }
 
@@ -741,16 +740,23 @@ public class Session extends SessionInvo
             {
                 if (state != CLOSED)
                 {
-                    throw new SessionException
-                        (String.format
-                         ("timed out waiting for sync: complete = %s, point = %s", maxComplete, point));
+                    throw new SessionException(
+		                    String.format("timed out waiting for sync: complete = %s, point = %s",
+		                            maxComplete, point));
+                }
+                else
+                {
+                    ExecutionException ee = getException();
+                    if (ee != null)
+                    {
+                        throw new SessionException(ee);
+                    }
                 }
             }
         }
     }
 
-    private Map<Integer,ResultFuture<?>> results =
-        new HashMap<Integer,ResultFuture<?>>();
+    private Map<Integer,ResultFuture<?>> results = new HashMap<Integer,ResultFuture<?>>();
     private ExecutionException exception = null;
 
     void result(int command, Struct result)
@@ -769,9 +775,8 @@ public class Session extends SessionInvo
         {
             if (exception != null)
             {
-                throw new IllegalStateException
-                    (String.format
-                     ("too many exceptions: %s, %s", exception, exc));
+                throw new IllegalStateException(
+                        String.format("too many exceptions: %s, %s", exception, exc));
             }
             exception = exc;
         }
@@ -849,8 +854,8 @@ public class Session extends SessionInvo
             }
             else
             {
-                throw new SessionException
-                    (String.format("%s timed out waiting for result: %s",
+                throw new SessionException(
+                        String.format("%s timed out waiting for result: %s",
                                    Session.this, this));
             }
         }
@@ -961,5 +966,4 @@ public class Session extends SessionInvo
     {
         return String.format("ssn:%s", name);
     }
-
 }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java?rev=985262&r1=985261&r2=985262&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java Fri Aug 13 16:19:28 2010
@@ -33,11 +33,15 @@ public class SessionDelegate
     extends MethodDelegate<Session>
     implements ProtocolDelegate<Session>
 {
-    private static final Logger log = Logger.get(SessionDelegate.class);
+    protected static final Logger log = Logger.get(SessionDelegate.class);
 
-    public void init(Session ssn, ProtocolHeader hdr) { }
+    public void init(Session ssn, ProtocolHeader hdr)
+    {
+        log.warn("INIT: [%s] %s", ssn, hdr);
+    }
 
-    public void control(Session ssn, Method method) {
+    public void control(Session ssn, Method method)
+    {
         method.dispatch(ssn, this);
     }
 
@@ -50,7 +54,10 @@ public class SessionDelegate
         }
     }
 
-    public void error(Session ssn, ProtocolError error) { }
+    public void error(Session ssn, ProtocolError error)
+    {
+        log.warn("ERROR: [%s] %s", ssn, error);
+    }
 
     public void handle(Session ssn, Method method)
     {
@@ -195,9 +202,11 @@ public class SessionDelegate
 
     public void closed(Session session)
     {
+        log.warn("CLOSED: [%s]", session);
     }
 
     public void detached(Session session)
-    {        
+    {
+        log.warn("DETACHED: [%s]", session);
     }
 }

Modified: qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java?rev=985262&r1=985261&r2=985262&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java (original)
+++ qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java Fri Aug 13 16:19:28 2010
@@ -424,10 +424,6 @@ public class ConnectionTest extends Qpid
         }
     }
 
-    /**
-     * The 0-10 {@code executionSync} command should set the exception status in the session,
-     * so that the client session object can then throw it as an {@link AMQException}.
-     */
     public void testExecutionExceptionSync() throws Exception
     {
         startServer();
@@ -436,11 +432,15 @@ public class ConnectionTest extends Qpid
         conn.connect("localhost", port, null, "guest", "guest");
         Session ssn = conn.createSession();
         send(ssn, "EXCP 0", true);
-        ExecutionException before = ssn.getException();
-        assertNull("There should not be an exception stored in the session", before);
-        ssn.sync();
-        ExecutionException after = ssn.getException();
-        assertNotNull("There should be an exception stored in the session", after);
+        try
+        {
+            ssn.sync();
+            fail("this should have failed");
+        }
+        catch (SessionException exc)
+        {
+            assertNotNull(exc.getException());
+        }
     }
 
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org