You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by ge...@apache.org on 2018/05/16 11:53:16 UTC
[2/2] oozie git commit: OOZIE-3240 Flaky test TestJMSAccessorService#testConnectionRetry (pbacsko via gezapeti)
OOZIE-3240 Flaky test TestJMSAccessorService#testConnectionRetry (pbacsko via gezapeti)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/61c646c3
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/61c646c3
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/61c646c3
Branch: refs/heads/master
Commit: 61c646c332e6129f3502ee235e5e6d28fe55addd
Parents: 117153a
Author: Gezapeti Cseh <ge...@apache.org>
Authored: Wed May 16 13:52:57 2018 +0200
Committer: Gezapeti Cseh <ge...@apache.org>
Committed: Wed May 16 13:52:57 2018 +0200
----------------------------------------------------------------------
.../oozie/service/TestJMSAccessorService.java | 267 ++++++++++---------
release-log.txt | 1 +
2 files changed, 137 insertions(+), 131 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/61c646c3/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java b/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java
index 41241d2..dbf892e 100644
--- a/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java
@@ -19,6 +19,7 @@
package org.apache.oozie.service;
import java.net.URI;
+import java.net.URISyntaxException;
import java.util.Random;
import javax.jms.Session;
@@ -36,6 +37,7 @@ import org.junit.Test;
public class TestJMSAccessorService extends XTestCase {
private Services services;
private static Random random = new Random();
+ private static final int JMS_TIMEOUT_MS = 5000;
@Override
protected void setUp() throws Exception {
@@ -66,89 +68,63 @@ public class TestJMSAccessorService extends XTestCase {
}
@Test
- public void testRegisterSingleConsumerPerTopic() {
-
- try {
- HCatAccessorService hcatService = services.get(HCatAccessorService.class);
- JMSAccessorService jmsService = services.get(JMSAccessorService.class);
- String server = "hcat.server.com:5080";
- String topic = "hcat.mydb.mytable";
-
- JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcat.server.com:8020"));
- jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(server));
+ public void testRegisterSingleConsumerPerTopic() throws URISyntaxException {
+ HCatAccessorService hcatService = services.get(HCatAccessorService.class);
+ JMSAccessorService jmsService = services.get(JMSAccessorService.class);
+ String server = "hcat.server.com:5080";
+ String topic = "hcat.mydb.mytable";
- MessageReceiver receiver1 = jmsService.getMessageReceiver(connInfo, topic);
- jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(server));
+ JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcat.server.com:8020"));
+ jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(server));
- MessageReceiver receiver2 = jmsService.getMessageReceiver(connInfo, topic);
- assertEquals(receiver1, receiver2);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail("Exception encountered : " + e);
- }
+ MessageReceiver receiver1 = jmsService.getMessageReceiver(connInfo, topic);
+ jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(server));
+ MessageReceiver receiver2 = jmsService.getMessageReceiver(connInfo, topic);
+ assertEquals(receiver1, receiver2);
}
@Test
- public void testUnRegisterTopic() {
-
- try {
- HCatAccessorService hcatService = services.get(HCatAccessorService.class);
- JMSAccessorService jmsService = services.get(JMSAccessorService.class);
- String server = "hcat.server.com:5080";
- String topic = "hcatalog.mydb.mytable";
-
- JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcat.server.com:8020"));
- jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(server));
+ public void testUnRegisterTopic() throws URISyntaxException {
+ HCatAccessorService hcatService = services.get(HCatAccessorService.class);
+ JMSAccessorService jmsService = services.get(JMSAccessorService.class);
+ String server = "hcat.server.com:5080";
+ String topic = "hcatalog.mydb.mytable";
- MessageReceiver receiver1 = jmsService.getMessageReceiver(connInfo, topic);
- assertNotNull(receiver1);
+ JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcat.server.com:8020"));
+ jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(server));
- jmsService.unregisterFromNotification(connInfo, topic);
+ MessageReceiver receiver1 = jmsService.getMessageReceiver(connInfo, topic);
+ assertNotNull(receiver1);
- receiver1 = jmsService.getMessageReceiver(connInfo, topic);
- assertEquals(null, receiver1);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail("Exception encountered : " + e);
- }
+ jmsService.unregisterFromNotification(connInfo, topic);
+ receiver1 = jmsService.getMessageReceiver(connInfo, topic);
+ assertEquals(null, receiver1);
}
@Test
- public void testConnectionContext() throws ServiceException {
- try {
- services.destroy();
- services = super.setupServicesForHCatalog();
- Configuration conf = services.getConf();
- // set the connection factory name
- String jmsURL = "hcat://${1}.${2}.server.com:8020=java.naming.factory.initial#" +
- "org.apache.activemq.jndi.ActiveMQInitialContextFactory" +
- ";java.naming.provider.url#vm://localhost?broker.persistent=false;" +
- "connectionFactoryNames#dynamicFactories/hcat.prod.${1}";
- conf.set(HCatAccessorService.JMS_CONNECTIONS_PROPERTIES, jmsURL);
- services.init();
- HCatAccessorService hcatService = services.get(HCatAccessorService.class);
- JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcatserver.blue.server.com:8020"));
- assertEquals(
- "java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#" +
- "vm://localhost?broker.persistent=false;connectionFactoryNames#dynamicFactories/hcat.prod.hcatserver",
- connInfo.getJNDIPropertiesString());
-
- ConnectionContext ctx1 = new DefaultConnectionContext();
- ctx1.createConnection(connInfo.getJNDIProperties());
- BrokerService broker = new BrokerService();
- broker.setDataDirectory(getTestCaseDir());
- // Without this stop testConnectionRetry fails with
- // javax.management.InstanceAlreadyExistsException: org.apache.activemq:BrokerName=localhost,Type=Broker
- broker.stop();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail("Unexpected exception " + e);
- }
+ public void testConnectionContext() throws Exception {
+ services.destroy();
+ services = super.setupServicesForHCatalog();
+ Configuration conf = services.getConf();
+ // set the connection factory name
+ String jmsURL = "hcat://${1}.${2}.server.com:8020=java.naming.factory.initial#" +
+ "org.apache.activemq.jndi.ActiveMQInitialContextFactory" +
+ ";java.naming.provider.url#vm://localhost?broker.persistent=false;" +
+ "connectionFactoryNames#dynamicFactories/hcat.prod.${1}";
+ conf.set(HCatAccessorService.JMS_CONNECTIONS_PROPERTIES, jmsURL);
+ services.init();
+ HCatAccessorService hcatService = services.get(HCatAccessorService.class);
+ JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcatserver.blue.server.com:8020"));
+ assertEquals(
+ "java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#" +
+ "vm://localhost?broker.persistent=false;connectionFactoryNames#dynamicFactories/hcat.prod.hcatserver",
+ connInfo.getJNDIPropertiesString());
+
+ ConnectionContext ctx = new DefaultConnectionContext();
+ ctx.createConnection(connInfo.getJNDIProperties());
+ ctx.close();
}
@Test
@@ -176,69 +152,94 @@ public class TestJMSAccessorService extends XTestCase {
assertTrue(jmsService.isTopicInRetryList(connInfo, topic));
// Start the broker and check if listening to topic now
BrokerService broker = new BrokerService();
- broker.addConnector(brokerURl);
- broker.setDataDirectory(getTestCaseDir());
- broker.start();
- Thread.sleep(1000);
- assertTrue(jmsService.isListeningToTopic(connInfo, topic));
- assertFalse(jmsService.isConnectionInRetryList(connInfo));
- assertFalse(jmsService.isTopicInRetryList(connInfo, topic));
- broker.stop();
- jmsService.destroy();
-
+ try {
+ broker.addConnector(brokerURl);
+ broker.setDataDirectory(getTestCaseDir());
+ broker.start();
+
+ waitFor(JMS_TIMEOUT_MS, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return jmsService.isListeningToTopic(connInfo, topic);
+ }
+ });
+ assertTrue(jmsService.isListeningToTopic(connInfo, topic));
+ assertFalse(jmsService.isConnectionInRetryList(connInfo));
+ assertFalse(jmsService.isTopicInRetryList(connInfo, topic));
+ } finally {
+ broker.stop();
+ }
}
@Test
public void testConnectionRetryExceptionListener() throws Exception {
- services.destroy();
- services = super.setupServicesForHCatalog();
- int randomPort = 30000 + random.nextInt(10000);
- String brokerURL = "tcp://localhost:" + randomPort;
- String jndiPropertiesString = "java.naming.factory.initial#" + ActiveMQConnFactory + ";"
- + "java.naming.provider.url#" + brokerURL + ";" + "connectionFactoryNames#" + "ConnectionFactory";
- Configuration servicesConf = services.getConf();
- servicesConf.set(JMSAccessorService.CONF_RETRY_INITIAL_DELAY, "1");
- servicesConf.set(JMSAccessorService.CONF_RETRY_MAX_ATTEMPTS, "3");
- servicesConf.set(HCatAccessorService.JMS_CONNECTIONS_PROPERTIES, "default=" + jndiPropertiesString);
- services.init();
- HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
- JMSAccessorService jmsService = Services.get().get(JMSAccessorService.class);
-
- String publisherAuthority = "hcat.server.com:5080";
- String topic = "topic.topic1";
- // Start the broker
- BrokerService broker = new BrokerService();
- broker.addConnector(brokerURL);
- broker.setDataDirectory(getTestCaseDir());
- broker.start();
- JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcat.server.com:8020"));
- jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(publisherAuthority));
- assertTrue(jmsService.isListeningToTopic(connInfo, topic));
- assertFalse(jmsService.isConnectionInRetryList(connInfo));
- assertFalse(jmsService.isTopicInRetryList(connInfo, topic));
- ConnectionContext connCtxt = jmsService.createConnectionContext(connInfo);
- broker.stop();
+ BrokerService broker = null;
try {
- connCtxt.createSession(Session.AUTO_ACKNOWLEDGE);
- fail("Exception expected");
- }
- catch (Exception e) {
- Thread.sleep(100);
- assertFalse(jmsService.isListeningToTopic(connInfo, topic));
- assertTrue(jmsService.isConnectionInRetryList(connInfo));
- assertTrue(jmsService.isTopicInRetryList(connInfo, topic));
- }
- broker = new BrokerService();
- broker.addConnector(brokerURL);
- broker.setDataDirectory(getTestCaseDir());
- broker.start();
- Thread.sleep(1000);
- assertTrue(jmsService.isListeningToTopic(connInfo, topic));
- assertFalse(jmsService.isConnectionInRetryList(connInfo));
- assertFalse(jmsService.isTopicInRetryList(connInfo, topic));
- broker.stop();
- jmsService.destroy();
+ services.destroy();
+ services = super.setupServicesForHCatalog();
+ int randomPort = 30000 + random.nextInt(10000);
+ String brokerURL = "tcp://localhost:" + randomPort;
+ String jndiPropertiesString = "java.naming.factory.initial#" + ActiveMQConnFactory + ";"
+ + "java.naming.provider.url#" + brokerURL + ";" + "connectionFactoryNames#" + "ConnectionFactory";
+ Configuration servicesConf = services.getConf();
+ servicesConf.set(JMSAccessorService.CONF_RETRY_INITIAL_DELAY, "1");
+ servicesConf.set(JMSAccessorService.CONF_RETRY_MAX_ATTEMPTS, "3");
+ servicesConf.set(HCatAccessorService.JMS_CONNECTIONS_PROPERTIES, "default=" + jndiPropertiesString);
+ services.init();
+ HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
+ JMSAccessorService jmsService = Services.get().get(JMSAccessorService.class);
+
+ String publisherAuthority = "hcat.server.com:5080";
+ String topic = "topic.topic1";
+ // Start the broker
+ broker = new BrokerService();
+ broker.addConnector(brokerURL);
+ broker.setDataDirectory(getTestCaseDir());
+ broker.start();
+ JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcat.server.com:8020"));
+ jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(publisherAuthority));
+ assertTrue(jmsService.isListeningToTopic(connInfo, topic));
+ assertFalse(jmsService.isConnectionInRetryList(connInfo));
+ assertFalse(jmsService.isTopicInRetryList(connInfo, topic));
+ ConnectionContext connCtxt = jmsService.createConnectionContext(connInfo);
+ broker.stop();
+
+ try {
+ connCtxt.createSession(Session.AUTO_ACKNOWLEDGE);
+ fail("Exception expected");
+ }
+ catch (Exception e) {
+ waitFor(JMS_TIMEOUT_MS, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return !jmsService.isListeningToTopic(connInfo, topic);
+ }
+ });
+ assertFalse(jmsService.isListeningToTopic(connInfo, topic));
+ assertTrue(jmsService.isConnectionInRetryList(connInfo));
+ assertTrue(jmsService.isTopicInRetryList(connInfo, topic));
+ }
+ broker = new BrokerService();
+
+ broker.addConnector(brokerURL);
+ broker.setDataDirectory(getTestCaseDir());
+ broker.start();
+ waitFor(JMS_TIMEOUT_MS, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return jmsService.isListeningToTopic(connInfo, topic);
+ }
+ });
+ assertTrue(jmsService.isListeningToTopic(connInfo, topic));
+ assertFalse(jmsService.isConnectionInRetryList(connInfo));
+ assertFalse(jmsService.isTopicInRetryList(connInfo, topic));
+ broker.stop();
+ } finally {
+ if (broker != null) {
+ broker.stop();
+ }
+ }
}
@Test
@@ -258,18 +259,22 @@ public class TestJMSAccessorService extends XTestCase {
String publisherAuthority = "hcat.server.com:5080";
String topic = "topic.topic1";
JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcat.server.com:8020"));
+
jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(publisherAuthority));
assertTrue(jmsService.isConnectionInRetryList(connInfo));
assertTrue(jmsService.isTopicInRetryList(connInfo, topic));
assertFalse(jmsService.isListeningToTopic(connInfo, topic));
- Thread.sleep(1100);
+ waitFor(JMS_TIMEOUT_MS, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return jmsService.getNumConnectionAttempts(connInfo) == 1;
+ }
+ });
+
// Should not retry again as max attempt is 1
assertTrue(jmsService.isConnectionInRetryList(connInfo));
assertTrue(jmsService.isTopicInRetryList(connInfo, topic));
assertFalse(jmsService.isListeningToTopic(connInfo, topic));
- assertEquals(1, jmsService.getNumConnectionAttempts(connInfo));
assertFalse(jmsService.retryConnection(connInfo));
- jmsService.destroy();
}
-
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/61c646c3/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 060f4fb..fd7bd76 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 5.1.0 release (trunk - unreleased)
+OOZIE-3240 Flaky test TestJMSAccessorService#testConnectionRetry (pbacsko via gezapeti)
OOZIE-3246 Flaky test TestJMSJobEventListener#testConnectionDrop (pbacsko via gezapeti)
OOZIE-3236 Fix flaky test TestHiveActionExecutor#testHiveAction (pbacsko via gezapeti)
OOZIE-3235 Upgrade ActiveMQ to 5.15.3 (matijhs via andras.piros)