You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by ge...@apache.org on 2018/05/16 11:53:16 UTC

[2/2] oozie git commit: OOZIE-3240 Flaky test TestJMSAccessorService#testConnectionRetry (pbacsko via gezapeti)

OOZIE-3240 Flaky test TestJMSAccessorService#testConnectionRetry (pbacsko via gezapeti)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/61c646c3
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/61c646c3
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/61c646c3

Branch: refs/heads/master
Commit: 61c646c332e6129f3502ee235e5e6d28fe55addd
Parents: 117153a
Author: Gezapeti Cseh <ge...@apache.org>
Authored: Wed May 16 13:52:57 2018 +0200
Committer: Gezapeti Cseh <ge...@apache.org>
Committed: Wed May 16 13:52:57 2018 +0200

----------------------------------------------------------------------
 .../oozie/service/TestJMSAccessorService.java   | 267 ++++++++++---------
 release-log.txt                                 |   1 +
 2 files changed, 137 insertions(+), 131 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/61c646c3/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java b/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java
index 41241d2..dbf892e 100644
--- a/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java
@@ -19,6 +19,7 @@
 package org.apache.oozie.service;
 
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.Random;
 
 import javax.jms.Session;
@@ -36,6 +37,7 @@ import org.junit.Test;
 public class TestJMSAccessorService extends XTestCase {
     private Services services;
     private static Random random = new Random();
+    private static final int JMS_TIMEOUT_MS = 5000;
 
     @Override
     protected void setUp() throws Exception {
@@ -66,89 +68,63 @@ public class TestJMSAccessorService extends XTestCase {
     }
 
     @Test
-    public void testRegisterSingleConsumerPerTopic() {
-
-        try {
-            HCatAccessorService hcatService = services.get(HCatAccessorService.class);
-            JMSAccessorService jmsService = services.get(JMSAccessorService.class);
-            String server = "hcat.server.com:5080";
-            String topic = "hcat.mydb.mytable";
-
-            JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcat.server.com:8020"));
-            jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(server));
+    public void testRegisterSingleConsumerPerTopic() throws URISyntaxException {
+        HCatAccessorService hcatService = services.get(HCatAccessorService.class);
+        JMSAccessorService jmsService = services.get(JMSAccessorService.class);
+        String server = "hcat.server.com:5080";
+        String topic = "hcat.mydb.mytable";
 
-            MessageReceiver receiver1 = jmsService.getMessageReceiver(connInfo, topic);
-            jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(server));
+        JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcat.server.com:8020"));
+        jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(server));
 
-            MessageReceiver receiver2 = jmsService.getMessageReceiver(connInfo, topic);
-            assertEquals(receiver1, receiver2);
-        }
-        catch (Exception e) {
-            e.printStackTrace();
-            fail("Exception encountered : " + e);
-        }
+        MessageReceiver receiver1 = jmsService.getMessageReceiver(connInfo, topic);
+        jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(server));
 
+        MessageReceiver receiver2 = jmsService.getMessageReceiver(connInfo, topic);
+        assertEquals(receiver1, receiver2);
     }
 
     @Test
-    public void testUnRegisterTopic() {
-
-        try {
-            HCatAccessorService hcatService = services.get(HCatAccessorService.class);
-            JMSAccessorService jmsService = services.get(JMSAccessorService.class);
-            String server = "hcat.server.com:5080";
-            String topic = "hcatalog.mydb.mytable";
-
-            JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcat.server.com:8020"));
-            jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(server));
+    public void testUnRegisterTopic() throws URISyntaxException {
+        HCatAccessorService hcatService = services.get(HCatAccessorService.class);
+        JMSAccessorService jmsService = services.get(JMSAccessorService.class);
+        String server = "hcat.server.com:5080";
+        String topic = "hcatalog.mydb.mytable";
 
-            MessageReceiver receiver1 = jmsService.getMessageReceiver(connInfo, topic);
-            assertNotNull(receiver1);
+        JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcat.server.com:8020"));
+        jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(server));
 
-            jmsService.unregisterFromNotification(connInfo, topic);
+        MessageReceiver receiver1 = jmsService.getMessageReceiver(connInfo, topic);
+        assertNotNull(receiver1);
 
-            receiver1 = jmsService.getMessageReceiver(connInfo, topic);
-            assertEquals(null, receiver1);
-        }
-        catch (Exception e) {
-            e.printStackTrace();
-            fail("Exception encountered : " + e);
-        }
+        jmsService.unregisterFromNotification(connInfo, topic);
 
+        receiver1 = jmsService.getMessageReceiver(connInfo, topic);
+        assertEquals(null, receiver1);
     }
 
     @Test
-    public void testConnectionContext() throws ServiceException {
-        try {
-            services.destroy();
-            services = super.setupServicesForHCatalog();
-            Configuration conf = services.getConf();
-            // set the connection factory name
-            String jmsURL = "hcat://${1}.${2}.server.com:8020=java.naming.factory.initial#" +
-                    "org.apache.activemq.jndi.ActiveMQInitialContextFactory" +
-                    ";java.naming.provider.url#vm://localhost?broker.persistent=false;" +
-                    "connectionFactoryNames#dynamicFactories/hcat.prod.${1}";
-            conf.set(HCatAccessorService.JMS_CONNECTIONS_PROPERTIES, jmsURL);
-            services.init();
-            HCatAccessorService hcatService = services.get(HCatAccessorService.class);
-            JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcatserver.blue.server.com:8020"));
-            assertEquals(
-                    "java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#" +
-                    "vm://localhost?broker.persistent=false;connectionFactoryNames#dynamicFactories/hcat.prod.hcatserver",
-                    connInfo.getJNDIPropertiesString());
-
-            ConnectionContext ctx1 = new DefaultConnectionContext();
-            ctx1.createConnection(connInfo.getJNDIProperties());
-            BrokerService broker = new BrokerService();
-            broker.setDataDirectory(getTestCaseDir());
-            // Without this stop testConnectionRetry fails with
-            // javax.management.InstanceAlreadyExistsException: org.apache.activemq:BrokerName=localhost,Type=Broker
-            broker.stop();
-        }
-        catch (Exception e) {
-            e.printStackTrace();
-            fail("Unexpected exception " + e);
-        }
+    public void testConnectionContext() throws Exception {
+        services.destroy();
+        services = super.setupServicesForHCatalog();
+        Configuration conf = services.getConf();
+        // set the connection factory name
+        String jmsURL = "hcat://${1}.${2}.server.com:8020=java.naming.factory.initial#" +
+                "org.apache.activemq.jndi.ActiveMQInitialContextFactory" +
+                ";java.naming.provider.url#vm://localhost?broker.persistent=false;" +
+                "connectionFactoryNames#dynamicFactories/hcat.prod.${1}";
+        conf.set(HCatAccessorService.JMS_CONNECTIONS_PROPERTIES, jmsURL);
+        services.init();
+        HCatAccessorService hcatService = services.get(HCatAccessorService.class);
+        JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcatserver.blue.server.com:8020"));
+        assertEquals(
+                "java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#" +
+                        "vm://localhost?broker.persistent=false;connectionFactoryNames#dynamicFactories/hcat.prod.hcatserver",
+                        connInfo.getJNDIPropertiesString());
+
+        ConnectionContext ctx = new DefaultConnectionContext();
+        ctx.createConnection(connInfo.getJNDIProperties());
+        ctx.close();
     }
 
     @Test
@@ -176,69 +152,94 @@ public class TestJMSAccessorService extends XTestCase {
         assertTrue(jmsService.isTopicInRetryList(connInfo, topic));
         // Start the broker and check if listening to topic now
         BrokerService broker = new BrokerService();
-        broker.addConnector(brokerURl);
-        broker.setDataDirectory(getTestCaseDir());
-        broker.start();
-        Thread.sleep(1000);
-        assertTrue(jmsService.isListeningToTopic(connInfo, topic));
-        assertFalse(jmsService.isConnectionInRetryList(connInfo));
-        assertFalse(jmsService.isTopicInRetryList(connInfo, topic));
-        broker.stop();
-        jmsService.destroy();
-
+        try {
+            broker.addConnector(brokerURl);
+            broker.setDataDirectory(getTestCaseDir());
+            broker.start();
+
+            waitFor(JMS_TIMEOUT_MS, new Predicate() {
+                @Override
+                public boolean evaluate() throws Exception {
+                    return jmsService.isListeningToTopic(connInfo, topic);
+                }
+            });
+            assertTrue(jmsService.isListeningToTopic(connInfo, topic));
+            assertFalse(jmsService.isConnectionInRetryList(connInfo));
+            assertFalse(jmsService.isTopicInRetryList(connInfo, topic));
+        } finally {
+            broker.stop();
+        }
     }
 
     @Test
     public void testConnectionRetryExceptionListener() throws Exception {
-        services.destroy();
-        services = super.setupServicesForHCatalog();
-        int randomPort = 30000 + random.nextInt(10000);
-        String brokerURL = "tcp://localhost:" + randomPort;
-        String jndiPropertiesString = "java.naming.factory.initial#" + ActiveMQConnFactory + ";"
-                + "java.naming.provider.url#" + brokerURL + ";" + "connectionFactoryNames#" + "ConnectionFactory";
-        Configuration servicesConf = services.getConf();
-        servicesConf.set(JMSAccessorService.CONF_RETRY_INITIAL_DELAY, "1");
-        servicesConf.set(JMSAccessorService.CONF_RETRY_MAX_ATTEMPTS, "3");
-        servicesConf.set(HCatAccessorService.JMS_CONNECTIONS_PROPERTIES, "default=" + jndiPropertiesString);
-        services.init();
-        HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
-        JMSAccessorService jmsService = Services.get().get(JMSAccessorService.class);
-
-        String publisherAuthority = "hcat.server.com:5080";
-        String topic = "topic.topic1";
-        // Start the broker
-        BrokerService broker = new BrokerService();
-        broker.addConnector(brokerURL);
-        broker.setDataDirectory(getTestCaseDir());
-        broker.start();
-        JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcat.server.com:8020"));
-        jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(publisherAuthority));
-        assertTrue(jmsService.isListeningToTopic(connInfo, topic));
-        assertFalse(jmsService.isConnectionInRetryList(connInfo));
-        assertFalse(jmsService.isTopicInRetryList(connInfo, topic));
-        ConnectionContext connCtxt = jmsService.createConnectionContext(connInfo);
-        broker.stop();
+        BrokerService broker = null;
         try {
-            connCtxt.createSession(Session.AUTO_ACKNOWLEDGE);
-            fail("Exception expected");
-        }
-        catch (Exception e) {
-            Thread.sleep(100);
-            assertFalse(jmsService.isListeningToTopic(connInfo, topic));
-            assertTrue(jmsService.isConnectionInRetryList(connInfo));
-            assertTrue(jmsService.isTopicInRetryList(connInfo, topic));
-        }
-        broker = new BrokerService();
-        broker.addConnector(brokerURL);
-        broker.setDataDirectory(getTestCaseDir());
-        broker.start();
-        Thread.sleep(1000);
-        assertTrue(jmsService.isListeningToTopic(connInfo, topic));
-        assertFalse(jmsService.isConnectionInRetryList(connInfo));
-        assertFalse(jmsService.isTopicInRetryList(connInfo, topic));
-        broker.stop();
-        jmsService.destroy();
+            services.destroy();
+            services = super.setupServicesForHCatalog();
+            int randomPort = 30000 + random.nextInt(10000);
+            String brokerURL = "tcp://localhost:" + randomPort;
+            String jndiPropertiesString = "java.naming.factory.initial#" + ActiveMQConnFactory + ";"
+                    + "java.naming.provider.url#" + brokerURL + ";" + "connectionFactoryNames#" + "ConnectionFactory";
+            Configuration servicesConf = services.getConf();
+            servicesConf.set(JMSAccessorService.CONF_RETRY_INITIAL_DELAY, "1");
+            servicesConf.set(JMSAccessorService.CONF_RETRY_MAX_ATTEMPTS, "3");
+            servicesConf.set(HCatAccessorService.JMS_CONNECTIONS_PROPERTIES, "default=" + jndiPropertiesString);
+            services.init();
+            HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
+            JMSAccessorService jmsService = Services.get().get(JMSAccessorService.class);
+
+            String publisherAuthority = "hcat.server.com:5080";
+            String topic = "topic.topic1";
+            // Start the broker
+            broker = new BrokerService();
+            broker.addConnector(brokerURL);
+            broker.setDataDirectory(getTestCaseDir());
+            broker.start();
+            JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcat.server.com:8020"));
+            jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(publisherAuthority));
+            assertTrue(jmsService.isListeningToTopic(connInfo, topic));
+            assertFalse(jmsService.isConnectionInRetryList(connInfo));
+            assertFalse(jmsService.isTopicInRetryList(connInfo, topic));
+            ConnectionContext connCtxt = jmsService.createConnectionContext(connInfo);
+            broker.stop();
+
+            try {
+                connCtxt.createSession(Session.AUTO_ACKNOWLEDGE);
+                fail("Exception expected");
+            }
+            catch (Exception e) {
+                waitFor(JMS_TIMEOUT_MS, new Predicate() {
+                    @Override
+                    public boolean evaluate() throws Exception {
+                        return !jmsService.isListeningToTopic(connInfo, topic);
+                    }
+                });
+                assertFalse(jmsService.isListeningToTopic(connInfo, topic));
+                assertTrue(jmsService.isConnectionInRetryList(connInfo));
+                assertTrue(jmsService.isTopicInRetryList(connInfo, topic));
+            }
+            broker = new BrokerService();
+
+            broker.addConnector(brokerURL);
+            broker.setDataDirectory(getTestCaseDir());
+            broker.start();
+            waitFor(JMS_TIMEOUT_MS, new Predicate() {
+                @Override
+                public boolean evaluate() throws Exception {
+                    return jmsService.isListeningToTopic(connInfo, topic);
+                }
+            });
+            assertTrue(jmsService.isListeningToTopic(connInfo, topic));
+            assertFalse(jmsService.isConnectionInRetryList(connInfo));
+            assertFalse(jmsService.isTopicInRetryList(connInfo, topic));
 
+            broker.stop();
+        } finally {
+            if (broker != null) {
+                broker.stop();
+            }
+        }
     }
 
     @Test
@@ -258,18 +259,22 @@ public class TestJMSAccessorService extends XTestCase {
         String publisherAuthority = "hcat.server.com:5080";
         String topic = "topic.topic1";
         JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcat.server.com:8020"));
+
         jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(publisherAuthority));
         assertTrue(jmsService.isConnectionInRetryList(connInfo));
         assertTrue(jmsService.isTopicInRetryList(connInfo, topic));
         assertFalse(jmsService.isListeningToTopic(connInfo, topic));
-        Thread.sleep(1100);
+        waitFor(JMS_TIMEOUT_MS, new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                return jmsService.getNumConnectionAttempts(connInfo) == 1;
+            }
+        });
+
         // Should not retry again as max attempt is 1
         assertTrue(jmsService.isConnectionInRetryList(connInfo));
         assertTrue(jmsService.isTopicInRetryList(connInfo, topic));
         assertFalse(jmsService.isListeningToTopic(connInfo, topic));
-        assertEquals(1, jmsService.getNumConnectionAttempts(connInfo));
         assertFalse(jmsService.retryConnection(connInfo));
-        jmsService.destroy();
     }
-
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/61c646c3/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 060f4fb..fd7bd76 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.1.0 release (trunk - unreleased)
 
+OOZIE-3240 Flaky test TestJMSAccessorService#testConnectionRetry (pbacsko via gezapeti)
 OOZIE-3246 Flaky test TestJMSJobEventListener#testConnectionDrop (pbacsko via gezapeti)
 OOZIE-3236 Fix flaky test TestHiveActionExecutor#testHiveAction (pbacsko via gezapeti)
 OOZIE-3235 Upgrade ActiveMQ to 5.15.3 (matijhs via andras.piros)