You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2008/07/15 19:03:34 UTC
svn commit: r676973 -
/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
Author: ritchiem
Date: Tue Jul 15 10:03:34 2008
New Revision: 676973
URL: http://svn.apache.org/viewvc?rev=676973&view=rev
Log:
QPID-984 : Applied fix from M2.1.x that adds requried synchronization around setup and tear down of Connections.
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java?rev=676973&r1=676972&r2=676973&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java Tue Jul 15 10:03:34 2008
@@ -40,7 +40,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.net.Socket;
-
/**
* The TransportConnection is a helper class responsible for connecting to an AMQ server. It sets up the underlying
* connector, which currently always uses TCP/IP sockets. It creates the "protocol handler" which deals with MINA
@@ -85,38 +84,18 @@
throw new AMQNoTransportForProtocolException(details, null, null);
}
- /* if (transport == _currentInstance)
- {
- if (transport == VM)
- {
- if (_currentVMPort == details.getPort())
- {
- return _instance;
- }
- }
- else
- {
- return _instance;
- }
- }
-
- _currentInstance = transport;*/
-
- ITransportConnection instance;
switch (transport)
{
case SOCKET:
- instance =
- new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
- {
- public IoConnector newSocketConnector()
- {
- return new ExistingSocketConnector();
- }
- });
- break;
+ return new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
+ {
+ public IoConnector newSocketConnector()
+ {
+ return new ExistingSocketConnector();
+ }
+ });
case TCP:
- instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
+ return new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
{
public IoConnector newSocketConnector()
{
@@ -125,8 +104,8 @@
if (Boolean.getBoolean("qpidnio"))
{
_logger.warn("Using Qpid MultiThreaded NIO - " + (System.getProperties().containsKey("qpidnio")
- ? "Qpid NIO is new default"
- : "Sysproperty 'qpidnio' is set"));
+ ? "Qpid NIO is new default"
+ : "Sysproperty 'qpidnio' is set"));
result = new MultiThreadSocketConnector();
}
else
@@ -141,18 +120,13 @@
return result;
}
});
- break;
case VM:
{
- instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker"));
- break;
+ return getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker"));
}
default:
- // FIXME: TGM
- throw new AMQNoTransportForProtocolException(details, null, null);
+ throw new AMQNoTransportForProtocolException(details, "Transport not recognised:" + transport, null);
}
-
- return instance;
}
private static int getTransport(String transport)
@@ -180,13 +154,21 @@
{
int port = details.getPort();
- if (!_inVmPipeAddress.containsKey(port))
+ synchronized (_inVmPipeAddress)
{
- if (AutoCreate)
+ if (!_inVmPipeAddress.containsKey(port))
{
if (AutoCreate)
{
- createVMBroker(port);
+ if (AutoCreate)
+ {
+ createVMBroker(port);
+ }
+ else
+ {
+ throw new AMQVMBrokerCreationException(null, port, "VM Broker on port " + port
+ + " does not exist. Auto create disabled.", null);
+ }
}
else
{
@@ -194,11 +176,6 @@
+ " does not exist. Auto create disabled.", null);
}
}
- else
- {
- throw new AMQVMBrokerCreationException(null, port, "VM Broker on port " + port
- + " does not exist. Auto create disabled.", null);
- }
}
return new VmPipeTransportConnection(port);
@@ -214,70 +191,73 @@
config.setThreadModel(ReadWriteThreadModel.getInstance());
}
-
- if (!_inVmPipeAddress.containsKey(port))
+ synchronized (_inVmPipeAddress)
{
- _logger.info("Creating InVM Qpid.AMQP listening on port " + port);
- IoHandlerAdapter provider = null;
- try
- {
- VmPipeAddress pipe = new VmPipeAddress(port);
-
- provider = createBrokerInstance(port);
-
- _acceptor.bind(pipe, provider);
- _inVmPipeAddress.put(port, pipe);
- _logger.info("Created InVM Qpid.AMQP listening on port " + port);
- }
- catch (IOException e)
+ if (!_inVmPipeAddress.containsKey(port))
{
- _logger.error("Got IOException.", e);
-
- // Try and unbind provider
+ _logger.info("Creating InVM Qpid.AMQP listening on port " + port);
+ IoHandlerAdapter provider = null;
try
{
VmPipeAddress pipe = new VmPipeAddress(port);
- try
- {
- _acceptor.unbind(pipe);
- }
- catch (Exception ignore)
- {
- // ignore
- }
-
- if (provider == null)
- {
- provider = createBrokerInstance(port);
- }
+ provider = createBrokerInstance(port);
_acceptor.bind(pipe, provider);
+
_inVmPipeAddress.put(port, pipe);
_logger.info("Created InVM Qpid.AMQP listening on port " + port);
}
- catch (IOException justUseFirstException)
+ catch (IOException e)
{
- String because;
- if (e.getCause() == null)
+ _logger.error("Got IOException.", e);
+
+ // Try and unbind provider
+ try
{
- because = e.toString();
+ VmPipeAddress pipe = new VmPipeAddress(port);
+
+ try
+ {
+ _acceptor.unbind(pipe);
+ }
+ catch (Exception ignore)
+ {
+ // ignore
+ }
+
+ if (provider == null)
+ {
+ provider = createBrokerInstance(port);
+ }
+
+ _acceptor.bind(pipe, provider);
+ _inVmPipeAddress.put(port, pipe);
+ _logger.info("Created InVM Qpid.AMQP listening on port " + port);
}
- else
+ catch (IOException justUseFirstException)
{
- because = e.getCause().toString();
- }
+ String because;
+ if (e.getCause() == null)
+ {
+ because = e.toString();
+ }
+ else
+ {
+ because = e.getCause().toString();
+ }
- throw new AMQVMBrokerCreationException(null, port, because + " Stopped binding of InVM Qpid.AMQP", e);
+ throw new AMQVMBrokerCreationException(null, port, because + " Stopped binding of InVM Qpid.AMQP", e);
+ }
}
+
+ }
+ else
+ {
+ _logger.info("InVM Qpid.AMQP on port " + port + " already exits.");
}
}
- else
- {
- _logger.info("InVM Qpid.AMQP on port " + port + " already exits.");
- }
-
}
private static IoHandlerAdapter createBrokerInstance(int port) throws AMQVMBrokerCreationException
@@ -324,7 +304,7 @@
_logger.info("Killing all VM Brokers");
if (_acceptor != null)
{
- _acceptor.unbindAll();
+ _acceptor.unbindAll();
}
synchronized (_inVmPipeAddress)
{
@@ -337,14 +317,17 @@
public static void killVMBroker(int port)
{
- VmPipeAddress pipe = (VmPipeAddress) _inVmPipeAddress.get(port);
- if (pipe != null)
+ synchronized (_inVmPipeAddress)
{
- _logger.info("Killing VM Broker:" + port);
- _inVmPipeAddress.remove(port);
- // This does need to be sychronized as otherwise mina can hang
- // if a new connection is made
- _acceptor.unbind(pipe);
+ VmPipeAddress pipe = (VmPipeAddress) _inVmPipeAddress.get(port);
+ if (pipe != null)
+ {
+ _logger.info("Killing VM Broker:" + port);
+ _inVmPipeAddress.remove(port);
+ // This does need to be sychronized as otherwise mina can hang
+ // if a new connection is made
+ _acceptor.unbind(pipe);
+ }
}
}