You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by rk...@apache.org on 2016/03/18 02:32:35 UTC

oozie git commit: OOZIE-2429 TestEventGeneration test is flakey (fdenes via rkanter)

Repository: oozie
Updated Branches:
  refs/heads/master ca98c6793 -> b8c6b702b


OOZIE-2429 TestEventGeneration test is flakey (fdenes via rkanter)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/b8c6b702
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/b8c6b702
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/b8c6b702

Branch: refs/heads/master
Commit: b8c6b702b0e5e120f8231448dbc32ea01443a150
Parents: ca98c67
Author: Robert Kanter <rk...@cloudera.com>
Authored: Thu Mar 17 18:32:24 2016 -0700
Committer: Robert Kanter <rk...@cloudera.com>
Committed: Thu Mar 17 18:32:24 2016 -0700

----------------------------------------------------------------------
 .../coord/CoordActionUpdateXCommand.java        | 43 ++++++++++++--------
 .../apache/oozie/event/TestEventGeneration.java | 32 +++++++++++----
 release-log.txt                                 |  1 +
 3 files changed, 51 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/b8c6b702/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java
index d628a9f..ef49ea5 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java
@@ -77,7 +77,9 @@ public class CoordActionUpdateXCommand extends CoordinatorXCommand<Void> {
     @Override
     protected Void execute() throws CommandException {
         try {
-            LOG.debug("STARTED CoordActionUpdateXCommand for wfId=" + workflow.getId());
+            LOG.debug("STARTED CoordActionUpdateXCommand for wfId=[{0}]", workflow.getId());
+            final CoordinatorAction.Status formerCoordinatorStatus = coordAction.getStatus();
+            final int formerCoordinatorPending = coordAction.getPending();
             Status slaStatus = null;
             if (workflow.getStatus() == WorkflowJob.Status.SUCCEEDED) {
                 coordAction.setStatus(CoordinatorAction.Status.SUCCEEDED);
@@ -105,7 +107,7 @@ public class CoordActionUpdateXCommand extends CoordinatorXCommand<Void> {
                 coordAction.decrementAndGetPending();
             }
             else {
-                LOG.warn("Unexpected workflow " + workflow.getId() + " STATUS " + workflow.getStatus());
+                LOG.warn("Unexpected workflow [{0}] STATUS [{1}]", workflow.getId(), workflow.getStatus());
                 // update lastModifiedTime
                 coordAction.setLastModifiedTime(new Date());
                 CoordActionQueryExecutor.getInstance().executeUpdate(
@@ -119,8 +121,9 @@ public class CoordActionUpdateXCommand extends CoordinatorXCommand<Void> {
                 return null;
             }
 
-            LOG.info("Updating Coordintaor action id :" + coordAction.getId() + " status "
-                    + " to " + coordAction.getStatus() + ", pending = " + coordAction.getPending());
+            LOG.info("Updating Coordinator action id: [{0}] status [{1}] to [{2}] pending [{3}] to [{4}]",
+                    coordAction.getId(), formerCoordinatorStatus, coordAction.getStatus(),
+                    formerCoordinatorPending, coordAction.getPending());
 
             coordAction.setLastModifiedTime(new Date());
             updateList.add(new UpdateEntry<CoordActionQuery>(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME,
@@ -133,9 +136,9 @@ public class CoordActionUpdateXCommand extends CoordinatorXCommand<Void> {
                 LOG.info("Updating Coordinator job "+ coordJob.getId() + "pending to true");
             }*/
             if (slaStatus != null) {
-                SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), coordAction.getId(), slaStatus,
-                        SlaAppType.COORDINATOR_ACTION, LOG);
-                if(slaEvent != null) {
+                SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(),
+                        coordAction.getId(), slaStatus, SlaAppType.COORDINATOR_ACTION, LOG);
+                if (slaEvent != null) {
                     insertList.add(slaEvent);
                 }
             }
@@ -149,10 +152,10 @@ public class CoordActionUpdateXCommand extends CoordinatorXCommand<Void> {
                 generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), workflow.getStartTime());
             }
 
-            LOG.debug("ENDED CoordActionUpdateXCommand for wfId=" + workflow.getId());
+            LOG.debug("ENDED CoordActionUpdateXCommand for wfId= [{0}]", workflow.getId());
         }
         catch (XException ex) {
-            LOG.warn("CoordActionUpdate Failed ", ex.getMessage());
+            LOG.warn("CoordActionUpdate Failed [{0}]", ex.getMessage());
             throw new CommandException(ex);
         }
         return null;
@@ -202,18 +205,26 @@ public class CoordActionUpdateXCommand extends CoordinatorXCommand<Void> {
 
     @Override
     protected void verifyPrecondition() throws CommandException, PreconditionException {
-
-        // if coord action is RUNNING and pending false and workflow is RUNNING, this doesn't need to be updated.
-        if (workflow.getStatus() == WorkflowJob.Status.RUNNING
-                && coordAction.getStatus() == CoordinatorAction.Status.RUNNING && !coordAction.isPending()) {
+        // if the coord action status already matches the workflow status and pending is false, the coord action is up to date
+        if (((workflow.getStatus() == WorkflowJob.Status.SUCCEEDED
+                && coordAction.getStatus() == CoordinatorAction.Status.SUCCEEDED) ||
+                (workflow.getStatus() == WorkflowJob.Status.FAILED
+                        && coordAction.getStatus() == CoordinatorAction.Status.FAILED) ||
+                (workflow.getStatus() == WorkflowJob.Status.KILLED
+                        && coordAction.getStatus() == CoordinatorAction.Status.KILLED) ||
+                (workflow.getStatus() == WorkflowJob.Status.SUSPENDED
+                        && coordAction.getStatus() == CoordinatorAction.Status.SUSPENDED) ||
+                (workflow.getStatus() == WorkflowJob.Status.RUNNING
+                        && coordAction.getStatus() == CoordinatorAction.Status.RUNNING)
+            ) && !coordAction.isPending()) {
             try {
                 CoordActionQueryExecutor.getInstance().executeUpdate(
                         CoordActionQueryExecutor.CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, coordAction);
-            }
-            catch (JPAExecutorException je) {
+            } catch (JPAExecutorException je) {
                 throw new CommandException(je);
             }
-            throw new PreconditionException(ErrorCode.E1100, ", workflow is RUNNING and coordinator action is RUNNING and pending false");
+            throw new PreconditionException(ErrorCode.E1100, ", workflow is " + workflow.getStatus() +
+                    " and coordinator action is " + coordAction.getStatus() + " and pending false");
         }
     }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/b8c6b702/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java b/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java
index 14f5294..5aaa344 100644
--- a/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java
+++ b/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java
@@ -85,7 +85,6 @@ import org.apache.oozie.service.LiteWorkflowStoreService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.UUIDService;
 import org.apache.oozie.test.XDataTestCase;
-import org.apache.oozie.test.XTestCase.Predicate;
 import org.apache.oozie.util.DateUtils;
 import org.apache.oozie.util.IOUtils;
 import org.apache.oozie.util.XConfiguration;
@@ -120,6 +119,9 @@ public class TestEventGeneration extends XDataTestCase {
         super.setUp();
         services = new Services();
         Configuration conf = services.getConf();
+        // The EventHandlerService manipulates the queues in the background, so the actual test results depend on the
+        // circumstances (such as the speed of the machine or whether it is being debugged).
+        conf.setInt("oozie.service.EventHandlerService.worker.threads", 0);
         conf.set(Services.CONF_SERVICE_EXT_CLASSES, "org.apache.oozie.service.EventHandlerService");
         services.init();
         ehs = services.get(EventHandlerService.class);
@@ -491,7 +493,7 @@ public class TestEventGeneration extends XDataTestCase {
     }
 
     @Test
-    public void testForNoDuplicates() throws Exception {
+    public void testForNoDuplicatesWorkflowEvents() throws Exception {
         // test workflow job events
         Reader reader = IOUtils.getResourceAsReader("wf-no-op.xml", -1);
         Writer writer = new FileWriter(getTestCaseDir() + "/workflow.xml");
@@ -515,7 +517,10 @@ public class TestEventGeneration extends XDataTestCase {
         assertEquals(EventStatus.STARTED, ((JobEvent)queue.poll()).getEventStatus());
         assertEquals(EventStatus.SUCCESS, ((JobEvent)queue.poll()).getEventStatus());
         queue.clear();
+    }
 
+    @Test
+    public void testForNoDuplicatesCoordinatorActionEvents() throws Exception {
         // test coordinator action events (failure case)
         Date startTime = DateUtils.parseDateOozieTZ("2009-02-01T23:59Z");
         Date endTime = DateUtils.parseDateOozieTZ("2009-02-02T23:59Z");
@@ -535,10 +540,17 @@ public class TestEventGeneration extends XDataTestCase {
         assertEquals(2, queue.size());
         assertEquals(EventStatus.WAITING, ((JobEvent)queue.poll()).getEventStatus());
         assertEquals(EventStatus.FAILURE, ((JobEvent)queue.poll()).getEventStatus());
+        queue.clear();
+    }
+
+    @Test
+    public void testInvalidXMLCoordinatorFailsForNoDuplicates() throws Exception {
+        Date startTime = DateUtils.parseDateOozieTZ("2009-02-01T23:59Z");
+        Date endTime = DateUtils.parseDateOozieTZ("2009-02-02T23:59Z");
 
         // test coordinator action events (failure from ActionStartX)
         ehs.getAppTypes().add("workflow_action");
-        coord = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, false, false, 0);
+        CoordinatorJobBean coord = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, false, false, 0);
         CoordinatorActionBean action = addRecordToCoordActionTable(coord.getId(), 1, CoordinatorAction.Status.RUNNING,
                 "coord-action-sla1.xml", 0);
         WorkflowJobBean wf = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING,
@@ -549,14 +561,19 @@ public class TestEventGeneration extends XDataTestCase {
         String waId = _createWorkflowAction(wf.getId(), "wf-action");
         new ActionStartXCommand(waId, action.getType()).call();
 
-        final WorkflowJobGetJPAExecutor readCmd2 = new WorkflowJobGetJPAExecutor(jobId1);
+        final CoordJobGetJPAExecutor readCmd2 = new CoordJobGetJPAExecutor(coord.getId());
         waitFor(1 * 100, new Predicate() {
             @Override
             public boolean evaluate() throws Exception {
-                return jpaService.execute(readCmd2).getStatus() == WorkflowJob.Status.KILLED;
+                return jpaService.execute(readCmd2).getStatus() == CoordinatorJob.Status.KILLED;
             }
         });
+
         assertEquals(3, queue.size());
+        JobEvent coordActionEvent = (JobEvent) queue.poll();
+        assertEquals(EventStatus.FAILURE, coordActionEvent.getEventStatus());
+        assertEquals(action.getId(), coordActionEvent.getId());
+        assertEquals(AppType.COORDINATOR_ACTION, coordActionEvent.getAppType());
         JobEvent wfActionEvent = (JobEvent) queue.poll();
         assertEquals(EventStatus.FAILURE, wfActionEvent.getEventStatus());
         assertEquals(waId, wfActionEvent.getId());
@@ -565,10 +582,7 @@ public class TestEventGeneration extends XDataTestCase {
         assertEquals(EventStatus.FAILURE, wfJobEvent.getEventStatus());
         assertEquals(wf.getId(), wfJobEvent.getId());
         assertEquals(AppType.WORKFLOW_JOB, wfJobEvent.getAppType());
-        JobEvent coordActionEvent = (JobEvent) queue.poll();
-        assertEquals(EventStatus.FAILURE, coordActionEvent.getEventStatus());
-        assertEquals(action.getId(), coordActionEvent.getId());
-        assertEquals(AppType.COORDINATOR_ACTION, coordActionEvent.getAppType());
+        queue.clear();
     }
 
     private class ActionCheckXCommandForTest extends ActionCheckXCommand {

http://git-wip-us.apache.org/repos/asf/oozie/blob/b8c6b702/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index f766a7c..9341014 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.3.0 release (trunk - unreleased)
 
+OOZIE-2429 TestEventGeneration test is flakey (fdenes via rkanter)
 OOZIE-2466 Repeated failure of TestMetricsInstrumentation.testSamplers (fdenes via rkanter)
 OOZIE-2470 Remove infinite socket timeouts in the Oozie email action (harsh)
 OOZIE-2246 CoordinatorInputCheckCommand does not behave properly when har file is one of data dependency and doesn't exist (satishsaley via puru)