You are viewing a plain text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@oozie.apache.org by mo...@apache.org on 2013/05/24 03:05:48 UTC
svn commit: r1485913 - in /oozie/trunk/core/src:
main/java/org/apache/oozie/command/ main/java/org/apache/oozie/command/coord/
main/java/org/apache/oozie/command/wf/ main/java/org/apache/oozie/event/
main/java/org/apache/oozie/service/ test/java/org/apache/oozie/event/
Author: mona
Date: Fri May 24 01:05:47 2013
New Revision: 1485913
URL: http://svn.apache.org/r1485913
Log:
OOZIE-1364 Fix duplicate events generated and fields missing in events (mona)
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/command/TransitionXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/event/MemoryEventQueue.java
oozie/trunk/core/src/main/java/org/apache/oozie/service/EventHandlerService.java
oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/TransitionXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/TransitionXCommand.java?rev=1485913&r1=1485912&r2=1485913&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/TransitionXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/TransitionXCommand.java Fri May 24 01:05:47 2013
@@ -80,9 +80,10 @@ public abstract class TransitionXCommand
public void generateEvents(CoordinatorJobBean coordJob) throws CommandException {
for (JsonBean actionBean : updateList) {
if (actionBean instanceof CoordinatorActionBean) {
+ CoordinatorActionBean caBean = (CoordinatorActionBean) actionBean;
+ caBean.setJobId(coordJob.getId());
if (EventHandlerService.isEnabled()) {
- CoordinatorXCommand.generateEvent((CoordinatorActionBean) actionBean, coordJob.getUser(),
- coordJob.getAppName());
+ CoordinatorXCommand.generateEvent(caBean, coordJob.getUser(), coordJob.getAppName());
}
}
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java?rev=1485913&r1=1485912&r2=1485913&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java Fri May 24 01:05:47 2013
@@ -74,6 +74,7 @@ public class CoordActionCheckXCommand ex
InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
WorkflowJobBean wf = jpaService.execute(new WorkflowJobGetJPAExecutor(coordAction.getExternalId()));
Status slaStatus = null;
+ CoordinatorAction.Status initialStatus = coordAction.getStatus();
if (wf.getStatus() == WorkflowJob.Status.SUCCEEDED) {
coordAction.setStatus(CoordinatorAction.Status.SUCCEEDED);
@@ -119,7 +120,8 @@ public class CoordActionCheckXCommand ex
}
jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, insertList));
- if (EventHandlerService.isEnabled()) {
+ CoordinatorAction.Status endStatus = coordAction.getStatus();
+ if (endStatus != initialStatus && EventHandlerService.isEnabled()) {
CoordinatorJobBean coordJob = jpaService.execute(new CoordinatorJobGetForUserAppnameJPAExecutor(
coordAction.getJobId()));
generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName());
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java?rev=1485913&r1=1485912&r2=1485913&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java Fri May 24 01:05:47 2013
@@ -134,9 +134,9 @@ public class CoordActionUpdateXCommand e
}
jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, insertList));
- if (EventHandlerService.isEnabled()) {
- CoordinatorJobBean coordJob = jpaService.execute(new CoordinatorJobGetForUserAppnameJPAExecutor(coordAction
- .getJobId()));
+ if (preCoordStatus != coordAction.getStatus() && EventHandlerService.isEnabled()) {
+ CoordinatorJobBean coordJob = jpaService.execute(new CoordinatorJobGetForUserAppnameJPAExecutor(
+ coordAction.getJobId()));
generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName());
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java?rev=1485913&r1=1485912&r2=1485913&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java Fri May 24 01:05:47 2013
@@ -73,6 +73,9 @@ public class SignalXCommand extends Work
private WorkflowActionBean wfAction;
private List<JsonBean> updateList = new ArrayList<JsonBean>();
private List<JsonBean> insertList = new ArrayList<JsonBean>();
+ private boolean generateEvent;
+ private String wfJobErrorCode;
+ private String wfJobErrorMsg;
public SignalXCommand(String name, int priority, String jobId) {
@@ -146,6 +149,7 @@ public class SignalXCommand extends Work
wfJob.setStatus(WorkflowJob.Status.RUNNING);
wfJob.setStartTime(new Date());
wfJob.setWorkflowInstance(workflowInstance);
+ generateEvent = true;
// 1. Add SLA status event for WF-JOB with status STARTED
SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), jobId,
Status.STARTED, SlaAppType.WORKFLOW_JOB);
@@ -162,6 +166,7 @@ public class SignalXCommand extends Work
}
}
else {
+ WorkflowInstance.Status initialStatus = workflowInstance.getStatus();
String skipVar = workflowInstance.getVar(wfAction.getName() + WorkflowInstance.NODE_VAR_SEPARATOR
+ ReRunXCommand.TO_SKIP);
if (skipVar != null) {
@@ -180,6 +185,10 @@ public class SignalXCommand extends Work
queue(new NotificationXCommand(wfJob, wfAction));
}
updateList.add(wfAction);
+ WorkflowInstance.Status endStatus = workflowInstance.getStatus();
+ if (endStatus != initialStatus) {
+ generateEvent = true;
+ }
}
if (completed) {
@@ -200,6 +209,10 @@ public class SignalXCommand extends Work
actionToFailId));
actionToFail.resetPending();
actionToFail.setStatus(WorkflowActionBean.Status.FAILED);
+ if (wfJobErrorCode != null) {
+ wfJobErrorCode = actionToFail.getErrorCode();
+ wfJobErrorMsg = actionToFail.getErrorMessage();
+ }
queue(new NotificationXCommand(wfJob, actionToFail));
SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(),
Status.FAILED, SlaAppType.WORKFLOW_ACTION);
@@ -310,12 +323,10 @@ public class SignalXCommand extends Work
updateList.add(wfJob);
// call JPAExecutor to do the bulk writes
jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
- if (EventHandlerService.isEnabled()) {
+ if (generateEvent && EventHandlerService.isEnabled()) {
+ generateEvent(wfJob, wfJobErrorCode, wfJobErrorMsg);
if (wfAction != null) {
- generateEvent(wfJob, wfAction.getErrorCode(), wfAction.getErrorMessage());
- }
- else {
- generateEvent(wfJob);
+ generateEvent(wfAction, wfJob.getUser());
}
}
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/event/MemoryEventQueue.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/event/MemoryEventQueue.java?rev=1485913&r1=1485912&r2=1485913&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/event/MemoryEventQueue.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/event/MemoryEventQueue.java Fri May 24 01:05:47 2013
@@ -123,6 +123,7 @@ public class MemoryEventQueue implements
@Override
public void clear() {
eventQueue.clear();
+ currentSize.set(0);
}
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/service/EventHandlerService.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/service/EventHandlerService.java?rev=1485913&r1=1485912&r2=1485913&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/service/EventHandlerService.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/service/EventHandlerService.java Fri May 24 01:05:47 2013
@@ -207,6 +207,7 @@ public class EventHandlerService impleme
}
public void queueEvent(Event event) {
+ LOG.trace("Stack trace while queueing event : " + event, new Throwable());
eventQueue.add(event);
}
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java?rev=1485913&r1=1485912&r2=1485913&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java Fri May 24 01:05:47 2013
@@ -17,6 +17,10 @@
*/
package org.apache.oozie.event;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.Reader;
+import java.io.Writer;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
@@ -26,6 +30,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.oozie.AppType;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.DagEngine;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.client.CoordinatorAction;
@@ -34,6 +39,7 @@ import org.apache.oozie.client.OozieClie
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.event.Event;
+import org.apache.oozie.client.event.JobEvent;
import org.apache.oozie.client.event.JobEvent.EventStatus;
import org.apache.oozie.command.coord.CoordActionCheckXCommand;
import org.apache.oozie.command.coord.CoordActionInputCheckXCommand;
@@ -48,6 +54,8 @@ import org.apache.oozie.command.wf.Workf
import org.apache.oozie.coord.CoordELFunctions;
import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
@@ -58,6 +66,8 @@ import org.apache.oozie.service.JPAServi
import org.apache.oozie.service.Services;
import org.apache.oozie.test.XDataTestCase;
import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.IOUtils;
+import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.workflow.WorkflowInstance;
import org.apache.oozie.workflow.lite.ActionNodeDef;
import org.apache.oozie.workflow.lite.EndNodeDef;
@@ -77,12 +87,13 @@ import org.junit.Test;
public class TestEventGeneration extends XDataTestCase {
EventQueue queue;
+ Services services;
@Override
@Before
protected void setUp() throws Exception {
super.setUp();
- Services services = new Services();
+ services = new Services();
Configuration conf = services.getConf();
conf.set(Services.CONF_SERVICE_EXT_CLASSES, "org.apache.oozie.service.EventHandlerService");
services.init();
@@ -98,9 +109,9 @@ public class TestEventGeneration extends
@Test
public void testWorkflowJobEvent() throws Exception {
- assertEquals(queue.size(), 0);
+ assertEquals(0, queue.size());
WorkflowJobBean job = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
- JPAService jpaService = Services.get().get(JPAService.class);
+ JPAService jpaService = services.get(JPAService.class);
// Starting job
new StartXCommand(job.getId()).call();
@@ -187,7 +198,7 @@ public class TestEventGeneration extends
@Test
public void testCoordinatorActionEvent() throws Exception {
- EventHandlerService ehs = Services.get().get(EventHandlerService.class);
+ EventHandlerService ehs = services.get(EventHandlerService.class);
// reduce noise from WF Job events (also default) by setting it to only
// coord action
ehs.setAppTypes(new HashSet<String>(Arrays.asList(new String[] { "coordinator_action" })));
@@ -197,7 +208,7 @@ public class TestEventGeneration extends
CoordinatorJobBean coord = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, false,
false, 0);
modifyCoordForRunning(coord);
- final JPAService jpaService = Services.get().get(JPAService.class);
+ final JPAService jpaService = services.get(JPAService.class);
// Action WAITING on materialization
new CoordMaterializeTransitionXCommand(coord.getId(), 3600).call();
@@ -302,7 +313,7 @@ public class TestEventGeneration extends
final CoordinatorJobBean coord = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
final CoordinatorActionBean action = addRecordToCoordActionTable(coord.getId(), 1,
CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0);
- final JPAService jpaService = Services.get().get(JPAService.class);
+ JPAService jpaService = services.get(JPAService.class);
CoordinatorXCommand<Void> myCmd = new CoordActionCheckXCommand(action.getId(), 0) {
@Override
@@ -339,6 +350,54 @@ public class TestEventGeneration extends
}
+ @Test
+ public void testForNoDuplicates() throws Exception {
+ // test workflow job events
+ Reader reader = IOUtils.getResourceAsReader("wf-no-op.xml", -1);
+ Writer writer = new FileWriter(getTestCaseDir() + "/workflow.xml");
+ IOUtils.copyCharStream(reader, writer);
+
+ final DagEngine engine = new DagEngine(getTestUser(), "authtoken");
+ Configuration conf = new XConfiguration();
+ conf.set(OozieClient.APP_PATH, "file://" + getTestCaseDir() + File.separator + "workflow.xml");
+ conf.set(OozieClient.USER_NAME, getTestUser());
+
+ final String jobId1 = engine.submitJob(conf, true);
+ final WorkflowJobGetJPAExecutor readCmd = new WorkflowJobGetJPAExecutor(jobId1);
+ final JPAService jpaService = services.get(JPAService.class);
+
+ waitFor(1 * 100, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return jpaService.execute(readCmd).getStatus() == WorkflowJob.Status.SUCCEEDED;
+ }
+ });
+ assertEquals(2, queue.size());
+ assertEquals(EventStatus.STARTED, ((JobEvent)queue.poll()).getEventStatus());
+ assertEquals(EventStatus.SUCCESS, ((JobEvent)queue.poll()).getEventStatus());
+ queue.clear();
+
+ // test coordinator action events (failure case)
+ Date startTime = DateUtils.parseDateOozieTZ("2009-02-01T23:59Z");
+ Date endTime = DateUtils.parseDateOozieTZ("2009-02-02T23:59Z");
+ CoordinatorJobBean coord = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, false,
+ false, 0);
+ _modifyCoordForFailureAction(coord);
+ new CoordMaterializeTransitionXCommand(coord.getId(), 3600).call();
+ final CoordJobGetJPAExecutor readCmd1 = new CoordJobGetJPAExecutor(coord.getId());
+ waitFor(1 * 300, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ CoordinatorJobBean bean = jpaService.execute(readCmd1);
+ return bean.getStatus() == CoordinatorJob.Status.SUCCEEDED
+ || bean.getStatus() == CoordinatorJob.Status.KILLED;
+ }
+ });
+ assertEquals(2, queue.size());
+ assertEquals(EventStatus.WAITING, ((JobEvent)queue.poll()).getEventStatus());
+ assertEquals(EventStatus.FAILURE, ((JobEvent)queue.poll()).getEventStatus());
+ }
+
private WorkflowJobBean _createWorkflowJob() throws Exception {
LiteWorkflowApp app = new LiteWorkflowApp("my-app", "<workflow-app/>",
new StartNodeDef(TestControlNodeHandler.class, "one"))
@@ -353,7 +412,7 @@ public class TestEventGeneration extends
WorkflowInstance.Status.PREP);
String executionPath = "/";
- JPAService jpaService = Services.get().get(JPAService.class);
+ JPAService jpaService = services.get(JPAService.class);
assertNotNull(jpaService);
WorkflowJobInsertJPAExecutor wfInsertCmd = new WorkflowJobInsertJPAExecutor(workflow);
jpaService.execute(wfInsertCmd);
@@ -374,4 +433,12 @@ public class TestEventGeneration extends
return (CoordinatorActionEvent) e;
}
+ private void _modifyCoordForFailureAction(CoordinatorJobBean coord) throws Exception {
+ String wfXml = IOUtils.getResourceAsString("wf-invalid-fork.xml", -1);
+ writeToFile(wfXml, getFsTestCaseDir(), "workflow.xml");
+ String coordXml = coord.getJobXml();
+ coord.setJobXml(coordXml.replace("hdfs:///tmp/workflows/", getFsTestCaseDir() + "/workflow.xml"));
+ services.get(JPAService.class).execute(new CoordJobUpdateJPAExecutor(coord));
+ }
+
}