You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/04/14 17:05:11 UTC

svn commit: r764816 - in /qpid/branches/0.5-fix/qpid: ./ java/ java/broker/bin/ java/broker/src/main/java/org/apache/qpid/server/management/ java/broker/src/main/java/org/apache/qpid/server/queue/ java/client/src/main/java/org/apache/qpid/client/ java/...

Author: ritchiem
Date: Tue Apr 14 15:05:10 2009
New Revision: 764816

URL: http://svn.apache.org/viewvc?rev=764816&view=rev
Log:
Merged change to SlowMessageStore to ensure we don't infinitely configure the same class.

QPID-1621: add ServerConfiguration, QueueConfiguration and SecurityConfiguration classes. Move almost all uses of o.a.commons.configuration.Configuration behind there.  @Configured delenda est

Merged changes from trunk r745799

Added:
    qpid/branches/0.5-fix/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/CloseAfterConnectionFailureTest.java
      - copied, changed from r764109, qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/CloseAfterConnectionFailureTest.java
Modified:
    qpid/branches/0.5-fix/qpid/   (props changed)
    qpid/branches/0.5-fix/qpid/java/010ExcludeList
    qpid/branches/0.5-fix/qpid/java/08ExcludeList-nonvm
    qpid/branches/0.5-fix/qpid/java/broker/bin/   (props changed)
    qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/management/   (props changed)
    qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/   (props changed)
    qpid/branches/0.5-fix/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    qpid/branches/0.5-fix/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    qpid/branches/0.5-fix/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    qpid/branches/0.5-fix/qpid/java/client/src/main/java/org/apache/qpid/client/Closeable.java

Propchange: qpid/branches/0.5-fix/qpid/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Apr 14 15:05:10 2009
@@ -1 +1 @@
-/qpid/trunk/qpid:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,760919,761721,762365,762992,763959
+/qpid/trunk/qpid:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764109,764140,764790

Modified: qpid/branches/0.5-fix/qpid/java/010ExcludeList
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-fix/qpid/java/010ExcludeList?rev=764816&r1=764815&r2=764816&view=diff
==============================================================================
--- qpid/branches/0.5-fix/qpid/java/010ExcludeList (original)
+++ qpid/branches/0.5-fix/qpid/java/010ExcludeList Tue Apr 14 15:05:10 2009
@@ -59,3 +59,5 @@
 org.apache.qpid.server.exchange.ReturnUnroutableMandatoryMessageTest#*
 org.apache.qpid.server.queue.PriorityTest#*
 org.apache.qpid.server.queue.TimeToLiveTest#*
+// This test may use QpidTestCase but it is not using the getConnection and is hardwired to InVM
+org.apache.qpid.test.unit.client.connection.CloseAfterConnectionFailureTest#*

Modified: qpid/branches/0.5-fix/qpid/java/08ExcludeList-nonvm
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-fix/qpid/java/08ExcludeList-nonvm?rev=764816&r1=764815&r2=764816&view=diff
==============================================================================
--- qpid/branches/0.5-fix/qpid/java/08ExcludeList-nonvm (original)
+++ qpid/branches/0.5-fix/qpid/java/08ExcludeList-nonvm Tue Apr 14 15:05:10 2009
@@ -27,3 +27,7 @@
 
 // Those tests are written against the 0.10 path
 org.apache.qpid.test.unit.message.UTF8Test#*
+org.apache.qpid.client.MessageListenerTest#testSynchronousRecieveNoWait
+
+// This test may use QpidTestCase but it is not using the getConnection and is hardwired to InVM
+org.apache.qpid.test.unit.client.connection.CloseAfterConnectionFailureTest#*

Propchange: qpid/branches/0.5-fix/qpid/java/broker/bin/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Apr 14 15:05:10 2009
@@ -1,2 +1,2 @@
 /qpid/branches/0.5-release/qpid/java/broker/bin:757268
-/qpid/trunk/qpid/java/broker/bin:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959
+/qpid/trunk/qpid/java/broker/bin:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764109,764140,764790

Propchange: qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/management/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Apr 14 15:05:10 2009
@@ -1,2 +1,2 @@
 /qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/management:757268
-/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959
+/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764109,764140,764790

Propchange: qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Apr 14 15:05:10 2009
@@ -1,2 +1,2 @@
 /qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:757257
-/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:753219-753220,753253,757270,758730,759097,760919,761721,762365,762992,763959
+/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:753219-753220,753253,757270,758730,759097,760919,761721,762365,762992,763959,764109,764140,764790

Modified: qpid/branches/0.5-fix/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-fix/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=764816&r1=764815&r2=764816&view=diff
==============================================================================
--- qpid/branches/0.5-fix/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ qpid/branches/0.5-fix/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Tue Apr 14 15:05:10 2009
@@ -58,6 +58,7 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQProtocolException;
 import org.apache.qpid.AMQUnresolvedAddressException;
+import org.apache.qpid.AMQDisconnectedException;
 import org.apache.qpid.client.configuration.ClientProperties;
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.failover.FailoverProtectedOperation;
@@ -889,7 +890,12 @@
     {
         if (!_closed.getAndSet(true))
         {
-            doClose(sessions, timeout);
+            _closing.set(true);
+            try{
+                doClose(sessions, timeout);
+            }finally{
+                _closing.set(false);
+            }
         }
     }
 
@@ -1283,8 +1289,10 @@
         // in the case of an IOException, MINA has closed the protocol session so we set _closed to true
         // so that any generic client code that tries to close the connection will not mess up this error
         // handling sequence
-        if (cause instanceof IOException)
+        if (cause instanceof IOException || cause instanceof AMQDisconnectedException)
         {
+            // If we have an IOE/AMQDisconnect there is no connection to close on.
+            _closing.set(false);
             closer = !_closed.getAndSet(true);
 
             _protocolHandler.getProtocolSession().notifyError(je);

Modified: qpid/branches/0.5-fix/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-fix/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=764816&r1=764815&r2=764816&view=diff
==============================================================================
--- qpid/branches/0.5-fix/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/branches/0.5-fix/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Apr 14 15:05:10 2009
@@ -21,6 +21,7 @@
 package org.apache.qpid.client;
 
 import java.io.Serializable;
+import java.io.IOException;
 import java.net.URISyntaxException;
 import java.text.MessageFormat;
 import java.util.ArrayList;
@@ -282,7 +283,7 @@
 
     /** Holds the dispatcher thread for this session. */
     protected Dispatcher _dispatcher;
-    
+
     protected Thread _dispatcherThread;
 
     /** Holds the message factory factory for this session. */
@@ -625,6 +626,7 @@
         // Ensure we only try and close an open session.
         if (!_closed.getAndSet(true))
         {
+            _closing.set(true);
             synchronized (getFailoverMutex())
             {
                 // We must close down all producers and consumers in an orderly fashion. This is the only method
@@ -636,7 +638,13 @@
 
                     try
                     {
-                        sendClose(timeout);
+                        // If the connection is open or we are in the process
+                        // of closing the connection then send a close;
+                        // no point otherwise as the connection will be gone
+                        if (!_connection.isClosed() || _connection.isClosing())                        
+                        {
+                            sendClose(timeout);
+                        }
                     }
                     catch (AMQException e)
                     {
@@ -683,7 +691,11 @@
                 // Failover failed and ain't coming back. Knife the dispatcher.
                 _dispatcherThread.interrupt();
             }
-        }
+
+       }
+
+        //if we don't have an exception then we can perform closing operations  
+        _closing.set(e == null);
 
         if (!_closed.getAndSet(true))
         {
@@ -1210,9 +1222,9 @@
 
             // this is done so that we can produce to a temporary queue before we create a consumer
             result.setQueueName(result.getRoutingKey());
-            createQueue(result.getAMQQueueName(), result.isAutoDelete(), 
+            createQueue(result.getAMQQueueName(), result.isAutoDelete(),
                         result.isDurable(), result.isExclusive());
-            bindQueue(result.getAMQQueueName(), result.getRoutingKey(), 
+            bindQueue(result.getAMQQueueName(), result.getRoutingKey(),
                     new FieldTable(), result.getExchangeName(), result);
             return result;
         }
@@ -1674,11 +1686,11 @@
                         // if (rawSelector != null)
                         // ft.put("headers", rawSelector.getDataAsBytes());
                         // rawSelector is used by HeadersExchange and is not a JMS Selector
-                        if (rawSelector != null) 
+                        if (rawSelector != null)
                         {
                             ft.addAll(rawSelector);
                         }
-                        
+
                         if (messageSelector != null)
                         {
                             ft.put(new AMQShortString("x-filter-jms-selector"), messageSelector);
@@ -1918,13 +1930,13 @@
             _dispatcher = new Dispatcher();
             try
             {
-                _dispatcherThread = Threading.getThreadFactory().createThread(_dispatcher);       
-                
+                _dispatcherThread = Threading.getThreadFactory().createThread(_dispatcher);
+
             }
             catch(Exception e)
             {
                 throw new Error("Error creating Dispatcher thread",e);
-            }            
+            }
             _dispatcherThread.setName("Dispatcher-Channel-" + _channelId);
             _dispatcherThread.setDaemon(true);
             _dispatcher.setConnectionStopped(initiallyStopped);
@@ -2971,4 +2983,27 @@
             }
         }
     }
+
+    /**
+     * Checks if the Session or its parent connection is closed
+     *
+     * @return <tt>true</tt> if this is closed, <tt>false</tt> otherwise.
+     */
+    @Override
+    public boolean isClosed()
+    {
+        return _closed.get() || _connection.isClosed();
+    }
+
+    /**
+     * Checks if the Session and its parent connection are capable of performing
+     * closing operations
+     *
+     * @return <tt>true</tt> if we are closing, <tt>false</tt> otherwise.
+     */
+    @Override
+    public boolean isClosing()
+    {
+        return _closing.get()|| _connection.isClosing();
+    }
 }

Modified: qpid/branches/0.5-fix/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-fix/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=764816&r1=764815&r2=764816&view=diff
==============================================================================
--- qpid/branches/0.5-fix/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ qpid/branches/0.5-fix/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Tue Apr 14 15:05:10 2009
@@ -541,6 +541,7 @@
 
         if (!_closed.getAndSet(true))
         {
+            _closing.set(true);
             if (_logger.isDebugEnabled())
             {
                 StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
@@ -561,7 +562,13 @@
                 {
                     try
                     {
-                        sendCancel();
+                        // If the session is open or we are in the process
+                        // of closing the session then send a cancel;
+                        // no point otherwise as the connection will be gone
+                        if (!_session.isClosed() || _session.isClosing())
+                        {
+                            sendCancel();
+                        }
                     }
                     catch (AMQException e)
                     {

Modified: qpid/branches/0.5-fix/qpid/java/client/src/main/java/org/apache/qpid/client/Closeable.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-fix/qpid/java/client/src/main/java/org/apache/qpid/client/Closeable.java?rev=764816&r1=764815&r2=764816&view=diff
==============================================================================
--- qpid/branches/0.5-fix/qpid/java/client/src/main/java/org/apache/qpid/client/Closeable.java (original)
+++ qpid/branches/0.5-fix/qpid/java/client/src/main/java/org/apache/qpid/client/Closeable.java Tue Apr 14 15:05:10 2009
@@ -52,6 +52,13 @@
     protected final AtomicBoolean _closed = new AtomicBoolean(false);
 
     /**
+     * Are we in the process of closing. We have this distinction so we can
+     * still signal we are in the process of closing so other objects can tell
+     * the difference and tidy up.
+     */
+    protected final AtomicBoolean _closing = new AtomicBoolean(false);
+
+    /**
      * Checks if this is closed, and raises a JMSException if it is.
      *
      * @throws JMSException If this is closed.
@@ -75,6 +82,17 @@
     }
 
     /**
+     * Checks if this is closing.
+     *
+     * @return <tt>true</tt> if we are closing, <tt>false</tt> otherwise.
+     */
+    public boolean isClosing()
+    {
+        return _closing.get();
+    }
+
+
+    /**
      * Closes this object.
      *
      * @throws JMSException If this cannot be closed for any reason.

Copied: qpid/branches/0.5-fix/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/CloseAfterConnectionFailureTest.java (from r764109, qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/CloseAfterConnectionFailureTest.java)
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-fix/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/CloseAfterConnectionFailureTest.java?p2=qpid/branches/0.5-fix/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/CloseAfterConnectionFailureTest.java&p1=qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/CloseAfterConnectionFailureTest.java&r1=764109&r2=764816&rev=764816&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/CloseAfterConnectionFailureTest.java (original)
+++ qpid/branches/0.5-fix/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/CloseAfterConnectionFailureTest.java Tue Apr 14 15:05:10 2009
@@ -42,6 +42,7 @@
     Session session;
     MessageConsumer consumer;
     private CountDownLatch _latch = new CountDownLatch(1);
+    private JMSException _fail;
 
     public void testNoFailover() throws URLSyntaxException, AMQVMBrokerCreationException,
                                         InterruptedException, JMSException
@@ -63,6 +64,12 @@
             //Kill connection
             TransportConnection.killAllVMBrokers();
             _latch.await();
+
+            if (_fail != null)
+            {
+                _fail.printStackTrace(System.out);
+                fail("Exception thrown:" + _fail.getMessage());
+            }
         }
         catch (AMQException e)
         {
@@ -72,39 +79,43 @@
 
     public void onException(JMSException e)
     {
-        System.err.println("Connection isClosed after connection Falure?:" + connection.isClosed());
+        System.out.println("Connection isClosed after connection Falure?:" + connection.isClosed());
         try
         {
             consumer.close();
         }
-        catch (JMSException jsme)
+        catch (JMSException jmse)
         {
-            System.err.println("Consumer close failed with:" + jsme.getMessage());
+            System.out.println("Consumer close failed with:" + jmse.getMessage());
+            _fail = jmse;
         }
-        System.err.println("Connection isClosed after connection Falure?:" + connection.isClosed());
+        System.out.println("Connection isClosed after connection Falure?:" + connection.isClosed());
         try
         {
             //Note that if we actually do session.close() we will lock up as the session will never receive a frame
             // from the
-            ((AMQSession)session).close(10);
+            ((AMQSession) session).close(10);
         }
-        catch (JMSException jsme)
+        catch (JMSException jmse)
         {
-            System.err.println("Session close failed with:" + jsme.getMessage());
+            System.out.println("Session close failed with:" + jmse.getMessage());
+            _fail = jmse;
         }
-        System.err.println("Connection isClosed after connection Falure?:" + connection.isClosed());
+        System.out.println("Connection isClosed after connection Falure?:" + connection.isClosed());
 
         try
         {
             connection.close();
         }
-        catch (JMSException jsme)
+        catch (JMSException jmse)
         {
-            System.err.println("Session close failed with:" + jsme.getMessage());
+            System.out.println("Session close failed with:" + jmse.getMessage());
+            _fail = jmse;
         }
-        System.err.println("Connection isClosed after connection Falure?:" + connection.isClosed());
+        System.out.println("Connection isClosed after connection Falure?:" + connection.isClosed());
 
         _latch.countDown();
+
     }
 
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org