You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by rk...@apache.org on 2016/03/18 02:32:35 UTC
oozie git commit: OOZIE-2429 TestEventGeneration test is flakey
(fdenes via rkanter)
Repository: oozie
Updated Branches:
refs/heads/master ca98c6793 -> b8c6b702b
OOZIE-2429 TestEventGeneration test is flakey (fdenes via rkanter)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/b8c6b702
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/b8c6b702
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/b8c6b702
Branch: refs/heads/master
Commit: b8c6b702b0e5e120f8231448dbc32ea01443a150
Parents: ca98c67
Author: Robert Kanter <rk...@cloudera.com>
Authored: Thu Mar 17 18:32:24 2016 -0700
Committer: Robert Kanter <rk...@cloudera.com>
Committed: Thu Mar 17 18:32:24 2016 -0700
----------------------------------------------------------------------
.../coord/CoordActionUpdateXCommand.java | 43 ++++++++++++--------
.../apache/oozie/event/TestEventGeneration.java | 32 +++++++++++----
release-log.txt | 1 +
3 files changed, 51 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/b8c6b702/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java
index d628a9f..ef49ea5 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java
@@ -77,7 +77,9 @@ public class CoordActionUpdateXCommand extends CoordinatorXCommand<Void> {
@Override
protected Void execute() throws CommandException {
try {
- LOG.debug("STARTED CoordActionUpdateXCommand for wfId=" + workflow.getId());
+ LOG.debug("STARTED CoordActionUpdateXCommand for wfId=[{0}]", workflow.getId());
+ final CoordinatorAction.Status formerCoordinatorStatus = coordAction.getStatus();
+ final int formerCoordinatorPending = coordAction.getPending();
Status slaStatus = null;
if (workflow.getStatus() == WorkflowJob.Status.SUCCEEDED) {
coordAction.setStatus(CoordinatorAction.Status.SUCCEEDED);
@@ -105,7 +107,7 @@ public class CoordActionUpdateXCommand extends CoordinatorXCommand<Void> {
coordAction.decrementAndGetPending();
}
else {
- LOG.warn("Unexpected workflow " + workflow.getId() + " STATUS " + workflow.getStatus());
+ LOG.warn("Unexpected workflow [{0}] STATUS [{1}]", workflow.getId(), workflow.getStatus());
// update lastModifiedTime
coordAction.setLastModifiedTime(new Date());
CoordActionQueryExecutor.getInstance().executeUpdate(
@@ -119,8 +121,9 @@ public class CoordActionUpdateXCommand extends CoordinatorXCommand<Void> {
return null;
}
- LOG.info("Updating Coordintaor action id :" + coordAction.getId() + " status "
- + " to " + coordAction.getStatus() + ", pending = " + coordAction.getPending());
+ LOG.info("Updating Coordinator action id: [{0}] status [{1}] to [{2}] pending [{3}] to [{4}]",
+ coordAction.getId(), formerCoordinatorStatus, coordAction.getStatus(),
+ formerCoordinatorPending, coordAction.getPending());
coordAction.setLastModifiedTime(new Date());
updateList.add(new UpdateEntry<CoordActionQuery>(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME,
@@ -133,9 +136,9 @@ public class CoordActionUpdateXCommand extends CoordinatorXCommand<Void> {
LOG.info("Updating Coordinator job "+ coordJob.getId() + "pending to true");
}*/
if (slaStatus != null) {
- SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), coordAction.getId(), slaStatus,
- SlaAppType.COORDINATOR_ACTION, LOG);
- if(slaEvent != null) {
+ SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(),
+ coordAction.getId(), slaStatus, SlaAppType.COORDINATOR_ACTION, LOG);
+ if (slaEvent != null) {
insertList.add(slaEvent);
}
}
@@ -149,10 +152,10 @@ public class CoordActionUpdateXCommand extends CoordinatorXCommand<Void> {
generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), workflow.getStartTime());
}
- LOG.debug("ENDED CoordActionUpdateXCommand for wfId=" + workflow.getId());
+ LOG.debug("ENDED CoordActionUpdateXCommand for wfId= [{0}]", workflow.getId());
}
catch (XException ex) {
- LOG.warn("CoordActionUpdate Failed ", ex.getMessage());
+ LOG.warn("CoordActionUpdate Failed [{0}]", ex.getMessage());
throw new CommandException(ex);
}
return null;
@@ -202,18 +205,26 @@ public class CoordActionUpdateXCommand extends CoordinatorXCommand<Void> {
@Override
protected void verifyPrecondition() throws CommandException, PreconditionException {
-
- // if coord action is RUNNING and pending false and workflow is RUNNING, this doesn't need to be updated.
- if (workflow.getStatus() == WorkflowJob.Status.RUNNING
- && coordAction.getStatus() == CoordinatorAction.Status.RUNNING && !coordAction.isPending()) {
+ // if the coord action status already matches the workflow status and pending is false, the coord action is already updated
+ if (((workflow.getStatus() == WorkflowJob.Status.SUCCEEDED
+ && coordAction.getStatus() == CoordinatorAction.Status.SUCCEEDED) ||
+ (workflow.getStatus() == WorkflowJob.Status.FAILED
+ && coordAction.getStatus() == CoordinatorAction.Status.FAILED) ||
+ (workflow.getStatus() == WorkflowJob.Status.KILLED
+ && coordAction.getStatus() == CoordinatorAction.Status.KILLED) ||
+ (workflow.getStatus() == WorkflowJob.Status.SUSPENDED
+ && coordAction.getStatus() == CoordinatorAction.Status.SUSPENDED) ||
+ (workflow.getStatus() == WorkflowJob.Status.RUNNING
+ && coordAction.getStatus() == CoordinatorAction.Status.RUNNING)
+ ) && !coordAction.isPending()) {
try {
CoordActionQueryExecutor.getInstance().executeUpdate(
CoordActionQueryExecutor.CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, coordAction);
- }
- catch (JPAExecutorException je) {
+ } catch (JPAExecutorException je) {
throw new CommandException(je);
}
- throw new PreconditionException(ErrorCode.E1100, ", workflow is RUNNING and coordinator action is RUNNING and pending false");
+ throw new PreconditionException(ErrorCode.E1100, ", workflow is " + workflow.getStatus() +
+ " and coordinator action is " + coordAction.getStatus() + " and pending false");
}
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/b8c6b702/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java b/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java
index 14f5294..5aaa344 100644
--- a/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java
+++ b/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java
@@ -85,7 +85,6 @@ import org.apache.oozie.service.LiteWorkflowStoreService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.UUIDService;
import org.apache.oozie.test.XDataTestCase;
-import org.apache.oozie.test.XTestCase.Predicate;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.XConfiguration;
@@ -120,6 +119,9 @@ public class TestEventGeneration extends XDataTestCase {
super.setUp();
services = new Services();
Configuration conf = services.getConf();
+ // The EventHandlerService manipulates the queues in the background, so the actual test results would depend on
+ // the circumstances (e.g. the speed of the machine, debugging, etc.); set its worker thread count to 0.
+ conf.setInt("oozie.service.EventHandlerService.worker.threads", 0);
conf.set(Services.CONF_SERVICE_EXT_CLASSES, "org.apache.oozie.service.EventHandlerService");
services.init();
ehs = services.get(EventHandlerService.class);
@@ -491,7 +493,7 @@ public class TestEventGeneration extends XDataTestCase {
}
@Test
- public void testForNoDuplicates() throws Exception {
+ public void testForNoDuplicatesWorkflowEvents() throws Exception {
// test workflow job events
Reader reader = IOUtils.getResourceAsReader("wf-no-op.xml", -1);
Writer writer = new FileWriter(getTestCaseDir() + "/workflow.xml");
@@ -515,7 +517,10 @@ public class TestEventGeneration extends XDataTestCase {
assertEquals(EventStatus.STARTED, ((JobEvent)queue.poll()).getEventStatus());
assertEquals(EventStatus.SUCCESS, ((JobEvent)queue.poll()).getEventStatus());
queue.clear();
+ }
+ @Test
+ public void testForNoDuplicatesCoordinatorActionEvents() throws Exception {
// test coordinator action events (failure case)
Date startTime = DateUtils.parseDateOozieTZ("2009-02-01T23:59Z");
Date endTime = DateUtils.parseDateOozieTZ("2009-02-02T23:59Z");
@@ -535,10 +540,17 @@ public class TestEventGeneration extends XDataTestCase {
assertEquals(2, queue.size());
assertEquals(EventStatus.WAITING, ((JobEvent)queue.poll()).getEventStatus());
assertEquals(EventStatus.FAILURE, ((JobEvent)queue.poll()).getEventStatus());
+ queue.clear();
+ }
+
+ @Test
+ public void testInvalidXMLCoordinatorFailsForNoDuplicates() throws Exception {
+ Date startTime = DateUtils.parseDateOozieTZ("2009-02-01T23:59Z");
+ Date endTime = DateUtils.parseDateOozieTZ("2009-02-02T23:59Z");
// test coordinator action events (failure from ActionStartX)
ehs.getAppTypes().add("workflow_action");
- coord = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, false, false, 0);
+ CoordinatorJobBean coord = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, false, false, 0);
CoordinatorActionBean action = addRecordToCoordActionTable(coord.getId(), 1, CoordinatorAction.Status.RUNNING,
"coord-action-sla1.xml", 0);
WorkflowJobBean wf = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING,
@@ -549,14 +561,19 @@ public class TestEventGeneration extends XDataTestCase {
String waId = _createWorkflowAction(wf.getId(), "wf-action");
new ActionStartXCommand(waId, action.getType()).call();
- final WorkflowJobGetJPAExecutor readCmd2 = new WorkflowJobGetJPAExecutor(jobId1);
+ final CoordJobGetJPAExecutor readCmd2 = new CoordJobGetJPAExecutor(coord.getId());
waitFor(1 * 100, new Predicate() {
@Override
public boolean evaluate() throws Exception {
- return jpaService.execute(readCmd2).getStatus() == WorkflowJob.Status.KILLED;
+ return jpaService.execute(readCmd2).getStatus() == CoordinatorJob.Status.KILLED;
}
});
+
assertEquals(3, queue.size());
+ JobEvent coordActionEvent = (JobEvent) queue.poll();
+ assertEquals(EventStatus.FAILURE, coordActionEvent.getEventStatus());
+ assertEquals(action.getId(), coordActionEvent.getId());
+ assertEquals(AppType.COORDINATOR_ACTION, coordActionEvent.getAppType());
JobEvent wfActionEvent = (JobEvent) queue.poll();
assertEquals(EventStatus.FAILURE, wfActionEvent.getEventStatus());
assertEquals(waId, wfActionEvent.getId());
@@ -565,10 +582,7 @@ public class TestEventGeneration extends XDataTestCase {
assertEquals(EventStatus.FAILURE, wfJobEvent.getEventStatus());
assertEquals(wf.getId(), wfJobEvent.getId());
assertEquals(AppType.WORKFLOW_JOB, wfJobEvent.getAppType());
- JobEvent coordActionEvent = (JobEvent) queue.poll();
- assertEquals(EventStatus.FAILURE, coordActionEvent.getEventStatus());
- assertEquals(action.getId(), coordActionEvent.getId());
- assertEquals(AppType.COORDINATOR_ACTION, coordActionEvent.getAppType());
+ queue.clear();
}
private class ActionCheckXCommandForTest extends ActionCheckXCommand {
http://git-wip-us.apache.org/repos/asf/oozie/blob/b8c6b702/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index f766a7c..9341014 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 4.3.0 release (trunk - unreleased)
+OOZIE-2429 TestEventGeneration test is flakey (fdenes via rkanter)
OOZIE-2466 Repeated failure of TestMetricsInstrumentation.testSamplers (fdenes via rkanter)
OOZIE-2470 Remove infinite socket timeouts in the Oozie email action (harsh)
OOZIE-2246 CoordinatorInputCheckCommand does not behave properly when har file is one of data dependency and doesn't exist (satishsaley via puru)