You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by ge...@apache.org on 2018/05/16 11:53:15 UTC
[1/2] oozie git commit: OOZIE-3246 Flaky test
TestJMSJobEventListener#testConnectionDrop (pbacsko via gezapeti)
Repository: oozie
Updated Branches:
refs/heads/master 88aa654d6 -> 61c646c33
OOZIE-3246 Flaky test TestJMSJobEventListener#testConnectionDrop (pbacsko via gezapeti)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/117153a9
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/117153a9
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/117153a9
Branch: refs/heads/master
Commit: 117153a90f7a7b42fbdaa252b8ec7821e6ef3d29
Parents: 88aa654
Author: Gezapeti Cseh <ge...@apache.org>
Authored: Wed May 16 13:52:10 2018 +0200
Committer: Gezapeti Cseh <ge...@apache.org>
Committed: Wed May 16 13:52:10 2018 +0200
----------------------------------------------------------------------
.../oozie/jms/TestJMSJobEventListener.java | 534 ++++++++-----------
release-log.txt | 1 +
2 files changed, 226 insertions(+), 309 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/117153a9/core/src/test/java/org/apache/oozie/jms/TestJMSJobEventListener.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/jms/TestJMSJobEventListener.java b/core/src/test/java/org/apache/oozie/jms/TestJMSJobEventListener.java
index f375dec..8e8db51 100644
--- a/core/src/test/java/org/apache/oozie/jms/TestJMSJobEventListener.java
+++ b/core/src/test/java/org/apache/oozie/jms/TestJMSJobEventListener.java
@@ -18,7 +18,6 @@
package org.apache.oozie.jms;
-import java.text.ParseException;
import java.util.Date;
import java.util.Random;
@@ -77,7 +76,7 @@ public class TestJMSJobEventListener extends XTestCase {
}
@Test
- public void testOnWorkflowJobStartedEvent() throws ParseException {
+ public void testOnWorkflowJobStartedEvent() throws Exception {
JMSJobEventListener wfEventListener = new JMSJobEventListener();
wfEventListener.init(conf);
Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
@@ -85,32 +84,26 @@ public class TestJMSJobEventListener extends XTestCase {
"wf-app-name1", startDate, null);
ConnectionContext jmsContext = getConnectionContext();
- try {
- Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe));
- wfEventListener.onWorkflowJobEvent(wfe);
- TextMessage message = (TextMessage) consumer.receive(5000);
- assertFalse(message.getText().contains("endTime"));
- WorkflowJobMessage wfStartMessage = JMSMessagingUtils.getEventMessage(message);
- assertEquals(WorkflowJob.Status.RUNNING, wfStartMessage.getStatus());
- assertEquals(startDate, wfStartMessage.getStartTime());
- assertEquals("wfId1", wfStartMessage.getId());
- assertEquals("caId1", wfStartMessage.getParentId());
- assertEquals(MessageType.JOB, wfStartMessage.getMessageType());
- assertEquals(AppType.WORKFLOW_JOB, wfStartMessage.getAppType());
- assertEquals(EventStatus.STARTED, wfStartMessage.getEventStatus());
- assertEquals("user1", wfStartMessage.getUser());
- assertEquals("wf-app-name1", wfStartMessage.getAppName());
- wfEventListener.destroy();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe));
+ wfEventListener.onWorkflowJobEvent(wfe);
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ assertFalse(message.getText().contains("endTime"));
+ WorkflowJobMessage wfStartMessage = JMSMessagingUtils.getEventMessage(message);
+ assertEquals(WorkflowJob.Status.RUNNING, wfStartMessage.getStatus());
+ assertEquals(startDate, wfStartMessage.getStartTime());
+ assertEquals("wfId1", wfStartMessage.getId());
+ assertEquals("caId1", wfStartMessage.getParentId());
+ assertEquals(MessageType.JOB, wfStartMessage.getMessageType());
+ assertEquals(AppType.WORKFLOW_JOB, wfStartMessage.getAppType());
+ assertEquals(EventStatus.STARTED, wfStartMessage.getEventStatus());
+ assertEquals("user1", wfStartMessage.getUser());
+ assertEquals("wf-app-name1", wfStartMessage.getAppName());
+ wfEventListener.destroy();
}
@Test
- public void testOnWorkflowJobSuccessEvent() throws ParseException {
+ public void testOnWorkflowJobSuccessEvent() throws Exception {
JMSJobEventListener wfEventListener = new JMSJobEventListener();
wfEventListener.init(conf);
Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
@@ -119,33 +112,26 @@ public class TestJMSJobEventListener extends XTestCase {
"wf-app-name1", startDate, endDate);
ConnectionContext jmsContext = getConnectionContext();
- try {
- Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe));
- wfEventListener.onWorkflowJobEvent(wfe);
- TextMessage message = (TextMessage) consumer.receive(5000);
- WorkflowJobMessage wfSuccMessage = JMSMessagingUtils.getEventMessage(message);
- assertEquals(WorkflowJob.Status.SUCCEEDED, wfSuccMessage.getStatus());
- assertEquals(startDate, wfSuccMessage.getStartTime());
- assertEquals(endDate, wfSuccMessage.getEndTime());
- assertEquals("wfId1", wfSuccMessage.getId());
- assertEquals("caId1", wfSuccMessage.getParentId());
- assertEquals(MessageType.JOB, wfSuccMessage.getMessageType());
- assertEquals(AppType.WORKFLOW_JOB, wfSuccMessage.getAppType());
- assertEquals(EventStatus.SUCCESS, wfSuccMessage.getEventStatus());
- assertEquals("user1", wfSuccMessage.getUser());
- assertEquals("wf-app-name1", wfSuccMessage.getAppName());
- wfEventListener.destroy();
-
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe));
+ wfEventListener.onWorkflowJobEvent(wfe);
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ WorkflowJobMessage wfSuccMessage = JMSMessagingUtils.getEventMessage(message);
+ assertEquals(WorkflowJob.Status.SUCCEEDED, wfSuccMessage.getStatus());
+ assertEquals(startDate, wfSuccMessage.getStartTime());
+ assertEquals(endDate, wfSuccMessage.getEndTime());
+ assertEquals("wfId1", wfSuccMessage.getId());
+ assertEquals("caId1", wfSuccMessage.getParentId());
+ assertEquals(MessageType.JOB, wfSuccMessage.getMessageType());
+ assertEquals(AppType.WORKFLOW_JOB, wfSuccMessage.getAppType());
+ assertEquals(EventStatus.SUCCESS, wfSuccMessage.getEventStatus());
+ assertEquals("user1", wfSuccMessage.getUser());
+ assertEquals("wf-app-name1", wfSuccMessage.getAppName());
+ wfEventListener.destroy();
}
@Test
- public void testOnWorkflowJobFailureEvent() throws ParseException {
+ public void testOnWorkflowJobFailureEvent() throws Exception {
JMSJobEventListener wfEventListener = new JMSJobEventListener();
wfEventListener.init(conf);
Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
@@ -155,171 +141,136 @@ public class TestJMSJobEventListener extends XTestCase {
wfe.setErrorCode("dummyErrorCode");
wfe.setErrorMessage("dummyErrorMessage");
ConnectionContext jmsContext = getConnectionContext();
- try {
- Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe));
- wfEventListener.onWorkflowJobEvent(wfe);
- TextMessage message = (TextMessage) consumer.receive(5000);
- WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message);
- assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus());
- assertEquals(startDate, wfFailMessage.getStartTime());
- assertEquals(endDate, wfFailMessage.getEndTime());
- assertEquals("wfId1", wfFailMessage.getId());
- assertEquals("caId1", wfFailMessage.getParentId());
- assertEquals(MessageType.JOB, wfFailMessage.getMessageType());
- assertEquals(AppType.WORKFLOW_JOB, wfFailMessage.getAppType());
- assertEquals(EventStatus.FAILURE, wfFailMessage.getEventStatus());
- assertEquals("user1", wfFailMessage.getUser());
- assertEquals("wf-app-name1", wfFailMessage.getAppName());
- assertEquals("dummyErrorCode", wfFailMessage.getErrorCode());
- assertEquals("dummyErrorMessage", wfFailMessage.getErrorMessage());
- wfEventListener.destroy();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe));
+ wfEventListener.onWorkflowJobEvent(wfe);
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message);
+ assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus());
+ assertEquals(startDate, wfFailMessage.getStartTime());
+ assertEquals(endDate, wfFailMessage.getEndTime());
+ assertEquals("wfId1", wfFailMessage.getId());
+ assertEquals("caId1", wfFailMessage.getParentId());
+ assertEquals(MessageType.JOB, wfFailMessage.getMessageType());
+ assertEquals(AppType.WORKFLOW_JOB, wfFailMessage.getAppType());
+ assertEquals(EventStatus.FAILURE, wfFailMessage.getEventStatus());
+ assertEquals("user1", wfFailMessage.getUser());
+ assertEquals("wf-app-name1", wfFailMessage.getAppName());
+ assertEquals("dummyErrorCode", wfFailMessage.getErrorCode());
+ assertEquals("dummyErrorMessage", wfFailMessage.getErrorMessage());
+ wfEventListener.destroy();
}
@Test
- public void testOnWorkflowJobSuspendEvent() throws ParseException {
+ public void testOnWorkflowJobSuspendEvent() throws Exception {
JMSJobEventListener wfEventListener = new JMSJobEventListener();
wfEventListener.init(conf);
Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.SUSPENDED, "user1",
"wf-app-name1", startDate, null);
ConnectionContext jmsContext = getConnectionContext();
- try {
- Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe));
- wfEventListener.onWorkflowJobEvent(wfe);
- TextMessage message = (TextMessage) consumer.receive(5000);
- assertFalse(message.getText().contains("endTime"));
- WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message);
- assertEquals(WorkflowJob.Status.SUSPENDED, wfFailMessage.getStatus());
- assertEquals(startDate, wfFailMessage.getStartTime());
- assertEquals("wfId1", wfFailMessage.getId());
- assertEquals("caId1", wfFailMessage.getParentId());
- assertEquals(MessageType.JOB, wfFailMessage.getMessageType());
- assertEquals(AppType.WORKFLOW_JOB, wfFailMessage.getAppType());
- assertEquals(EventStatus.SUSPEND, wfFailMessage.getEventStatus());
- assertEquals("user1", wfFailMessage.getUser());
- assertEquals("wf-app-name1", wfFailMessage.getAppName());
- assertNull(wfFailMessage.getErrorCode());
- assertNull(wfFailMessage.getErrorMessage());
- wfEventListener.destroy();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe));
+ wfEventListener.onWorkflowJobEvent(wfe);
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ assertFalse(message.getText().contains("endTime"));
+ WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message);
+ assertEquals(WorkflowJob.Status.SUSPENDED, wfFailMessage.getStatus());
+ assertEquals(startDate, wfFailMessage.getStartTime());
+ assertEquals("wfId1", wfFailMessage.getId());
+ assertEquals("caId1", wfFailMessage.getParentId());
+ assertEquals(MessageType.JOB, wfFailMessage.getMessageType());
+ assertEquals(AppType.WORKFLOW_JOB, wfFailMessage.getAppType());
+ assertEquals(EventStatus.SUSPEND, wfFailMessage.getEventStatus());
+ assertEquals("user1", wfFailMessage.getUser());
+ assertEquals("wf-app-name1", wfFailMessage.getAppName());
+ assertNull(wfFailMessage.getErrorCode());
+ assertNull(wfFailMessage.getErrorMessage());
+ wfEventListener.destroy();
}
@Test
- public void testWorkflowJobSelectors() throws ParseException {
+ public void testWorkflowJobSelectors() throws Exception {
JMSJobEventListener wfEventListener = new JMSJobEventListener();
wfEventListener.init(conf);
WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.FAILED, "user_1",
"wf-app-name1", new Date(), new Date());
ConnectionContext jmsContext = getConnectionContext();
- try {
- Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
- String selector = JMSHeaderConstants.USER + "='user_1'";
- MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector);
- wfEventListener.onWorkflowJobEvent(wfe);
- TextMessage message = (TextMessage) consumer.receive(5000);
- WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message);
- Assert.assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus());
- assertEquals("user_1", wfFailMessage.getUser());
- assertEquals(MessageType.JOB, wfFailMessage.getMessageType());
- wfEventListener.destroy();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+ String selector = JMSHeaderConstants.USER + "='user_1'";
+ MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector);
+ wfEventListener.onWorkflowJobEvent(wfe);
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message);
+ Assert.assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus());
+ assertEquals("user_1", wfFailMessage.getUser());
+ assertEquals(MessageType.JOB, wfFailMessage.getMessageType());
+ wfEventListener.destroy();
}
@Test
- public void testWorkflowJobSelectorsNegative() {
+ public void testWorkflowJobSelectorsNegative() throws Exception {
JMSJobEventListener wfEventListener = new JMSJobEventListener();
wfEventListener.init(conf);
WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.FAILED, "user1",
"wf-app-name1", new Date(), new Date());
ConnectionContext jmsContext = getConnectionContext();
- try {
- Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
- // Pass a selector which wont match and assert for null message
- String selector = JMSHeaderConstants.USER + "='Non_matching_user'";
- MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector);
- wfEventListener.onWorkflowJobEvent(wfe);
- TextMessage message = (TextMessage) consumer.receive(5000);
- assertNull(message);
- wfEventListener.destroy();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+ // Pass a selector which wont match and assert for null message
+ String selector = JMSHeaderConstants.USER + "='Non_matching_user'";
+ MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector);
+ wfEventListener.onWorkflowJobEvent(wfe);
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ assertNull(message);
+ wfEventListener.destroy();
}
@Test
- public void testWorkflowJobSelectorsOr() {
+ public void testWorkflowJobSelectorsOr() throws Exception {
JMSJobEventListener wfEventListener = new JMSJobEventListener();
wfEventListener.init(conf);
WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.FAILED, "user1",
"wf-app-name1", new Date(), new Date());
ConnectionContext jmsContext = getConnectionContext();
- try {
- Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
- // Pass a selector using OR condition
- String selector = JMSHeaderConstants.USER + "='Non_matching_user' OR " + JMSHeaderConstants.APP_NAME
- + "='wf-app-name1'";
- MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector);
- wfEventListener.onWorkflowJobEvent(wfe);
- TextMessage message = (TextMessage) consumer.receive(5000);
- WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message);
- Assert.assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus());
- assertEquals("user1", wfFailMessage.getUser());
- assertEquals(MessageType.JOB, wfFailMessage.getMessageType());
- wfEventListener.destroy();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+ // Pass a selector using OR condition
+ String selector = JMSHeaderConstants.USER + "='Non_matching_user' OR " + JMSHeaderConstants.APP_NAME
+ + "='wf-app-name1'";
+ MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector);
+ wfEventListener.onWorkflowJobEvent(wfe);
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message);
+ Assert.assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus());
+ assertEquals("user1", wfFailMessage.getUser());
+ assertEquals(MessageType.JOB, wfFailMessage.getMessageType());
+ wfEventListener.destroy();
}
@Test
- public void testWorkflowJobSelectorsAnd() {
+ public void testWorkflowJobSelectorsAnd() throws Exception {
JMSJobEventListener wfEventListener = new JMSJobEventListener();
wfEventListener.init(conf);
WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.FAILED, "user1",
"wf-app-name1", new Date(), new Date());
ConnectionContext jmsContext = getConnectionContext();
- try {
- Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
- // Pass a selector using AND condition
- String selector = JMSHeaderConstants.EVENT_STATUS + "='FAILURE' AND " + JMSHeaderConstants.APP_TYPE
- + "='WORKFLOW_JOB' AND " + JMSHeaderConstants.MESSAGE_TYPE + "='JOB'";
- MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector);
- wfEventListener.onWorkflowJobEvent(wfe);
- TextMessage message = (TextMessage) consumer.receive(5000);
- WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message);
- Assert.assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus());
- assertEquals("user1", wfFailMessage.getUser());
- assertEquals(MessageType.JOB, wfFailMessage.getMessageType());
- wfEventListener.destroy();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+ // Pass a selector using AND condition
+ String selector = JMSHeaderConstants.EVENT_STATUS + "='FAILURE' AND " + JMSHeaderConstants.APP_TYPE
+ + "='WORKFLOW_JOB' AND " + JMSHeaderConstants.MESSAGE_TYPE + "='JOB'";
+ MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector);
+ wfEventListener.onWorkflowJobEvent(wfe);
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message);
+ Assert.assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus());
+ assertEquals("user1", wfFailMessage.getUser());
+ assertEquals(MessageType.JOB, wfFailMessage.getMessageType());
+ wfEventListener.destroy();
}
@Test
- public void testConnectionDrop() {
+ public void testConnectionDrop() throws Exception {
Random random = new Random();
+ BrokerService broker = null;
try {
services.destroy();
services = new Services();
@@ -334,7 +285,7 @@ public class TestJMSJobEventListener extends XTestCase {
services.init();
JMSJobEventListener wfEventListener = new JMSJobEventListener();
wfEventListener.init(conf);
- BrokerService broker = new BrokerService();
+ broker = new BrokerService();
broker.setDataDirectory(getTestCaseDir());
broker.addConnector(brokerURl);
broker.start();
@@ -360,15 +311,16 @@ public class TestJMSJobEventListener extends XTestCase {
broker.stop();
wfEventListener.destroy();
}
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
+ finally {
+ if (broker != null) {
+ broker.stop();
+ }
}
}
@Test
- public void testOnCoordinatorActionWaitingEvent() throws ParseException {
+ public void testOnCoordinatorActionWaitingEvent() throws Exception {
JMSJobEventListener wfEventListner = new JMSJobEventListener();
wfEventListner.init(conf);
Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
@@ -376,36 +328,30 @@ public class TestJMSJobEventListener extends XTestCase {
CoordinatorActionEvent cae = new CoordinatorActionEvent("caId1", "caJobId1", CoordinatorAction.Status.WAITING,
"user1", "wf-app-name1", nominalTime, startDate, "missingDep1");
ConnectionContext jmsContext = getConnectionContext();
- try {
- Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListner.getTopic(cae));
- wfEventListner.onCoordinatorActionEvent(cae);
- TextMessage message = (TextMessage) consumer.receive(5000);
- assertFalse(message.getText().contains("endTime"));
- assertFalse(message.getText().contains("errorCode"));
- assertFalse(message.getText().contains("errorMessage"));
- CoordinatorActionMessage coordActionWaitingMessage = JMSMessagingUtils
- .getEventMessage(message);
- assertEquals(CoordinatorAction.Status.WAITING, coordActionWaitingMessage.getStatus());
- assertEquals(startDate, coordActionWaitingMessage.getStartTime());
- assertEquals(nominalTime, coordActionWaitingMessage.getNominalTime());
- assertEquals("caJobId1", coordActionWaitingMessage.getParentId());
- assertEquals("caId1", coordActionWaitingMessage.getId());
- assertEquals(MessageType.JOB, coordActionWaitingMessage.getMessageType());
- assertEquals(AppType.COORDINATOR_ACTION, coordActionWaitingMessage.getAppType());
- assertEquals(EventStatus.WAITING, coordActionWaitingMessage.getEventStatus());
- assertEquals("user1", coordActionWaitingMessage.getUser());
- assertEquals("wf-app-name1", coordActionWaitingMessage.getAppName());
- assertEquals("missingDep1", coordActionWaitingMessage.getMissingDependency());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListner.getTopic(cae));
+ wfEventListner.onCoordinatorActionEvent(cae);
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ assertFalse(message.getText().contains("endTime"));
+ assertFalse(message.getText().contains("errorCode"));
+ assertFalse(message.getText().contains("errorMessage"));
+ CoordinatorActionMessage coordActionWaitingMessage = JMSMessagingUtils
+ .getEventMessage(message);
+ assertEquals(CoordinatorAction.Status.WAITING, coordActionWaitingMessage.getStatus());
+ assertEquals(startDate, coordActionWaitingMessage.getStartTime());
+ assertEquals(nominalTime, coordActionWaitingMessage.getNominalTime());
+ assertEquals("caJobId1", coordActionWaitingMessage.getParentId());
+ assertEquals("caId1", coordActionWaitingMessage.getId());
+ assertEquals(MessageType.JOB, coordActionWaitingMessage.getMessageType());
+ assertEquals(AppType.COORDINATOR_ACTION, coordActionWaitingMessage.getAppType());
+ assertEquals(EventStatus.WAITING, coordActionWaitingMessage.getEventStatus());
+ assertEquals("user1", coordActionWaitingMessage.getUser());
+ assertEquals("wf-app-name1", coordActionWaitingMessage.getAppName());
+ assertEquals("missingDep1", coordActionWaitingMessage.getMissingDependency());
}
@Test
- public void testOnCoordinatorActionStartEvent() throws ParseException {
+ public void testOnCoordinatorActionStartEvent() throws Exception {
JMSJobEventListener coordEventListener = new JMSJobEventListener();
coordEventListener.init(conf);
Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
@@ -413,35 +359,29 @@ public class TestJMSJobEventListener extends XTestCase {
CoordinatorActionEvent cae = new CoordinatorActionEvent("caId1", "caJobId1", CoordinatorAction.Status.RUNNING,
"user1", "wf-app-name1", nominalTime, startDate, null);
ConnectionContext jmsContext = getConnectionContext();
- try {
- Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae));
- coordEventListener.onCoordinatorActionEvent(cae);
- TextMessage message = (TextMessage) consumer.receive(5000);
- assertFalse(message.getText().contains("endTime"));
- assertFalse(message.getText().contains("errorCode"));
- assertFalse(message.getText().contains("errorMessage"));
- assertFalse(message.getText().contains("missingDependency"));
- CoordinatorActionMessage coordActionStartMessage = JMSMessagingUtils
- .getEventMessage(message);
- assertEquals(CoordinatorAction.Status.RUNNING, coordActionStartMessage.getStatus());
- assertEquals(startDate, coordActionStartMessage.getStartTime());
- assertEquals("caJobId1", coordActionStartMessage.getParentId());
- assertEquals("caId1", coordActionStartMessage.getId());
- assertEquals(MessageType.JOB, coordActionStartMessage.getMessageType());
- assertEquals(AppType.COORDINATOR_ACTION, coordActionStartMessage.getAppType());
- assertEquals(EventStatus.STARTED, coordActionStartMessage.getEventStatus());
- assertEquals("user1", coordActionStartMessage.getUser());
- assertEquals("wf-app-name1", coordActionStartMessage.getAppName());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae));
+ coordEventListener.onCoordinatorActionEvent(cae);
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ assertFalse(message.getText().contains("endTime"));
+ assertFalse(message.getText().contains("errorCode"));
+ assertFalse(message.getText().contains("errorMessage"));
+ assertFalse(message.getText().contains("missingDependency"));
+ CoordinatorActionMessage coordActionStartMessage = JMSMessagingUtils
+ .getEventMessage(message);
+ assertEquals(CoordinatorAction.Status.RUNNING, coordActionStartMessage.getStatus());
+ assertEquals(startDate, coordActionStartMessage.getStartTime());
+ assertEquals("caJobId1", coordActionStartMessage.getParentId());
+ assertEquals("caId1", coordActionStartMessage.getId());
+ assertEquals(MessageType.JOB, coordActionStartMessage.getMessageType());
+ assertEquals(AppType.COORDINATOR_ACTION, coordActionStartMessage.getAppType());
+ assertEquals(EventStatus.STARTED, coordActionStartMessage.getEventStatus());
+ assertEquals("user1", coordActionStartMessage.getUser());
+ assertEquals("wf-app-name1", coordActionStartMessage.getAppName());
}
@Test
- public void testOnCoordinatorJobSuccessEvent() throws ParseException {
+ public void testOnCoordinatorJobSuccessEvent() throws Exception {
JMSJobEventListener coordEventListener = new JMSJobEventListener();
coordEventListener.init(conf);
Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
@@ -451,35 +391,29 @@ public class TestJMSJobEventListener extends XTestCase {
CoordinatorAction.Status.SUCCEEDED, "user1", "wf-app-name1", nominalTime, startDate, null);
cae.setEndTime(endDate);
ConnectionContext jmsContext = getConnectionContext();
- try {
- Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae));
- coordEventListener.onCoordinatorActionEvent(cae);
- TextMessage message = (TextMessage) consumer.receive(5000);
- assertFalse(message.getText().contains("errorCode"));
- assertFalse(message.getText().contains("errorMessage"));
- assertFalse(message.getText().contains("missingDependency"));
- CoordinatorActionMessage coordActionSuccessMessage = JMSMessagingUtils
- .getEventMessage(message);
- assertEquals(CoordinatorAction.Status.SUCCEEDED, coordActionSuccessMessage.getStatus());
- assertEquals(startDate, coordActionSuccessMessage.getStartTime());
- assertEquals(endDate, coordActionSuccessMessage.getEndTime());
- assertEquals("caJobId1", coordActionSuccessMessage.getParentId());
- assertEquals("caId1", coordActionSuccessMessage.getId());
- assertEquals(MessageType.JOB, coordActionSuccessMessage.getMessageType());
- assertEquals(AppType.COORDINATOR_ACTION, coordActionSuccessMessage.getAppType());
- assertEquals(EventStatus.SUCCESS, coordActionSuccessMessage.getEventStatus());
- assertEquals("user1", coordActionSuccessMessage.getUser());
- assertEquals("wf-app-name1", coordActionSuccessMessage.getAppName());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae));
+ coordEventListener.onCoordinatorActionEvent(cae);
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ assertFalse(message.getText().contains("errorCode"));
+ assertFalse(message.getText().contains("errorMessage"));
+ assertFalse(message.getText().contains("missingDependency"));
+ CoordinatorActionMessage coordActionSuccessMessage = JMSMessagingUtils
+ .getEventMessage(message);
+ assertEquals(CoordinatorAction.Status.SUCCEEDED, coordActionSuccessMessage.getStatus());
+ assertEquals(startDate, coordActionSuccessMessage.getStartTime());
+ assertEquals(endDate, coordActionSuccessMessage.getEndTime());
+ assertEquals("caJobId1", coordActionSuccessMessage.getParentId());
+ assertEquals("caId1", coordActionSuccessMessage.getId());
+ assertEquals(MessageType.JOB, coordActionSuccessMessage.getMessageType());
+ assertEquals(AppType.COORDINATOR_ACTION, coordActionSuccessMessage.getAppType());
+ assertEquals(EventStatus.SUCCESS, coordActionSuccessMessage.getEventStatus());
+ assertEquals("user1", coordActionSuccessMessage.getUser());
+ assertEquals("wf-app-name1", coordActionSuccessMessage.getAppName());
}
@Test
- public void testOnCoordinatorJobFailureEvent() throws ParseException {
+ public void testOnCoordinatorJobFailureEvent() throws Exception {
JMSJobEventListener coordEventListener = new JMSJobEventListener();
coordEventListener.init(conf);
Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
@@ -491,35 +425,29 @@ public class TestJMSJobEventListener extends XTestCase {
cae.setErrorCode("E0101");
cae.setErrorMessage("dummyError");
ConnectionContext jmsContext = getConnectionContext();
- try {
- Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae));
- coordEventListener.onCoordinatorActionEvent(cae);
- TextMessage message = (TextMessage) consumer.receive(5000);
- assertFalse(message.getText().contains("missingDependency"));
- CoordinatorActionMessage coordActionFailMessage = JMSMessagingUtils
- .getEventMessage(message);
- assertEquals(CoordinatorAction.Status.FAILED, coordActionFailMessage.getStatus());
- assertEquals(startDate, coordActionFailMessage.getStartTime());
- assertEquals(endDate, coordActionFailMessage.getEndTime());
- assertEquals("caJobId1", coordActionFailMessage.getParentId());
- assertEquals("caId1", coordActionFailMessage.getId());
- assertEquals(MessageType.JOB, coordActionFailMessage.getMessageType());
- assertEquals(AppType.COORDINATOR_ACTION, coordActionFailMessage.getAppType());
- assertEquals(EventStatus.FAILURE, coordActionFailMessage.getEventStatus());
- assertEquals("user1", coordActionFailMessage.getUser());
- assertEquals("wf-app-name1", coordActionFailMessage.getAppName());
- assertEquals("E0101", coordActionFailMessage.getErrorCode());
- assertEquals("dummyError", coordActionFailMessage.getErrorMessage());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae));
+ coordEventListener.onCoordinatorActionEvent(cae);
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ assertFalse(message.getText().contains("missingDependency"));
+ CoordinatorActionMessage coordActionFailMessage = JMSMessagingUtils
+ .getEventMessage(message);
+ assertEquals(CoordinatorAction.Status.FAILED, coordActionFailMessage.getStatus());
+ assertEquals(startDate, coordActionFailMessage.getStartTime());
+ assertEquals(endDate, coordActionFailMessage.getEndTime());
+ assertEquals("caJobId1", coordActionFailMessage.getParentId());
+ assertEquals("caId1", coordActionFailMessage.getId());
+ assertEquals(MessageType.JOB, coordActionFailMessage.getMessageType());
+ assertEquals(AppType.COORDINATOR_ACTION, coordActionFailMessage.getAppType());
+ assertEquals(EventStatus.FAILURE, coordActionFailMessage.getEventStatus());
+ assertEquals("user1", coordActionFailMessage.getUser());
+ assertEquals("wf-app-name1", coordActionFailMessage.getAppName());
+ assertEquals("E0101", coordActionFailMessage.getErrorCode());
+ assertEquals("dummyError", coordActionFailMessage.getErrorMessage());
}
@Test
- public void testCoordinatorActionSelectors() throws ParseException {
+ public void testCoordinatorActionSelectors() throws Exception {
JMSJobEventListener coordEventListener = new JMSJobEventListener();
coordEventListener.init(conf);
Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
@@ -527,26 +455,20 @@ public class TestJMSJobEventListener extends XTestCase {
CoordinatorActionEvent cae = new CoordinatorActionEvent("caId1", "caJobId1", CoordinatorAction.Status.FAILED,
"user1", "wf-app-name1", nominalTime, startDate, null);
ConnectionContext jmsContext = getConnectionContext();
- try {
- Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
- String selector = JMSHeaderConstants.USER + "='user1'";
- MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae), selector);
- coordEventListener.onCoordinatorActionEvent(cae);
- TextMessage message = (TextMessage) consumer.receive(5000);
- CoordinatorActionMessage coordActionFailMessage = JMSMessagingUtils
- .getEventMessage(message);
- Assert.assertEquals(CoordinatorAction.Status.FAILED, coordActionFailMessage.getStatus());
- assertEquals("user1", coordActionFailMessage.getUser());
- assertEquals(MessageType.JOB, coordActionFailMessage.getMessageType());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+ String selector = JMSHeaderConstants.USER + "='user1'";
+ MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae), selector);
+ coordEventListener.onCoordinatorActionEvent(cae);
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ CoordinatorActionMessage coordActionFailMessage = JMSMessagingUtils
+ .getEventMessage(message);
+ Assert.assertEquals(CoordinatorAction.Status.FAILED, coordActionFailMessage.getStatus());
+ assertEquals("user1", coordActionFailMessage.getUser());
+ assertEquals(MessageType.JOB, coordActionFailMessage.getMessageType());
}
@Test
- public void testCoordinatorActionSelectorsNegative() throws ParseException {
+ public void testCoordinatorActionSelectorsNegative() throws Exception {
JMSJobEventListener coordEventListener = new JMSJobEventListener();
coordEventListener.init(conf);
Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
@@ -554,19 +476,13 @@ public class TestJMSJobEventListener extends XTestCase {
CoordinatorActionEvent cae = new CoordinatorActionEvent("caId1", "caJobId1", CoordinatorAction.Status.FAILED,
"user1", "wf-app-name1", nominalTime, startDate, null);
ConnectionContext jmsContext = getConnectionContext();
- try {
- Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
- // Pass a selector which wont match and assert for null message
- String selector = JMSHeaderConstants.USER + "='Non_matching_user'";
- MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae), selector);
- coordEventListener.onCoordinatorActionEvent(cae);
- TextMessage message = (TextMessage) consumer.receive(5000);
- assertNull(message);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
+ // Pass a selector which wont match and assert for null message
+ String selector = JMSHeaderConstants.USER + "='Non_matching_user'";
+ MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae), selector);
+ coordEventListener.onCoordinatorActionEvent(cae);
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ assertNull(message);
}
private ConnectionContext getConnectionContext() {
http://git-wip-us.apache.org/repos/asf/oozie/blob/117153a9/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 6dbe922..060f4fb 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 5.1.0 release (trunk - unreleased)
+OOZIE-3246 Flaky test TestJMSJobEventListener#testConnectionDrop (pbacsko via gezapeti)
OOZIE-3236 Fix flaky test TestHiveActionExecutor#testHiveAction (pbacsko via gezapeti)
OOZIE-3235 Upgrade ActiveMQ to 5.15.3 (matijhs via andras.piros)
OOZIE-3217 Enable definition of admin users using oozie-site.xml (orova via andras.piros)
[2/2] oozie git commit: OOZIE-3240 Flaky test
TestJMSAccessorService#testConnectionRetry (pbacsko via gezapeti)
Posted by ge...@apache.org.
OOZIE-3240 Flaky test TestJMSAccessorService#testConnectionRetry (pbacsko via gezapeti)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/61c646c3
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/61c646c3
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/61c646c3
Branch: refs/heads/master
Commit: 61c646c332e6129f3502ee235e5e6d28fe55addd
Parents: 117153a
Author: Gezapeti Cseh <ge...@apache.org>
Authored: Wed May 16 13:52:57 2018 +0200
Committer: Gezapeti Cseh <ge...@apache.org>
Committed: Wed May 16 13:52:57 2018 +0200
----------------------------------------------------------------------
.../oozie/service/TestJMSAccessorService.java | 267 ++++++++++---------
release-log.txt | 1 +
2 files changed, 137 insertions(+), 131 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/61c646c3/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java b/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java
index 41241d2..dbf892e 100644
--- a/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java
@@ -19,6 +19,7 @@
package org.apache.oozie.service;
import java.net.URI;
+import java.net.URISyntaxException;
import java.util.Random;
import javax.jms.Session;
@@ -36,6 +37,7 @@ import org.junit.Test;
public class TestJMSAccessorService extends XTestCase {
private Services services;
private static Random random = new Random();
+ private static final int JMS_TIMEOUT_MS = 5000;
@Override
protected void setUp() throws Exception {
@@ -66,89 +68,63 @@ public class TestJMSAccessorService extends XTestCase {
}
@Test
- public void testRegisterSingleConsumerPerTopic() {
-
- try {
- HCatAccessorService hcatService = services.get(HCatAccessorService.class);
- JMSAccessorService jmsService = services.get(JMSAccessorService.class);
- String server = "hcat.server.com:5080";
- String topic = "hcat.mydb.mytable";
-
- JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcat.server.com:8020"));
- jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(server));
+ public void testRegisterSingleConsumerPerTopic() throws URISyntaxException {
+ HCatAccessorService hcatService = services.get(HCatAccessorService.class);
+ JMSAccessorService jmsService = services.get(JMSAccessorService.class);
+ String server = "hcat.server.com:5080";
+ String topic = "hcat.mydb.mytable";
- MessageReceiver receiver1 = jmsService.getMessageReceiver(connInfo, topic);
- jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(server));
+ JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcat.server.com:8020"));
+ jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(server));
- MessageReceiver receiver2 = jmsService.getMessageReceiver(connInfo, topic);
- assertEquals(receiver1, receiver2);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail("Exception encountered : " + e);
- }
+ MessageReceiver receiver1 = jmsService.getMessageReceiver(connInfo, topic);
+ jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(server));
+ MessageReceiver receiver2 = jmsService.getMessageReceiver(connInfo, topic);
+ assertEquals(receiver1, receiver2);
}
@Test
- public void testUnRegisterTopic() {
-
- try {
- HCatAccessorService hcatService = services.get(HCatAccessorService.class);
- JMSAccessorService jmsService = services.get(JMSAccessorService.class);
- String server = "hcat.server.com:5080";
- String topic = "hcatalog.mydb.mytable";
-
- JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcat.server.com:8020"));
- jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(server));
+ public void testUnRegisterTopic() throws URISyntaxException {
+ HCatAccessorService hcatService = services.get(HCatAccessorService.class);
+ JMSAccessorService jmsService = services.get(JMSAccessorService.class);
+ String server = "hcat.server.com:5080";
+ String topic = "hcatalog.mydb.mytable";
- MessageReceiver receiver1 = jmsService.getMessageReceiver(connInfo, topic);
- assertNotNull(receiver1);
+ JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcat.server.com:8020"));
+ jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(server));
- jmsService.unregisterFromNotification(connInfo, topic);
+ MessageReceiver receiver1 = jmsService.getMessageReceiver(connInfo, topic);
+ assertNotNull(receiver1);
- receiver1 = jmsService.getMessageReceiver(connInfo, topic);
- assertEquals(null, receiver1);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail("Exception encountered : " + e);
- }
+ jmsService.unregisterFromNotification(connInfo, topic);
+ receiver1 = jmsService.getMessageReceiver(connInfo, topic);
+ assertEquals(null, receiver1);
}
@Test
- public void testConnectionContext() throws ServiceException {
- try {
- services.destroy();
- services = super.setupServicesForHCatalog();
- Configuration conf = services.getConf();
- // set the connection factory name
- String jmsURL = "hcat://${1}.${2}.server.com:8020=java.naming.factory.initial#" +
- "org.apache.activemq.jndi.ActiveMQInitialContextFactory" +
- ";java.naming.provider.url#vm://localhost?broker.persistent=false;" +
- "connectionFactoryNames#dynamicFactories/hcat.prod.${1}";
- conf.set(HCatAccessorService.JMS_CONNECTIONS_PROPERTIES, jmsURL);
- services.init();
- HCatAccessorService hcatService = services.get(HCatAccessorService.class);
- JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcatserver.blue.server.com:8020"));
- assertEquals(
- "java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#" +
- "vm://localhost?broker.persistent=false;connectionFactoryNames#dynamicFactories/hcat.prod.hcatserver",
- connInfo.getJNDIPropertiesString());
-
- ConnectionContext ctx1 = new DefaultConnectionContext();
- ctx1.createConnection(connInfo.getJNDIProperties());
- BrokerService broker = new BrokerService();
- broker.setDataDirectory(getTestCaseDir());
- // Without this stop testConnectionRetry fails with
- // javax.management.InstanceAlreadyExistsException: org.apache.activemq:BrokerName=localhost,Type=Broker
- broker.stop();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail("Unexpected exception " + e);
- }
+ public void testConnectionContext() throws Exception {
+ services.destroy();
+ services = super.setupServicesForHCatalog();
+ Configuration conf = services.getConf();
+ // set the connection factory name
+ String jmsURL = "hcat://${1}.${2}.server.com:8020=java.naming.factory.initial#" +
+ "org.apache.activemq.jndi.ActiveMQInitialContextFactory" +
+ ";java.naming.provider.url#vm://localhost?broker.persistent=false;" +
+ "connectionFactoryNames#dynamicFactories/hcat.prod.${1}";
+ conf.set(HCatAccessorService.JMS_CONNECTIONS_PROPERTIES, jmsURL);
+ services.init();
+ HCatAccessorService hcatService = services.get(HCatAccessorService.class);
+ JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcatserver.blue.server.com:8020"));
+ assertEquals(
+ "java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#" +
+ "vm://localhost?broker.persistent=false;connectionFactoryNames#dynamicFactories/hcat.prod.hcatserver",
+ connInfo.getJNDIPropertiesString());
+
+ ConnectionContext ctx = new DefaultConnectionContext();
+ ctx.createConnection(connInfo.getJNDIProperties());
+ ctx.close();
}
@Test
@@ -176,69 +152,94 @@ public class TestJMSAccessorService extends XTestCase {
assertTrue(jmsService.isTopicInRetryList(connInfo, topic));
// Start the broker and check if listening to topic now
BrokerService broker = new BrokerService();
- broker.addConnector(brokerURl);
- broker.setDataDirectory(getTestCaseDir());
- broker.start();
- Thread.sleep(1000);
- assertTrue(jmsService.isListeningToTopic(connInfo, topic));
- assertFalse(jmsService.isConnectionInRetryList(connInfo));
- assertFalse(jmsService.isTopicInRetryList(connInfo, topic));
- broker.stop();
- jmsService.destroy();
-
+ try {
+ broker.addConnector(brokerURl);
+ broker.setDataDirectory(getTestCaseDir());
+ broker.start();
+
+ waitFor(JMS_TIMEOUT_MS, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return jmsService.isListeningToTopic(connInfo, topic);
+ }
+ });
+ assertTrue(jmsService.isListeningToTopic(connInfo, topic));
+ assertFalse(jmsService.isConnectionInRetryList(connInfo));
+ assertFalse(jmsService.isTopicInRetryList(connInfo, topic));
+ } finally {
+ broker.stop();
+ }
}
@Test
public void testConnectionRetryExceptionListener() throws Exception {
- services.destroy();
- services = super.setupServicesForHCatalog();
- int randomPort = 30000 + random.nextInt(10000);
- String brokerURL = "tcp://localhost:" + randomPort;
- String jndiPropertiesString = "java.naming.factory.initial#" + ActiveMQConnFactory + ";"
- + "java.naming.provider.url#" + brokerURL + ";" + "connectionFactoryNames#" + "ConnectionFactory";
- Configuration servicesConf = services.getConf();
- servicesConf.set(JMSAccessorService.CONF_RETRY_INITIAL_DELAY, "1");
- servicesConf.set(JMSAccessorService.CONF_RETRY_MAX_ATTEMPTS, "3");
- servicesConf.set(HCatAccessorService.JMS_CONNECTIONS_PROPERTIES, "default=" + jndiPropertiesString);
- services.init();
- HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
- JMSAccessorService jmsService = Services.get().get(JMSAccessorService.class);
-
- String publisherAuthority = "hcat.server.com:5080";
- String topic = "topic.topic1";
- // Start the broker
- BrokerService broker = new BrokerService();
- broker.addConnector(brokerURL);
- broker.setDataDirectory(getTestCaseDir());
- broker.start();
- JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcat.server.com:8020"));
- jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(publisherAuthority));
- assertTrue(jmsService.isListeningToTopic(connInfo, topic));
- assertFalse(jmsService.isConnectionInRetryList(connInfo));
- assertFalse(jmsService.isTopicInRetryList(connInfo, topic));
- ConnectionContext connCtxt = jmsService.createConnectionContext(connInfo);
- broker.stop();
+ BrokerService broker = null;
try {
- connCtxt.createSession(Session.AUTO_ACKNOWLEDGE);
- fail("Exception expected");
- }
- catch (Exception e) {
- Thread.sleep(100);
- assertFalse(jmsService.isListeningToTopic(connInfo, topic));
- assertTrue(jmsService.isConnectionInRetryList(connInfo));
- assertTrue(jmsService.isTopicInRetryList(connInfo, topic));
- }
- broker = new BrokerService();
- broker.addConnector(brokerURL);
- broker.setDataDirectory(getTestCaseDir());
- broker.start();
- Thread.sleep(1000);
- assertTrue(jmsService.isListeningToTopic(connInfo, topic));
- assertFalse(jmsService.isConnectionInRetryList(connInfo));
- assertFalse(jmsService.isTopicInRetryList(connInfo, topic));
- broker.stop();
- jmsService.destroy();
+ services.destroy();
+ services = super.setupServicesForHCatalog();
+ int randomPort = 30000 + random.nextInt(10000);
+ String brokerURL = "tcp://localhost:" + randomPort;
+ String jndiPropertiesString = "java.naming.factory.initial#" + ActiveMQConnFactory + ";"
+ + "java.naming.provider.url#" + brokerURL + ";" + "connectionFactoryNames#" + "ConnectionFactory";
+ Configuration servicesConf = services.getConf();
+ servicesConf.set(JMSAccessorService.CONF_RETRY_INITIAL_DELAY, "1");
+ servicesConf.set(JMSAccessorService.CONF_RETRY_MAX_ATTEMPTS, "3");
+ servicesConf.set(HCatAccessorService.JMS_CONNECTIONS_PROPERTIES, "default=" + jndiPropertiesString);
+ services.init();
+ HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
+ JMSAccessorService jmsService = Services.get().get(JMSAccessorService.class);
+
+ String publisherAuthority = "hcat.server.com:5080";
+ String topic = "topic.topic1";
+ // Start the broker
+ broker = new BrokerService();
+ broker.addConnector(brokerURL);
+ broker.setDataDirectory(getTestCaseDir());
+ broker.start();
+ JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcat.server.com:8020"));
+ jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(publisherAuthority));
+ assertTrue(jmsService.isListeningToTopic(connInfo, topic));
+ assertFalse(jmsService.isConnectionInRetryList(connInfo));
+ assertFalse(jmsService.isTopicInRetryList(connInfo, topic));
+ ConnectionContext connCtxt = jmsService.createConnectionContext(connInfo);
+ broker.stop();
+
+ try {
+ connCtxt.createSession(Session.AUTO_ACKNOWLEDGE);
+ fail("Exception expected");
+ }
+ catch (Exception e) {
+ waitFor(JMS_TIMEOUT_MS, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return !jmsService.isListeningToTopic(connInfo, topic);
+ }
+ });
+ assertFalse(jmsService.isListeningToTopic(connInfo, topic));
+ assertTrue(jmsService.isConnectionInRetryList(connInfo));
+ assertTrue(jmsService.isTopicInRetryList(connInfo, topic));
+ }
+ broker = new BrokerService();
+
+ broker.addConnector(brokerURL);
+ broker.setDataDirectory(getTestCaseDir());
+ broker.start();
+ waitFor(JMS_TIMEOUT_MS, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return jmsService.isListeningToTopic(connInfo, topic);
+ }
+ });
+ assertTrue(jmsService.isListeningToTopic(connInfo, topic));
+ assertFalse(jmsService.isConnectionInRetryList(connInfo));
+ assertFalse(jmsService.isTopicInRetryList(connInfo, topic));
+ broker.stop();
+ } finally {
+ if (broker != null) {
+ broker.stop();
+ }
+ }
}
@Test
@@ -258,18 +259,22 @@ public class TestJMSAccessorService extends XTestCase {
String publisherAuthority = "hcat.server.com:5080";
String topic = "topic.topic1";
JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcat.server.com:8020"));
+
jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(publisherAuthority));
assertTrue(jmsService.isConnectionInRetryList(connInfo));
assertTrue(jmsService.isTopicInRetryList(connInfo, topic));
assertFalse(jmsService.isListeningToTopic(connInfo, topic));
- Thread.sleep(1100);
+ waitFor(JMS_TIMEOUT_MS, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return jmsService.getNumConnectionAttempts(connInfo) == 1;
+ }
+ });
+
// Should not retry again as max attempt is 1
assertTrue(jmsService.isConnectionInRetryList(connInfo));
assertTrue(jmsService.isTopicInRetryList(connInfo, topic));
assertFalse(jmsService.isListeningToTopic(connInfo, topic));
- assertEquals(1, jmsService.getNumConnectionAttempts(connInfo));
assertFalse(jmsService.retryConnection(connInfo));
- jmsService.destroy();
}
-
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/61c646c3/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 060f4fb..fd7bd76 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 5.1.0 release (trunk - unreleased)
+OOZIE-3240 Flaky test TestJMSAccessorService#testConnectionRetry (pbacsko via gezapeti)
OOZIE-3246 Flaky test TestJMSJobEventListener#testConnectionDrop (pbacsko via gezapeti)
OOZIE-3236 Fix flaky test TestHiveActionExecutor#testHiveAction (pbacsko via gezapeti)
OOZIE-3235 Upgrade ActiveMQ to 5.15.3 (matijhs via andras.piros)