You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2012/11/26 11:10:27 UTC

svn commit: r1413539 - in /qpid/trunk/qpid/java: client/src/main/java/org/apache/qpid/client/ client/src/main/java/org/apache/qpid/client/protocol/ client/src/main/java/org/apache/qpid/client/transport/ systests/src/main/java/org/apache/qpid/client/

Author: rgodfrey
Date: Mon Nov 26 10:10:26 2012
New Revision: 1413539

URL: http://svn.apache.org/viewvc?rev=1413539&view=rev
Log:
QPID-2796 : Added Java system test for heartbeating

Added:
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/HeartbeatListener.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/HeartbeatTest.java
Removed:
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java
Modified:
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=1413539&r1=1413538&r2=1413539&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Mon Nov 26 10:10:26 2012
@@ -1560,4 +1560,9 @@ public class AMQConnection extends Close
                          + localAddress + " to " + remoteAddress);
         }
     }
+
+    void setHeartbeatListener(HeartbeatListener listener)
+    {
+        _delegate.setHeartbeatListener(listener);
+    }
 }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java?rev=1413539&r1=1413538&r2=1413539&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java Mon Nov 26 10:10:26 2012
@@ -78,4 +78,6 @@ public interface AMQConnectionDelegate
      * @return true if the feature is supported by the server
      */
     boolean isSupportedServerFeature(final String featureName);
+
+    void setHeartbeatListener(HeartbeatListener listener);
 }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=1413539&r1=1413538&r2=1413539&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Mon Nov 26 10:10:26 2012
@@ -422,6 +422,12 @@ public class AMQConnectionDelegate_0_10 
         return featureSupported;
     }
 
+    @Override
+    public void setHeartbeatListener(HeartbeatListener listener)
+    {
+        ((ClientConnectionDelegate)(_qpidConnection.getConnectionDelegate())).setHeartbeatListener(listener);
+    }
+
     private ConnectionSettings retrieveConnectionSettings(BrokerDetails brokerDetail)
     {
         ConnectionSettings conSettings = brokerDetail.buildConnectionSettings();

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=1413539&r1=1413538&r2=1413539&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java Mon Nov 26 10:10:26 2012
@@ -378,4 +378,10 @@ public class AMQConnectionDelegate_8_0 i
         // we just hardcode JMS selectors as supported.
         return ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR.equals(featureName);
     }
+
+    @Override
+    public void setHeartbeatListener(HeartbeatListener listener)
+    {
+        _conn.getProtocolHandler().setHeartbeatListener(listener);
+    }
 }

Added: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/HeartbeatListener.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/HeartbeatListener.java?rev=1413539&view=auto
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/HeartbeatListener.java (added)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/HeartbeatListener.java Mon Nov 26 10:10:26 2012
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.client;
+
+/**
+ * Callback through which the client reports heartbeat traffic on a connection.
+ * <p>
+ * {@link #DEFAULT} is a no-op implementation for use when no listener is registered.
+ */
+public interface HeartbeatListener
+{
+    /** Invoked when a heartbeat has been received from the peer. */
+    void heartbeatReceived();
+
+    /** Invoked when a heartbeat has been sent to the peer. */
+    void heartbeatSent();
+
+    /** Listener that ignores every notification; interface fields are implicitly {@code public static final}. */
+    HeartbeatListener DEFAULT = new HeartbeatListener()
+    {
+        @Override
+        public void heartbeatReceived()
+        {
+        }
+
+        @Override
+        public void heartbeatSent()
+        {
+        }
+    };
+}

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=1413539&r1=1413538&r2=1413539&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Mon Nov 26 10:10:26 2012
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.client.protocol;
 
+import org.apache.qpid.client.HeartbeatListener;
 import org.apache.qpid.util.BytesDataOutput;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -180,6 +181,7 @@ public class AMQProtocolHandler implemen
     private Sender<ByteBuffer> _sender;
     private long _lastReadTime = System.currentTimeMillis();
     private long _lastWriteTime = System.currentTimeMillis();
+    private HeartbeatListener _heartbeatListener = HeartbeatListener.DEFAULT;
 
     /**
      * Creates a new protocol handler, associated with the specified client connection instance.
@@ -302,7 +304,6 @@ public class AMQProtocolHandler implemen
     {
         _logger.debug("Protocol Session [" + this + "] idle: reader");
         //  failover:
-        HeartbeatDiagnostics.timeout();
         _logger.warn("Timed out while waiting for heartbeat from peer.");
         _network.close();
     }
@@ -311,7 +312,7 @@ public class AMQProtocolHandler implemen
     {
         _logger.debug("Protocol Session [" + this + "] idle: reader");
         writeFrame(HeartbeatBody.FRAME);
-        HeartbeatDiagnostics.sent();
+        _heartbeatListener.heartbeatSent();
     }
 
     /**
@@ -473,8 +474,6 @@ public class AMQProtocolHandler implemen
 
                         final AMQBody bodyFrame = frame.getBodyFrame();
 
-                        HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
-
                         bodyFrame.handle(frame.getChannel(), _protocolSession);
 
                         _connection.bytesReceived(_readBytes);
@@ -910,7 +909,6 @@ public class AMQProtocolHandler implemen
         {
             _network.setMaxWriteIdle(delay);
             _network.setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay));
-            HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
         }
     }
 
@@ -925,5 +923,13 @@ public class AMQProtocolHandler implemen
     }
 
 
+    public void setHeartbeatListener(HeartbeatListener listener)
+    {
+        _heartbeatListener = listener == null ? HeartbeatListener.DEFAULT : listener;
+    }
 
+    public void heartbeatBodyReceived()
+    {
+        _heartbeatListener.heartbeatReceived();
+    }
 }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=1413539&r1=1413538&r2=1413539&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Mon Nov 26 10:10:26 2012
@@ -267,7 +267,7 @@ public class AMQProtocolSession implemen
 
     public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException
     {
-
+        _protocolHandler.heartbeatBodyReceived();
     }
 
     /**

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java?rev=1413539&r1=1413538&r2=1413539&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java Mon Nov 26 10:10:26 2012
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.client.transport;
 
+import org.apache.qpid.client.HeartbeatListener;
+import org.apache.qpid.transport.ConnectionHeartbeat;
 import org.ietf.jgss.GSSContext;
 import org.ietf.jgss.GSSException;
 import org.ietf.jgss.GSSManager;
@@ -70,6 +72,7 @@ public class ClientConnectionDelegate ex
     }
 
     private final ConnectionURL _connectionURL;
+    private HeartbeatListener _heartbeatListener = HeartbeatListener.DEFAULT;
 
     /**
      * @param settings
@@ -165,4 +168,19 @@ public class ClientConnectionDelegate ex
 
         return null;
     }
+
+    @Override
+    public void connectionHeartbeat(Connection conn, ConnectionHeartbeat heartbeat)
+    {
+        // ClientDelegate responds to a heartbeat with a heartbeat, so one receipt and one send are reported
+        _heartbeatListener.heartbeatReceived();
+        super.connectionHeartbeat(conn, heartbeat);
+        _heartbeatListener.heartbeatSent();
+    }
+
+
+    public void setHeartbeatListener(HeartbeatListener listener)
+    {
+        _heartbeatListener = listener == null ? HeartbeatListener.DEFAULT : listener;
+    }
 }

Added: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/HeartbeatTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/HeartbeatTest.java?rev=1413539&view=auto
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/HeartbeatTest.java (added)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/HeartbeatTest.java Mon Nov 26 10:10:26 2012
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * System test verifying that a {@link HeartbeatListener} registered on an {@link AMQConnection}
+ * is told about heartbeats when they are enabled and hears nothing when they are disabled.
+ */
+public class HeartbeatTest extends QpidBrokerTestCase
+{
+    /** How long, in milliseconds, each test leaves the connection idle before checking the counters. */
+    private static final long IDLE_PERIOD_MS = 2500;
+
+    /**
+     * Sets {@code amqj.heartbeat.delay} to 1 and expects at least two heartbeats in each
+     * direction while the connection idles.
+     */
+    public void testHeartbeats() throws Exception
+    {
+        setTestSystemProperty("amqj.heartbeat.delay","1");
+        AMQConnection conn = (AMQConnection) getConnection();
+        try
+        {
+            TestListener listener = new TestListener();
+            conn.setHeartbeatListener(listener);
+            conn.start();
+
+            Thread.sleep(IDLE_PERIOD_MS);
+
+            int received = listener.getHeartbeatsReceived();
+            int sent = listener.getHeartbeatsSent();
+            assertTrue("Too few heartbeats received: "+received+" (expected at least 2)", received>=2);
+            assertTrue("Too few heartbeats sent "+sent+" (expected at least 2)", sent>=2);
+        }
+        finally
+        {
+            // Close even when an assertion above fails
+            conn.close();
+        }
+    }
+
+    /**
+     * Sets {@code amqj.heartbeat.delay} to 0 and expects no heartbeat in either direction
+     * while the connection idles.
+     */
+    public void testNoHeartbeats() throws Exception
+    {
+        setTestSystemProperty("amqj.heartbeat.delay","0");
+        AMQConnection conn = (AMQConnection) getConnection();
+        try
+        {
+            TestListener listener = new TestListener();
+            conn.setHeartbeatListener(listener);
+            conn.start();
+
+            Thread.sleep(IDLE_PERIOD_MS);
+
+            assertEquals("Heartbeats unexpectedly received", 0, listener.getHeartbeatsReceived());
+            assertEquals("Heartbeats unexpectedly sent ", 0, listener.getHeartbeatsSent());
+        }
+        finally
+        {
+            conn.close();
+        }
+    }
+
+    /**
+     * Counts heartbeat notifications. The callbacks arrive while the test thread is
+     * sleeping, i.e. on another thread, so the counters are atomic to make the
+     * increments visible to the asserting thread.
+     */
+    private static final class TestListener implements HeartbeatListener
+    {
+        private final AtomicInteger _heartbeatsReceived = new AtomicInteger();
+        private final AtomicInteger _heartbeatsSent = new AtomicInteger();
+
+        @Override
+        public void heartbeatReceived()
+        {
+            _heartbeatsReceived.incrementAndGet();
+        }
+
+        @Override
+        public void heartbeatSent()
+        {
+            _heartbeatsSent.incrementAndGet();
+        }
+
+        int getHeartbeatsReceived()
+        {
+            return _heartbeatsReceived.get();
+        }
+
+        int getHeartbeatsSent()
+        {
+            return _heartbeatsSent.get();
+        }
+    }
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org