You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/09/26 12:45:22 UTC

svn commit: r579577 - in /incubator/qpid/branches/M2.1/java/client/src: main/java/org/apache/qpid/client/transport/TransportConnection.java test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java

Author: ritchiem
Date: Wed Sep 26 03:45:21 2007
New Revision: 579577

URL: http://svn.apache.org/viewvc?rev=579577&view=rev
Log:
Updated TransportConnection to synchronize around the creation/destruction of VM Brokers. I had observed a ConcurrentModificationException in KillAllVMBrokers().

This isn't good: it suggests that the tests are overlapping. This fix won't address that problem, but it will stop any ConcurrentModificationExceptions occurring. If test setup/teardown is overlapping, we should now see tests failing because the VM broker isn't there.

Modified:
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
    incubator/qpid/branches/M2.1/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java

Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java?rev=579577&r1=579576&r2=579577&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java Wed Sep 26 03:45:21 2007
@@ -149,19 +149,21 @@
     {
         int port = details.getPort();
 
-        if (!_inVmPipeAddress.containsKey(port))
+        synchronized (_inVmPipeAddress)
         {
-            if (AutoCreate)
+            if (!_inVmPipeAddress.containsKey(port))
             {
-                createVMBroker(port);
-            }
-            else
-            {
-                throw new AMQVMBrokerCreationException(null, port, "VM Broker on port " + port
-                    + " does not exist. Auto create disabled.", null);
+                if (AutoCreate)
+                {
+                    createVMBroker(port);
+                }
+                else
+                {
+                    throw new AMQVMBrokerCreationException(null, port, "VM Broker on port " + port
+                        + " does not exist. Auto create disabled.", null);
+                }
             }
         }
-
         return new VmPipeTransportConnection(port);
     }
 
@@ -176,69 +178,71 @@
             config.setThreadModel(ReadWriteThreadModel.getInstance());
         }
 
-        if (!_inVmPipeAddress.containsKey(port))
+        synchronized (_inVmPipeAddress)
         {
-            _logger.info("Creating InVM Qpid.AMQP listening on port " + port);
-            IoHandlerAdapter provider = null;
-            try
+            if (!_inVmPipeAddress.containsKey(port))
             {
-                VmPipeAddress pipe = new VmPipeAddress(port);
-
-                provider = createBrokerInstance(port);
-
-                _acceptor.bind(pipe, provider);
-
-                _inVmPipeAddress.put(port, pipe);
-                _logger.info("Created InVM Qpid.AMQP listening on port " + port);
-            }
-            catch (IOException e)
-            {
-                _logger.error("Got IOException.", e);
-
-                // Try and unbind provider
+                _logger.info("Creating InVM Qpid.AMQP listening on port " + port);
+                IoHandlerAdapter provider = null;
                 try
                 {
                     VmPipeAddress pipe = new VmPipeAddress(port);
 
-                    try
-                    {
-                        _acceptor.unbind(pipe);
-                    }
-                    catch (Exception ignore)
-                    {
-                        // ignore
-                    }
-
-                    if (provider == null)
-                    {
-                        provider = createBrokerInstance(port);
-                    }
+                    provider = createBrokerInstance(port);
 
                     _acceptor.bind(pipe, provider);
+
                     _inVmPipeAddress.put(port, pipe);
                     _logger.info("Created InVM Qpid.AMQP listening on port " + port);
                 }
-                catch (IOException justUseFirstException)
+                catch (IOException e)
                 {
-                    String because;
-                    if (e.getCause() == null)
+                    _logger.error("Got IOException.", e);
+
+                    // Try and unbind provider
+                    try
                     {
-                        because = e.toString();
+                        VmPipeAddress pipe = new VmPipeAddress(port);
+
+                        try
+                        {
+                            _acceptor.unbind(pipe);
+                        }
+                        catch (Exception ignore)
+                        {
+                            // ignore
+                        }
+
+                        if (provider == null)
+                        {
+                            provider = createBrokerInstance(port);
+                        }
+
+                        _acceptor.bind(pipe, provider);
+                        _inVmPipeAddress.put(port, pipe);
+                        _logger.info("Created InVM Qpid.AMQP listening on port " + port);
                     }
-                    else
+                    catch (IOException justUseFirstException)
                     {
-                        because = e.getCause().toString();
-                    }
+                        String because;
+                        if (e.getCause() == null)
+                        {
+                            because = e.toString();
+                        }
+                        else
+                        {
+                            because = e.getCause().toString();
+                        }
 
-                    throw new AMQVMBrokerCreationException(null, port, because + " Stopped binding of InVM Qpid.AMQP", e);
+                        throw new AMQVMBrokerCreationException(null, port, because + " Stopped binding of InVM Qpid.AMQP", e);
+                    }
                 }
             }
+            else
+            {
+                _logger.info("InVM Qpid.AMQP on port " + port + " already exits.");
+            }
         }
-        else
-        {
-            _logger.info("InVM Qpid.AMQP on port " + port + " already exits.");
-        }
-
     }
 
     private static IoHandlerAdapter createBrokerInstance(int port) throws AMQVMBrokerCreationException
@@ -285,25 +289,29 @@
     {
         _logger.info("Killing all VM Brokers");
         _acceptor.unbindAll();
-
-        Iterator keys = _inVmPipeAddress.keySet().iterator();
-
-        while (keys.hasNext())
+        synchronized (_inVmPipeAddress)
         {
-            int id = (Integer) keys.next();
-            _inVmPipeAddress.remove(id);
-        }
+            Iterator keys = _inVmPipeAddress.keySet().iterator();
 
+            while (keys.hasNext())
+            {
+                int id = (Integer) keys.next();
+                _inVmPipeAddress.remove(id);
+            }
+        }
     }
 
     public static void killVMBroker(int port)
     {
-        VmPipeAddress pipe = (VmPipeAddress) _inVmPipeAddress.get(port);
-        if (pipe != null)
+        synchronized (_inVmPipeAddress)
         {
-            _logger.info("Killing VM Broker:" + port);
-            _inVmPipeAddress.remove(port);
-            _acceptor.unbind(pipe);
+            VmPipeAddress pipe = (VmPipeAddress) _inVmPipeAddress.get(port);
+            if (pipe != null)
+            {
+                _logger.info("Killing VM Broker:" + port);
+                _inVmPipeAddress.remove(port);
+                _acceptor.unbind(pipe);
+            }
         }
     }
 

Modified: incubator/qpid/branches/M2.1/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java?rev=579577&r1=579576&r2=579577&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java Wed Sep 26 03:45:21 2007
@@ -434,6 +434,13 @@
         verifyMessages(_consumer.receive(1000));
     }
 
+    /**
+     * This test sends two messages and receives one of them but doesn't ack it.
+     * The consumer is then closed.
+     * The first message should be returned as redelivered;
+     * the second message should be delivered normally.
+     * @throws Exception
+     */
     public void testSend2ThenCloseAfter1andTryAgain() throws Exception
     {
         assertTrue("session is not transacted", _session.getTransacted());