You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@qpid.apache.org by rg...@apache.org on 2012/11/26 11:10:27 UTC
svn commit: r1413539 - in /qpid/trunk/qpid/java:
client/src/main/java/org/apache/qpid/client/
client/src/main/java/org/apache/qpid/client/protocol/
client/src/main/java/org/apache/qpid/client/transport/
systests/src/main/java/org/apache/qpid/client/
Author: rgodfrey
Date: Mon Nov 26 10:10:26 2012
New Revision: 1413539
URL: http://svn.apache.org/viewvc?rev=1413539&view=rev
Log:
QPID-2796 : Added Java system test for heartbeating
Added:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/HeartbeatListener.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/HeartbeatTest.java
Removed:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=1413539&r1=1413538&r2=1413539&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Mon Nov 26 10:10:26 2012
@@ -1560,4 +1560,9 @@ public class AMQConnection extends Close
+ localAddress + " to " + remoteAddress);
}
}
+
+ void setHeartbeatListener(HeartbeatListener listener)
+ {
+ _delegate.setHeartbeatListener(listener);
+ }
}
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java?rev=1413539&r1=1413538&r2=1413539&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java Mon Nov 26 10:10:26 2012
@@ -78,4 +78,6 @@ public interface AMQConnectionDelegate
* @return true if the feature is supported by the server
*/
boolean isSupportedServerFeature(final String featureName);
+
+ void setHeartbeatListener(HeartbeatListener listener);
}
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=1413539&r1=1413538&r2=1413539&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Mon Nov 26 10:10:26 2012
@@ -422,6 +422,12 @@ public class AMQConnectionDelegate_0_10
return featureSupported;
}
+ @Override
+ public void setHeartbeatListener(HeartbeatListener listener)
+ {
+ ((ClientConnectionDelegate)(_qpidConnection.getConnectionDelegate())).setHeartbeatListener(listener);
+ }
+
private ConnectionSettings retrieveConnectionSettings(BrokerDetails brokerDetail)
{
ConnectionSettings conSettings = brokerDetail.buildConnectionSettings();
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=1413539&r1=1413538&r2=1413539&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java Mon Nov 26 10:10:26 2012
@@ -378,4 +378,10 @@ public class AMQConnectionDelegate_8_0 i
// we just hardcode JMS selectors as supported.
return ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR.equals(featureName);
}
+
+ @Override
+ public void setHeartbeatListener(HeartbeatListener listener)
+ {
+ _conn.getProtocolHandler().setHeartbeatListener(listener);
+ }
}
Added: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/HeartbeatListener.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/HeartbeatListener.java?rev=1413539&view=auto
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/HeartbeatListener.java (added)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/HeartbeatListener.java Mon Nov 26 10:10:26 2012
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.client;
+
+public interface HeartbeatListener
+{
+ void heartbeatReceived();
+
+ void heartbeatSent();
+
+ static final HeartbeatListener DEFAULT = new HeartbeatListener()
+ {
+ public void heartbeatReceived()
+ {
+ }
+
+ public void heartbeatSent()
+ {
+ }
+ };
+}
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=1413539&r1=1413538&r2=1413539&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Mon Nov 26 10:10:26 2012
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.client.protocol;
+import org.apache.qpid.client.HeartbeatListener;
import org.apache.qpid.util.BytesDataOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -180,6 +181,7 @@ public class AMQProtocolHandler implemen
private Sender<ByteBuffer> _sender;
private long _lastReadTime = System.currentTimeMillis();
private long _lastWriteTime = System.currentTimeMillis();
+ private HeartbeatListener _heartbeatListener = HeartbeatListener.DEFAULT;
/**
* Creates a new protocol handler, associated with the specified client connection instance.
@@ -302,7 +304,6 @@ public class AMQProtocolHandler implemen
{
_logger.debug("Protocol Session [" + this + "] idle: reader");
// failover:
- HeartbeatDiagnostics.timeout();
_logger.warn("Timed out while waiting for heartbeat from peer.");
_network.close();
}
@@ -311,7 +312,7 @@ public class AMQProtocolHandler implemen
{
_logger.debug("Protocol Session [" + this + "] idle: reader");
writeFrame(HeartbeatBody.FRAME);
- HeartbeatDiagnostics.sent();
+ _heartbeatListener.heartbeatSent();
}
/**
@@ -473,8 +474,6 @@ public class AMQProtocolHandler implemen
final AMQBody bodyFrame = frame.getBodyFrame();
- HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
-
bodyFrame.handle(frame.getChannel(), _protocolSession);
_connection.bytesReceived(_readBytes);
@@ -910,7 +909,6 @@ public class AMQProtocolHandler implemen
{
_network.setMaxWriteIdle(delay);
_network.setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay));
- HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
}
}
@@ -925,5 +923,13 @@ public class AMQProtocolHandler implemen
}
+ public void setHeartbeatListener(HeartbeatListener listener)
+ {
+ _heartbeatListener = listener == null ? HeartbeatListener.DEFAULT : listener;
+ }
+ public void heartbeatBodyReceived()
+ {
+ _heartbeatListener.heartbeatReceived();
+ }
}
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=1413539&r1=1413538&r2=1413539&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Mon Nov 26 10:10:26 2012
@@ -267,7 +267,7 @@ public class AMQProtocolSession implemen
public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException
{
-
+ _protocolHandler.heartbeatBodyReceived();
}
/**
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java?rev=1413539&r1=1413538&r2=1413539&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java Mon Nov 26 10:10:26 2012
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.client.transport;
+import org.apache.qpid.client.HeartbeatListener;
+import org.apache.qpid.transport.ConnectionHeartbeat;
import org.ietf.jgss.GSSContext;
import org.ietf.jgss.GSSException;
import org.ietf.jgss.GSSManager;
@@ -70,6 +72,7 @@ public class ClientConnectionDelegate ex
}
private final ConnectionURL _connectionURL;
+ private HeartbeatListener _heartbeatListener = HeartbeatListener.DEFAULT;
/**
* @param settings
@@ -165,4 +168,19 @@ public class ClientConnectionDelegate ex
return null;
}
+
+ @Override
+ public void connectionHeartbeat(Connection conn, ConnectionHeartbeat hearbeat)
+ {
+ // ClientDelegate simply responds to heartbeats with heartbeats
+ _heartbeatListener.heartbeatReceived();
+ super.connectionHeartbeat(conn, hearbeat);
+ _heartbeatListener.heartbeatSent();
+ }
+
+
+ public void setHeartbeatListener(HeartbeatListener listener)
+ {
+ _heartbeatListener = listener == null ? HeartbeatListener.DEFAULT : listener;
+ }
}
Added: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/HeartbeatTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/HeartbeatTest.java?rev=1413539&view=auto
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/HeartbeatTest.java (added)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/HeartbeatTest.java Mon Nov 26 10:10:26 2012
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+public class HeartbeatTest extends QpidBrokerTestCase
+{
+ public void testHeartbeats() throws Exception
+ {
+ setTestSystemProperty("amqj.heartbeat.delay","1");
+ AMQConnection conn = (AMQConnection) getConnection();
+ TestListener listener = new TestListener();
+ conn.setHeartbeatListener(listener);
+ conn.start();
+
+ Thread.sleep(2500);
+
+ assertTrue("Too few heartbeats received: "+listener._heartbeatsReceived+" (expected at least 2)", listener._heartbeatsReceived>=2);
+ assertTrue("Too few heartbeats sent "+listener._heartbeatsSent+" (expected at least 2)", listener._heartbeatsSent>=2);
+
+ conn.close();
+ }
+
+ public void testNoHeartbeats() throws Exception
+ {
+ setTestSystemProperty("amqj.heartbeat.delay","0");
+ AMQConnection conn = (AMQConnection) getConnection();
+ TestListener listener = new TestListener();
+ conn.setHeartbeatListener(listener);
+ conn.start();
+
+ Thread.sleep(2500);
+
+ assertEquals("Heartbeats unexpectedly received", 0, listener._heartbeatsReceived);
+ assertEquals("Heartbeats unexpectedly sent ", 0, listener._heartbeatsSent);
+
+ conn.close();
+ }
+
+ private class TestListener implements HeartbeatListener
+ {
+ int _heartbeatsReceived;
+ int _heartbeatsSent;
+ @Override
+ public void heartbeatReceived()
+ {
+ _heartbeatsReceived++;
+ }
+
+ @Override
+ public void heartbeatSent()
+ {
+ _heartbeatsSent++;
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org