You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2008/07/15 19:03:34 UTC

svn commit: r676973 - /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java

Author: ritchiem
Date: Tue Jul 15 10:03:34 2008
New Revision: 676973

URL: http://svn.apache.org/viewvc?rev=676973&view=rev
Log:
QPID-984 : Applied fix from M2.1.x that adds required synchronization around setup and tear down of Connections.

Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java?rev=676973&r1=676972&r2=676973&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java Tue Jul 15 10:03:34 2008
@@ -40,7 +40,6 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.net.Socket;
 
-
 /**
  * The TransportConnection is a helper class responsible for connecting to an AMQ server. It sets up the underlying
  * connector, which currently always uses TCP/IP sockets. It creates the "protocol handler" which deals with MINA
@@ -85,38 +84,18 @@
             throw new AMQNoTransportForProtocolException(details, null, null);
         }
 
-       /* if (transport == _currentInstance)
-        {
-            if (transport == VM)
-            {
-                if (_currentVMPort == details.getPort())
-                {
-                    return _instance;
-                }
-            }
-            else
-            {
-                return _instance;
-            }
-        }
-
-        _currentInstance = transport;*/
-
-        ITransportConnection instance;
         switch (transport)
         {
             case SOCKET:
-                instance =
-                        new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
-                        {
-                            public IoConnector newSocketConnector()
-                            {
-                                return new ExistingSocketConnector();
-                            }
-                        });
-                break;
+                return new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
+                {
+                    public IoConnector newSocketConnector()
+                    {
+                        return new ExistingSocketConnector();
+                    }
+                });
             case TCP:
-                instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
+                return new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
                 {
                     public IoConnector newSocketConnector()
                     {
@@ -125,8 +104,8 @@
                         if (Boolean.getBoolean("qpidnio"))
                         {
                             _logger.warn("Using Qpid MultiThreaded NIO - " + (System.getProperties().containsKey("qpidnio")
-                                                                 ? "Qpid NIO is new default"
-                                                                 : "Sysproperty 'qpidnio' is set"));
+                                                                              ? "Qpid NIO is new default"
+                                                                              : "Sysproperty 'qpidnio' is set"));
                             result = new MultiThreadSocketConnector();
                         }
                         else
@@ -141,18 +120,13 @@
                         return result;
                     }
                 });
-                break;
             case VM:
             {
-                instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker"));
-                break;
+                return getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker"));
             }
             default:
-                // FIXME: TGM
-                throw new AMQNoTransportForProtocolException(details, null, null);
+                throw new AMQNoTransportForProtocolException(details, "Transport not recognised:" + transport, null);
         }
-
-        return instance;
     }
 
     private static int getTransport(String transport)
@@ -180,13 +154,21 @@
     {
         int port = details.getPort();
 
-        if (!_inVmPipeAddress.containsKey(port))
+        synchronized (_inVmPipeAddress)
         {
-            if (AutoCreate)
+            if (!_inVmPipeAddress.containsKey(port))
             {
                 if (AutoCreate)
                 {
-                    createVMBroker(port);
+                    if (AutoCreate)
+                    {
+                        createVMBroker(port);
+                    }
+                    else
+                    {
+                        throw new AMQVMBrokerCreationException(null, port, "VM Broker on port " + port
+                                                                           + " does not exist. Auto create disabled.", null);
+                    }
                 }
                 else
                 {
@@ -194,11 +176,6 @@
                                                                        + " does not exist. Auto create disabled.", null);
                 }
             }
-            else
-            {
-                throw new AMQVMBrokerCreationException(null, port, "VM Broker on port " + port
-                                                                   + " does not exist. Auto create disabled.", null);
-            }
         }
 
         return new VmPipeTransportConnection(port);
@@ -214,70 +191,73 @@
 
             config.setThreadModel(ReadWriteThreadModel.getInstance());
         }
-
-        if (!_inVmPipeAddress.containsKey(port))
+        synchronized (_inVmPipeAddress)
         {
-            _logger.info("Creating InVM Qpid.AMQP listening on port " + port);
-            IoHandlerAdapter provider = null;
-            try
-            {
-                VmPipeAddress pipe = new VmPipeAddress(port);
-
-                provider = createBrokerInstance(port);
-
-                _acceptor.bind(pipe, provider);
 
-                _inVmPipeAddress.put(port, pipe);
-                _logger.info("Created InVM Qpid.AMQP listening on port " + port);
-            }
-            catch (IOException e)
+            if (!_inVmPipeAddress.containsKey(port))
             {
-                _logger.error("Got IOException.", e);
-
-                // Try and unbind provider
+                _logger.info("Creating InVM Qpid.AMQP listening on port " + port);
+                IoHandlerAdapter provider = null;
                 try
                 {
                     VmPipeAddress pipe = new VmPipeAddress(port);
 
-                    try
-                    {
-                        _acceptor.unbind(pipe);
-                    }
-                    catch (Exception ignore)
-                    {
-                        // ignore
-                    }
-
-                    if (provider == null)
-                    {
-                        provider = createBrokerInstance(port);
-                    }
+                    provider = createBrokerInstance(port);
 
                     _acceptor.bind(pipe, provider);
+
                     _inVmPipeAddress.put(port, pipe);
                     _logger.info("Created InVM Qpid.AMQP listening on port " + port);
                 }
-                catch (IOException justUseFirstException)
+                catch (IOException e)
                 {
-                    String because;
-                    if (e.getCause() == null)
+                    _logger.error("Got IOException.", e);
+
+                    // Try and unbind provider
+                    try
                     {
-                        because = e.toString();
+                        VmPipeAddress pipe = new VmPipeAddress(port);
+
+                        try
+                        {
+                            _acceptor.unbind(pipe);
+                        }
+                        catch (Exception ignore)
+                        {
+                            // ignore
+                        }
+
+                        if (provider == null)
+                        {
+                            provider = createBrokerInstance(port);
+                        }
+
+                        _acceptor.bind(pipe, provider);
+                        _inVmPipeAddress.put(port, pipe);
+                        _logger.info("Created InVM Qpid.AMQP listening on port " + port);
                     }
-                    else
+                    catch (IOException justUseFirstException)
                     {
-                        because = e.getCause().toString();
-                    }
+                        String because;
+                        if (e.getCause() == null)
+                        {
+                            because = e.toString();
+                        }
+                        else
+                        {
+                            because = e.getCause().toString();
+                        }
 
-                    throw new AMQVMBrokerCreationException(null, port, because + " Stopped binding of InVM Qpid.AMQP", e);
+                        throw new AMQVMBrokerCreationException(null, port, because + " Stopped binding of InVM Qpid.AMQP", e);
+                    }
                 }
+
+            }
+            else
+            {
+                _logger.info("InVM Qpid.AMQP on port " + port + " already exits.");
             }
         }
-        else
-        {
-            _logger.info("InVM Qpid.AMQP on port " + port + " already exits.");
-        }
-
     }
 
     private static IoHandlerAdapter createBrokerInstance(int port) throws AMQVMBrokerCreationException
@@ -324,7 +304,7 @@
         _logger.info("Killing all VM Brokers");
         if (_acceptor != null)
         {
-        	_acceptor.unbindAll();
+            _acceptor.unbindAll();
         }
         synchronized (_inVmPipeAddress)
         {
@@ -337,14 +317,17 @@
 
     public static void killVMBroker(int port)
     {
-        VmPipeAddress pipe = (VmPipeAddress) _inVmPipeAddress.get(port);
-        if (pipe != null)
+        synchronized (_inVmPipeAddress)
         {
-            _logger.info("Killing VM Broker:" + port);
-            _inVmPipeAddress.remove(port);
-            // This does need to be sychronized as otherwise mina can hang
-            // if a new connection is made
-            _acceptor.unbind(pipe);
+            VmPipeAddress pipe = (VmPipeAddress) _inVmPipeAddress.get(port);
+            if (pipe != null)
+            {
+                _logger.info("Killing VM Broker:" + port);
+                _inVmPipeAddress.remove(port);
+                // This does need to be sychronized as otherwise mina can hang
+                // if a new connection is made
+                _acceptor.unbind(pipe);
+            }
         }
     }