You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2013/10/30 22:38:04 UTC
svn commit: r1537313 - in /qpid/trunk/qpid: doc/book/src/cpp-broker/
doc/book/src/programming/
java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/
java/client/src/main/java/org/apache/qpid/client/ java/client/src/m...
Author: kwall
Date: Wed Oct 30 21:38:03 2013
New Revision: 1537313
URL: http://svn.apache.org/r1537313
Log:
QPID-4534: unify client heartbeat system properties/connection url options.
* Connection url 'heartbeat' broker-option (and deprecated 'idle_timeout') now understood for all protocols
* System property 'qpid.heartbeat' (and deprecated 'amqj.heartbeat.delay' and 'idle_timeout') now understood for all protocols
* Enhanced heartbeat system tests
* Docbook updates
Original patch from Keith Wall, plus updates from Robbie Gemmell
Removed:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatConfig.java
Modified:
qpid/trunk/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml
qpid/trunk/qpid/doc/book/src/programming/Programming-In-Apache-Qpid.xml
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/ConnectionTuneParameters.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/Accessor.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/QpidProperty.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java
qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/configuration/QpidPropertyTest.java
qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionSettingsTest.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/HeartbeatTest.java
qpid/trunk/qpid/java/test-profiles/CPPExcludes
qpid/trunk/qpid/java/test-profiles/Java010Excludes
Modified: qpid/trunk/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml?rev=1537313&r1=1537312&r2=1537313&view=diff
==============================================================================
--- qpid/trunk/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml (original)
+++ qpid/trunk/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml Wed Oct 30 21:38:03 2013
@@ -781,11 +781,11 @@ NOTE: fencing is not shown, you must con
</variablelist>
<para>
- In a Connection URL, heartbeat is set using the <command>idle_timeout</command> property, which is an integer corresponding to the heartbeat period in seconds. For instance, the following line from a JNDI properties file sets the heartbeat time out to 3 seconds:
+ In a Connection URL, heartbeat is set using the <command>heartbeat</command> property, which is an integer corresponding to the heartbeat period in seconds. For instance, the following line from a JNDI properties file sets the heartbeat time out to 3 seconds:
</para>
<screen>
- connectionfactory.qpidConnectionfactory = amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672',idle_timeout=3
+ connectionfactory.qpidConnectionfactory = amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672'&heartbeat='3'
</screen>
</section>
</section>
Modified: qpid/trunk/qpid/doc/book/src/programming/Programming-In-Apache-Qpid.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/doc/book/src/programming/Programming-In-Apache-Qpid.xml?rev=1537313&r1=1537312&r2=1537313&view=diff
==============================================================================
--- qpid/trunk/qpid/doc/book/src/programming/Programming-In-Apache-Qpid.xml (original)
+++ qpid/trunk/qpid/doc/book/src/programming/Programming-In-Apache-Qpid.xml Wed Oct 30 21:38:03 2013
@@ -3176,7 +3176,8 @@ spout - -content "$(cat rdu.xml | sed -e
integer
</entry>
<entry>
- frequency of heartbeat messages (in seconds)
+ Frequency of heartbeat messages (in seconds). A value of 0 disables heartbeating. <para>For compatibility
+ with old client configuration, option <varname>idle_timeout</varname> (in milliseconds) is also supported.</para>
</entry>
</row>
<row>
@@ -3599,10 +3600,9 @@ spout - -content "$(cat rdu.xml | sed -e
<row>
<entry>qpid.heartbeat</entry>
<entry>int</entry>
- <entry>120 (secs)</entry>
- <entry>The heartbeat interval in seconds. Two consective misssed heartbeats will result in the connection timing out.<para>This can also be set per connection using the <link linkend="section-jms-connection-url">Connection URL</link> options.</para></entry>
+ <entry><para>When using the 0-10 protocol, the default is 120 (secs)</para><para>When using protocols 0-8...0-91, the default is the broker-supplied value.</para></entry>
+ <entry>Frequency of heartbeat messages (in seconds). A value of 0 disables heartbeating. <para>Two consecutive missed heartbeats will result in the connection timing out.</para><para>This can also be set per connection using the <link linkend="section-jms-connection-url">Connection URL</link> options.</para><para>For compatibility with old client configuration, the synonym <varname>amqj.heartbeat.delay</varname> is supported.</para></entry>
</row>
-
<row>
<entry>ignore_setclientID</entry>
<entry>boolean</entry>
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1537313&r1=1537312&r2=1537313&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Wed Oct 30 21:38:03 2013
@@ -1287,11 +1287,6 @@ public class AMQProtocolEngine implement
}
}
- public void init()
- {
- // Do nothing
- }
-
public void setSender(Sender<ByteBuffer> sender)
{
// Do nothing
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java?rev=1537313&r1=1537312&r2=1537313&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java Wed Oct 30 21:38:03 2013
@@ -290,6 +290,19 @@ public class AMQBrokerDetails implements
}
}
+ private int getIntegerProperty(String key)
+ {
+ String stringValue = getProperty(key);
+ try
+ {
+ return Integer.parseInt(stringValue);
+ }
+ catch (NumberFormatException e)
+ {
+ throw new IllegalArgumentException("Cannot parse key " + key + " with value '" + stringValue + "' as integer.", e);
+ }
+ }
+
public String toString()
{
StringBuffer sb = new StringBuffer();
@@ -464,6 +477,16 @@ public class AMQBrokerDetails implements
conSettings.setConnectTimeout(lookupConnectTimeout());
+ if (getProperty(BrokerDetails.OPTIONS_HEARTBEAT) != null)
+ {
+ conSettings.setHeartbeatInterval(getIntegerProperty(BrokerDetails.OPTIONS_HEARTBEAT));
+ }
+ else if (getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT) != null)
+ {
+ conSettings.setHeartbeatInterval(getIntegerProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT) / 1000);
+ }
+
return conSettings;
}
+
}
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=1537313&r1=1537312&r2=1537313&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Wed Oct 30 21:38:03 2013
@@ -29,7 +29,7 @@ import org.apache.qpid.client.failover.F
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.transport.ClientConnectionDelegate;
import org.apache.qpid.common.ServerPropertyNames;
-import org.apache.qpid.configuration.ClientProperties;
+
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ChannelLimitReachedException;
@@ -448,8 +448,6 @@ public class AMQConnectionDelegate_0_10
// Ignore
}
- conSettings.setHeartbeatInterval(getHeartbeatInterval(brokerDetail));
-
//Check connection-level ssl override setting
String connectionSslOption = _conn.getConnectionURL().getOption(ConnectionURL.OPTIONS_SSL);
if(connectionSslOption != null)
@@ -470,37 +468,6 @@ public class AMQConnectionDelegate_0_10
return conSettings;
}
-
- // The idle_timeout prop is in milisecs while
- // the new heartbeat prop is in secs
- private int getHeartbeatInterval(BrokerDetails brokerDetail)
- {
- int heartbeat = 0;
- if (brokerDetail.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT) != null)
- {
- _logger.warn("Broker property idle_timeout=<mili_secs> is deprecated, please use heartbeat=<secs>");
- heartbeat = Integer.parseInt(brokerDetail.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT))/1000;
- }
- else if (brokerDetail.getProperty(BrokerDetails.OPTIONS_HEARTBEAT) != null)
- {
- heartbeat = Integer.parseInt(brokerDetail.getProperty(BrokerDetails.OPTIONS_HEARTBEAT));
- }
- else if (Integer.getInteger(ClientProperties.IDLE_TIMEOUT_PROP_NAME) != null)
- {
- heartbeat = Integer.getInteger(ClientProperties.IDLE_TIMEOUT_PROP_NAME)/1000;
- _logger.warn("JVM arg -Didle_timeout=<mili_secs> is deprecated, please use -Dqpid.heartbeat=<secs>");
- }
- else if(Integer.getInteger(ClientProperties.HEARTBEAT) != null)
- {
- heartbeat = Integer.getInteger(ClientProperties.HEARTBEAT,ClientProperties.HEARTBEAT_DEFAULT);
- }
- else
- {
- heartbeat = Integer.getInteger("amqj.heartbeat.delay", ClientProperties.HEARTBEAT_DEFAULT);
- }
- return heartbeat;
- }
-
protected org.apache.qpid.transport.Connection getQpidConnection()
{
return _qpidConnection;
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=1537313&r1=1537312&r2=1537313&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java Wed Oct 30 21:38:03 2013
@@ -124,10 +124,11 @@ public class AMQConnectionDelegate_8_0 i
NetworkConnection network = transport.connect(settings, securityLayer.receiver(_conn.getProtocolHandler()),
_conn.getProtocolHandler());
+
_conn.getProtocolHandler().setNetworkConnection(network, securityLayer.sender(network.getSender()));
StateWaiter waiter = _conn.getProtocolHandler().createWaiter(openOrClosedStates);
- _conn.getProtocolHandler().getProtocolSession().init();
+ _conn.getProtocolHandler().getProtocolSession().init(settings);
// this blocks until the connection has been set up or when an error
// has prevented the connection being set up
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/ConnectionTuneParameters.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/ConnectionTuneParameters.java?rev=1537313&r1=1537312&r2=1537313&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/ConnectionTuneParameters.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/ConnectionTuneParameters.java Wed Oct 30 21:38:03 2013
@@ -26,9 +26,20 @@ public class ConnectionTuneParameters
private int _channelMax;
- private int _heartbeat;
+ /** Heart-beating interval in seconds, null if not set, use 0 to disable */
+ private Integer _heartbeat;
- private long _txnLimit;
+ private float _heartbeatTimeoutFactor;
+
+ public float getHeartbeatTimeoutFactor()
+ {
+ return _heartbeatTimeoutFactor;
+ }
+
+ public void setHeartbeatTimeoutFactor(float heartbeatTimeoutFactor)
+ {
+ _heartbeatTimeoutFactor = heartbeatTimeoutFactor;
+ }
public long getFrameMax()
{
@@ -50,23 +61,13 @@ public class ConnectionTuneParameters
_channelMax = channelMax;
}
- public int getHeartbeat()
+ public Integer getHeartbeat()
{
return _heartbeat;
}
- public void setHeartbeat(int hearbeat)
+ public void setHeartbeat(Integer hearbeat)
{
_heartbeat = hearbeat;
}
-
- public long getTxnLimit()
- {
- return _txnLimit;
- }
-
- public void setTxnLimit(long txnLimit)
- {
- _txnLimit = txnLimit;
- }
}
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java?rev=1537313&r1=1537312&r2=1537313&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java Wed Oct 30 21:38:03 2013
@@ -52,20 +52,20 @@ public class ConnectionTuneMethodHandler
_logger.debug("ConnectionTune frame received");
final MethodRegistry methodRegistry = session.getMethodRegistry();
-
ConnectionTuneParameters params = session.getConnectionTuneParameters();
- if (params == null)
- {
- params = new ConnectionTuneParameters();
- }
-
+
int maxChannelNumber = frame.getChannelMax();
//0 implies no limit, except that forced by protocol limitations (0xFFFF)
params.setChannelMax(maxChannelNumber == 0 ? AMQProtocolSession.MAX_CHANNEL_MAX : maxChannelNumber);
-
params.setFrameMax(frame.getFrameMax());
- params.setHeartbeat(Integer.getInteger("amqj.heartbeat.delay", frame.getHeartbeat()));
- session.setConnectionTuneParameters(params);
+
+ //if the heart beat delay hasn't been configured, we use the broker-supplied value
+ if (params.getHeartbeat() == null)
+ {
+ params.setHeartbeat(frame.getHeartbeat());
+ }
+
+ session.tuneConnection(params);
session.getStateManager().changeState(AMQState.CONNECTION_NOT_OPENED);
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=1537313&r1=1537312&r2=1537313&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Wed Oct 30 21:38:03 2013
@@ -902,13 +902,13 @@ public class AMQProtocolHandler implemen
return _sender;
}
- /** @param delay delay in seconds (not ms) */
- void initHeartbeats(int delay)
+ void initHeartbeats(int delay, float timeoutFactor)
{
if (delay > 0)
{
_network.setMaxWriteIdle(delay);
- _network.setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay));
+ int readerIdle = (int)(delay * timeoutFactor);
+ _network.setMaxReadIdle(readerIdle);
}
}
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=1537313&r1=1537312&r2=1537313&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Wed Oct 30 21:38:03 2013
@@ -43,6 +43,7 @@ import org.apache.qpid.framing.ProtocolI
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.TransportException;
@@ -63,18 +64,10 @@ public class AMQProtocolSession implemen
protected static final Logger _logger = LoggerFactory.getLogger(AMQProtocolSession.class);
- public static final String PROTOCOL_INITIATION_RECEIVED = "ProtocolInitiatiionReceived";
-
//Usable channels are numbered 1 to <ChannelMax>
public static final int MAX_CHANNEL_MAX = 0xFFFF;
public static final int MIN_USABLE_CHANNEL_NUM = 1;
- protected static final String CONNECTION_TUNE_PARAMETERS = "ConnectionTuneParameters";
-
- protected static final String AMQ_CONNECTION = "AMQConnection";
-
- protected static final String SASL_CLIENT = "SASLClient";
-
private final AMQProtocolHandler _protocolHandler;
private ConcurrentMap<Integer, AMQSession> _channelId2SessionMap = new ConcurrentHashMap<Integer, AMQSession>();
@@ -120,13 +113,38 @@ public class AMQProtocolSession implemen
_connection = connection;
}
- public void init()
+ public void init(ConnectionSettings settings)
{
// start the process of setting up the connection. This is the first place that
// data is written to the server.
+ initialiseTuneParameters(settings);
+
_protocolHandler.writeFrame(new ProtocolInitiation(_connection.getProtocolVersion()));
}
+ public ConnectionTuneParameters getConnectionTuneParameters()
+ {
+ return _connectionTuneParameters;
+ }
+
+ private void initialiseTuneParameters(ConnectionSettings settings)
+ {
+ _connectionTuneParameters = new ConnectionTuneParameters();
+ _connectionTuneParameters.setHeartbeat(settings.getHeartbeatInterval08());
+ _connectionTuneParameters.setHeartbeatTimeoutFactor(settings.getHeartbeatTimeoutFactor());
+ }
+
+ public void tuneConnection(ConnectionTuneParameters params)
+ {
+ _connectionTuneParameters = params;
+ AMQConnection con = getAMQConnection();
+
+ con.setMaximumChannelCount(params.getChannelMax());
+ con.setMaximumFrameSize(params.getFrameMax());
+
+ _protocolHandler.initHeartbeats(params.getHeartbeat(), params.getHeartbeatTimeoutFactor());
+ }
+
public String getClientID()
{
try
@@ -170,24 +188,8 @@ public class AMQProtocolSession implemen
_saslClient = client;
}
- public ConnectionTuneParameters getConnectionTuneParameters()
- {
- return _connectionTuneParameters;
- }
-
- public void setConnectionTuneParameters(ConnectionTuneParameters params)
- {
- _connectionTuneParameters = params;
- AMQConnection con = getAMQConnection();
-
- con.setMaximumChannelCount(params.getChannelMax());
- con.setMaximumFrameSize(params.getFrameMax());
- _protocolHandler.initHeartbeats((int) params.getHeartbeat());
- }
-
/**
- * Callback invoked from the BasicDeliverMethodHandler when a message has been received. This is invoked on the MINA
- * dispatcher thread.
+ * Callback invoked from the BasicDeliverMethodHandler when a message has been received.
*
* @param message
*
@@ -409,7 +411,7 @@ public class AMQProtocolSession implemen
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Setting ProtocolVersion to :" + pv);
+ _logger.debug("Setting ProtocolVersion to :" + pv);
}
_protocolVersion = pv;
_methodRegistry = MethodRegistry.getMethodRegistry(pv);
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java?rev=1537313&r1=1537312&r2=1537313&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java Wed Oct 30 21:38:03 2013
@@ -34,8 +34,9 @@ public interface BrokerDetails
public static final String OPTIONS_RETRY = "retries";
public static final String OPTIONS_CONNECT_TIMEOUT = "connecttimeout";
public static final String OPTIONS_CONNECT_DELAY = "connectdelay";
- public static final String OPTIONS_IDLE_TIMEOUT = "idle_timeout"; // deprecated
public static final String OPTIONS_HEARTBEAT = "heartbeat";
+ @Deprecated
+ public static final String OPTIONS_IDLE_TIMEOUT = "idle_timeout";
public static final String OPTIONS_SASL_MECHS = "sasl_mechs";
public static final String OPTIONS_SASL_ENCRYPTION = "sasl_encryption";
public static final String OPTIONS_SSL = "ssl";
Modified: qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java?rev=1537313&r1=1537312&r2=1537313&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java (original)
+++ qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java Wed Oct 30 21:38:03 2013
@@ -164,4 +164,30 @@ public class BrokerDetailsTest extends T
assertFalse("value should be false", Boolean.valueOf(broker.getProperty(BrokerDetails.OPTIONS_SSL)));
}
+
+ public void testHeartbeatDefaultsToNull() throws Exception
+ {
+ String brokerURL = "tcp://localhost:5672";
+ AMQBrokerDetails broker = new AMQBrokerDetails(brokerURL);
+ assertNull("unexpected default value for " + BrokerDetails.OPTIONS_HEARTBEAT, broker.getProperty(BrokerDetails.OPTIONS_HEARTBEAT));
+ }
+
+ public void testOverriddingHeartbeat() throws Exception
+ {
+ String brokerURL = "tcp://localhost:5672?heartbeat='60'";
+ AMQBrokerDetails broker = new AMQBrokerDetails(brokerURL);
+ assertEquals(60, Integer.parseInt(broker.getProperty(BrokerDetails.OPTIONS_HEARTBEAT)));
+
+ assertEquals(Integer.valueOf(60), broker.buildConnectionSettings().getHeartbeatInterval08());
+ }
+
+ @SuppressWarnings("deprecation")
+ public void testLegacyHeartbeat() throws Exception
+ {
+ String brokerURL = "tcp://localhost:5672?idle_timeout='60000'";
+ AMQBrokerDetails broker = new AMQBrokerDetails(brokerURL);
+ assertEquals(60000, Integer.parseInt(broker.getProperty(BrokerDetails.OPTIONS_IDLE_TIMEOUT)));
+
+ assertEquals(Integer.valueOf(60), broker.buildConnectionSettings().getHeartbeatInterval08());
+ }
}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/Accessor.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/Accessor.java?rev=1537313&r1=1537312&r2=1537313&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/Accessor.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/Accessor.java Wed Oct 30 21:38:03 2013
@@ -20,13 +20,7 @@
*/
package org.apache.qpid.configuration;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
import java.util.Map;
-import java.util.Properties;
public interface Accessor
{
@@ -34,6 +28,7 @@ public interface Accessor
public Integer getInt(String name);
public Long getLong(String name);
public String getString(String name);
+ public Float getFloat(String name);
static class SystemPropertyAccessor implements Accessor
{
@@ -56,6 +51,11 @@ public interface Accessor
{
return System.getProperty(name);
}
+
+ public Float getFloat(String name)
+ {
+ return System.getProperty(name) == null ? null : Float.parseFloat(System.getProperty(name));
+ }
}
static class MapAccessor implements Accessor
@@ -147,132 +147,24 @@ public interface Accessor
return null;
}
}
- }
-
- static class PropertyFileAccessor extends MapAccessor
- {
- public PropertyFileAccessor(String fileName) throws FileNotFoundException, IOException
- {
- super(null);
- Properties props = new Properties();
- FileInputStream inStream = new FileInputStream(fileName);
- try
- {
- props.load(inStream);
- }
- finally
- {
- inStream.close();
- }
- setSource(props);
- }
-
-
- }
-
- static class CombinedAccessor implements Accessor
- {
- private List<Accessor> accessors;
-
- public CombinedAccessor(Accessor...accessors)
- {
- this.accessors = Arrays.asList(accessors);
- }
-
- public Boolean getBoolean(String name)
- {
- for (Accessor accessor: accessors)
- {
- if (accessor.getBoolean(name) != null)
- {
- return accessor.getBoolean(name);
- }
- }
- return null;
- }
- public Integer getInt(String name)
- {
- for (Accessor accessor: accessors)
- {
- if (accessor.getBoolean(name) != null)
- {
- return accessor.getInt(name);
- }
- }
- return null;
- }
-
- public Long getLong(String name)
+ public Float getFloat(String name)
{
- for (Accessor accessor: accessors)
+ if (source != null && source.containsKey(name))
{
- if (accessor.getBoolean(name) != null)
+ if (source.get(name) instanceof Float)
{
- return accessor.getLong(name);
+ return (Float)source.get(name);
}
- }
- return null;
- }
-
- public String getString(String name)
- {
- for (Accessor accessor: accessors)
- {
- if (accessor.getBoolean(name) != null)
+ else
{
- return accessor.getString(name);
+ return Float.parseFloat((String)source.get(name));
}
}
- return null;
- }
- }
-
- static class ValidationAccessor implements Accessor
- {
- private List<Validator> validators;
- private Accessor delegate;
-
- public ValidationAccessor(Accessor delegate,Validator...validators)
- {
- this.validators = Arrays.asList(validators);
- this.delegate = delegate;
- }
-
- public Boolean getBoolean(String name)
- {
- // there is nothing to validate in a boolean
- return delegate.getBoolean(name);
- }
-
- public Integer getInt(String name)
- {
- Integer v = delegate.getInt(name);
- for (Validator validator: validators)
- {
- validator.validate(v);
- }
- return v;
- }
-
- public Long getLong(String name)
- {
- Long v = delegate.getLong(name);
- for (Validator validator: validators)
- {
- validator.validate(v);
- }
- return v;
- }
-
- public String getString(String name)
- {
- String v = delegate.getString(name);
- for (Validator validator: validators)
+ else
{
- validator.validate(v);
+ return null;
}
- return v;
}
}
}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java?rev=1537313&r1=1537312&r2=1537313&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java Wed Oct 30 21:38:03 2013
@@ -63,20 +63,47 @@ public class ClientProperties
public static final String SYNC_PUBLISH_PROP_NAME = "sync_publish";
/**
- * This value will be used in the following settings
- * To calculate the SO_TIMEOUT option of the socket (2*idle_timeout)
- * If this values is between the max and min values specified for heartbeat
- * by the broker in TuneOK it will be used as the heartbeat interval.
- * If not a warning will be printed and the max value specified for
- * heartbeat in TuneOK will be used
- *
- * The default idle timeout is set to 120 secs
+ * Frequency of heartbeat messages (in milliseconds)
+ * @see #QPID_HEARTBEAT_INTERVAL
*/
+ @Deprecated
public static final String IDLE_TIMEOUT_PROP_NAME = "idle_timeout";
- public static final long DEFAULT_IDLE_TIMEOUT = 120000;
- public static final String HEARTBEAT = "qpid.heartbeat";
- public static final int HEARTBEAT_DEFAULT = 120;
+ /**
+ * Frequency of heartbeat messages (in seconds)
+ * @see #QPID_HEARTBEAT_INTERVAL
+ */
+ @Deprecated
+ public static final String AMQJ_HEARTBEAT_DELAY = "amqj.heartbeat.delay";
+
+ /**
+ * Frequency of heartbeat messages (in seconds)
+ */
+ public static final String QPID_HEARTBEAT_INTERVAL = "qpid.heartbeat";
+
+ /**
+ * Default heartbeat interval (used by 0-10 protocol).
+ */
+ public static final int QPID_HEARTBEAT_INTERVAL_010_DEFAULT = 120;
+
+ /**
+ * @see #QPID_HEARTBEAT_TIMEOUT_FACTOR
+ */
+ @Deprecated
+ public static final String AMQJ_HEARTBEAT_TIMEOUT_FACTOR = "amqj.heartbeat.timeoutFactor";
+
+ /**
+ * The factor applied to {@link #QPID_HEARTBEAT_INTERVAL} that determines the maximum
+ * length of time that may elapse before the peer is deemed to have failed.
+ *
+ * @see #QPID_HEARTBEAT_TIMEOUT_FACTOR_DEFAULT
+ */
+ public static final String QPID_HEARTBEAT_TIMEOUT_FACTOR = "qpid.heartbeat_timeout_factor";
+
+ /**
+ * Default heartbeat timeout factor.
+ */
+ public static final float QPID_HEARTBEAT_TIMEOUT_FACTOR_DEFAULT = 2.0f;
/**
* This value will be used to determine the default destination syntax type.
@@ -215,6 +242,8 @@ public class ClientProperties
*/
public static final String SET_EXPIRATION_AS_TTL = "qpid.set_expiration_as_ttl";
+
+
private ClientProperties()
{
//No instances
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/QpidProperty.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/QpidProperty.java?rev=1537313&r1=1537312&r2=1537313&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/QpidProperty.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/QpidProperty.java Wed Oct 30 21:38:03 2013
@@ -102,6 +102,11 @@ public abstract class QpidProperty<T>
return new QpidStringProperty(accessor,defaultValue, names);
}
+ public static QpidProperty<Float> floatProperty(Float defaultValue, String... names)
+ {
+ return new QpidFloatProperty(defaultValue, names);
+ }
+
protected Accessor getAccessor()
{
return accessor;
@@ -183,4 +188,23 @@ public abstract class QpidProperty<T>
}
}
+ static class QpidFloatProperty extends QpidProperty<Float>
+ {
+ QpidFloatProperty(Float defValue, String... names)
+ {
+ super(defValue, names);
+ }
+
+ QpidFloatProperty(Accessor accessor,Float defValue, String... names)
+ {
+ super(accessor,defValue, names);
+ }
+
+ @Override
+ protected Float getByName(String name)
+ {
+ return getAccessor().getFloat(name);
+ }
+ }
+
}
\ No newline at end of file
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java?rev=1537313&r1=1537312&r2=1537313&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java Wed Oct 30 21:38:03 2013
@@ -62,6 +62,5 @@ public interface AMQVersionAwareProtocol
public void setSender(Sender<ByteBuffer> sender);
- public void init();
}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java?rev=1537313&r1=1537312&r2=1537313&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java Wed Oct 30 21:38:03 2013
@@ -133,15 +133,17 @@ public class ClientDelegate extends Conn
@Override
public void connectionTune(Connection conn, ConnectionTune tune)
{
- int hb_interval = calculateHeartbeatInterval(_connectionSettings.getHeartbeatInterval(),
- tune.getHeartbeatMin(),
- tune.getHeartbeatMax()
- );
+ int heartbeatInterval = _connectionSettings.getHeartbeatInterval010();
+ float heartbeatTimeoutFactor = _connectionSettings.getHeartbeatTimeoutFactor();
+ int actualHeartbeatInterval = calculateHeartbeatInterval(heartbeatInterval,
+ tune.getHeartbeatMin(),
+ tune.getHeartbeatMax());
conn.connectionTuneOk(tune.getChannelMax(),
tune.getMaxFrameSize(),
- hb_interval);
- // The idle timeout is twice the heartbeat amount (in milisecs)
- conn.setIdleTimeout(hb_interval*1000*2);
+ actualHeartbeatInterval);
+
+ int idleTimeout = (int)(actualHeartbeatInterval * 1000 * heartbeatTimeoutFactor);
+ conn.setIdleTimeout(idleTimeout);
int channelMax = tune.getChannelMax();
//0 means no implied limit, except available server resources
@@ -184,7 +186,7 @@ public class ClientDelegate extends Conn
int i = heartbeat;
if (i == 0)
{
- log.info("Idle timeout is 0 sec. Heartbeats are disabled.");
+ log.info("Heartbeat interval is 0 sec. Heartbeats are disabled.");
return 0; // heartbeats are disabled.
}
else if (i >= min && i <= max)
@@ -193,7 +195,7 @@ public class ClientDelegate extends Conn
}
else
{
- log.info("The broker does not support the configured connection idle timeout of %s sec," +
+ log.info("The broker does not support the configured connection heartbeat interval of %s sec," +
" using the brokers max supported value of %s sec instead.", i,max);
return max;
}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java?rev=1537313&r1=1537312&r2=1537313&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java Wed Oct 30 21:38:03 2013
@@ -20,6 +20,13 @@
*/
package org.apache.qpid.transport;
+import static org.apache.qpid.configuration.ClientProperties.AMQJ_HEARTBEAT_DELAY;
+import static org.apache.qpid.configuration.ClientProperties.AMQJ_HEARTBEAT_TIMEOUT_FACTOR;
+import static org.apache.qpid.configuration.ClientProperties.IDLE_TIMEOUT_PROP_NAME;
+import static org.apache.qpid.configuration.ClientProperties.QPID_HEARTBEAT_INTERVAL;
+import static org.apache.qpid.configuration.ClientProperties.QPID_HEARTBEAT_INTERVAL_010_DEFAULT;
+import static org.apache.qpid.configuration.ClientProperties.QPID_HEARTBEAT_TIMEOUT_FACTOR;
+import static org.apache.qpid.configuration.ClientProperties.QPID_HEARTBEAT_TIMEOUT_FACTOR_DEFAULT;
import static org.apache.qpid.configuration.ClientProperties.AMQJ_TCP_NODELAY_PROP_NAME;
import static org.apache.qpid.configuration.ClientProperties.QPID_SSL_KEY_MANAGER_FACTORY_ALGORITHM_PROP_NAME;
import static org.apache.qpid.configuration.ClientProperties.QPID_SSL_KEY_STORE_CERT_TYPE_PROP_NAME;
@@ -50,6 +57,7 @@ public class ConnectionSettings
{
public static final String WILDCARD_ADDRESS = "*";
+
private String protocol = "tcp";
private String host = "localhost";
private String vhost;
@@ -59,7 +67,9 @@ public class ConnectionSettings
private boolean tcpNodelay = QpidProperty.booleanProperty(Boolean.TRUE, QPID_TCP_NODELAY_PROP_NAME, AMQJ_TCP_NODELAY_PROP_NAME).get();
private int maxChannelCount = 32767;
private int maxFrameSize = 65535;
- private int heartbeatInterval;
+ private Integer hearbeatIntervalLegacyMs = QpidProperty.intProperty(null, IDLE_TIMEOUT_PROP_NAME).get();
+ private Integer heartbeatInterval = QpidProperty.intProperty(null, QPID_HEARTBEAT_INTERVAL, AMQJ_HEARTBEAT_DELAY).get();
+ private float heartbeatTimeoutFactor = QpidProperty.floatProperty(QPID_HEARTBEAT_TIMEOUT_FACTOR_DEFAULT, QPID_HEARTBEAT_TIMEOUT_FACTOR, AMQJ_HEARTBEAT_TIMEOUT_FACTOR).get();
private int connectTimeout = 30000;
private int readBufferSize = QpidProperty.intProperty(65535, RECEIVE_BUFFER_SIZE_PROP_NAME, LEGACY_RECEIVE_BUFFER_SIZE_PROP_NAME).get();
private int writeBufferSize = QpidProperty.intProperty(65535, SEND_BUFFER_SIZE_PROP_NAME, LEGACY_SEND_BUFFER_SIZE_PROP_NAME).get();;
@@ -95,9 +105,45 @@ public class ConnectionSettings
this.tcpNodelay = tcpNodelay;
}
- public int getHeartbeatInterval()
- {
- return heartbeatInterval;
+ /**
+ * Gets the heartbeat interval (seconds) for 0-8/9/9-1 protocols.
+ * 0 means heartbeating is disabled.
+ * null means use the broker-supplied value.
+ */
+ public Integer getHeartbeatInterval08()
+ {
+ if (heartbeatInterval != null)
+ {
+ return heartbeatInterval;
+ }
+ else if (hearbeatIntervalLegacyMs != null)
+ {
+ return hearbeatIntervalLegacyMs / 1000;
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ /**
+ * Gets the heartbeat interval (seconds) for the 0-10 protocol.
+ * 0 means heartbeating is disabled.
+ */
+ public int getHeartbeatInterval010()
+ {
+ if (heartbeatInterval != null)
+ {
+ return heartbeatInterval;
+ }
+ else if (hearbeatIntervalLegacyMs != null)
+ {
+ return hearbeatIntervalLegacyMs / 1000;
+ }
+ else
+ {
+ return QPID_HEARTBEAT_INTERVAL_010_DEFAULT;
+ }
}
public void setHeartbeatInterval(int heartbeatInterval)
@@ -105,6 +151,11 @@ public class ConnectionSettings
this.heartbeatInterval = heartbeatInterval;
}
+ public float getHeartbeatTimeoutFactor()
+ {
+ return this.heartbeatTimeoutFactor;
+ }
+
public String getProtocol()
{
return protocol;
@@ -374,4 +425,5 @@ public class ConnectionSettings
{
this.writeBufferSize = writeBufferSize;
}
+
}
Modified: qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/configuration/QpidPropertyTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/configuration/QpidPropertyTest.java?rev=1537313&r1=1537312&r2=1537313&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/configuration/QpidPropertyTest.java (original)
+++ qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/configuration/QpidPropertyTest.java Wed Oct 30 21:38:03 2013
@@ -145,6 +145,25 @@ public class QpidPropertyTest extends Qp
assertEquals(expectedValue, propertyValue);
}
+ public void testFloatValueReadFromSystemProperty() throws Exception
+ {
+ float expectedValue = 1.5f;
+ setTestSystemProperty(_systemPropertyName, Float.valueOf(expectedValue).toString());
+ assertSystemPropertiesSet(_systemPropertyName);
+
+ float propertyValue = QpidProperty.floatProperty(1.5f, _systemPropertyName).get();
+ assertEquals(expectedValue, propertyValue, 0.1);
+ }
+
+ public void testFloatValueIsDefaultWhenOneSystemPropertyIsNotSet() throws Exception
+ {
+ float expectedValue = 1.5f;
+ assertSystemPropertiesNotSet(_systemPropertyName);
+
+ float propertyValue = QpidProperty.floatProperty(expectedValue, _systemPropertyName).get();
+ assertEquals(expectedValue, propertyValue, 0.1);
+ }
+
private void assertSystemPropertiesSet(String... systemPropertyNames)
{
for (String systemPropertyName : systemPropertyNames)
Modified: qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionSettingsTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionSettingsTest.java?rev=1537313&r1=1537312&r2=1537313&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionSettingsTest.java (original)
+++ qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionSettingsTest.java Wed Oct 30 21:38:03 2013
@@ -128,11 +128,48 @@ public class ConnectionSettingsTest exte
}
@SuppressWarnings("deprecation")
- public void testtestReceiveBufferSizeOverriddenLegacyOverridden()
+ public void testReceiveBufferSizeOverriddenLegacyOverridden()
{
systemPropertyOverrideForSocketBufferSize(ClientProperties.LEGACY_RECEIVE_BUFFER_SIZE_PROP_NAME, 1024, true);
}
+ public void testHeartbeatingDefaults()
+ {
+ assertNull(_conConnectionSettings.getHeartbeatInterval08());
+ assertEquals(ClientProperties.QPID_HEARTBEAT_INTERVAL_010_DEFAULT,_conConnectionSettings.getHeartbeatInterval010());
+ assertEquals(2.0, _conConnectionSettings.getHeartbeatTimeoutFactor(), 0.1);
+ }
+
+ public void testHeartbeatingOverridden()
+ {
+ resetSystemProperty(ClientProperties.QPID_HEARTBEAT_INTERVAL, "60");
+ resetSystemProperty(ClientProperties.QPID_HEARTBEAT_TIMEOUT_FACTOR, "2.5");
+
+ assertEquals(Integer.valueOf(60), _conConnectionSettings.getHeartbeatInterval08());
+ assertEquals(60, _conConnectionSettings.getHeartbeatInterval010());
+ assertEquals(2.5, _conConnectionSettings.getHeartbeatTimeoutFactor(), 0.1);
+ }
+
+ @SuppressWarnings("deprecation")
+ public void testHeartbeatingOverriddenUsingAmqjLegacyOption()
+ {
+ resetSystemProperty(ClientProperties.AMQJ_HEARTBEAT_DELAY, "30");
+ resetSystemProperty(ClientProperties.AMQJ_HEARTBEAT_TIMEOUT_FACTOR, "1.5");
+
+ assertEquals(Integer.valueOf(30), _conConnectionSettings.getHeartbeatInterval08());
+ assertEquals(30, _conConnectionSettings.getHeartbeatInterval010());
+ assertEquals(1.5, _conConnectionSettings.getHeartbeatTimeoutFactor(), 0.1);
+ }
+
+ @SuppressWarnings("deprecation")
+ public void testHeartbeatingOverriddenUsingOlderLegacyOption()
+ {
+ resetSystemProperty(ClientProperties.IDLE_TIMEOUT_PROP_NAME, "30000");
+
+ assertEquals(Integer.valueOf(30), _conConnectionSettings.getHeartbeatInterval08());
+ assertEquals(30, _conConnectionSettings.getHeartbeatInterval010());
+ }
+
private void systemPropertyOverrideForTcpDelay(String propertyName, boolean value)
{
resetSystemProperty(propertyName, String.valueOf(value));
Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/HeartbeatTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/HeartbeatTest.java?rev=1537313&r1=1537312&r2=1537313&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/HeartbeatTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/HeartbeatTest.java Wed Oct 30 21:38:03 2013
@@ -18,49 +18,86 @@
*/
package org.apache.qpid.client;
+import static org.apache.qpid.configuration.ClientProperties.AMQJ_HEARTBEAT_DELAY;
+import static org.apache.qpid.configuration.ClientProperties.IDLE_TIMEOUT_PROP_NAME;
+import static org.apache.qpid.configuration.ClientProperties.QPID_HEARTBEAT_INTERVAL;
+
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
+
+import org.apache.qpid.server.model.Broker;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
public class HeartbeatTest extends QpidBrokerTestCase
{
- public void testHeartbeats() throws Exception
+ private static final String CONNECTION_URL_WITH_HEARTBEAT = "amqp://guest:guest@clientid/?brokerlist='localhost:%d?heartbeat='%d''";
+ private TestListener _listener = new TestListener();
+
+ @Override
+ public void setUp() throws Exception
+ {
+ if (getName().equals("testHeartbeatsEnabledBrokerSide"))
+ {
+ getBrokerConfiguration().setBrokerAttribute(Broker.CONNECTION_HEART_BEAT_DELAY, "1");
+ }
+ super.setUp();
+ }
+
+ public void testHeartbeatsEnabledUsingUrl() throws Exception
+ {
+ final String url = String.format(CONNECTION_URL_WITH_HEARTBEAT, DEFAULT_PORT, 1);
+ AMQConnection conn = (AMQConnection) getConnection(new AMQConnectionURL(url));
+ conn.setHeartbeatListener(_listener);
+ conn.start();
+
+ Thread.sleep(2500);
+
+ assertTrue("Too few heartbeats received: "+_listener._heartbeatsReceived+" (expected at least 2)", _listener._heartbeatsReceived>=2);
+ assertTrue("Too few heartbeats sent "+_listener._heartbeatsSent+" (expected at least 2)", _listener._heartbeatsSent>=2);
+
+ conn.close();
+ }
+
+ public void testHeartbeatsEnabledUsingSystemProperty() throws Exception
{
- setTestSystemProperty("amqj.heartbeat.delay", "1");
+ setTestSystemProperty(QPID_HEARTBEAT_INTERVAL, "1");
AMQConnection conn = (AMQConnection) getConnection();
- TestListener listener = new TestListener();
- conn.setHeartbeatListener(listener);
+ conn.setHeartbeatListener(_listener);
conn.start();
Thread.sleep(2500);
- assertTrue("Too few heartbeats received: "+listener._heartbeatsReceived+" (expected at least 2)", listener._heartbeatsReceived>=2);
- assertTrue("Too few heartbeats sent "+listener._heartbeatsSent+" (expected at least 2)", listener._heartbeatsSent>=2);
+ assertTrue("Too few heartbeats received: "+_listener._heartbeatsReceived+" (expected at least 2)", _listener._heartbeatsReceived>=2);
+ assertTrue("Too few heartbeats sent "+_listener._heartbeatsSent+" (expected at least 2)", _listener._heartbeatsSent>=2);
conn.close();
}
- public void testNoHeartbeats() throws Exception
+ public void testHeartbeatsDisabledUsingSystemProperty() throws Exception
{
- setTestSystemProperty("amqj.heartbeat.delay", "0");
+ setTestSystemProperty(QPID_HEARTBEAT_INTERVAL, "0");
AMQConnection conn = (AMQConnection) getConnection();
- TestListener listener = new TestListener();
- conn.setHeartbeatListener(listener);
+ conn.setHeartbeatListener(_listener);
conn.start();
Thread.sleep(2500);
- assertEquals("Heartbeats unexpectedly received", 0, listener._heartbeatsReceived);
- assertEquals("Heartbeats unexpectedly sent ", 0, listener._heartbeatsSent);
+ assertEquals("Heartbeats unexpectedly received", 0, _listener._heartbeatsReceived);
+ assertEquals("Heartbeats unexpectedly sent ", 0, _listener._heartbeatsSent);
conn.close();
}
- public void testReadOnlyConnectionHeartbeats() throws Exception
+ /**
+ * This test carefully arranges message flow so that bytes flow only from producer to broker
+ * on the producer side and broker to consumer on the consumer side, deliberately leaving the
+ * reverse path quiet so heartbeats will flow.
+ */
+ public void testUnidirectionalHeartbeating() throws Exception
{
- setTestSystemProperty("amqj.heartbeat.delay","1");
+ setTestSystemProperty(QPID_HEARTBEAT_INTERVAL,"1");
AMQConnection receiveConn = (AMQConnection) getConnection();
AMQConnection sendConn = (AMQConnection) getConnection();
Destination destination = getTestQueue();
@@ -83,10 +120,9 @@ public class HeartbeatTest extends QpidB
producer.send(senderSession.createTextMessage("Msg " + i));
Thread.sleep(500);
assertNotNull("Expected to received message", consumer.receive(500));
+ // Consumer does not ack the message in order to generate no bytes from consumer back to Broker
}
-
-
assertTrue("Too few heartbeats sent "+receiveListener._heartbeatsSent+" (expected at least 2)", receiveListener._heartbeatsSent>=2);
assertEquals("Unexpected sent at the sender: ",0,sendListener._heartbeatsSent);
@@ -97,6 +133,54 @@ public class HeartbeatTest extends QpidB
sendConn.close();
}
+ public void testHeartbeatsEnabledBrokerSide() throws Exception
+ {
+
+ AMQConnection conn = (AMQConnection) getConnection();
+ conn.setHeartbeatListener(_listener);
+ conn.start();
+
+ Thread.sleep(2500);
+
+ assertTrue("Too few heartbeats received: "+_listener._heartbeatsReceived+" (expected at least 2)", _listener._heartbeatsReceived>=2);
+ assertTrue("Too few heartbeats sent "+_listener._heartbeatsSent+" (expected at least 2)", _listener._heartbeatsSent>=2);
+
+ conn.close();
+ }
+
+
+ @SuppressWarnings("deprecation")
+ public void testHeartbeatsEnabledUsingAmqjLegacySystemProperty() throws Exception
+ {
+ setTestSystemProperty(AMQJ_HEARTBEAT_DELAY, "1");
+ AMQConnection conn = (AMQConnection) getConnection();
+ conn.setHeartbeatListener(_listener);
+ conn.start();
+
+ Thread.sleep(2500);
+
+ assertTrue("Too few heartbeats received: "+_listener._heartbeatsReceived+" (expected at least 2)", _listener._heartbeatsReceived>=2);
+ assertTrue("Too few heartbeats sent "+_listener._heartbeatsSent+" (expected at least 2)", _listener._heartbeatsSent>=2);
+
+ conn.close();
+ }
+
+ @SuppressWarnings("deprecation")
+ public void testHeartbeatsEnabledUsingOlderLegacySystemProperty() throws Exception
+ {
+ setTestSystemProperty(IDLE_TIMEOUT_PROP_NAME, "1000");
+ AMQConnection conn = (AMQConnection) getConnection();
+ conn.setHeartbeatListener(_listener);
+ conn.start();
+
+ Thread.sleep(2500);
+
+ assertTrue("Too few heartbeats received: "+_listener._heartbeatsReceived+" (expected at least 2)", _listener._heartbeatsReceived>=2);
+ assertTrue("Too few heartbeats sent "+_listener._heartbeatsSent+" (expected at least 2)", _listener._heartbeatsSent>=2);
+
+ conn.close();
+ }
+
private class TestListener implements HeartbeatListener
{
int _heartbeatsReceived;
Modified: qpid/trunk/qpid/java/test-profiles/CPPExcludes
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/CPPExcludes?rev=1537313&r1=1537312&r2=1537313&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/CPPExcludes (original)
+++ qpid/trunk/qpid/java/test-profiles/CPPExcludes Wed Oct 30 21:38:03 2013
@@ -186,7 +186,8 @@ org.apache.qpid.client.ssl.SSLTest#testC
// QPID-2796 : Java 0-10 client only sends heartbeats in response to heartbeats from the server, not timeout based
-org.apache.qpid.client.HeartbeatTest#testReadOnlyConnectionHeartbeats
+org.apache.qpid.client.HeartbeatTest#testUnidirectionalHeartbeating
+org.apache.qpid.client.HeartbeatTest#testHeartbeatsEnabledBrokerSide
// Exclude java broker specific behavior allowing queue re-bind to topic exchanges on 0.8/0-10 paths
org.apache.qpid.server.queue.QueueBindTest#testQueueCanBeReboundOnTopicExchange
Modified: qpid/trunk/qpid/java/test-profiles/Java010Excludes
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/Java010Excludes?rev=1537313&r1=1537312&r2=1537313&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/Java010Excludes (original)
+++ qpid/trunk/qpid/java/test-profiles/Java010Excludes Wed Oct 30 21:38:03 2013
@@ -68,7 +68,8 @@ org.apache.qpid.client.failover.AddressB
org.apache.qpid.client.AsynchMessageListenerTest#testImmediatePrefetchWithMessageListener
// QPID-2796 : Java 0-10 client only sends heartbeats in response to heartbeats from the server, not timeout based
-org.apache.qpid.client.HeartbeatTest#testReadOnlyConnectionHeartbeats
+org.apache.qpid.client.HeartbeatTest#testUnidirectionalHeartbeating
+org.apache.qpid.client.HeartbeatTest#testHeartbeatsEnabledBrokerSide
// Java 0-10 client does not support re-binding the queue to the same exchange
org.apache.qpid.server.queue.QueueBindTest#testQueueCanBeReboundOnTopicExchange
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org