You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2018/01/14 12:14:20 UTC
[3/7] qpid-broker-j git commit: QPID-6933: [System Tests] Move AMQP 0-x HeartbeatTest to Qpid JMS 0-X client
QPID-6933: [System Tests] Move AMQP 0-x HeartbeatTest to Qpid JMS 0-X client
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/48fb97c6
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/48fb97c6
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/48fb97c6
Branch: refs/heads/master
Commit: 48fb97c6f99c615fea46e6f6506d78ce27184315
Parents: 16d2ece
Author: Keith Wall <kw...@apache.org>
Authored: Fri Jan 12 23:32:47 2018 +0000
Committer: Keith Wall <kw...@apache.org>
Committed: Fri Jan 12 23:32:47 2018 +0000
----------------------------------------------------------------------
.../org/apache/qpid/client/HeartbeatTest.java | 326 -------------------
test-profiles/CPPExcludes | 4 -
test-profiles/Java10Excludes | 3 -
3 files changed, 333 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/48fb97c6/systests/src/test/java/org/apache/qpid/client/HeartbeatTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/client/HeartbeatTest.java b/systests/src/test/java/org/apache/qpid/client/HeartbeatTest.java
deleted file mode 100644
index ca221df..0000000
--- a/systests/src/test/java/org/apache/qpid/client/HeartbeatTest.java
+++ /dev/null
@@ -1,326 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.client;
-
-import static org.apache.qpid.configuration.ClientProperties.QPID_HEARTBEAT_INTERVAL;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.Port;
-import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.test.utils.TCPTunneler;
-import org.apache.qpid.test.utils.TestBrokerConfiguration;
-
-public class HeartbeatTest extends QpidBrokerTestCase
-{
- private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatTest.class);
-
- private static final String CONNECTION_URL_WITH_HEARTBEAT = "amqp://guest:guest@clientid/?brokerlist='localhost:%d?heartbeat='%d''";
- private static final int MAXIMUM_WAIT_TIME = 2900;
- private TestListener _listener = new TestListener("listener", 2, 2);
-
- @Override
- public void setUp() throws Exception
- {
- if (getName().equals("testHeartbeatsEnabledBrokerSide"))
- {
- getDefaultBrokerConfiguration().setObjectAttribute(Port.class,
- TestBrokerConfiguration.ENTRY_NAME_AMQP_PORT,
- AmqpPort.CONTEXT,
- Collections.singletonMap(AmqpPort.HEART_BEAT_DELAY, "1"));
- }
- super.setUp();
- }
-
- public void testHeartbeatsEnabledUsingUrl() throws Exception
- {
- final String url = String.format(CONNECTION_URL_WITH_HEARTBEAT, getDefaultBroker().getAmqpPort(), 1);
- AMQConnection conn = (AMQConnection) getConnection(url);
- conn.setHeartbeatListener(_listener);
- conn.start();
-
- _listener.awaitExpectedHeartbeats(MAXIMUM_WAIT_TIME);
-
- assertTrue("Too few heartbeats received: "+_listener.getHeartbeatsReceived() +" (expected at least 2)", _listener.getHeartbeatsReceived() >=2);
- assertTrue("Too few heartbeats sent "+_listener.getHeartbeatsSent() +" (expected at least 2)", _listener.getHeartbeatsSent() >=2);
-
- conn.close();
- }
-
- public void testHeartbeatsEnabledUsingSystemProperty() throws Exception
- {
- setTestSystemProperty(QPID_HEARTBEAT_INTERVAL, "1");
- AMQConnection conn = (AMQConnection) getConnection();
- conn.setHeartbeatListener(_listener);
- conn.start();
-
- _listener.awaitExpectedHeartbeats(MAXIMUM_WAIT_TIME);
-
- assertTrue("Too few heartbeats received: "+_listener.getHeartbeatsReceived() +" (expected at least 2)", _listener.getHeartbeatsReceived() >=2);
- assertTrue("Too few heartbeats sent "+_listener.getHeartbeatsSent() +" (expected at least 2)", _listener.getHeartbeatsSent() >=2);
-
- conn.close();
- }
-
- public void testHeartbeatsDisabledUsingSystemProperty() throws Exception
- {
- setTestSystemProperty(QPID_HEARTBEAT_INTERVAL, "0");
- AMQConnection conn = (AMQConnection) getConnection();
- conn.setHeartbeatListener(_listener);
- conn.start();
-
- _listener.awaitExpectedHeartbeats(2000);
-
- assertEquals("Heartbeats unexpectedly received", 0, _listener.getHeartbeatsReceived());
- assertEquals("Heartbeats unexpectedly sent ", 0, _listener.getHeartbeatsSent());
-
- conn.close();
- }
-
- /**
- * This test carefully arranges message flow so that bytes flow only from producer to broker
- * on the producer side and broker to consumer on the consumer side, deliberately leaving the
- * reverse path quiet so heartbeats will flow.
- */
- public void testUnidirectionalHeartbeating() throws Exception
- {
- setTestSystemProperty(QPID_HEARTBEAT_INTERVAL,"1");
- AMQConnection receiveConn = (AMQConnection) getConnection();
- AMQConnection sendConn = (AMQConnection) getConnection();
- Destination destination = getTestQueue();
- TestListener receiveListener = new TestListener("receiverListener", 2, 0);
- TestListener sendListener = new TestListener("senderListener", 0, 2);
-
-
- Session receiveSession = receiveConn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Session senderSession = sendConn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
- MessageConsumer consumer = receiveSession.createConsumer(destination);
- MessageProducer producer = senderSession.createProducer(destination);
-
- receiveConn.setHeartbeatListener(receiveListener);
- sendConn.setHeartbeatListener(sendListener);
- receiveConn.start();
-
- // Start the flow of messages to the consumer
- consumer.receiveNoWait();
-
- for(int i = 0; i < 5; i++)
- {
- producer.send(senderSession.createTextMessage("Msg " + i));
- Thread.sleep(500);
- assertNotNull("Expected to receive message within " + RECEIVE_TIMEOUT + "ms.", consumer.receive(RECEIVE_TIMEOUT));
- // Consumer does not ack the message in order that no bytes flow from consumer connection back to Broker
- }
-
- assertTrue("Too few heartbeats sent "+ receiveListener.getHeartbeatsSent() +" (expected at least 2)", receiveListener.getHeartbeatsSent()>=2);
- assertEquals("Unexpected number of heartbeats sent by the sender: ",0,sendListener.getHeartbeatsSent());
-
- assertTrue("Too few heartbeats received at the sender "+ sendListener.getHeartbeatsReceived() +" (expected at least 2)", sendListener.getHeartbeatsReceived()>=2);
- assertEquals("Unexpected number of heartbeats received by the receiver: ",0,receiveListener.getHeartbeatsReceived());
-
- receiveConn.close();
- sendConn.close();
- }
-
- public void testHeartbeatsEnabledBrokerSide() throws Exception
- {
-
- AMQConnection conn = (AMQConnection) getConnection();
- conn.setHeartbeatListener(_listener);
- conn.start();
-
- _listener.awaitExpectedHeartbeats(MAXIMUM_WAIT_TIME);
-
- assertTrue("Too few heartbeats received: "+_listener.getHeartbeatsReceived() +" (expected at least 2)", _listener.getHeartbeatsReceived()>=2);
- assertTrue("Too few heartbeats sent "+ _listener.getHeartbeatsSent() +" (expected at least 2)", _listener.getHeartbeatsSent() >=2);
-
- conn.close();
- }
-
-
- @SuppressWarnings("deprecation")
- public void testHeartbeatsEnabledUsingAmqjLegacySystemProperty() throws Exception
- {
- setTestSystemProperty("amqj.heartbeat.delay", "1");
- AMQConnection conn = (AMQConnection) getConnection();
- conn.setHeartbeatListener(_listener);
- conn.start();
-
- _listener.awaitExpectedHeartbeats(MAXIMUM_WAIT_TIME);
-
- assertTrue("Too few heartbeats received: "+_listener.getHeartbeatsReceived()+" (expected at least 2)", _listener.getHeartbeatsReceived()>=2);
- assertTrue("Too few heartbeats sent "+_listener.getHeartbeatsSent() +" (expected at least 2)", _listener.getHeartbeatsSent()>=2);
-
- conn.close();
- }
-
- @SuppressWarnings("deprecation")
- public void testHeartbeatsEnabledUsingOlderLegacySystemProperty() throws Exception
- {
- setTestSystemProperty("idle_timeout", "1000");
- AMQConnection conn = (AMQConnection) getConnection();
- conn.setHeartbeatListener(_listener);
- conn.start();
-
- _listener.awaitExpectedHeartbeats(MAXIMUM_WAIT_TIME);
-
- assertTrue("Too few heartbeats received: "+_listener.getHeartbeatsReceived() +" (expected at least 2)", _listener.getHeartbeatsReceived()>=2);
- assertTrue("Too few heartbeats sent "+_listener.getHeartbeatsSent() +" (expected at least 2)", _listener.getHeartbeatsSent()>=2);
-
- conn.close();
- }
-
- public void testClientStopsSendingHeartbeats_BrokerClosesConnection() throws Exception
- {
- try(TCPTunneler tcpTunneler = new TCPTunneler(getFailingPort(), "localhost", getDefaultBroker().getAmqpPort(), 1))
- {
- tcpTunneler.start();
-
- final AtomicReference<InetSocketAddress> clientAddressRef = new AtomicReference<>();
- tcpTunneler.addClientListener(new TCPTunneler.TunnelListener()
- {
- @Override
- public void clientConnected(final InetSocketAddress clientAddress)
- {
- clientAddressRef.set(clientAddress);
- }
-
- @Override
- public void clientDisconnected(final InetSocketAddress clientAddress)
- {
- }
-
- @Override
- public void notifyClientToServerBytesDelivered(final InetAddress inetAddress,
- final int numberOfBytesForwarded)
- {
-
- }
-
- @Override
- public void notifyServerToClientBytesDelivered(final InetAddress inetAddress,
- final int numberOfBytesForwarded)
- {
-
- }
- });
-
- final CountDownLatch exceptionLatch = new CountDownLatch(1);
- final String url = String.format(CONNECTION_URL_WITH_HEARTBEAT, tcpTunneler.getLocalPort(), 1);
- AMQConnection conn = (AMQConnection) getConnection(url);
- conn.setHeartbeatListener(_listener);
- conn.setExceptionListener(new ExceptionListener()
- {
- @Override
- public void onException(final JMSException exception)
- {
- LOGGER.debug("Exception listener got exception", exception);
- exceptionLatch.countDown();
- }
- });
- conn.start();
-
- assertNotNull(clientAddressRef.get());
-
- _listener.awaitExpectedHeartbeats(MAXIMUM_WAIT_TIME);
-
- assertTrue("Too few heartbeats received: "+_listener.getHeartbeatsReceived() +" (expected at least 2)", _listener.getHeartbeatsReceived() >=2);
- assertTrue("Too few heartbeats sent "+_listener.getHeartbeatsSent() +" (expected at least 2)", _listener.getHeartbeatsSent() >=2);
-
- tcpTunneler.stopClientToServerForwarding(clientAddressRef.get());
-
- exceptionLatch.await(5, TimeUnit.SECONDS);
- assertTrue("Connection should be disconnected within timeout", conn.isConnected());
- }
- }
-
- private class TestListener implements HeartbeatListener
- {
- private final String _name;
- private final AtomicInteger _heartbeatsReceived = new AtomicInteger(0);
- private final AtomicInteger _heartbeatsSent = new AtomicInteger(0);
- private final CountDownLatch _expectedReceivedHeartbeats;
- private final CountDownLatch _expectedSentHeartbeats;
-
- public TestListener(String name, int expectedSentHeartbeats, int expectedReceivedHeartbeats)
- {
- _name = name;
- _expectedReceivedHeartbeats = new CountDownLatch(expectedReceivedHeartbeats);
- _expectedSentHeartbeats = new CountDownLatch(expectedSentHeartbeats);
- }
-
- @Override
- public void heartbeatReceived()
- {
- LOGGER.debug(_name + " heartbeat received");
- _heartbeatsReceived.incrementAndGet();
- _expectedReceivedHeartbeats.countDown();
- }
-
- public int getHeartbeatsReceived()
- {
- return _heartbeatsReceived.get();
- }
-
- @Override
- public void heartbeatSent()
- {
- LOGGER.debug(_name + " heartbeat sent");
- _heartbeatsSent.incrementAndGet();
- _expectedSentHeartbeats.countDown();
- }
-
- public int getHeartbeatsSent()
- {
- return _heartbeatsSent.get();
- }
-
- public void awaitExpectedHeartbeats(final long maximumWaitTime) throws InterruptedException
- {
- long startTime = System.currentTimeMillis();
- _expectedSentHeartbeats.await(maximumWaitTime, TimeUnit.MILLISECONDS);
-
- long remainingTime = maximumWaitTime - (System.currentTimeMillis() - startTime);
- if (remainingTime > 0)
- {
- _expectedReceivedHeartbeats.await(remainingTime, TimeUnit.MILLISECONDS);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/48fb97c6/test-profiles/CPPExcludes
----------------------------------------------------------------------
diff --git a/test-profiles/CPPExcludes b/test-profiles/CPPExcludes
index 5c197e7..50fadaf 100755
--- a/test-profiles/CPPExcludes
+++ b/test-profiles/CPPExcludes
@@ -102,10 +102,6 @@ org.apache.qpid.systest.rest.acl.*
// Exclude failover tests requiring virtual host functionality
org.apache.qpid.client.failover.MultipleBrokersFailoverTest#*
-// QPID-2796 : JMS client for AMQP 0-10 only sends heartbeats in response to heartbeats from the server, not timeout based
-org.apache.qpid.client.HeartbeatTest#testUnidirectionalHeartbeating
-org.apache.qpid.client.HeartbeatTest#testHeartbeatsEnabledBrokerSide
-
// CPP Broker does not timeout connections with no activity like the Qpid Broker-J
org.apache.qpid.transport.ProtocolNegotiationTest#testNoProtocolHeaderSent_BrokerClosesConnection
org.apache.qpid.transport.ProtocolNegotiationTest#testNoConnectionOpenSent_BrokerClosesConnection
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/48fb97c6/test-profiles/Java10Excludes
----------------------------------------------------------------------
diff --git a/test-profiles/Java10Excludes b/test-profiles/Java10Excludes
index be9a0b9..64957e3 100644
--- a/test-profiles/Java10Excludes
+++ b/test-profiles/Java10Excludes
@@ -26,9 +26,6 @@ org.apache.qpid.client.failover.AddressBasedFailoverBehaviourTest#*
// Exclude tests of connection URL failover method property
org.apache.qpid.server.failover.FailoverMethodTest#*
-// This test mainly covers the AMQP 0-x client's heartbeating implementation
-org.apache.qpid.client.HeartbeatTest#*
-
// Uses an 0-x client API to acknowledge up to a particular message rather than the most recent
org.apache.qpid.test.unit.ack.RecoverTest#testRecoverResendsMsgsAckOnEarlier
// Tests the effect of setting the prefetch value
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org