You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by ge...@apache.org on 2018/05/16 11:53:15 UTC

[1/2] oozie git commit: OOZIE-3246 Flaky test TestJMSJobEventListener#testConnectionDrop (pbacsko via gezapeti)

Repository: oozie
Updated Branches:
  refs/heads/master 88aa654d6 -> 61c646c33


OOZIE-3246 Flaky test TestJMSJobEventListener#testConnectionDrop (pbacsko via gezapeti)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/117153a9
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/117153a9
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/117153a9

Branch: refs/heads/master
Commit: 117153a90f7a7b42fbdaa252b8ec7821e6ef3d29
Parents: 88aa654
Author: Gezapeti Cseh <ge...@apache.org>
Authored: Wed May 16 13:52:10 2018 +0200
Committer: Gezapeti Cseh <ge...@apache.org>
Committed: Wed May 16 13:52:10 2018 +0200

----------------------------------------------------------------------
 .../oozie/jms/TestJMSJobEventListener.java      | 534 ++++++++-----------
 release-log.txt                                 |   1 +
 2 files changed, 226 insertions(+), 309 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/117153a9/core/src/test/java/org/apache/oozie/jms/TestJMSJobEventListener.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/jms/TestJMSJobEventListener.java b/core/src/test/java/org/apache/oozie/jms/TestJMSJobEventListener.java
index f375dec..8e8db51 100644
--- a/core/src/test/java/org/apache/oozie/jms/TestJMSJobEventListener.java
+++ b/core/src/test/java/org/apache/oozie/jms/TestJMSJobEventListener.java
@@ -18,7 +18,6 @@
 
 package org.apache.oozie.jms;
 
-import java.text.ParseException;
 import java.util.Date;
 import java.util.Random;
 
@@ -77,7 +76,7 @@ public class TestJMSJobEventListener extends XTestCase {
     }
 
     @Test
-    public void testOnWorkflowJobStartedEvent() throws ParseException {
+    public void testOnWorkflowJobStartedEvent() throws Exception {
         JMSJobEventListener wfEventListener = new JMSJobEventListener();
         wfEventListener.init(conf);
         Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
@@ -85,32 +84,26 @@ public class TestJMSJobEventListener extends XTestCase {
                 "wf-app-name1", startDate, null);
 
         ConnectionContext jmsContext = getConnectionContext();
-        try {
-            Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
-            MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe));
-            wfEventListener.onWorkflowJobEvent(wfe);
-            TextMessage message = (TextMessage) consumer.receive(5000);
-            assertFalse(message.getText().contains("endTime"));
-            WorkflowJobMessage wfStartMessage = JMSMessagingUtils.getEventMessage(message);
-            assertEquals(WorkflowJob.Status.RUNNING, wfStartMessage.getStatus());
-            assertEquals(startDate, wfStartMessage.getStartTime());
-            assertEquals("wfId1", wfStartMessage.getId());
-            assertEquals("caId1", wfStartMessage.getParentId());
-            assertEquals(MessageType.JOB, wfStartMessage.getMessageType());
-            assertEquals(AppType.WORKFLOW_JOB, wfStartMessage.getAppType());
-            assertEquals(EventStatus.STARTED, wfStartMessage.getEventStatus());
-            assertEquals("user1", wfStartMessage.getUser());
-            assertEquals("wf-app-name1", wfStartMessage.getAppName());
-            wfEventListener.destroy();
-        }
-        catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+        Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe));
+        wfEventListener.onWorkflowJobEvent(wfe);
+        TextMessage message = (TextMessage) consumer.receive(5000);
+        assertFalse(message.getText().contains("endTime"));
+        WorkflowJobMessage wfStartMessage = JMSMessagingUtils.getEventMessage(message);
+        assertEquals(WorkflowJob.Status.RUNNING, wfStartMessage.getStatus());
+        assertEquals(startDate, wfStartMessage.getStartTime());
+        assertEquals("wfId1", wfStartMessage.getId());
+        assertEquals("caId1", wfStartMessage.getParentId());
+        assertEquals(MessageType.JOB, wfStartMessage.getMessageType());
+        assertEquals(AppType.WORKFLOW_JOB, wfStartMessage.getAppType());
+        assertEquals(EventStatus.STARTED, wfStartMessage.getEventStatus());
+        assertEquals("user1", wfStartMessage.getUser());
+        assertEquals("wf-app-name1", wfStartMessage.getAppName());
+        wfEventListener.destroy();
     }
 
     @Test
-    public void testOnWorkflowJobSuccessEvent() throws ParseException {
+    public void testOnWorkflowJobSuccessEvent() throws Exception {
         JMSJobEventListener wfEventListener = new JMSJobEventListener();
         wfEventListener.init(conf);
         Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
@@ -119,33 +112,26 @@ public class TestJMSJobEventListener extends XTestCase {
                 "wf-app-name1", startDate, endDate);
 
         ConnectionContext jmsContext = getConnectionContext();
-        try {
-            Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
-            MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe));
-            wfEventListener.onWorkflowJobEvent(wfe);
-            TextMessage message = (TextMessage) consumer.receive(5000);
-            WorkflowJobMessage wfSuccMessage = JMSMessagingUtils.getEventMessage(message);
-            assertEquals(WorkflowJob.Status.SUCCEEDED, wfSuccMessage.getStatus());
-            assertEquals(startDate, wfSuccMessage.getStartTime());
-            assertEquals(endDate, wfSuccMessage.getEndTime());
-            assertEquals("wfId1", wfSuccMessage.getId());
-            assertEquals("caId1", wfSuccMessage.getParentId());
-            assertEquals(MessageType.JOB, wfSuccMessage.getMessageType());
-            assertEquals(AppType.WORKFLOW_JOB, wfSuccMessage.getAppType());
-            assertEquals(EventStatus.SUCCESS, wfSuccMessage.getEventStatus());
-            assertEquals("user1", wfSuccMessage.getUser());
-            assertEquals("wf-app-name1", wfSuccMessage.getAppName());
-            wfEventListener.destroy();
-
-        }
-        catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+        Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe));
+        wfEventListener.onWorkflowJobEvent(wfe);
+        TextMessage message = (TextMessage) consumer.receive(5000);
+        WorkflowJobMessage wfSuccMessage = JMSMessagingUtils.getEventMessage(message);
+        assertEquals(WorkflowJob.Status.SUCCEEDED, wfSuccMessage.getStatus());
+        assertEquals(startDate, wfSuccMessage.getStartTime());
+        assertEquals(endDate, wfSuccMessage.getEndTime());
+        assertEquals("wfId1", wfSuccMessage.getId());
+        assertEquals("caId1", wfSuccMessage.getParentId());
+        assertEquals(MessageType.JOB, wfSuccMessage.getMessageType());
+        assertEquals(AppType.WORKFLOW_JOB, wfSuccMessage.getAppType());
+        assertEquals(EventStatus.SUCCESS, wfSuccMessage.getEventStatus());
+        assertEquals("user1", wfSuccMessage.getUser());
+        assertEquals("wf-app-name1", wfSuccMessage.getAppName());
+        wfEventListener.destroy();
     }
 
     @Test
-    public void testOnWorkflowJobFailureEvent() throws ParseException {
+    public void testOnWorkflowJobFailureEvent() throws Exception {
         JMSJobEventListener wfEventListener = new JMSJobEventListener();
         wfEventListener.init(conf);
         Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
@@ -155,171 +141,136 @@ public class TestJMSJobEventListener extends XTestCase {
         wfe.setErrorCode("dummyErrorCode");
         wfe.setErrorMessage("dummyErrorMessage");
         ConnectionContext jmsContext = getConnectionContext();
-        try {
-            Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
-            MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe));
-            wfEventListener.onWorkflowJobEvent(wfe);
-            TextMessage message = (TextMessage) consumer.receive(5000);
-            WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message);
-            assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus());
-            assertEquals(startDate, wfFailMessage.getStartTime());
-            assertEquals(endDate, wfFailMessage.getEndTime());
-            assertEquals("wfId1", wfFailMessage.getId());
-            assertEquals("caId1", wfFailMessage.getParentId());
-            assertEquals(MessageType.JOB, wfFailMessage.getMessageType());
-            assertEquals(AppType.WORKFLOW_JOB, wfFailMessage.getAppType());
-            assertEquals(EventStatus.FAILURE, wfFailMessage.getEventStatus());
-            assertEquals("user1", wfFailMessage.getUser());
-            assertEquals("wf-app-name1", wfFailMessage.getAppName());
-            assertEquals("dummyErrorCode", wfFailMessage.getErrorCode());
-            assertEquals("dummyErrorMessage", wfFailMessage.getErrorMessage());
-            wfEventListener.destroy();
-        }
-        catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+        Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe));
+        wfEventListener.onWorkflowJobEvent(wfe);
+        TextMessage message = (TextMessage) consumer.receive(5000);
+        WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message);
+        assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus());
+        assertEquals(startDate, wfFailMessage.getStartTime());
+        assertEquals(endDate, wfFailMessage.getEndTime());
+        assertEquals("wfId1", wfFailMessage.getId());
+        assertEquals("caId1", wfFailMessage.getParentId());
+        assertEquals(MessageType.JOB, wfFailMessage.getMessageType());
+        assertEquals(AppType.WORKFLOW_JOB, wfFailMessage.getAppType());
+        assertEquals(EventStatus.FAILURE, wfFailMessage.getEventStatus());
+        assertEquals("user1", wfFailMessage.getUser());
+        assertEquals("wf-app-name1", wfFailMessage.getAppName());
+        assertEquals("dummyErrorCode", wfFailMessage.getErrorCode());
+        assertEquals("dummyErrorMessage", wfFailMessage.getErrorMessage());
+        wfEventListener.destroy();
     }
 
     @Test
-    public void testOnWorkflowJobSuspendEvent() throws ParseException {
+    public void testOnWorkflowJobSuspendEvent() throws Exception {
         JMSJobEventListener wfEventListener = new JMSJobEventListener();
         wfEventListener.init(conf);
         Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
         WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.SUSPENDED, "user1",
                 "wf-app-name1", startDate, null);
         ConnectionContext jmsContext = getConnectionContext();
-        try {
-            Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
-            MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe));
-            wfEventListener.onWorkflowJobEvent(wfe);
-            TextMessage message = (TextMessage) consumer.receive(5000);
-            assertFalse(message.getText().contains("endTime"));
-            WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message);
-            assertEquals(WorkflowJob.Status.SUSPENDED, wfFailMessage.getStatus());
-            assertEquals(startDate, wfFailMessage.getStartTime());
-            assertEquals("wfId1", wfFailMessage.getId());
-            assertEquals("caId1", wfFailMessage.getParentId());
-            assertEquals(MessageType.JOB, wfFailMessage.getMessageType());
-            assertEquals(AppType.WORKFLOW_JOB, wfFailMessage.getAppType());
-            assertEquals(EventStatus.SUSPEND, wfFailMessage.getEventStatus());
-            assertEquals("user1", wfFailMessage.getUser());
-            assertEquals("wf-app-name1", wfFailMessage.getAppName());
-            assertNull(wfFailMessage.getErrorCode());
-            assertNull(wfFailMessage.getErrorMessage());
-            wfEventListener.destroy();
-        }
-        catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+        Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe));
+        wfEventListener.onWorkflowJobEvent(wfe);
+        TextMessage message = (TextMessage) consumer.receive(5000);
+        assertFalse(message.getText().contains("endTime"));
+        WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message);
+        assertEquals(WorkflowJob.Status.SUSPENDED, wfFailMessage.getStatus());
+        assertEquals(startDate, wfFailMessage.getStartTime());
+        assertEquals("wfId1", wfFailMessage.getId());
+        assertEquals("caId1", wfFailMessage.getParentId());
+        assertEquals(MessageType.JOB, wfFailMessage.getMessageType());
+        assertEquals(AppType.WORKFLOW_JOB, wfFailMessage.getAppType());
+        assertEquals(EventStatus.SUSPEND, wfFailMessage.getEventStatus());
+        assertEquals("user1", wfFailMessage.getUser());
+        assertEquals("wf-app-name1", wfFailMessage.getAppName());
+        assertNull(wfFailMessage.getErrorCode());
+        assertNull(wfFailMessage.getErrorMessage());
+        wfEventListener.destroy();
     }
 
     @Test
-    public void testWorkflowJobSelectors() throws ParseException {
+    public void testWorkflowJobSelectors() throws Exception {
         JMSJobEventListener wfEventListener = new JMSJobEventListener();
         wfEventListener.init(conf);
         WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.FAILED, "user_1",
                 "wf-app-name1", new Date(), new Date());
         ConnectionContext jmsContext = getConnectionContext();
-        try {
-            Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
-            String selector = JMSHeaderConstants.USER + "='user_1'";
-            MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector);
-            wfEventListener.onWorkflowJobEvent(wfe);
-            TextMessage message = (TextMessage) consumer.receive(5000);
-            WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message);
-            Assert.assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus());
-            assertEquals("user_1", wfFailMessage.getUser());
-            assertEquals(MessageType.JOB, wfFailMessage.getMessageType());
-            wfEventListener.destroy();
-        }
-        catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+        Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+        String selector = JMSHeaderConstants.USER + "='user_1'";
+        MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector);
+        wfEventListener.onWorkflowJobEvent(wfe);
+        TextMessage message = (TextMessage) consumer.receive(5000);
+        WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message);
+        Assert.assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus());
+        assertEquals("user_1", wfFailMessage.getUser());
+        assertEquals(MessageType.JOB, wfFailMessage.getMessageType());
+        wfEventListener.destroy();
     }
 
     @Test
-    public void testWorkflowJobSelectorsNegative() {
+    public void testWorkflowJobSelectorsNegative() throws Exception {
         JMSJobEventListener wfEventListener = new JMSJobEventListener();
         wfEventListener.init(conf);
         WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.FAILED, "user1",
                 "wf-app-name1", new Date(), new Date());
         ConnectionContext jmsContext = getConnectionContext();
-        try {
-            Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
-            // Pass a selector which wont match and assert for null message
-            String selector = JMSHeaderConstants.USER + "='Non_matching_user'";
-            MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector);
-            wfEventListener.onWorkflowJobEvent(wfe);
-            TextMessage message = (TextMessage) consumer.receive(5000);
-            assertNull(message);
-            wfEventListener.destroy();
-        }
-        catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+        Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+        // Pass a selector which won't match and assert for null message
+        String selector = JMSHeaderConstants.USER + "='Non_matching_user'";
+        MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector);
+        wfEventListener.onWorkflowJobEvent(wfe);
+        TextMessage message = (TextMessage) consumer.receive(5000);
+        assertNull(message);
+        wfEventListener.destroy();
     }
 
     @Test
-    public void testWorkflowJobSelectorsOr() {
+    public void testWorkflowJobSelectorsOr() throws Exception {
         JMSJobEventListener wfEventListener = new JMSJobEventListener();
         wfEventListener.init(conf);
         WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.FAILED, "user1",
                 "wf-app-name1", new Date(), new Date());
         ConnectionContext jmsContext = getConnectionContext();
-        try {
-            Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
-            // Pass a selector using OR condition
-            String selector = JMSHeaderConstants.USER + "='Non_matching_user' OR " + JMSHeaderConstants.APP_NAME
-                    + "='wf-app-name1'";
-            MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector);
-            wfEventListener.onWorkflowJobEvent(wfe);
-            TextMessage message = (TextMessage) consumer.receive(5000);
-            WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message);
-            Assert.assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus());
-            assertEquals("user1", wfFailMessage.getUser());
-            assertEquals(MessageType.JOB, wfFailMessage.getMessageType());
-            wfEventListener.destroy();
-        }
-        catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+        Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+        // Pass a selector using OR condition
+        String selector = JMSHeaderConstants.USER + "='Non_matching_user' OR " + JMSHeaderConstants.APP_NAME
+                + "='wf-app-name1'";
+        MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector);
+        wfEventListener.onWorkflowJobEvent(wfe);
+        TextMessage message = (TextMessage) consumer.receive(5000);
+        WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message);
+        Assert.assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus());
+        assertEquals("user1", wfFailMessage.getUser());
+        assertEquals(MessageType.JOB, wfFailMessage.getMessageType());
+        wfEventListener.destroy();
     }
 
     @Test
-    public void testWorkflowJobSelectorsAnd() {
+    public void testWorkflowJobSelectorsAnd() throws Exception {
         JMSJobEventListener wfEventListener = new JMSJobEventListener();
         wfEventListener.init(conf);
         WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.FAILED, "user1",
                 "wf-app-name1", new Date(), new Date());
         ConnectionContext jmsContext = getConnectionContext();
-        try {
-            Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
-            // Pass a selector using AND condition
-            String selector = JMSHeaderConstants.EVENT_STATUS + "='FAILURE' AND " + JMSHeaderConstants.APP_TYPE
-                    + "='WORKFLOW_JOB' AND " + JMSHeaderConstants.MESSAGE_TYPE + "='JOB'";
-            MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector);
-            wfEventListener.onWorkflowJobEvent(wfe);
-            TextMessage message = (TextMessage) consumer.receive(5000);
-            WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message);
-            Assert.assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus());
-            assertEquals("user1", wfFailMessage.getUser());
-            assertEquals(MessageType.JOB, wfFailMessage.getMessageType());
-            wfEventListener.destroy();
-        }
-        catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+        Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+        // Pass a selector using AND condition
+        String selector = JMSHeaderConstants.EVENT_STATUS + "='FAILURE' AND " + JMSHeaderConstants.APP_TYPE
+                + "='WORKFLOW_JOB' AND " + JMSHeaderConstants.MESSAGE_TYPE + "='JOB'";
+        MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector);
+        wfEventListener.onWorkflowJobEvent(wfe);
+        TextMessage message = (TextMessage) consumer.receive(5000);
+        WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message);
+        Assert.assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus());
+        assertEquals("user1", wfFailMessage.getUser());
+        assertEquals(MessageType.JOB, wfFailMessage.getMessageType());
+        wfEventListener.destroy();
     }
 
     @Test
-    public void testConnectionDrop() {
+    public void testConnectionDrop() throws Exception {
         Random random = new Random();
+        BrokerService broker = null;
         try {
             services.destroy();
             services = new Services();
@@ -334,7 +285,7 @@ public class TestJMSJobEventListener extends XTestCase {
             services.init();
             JMSJobEventListener wfEventListener = new JMSJobEventListener();
             wfEventListener.init(conf);
-            BrokerService broker = new BrokerService();
+            broker = new BrokerService();
             broker.setDataDirectory(getTestCaseDir());
             broker.addConnector(brokerURl);
             broker.start();
@@ -360,15 +311,16 @@ public class TestJMSJobEventListener extends XTestCase {
             broker.stop();
             wfEventListener.destroy();
         }
-        catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
+        finally {
+            if (broker != null) {
+                broker.stop();
+            }
         }
 
     }
 
     @Test
-    public void testOnCoordinatorActionWaitingEvent() throws ParseException {
+    public void testOnCoordinatorActionWaitingEvent() throws Exception {
         JMSJobEventListener wfEventListner = new JMSJobEventListener();
         wfEventListner.init(conf);
         Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
@@ -376,36 +328,30 @@ public class TestJMSJobEventListener extends XTestCase {
         CoordinatorActionEvent cae = new CoordinatorActionEvent("caId1", "caJobId1", CoordinatorAction.Status.WAITING,
                 "user1", "wf-app-name1", nominalTime, startDate, "missingDep1");
         ConnectionContext jmsContext = getConnectionContext();
-        try {
-            Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
-            MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListner.getTopic(cae));
-            wfEventListner.onCoordinatorActionEvent(cae);
-            TextMessage message = (TextMessage) consumer.receive(5000);
-            assertFalse(message.getText().contains("endTime"));
-            assertFalse(message.getText().contains("errorCode"));
-            assertFalse(message.getText().contains("errorMessage"));
-            CoordinatorActionMessage coordActionWaitingMessage = JMSMessagingUtils
-                    .getEventMessage(message);
-            assertEquals(CoordinatorAction.Status.WAITING, coordActionWaitingMessage.getStatus());
-            assertEquals(startDate, coordActionWaitingMessage.getStartTime());
-            assertEquals(nominalTime, coordActionWaitingMessage.getNominalTime());
-            assertEquals("caJobId1", coordActionWaitingMessage.getParentId());
-            assertEquals("caId1", coordActionWaitingMessage.getId());
-            assertEquals(MessageType.JOB, coordActionWaitingMessage.getMessageType());
-            assertEquals(AppType.COORDINATOR_ACTION, coordActionWaitingMessage.getAppType());
-            assertEquals(EventStatus.WAITING, coordActionWaitingMessage.getEventStatus());
-            assertEquals("user1", coordActionWaitingMessage.getUser());
-            assertEquals("wf-app-name1", coordActionWaitingMessage.getAppName());
-            assertEquals("missingDep1", coordActionWaitingMessage.getMissingDependency());
-        }
-        catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+        Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListner.getTopic(cae));
+        wfEventListner.onCoordinatorActionEvent(cae);
+        TextMessage message = (TextMessage) consumer.receive(5000);
+        assertFalse(message.getText().contains("endTime"));
+        assertFalse(message.getText().contains("errorCode"));
+        assertFalse(message.getText().contains("errorMessage"));
+        CoordinatorActionMessage coordActionWaitingMessage = JMSMessagingUtils
+                .getEventMessage(message);
+        assertEquals(CoordinatorAction.Status.WAITING, coordActionWaitingMessage.getStatus());
+        assertEquals(startDate, coordActionWaitingMessage.getStartTime());
+        assertEquals(nominalTime, coordActionWaitingMessage.getNominalTime());
+        assertEquals("caJobId1", coordActionWaitingMessage.getParentId());
+        assertEquals("caId1", coordActionWaitingMessage.getId());
+        assertEquals(MessageType.JOB, coordActionWaitingMessage.getMessageType());
+        assertEquals(AppType.COORDINATOR_ACTION, coordActionWaitingMessage.getAppType());
+        assertEquals(EventStatus.WAITING, coordActionWaitingMessage.getEventStatus());
+        assertEquals("user1", coordActionWaitingMessage.getUser());
+        assertEquals("wf-app-name1", coordActionWaitingMessage.getAppName());
+        assertEquals("missingDep1", coordActionWaitingMessage.getMissingDependency());
     }
 
     @Test
-    public void testOnCoordinatorActionStartEvent() throws ParseException {
+    public void testOnCoordinatorActionStartEvent() throws Exception {
         JMSJobEventListener coordEventListener = new JMSJobEventListener();
         coordEventListener.init(conf);
         Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
@@ -413,35 +359,29 @@ public class TestJMSJobEventListener extends XTestCase {
         CoordinatorActionEvent cae = new CoordinatorActionEvent("caId1", "caJobId1", CoordinatorAction.Status.RUNNING,
                 "user1", "wf-app-name1", nominalTime, startDate, null);
         ConnectionContext jmsContext = getConnectionContext();
-        try {
-            Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
-            MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae));
-            coordEventListener.onCoordinatorActionEvent(cae);
-            TextMessage message = (TextMessage) consumer.receive(5000);
-            assertFalse(message.getText().contains("endTime"));
-            assertFalse(message.getText().contains("errorCode"));
-            assertFalse(message.getText().contains("errorMessage"));
-            assertFalse(message.getText().contains("missingDependency"));
-            CoordinatorActionMessage coordActionStartMessage = JMSMessagingUtils
-                    .getEventMessage(message);
-            assertEquals(CoordinatorAction.Status.RUNNING, coordActionStartMessage.getStatus());
-            assertEquals(startDate, coordActionStartMessage.getStartTime());
-            assertEquals("caJobId1", coordActionStartMessage.getParentId());
-            assertEquals("caId1", coordActionStartMessage.getId());
-            assertEquals(MessageType.JOB, coordActionStartMessage.getMessageType());
-            assertEquals(AppType.COORDINATOR_ACTION, coordActionStartMessage.getAppType());
-            assertEquals(EventStatus.STARTED, coordActionStartMessage.getEventStatus());
-            assertEquals("user1", coordActionStartMessage.getUser());
-            assertEquals("wf-app-name1", coordActionStartMessage.getAppName());
-        }
-        catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+        Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae));
+        coordEventListener.onCoordinatorActionEvent(cae);
+        TextMessage message = (TextMessage) consumer.receive(5000);
+        assertFalse(message.getText().contains("endTime"));
+        assertFalse(message.getText().contains("errorCode"));
+        assertFalse(message.getText().contains("errorMessage"));
+        assertFalse(message.getText().contains("missingDependency"));
+        CoordinatorActionMessage coordActionStartMessage = JMSMessagingUtils
+                .getEventMessage(message);
+        assertEquals(CoordinatorAction.Status.RUNNING, coordActionStartMessage.getStatus());
+        assertEquals(startDate, coordActionStartMessage.getStartTime());
+        assertEquals("caJobId1", coordActionStartMessage.getParentId());
+        assertEquals("caId1", coordActionStartMessage.getId());
+        assertEquals(MessageType.JOB, coordActionStartMessage.getMessageType());
+        assertEquals(AppType.COORDINATOR_ACTION, coordActionStartMessage.getAppType());
+        assertEquals(EventStatus.STARTED, coordActionStartMessage.getEventStatus());
+        assertEquals("user1", coordActionStartMessage.getUser());
+        assertEquals("wf-app-name1", coordActionStartMessage.getAppName());
     }
 
     @Test
-    public void testOnCoordinatorJobSuccessEvent() throws ParseException {
+    public void testOnCoordinatorJobSuccessEvent() throws Exception {
         JMSJobEventListener coordEventListener = new JMSJobEventListener();
         coordEventListener.init(conf);
         Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
@@ -451,35 +391,29 @@ public class TestJMSJobEventListener extends XTestCase {
                 CoordinatorAction.Status.SUCCEEDED, "user1", "wf-app-name1", nominalTime, startDate, null);
         cae.setEndTime(endDate);
         ConnectionContext jmsContext = getConnectionContext();
-        try {
-            Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
-            MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae));
-            coordEventListener.onCoordinatorActionEvent(cae);
-            TextMessage message = (TextMessage) consumer.receive(5000);
-            assertFalse(message.getText().contains("errorCode"));
-            assertFalse(message.getText().contains("errorMessage"));
-            assertFalse(message.getText().contains("missingDependency"));
-            CoordinatorActionMessage coordActionSuccessMessage = JMSMessagingUtils
-                    .getEventMessage(message);
-            assertEquals(CoordinatorAction.Status.SUCCEEDED, coordActionSuccessMessage.getStatus());
-            assertEquals(startDate, coordActionSuccessMessage.getStartTime());
-            assertEquals(endDate, coordActionSuccessMessage.getEndTime());
-            assertEquals("caJobId1", coordActionSuccessMessage.getParentId());
-            assertEquals("caId1", coordActionSuccessMessage.getId());
-            assertEquals(MessageType.JOB, coordActionSuccessMessage.getMessageType());
-            assertEquals(AppType.COORDINATOR_ACTION, coordActionSuccessMessage.getAppType());
-            assertEquals(EventStatus.SUCCESS, coordActionSuccessMessage.getEventStatus());
-            assertEquals("user1", coordActionSuccessMessage.getUser());
-            assertEquals("wf-app-name1", coordActionSuccessMessage.getAppName());
-        }
-        catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+        Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae));
+        coordEventListener.onCoordinatorActionEvent(cae);
+        TextMessage message = (TextMessage) consumer.receive(5000);
+        assertFalse(message.getText().contains("errorCode"));
+        assertFalse(message.getText().contains("errorMessage"));
+        assertFalse(message.getText().contains("missingDependency"));
+        CoordinatorActionMessage coordActionSuccessMessage = JMSMessagingUtils
+                .getEventMessage(message);
+        assertEquals(CoordinatorAction.Status.SUCCEEDED, coordActionSuccessMessage.getStatus());
+        assertEquals(startDate, coordActionSuccessMessage.getStartTime());
+        assertEquals(endDate, coordActionSuccessMessage.getEndTime());
+        assertEquals("caJobId1", coordActionSuccessMessage.getParentId());
+        assertEquals("caId1", coordActionSuccessMessage.getId());
+        assertEquals(MessageType.JOB, coordActionSuccessMessage.getMessageType());
+        assertEquals(AppType.COORDINATOR_ACTION, coordActionSuccessMessage.getAppType());
+        assertEquals(EventStatus.SUCCESS, coordActionSuccessMessage.getEventStatus());
+        assertEquals("user1", coordActionSuccessMessage.getUser());
+        assertEquals("wf-app-name1", coordActionSuccessMessage.getAppName());
     }
 
     @Test
-    public void testOnCoordinatorJobFailureEvent() throws ParseException {
+    public void testOnCoordinatorJobFailureEvent() throws Exception {
         JMSJobEventListener coordEventListener = new JMSJobEventListener();
         coordEventListener.init(conf);
         Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
@@ -491,35 +425,29 @@ public class TestJMSJobEventListener extends XTestCase {
         cae.setErrorCode("E0101");
         cae.setErrorMessage("dummyError");
         ConnectionContext jmsContext = getConnectionContext();
-        try {
-            Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
-            MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae));
-            coordEventListener.onCoordinatorActionEvent(cae);
-            TextMessage message = (TextMessage) consumer.receive(5000);
-            assertFalse(message.getText().contains("missingDependency"));
-            CoordinatorActionMessage coordActionFailMessage = JMSMessagingUtils
-                    .getEventMessage(message);
-            assertEquals(CoordinatorAction.Status.FAILED, coordActionFailMessage.getStatus());
-            assertEquals(startDate, coordActionFailMessage.getStartTime());
-            assertEquals(endDate, coordActionFailMessage.getEndTime());
-            assertEquals("caJobId1", coordActionFailMessage.getParentId());
-            assertEquals("caId1", coordActionFailMessage.getId());
-            assertEquals(MessageType.JOB, coordActionFailMessage.getMessageType());
-            assertEquals(AppType.COORDINATOR_ACTION, coordActionFailMessage.getAppType());
-            assertEquals(EventStatus.FAILURE, coordActionFailMessage.getEventStatus());
-            assertEquals("user1", coordActionFailMessage.getUser());
-            assertEquals("wf-app-name1", coordActionFailMessage.getAppName());
-            assertEquals("E0101", coordActionFailMessage.getErrorCode());
-            assertEquals("dummyError", coordActionFailMessage.getErrorMessage());
-        }
-        catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+        Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae));
+        coordEventListener.onCoordinatorActionEvent(cae);
+        TextMessage message = (TextMessage) consumer.receive(5000);
+        assertFalse(message.getText().contains("missingDependency"));
+        CoordinatorActionMessage coordActionFailMessage = JMSMessagingUtils
+                .getEventMessage(message);
+        assertEquals(CoordinatorAction.Status.FAILED, coordActionFailMessage.getStatus());
+        assertEquals(startDate, coordActionFailMessage.getStartTime());
+        assertEquals(endDate, coordActionFailMessage.getEndTime());
+        assertEquals("caJobId1", coordActionFailMessage.getParentId());
+        assertEquals("caId1", coordActionFailMessage.getId());
+        assertEquals(MessageType.JOB, coordActionFailMessage.getMessageType());
+        assertEquals(AppType.COORDINATOR_ACTION, coordActionFailMessage.getAppType());
+        assertEquals(EventStatus.FAILURE, coordActionFailMessage.getEventStatus());
+        assertEquals("user1", coordActionFailMessage.getUser());
+        assertEquals("wf-app-name1", coordActionFailMessage.getAppName());
+        assertEquals("E0101", coordActionFailMessage.getErrorCode());
+        assertEquals("dummyError", coordActionFailMessage.getErrorMessage());
     }
 
     @Test
-    public void testCoordinatorActionSelectors() throws ParseException {
+    public void testCoordinatorActionSelectors() throws Exception {
         JMSJobEventListener coordEventListener = new JMSJobEventListener();
         coordEventListener.init(conf);
         Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
@@ -527,26 +455,20 @@ public class TestJMSJobEventListener extends XTestCase {
         CoordinatorActionEvent cae = new CoordinatorActionEvent("caId1", "caJobId1", CoordinatorAction.Status.FAILED,
                 "user1", "wf-app-name1", nominalTime, startDate, null);
         ConnectionContext jmsContext = getConnectionContext();
-        try {
-            Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
-            String selector = JMSHeaderConstants.USER + "='user1'";
-            MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae), selector);
-            coordEventListener.onCoordinatorActionEvent(cae);
-            TextMessage message = (TextMessage) consumer.receive(5000);
-            CoordinatorActionMessage coordActionFailMessage = JMSMessagingUtils
-                    .getEventMessage(message);
-            Assert.assertEquals(CoordinatorAction.Status.FAILED, coordActionFailMessage.getStatus());
-            assertEquals("user1", coordActionFailMessage.getUser());
-            assertEquals(MessageType.JOB, coordActionFailMessage.getMessageType());
-        }
-        catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+        Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+        String selector = JMSHeaderConstants.USER + "='user1'";
+        MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae), selector);
+        coordEventListener.onCoordinatorActionEvent(cae);
+        TextMessage message = (TextMessage) consumer.receive(5000);
+        CoordinatorActionMessage coordActionFailMessage = JMSMessagingUtils
+                .getEventMessage(message);
+        Assert.assertEquals(CoordinatorAction.Status.FAILED, coordActionFailMessage.getStatus());
+        assertEquals("user1", coordActionFailMessage.getUser());
+        assertEquals(MessageType.JOB, coordActionFailMessage.getMessageType());
     }
 
     @Test
-    public void testCoordinatorActionSelectorsNegative() throws ParseException {
+    public void testCoordinatorActionSelectorsNegative() throws Exception {
         JMSJobEventListener coordEventListener = new JMSJobEventListener();
         coordEventListener.init(conf);
         Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
@@ -554,19 +476,13 @@ public class TestJMSJobEventListener extends XTestCase {
         CoordinatorActionEvent cae = new CoordinatorActionEvent("caId1", "caJobId1", CoordinatorAction.Status.FAILED,
                 "user1", "wf-app-name1", nominalTime, startDate, null);
         ConnectionContext jmsContext = getConnectionContext();
-        try {
-            Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
-            // Pass a selector which wont match and assert for null message
-            String selector = JMSHeaderConstants.USER + "='Non_matching_user'";
-            MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae), selector);
-            coordEventListener.onCoordinatorActionEvent(cae);
-            TextMessage message = (TextMessage) consumer.receive(5000);
-            assertNull(message);
-        }
-        catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+        Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+        // Pass a selector which won't match and assert for a null message
+        String selector = JMSHeaderConstants.USER + "='Non_matching_user'";
+        MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae), selector);
+        coordEventListener.onCoordinatorActionEvent(cae);
+        TextMessage message = (TextMessage) consumer.receive(5000);
+        assertNull(message);
     }
 
     private ConnectionContext getConnectionContext() {

http://git-wip-us.apache.org/repos/asf/oozie/blob/117153a9/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 6dbe922..060f4fb 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.1.0 release (trunk - unreleased)
 
+OOZIE-3246 Flaky test TestJMSJobEventListener#testConnectionDrop (pbacsko via gezapeti)
 OOZIE-3236 Fix flaky test TestHiveActionExecutor#testHiveAction (pbacsko via gezapeti)
 OOZIE-3235 Upgrade ActiveMQ to 5.15.3 (matijhs via andras.piros)
 OOZIE-3217 Enable definition of admin users using oozie-site.xml (orova via andras.piros)


[2/2] oozie git commit: OOZIE-3240 Flaky test TestJMSAccessorService#testConnectionRetry (pbacsko via gezapeti)

Posted by ge...@apache.org.
OOZIE-3240 Flaky test TestJMSAccessorService#testConnectionRetry (pbacsko via gezapeti)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/61c646c3
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/61c646c3
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/61c646c3

Branch: refs/heads/master
Commit: 61c646c332e6129f3502ee235e5e6d28fe55addd
Parents: 117153a
Author: Gezapeti Cseh <ge...@apache.org>
Authored: Wed May 16 13:52:57 2018 +0200
Committer: Gezapeti Cseh <ge...@apache.org>
Committed: Wed May 16 13:52:57 2018 +0200

----------------------------------------------------------------------
 .../oozie/service/TestJMSAccessorService.java   | 267 ++++++++++---------
 release-log.txt                                 |   1 +
 2 files changed, 137 insertions(+), 131 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/61c646c3/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java b/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java
index 41241d2..dbf892e 100644
--- a/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java
@@ -19,6 +19,7 @@
 package org.apache.oozie.service;
 
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.Random;
 
 import javax.jms.Session;
@@ -36,6 +37,7 @@ import org.junit.Test;
 public class TestJMSAccessorService extends XTestCase {
     private Services services;
     private static Random random = new Random();
+    private static final int JMS_TIMEOUT_MS = 5000;
 
     @Override
     protected void setUp() throws Exception {
@@ -66,89 +68,63 @@ public class TestJMSAccessorService extends XTestCase {
     }
 
     @Test
-    public void testRegisterSingleConsumerPerTopic() {
-
-        try {
-            HCatAccessorService hcatService = services.get(HCatAccessorService.class);
-            JMSAccessorService jmsService = services.get(JMSAccessorService.class);
-            String server = "hcat.server.com:5080";
-            String topic = "hcat.mydb.mytable";
-
-            JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcat.server.com:8020"));
-            jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(server));
+    public void testRegisterSingleConsumerPerTopic() throws URISyntaxException {
+        HCatAccessorService hcatService = services.get(HCatAccessorService.class);
+        JMSAccessorService jmsService = services.get(JMSAccessorService.class);
+        String server = "hcat.server.com:5080";
+        String topic = "hcat.mydb.mytable";
 
-            MessageReceiver receiver1 = jmsService.getMessageReceiver(connInfo, topic);
-            jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(server));
+        JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcat.server.com:8020"));
+        jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(server));
 
-            MessageReceiver receiver2 = jmsService.getMessageReceiver(connInfo, topic);
-            assertEquals(receiver1, receiver2);
-        }
-        catch (Exception e) {
-            e.printStackTrace();
-            fail("Exception encountered : " + e);
-        }
+        MessageReceiver receiver1 = jmsService.getMessageReceiver(connInfo, topic);
+        jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(server));
 
+        MessageReceiver receiver2 = jmsService.getMessageReceiver(connInfo, topic);
+        assertEquals(receiver1, receiver2);
     }
 
     @Test
-    public void testUnRegisterTopic() {
-
-        try {
-            HCatAccessorService hcatService = services.get(HCatAccessorService.class);
-            JMSAccessorService jmsService = services.get(JMSAccessorService.class);
-            String server = "hcat.server.com:5080";
-            String topic = "hcatalog.mydb.mytable";
-
-            JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcat.server.com:8020"));
-            jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(server));
+    public void testUnRegisterTopic() throws URISyntaxException {
+        HCatAccessorService hcatService = services.get(HCatAccessorService.class);
+        JMSAccessorService jmsService = services.get(JMSAccessorService.class);
+        String server = "hcat.server.com:5080";
+        String topic = "hcatalog.mydb.mytable";
 
-            MessageReceiver receiver1 = jmsService.getMessageReceiver(connInfo, topic);
-            assertNotNull(receiver1);
+        JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcat.server.com:8020"));
+        jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(server));
 
-            jmsService.unregisterFromNotification(connInfo, topic);
+        MessageReceiver receiver1 = jmsService.getMessageReceiver(connInfo, topic);
+        assertNotNull(receiver1);
 
-            receiver1 = jmsService.getMessageReceiver(connInfo, topic);
-            assertEquals(null, receiver1);
-        }
-        catch (Exception e) {
-            e.printStackTrace();
-            fail("Exception encountered : " + e);
-        }
+        jmsService.unregisterFromNotification(connInfo, topic);
 
+        receiver1 = jmsService.getMessageReceiver(connInfo, topic);
+        assertEquals(null, receiver1);
     }
 
     @Test
-    public void testConnectionContext() throws ServiceException {
-        try {
-            services.destroy();
-            services = super.setupServicesForHCatalog();
-            Configuration conf = services.getConf();
-            // set the connection factory name
-            String jmsURL = "hcat://${1}.${2}.server.com:8020=java.naming.factory.initial#" +
-                    "org.apache.activemq.jndi.ActiveMQInitialContextFactory" +
-                    ";java.naming.provider.url#vm://localhost?broker.persistent=false;" +
-                    "connectionFactoryNames#dynamicFactories/hcat.prod.${1}";
-            conf.set(HCatAccessorService.JMS_CONNECTIONS_PROPERTIES, jmsURL);
-            services.init();
-            HCatAccessorService hcatService = services.get(HCatAccessorService.class);
-            JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcatserver.blue.server.com:8020"));
-            assertEquals(
-                    "java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#" +
-                    "vm://localhost?broker.persistent=false;connectionFactoryNames#dynamicFactories/hcat.prod.hcatserver",
-                    connInfo.getJNDIPropertiesString());
-
-            ConnectionContext ctx1 = new DefaultConnectionContext();
-            ctx1.createConnection(connInfo.getJNDIProperties());
-            BrokerService broker = new BrokerService();
-            broker.setDataDirectory(getTestCaseDir());
-            // Without this stop testConnectionRetry fails with
-            // javax.management.InstanceAlreadyExistsException: org.apache.activemq:BrokerName=localhost,Type=Broker
-            broker.stop();
-        }
-        catch (Exception e) {
-            e.printStackTrace();
-            fail("Unexpected exception " + e);
-        }
+    public void testConnectionContext() throws Exception {
+        services.destroy();
+        services = super.setupServicesForHCatalog();
+        Configuration conf = services.getConf();
+        // set the connection factory name
+        String jmsURL = "hcat://${1}.${2}.server.com:8020=java.naming.factory.initial#" +
+                "org.apache.activemq.jndi.ActiveMQInitialContextFactory" +
+                ";java.naming.provider.url#vm://localhost?broker.persistent=false;" +
+                "connectionFactoryNames#dynamicFactories/hcat.prod.${1}";
+        conf.set(HCatAccessorService.JMS_CONNECTIONS_PROPERTIES, jmsURL);
+        services.init();
+        HCatAccessorService hcatService = services.get(HCatAccessorService.class);
+        JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcatserver.blue.server.com:8020"));
+        assertEquals(
+                "java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#" +
+                        "vm://localhost?broker.persistent=false;connectionFactoryNames#dynamicFactories/hcat.prod.hcatserver",
+                        connInfo.getJNDIPropertiesString());
+
+        ConnectionContext ctx = new DefaultConnectionContext();
+        ctx.createConnection(connInfo.getJNDIProperties());
+        ctx.close();
     }
 
     @Test
@@ -176,69 +152,94 @@ public class TestJMSAccessorService extends XTestCase {
         assertTrue(jmsService.isTopicInRetryList(connInfo, topic));
         // Start the broker and check if listening to topic now
         BrokerService broker = new BrokerService();
-        broker.addConnector(brokerURl);
-        broker.setDataDirectory(getTestCaseDir());
-        broker.start();
-        Thread.sleep(1000);
-        assertTrue(jmsService.isListeningToTopic(connInfo, topic));
-        assertFalse(jmsService.isConnectionInRetryList(connInfo));
-        assertFalse(jmsService.isTopicInRetryList(connInfo, topic));
-        broker.stop();
-        jmsService.destroy();
-
+        try {
+            broker.addConnector(brokerURl);
+            broker.setDataDirectory(getTestCaseDir());
+            broker.start();
+
+            waitFor(JMS_TIMEOUT_MS, new Predicate() {
+                @Override
+                public boolean evaluate() throws Exception {
+                    return jmsService.isListeningToTopic(connInfo, topic);
+                }
+            });
+            assertTrue(jmsService.isListeningToTopic(connInfo, topic));
+            assertFalse(jmsService.isConnectionInRetryList(connInfo));
+            assertFalse(jmsService.isTopicInRetryList(connInfo, topic));
+        } finally {
+            broker.stop();
+        }
     }
 
     @Test
     public void testConnectionRetryExceptionListener() throws Exception {
-        services.destroy();
-        services = super.setupServicesForHCatalog();
-        int randomPort = 30000 + random.nextInt(10000);
-        String brokerURL = "tcp://localhost:" + randomPort;
-        String jndiPropertiesString = "java.naming.factory.initial#" + ActiveMQConnFactory + ";"
-                + "java.naming.provider.url#" + brokerURL + ";" + "connectionFactoryNames#" + "ConnectionFactory";
-        Configuration servicesConf = services.getConf();
-        servicesConf.set(JMSAccessorService.CONF_RETRY_INITIAL_DELAY, "1");
-        servicesConf.set(JMSAccessorService.CONF_RETRY_MAX_ATTEMPTS, "3");
-        servicesConf.set(HCatAccessorService.JMS_CONNECTIONS_PROPERTIES, "default=" + jndiPropertiesString);
-        services.init();
-        HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
-        JMSAccessorService jmsService = Services.get().get(JMSAccessorService.class);
-
-        String publisherAuthority = "hcat.server.com:5080";
-        String topic = "topic.topic1";
-        // Start the broker
-        BrokerService broker = new BrokerService();
-        broker.addConnector(brokerURL);
-        broker.setDataDirectory(getTestCaseDir());
-        broker.start();
-        JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcat.server.com:8020"));
-        jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(publisherAuthority));
-        assertTrue(jmsService.isListeningToTopic(connInfo, topic));
-        assertFalse(jmsService.isConnectionInRetryList(connInfo));
-        assertFalse(jmsService.isTopicInRetryList(connInfo, topic));
-        ConnectionContext connCtxt = jmsService.createConnectionContext(connInfo);
-        broker.stop();
+        BrokerService broker = null;
         try {
-            connCtxt.createSession(Session.AUTO_ACKNOWLEDGE);
-            fail("Exception expected");
-        }
-        catch (Exception e) {
-            Thread.sleep(100);
-            assertFalse(jmsService.isListeningToTopic(connInfo, topic));
-            assertTrue(jmsService.isConnectionInRetryList(connInfo));
-            assertTrue(jmsService.isTopicInRetryList(connInfo, topic));
-        }
-        broker = new BrokerService();
-        broker.addConnector(brokerURL);
-        broker.setDataDirectory(getTestCaseDir());
-        broker.start();
-        Thread.sleep(1000);
-        assertTrue(jmsService.isListeningToTopic(connInfo, topic));
-        assertFalse(jmsService.isConnectionInRetryList(connInfo));
-        assertFalse(jmsService.isTopicInRetryList(connInfo, topic));
-        broker.stop();
-        jmsService.destroy();
+            services.destroy();
+            services = super.setupServicesForHCatalog();
+            int randomPort = 30000 + random.nextInt(10000);
+            String brokerURL = "tcp://localhost:" + randomPort;
+            String jndiPropertiesString = "java.naming.factory.initial#" + ActiveMQConnFactory + ";"
+                    + "java.naming.provider.url#" + brokerURL + ";" + "connectionFactoryNames#" + "ConnectionFactory";
+            Configuration servicesConf = services.getConf();
+            servicesConf.set(JMSAccessorService.CONF_RETRY_INITIAL_DELAY, "1");
+            servicesConf.set(JMSAccessorService.CONF_RETRY_MAX_ATTEMPTS, "3");
+            servicesConf.set(HCatAccessorService.JMS_CONNECTIONS_PROPERTIES, "default=" + jndiPropertiesString);
+            services.init();
+            HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
+            JMSAccessorService jmsService = Services.get().get(JMSAccessorService.class);
+
+            String publisherAuthority = "hcat.server.com:5080";
+            String topic = "topic.topic1";
+            // Start the broker
+            broker = new BrokerService();
+            broker.addConnector(brokerURL);
+            broker.setDataDirectory(getTestCaseDir());
+            broker.start();
+            JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcat.server.com:8020"));
+            jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(publisherAuthority));
+            assertTrue(jmsService.isListeningToTopic(connInfo, topic));
+            assertFalse(jmsService.isConnectionInRetryList(connInfo));
+            assertFalse(jmsService.isTopicInRetryList(connInfo, topic));
+            ConnectionContext connCtxt = jmsService.createConnectionContext(connInfo);
+            broker.stop();
+
+            try {
+                connCtxt.createSession(Session.AUTO_ACKNOWLEDGE);
+                fail("Exception expected");
+            }
+            catch (Exception e) {
+                waitFor(JMS_TIMEOUT_MS, new Predicate() {
+                    @Override
+                    public boolean evaluate() throws Exception {
+                        return !jmsService.isListeningToTopic(connInfo, topic);
+                    }
+                });
+                assertFalse(jmsService.isListeningToTopic(connInfo, topic));
+                assertTrue(jmsService.isConnectionInRetryList(connInfo));
+                assertTrue(jmsService.isTopicInRetryList(connInfo, topic));
+            }
+            broker = new BrokerService();
+
+            broker.addConnector(brokerURL);
+            broker.setDataDirectory(getTestCaseDir());
+            broker.start();
+            waitFor(JMS_TIMEOUT_MS, new Predicate() {
+                @Override
+                public boolean evaluate() throws Exception {
+                    return jmsService.isListeningToTopic(connInfo, topic);
+                }
+            });
+            assertTrue(jmsService.isListeningToTopic(connInfo, topic));
+            assertFalse(jmsService.isConnectionInRetryList(connInfo));
+            assertFalse(jmsService.isTopicInRetryList(connInfo, topic));
 
+            broker.stop();
+        } finally {
+            if (broker != null) {
+                broker.stop();
+            }
+        }
     }
 
     @Test
@@ -258,18 +259,22 @@ public class TestJMSAccessorService extends XTestCase {
         String publisherAuthority = "hcat.server.com:5080";
         String topic = "topic.topic1";
         JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcat.server.com:8020"));
+
         jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(publisherAuthority));
         assertTrue(jmsService.isConnectionInRetryList(connInfo));
         assertTrue(jmsService.isTopicInRetryList(connInfo, topic));
         assertFalse(jmsService.isListeningToTopic(connInfo, topic));
-        Thread.sleep(1100);
+        waitFor(JMS_TIMEOUT_MS, new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                return jmsService.getNumConnectionAttempts(connInfo) == 1;
+            }
+        });
+
         // Should not retry again as max attempt is 1
         assertTrue(jmsService.isConnectionInRetryList(connInfo));
         assertTrue(jmsService.isTopicInRetryList(connInfo, topic));
         assertFalse(jmsService.isListeningToTopic(connInfo, topic));
-        assertEquals(1, jmsService.getNumConnectionAttempts(connInfo));
         assertFalse(jmsService.retryConnection(connInfo));
-        jmsService.destroy();
     }
-
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/61c646c3/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 060f4fb..fd7bd76 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.1.0 release (trunk - unreleased)
 
+OOZIE-3240 Flaky test TestJMSAccessorService#testConnectionRetry (pbacsko via gezapeti)
 OOZIE-3246 Flaky test TestJMSJobEventListener#testConnectionDrop (pbacsko via gezapeti)
 OOZIE-3236 Fix flaky test TestHiveActionExecutor#testHiveAction (pbacsko via gezapeti)
 OOZIE-3235 Upgrade ActiveMQ to 5.15.3 (matijhs via andras.piros)