You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2010/01/21 03:46:15 UTC
svn commit: r901506 - in /qpid/trunk/qpid/java:
client/src/main/java/org/apache/qpid/client/
client/src/main/java/org/apache/qpid/client/protocol/
client/src/main/java/org/apache/qpid/jms/
common/src/main/java/org/apache/qpid/transport/ common/src/main...
Author: rajith
Date: Thu Jan 21 02:45:35 2010
New Revision: 901506
URL: http://svn.apache.org/viewvc?rev=901506&view=rev
Log:
The commit contains fixes for QPID-2351, QPID-2350 and some ground work for QPID-2352
- Modified Connection.java to add more than one ConnectionListener. This was done to facilitate the SASL encryption patch - QPID-2352.
- Changed the access modifier for getSaslClient method to "public" to allow the SaslClient to be retrieved by the SASL encryption code -QPID-2352.
- Introduced ConnectionSettings object to hold all the configuration options. Previous constructor methods remain unchanged.
- Modified the ClientDelegate to handle heartbeat and idleTimeout value properly.
- Added support to specify config options via the connection URL - QPID-2351
- Added support to handle the heartbeat/idle_timeout options properly in the 0-10 code - QPID-2350. However once QPID-2343 is completed, the code will be further simplified.
Added:
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java?rev=901506&r1=901505&r2=901506&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java Thu Jan 21 02:45:35 2010
@@ -255,11 +255,11 @@
return BrokerDetails.DEFAULT_CONNECT_TIMEOUT;
}
- public boolean useSSL()
+ public boolean getBooleanProperty(String propName)
{
- if (_options.containsKey(ConnectionURL.OPTIONS_SSL))
+ if (_options.containsKey(propName))
{
- return Boolean.parseBoolean(_options.get(ConnectionURL.OPTIONS_SSL));
+ return Boolean.parseBoolean(_options.get(propName));
}
return false;
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=901506&r1=901505&r2=901506&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Thu Jan 21 02:45:35 2010
@@ -381,10 +381,10 @@
useSSL
? (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
+ ((clientName == null) ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port
- + "'" + "," + ConnectionURL.OPTIONS_SSL + "='true'")
+ + "'" + "," + BrokerDetails.OPTIONS_SSL + "='true'")
: (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password + "@"
+ ((clientName == null) ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port
- + "'" + "," + ConnectionURL.OPTIONS_SSL + "='false'")), sslConfig);
+ + "'" + "," + BrokerDetails.OPTIONS_SSL + "='false'")), sslConfig);
}
public AMQConnection(String connection) throws AMQException, URLSyntaxException
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java?rev=901506&r1=901505&r2=901506&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java Thu Jan 21 02:45:35 2010
@@ -58,8 +58,6 @@
<T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E;
- void setIdleTimeout(long l);
-
int getMaxChannelID();
ProtocolVersion getProtocolVersion();
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=901506&r1=901505&r2=901506&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Thu Jan 21 02:45:35 2010
@@ -41,6 +41,7 @@
import org.apache.qpid.transport.ConnectionClose;
import org.apache.qpid.transport.ConnectionException;
import org.apache.qpid.transport.ConnectionListener;
+import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.ProtocolVersionException;
import org.apache.qpid.transport.TransportException;
import org.slf4j.Logger;
@@ -69,7 +70,7 @@
{
_conn = conn;
_qpidConnection = new Connection();
- _qpidConnection.setConnectionListener(this);
+ _qpidConnection.addConnectionListener(this);
}
/**
@@ -149,40 +150,64 @@
{
if (_logger.isDebugEnabled())
{
- _logger.debug("connecting to host: " + brokerDetail.getHost() +
- " port: " + brokerDetail.getPort() +
- " vhost: " + _conn.getVirtualHost() +
- " username: " + _conn.getUsername() +
- " password: " + _conn.getPassword());
+ _logger.debug("connecting to host: " + brokerDetail.getHost()
+ + " port: " + brokerDetail.getPort() + " vhost: "
+ + _conn.getVirtualHost() + " username: "
+ + _conn.getUsername() + " password: "
+ + _conn.getPassword());
}
-
- if (brokerDetail.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT) != null)
- {
- this.setIdleTimeout(Long.parseLong(brokerDetail.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT)));
- }
- else
- {
- // use the default value set for all connections
- this.setIdleTimeout(Long.getLong(ClientProperties.IDLE_TIMEOUT_PROP_NAME,ClientProperties.DEFAULT_IDLE_TIMEOUT));
- }
-
- String saslMechs = brokerDetail.getProperty("sasl_mechs")!= null?
- brokerDetail.getProperty("sasl_mechs"):
- System.getProperty("qpid.sasl_mechs","PLAIN");
-
- _qpidConnection.connect(brokerDetail.getHost(), brokerDetail.getPort(), _conn.getVirtualHost(),
- _conn.getUsername(), _conn.getPassword(), brokerDetail.useSSL(),saslMechs);
+
+ String saslMechs = brokerDetail.getProperty(BrokerDetails.OPTIONS_SASL_MECHS) != null ?
+ brokerDetail.getProperty(BrokerDetails.OPTIONS_SASL_MECHS):
+ System.getProperty("qpid.sasl_mechs", "PLAIN");
+
+ // Sun SASL Kerberos client uses the
+ // protocol + servername as the service key.
+ String protocol = brokerDetail.getProperty(BrokerDetails.OPTIONS_SASL_PROTOCOL_NAME) != null ?
+ brokerDetail.getProperty(BrokerDetails.OPTIONS_SASL_PROTOCOL_NAME):
+ System.getProperty("qpid.sasl_protocol", "AMQP");
+
+ String saslServerName = brokerDetail.getProperty(BrokerDetails.OPTIONS_SASL_SERVER_NAME) != null ?
+ brokerDetail.getProperty(BrokerDetails.OPTIONS_SASL_SERVER_NAME):
+ System.getProperty("qpid.sasl_server_name", "localhost");
+
+ boolean useSSL = brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_SSL); // "ssl" broker option; sasl_encryption is resolved separately below
+
+ boolean useSASLEncryption = brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_SASL_ENCRYPTION)?
+ brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_SASL_ENCRYPTION):
+ Boolean.getBoolean("qpid.sasl_encryption");
+
+ boolean useTcpNodelay = brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_TCP_NO_DELAY)?
+ brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_TCP_NO_DELAY):
+ Boolean.getBoolean("amqj.tcp_nodelay");
+
+
+ ConnectionSettings conSettings = new ConnectionSettings();
+ conSettings.setHost(brokerDetail.getHost());
+ conSettings.setPort(brokerDetail.getPort());
+ conSettings.setVhost(_conn.getVirtualHost());
+ conSettings.setUsername(_conn.getUsername());
+ conSettings.setPassword(_conn.getPassword());
+ conSettings.setUseSASLEncryption(useSASLEncryption);
+ conSettings.setUseSSL(useSSL);
+ conSettings.setSaslMechs(saslMechs);
+ conSettings.setTcpNodelay(useTcpNodelay);
+ conSettings.setSaslProtocol(protocol);
+ conSettings.setSaslServerName(saslServerName);
+ conSettings.setHeartbeatInterval(getHeartbeatInterval(brokerDetail));
+
+ _qpidConnection.connect(conSettings);
+
_conn._connected = true;
_conn.setUsername(_qpidConnection.getUserID());
_conn._failoverPolicy.attainedConnection();
- }
- catch(ProtocolVersionException pe)
+ } catch (ProtocolVersionException pe)
{
return new ProtocolVersion(pe.getMajor(), pe.getMinor());
- }
- catch (ConnectionException e)
- {
- throw new AMQException(AMQConstant.CHANNEL_ERROR, "cannot connect to broker", e);
+ } catch (ConnectionException e)
+ {
+ throw new AMQException(AMQConstant.CHANNEL_ERROR,
+ "cannot connect to broker", e);
}
return null;
@@ -293,11 +318,6 @@
}
}
- public void setIdleTimeout(long l)
- {
- _qpidConnection.setIdleTimeout((int)l);
- }
-
public int getMaxChannelID()
{
return Integer.MAX_VALUE;
@@ -307,4 +327,30 @@
{
return ProtocolVersion.v0_10;
}
+
+ // Resolves the heartbeat interval in secs. Precedence: broker option idle_timeout (deprecated, millisecs, divided by 1000), broker option heartbeat (secs),
+ // then JVM property ClientProperties.IDLE_TIMEOUT_PROP_NAME (deprecated, millisecs, divided by 1000), then ClientProperties.HEARTBEAT with HEARTBEAT_DEFAULT.
+ private int getHeartbeatInterval(BrokerDetails brokerDetail)
+ {
+ int heartbeat = 0;
+ if (brokerDetail.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT) != null)
+ {
+ _logger.warn("Broker property idle_timeout=<mili_secs> is deprecated, please use heartbeat=<secs>");
+ heartbeat = Integer.parseInt(brokerDetail.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT))/1000;
+ }
+ else if (brokerDetail.getProperty(BrokerDetails.OPTIONS_HEARTBEAT) != null)
+ {
+ heartbeat = Integer.parseInt(brokerDetail.getProperty(BrokerDetails.OPTIONS_HEARTBEAT));
+ }
+ else if (Integer.getInteger(ClientProperties.IDLE_TIMEOUT_PROP_NAME) != null)
+ {
+ heartbeat = Integer.getInteger(ClientProperties.IDLE_TIMEOUT_PROP_NAME)/1000;
+ _logger.warn("JVM arg -Didle_timeout=<mili_secs> is deprecated, please use -Dqpid.heartbeat=<secs>");
+ }
+ else
+ {
+ heartbeat = Integer.getInteger(ClientProperties.HEARTBEAT,ClientProperties.HEARTBEAT_DEFAULT);
+ }
+ return heartbeat; // return the resolved value; a constant 0 here would disable heartbeats regardless of configuration
+ }
}
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=901506&r1=901505&r2=901506&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java Thu Jan 21 02:45:35 2010
@@ -304,8 +304,6 @@
}
}
- public void setIdleTimeout(long l){}
-
public int getMaxChannelID()
{
return (int) (Math.pow(2, 16)-1);
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=901506&r1=901505&r2=901506&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Thu Jan 21 02:45:35 2010
@@ -204,7 +204,7 @@
IoTransport.connect_0_9(getProtocolSession(),
brokerDetail.getHost(),
brokerDetail.getPort(),
- brokerDetail.useSSL());
+ brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_SSL));
_protocolSession.init();
}
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java?rev=901506&r1=901505&r2=901506&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java Thu Jan 21 02:45:35 2010
@@ -34,8 +34,14 @@
public static final String OPTIONS_RETRY = "retries";
public static final String OPTIONS_CONNECT_TIMEOUT = "connecttimeout";
public static final String OPTIONS_CONNECT_DELAY = "connectdelay";
- public static final String OPTIONS_IDLE_TIMEOUT = "idle_timeout";
+ public static final String OPTIONS_IDLE_TIMEOUT = "idle_timeout"; // deprecated
+ public static final String OPTIONS_HEARTBEAT = "heartbeat";
public static final String OPTIONS_SASL_MECHS = "sasl_mechs";
+ public static final String OPTIONS_SASL_ENCRYPTION = "sasl_encryption";
+ public static final String OPTIONS_SSL = "ssl";
+ public static final String OPTIONS_TCP_NO_DELAY = "tcp_nodelay";
+ public static final String OPTIONS_SASL_PROTOCOL_NAME = "sasl_protocol";
+ public static final String OPTIONS_SASL_SERVER_NAME = "sasl_server";
public static final int DEFAULT_PORT = 5672;
public static final String SOCKET = "socket";
@@ -97,7 +103,7 @@
void setSSLConfiguration(SSLConfiguration sslConfiguration);
- boolean useSSL();
+ boolean getBooleanProperty(String propName);
String toString();
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java?rev=901506&r1=901505&r2=901506&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java Thu Jan 21 02:45:35 2010
@@ -40,8 +40,7 @@
public static final String OPTIONS_SYNC_PUBLISH = "sync_publish";
public static final String OPTIONS_BROKERLIST = "brokerlist";
public static final String OPTIONS_FAILOVER = "failover";
- public static final String OPTIONS_FAILOVER_CYCLE = "cyclecount";
- public static final String OPTIONS_SSL = "ssl";
+ public static final String OPTIONS_FAILOVER_CYCLE = "cyclecount";
public static final String OPTIONS_DEFAULT_TOPIC_EXCHANGE = "defaultTopicExchange";
public static final String OPTIONS_DEFAULT_QUEUE_EXCHANGE = "defaultQueueExchange";
public static final String OPTIONS_TEMPORARY_TOPIC_EXCHANGE = "temporaryTopicExchange";
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java?rev=901506&r1=901505&r2=901506&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java Thu Jan 21 02:45:35 2010
@@ -61,24 +61,13 @@
} catch (GSSException ignore) {}
}
- private String vhost;
- private String username;
- private String password;
private List<String> clientMechs;
- private String protocol;
- private String serverName;
+ private ConnectionSettings conSettings;
- public ClientDelegate(String vhost, String username, String password,String saslMechs)
+ public ClientDelegate(ConnectionSettings settings)
{
- this.vhost = vhost;
- this.username = username;
- this.password = password;
- this.clientMechs = Arrays.asList(saslMechs.split(" "));
-
- // Looks kinda of silly but the Sun SASL Kerberos client uses the
- // protocol + servername as the service key.
- this.protocol = System.getProperty("qpid.sasl_protocol","AMQP");
- this.serverName = System.getProperty("qpid.sasl_server_name","localhost");
+ this.conSettings = settings;
+ this.clientMechs = Arrays.asList(settings.getSaslMechs().split(" "));
}
public void init(Connection conn, ProtocolHeader hdr)
@@ -128,11 +117,16 @@
try
{
+ Map<String,Object> saslProps = new HashMap<String,Object>();
+ if (conSettings.isUseSASLEncryption())
+ {
+ saslProps.put(Sasl.QOP, "auth-conf");
+ }
UsernamePasswordCallbackHandler handler =
new UsernamePasswordCallbackHandler();
- handler.initialise(username, password);
+ handler.initialise(conSettings.getUsername(), conSettings.getPassword());
SaslClient sc = Sasl.createSaslClient
- (mechs, null, protocol, serverName, null, handler);
+ (mechs, null, conSettings.getSaslProtocol(), conSettings.getSaslServerName(), saslProps, handler);
conn.setSaslClient(sc);
byte[] response = sc.hasInitialResponse() ?
@@ -164,15 +158,16 @@
@Override public void connectionTune(Connection conn, ConnectionTune tune)
{
conn.setChannelMax(tune.getChannelMax());
- int hb_interval = calculateHeartbeatInterval(conn,
+ int hb_interval = calculateHeartbeatInterval(conSettings.getHeartbeatInterval(),
tune.getHeartbeatMin(),
tune.getHeartbeatMax()
);
conn.connectionTuneOk(tune.getChannelMax(),
tune.getMaxFrameSize(),
hb_interval);
- conn.setIdleTimeout(hb_interval*1000);
- conn.connectionOpen(vhost, null, Option.INSIST);
+ // The idle timeout is twice the heartbeat amount (in milisecs)
+ conn.setIdleTimeout(hb_interval*1000*2);
+ conn.connectionOpen(conSettings.getVhost(), null, Option.INSIST);
}
@Override public void connectionOpenOk(Connection conn, ConnectionOpenOk ok)
@@ -198,9 +193,9 @@
/**
* Currently the spec specified the min and max for heartbeat using secs
*/
- private int calculateHeartbeatInterval(Connection conn,int min, int max)
+ private int calculateHeartbeatInterval(int heartbeat,int min, int max)
{
- int i = conn.getIdleTimeout()/1000;
+ int i = heartbeat;
if (i == 0)
{
log.warn("Idle timeout is zero. Heartbeats are disabled");
@@ -245,7 +240,7 @@
private String getUserID()
{
log.debug("Obtaining userID from kerberos");
- String service = protocol + "@" + serverName;
+ String service = conSettings.getSaslProtocol() + "@" + conSettings.getSaslServerName();
GSSManager manager = GSSManager.getInstance();
try
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=901506&r1=901505&r2=901506&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java Thu Jan 21 02:45:35 2010
@@ -76,7 +76,7 @@
private State state = NEW;
final private Object lock = new Object();
private long timeout = 60000;
- private ConnectionListener listener = new DefaultConnectionListener();
+ private List<ConnectionListener> listeners = new ArrayList<ConnectionListener>();
private ConnectionException error = null;
private int channelMax = 1;
@@ -86,7 +86,8 @@
private int idleTimeout = 0;
private String _authorizationID;
private String userID;
-
+ private ConnectionSettings conSettings;
+
// want to make this final
private int _connectionId;
@@ -97,16 +98,9 @@
this.delegate = delegate;
}
- public void setConnectionListener(ConnectionListener listener)
+ public void addConnectionListener(ConnectionListener listener)
{
- if (listener == null)
- {
- this.listener = new DefaultConnectionListener();
- }
- else
- {
- this.listener = listener;
- }
+ listeners.add(listener);
}
public Sender<ProtocolEvent> getSender()
@@ -154,7 +148,7 @@
this.saslClient = saslClient;
}
- SaslClient getSaslClient()
+ public SaslClient getSaslClient()
{
return saslClient;
}
@@ -171,13 +165,30 @@
public void connect(String host, int port, String vhost, String username, String password, boolean ssl,String saslMechs)
{
+ ConnectionSettings settings = new ConnectionSettings();
+ settings.setHost(host);
+ settings.setPort(port);
+ settings.setVhost(vhost);
+ settings.setUsername(username);
+ settings.setPassword(password);
+ settings.setUseSSL(ssl);
+ settings.setSaslMechs(saslMechs);
+ connect(settings);
+ }
+
+ public void connect(ConnectionSettings settings)
+ {
synchronized (lock)
{
+ conSettings = settings;
state = OPENING;
- userID = username;
- delegate = new ClientDelegate(vhost, username, password,saslMechs);
+ userID = settings.getUsername();
+ delegate = new ClientDelegate(settings);
- IoTransport.connect(host, port, ConnectionBinding.get(this), ssl);
+ IoTransport.connect(settings.getHost(),
+ settings.getPort(),
+ ConnectionBinding.get(this),
+ settings.isUseSSL());
send(new ProtocolHeader(1, 0, 10));
Waiter w = new Waiter(lock, timeout);
@@ -218,7 +229,10 @@
}
}
- listener.opened(this);
+ for (ConnectionListener listener: listeners)
+ {
+ listener.opened(this);
+ }
}
public Session createSession()
@@ -407,7 +421,11 @@
}
}
- listener.exception(this, e);
+ for (ConnectionListener listener: listeners)
+ {
+ listener.exception(this, e);
+ }
+
}
public void exception(Throwable t)
@@ -460,7 +478,10 @@
setState(CLOSED);
}
- listener.closed(this);
+ for (ConnectionListener listener: listeners)
+ {
+ listener.closed(this);
+ }
}
public void close()
@@ -560,5 +581,10 @@
{
return String.format("conn:%x", System.identityHashCode(this));
}
+
+ public ConnectionSettings getConnectionSettings()
+ {
+ return conSettings;
+ }
}
Added: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java?rev=901506&view=auto
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java (added)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java Thu Jan 21 02:45:35 2010
@@ -0,0 +1,190 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+public class ConnectionSettings // Mutable holder of per-connection options; Connection.connect(ConnectionSettings) and ClientDelegate read it through the getters below.
+{
+ String protocol = "tcp";
+ String host = "localhost";
+ String vhost; // no default; ClientDelegate passes it to connectionOpen
+ String username = "guest";
+ String password = "guest";
+ String saslMechs = "PLAIN"; // space-separated mechanism names (ClientDelegate splits on " ")
+ String saslProtocol = "AMQP"; // handed to Sasl.createSaslClient; also the protocol part of the Kerberos service key "<protocol>@<server>"
+ String saslServerName = "localhost"; // handed to Sasl.createSaslClient; also the server part of the Kerberos service key
+ int port = 5672;
+ int maxChannelCount = 32767; // NOTE(review): no reader is visible in this commit - confirm where it is applied
+ int maxFrameSize = 65535; // NOTE(review): no reader is visible in this commit - confirm where it is applied
+ int heartbeatInterval; // in secs (ClientDelegate checks it against the broker's heartbeat min/max); the default 0 makes ClientDelegate log that heartbeats are disabled
+ boolean useSSL; // passed to IoTransport.connect as the ssl flag
+ boolean useSASLEncryption; // when true, ClientDelegate requests SASL QOP "auth-conf"
+ boolean tcpNodelay; // NOTE(review): set by the 0-10 delegate, but no reader is visible in this commit - confirm where it is applied
+
+ public boolean isTcpNodelay()
+ {
+ return tcpNodelay;
+ }
+
+ public void setTcpNodelay(boolean tcpNodelay)
+ {
+ this.tcpNodelay = tcpNodelay;
+ }
+
+ public int getHeartbeatInterval()
+ {
+ return heartbeatInterval;
+ }
+
+ public void setHeartbeatInterval(int heartbeatInterval)
+ {
+ this.heartbeatInterval = heartbeatInterval;
+ }
+
+ public String getProtocol()
+ {
+ return protocol;
+ }
+
+ public void setProtocol(String protocol)
+ {
+ this.protocol = protocol;
+ }
+
+ public String getHost()
+ {
+ return host;
+ }
+
+ public void setHost(String host)
+ {
+ this.host = host;
+ }
+
+ public int getPort()
+ {
+ return port;
+ }
+
+ public void setPort(int port)
+ {
+ this.port = port;
+ }
+
+ public String getVhost()
+ {
+ return vhost;
+ }
+
+ public void setVhost(String vhost)
+ {
+ this.vhost = vhost;
+ }
+
+ public String getUsername()
+ {
+ return username;
+ }
+
+ public void setUsername(String username)
+ {
+ this.username = username;
+ }
+
+ public String getPassword()
+ {
+ return password;
+ }
+
+ public void setPassword(String password)
+ {
+ this.password = password;
+ }
+
+ public boolean isUseSSL()
+ {
+ return useSSL;
+ }
+
+ public void setUseSSL(boolean useSSL)
+ {
+ this.useSSL = useSSL;
+ }
+
+ public boolean isUseSASLEncryption()
+ {
+ return useSASLEncryption;
+ }
+
+ public void setUseSASLEncryption(boolean useSASLEncryption)
+ {
+ this.useSASLEncryption = useSASLEncryption;
+ }
+
+ public String getSaslMechs()
+ {
+ return saslMechs;
+ }
+
+ public void setSaslMechs(String saslMechs)
+ {
+ this.saslMechs = saslMechs;
+ }
+
+ public String getSaslProtocol()
+ {
+ return saslProtocol;
+ }
+
+ public void setSaslProtocol(String saslProtocol)
+ {
+ this.saslProtocol = saslProtocol;
+ }
+
+ public String getSaslServerName()
+ {
+ return saslServerName;
+ }
+
+ public void setSaslServerName(String saslServerName)
+ {
+ this.saslServerName = saslServerName;
+ }
+
+ public int getMaxChannelCount()
+ {
+ return maxChannelCount;
+ }
+
+ public void setMaxChannelCount(int maxChannelCount)
+ {
+ this.maxChannelCount = maxChannelCount;
+ }
+
+ public int getMaxFrameSize()
+ {
+ return maxFrameSize;
+ }
+
+ public void setMaxFrameSize(int maxFrameSize)
+ {
+ this.maxFrameSize = maxFrameSize;
+ }
+}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java?rev=901506&r1=901505&r2=901506&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java Thu Jan 21 02:45:35 2010
@@ -297,7 +297,7 @@
{
try
{
- socket.setSoTimeout(i*2);
+ socket.setSoTimeout(i);
}
catch (Exception e)
{
Modified: qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java?rev=901506&r1=901505&r2=901506&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java (original)
+++ qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java Thu Jan 21 02:45:35 2010
@@ -160,7 +160,7 @@
private Connection connect(final Condition closed)
{
Connection conn = new Connection();
- conn.setConnectionListener(new ConnectionListener()
+ conn.addConnectionListener(new ConnectionListener()
{
public void opened(Connection conn) {}
public void exception(Connection conn, ConnectionException exc)
@@ -311,7 +311,7 @@
startServer();
Connection conn = new Connection();
- conn.setConnectionListener(new FailoverConnectionListener());
+ conn.addConnectionListener(new FailoverConnectionListener());
conn.connect("localhost", port, null, "guest", "guest");
Session ssn = conn.createSession(1);
ssn.setSessionListener(new TestSessionListener());
@@ -366,7 +366,7 @@
startServer();
Connection conn = new Connection();
- conn.setConnectionListener(new FailoverConnectionListener());
+ conn.addConnectionListener(new FailoverConnectionListener());
conn.connect("localhost", port, null, "guest", "guest");
Session ssn = conn.createSession(1);
ssn.setSessionListener(new TestSessionListener());
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org