You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by mo...@apache.org on 2013/06/05 20:34:19 UTC
svn commit: r1489996 - in /oozie/trunk: ./
core/src/main/java/org/apache/oozie/command/wf/
core/src/main/java/org/apache/oozie/event/
core/src/main/java/org/apache/oozie/sla/
core/src/main/java/org/apache/oozie/sla/listener/
core/src/main/java/org/apac...
Author: mona
Date: Wed Jun 5 18:34:19 2013
New Revision: 1489996
URL: http://svn.apache.org/r1489996
Log:
OOZIE-1375 Generate Job notification events for Workflow Actions (mona)
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/WorkflowXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/event/WorkflowActionEvent.java
oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
oozie/trunk/core/src/main/java/org/apache/oozie/sla/listener/SLAJobEventListener.java
oozie/trunk/core/src/main/java/org/apache/oozie/sla/service/SLAService.java
oozie/trunk/core/src/main/resources/oozie-default.xml
oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java
oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventQueue.java
oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowActionGetJPAExecutor.java
oozie/trunk/core/src/test/java/org/apache/oozie/service/TestEventHandlerService.java
oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java
oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java
oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
oozie/trunk/release-log.txt
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java?rev=1489996&r1=1489995&r2=1489996&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java Wed Jun 5 18:34:19 2013
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -30,7 +30,6 @@ import org.apache.oozie.action.ActionExe
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
-import org.apache.oozie.client.WorkflowAction.Status;
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
@@ -41,6 +40,7 @@ import org.apache.oozie.executor.jpa.Wor
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
import org.apache.oozie.service.ActionCheckerService;
import org.apache.oozie.service.ActionService;
+import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.UUIDService;
@@ -63,6 +63,7 @@ public class ActionCheckXCommand extends
private JPAService jpaService = null;
private ActionExecutor executor = null;
private List<JsonBean> updateList = new ArrayList<JsonBean>();
+ private boolean generateEvent = false;
public ActionCheckXCommand(String actionId) {
this(actionId, -1);
@@ -183,6 +184,7 @@ public class ActionCheckXCommand extends
wfAction.setErrorInfo(EXEC_DATA_MISSING,
"Execution Complete, but Execution Data Missing from Action");
failJob(context);
+ generateEvent = true;
} else {
wfAction.setPending();
queue(new ActionEndXCommand(wfAction.getId(), wfAction.getType()));
@@ -198,10 +200,10 @@ public class ActionCheckXCommand extends
.getMessage(), ex);
wfAction.setErrorInfo(ex.getErrorCode(), ex.getMessage());
-
switch (ex.getErrorType()) {
case FAILED:
failJob(context, wfAction);
+ generateEvent = true;
break;
case ERROR:
handleUserRetry(wfAction);
@@ -209,6 +211,7 @@ public class ActionCheckXCommand extends
case TRANSIENT: // retry N times, then suspend workflow
if (!handleTransient(context, executor, WorkflowAction.Status.RUNNING)) {
handleNonTransient(context, executor, WorkflowAction.Status.START_MANUAL);
+ generateEvent = true;
wfAction.setPendingAge(new Date());
wfAction.setRetries(0);
wfAction.setStartTime(null);
@@ -224,6 +227,9 @@ public class ActionCheckXCommand extends
finally {
try {
jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, null));
+ if (generateEvent && EventHandlerService.isEnabled()) {
+ generateEvent(wfAction, wfJob.getUser());
+ }
}
catch (JPAExecutorException e) {
throw new CommandException(e);
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java?rev=1489996&r1=1489995&r2=1489996&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java Wed Jun 5 18:34:19 2013
@@ -44,6 +44,7 @@ import org.apache.oozie.executor.jpa.JPA
import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
import org.apache.oozie.service.ActionService;
+import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.UUIDService;
@@ -256,6 +257,9 @@ public class ActionEndXCommand extends A
finally {
try {
jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
+ if (!(executor instanceof ControlNodeActionExecutor) && EventHandlerService.isEnabled()) {
+ generateEvent(wfAction, wfJob.getUser());
+ }
}
catch (JPAExecutorException e) {
throw new CommandException(e);
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java?rev=1489996&r1=1489995&r2=1489996&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java Wed Jun 5 18:34:19 2013
@@ -37,7 +37,9 @@ import org.apache.oozie.executor.jpa.Wor
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.action.ActionExecutorException;
+import org.apache.oozie.action.control.ControlNodeActionExecutor;
import org.apache.oozie.service.ActionService;
+import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.UUIDService;
import org.apache.oozie.service.Services;
@@ -49,6 +51,7 @@ import org.apache.oozie.util.db.SLADbXOp
* Kill workflow action and invoke action executor to kill the underlying context.
*
*/
+@SuppressWarnings("deprecation")
public class ActionKillXCommand extends ActionXCommand<Void> {
private String actionId;
private String jobId;
@@ -160,6 +163,9 @@ public class ActionKillXCommand extends
finally {
try {
jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
+ if (!(executor instanceof ControlNodeActionExecutor) && EventHandlerService.isEnabled()) {
+ generateEvent(wfAction, wfJob.getUser());
+ }
}
catch (JPAExecutorException e) {
throw new CommandException(e);
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java?rev=1489996&r1=1489995&r2=1489996&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java Wed Jun 5 18:34:19 2013
@@ -32,7 +32,6 @@ import org.apache.oozie.XException;
import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.action.control.ControlNodeActionExecutor;
-import org.apache.oozie.action.hadoop.MapReduceActionExecutor;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
@@ -47,6 +46,7 @@ import org.apache.oozie.executor.jpa.JPA
import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
import org.apache.oozie.service.ActionService;
+import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.UUIDService;
@@ -57,6 +57,7 @@ import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.apache.oozie.util.db.SLADbXOperations;
+@SuppressWarnings("deprecation")
public class ActionStartXCommand extends ActionXCommand<Void> {
public static final String EL_ERROR = "EL_ERROR";
public static final String EL_EVAL_ERROR = "EL_EVAL_ERROR";
@@ -303,6 +304,9 @@ public class ActionStartXCommand extends
finally {
try {
jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
+ if (!(executor instanceof ControlNodeActionExecutor) && EventHandlerService.isEnabled()) {
+ generateEvent(wfAction, wfJob.getUser());
+ }
}
catch (JPAExecutorException e) {
throw new CommandException(e);
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java?rev=1489996&r1=1489995&r2=1489996&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java Wed Jun 5 18:34:19 2013
@@ -138,7 +138,7 @@ public class KillXCommand extends Workfl
queue(new ActionKillXCommand(action.getId(), action.getType()));
}
- if (action.getStatus() == WorkflowActionBean.Status.PREP
+ else if (action.getStatus() == WorkflowActionBean.Status.PREP
|| action.getStatus() == WorkflowActionBean.Status.START_RETRY
|| action.getStatus() == WorkflowActionBean.Status.START_MANUAL
|| action.getStatus() == WorkflowActionBean.Status.END_RETRY
@@ -152,6 +152,9 @@ public class KillXCommand extends Workfl
insertList.add(slaEvent);
}
updateList.add(action);
+ if (EventHandlerService.isEnabled()) {
+ generateEvent(action, wfJob.getUser());
+ }
}
}
wfJob.setLastModifiedTime(new Date());
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java?rev=1489996&r1=1489995&r2=1489996&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java Wed Jun 5 18:34:19 2013
@@ -73,7 +73,7 @@ public class SignalXCommand extends Work
private WorkflowActionBean wfAction;
private List<JsonBean> updateList = new ArrayList<JsonBean>();
private List<JsonBean> insertList = new ArrayList<JsonBean>();
- private boolean generateEvent;
+ private boolean generateEvent = false;
private String wfJobErrorCode;
private String wfJobErrorMsg;
@@ -325,9 +325,6 @@ public class SignalXCommand extends Work
jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
if (generateEvent && EventHandlerService.isEnabled()) {
generateEvent(wfJob, wfJobErrorCode, wfJobErrorMsg);
- if (wfAction != null) {
- generateEvent(wfAction, wfJob.getUser());
- }
}
}
catch (JPAExecutorException je) {
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/WorkflowXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/WorkflowXCommand.java?rev=1489996&r1=1489995&r2=1489996&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/WorkflowXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/WorkflowXCommand.java Wed Jun 5 18:34:19 2013
@@ -68,12 +68,13 @@ public abstract class WorkflowXCommand<T
}
protected void generateEvent(WorkflowActionBean wfAction, String wfUser) {
- // Workflow action events not filtered since required for sla
- WorkflowActionEvent event = new WorkflowActionEvent(wfAction.getId(), wfAction.getJobId(),
- wfAction.getStatus(), wfUser, wfAction.getName(), wfAction.getStartTime(), wfAction.getEndTime());
- event.setErrorCode(wfAction.getErrorCode());
- event.setErrorMessage(wfAction.getErrorMessage());
- eventService.queueEvent(event);
+ if (eventService.isSupportedApptype(AppType.WORKFLOW_ACTION.name())) {
+ WorkflowActionEvent event = new WorkflowActionEvent(wfAction.getId(), wfAction.getJobId(),
+ wfAction.getStatus(), wfUser, wfAction.getName(), wfAction.getStartTime(), wfAction.getEndTime());
+ event.setErrorCode(wfAction.getErrorCode());
+ event.setErrorMessage(wfAction.getErrorMessage());
+ eventService.queueEvent(event);
+ }
}
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/event/WorkflowActionEvent.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/event/WorkflowActionEvent.java?rev=1489996&r1=1489995&r2=1489996&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/event/WorkflowActionEvent.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/event/WorkflowActionEvent.java Wed Jun 5 18:34:19 2013
@@ -64,6 +64,7 @@ public class WorkflowActionEvent extends
case KILLED:
case FAILED:
setEventStatus(EventStatus.FAILURE);
+ break;
case START_MANUAL:
case END_MANUAL:
setEventStatus(EventStatus.SUSPEND);
@@ -91,7 +92,7 @@ public class WorkflowActionEvent extends
}
public void setErrorMessage(String msg) {
- errorCode = msg;
+ errorMessage = msg;
}
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java?rev=1489996&r1=1489995&r2=1489996&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java Wed Jun 5 18:34:19 2013
@@ -215,7 +215,7 @@ public class SLACalculatorMemory impleme
SLACalcStatus slaCalc = slaMap.get(jobId);
List<JsonBean> updateList = new ArrayList<JsonBean>();
SLASummaryBean slaInfo = null;
- boolean ret = false;
+ boolean hasSla = false;
if (slaCalc != null) {
synchronized (slaCalc) {
byte eventProc = slaCalc.getEventProcessed();
@@ -244,7 +244,7 @@ public class SLACalculatorMemory impleme
if (slaInfo.getSlaProcessed() == 2) {
slaMap.remove(jobId);
}
- ret = true;
+ hasSla = true;
}
}
else if (historySet.contains(jobId)) {
@@ -257,16 +257,19 @@ public class SLACalculatorMemory impleme
}
slaInfo.setSlaProcessed(2);
historySet.remove(jobId);
- ret = true;
+ hasSla = true;
}
- slaInfo.setLastModifiedTime(new Date());
- updateList.add(slaInfo);
- if (jpa != null) {
- jpa.execute(new SLACalculationInsertUpdateJPAExecutor(null, updateList));
+ if (hasSla) {
+ slaInfo.setLastModifiedTime(new Date());
+ updateList.add(slaInfo);
+ if (jpa != null) {
+ jpa.execute(new SLACalculationInsertUpdateJPAExecutor(null, updateList));
+ }
+ XLog.getLog(SLAService.class)
+ .trace("SLA Status Event - Job:" + jobId + " Status:" + slaCalc.getSLAStatus());
}
- XLog.getLog(SLAService.class).trace("SLA Status Event - Job:" + jobId + " Status:" + slaCalc.getSLAStatus());
- return ret;
+ return hasSla;
}
/**
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/sla/listener/SLAJobEventListener.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/sla/listener/SLAJobEventListener.java?rev=1489996&r1=1489995&r2=1489996&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/sla/listener/SLAJobEventListener.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/sla/listener/SLAJobEventListener.java Wed Jun 5 18:34:19 2013
@@ -68,13 +68,13 @@ public class SLAJobEventListener extends
}
private void sendEventToSLAService(JobEvent event, String status) {
- SLAService slaService = Services.get().get(SLAService.class);
- if (slaService != null && !status.equals(CoordinatorAction.Status.WAITING.name())
+ if (!status.equals(CoordinatorAction.Status.WAITING.name())
&& !status.equals(CoordinatorAction.Status.SUSPENDED.name())) {
Date startTime = event.getStartTime();
Date endTime = event.getEndTime();
try {
- slaService.addStatusEvent(event.getId(), status, event.getEventStatus(), startTime, endTime);
+ Services.get().get(SLAService.class)
+ .addStatusEvent(event.getId(), status, event.getEventStatus(), startTime, endTime);
}
catch (ServiceException se) {
XLog.getLog(SLAService.class).error("Exception happened while sending Job-Status event for SLA", se);
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/sla/service/SLAService.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/sla/service/SLAService.java?rev=1489996&r1=1489995&r2=1489996&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/sla/service/SLAService.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/sla/service/SLAService.java Wed Jun 5 18:34:19 2013
@@ -31,7 +31,6 @@ import org.apache.oozie.sla.SLACalculato
import org.apache.oozie.sla.SLACalculatorMemory;
import org.apache.oozie.sla.SLARegistrationBean;
import org.apache.oozie.util.XLog;
-
import com.google.common.annotations.VisibleForTesting;
public class SLAService implements Service {
@@ -60,6 +59,9 @@ public class SLAService implements Servi
+ Services.CONF_SERVICE_EXT_CLASSES + " or declare it BEFORE SLAService");
}
LOG = XLog.getLog(getClass());
+ java.util.Set<String> appTypes = eventHandler.getAppTypes();
+ appTypes.add("workflow_action");
+ eventHandler.setAppTypes(appTypes);
Runnable slaThread = new SLAWorker(calcImpl);
// schedule runnable by default every 30 sec
Modified: oozie/trunk/core/src/main/resources/oozie-default.xml
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/resources/oozie-default.xml?rev=1489996&r1=1489995&r2=1489996&view=diff
==============================================================================
--- oozie/trunk/core/src/main/resources/oozie-default.xml (original)
+++ oozie/trunk/core/src/main/resources/oozie-default.xml Wed Jun 5 18:34:19 2013
@@ -1845,9 +1845,7 @@
<property>
<name>oozie.service.EventHandlerService.event.listeners</name>
- <value>org.apache.oozie.jms.JMSJobEventListener,
- org.apache.oozie.sla.listener.SLAJobEventListener
- </value>
+ <value>org.apache.oozie.jms.JMSJobEventListener</value>
</property>
<property>
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java?rev=1489996&r1=1489995&r2=1489996&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java Wed Jun 5 18:34:19 2013
@@ -33,18 +33,24 @@ import org.apache.oozie.CoordinatorJobBe
import org.apache.oozie.DagEngine;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.action.ActionExecutor;
+import org.apache.oozie.action.control.ControlNodeActionExecutor;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
-import org.apache.oozie.client.event.Event;
import org.apache.oozie.client.event.JobEvent;
import org.apache.oozie.client.event.JobEvent.EventStatus;
+import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.coord.CoordActionCheckXCommand;
import org.apache.oozie.command.coord.CoordActionInputCheckXCommand;
import org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand;
import org.apache.oozie.command.coord.CoordinatorXCommand;
+import org.apache.oozie.command.wf.ActionCheckXCommand;
+import org.apache.oozie.command.wf.ActionKillXCommand;
+import org.apache.oozie.command.wf.ActionStartXCommand;
+import org.apache.oozie.command.wf.ActionXCommand;
import org.apache.oozie.command.wf.KillXCommand;
import org.apache.oozie.command.wf.ResumeXCommand;
import org.apache.oozie.command.wf.SignalXCommand;
@@ -56,11 +62,13 @@ import org.apache.oozie.executor.jpa.Coo
import org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
+import org.apache.oozie.service.ActionService;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
@@ -88,6 +96,7 @@ public class TestEventGeneration extends
EventQueue queue;
Services services;
+ EventHandlerService ehs;
@Override
@Before
@@ -97,7 +106,8 @@ public class TestEventGeneration extends
Configuration conf = services.getConf();
conf.set(Services.CONF_SERVICE_EXT_CLASSES, "org.apache.oozie.service.EventHandlerService");
services.init();
- queue = services.get(EventHandlerService.class).getEventQueue();
+ ehs = services.get(EventHandlerService.class);
+ queue = ehs.getEventQueue();
}
@Override
@@ -119,7 +129,7 @@ public class TestEventGeneration extends
job = jpaService.execute(wfJobGetCmd);
assertEquals(WorkflowJob.Status.RUNNING, job.getStatus());
assertEquals(1, queue.size());
- WorkflowJobEvent event = (WorkflowJobEvent) queue.poll();
+ JobEvent event = (JobEvent) queue.poll();
assertNotNull(event);
assertEquals(EventStatus.STARTED, event.getEventStatus());
assertEquals(AppType.WORKFLOW_JOB, event.getAppType());
@@ -134,7 +144,7 @@ public class TestEventGeneration extends
job = jpaService.execute(wfJobGetCmd);
assertEquals(WorkflowJob.Status.SUSPENDED, job.getStatus());
assertEquals(1, queue.size());
- event = (WorkflowJobEvent) queue.poll();
+ event = (JobEvent) queue.poll();
assertNotNull(event);
assertEquals(EventStatus.SUSPEND, event.getEventStatus());
assertEquals(AppType.WORKFLOW_JOB, event.getAppType());
@@ -148,7 +158,7 @@ public class TestEventGeneration extends
job = jpaService.execute(wfJobGetCmd);
assertEquals(WorkflowJob.Status.RUNNING, job.getStatus());
assertEquals(1, queue.size());
- event = (WorkflowJobEvent) queue.poll();
+ event = (JobEvent) queue.poll();
assertNotNull(event);
assertEquals(AppType.WORKFLOW_JOB, event.getAppType());
assertEquals(job.getId(), event.getId());
@@ -162,7 +172,7 @@ public class TestEventGeneration extends
job = jpaService.execute(wfJobGetCmd);
assertEquals(WorkflowJob.Status.KILLED, job.getStatus());
assertEquals(1, queue.size());
- event = (WorkflowJobEvent) queue.poll();
+ event = (JobEvent) queue.poll();
assertNotNull(event);
assertEquals(EventStatus.FAILURE, event.getEventStatus());
assertEquals(AppType.WORKFLOW_JOB, event.getAppType());
@@ -184,7 +194,7 @@ public class TestEventGeneration extends
job = jpaService.execute(new WorkflowJobGetJPAExecutor(job.getId()));
assertEquals(WorkflowJob.Status.SUCCEEDED, job.getStatus());
assertEquals(1, queue.size());
- event = (WorkflowJobEvent) queue.poll();
+ event = (JobEvent) queue.poll();
assertNotNull(event);
assertEquals(AppType.WORKFLOW_JOB, event.getAppType());
assertEquals(job.getId(), event.getId());
@@ -192,16 +202,14 @@ public class TestEventGeneration extends
assertEquals(job.getAppName(), event.getAppName());
assertEquals(job.getStartTime(), event.getStartTime());
assertEquals(job.getEndTime(), event.getEndTime());
- assertEquals(0, queue.size());
}
@Test
public void testCoordinatorActionEvent() throws Exception {
- EventHandlerService ehs = services.get(EventHandlerService.class);
- // reduce noise from WF Job events (also default) by setting it to only
+ // avoid noise from other apptype events by setting it to only
// coord action
- ehs.setAppTypes(new HashSet<String>(Arrays.asList(new String[] { "coordinator_action" })));
+ ehs.setAppTypes(new HashSet<String>(Arrays.asList("coordinator_action")));
assertEquals(queue.size(), 0);
Date startTime = DateUtils.parseDateOozieTZ("2013-01-01T10:00Z");
Date endTime = DateUtils.parseDateOozieTZ("2013-01-01T10:14Z");
@@ -216,13 +224,13 @@ public class TestEventGeneration extends
CoordinatorActionBean action = jpaService.execute(coordGetCmd);
assertEquals(CoordinatorAction.Status.WAITING, action.getStatus());
assertEquals(1, queue.size());
- CoordinatorActionEvent event = (CoordinatorActionEvent) queue.poll();
+ JobEvent event = (JobEvent) queue.poll();
assertNotNull(event);
assertEquals(EventStatus.WAITING, event.getEventStatus());
assertEquals(AppType.COORDINATOR_ACTION, event.getAppType());
assertEquals(action.getId(), event.getId());
assertEquals(action.getJobId(), event.getParentId());
- assertEquals(action.getNominalTime(), event.getNominalTime());
+ assertEquals(action.getNominalTime(), ((CoordinatorActionEvent) event).getNominalTime());
assertEquals(action.getCreatedTime(), event.getStartTime());
assertEquals(coord.getUser(), event.getUser());
assertEquals(coord.getAppName(), event.getAppName());
@@ -233,19 +241,19 @@ public class TestEventGeneration extends
action = jpaService.execute(coordGetCmd);
assertEquals(CoordinatorAction.Status.READY, action.getStatus());
- waitFor(1 * 100, new Predicate() {
+ waitFor(4 * 100, new Predicate() {
@Override
public boolean evaluate() throws Exception {
return jpaService.execute(coordGetCmd).getStatus() == CoordinatorAction.Status.RUNNING;
}
});
- event = _pollQueue();
+ event = (JobEvent) queue.poll();
assertEquals(EventStatus.STARTED, event.getEventStatus());
assertEquals(AppType.COORDINATOR_ACTION, event.getAppType());
assertEquals(action.getId(), event.getId());
assertEquals(action.getJobId(), event.getParentId());
- assertEquals(action.getNominalTime(), event.getNominalTime());
+ assertEquals(action.getNominalTime(), ((CoordinatorActionEvent) event).getNominalTime());
assertEquals(action.getCreatedTime(), event.getStartTime());
assertEquals(coord.getUser(), event.getUser());
assertEquals(coord.getAppName(), event.getAppName());
@@ -256,14 +264,15 @@ public class TestEventGeneration extends
wfJob.setStatus(WorkflowJob.Status.SUCCEEDED);
jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
new CoordActionCheckXCommand(action.getId(), 0).call();
+ Thread.sleep(300);
action = jpaService.execute(coordGetCmd);
assertEquals(CoordinatorAction.Status.SUCCEEDED, action.getStatus());
- event = _pollQueue();
+ event = (JobEvent) queue.poll();
assertEquals(EventStatus.SUCCESS, event.getEventStatus());
assertEquals(AppType.COORDINATOR_ACTION, event.getAppType());
assertEquals(action.getId(), event.getId());
assertEquals(action.getJobId(), event.getParentId());
- assertEquals(action.getNominalTime(), event.getNominalTime());
+ assertEquals(action.getNominalTime(), ((CoordinatorActionEvent) event).getNominalTime());
assertEquals(action.getCreatedTime(), event.getStartTime());
assertEquals(coord.getUser(), event.getUser());
assertEquals(coord.getAppName(), event.getAppName());
@@ -271,17 +280,17 @@ public class TestEventGeneration extends
// Action Failure
action.setStatus(CoordinatorAction.Status.RUNNING);
jpaService.execute(new CoordActionUpdateJPAExecutor(action));
- wfJob.setStatus(WorkflowJob.Status.KILLED);
+ wfJob.setStatus(WorkflowJob.Status.FAILED);
jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
new CoordActionCheckXCommand(action.getId(), 0).call();
action = jpaService.execute(coordGetCmd);
- assertEquals(CoordinatorAction.Status.KILLED, action.getStatus());
- event = _pollQueue();
+ assertEquals(CoordinatorAction.Status.FAILED, action.getStatus());
+ event = (JobEvent) queue.poll();
assertEquals(EventStatus.FAILURE, event.getEventStatus());
assertEquals(AppType.COORDINATOR_ACTION, event.getAppType());
assertEquals(action.getId(), event.getId());
assertEquals(action.getJobId(), event.getParentId());
- assertEquals(action.getNominalTime(), event.getNominalTime());
+ assertEquals(action.getNominalTime(), ((CoordinatorActionEvent) event).getNominalTime());
assertEquals(action.getCreatedTime(), event.getStartTime());
assertEquals(coord.getUser(), event.getUser());
assertEquals(coord.getAppName(), event.getAppName());
@@ -289,6 +298,70 @@ public class TestEventGeneration extends
}
@Test
+ public void testWorkflowActionEvent() throws Exception {
+ assertEquals(queue.size(), 0);
+ // avoid noise from other apptype events by setting it to only
+ // workflow action
+ ehs.setAppTypes(new HashSet<String>(Arrays.asList("workflow_action")));
+ WorkflowJobBean job = this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING);
+ WorkflowActionBean action = this.addRecordToWfActionTable(job.getId(), "1", WorkflowAction.Status.PREP, true);
+ WorkflowActionGetJPAExecutor wfActionGetCmd = new WorkflowActionGetJPAExecutor(action.getId());
+ JPAService jpaService = Services.get().get(JPAService.class);
+
+ // Starting job
+ new ActionStartXCommand(action.getId(), "map-reduce").call();
+ action = jpaService.execute(wfActionGetCmd);
+ assertEquals(WorkflowAction.Status.RUNNING, action.getStatus());
+ assertEquals(1, queue.size());
+ WorkflowActionEvent event = (WorkflowActionEvent) queue.poll();
+ assertNotNull(event);
+ assertEquals(EventStatus.STARTED, event.getEventStatus());
+ assertEquals(AppType.WORKFLOW_ACTION, event.getAppType());
+ assertEquals(action.getId(), event.getId());
+ assertEquals(job.getUser(), event.getUser());
+ assertEquals(action.getName(), event.getAppName());
+ assertEquals(action.getStartTime(), event.getStartTime());
+ assertEquals(0, queue.size());
+
+ // Suspending job
+ ActionExecutor.Context context = new ActionXCommand.ActionExecutorContext(job, action, false, false);
+ ActionExecutor executor = Services.get().get(ActionService.class).getExecutor(action.getType());
+ ActionCheckXCommandForTest dac = new ActionCheckXCommandForTest(context, executor, action.getId());
+ dac.execute();
+ action = dac.getAction();
+ assertEquals(WorkflowAction.Status.START_MANUAL, action.getStatus());
+ assertEquals(1, queue.size());
+ event = (WorkflowActionEvent) queue.poll();
+ assertNotNull(event);
+ assertEquals(EventStatus.SUSPEND, event.getEventStatus());
+ assertEquals(AppType.WORKFLOW_ACTION, event.getAppType());
+ assertEquals(action.getId(), event.getId());
+ assertEquals(job.getUser(), event.getUser());
+ assertEquals(action.getName(), event.getAppName());
+ assertEquals(0, queue.size());
+
+ // Killing job
+ action.setStatus(WorkflowAction.Status.KILLED);
+ action.setPendingOnly();
+ jpaService.execute(new WorkflowActionUpdateJPAExecutor(action));
+ new ActionKillXCommand(action.getId()).call();
+ action = jpaService.execute(wfActionGetCmd);
+ assertEquals(WorkflowAction.Status.KILLED, action.getStatus());
+ assertEquals(1, queue.size());
+ event = (WorkflowActionEvent) queue.poll();
+ assertNotNull(event);
+ assertEquals(EventStatus.FAILURE, event.getEventStatus());
+ assertEquals(AppType.WORKFLOW_ACTION, event.getAppType());
+ assertEquals(action.getId(), event.getId());
+ assertEquals(job.getUser(), event.getUser());
+ assertEquals(action.getName(), event.getAppName());
+ assertEquals(action.getStartTime(), event.getStartTime());
+ assertEquals(action.getEndTime(), event.getEndTime());
+ assertEquals(0, queue.size());
+
+ }
+
+ @Test
public void testWorkflowJobEventError() throws Exception {
final WorkflowJobBean job = addRecordToWfJobTable(WorkflowJob.Status.FAILED, WorkflowInstance.Status.FAILED);
// create event with error code and message
@@ -398,6 +471,38 @@ public class TestEventGeneration extends
assertEquals(EventStatus.FAILURE, ((JobEvent)queue.poll()).getEventStatus());
}
+ private class ActionCheckXCommandForTest extends ActionCheckXCommand {
+
+ ActionExecutor.Context context;
+ ActionExecutor executor;
+ WorkflowActionBean action;
+ JPAService jpa;
+
+ public ActionCheckXCommandForTest(ActionExecutor.Context context, ActionExecutor executor, String actionId)
+ throws JPAExecutorException {
+ super(actionId);
+ this.context = context;
+ this.executor = executor;
+ jpa = Services.get().get(JPAService.class);
+ this.action = jpa.execute(new WorkflowActionGetJPAExecutor(actionId));
+ }
+
+ @Override
+ public Void execute() throws CommandException {
+ handleNonTransient(context, executor, WorkflowAction.Status.START_MANUAL);
+ action = (WorkflowActionBean) ((ActionExecutorContext) context).getAction();
+ if (!(executor instanceof ControlNodeActionExecutor) && EventHandlerService.isEnabled()) {
+ generateEvent(action, getTestUser());
+ }
+ return null;
+ }
+
+ public WorkflowActionBean getAction() {
+ return action;
+ }
+
+ }
+
private WorkflowJobBean _createWorkflowJob() throws Exception {
LiteWorkflowApp app = new LiteWorkflowApp("my-app", "<workflow-app/>",
new StartNodeDef(TestControlNodeHandler.class, "one"))
@@ -417,7 +522,7 @@ public class TestEventGeneration extends
WorkflowJobInsertJPAExecutor wfInsertCmd = new WorkflowJobInsertJPAExecutor(workflow);
jpaService.execute(wfInsertCmd);
WorkflowActionBean wfAction = addRecordToWfActionTable(workflow.getId(), "one", WorkflowAction.Status.OK,
- executionPath);
+ executionPath, true);
wfAction.setPending();
wfAction.setSignalValue(WorkflowAction.Status.OK.name());
jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
@@ -425,14 +530,6 @@ public class TestEventGeneration extends
return workflow;
}
- private CoordinatorActionEvent _pollQueue() {
- Event e;
- do {
- e = queue.poll();
- } while (!(e instanceof CoordinatorActionEvent));
- return (CoordinatorActionEvent) e;
- }
-
private void _modifyCoordForFailureAction(CoordinatorJobBean coord) throws Exception {
String wfXml = IOUtils.getResourceAsString("wf-invalid-fork.xml", -1);
writeToFile(wfXml, getFsTestCaseDir(), "workflow.xml");
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventQueue.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventQueue.java?rev=1489996&r1=1489995&r2=1489996&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventQueue.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventQueue.java Wed Jun 5 18:34:19 2013
@@ -45,6 +45,8 @@ public class TestEventQueue extends XDat
JMSAccessorService.class.getName() + "," + JMSTopicService.class.getName() + ","
+ EventHandlerService.class.getName() + "," + SLAService.class.getName());
conf.setInt(EventHandlerService.CONF_BATCH_SIZE, 3);
+ conf.set(EventHandlerService.CONF_LISTENERS, ""); // this unit test is meant to
+ // target queue operations only
services.init();
}
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowActionGetJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowActionGetJPAExecutor.java?rev=1489996&r1=1489995&r2=1489996&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowActionGetJPAExecutor.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowActionGetJPAExecutor.java Wed Jun 5 18:34:19 2013
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -66,7 +66,7 @@ public class TestWorkflowActionGetJPAExe
execPath.append("/fork" + i);
}
WorkflowActionBean action = addRecordToWfActionTable(job.getId(), "1", WorkflowAction.Status.PREP, execPath
- .toString());
+ .toString(), false);
_testGetActionWithExecPath(action.getId(), execPath.toString());
}
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/service/TestEventHandlerService.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestEventHandlerService.java?rev=1489996&r1=1489995&r2=1489996&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/service/TestEventHandlerService.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/service/TestEventHandlerService.java Wed Jun 5 18:34:19 2013
@@ -21,6 +21,7 @@ import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.event.BundleJobEvent;
import org.apache.oozie.event.CoordinatorActionEvent;
@@ -142,6 +143,47 @@ public class TestEventHandlerService ext
ehs.new EventWorker().run();
assertTrue(output.toString().contains("Coord Action event FAILURE"));
output.setLength(0);
+
+ /*
+ * Workflow Action events
+ */
+ WorkflowActionEvent event3 = new WorkflowActionEvent("waction-1", "parentid",
+ WorkflowAction.Status.RUNNING, getTestUser(), "myapp", null, null);
+ ehs.queueEvent(event3);
+ ehs.new EventWorker().run();
+ assertTrue(output.toString().contains("Workflow Action event STARTED"));
+ output.setLength(0);
+
+ event3.setStatus(WorkflowAction.Status.START_MANUAL);
+ ehs.queueEvent(event3);
+ ehs.new EventWorker().run();
+ assertTrue(output.toString().contains("Workflow Action event SUSPEND"));
+ output.setLength(0);
+
+ event3.setStatus(WorkflowAction.Status.OK);
+ ehs.queueEvent(event3);
+ ehs.new EventWorker().run();
+ assertTrue(output.toString().contains("Workflow Action event SUCCESS"));
+ output.setLength(0);
+
+ event3.setStatus(WorkflowAction.Status.ERROR);
+ ehs.queueEvent(event3);
+ ehs.new EventWorker().run();
+ assertTrue(output.toString().contains("Workflow Action event FAILURE"));
+ output.setLength(0);
+
+ event3.setStatus(WorkflowAction.Status.KILLED);
+ ehs.queueEvent(event3);
+ ehs.new EventWorker().run();
+ assertTrue(output.toString().contains("Workflow Action event FAILURE"));
+ output.setLength(0);
+
+ event3.setStatus(WorkflowAction.Status.FAILED);
+ ehs.queueEvent(event3);
+ ehs.new EventWorker().run();
+ assertTrue(output.toString().contains("Workflow Action event FAILURE"));
+ output.setLength(0);
+
}
private EventHandlerService _testEventHandlerService() throws Exception {
@@ -171,21 +213,21 @@ public class TestEventHandlerService ext
@Override
public void onCoordinatorJobEvent(CoordinatorJobEvent cje) {
if (cje != null) {
- output.append("Dummy Coord Job event "+cje.getEventStatus());
+ output.append("Dummy Coord Job event " + cje.getEventStatus());
}
}
@Override
public void onCoordinatorActionEvent(CoordinatorActionEvent cae) {
if (cae != null) {
- output.append("Dummy Coord Action event "+cae.getEventStatus());
+ output.append("Dummy Coord Action event " + cae.getEventStatus());
}
}
@Override
public void onBundleJobEvent(BundleJobEvent bje) {
if (bje != null) {
- output.append("Dummy Bundle Job event "+bje.getEventStatus());
+ output.append("Dummy Bundle Job event " + bje.getEventStatus());
}
}
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java?rev=1489996&r1=1489995&r2=1489996&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java Wed Jun 5 18:34:19 2013
@@ -179,7 +179,7 @@ public class TestSLAEventGeneration exte
slaEvent = slas.getSLACalculator().get(jobId);
slaEvent.setEventProcessed(0); //resetting to receive sla events
ehs.new EventWorker().run();
- Thread.sleep(300); //time for event listeners to run
+ Thread.sleep(300); // time for listeners to run
slaEvent = (SLACalcStatus) ehs.getEventQueue().poll();
assertEquals(jobId, slaEvent.getId());
assertNotNull(slaEvent.getActualStart());
@@ -189,8 +189,9 @@ public class TestSLAEventGeneration exte
// test that sla processes the Job Event from Kill command
new KillXCommand(jobId).call();
+ ehs.getEventQueue().poll(); //ignore the wf-action event generated
ehs.new EventWorker().run();
- Thread.sleep(300);
+ Thread.sleep(300); // time for listeners to run
slaEvent = (SLACalcStatus) ehs.getEventQueue().poll();
assertEquals(jobId, slaEvent.getId());
assertNotNull(slaEvent.getActualEnd());
@@ -268,7 +269,6 @@ public class TestSLAEventGeneration exte
slaEvent = slas.getSLACalculator().get(actionId);
slaEvent.setEventProcessed(0); //resetting for testing sla event
ehs.new EventWorker().run();
- Thread.sleep(300); //time for event listeners to run
slaEvent = (SLACalcStatus) ehs.getEventQueue().poll();
assertEquals(actionId, slaEvent.getId());
assertNotNull(slaEvent.getActualStart());
@@ -278,7 +278,6 @@ public class TestSLAEventGeneration exte
// test that sla processes the Job Event from Kill command
new CoordKillXCommand(jobId).call();
ehs.new EventWorker().run();
- Thread.sleep(300); //time for event listeners to run
slaEvent = (SLACalcStatus) ehs.getEventQueue().poll();
assertEquals(actionId, slaEvent.getId());
assertNotNull(slaEvent.getActualEnd());
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java?rev=1489996&r1=1489995&r2=1489996&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java Wed Jun 5 18:34:19 2013
@@ -30,6 +30,7 @@ import org.apache.oozie.event.Coordinato
import org.apache.oozie.event.CoordinatorJobEvent;
import org.apache.oozie.event.WorkflowActionEvent;
import org.apache.oozie.event.WorkflowJobEvent;
+import org.apache.oozie.event.listener.JobEventListener;
import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
@@ -54,6 +55,7 @@ public class TestSLAJobEventListener ext
Configuration conf = services.getConf();
conf.set(Services.CONF_SERVICE_EXT_CLASSES, "org.apache.oozie.service.EventHandlerService,"
+ "org.apache.oozie.sla.service.SLAService");
+ conf.setClass(EventHandlerService.CONF_LISTENERS, SLAJobEventListener.class, JobEventListener.class);
services.init();
}
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java?rev=1489996&r1=1489995&r2=1489996&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java Wed Jun 5 18:34:19 2013
@@ -88,6 +88,7 @@ import org.jdom.JDOMException;
import com.google.common.annotations.VisibleForTesting;
+@SuppressWarnings("deprecation")
public abstract class XDataTestCase extends XHCatTestCase {
protected static String slaXml = " <sla:info xmlns:sla='uri:oozie:sla:0.1'>"
@@ -717,12 +718,17 @@ public abstract class XDataTestCase exte
*/
protected WorkflowActionBean addRecordToWfActionTable(String wfId, String actionName, WorkflowAction.Status status)
throws Exception {
- return addRecordToWfActionTable(wfId, actionName, status, "");
+ return addRecordToWfActionTable(wfId, actionName, status, "", false);
}
protected WorkflowActionBean addRecordToWfActionTable(String wfId, String actionName, WorkflowAction.Status status,
- String execPath) throws Exception {
- WorkflowActionBean action = createWorkflowAction(wfId, actionName, status);
+ boolean pending) throws Exception {
+ return addRecordToWfActionTable(wfId, actionName, status, "", pending);
+ }
+
+ protected WorkflowActionBean addRecordToWfActionTable(String wfId, String actionName, WorkflowAction.Status status,
+ String execPath, boolean pending) throws Exception {
+ WorkflowActionBean action = createWorkflowAction(wfId, actionName, status, pending);
action.setExecutionPath(execPath);
try {
JPAService jpaService = Services.get().get(JPAService.class);
@@ -1127,8 +1133,8 @@ public abstract class XDataTestCase exte
* @return workflow action bean
* @throws Exception thrown if unable to create workflow action bean
*/
- protected WorkflowActionBean createWorkflowAction(String wfId, String actionName, WorkflowAction.Status status)
- throws Exception {
+ protected WorkflowActionBean createWorkflowAction(String wfId, String actionName, WorkflowAction.Status status,
+ boolean pending) throws Exception {
WorkflowActionBean action = new WorkflowActionBean();
action.setName(actionName);
action.setId(Services.get().get(UUIDService.class).generateChildId(wfId, actionName));
@@ -1139,7 +1145,13 @@ public abstract class XDataTestCase exte
action.setStartTime(new Date());
action.setEndTime(new Date());
action.setLastCheckTime(new Date());
- action.resetPendingOnly();
+ action.setCred("null");
+ if (pending) {
+ action.setPendingOnly();
+ }
+ else {
+ action.resetPendingOnly();
+ }
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");
@@ -1163,6 +1175,11 @@ public abstract class XDataTestCase exte
return action;
}
+ protected WorkflowActionBean createWorkflowAction(String wfId, String actionName, WorkflowAction.Status status)
+ throws Exception {
+ return createWorkflowAction(wfId, actionName, status, false);
+ }
+
/**
* Create bundle job bean
*
Modified: oozie/trunk/release-log.txt
URL: http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1489996&r1=1489995&r2=1489996&view=diff
==============================================================================
--- oozie/trunk/release-log.txt (original)
+++ oozie/trunk/release-log.txt Wed Jun 5 18:34:19 2013
@@ -1,5 +1,6 @@
-- Oozie 4.1.0 release (trunk - unreleased)
+OOZIE-1375 Generate Job notification events for Workflow Actions (mona)
OOZIE-1357 Can't view more than 1000 actions of a coordinator and paging does not work (ryota)
OOZIE-1381 Oozie does not support access to the distributed cache file under different name node (ryota)
OOZIE-1298 TestPartitionDependencyManagerEhcache.testEvictionOnTimeToIdle is flakey (rohini)