You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by mo...@apache.org on 2013/05/24 03:05:48 UTC

svn commit: r1485913 - in /oozie/trunk/core/src: main/java/org/apache/oozie/command/ main/java/org/apache/oozie/command/coord/ main/java/org/apache/oozie/command/wf/ main/java/org/apache/oozie/event/ main/java/org/apache/oozie/service/ test/java/org/ap...

Author: mona
Date: Fri May 24 01:05:47 2013
New Revision: 1485913

URL: http://svn.apache.org/r1485913
Log:
OOZIE-1364 Fix duplicate events generated and fields missing in events (mona)

Modified:
    oozie/trunk/core/src/main/java/org/apache/oozie/command/TransitionXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/event/MemoryEventQueue.java
    oozie/trunk/core/src/main/java/org/apache/oozie/service/EventHandlerService.java
    oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/TransitionXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/TransitionXCommand.java?rev=1485913&r1=1485912&r2=1485913&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/TransitionXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/TransitionXCommand.java Fri May 24 01:05:47 2013
@@ -80,9 +80,10 @@ public abstract class TransitionXCommand
     public void generateEvents(CoordinatorJobBean coordJob) throws CommandException {
         for (JsonBean actionBean : updateList) {
             if (actionBean instanceof CoordinatorActionBean) {
+                CoordinatorActionBean caBean = (CoordinatorActionBean) actionBean;
+                caBean.setJobId(coordJob.getId());
                 if (EventHandlerService.isEnabled()) {
-                    CoordinatorXCommand.generateEvent((CoordinatorActionBean) actionBean, coordJob.getUser(),
-                            coordJob.getAppName());
+                    CoordinatorXCommand.generateEvent(caBean, coordJob.getUser(), coordJob.getAppName());
                 }
             }
         }

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java?rev=1485913&r1=1485912&r2=1485913&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java Fri May 24 01:05:47 2013
@@ -74,6 +74,7 @@ public class CoordActionCheckXCommand ex
             InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
             WorkflowJobBean wf = jpaService.execute(new WorkflowJobGetJPAExecutor(coordAction.getExternalId()));
             Status slaStatus = null;
+            CoordinatorAction.Status initialStatus = coordAction.getStatus();
 
             if (wf.getStatus() == WorkflowJob.Status.SUCCEEDED) {
                 coordAction.setStatus(CoordinatorAction.Status.SUCCEEDED);
@@ -119,7 +120,8 @@ public class CoordActionCheckXCommand ex
             }
 
             jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, insertList));
-            if (EventHandlerService.isEnabled()) {
+            CoordinatorAction.Status endStatus = coordAction.getStatus();
+            if (endStatus != initialStatus && EventHandlerService.isEnabled()) {
                 CoordinatorJobBean coordJob = jpaService.execute(new CoordinatorJobGetForUserAppnameJPAExecutor(
                         coordAction.getJobId()));
                 generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName());

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java?rev=1485913&r1=1485912&r2=1485913&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java Fri May 24 01:05:47 2013
@@ -134,9 +134,9 @@ public class CoordActionUpdateXCommand e
             }
 
             jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, insertList));
-            if (EventHandlerService.isEnabled()) {
-                CoordinatorJobBean coordJob = jpaService.execute(new CoordinatorJobGetForUserAppnameJPAExecutor(coordAction
-                        .getJobId()));
+            if (preCoordStatus != coordAction.getStatus() && EventHandlerService.isEnabled()) {
+                CoordinatorJobBean coordJob = jpaService.execute(new CoordinatorJobGetForUserAppnameJPAExecutor(
+                        coordAction.getJobId()));
                 generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName());
             }
 

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java?rev=1485913&r1=1485912&r2=1485913&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java Fri May 24 01:05:47 2013
@@ -73,6 +73,9 @@ public class SignalXCommand extends Work
     private WorkflowActionBean wfAction;
     private List<JsonBean> updateList = new ArrayList<JsonBean>();
     private List<JsonBean> insertList = new ArrayList<JsonBean>();
+    private boolean generateEvent;
+    private String wfJobErrorCode;
+    private String wfJobErrorMsg;
 
 
     public SignalXCommand(String name, int priority, String jobId) {
@@ -146,6 +149,7 @@ public class SignalXCommand extends Work
                 wfJob.setStatus(WorkflowJob.Status.RUNNING);
                 wfJob.setStartTime(new Date());
                 wfJob.setWorkflowInstance(workflowInstance);
+                generateEvent = true;
                 // 1. Add SLA status event for WF-JOB with status STARTED
                 SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), jobId,
                         Status.STARTED, SlaAppType.WORKFLOW_JOB);
@@ -162,6 +166,7 @@ public class SignalXCommand extends Work
             }
         }
         else {
+            WorkflowInstance.Status initialStatus = workflowInstance.getStatus();
             String skipVar = workflowInstance.getVar(wfAction.getName() + WorkflowInstance.NODE_VAR_SEPARATOR
                     + ReRunXCommand.TO_SKIP);
             if (skipVar != null) {
@@ -180,6 +185,10 @@ public class SignalXCommand extends Work
                 queue(new NotificationXCommand(wfJob, wfAction));
             }
             updateList.add(wfAction);
+            WorkflowInstance.Status endStatus = workflowInstance.getStatus();
+            if (endStatus != initialStatus) {
+                generateEvent = true;
+            }
         }
 
         if (completed) {
@@ -200,6 +209,10 @@ public class SignalXCommand extends Work
                             actionToFailId));
                     actionToFail.resetPending();
                     actionToFail.setStatus(WorkflowActionBean.Status.FAILED);
+                    if (wfJobErrorCode == null) { // field starts out null; record the failure details once
+                        wfJobErrorCode = actionToFail.getErrorCode();
+                        wfJobErrorMsg = actionToFail.getErrorMessage();
+                    }
                     queue(new NotificationXCommand(wfJob, actionToFail));
                     SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(),
                             Status.FAILED, SlaAppType.WORKFLOW_ACTION);
@@ -310,12 +323,10 @@ public class SignalXCommand extends Work
             updateList.add(wfJob);
             // call JPAExecutor to do the bulk writes
             jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
-            if (EventHandlerService.isEnabled()) {
+            if (generateEvent && EventHandlerService.isEnabled()) {
+                generateEvent(wfJob, wfJobErrorCode, wfJobErrorMsg);
                 if (wfAction != null) {
-                    generateEvent(wfJob, wfAction.getErrorCode(), wfAction.getErrorMessage());
-                }
-                else {
-                    generateEvent(wfJob);
+                    generateEvent(wfAction, wfJob.getUser());
                 }
             }
         }

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/event/MemoryEventQueue.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/event/MemoryEventQueue.java?rev=1485913&r1=1485912&r2=1485913&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/event/MemoryEventQueue.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/event/MemoryEventQueue.java Fri May 24 01:05:47 2013
@@ -123,6 +123,7 @@ public class MemoryEventQueue implements
     @Override
     public void clear() {
         eventQueue.clear();
+        currentSize.set(0);
     }
 
 }

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/service/EventHandlerService.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/service/EventHandlerService.java?rev=1485913&r1=1485912&r2=1485913&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/service/EventHandlerService.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/service/EventHandlerService.java Fri May 24 01:05:47 2013
@@ -207,6 +207,9 @@ public class EventHandlerService impleme
     }
 
     public void queueEvent(Event event) {
+        if (LOG.isTraceEnabled()) { // avoid building the message and capturing a stack trace when trace is off
+            LOG.trace("Stack trace while queueing event : " + event, new Throwable());
+        }
         eventQueue.add(event);
     }
 

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java?rev=1485913&r1=1485912&r2=1485913&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java Fri May 24 01:05:47 2013
@@ -17,6 +17,10 @@
  */
 package org.apache.oozie.event;
 
+import java.io.File;
+import java.io.FileWriter;
+import java.io.Reader;
+import java.io.Writer;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.HashSet;
@@ -26,6 +30,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.oozie.AppType;
 import org.apache.oozie.CoordinatorActionBean;
 import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.DagEngine;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.client.CoordinatorAction;
@@ -34,6 +39,7 @@ import org.apache.oozie.client.OozieClie
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.client.event.Event;
+import org.apache.oozie.client.event.JobEvent;
 import org.apache.oozie.client.event.JobEvent.EventStatus;
 import org.apache.oozie.command.coord.CoordActionCheckXCommand;
 import org.apache.oozie.command.coord.CoordActionInputCheckXCommand;
@@ -48,6 +54,8 @@ import org.apache.oozie.command.wf.Workf
 import org.apache.oozie.coord.CoordELFunctions;
 import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
@@ -58,6 +66,8 @@ import org.apache.oozie.service.JPAServi
 import org.apache.oozie.service.Services;
 import org.apache.oozie.test.XDataTestCase;
 import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.IOUtils;
+import org.apache.oozie.util.XConfiguration;
 import org.apache.oozie.workflow.WorkflowInstance;
 import org.apache.oozie.workflow.lite.ActionNodeDef;
 import org.apache.oozie.workflow.lite.EndNodeDef;
@@ -77,12 +87,13 @@ import org.junit.Test;
 public class TestEventGeneration extends XDataTestCase {
 
     EventQueue queue;
+    Services services;
 
     @Override
     @Before
     protected void setUp() throws Exception {
         super.setUp();
-        Services services = new Services();
+        services = new Services();
         Configuration conf = services.getConf();
         conf.set(Services.CONF_SERVICE_EXT_CLASSES, "org.apache.oozie.service.EventHandlerService");
         services.init();
@@ -98,9 +109,9 @@ public class TestEventGeneration extends
 
     @Test
     public void testWorkflowJobEvent() throws Exception {
-        assertEquals(queue.size(), 0);
+        assertEquals(0, queue.size());
         WorkflowJobBean job = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
-        JPAService jpaService = Services.get().get(JPAService.class);
+        JPAService jpaService = services.get(JPAService.class);
 
         // Starting job
         new StartXCommand(job.getId()).call();
@@ -187,7 +198,7 @@ public class TestEventGeneration extends
 
     @Test
     public void testCoordinatorActionEvent() throws Exception {
-        EventHandlerService ehs = Services.get().get(EventHandlerService.class);
+        EventHandlerService ehs = services.get(EventHandlerService.class);
         // reduce noise from WF Job events (also default) by setting it to only
         // coord action
         ehs.setAppTypes(new HashSet<String>(Arrays.asList(new String[] { "coordinator_action" })));
@@ -197,7 +208,7 @@ public class TestEventGeneration extends
         CoordinatorJobBean coord = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, false,
                 false, 0);
         modifyCoordForRunning(coord);
-        final JPAService jpaService = Services.get().get(JPAService.class);
+        final JPAService jpaService = services.get(JPAService.class);
 
         // Action WAITING on materialization
         new CoordMaterializeTransitionXCommand(coord.getId(), 3600).call();
@@ -302,7 +313,7 @@ public class TestEventGeneration extends
         final CoordinatorJobBean coord = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
         final CoordinatorActionBean action = addRecordToCoordActionTable(coord.getId(), 1,
                 CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0);
-        final JPAService jpaService = Services.get().get(JPAService.class);
+        JPAService jpaService = services.get(JPAService.class);
 
         CoordinatorXCommand<Void> myCmd = new CoordActionCheckXCommand(action.getId(), 0) {
             @Override
@@ -339,6 +350,54 @@ public class TestEventGeneration extends
 
     }
 
+    @Test
+    public void testForNoDuplicates() throws Exception {
+        // test workflow job events
+        Reader reader = IOUtils.getResourceAsReader("wf-no-op.xml", -1);
+        Writer writer = new FileWriter(getTestCaseDir() + "/workflow.xml");
+        IOUtils.copyCharStream(reader, writer);
+
+        final DagEngine engine = new DagEngine(getTestUser(), "authtoken");
+        Configuration conf = new XConfiguration();
+        conf.set(OozieClient.APP_PATH, "file://" + getTestCaseDir() + File.separator + "workflow.xml");
+        conf.set(OozieClient.USER_NAME, getTestUser());
+
+        final String jobId1 = engine.submitJob(conf, true);
+        final WorkflowJobGetJPAExecutor readCmd = new WorkflowJobGetJPAExecutor(jobId1);
+        final JPAService jpaService = services.get(JPAService.class);
+
+        waitFor(1 * 100, new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                return jpaService.execute(readCmd).getStatus() == WorkflowJob.Status.SUCCEEDED;
+            }
+        });
+        assertEquals(2, queue.size());
+        assertEquals(EventStatus.STARTED, ((JobEvent)queue.poll()).getEventStatus());
+        assertEquals(EventStatus.SUCCESS, ((JobEvent)queue.poll()).getEventStatus());
+        queue.clear();
+
+        // test coordinator action events (failure case)
+        Date startTime = DateUtils.parseDateOozieTZ("2009-02-01T23:59Z");
+        Date endTime = DateUtils.parseDateOozieTZ("2009-02-02T23:59Z");
+        CoordinatorJobBean coord = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, false,
+                false, 0);
+        _modifyCoordForFailureAction(coord);
+        new CoordMaterializeTransitionXCommand(coord.getId(), 3600).call();
+        final CoordJobGetJPAExecutor readCmd1 = new CoordJobGetJPAExecutor(coord.getId());
+        waitFor(1 * 300, new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                CoordinatorJobBean bean = jpaService.execute(readCmd1);
+                return bean.getStatus() == CoordinatorJob.Status.SUCCEEDED
+                        || bean.getStatus() == CoordinatorJob.Status.KILLED;
+            }
+        });
+        assertEquals(2, queue.size());
+        assertEquals(EventStatus.WAITING, ((JobEvent)queue.poll()).getEventStatus());
+        assertEquals(EventStatus.FAILURE, ((JobEvent)queue.poll()).getEventStatus());
+    }
+
     private WorkflowJobBean _createWorkflowJob() throws Exception {
         LiteWorkflowApp app = new LiteWorkflowApp("my-app", "<workflow-app/>",
                 new StartNodeDef(TestControlNodeHandler.class, "one"))
@@ -353,7 +412,7 @@ public class TestEventGeneration extends
                 WorkflowInstance.Status.PREP);
         String executionPath = "/";
 
-        JPAService jpaService = Services.get().get(JPAService.class);
+        JPAService jpaService = services.get(JPAService.class);
         assertNotNull(jpaService);
         WorkflowJobInsertJPAExecutor wfInsertCmd = new WorkflowJobInsertJPAExecutor(workflow);
         jpaService.execute(wfInsertCmd);
@@ -374,4 +433,12 @@ public class TestEventGeneration extends
         return (CoordinatorActionEvent) e;
     }
 
+    private void _modifyCoordForFailureAction(CoordinatorJobBean coord) throws Exception {
+        String wfXml = IOUtils.getResourceAsString("wf-invalid-fork.xml", -1);
+        writeToFile(wfXml, getFsTestCaseDir(), "workflow.xml");
+        String coordXml = coord.getJobXml();
+        coord.setJobXml(coordXml.replace("hdfs:///tmp/workflows/", getFsTestCaseDir() + "/workflow.xml"));
+        services.get(JPAService.class).execute(new CoordJobUpdateJPAExecutor(coord));
+    }
+
 }