You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/04/11 01:46:20 UTC

svn commit: r764109 - in /qpid/trunk/qpid/java: ./ client/src/main/java/org/apache/qpid/client/ systests/src/main/java/org/apache/qpid/test/unit/client/connection/

Author: ritchiem
Date: Fri Apr 10 23:46:19 2009
New Revision: 764109

URL: http://svn.apache.org/viewvc?rev=764109&view=rev
Log:
QPID-1779 : Application of patches attached to JIRA. Should address connection close issues experienced on 0-8/9 branch
Excluded test from TCP runs as it is hardwired to InVM.

Added:
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/CloseAfterConnectionFailureTest.java
Modified:
    qpid/trunk/qpid/java/010ExcludeList
    qpid/trunk/qpid/java/08ExcludeList-nonvm
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/Closeable.java

Modified: qpid/trunk/qpid/java/010ExcludeList
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/010ExcludeList?rev=764109&r1=764108&r2=764109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/010ExcludeList (original)
+++ qpid/trunk/qpid/java/010ExcludeList Fri Apr 10 23:46:19 2009
@@ -64,3 +64,6 @@
 // This test currently does not pick up the runtime location of the nonVm queueBacking store.
 org.apache.qpid.test.unit.client.close.FlowToDiskBackingQueueDeleteTest#*
 
+// This test extends QpidTestCase but does not use getConnection() and is hardwired to InVM
+org.apache.qpid.test.unit.client.connection.CloseAfterConnectionFailureTest#*
+

Modified: qpid/trunk/qpid/java/08ExcludeList-nonvm
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/08ExcludeList-nonvm?rev=764109&r1=764108&r2=764109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/08ExcludeList-nonvm (original)
+++ qpid/trunk/qpid/java/08ExcludeList-nonvm Fri Apr 10 23:46:19 2009
@@ -32,3 +32,5 @@
 // This test currently does not pick up the runtime location of the nonVm queueBacking store.
 org.apache.qpid.test.unit.client.close.FlowToDiskBackingQueueDeleteTest#*
 
+// This test extends QpidTestCase but does not use getConnection() and is hardwired to InVM
+org.apache.qpid.test.unit.client.connection.CloseAfterConnectionFailureTest#*

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=764109&r1=764108&r2=764109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Fri Apr 10 23:46:19 2009
@@ -58,6 +58,7 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQProtocolException;
 import org.apache.qpid.AMQUnresolvedAddressException;
+import org.apache.qpid.AMQDisconnectedException;
 import org.apache.qpid.client.configuration.ClientProperties;
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.failover.FailoverProtectedOperation;
@@ -924,7 +925,12 @@
     {
         if (!_closed.getAndSet(true))
         {
-            doClose(sessions, timeout);
+            _closing.set(true);
+            try{
+                doClose(sessions, timeout);
+            }finally{
+                _closing.set(false);
+            }
         }
     }
 
@@ -1318,7 +1324,7 @@
         // in the case of an IOException, MINA has closed the protocol session so we set _closed to true
         // so that any generic client code that tries to close the connection will not mess up this error
         // handling sequence
-        if (cause instanceof IOException)
+        if (cause instanceof IOException || cause instanceof AMQDisconnectedException)
         {
             closer = !_closed.getAndSet(true);
 

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=764109&r1=764108&r2=764109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Fri Apr 10 23:46:19 2009
@@ -282,7 +282,7 @@
 
     /** Holds the dispatcher thread for this session. */
     protected Dispatcher _dispatcher;
-    
+
     protected Thread _dispatcherThread;
 
     /** Holds the message factory factory for this session. */
@@ -644,7 +644,11 @@
 
                     try
                     {
-                        sendClose(timeout);
+                        // Only send the close if the connection is in the process of closing.
+                        if (_connection.isClosing())
+                        {
+                            sendClose(timeout);
+                        }
                     }
                     catch (AMQException e)
                     {
@@ -1219,9 +1223,9 @@
 
             // this is done so that we can produce to a temporary queue before we create a consumer
             result.setQueueName(result.getRoutingKey());
-            createQueue(result.getAMQQueueName(), result.isAutoDelete(), 
+            createQueue(result.getAMQQueueName(), result.isAutoDelete(),
                         result.isDurable(), result.isExclusive());
-            bindQueue(result.getAMQQueueName(), result.getRoutingKey(), 
+            bindQueue(result.getAMQQueueName(), result.getRoutingKey(),
                     new FieldTable(), result.getExchangeName(), result);
             return result;
         }
@@ -1683,11 +1687,11 @@
                         // if (rawSelector != null)
                         // ft.put("headers", rawSelector.getDataAsBytes());
                         // rawSelector is used by HeadersExchange and is not a JMS Selector
-                        if (rawSelector != null) 
+                        if (rawSelector != null)
                         {
                             ft.addAll(rawSelector);
                         }
-                        
+
                         if (messageSelector != null)
                         {
                             ft.put(new AMQShortString("x-filter-jms-selector"), messageSelector);
@@ -1937,13 +1941,13 @@
             _dispatcher = new Dispatcher();
             try
             {
-                _dispatcherThread = Threading.getThreadFactory().createThread(_dispatcher);       
-                
+                _dispatcherThread = Threading.getThreadFactory().createThread(_dispatcher);
+
             }
             catch(Exception e)
             {
                 throw new Error("Error creating Dispatcher thread",e);
-            }            
+            }
             _dispatcherThread.setName("Dispatcher-Channel-" + _channelId);
             _dispatcherThread.setDaemon(true);
             _dispatcher.setConnectionStopped(initiallyStopped);

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=764109&r1=764108&r2=764109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Fri Apr 10 23:46:19 2009
@@ -563,7 +563,10 @@
                 {
                     try
                     {
-                        sendCancel();
+                        if (!_connection.isClosing())
+                        {
+                            sendCancel();
+                        }
                     }
                     catch (AMQException e)
                     {

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/Closeable.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/Closeable.java?rev=764109&r1=764108&r2=764109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/Closeable.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/Closeable.java Fri Apr 10 23:46:19 2009
@@ -52,6 +52,13 @@
     protected final AtomicBoolean _closed = new AtomicBoolean(false);
 
     /**
+     * Whether we are in the process of closing. This is kept distinct from
+     * _closed so that other objects can tell when a close is in progress
+     * and tidy up accordingly.
+     */
+    protected final AtomicBoolean _closing = new AtomicBoolean(false);
+
+    /**
      * Checks if this is closed, and raises a JMSException if it is.
      *
      * @throws JMSException If this is closed.
@@ -75,6 +82,17 @@
     }
 
     /**
+     * Checks if this is closing.
+     *
+     * @return <tt>true</tt> if we are closing, <tt>false</tt> otherwise.
+     */
+    public boolean isClosing()
+    {
+        return _closing.get();
+    }
+
+
+    /**
      * Closes this object.
      *
      * @throws JMSException If this cannot be closed for any reason.

Added: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/CloseAfterConnectionFailureTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/CloseAfterConnectionFailureTest.java?rev=764109&view=auto
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/CloseAfterConnectionFailureTest.java (added)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/CloseAfterConnectionFailureTest.java Fri Apr 10 23:46:19 2009
@@ -0,0 +1,140 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.client.connection;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQConnectionURL;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.url.URLSyntaxException;
+
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Checks that a consumer, session and connection can each be closed from the
+ * connection's ExceptionListener once the InVM broker has been killed.
+ *
+ * The test is hardwired to an InVM broker ("vm://:1") with failover set to 'nofailover'.
+ */
+public class CloseAfterConnectionFailureTest extends QpidTestCase implements ExceptionListener
+{
+    /** Upper bound, in seconds, on the wait for {@link #onException} to finish its close calls. */
+    private static final long CLOSE_TIMEOUT_SECONDS = 30;
+
+    AMQConnection connection;
+    Session session;
+    MessageConsumer consumer;
+
+    /** Released by {@link #onException} once all of its close attempts have returned. */
+    private final CountDownLatch _latch = new CountDownLatch(1);
+
+    /**
+     * Creates a connection, session and consumer, kills all VM brokers and then waits for
+     * {@link #onException} to close them again.
+     *
+     * The wait is bounded so that a close call that locks up fails the test instead of
+     * hanging the whole test run.
+     */
+    public void testNoFailover() throws URLSyntaxException, AMQVMBrokerCreationException,
+                                        InterruptedException, JMSException
+    {
+        String connectionString = "amqp://guest:guest@/test?brokerlist='vm://:1?connectdelay='500',retries='3'',failover='nofailover'";
+
+        AMQConnectionURL url = new AMQConnectionURL(connectionString);
+
+        try
+        {
+            //Start the connection so it will use the retries
+            connection = new AMQConnection(url, null);
+
+            connection.setExceptionListener(this);
+
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            consumer = session.createConsumer(session.createQueue(this.getName()));
+
+            //Kill connection
+            TransportConnection.killAllVMBrokers();
+
+            // Bounded wait: a close() that never returns must fail the test, not hang it.
+            if (!_latch.await(CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS))
+            {
+                fail("Timed out waiting for the close calls in onException to complete");
+            }
+        }
+        catch (AMQException e)
+        {
+            fail(e.getMessage());
+        }
+    }
+
+    /**
+     * Callback registered on the connection via setExceptionListener. Closes the consumer,
+     * the session and the connection in turn, printing the connection's closed state before
+     * and after each step, then releases the latch the test method is waiting on.
+     *
+     * @param e the exception reported on the connection
+     */
+    public void onException(JMSException e)
+    {
+        System.err.println("Connection isClosed after connection Falure?:" + connection.isClosed());
+        try
+        {
+            consumer.close();
+        }
+        catch (JMSException jsme)
+        {
+            System.err.println("Consumer close failed with:" + jsme.getMessage());
+        }
+        System.err.println("Connection isClosed after connection Falure?:" + connection.isClosed());
+        try
+        {
+            //Note that if we actually do session.close() we will lock up as the session will never receive a frame
+            // from the (now killed) broker, so call the close(long) overload instead.
+            ((AMQSession)session).close(10);
+        }
+        catch (JMSException jsme)
+        {
+            System.err.println("Session close failed with:" + jsme.getMessage());
+        }
+        System.err.println("Connection isClosed after connection Falure?:" + connection.isClosed());
+
+        try
+        {
+            connection.close();
+        }
+        catch (JMSException jsme)
+        {
+            System.err.println("Connection close failed with:" + jsme.getMessage());
+        }
+        System.err.println("Connection isClosed after connection Falure?:" + connection.isClosed());
+
+        _latch.countDown();
+    }
+
+}



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org