You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by mo...@apache.org on 2013/06/29 21:51:28 UTC
svn commit: r1497996 [1/2] - in /oozie/branches/branch-4.0: ./
core/src/main/java/org/apache/oozie/
core/src/main/java/org/apache/oozie/command/
core/src/main/java/org/apache/oozie/command/coord/
core/src/main/java/org/apache/oozie/command/wf/ core/src...
Author: mona
Date: Sat Jun 29 18:30:15 2013
New Revision: 1497996
URL: http://svn.apache.org/r1497996
Log:
OOZIE-1379 Generate SLA end_miss event only after confirming against persistent store (mona)
Added:
oozie/branches/branch-4.0/core/src/test/resources/coord-action-sla1.xml
Removed:
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLASummaryUpdateForSLAStatusJPAExecutor.java
Modified:
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/WorkflowActionBean.java
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/TransitionXCommand.java
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/event/BundleJobEvent.java
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/event/CoordinatorActionEvent.java
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/event/CoordinatorJobEvent.java
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/event/WorkflowActionEvent.java
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/event/WorkflowJobEvent.java
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLASummaryUpdateForSLAStatusActualTimesJPAExecutor.java
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/sla/SLACalculator.java
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/sla/SLAOperations.java
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/sla/SLARegistrationBean.java
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java
oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java
oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/jms/TestJMSSLAEventListener.java
oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/servlet/TestV2SLAServlet.java
oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/sla/TestSLACalculationJPAExecutor.java
oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java
oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/sla/TestSLAEmailEventListener.java
oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java
oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java
oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/sla/TestSLARegistrationGetJPAExecutor.java
oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/sla/TestSLARegistrationGetRecordsOnRestartJPAExecutor.java
oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/sla/TestSLAService.java
oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/sla/TestSLASummaryGetOnRestartJPAExecutor.java
oozie/branches/branch-4.0/release-log.txt
Modified: oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
URL: http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java?rev=1497996&r1=1497995&r2=1497996&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java (original)
+++ oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java Sat Jun 29 18:30:15 2013
@@ -379,6 +379,22 @@ public class CoordinatorActionBean exten
}
/**
+ * Return if the action is complete with failure.
+ *
+ * @return if the action is complete with failure.
+ */
+ public boolean isTerminalWithFailure() {
+ boolean result = false;
+ switch (getStatus()) {
+ case FAILED:
+ case KILLED:
+ case TIMEDOUT:
+ result = true;
+ }
+ return result;
+ }
+
+ /**
* Set some actions are in progress for particular coordinator action.
*
* @param pending set pending to true
Modified: oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/WorkflowActionBean.java
URL: http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/WorkflowActionBean.java?rev=1497996&r1=1497995&r2=1497996&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/WorkflowActionBean.java (original)
+++ oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/WorkflowActionBean.java Sat Jun 29 18:30:15 2013
@@ -281,6 +281,22 @@ public class WorkflowActionBean extends
}
/**
+ * Return if the action is complete with failure.
+ *
+ * @return if the action is complete with failure.
+ */
+ public boolean isTerminalWithFailure() {
+ boolean result = false;
+ switch (getStatus()) {
+ case FAILED:
+ case KILLED:
+ case ERROR:
+ result = true;
+ }
+ return result;
+ }
+
+ /**
* Set the action pending flag to true.
*/
public void setPendingOnly() {
Modified: oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/TransitionXCommand.java
URL: http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/TransitionXCommand.java?rev=1497996&r1=1497995&r2=1497996&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/TransitionXCommand.java (original)
+++ oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/TransitionXCommand.java Sat Jun 29 18:30:15 2013
@@ -25,7 +25,6 @@ import org.apache.oozie.CoordinatorJobBe
import org.apache.oozie.client.Job;
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.command.coord.CoordinatorXCommand;
-import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.util.ParamChecker;
/**
Modified: oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java
URL: http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java?rev=1497996&r1=1497995&r2=1497996&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java (original)
+++ oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java Sat Jun 29 18:30:15 2013
@@ -17,6 +17,7 @@
*/
package org.apache.oozie.command.coord;
+import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.Job;
import org.apache.oozie.CoordinatorActionBean;
@@ -33,7 +34,6 @@ import org.apache.oozie.executor.jpa.Bul
import org.apache.oozie.executor.jpa.CoordJobGetActionsNotCompletedJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
-import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.LogUtils;
@@ -105,6 +105,7 @@ public class CoordKillXCommand extends K
}
private void updateCoordAction(CoordinatorActionBean action, boolean makePending) {
+ CoordinatorAction.Status prevStatus = action.getStatus();
action.setStatus(CoordinatorActionBean.Status.KILLED);
if (makePending) {
action.incrementAndGetPending();
@@ -112,6 +113,9 @@ public class CoordKillXCommand extends K
// set pending to false
action.setPending(0);
}
+ if (prevStatus != CoordinatorAction.Status.RUNNING && prevStatus != CoordinatorAction.Status.SUSPENDED) {
+ CoordinatorXCommand.generateEvent(action, coordJob.getUser(), coordJob.getAppName(), null);
+ }
action.setLastModifiedTime(new Date());
updateList.add(action);
}
Modified: oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
URL: http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java?rev=1497996&r1=1497995&r2=1497996&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java (original)
+++ oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java Sat Jun 29 18:30:15 2013
@@ -334,8 +334,7 @@ public class ActionStartXCommand extends
if(slaEvent2 != null) {
insertList.add(slaEvent2);
}
- // update coordinator action
- new CoordActionUpdateXCommand(workflow, 3).call();
+
new WfEndXCommand(wfJob).call(); //To delete the WF temp dir
return;
}
Modified: oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/event/BundleJobEvent.java
URL: http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/event/BundleJobEvent.java?rev=1497996&r1=1497995&r2=1497996&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/event/BundleJobEvent.java (original)
+++ oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/event/BundleJobEvent.java Sat Jun 29 18:30:15 2013
@@ -30,6 +30,7 @@ import org.apache.oozie.util.XLog;
* Class implementing JobEvent for events generated by Bundle Jobs
*
*/
+@SuppressWarnings("serial")
public class BundleJobEvent extends JobEvent {
private BundleJob.Status status;
Modified: oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/event/CoordinatorActionEvent.java
URL: http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/event/CoordinatorActionEvent.java?rev=1497996&r1=1497995&r2=1497996&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/event/CoordinatorActionEvent.java (original)
+++ oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/event/CoordinatorActionEvent.java Sat Jun 29 18:30:15 2013
@@ -29,6 +29,7 @@ import org.apache.oozie.util.XLog;
/**
* Class implementing JobEvent for events generated by Coordinator Actions
*/
+@SuppressWarnings("serial")
public class CoordinatorActionEvent extends JobEvent {
private CoordinatorAction.Status status;
Modified: oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/event/CoordinatorJobEvent.java
URL: http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/event/CoordinatorJobEvent.java?rev=1497996&r1=1497995&r2=1497996&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/event/CoordinatorJobEvent.java (original)
+++ oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/event/CoordinatorJobEvent.java Sat Jun 29 18:30:15 2013
@@ -29,6 +29,7 @@ import org.apache.oozie.util.XLog;
/**
* Class implementing JobEvent for events generated by Coordinator Jobs
*/
+@SuppressWarnings("serial")
public class CoordinatorJobEvent extends JobEvent {
private CoordinatorJob.Status status;
Modified: oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/event/WorkflowActionEvent.java
URL: http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/event/WorkflowActionEvent.java?rev=1497996&r1=1497995&r2=1497996&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/event/WorkflowActionEvent.java (original)
+++ oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/event/WorkflowActionEvent.java Sat Jun 29 18:30:15 2013
@@ -29,6 +29,7 @@ import org.apache.oozie.util.XLog;
/**
* Class implementing JobEvent for events generated by Workflow Actions
*/
+@SuppressWarnings("serial")
public class WorkflowActionEvent extends JobEvent {
private WorkflowAction.Status status;
Modified: oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/event/WorkflowJobEvent.java
URL: http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/event/WorkflowJobEvent.java?rev=1497996&r1=1497995&r2=1497996&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/event/WorkflowJobEvent.java (original)
+++ oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/event/WorkflowJobEvent.java Sat Jun 29 18:30:15 2013
@@ -29,6 +29,7 @@ import org.apache.oozie.util.XLog;
/**
* Class implementing JobEvent for events generated by Workflow Jobs
*/
+@SuppressWarnings("serial")
public class WorkflowJobEvent extends JobEvent {
private WorkflowJob.Status status;
Modified: oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLASummaryUpdateForSLAStatusActualTimesJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLASummaryUpdateForSLAStatusActualTimesJPAExecutor.java?rev=1497996&r1=1497995&r2=1497996&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLASummaryUpdateForSLAStatusActualTimesJPAExecutor.java (original)
+++ oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/executor/jpa/sla/SLASummaryUpdateForSLAStatusActualTimesJPAExecutor.java Sat Jun 29 18:30:15 2013
@@ -51,7 +51,7 @@ public class SLASummaryUpdateForSLAStatu
public Void execute(EntityManager em) throws JPAExecutorException {
try {
Query q = em.createNamedQuery("UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES");
- q.setParameter("jobId", slaSummaryBean.getJobId());
+ q.setParameter("jobId", slaSummaryBean.getId());
q.setParameter("slaStatus", slaSummaryBean.getSLAStatusString());
q.setParameter("lastModifiedTS", new Date());
q.setParameter("eventStatus", slaSummaryBean.getEventStatusString());
Modified: oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
URL: http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java?rev=1497996&r1=1497995&r2=1497996&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java (original)
+++ oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java Sat Jun 29 18:30:15 2013
@@ -51,7 +51,7 @@ public class SLACalcStatus extends SLAEv
reg.setAlertContact(regBean.getAlertContact());
reg.setAlertEvents(regBean.getAlertEvents());
reg.setJobData(regBean.getJobData());
- reg.setJobId(summary.getJobId());
+ reg.setId(summary.getId());
reg.setAppType(summary.getAppType());
reg.setUser(summary.getUser());
reg.setAppName(summary.getAppName());
@@ -106,8 +106,8 @@ public class SLACalcStatus extends SLAEv
return regBean.getId();
}
- public void setJobId(String id) {
- regBean.setJobId(id);
+ public void setId(String id) {
+ regBean.setId(id);
}
@Override
Modified: oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/sla/SLACalculator.java
URL: http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/sla/SLACalculator.java?rev=1497996&r1=1497995&r2=1497996&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/sla/SLACalculator.java (original)
+++ oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/sla/SLACalculator.java Sat Jun 29 18:30:15 2013
@@ -42,13 +42,11 @@ public interface SLACalculator {
boolean addJobStatus(String jobId, String jobStatus, EventStatus jobEventStatus, Date startTime, Date endTime)
throws JPAExecutorException, ServiceException;
- void updateSlaStatus(String jobId) throws JPAExecutorException, ServiceException;
-
void updateAllSlaStatus();
void clear();
- SLACalcStatus get(String jobId);
+ SLACalcStatus get(String jobId) throws JPAExecutorException;
boolean updateRegistration(String jobId, SLARegistrationBean reg) throws JPAExecutorException;
Modified: oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
URL: http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java?rev=1497996&r1=1497995&r2=1497996&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java (original)
+++ oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java Sat Jun 29 18:30:15 2013
@@ -51,7 +51,6 @@ import org.apache.oozie.executor.jpa.sla
import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
import org.apache.oozie.executor.jpa.sla.SLASummaryGetRecordsOnRestartJPAExecutor;
import org.apache.oozie.executor.jpa.sla.SLASummaryUpdateForSLAStatusActualTimesJPAExecutor;
-import org.apache.oozie.executor.jpa.sla.SLASummaryUpdateForSLAStatusJPAExecutor;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.ServiceException;
@@ -94,7 +93,7 @@ public class SLACalculatorMemory impleme
List<SLASummaryBean> summaryBeans = jpaService.execute(new SLASummaryGetRecordsOnRestartJPAExecutor(
modifiedAfter));
for (SLASummaryBean summaryBean : summaryBeans) {
- String jobId = summaryBean.getJobId();
+ String jobId = summaryBean.getId();
try {
switch (summaryBean.getAppType()) {
case COORDINATOR_ACTION:
@@ -248,12 +247,14 @@ public class SLACalculatorMemory impleme
}
@Override
- public SLACalcStatus get(String jobId) {
- return slaMap.get(jobId);
- }
-
- public Map<String, SLACalcStatus> getMap() {
- return slaMap;
+ public SLACalcStatus get(String jobId) throws JPAExecutorException {
+ SLACalcStatus memObj;
+ memObj = slaMap.get(jobId);
+ if (memObj == null && historySet.contains(jobId)) {
+ memObj = new SLACalcStatus(jpaService.execute(new SLASummaryGetJPAExecutor(jobId)),
+ jpaService.execute(new SLARegistrationGetOnRestartJPAExecutor(jobId)));
+ }
+ return memObj;
}
@Override
@@ -275,8 +276,7 @@ public class SLACalculatorMemory impleme
/**
* Invoked via periodic run, update the SLA for registered jobs
*/
- @Override
- public void updateSlaStatus(String jobId) throws JPAExecutorException, ServiceException {
+ protected void updateJobSla(String jobId) throws JPAExecutorException, ServiceException {
SLACalcStatus slaCalc = slaMap.get(jobId);
synchronized (slaCalc) {
boolean change = false;
@@ -297,7 +297,6 @@ public class SLACalculatorMemory impleme
eventProc++; //disable further processing for optional start sla condition
change = true;
}
-
}
if (((eventProc >> 1) & 1) == 0) { // check if second bit (duration-processed) is unset
if (reg.getExpectedDuration() == -1) {
@@ -317,21 +316,30 @@ public class SLACalculatorMemory impleme
if (eventProc < 4) {
if (reg.getExpectedEnd().getTime() + jobEventLatency < Calendar
.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis()) {
- slaCalc.setEventStatus(EventStatus.END_MISS);
- slaCalc.setSLAStatus(SLAStatus.MISS);
+ confirmWithDB(slaCalc);
change = true;
eventHandler.queueEvent(new SLACalcStatus(slaCalc));
eventProc += 4;
}
}
if (change) {
- slaCalc.setEventProcessed(eventProc);
+ if (slaCalc.getEventProcessed() == 8) { //no more processing, no transfer to history set
+ eventProc = slaCalc.getEventProcessed();
+ slaMap.remove(jobId);
+ }
+ else {
+ slaCalc.setEventProcessed(eventProc);
+ }
SLASummaryBean slaSummaryBean = new SLASummaryBean();
- slaSummaryBean.setJobId(slaCalc.getId());
+ slaSummaryBean.setId(slaCalc.getId());
slaSummaryBean.setEventProcessed(eventProc);
slaSummaryBean.setSLAStatus(slaCalc.getSLAStatus());
slaSummaryBean.setEventStatus(slaCalc.getEventStatus());
- jpaService.execute(new SLASummaryUpdateForSLAStatusJPAExecutor(slaSummaryBean));
+ slaSummaryBean.setActualEnd(slaCalc.getActualEnd());
+ slaSummaryBean.setActualStart(slaCalc.getActualStart());
+ slaSummaryBean.setActualDuration(slaCalc.getActualDuration());
+ slaSummaryBean.setJobStatus(slaCalc.getJobStatus());
+ jpaService.execute(new SLASummaryUpdateForSLAStatusActualTimesJPAExecutor(slaSummaryBean));
if (eventProc == 7) {
historySet.add(jobId);
slaMap.remove(jobId);
@@ -352,7 +360,7 @@ public class SLACalculatorMemory impleme
while (iterator.hasNext()) {
String jobId = iterator.next();
try {
- updateSlaStatus(jobId);
+ updateJobSla(jobId);
}
catch (Exception e) {
XLog.getLog(SLAService.class).error("Exception in SLA processing for job [{0}]", jobId, e);
@@ -555,14 +563,7 @@ public class SLACalculatorMemory impleme
//check event proc
byte eventProc = slaCalc.getEventProcessed();
if (((eventProc >> 1) & 1) == 0) {
- if (expectedDuration != -1 && actualDuration > expectedDuration) {
- slaCalc.setEventStatus(EventStatus.DURATION_MISS);
- eventHandler.queueEvent(new SLACalcStatus(slaCalc));
- }
- else if (expectedDuration != -1 && actualDuration <= expectedDuration) {
- slaCalc.setEventStatus(EventStatus.DURATION_MET);
- eventHandler.queueEvent(new SLACalcStatus(slaCalc));
- }
+ processDurationSLA(expectedDuration, actualDuration, slaCalc);
eventProc += 2;
slaCalc.setEventProcessed(eventProc);
}
@@ -613,24 +614,16 @@ public class SLACalculatorMemory impleme
byte eventProc = slaCalc.getEventProcessed();
if (((eventProc >> 1) & 1) == 0) {
- if (expectedDuration != -1 && actualDuration > expectedDuration) {
- slaCalc.setEventStatus(EventStatus.DURATION_MISS);
- eventHandler.queueEvent(new SLACalcStatus(slaCalc));
- }
- else if (expectedDuration != -1 && actualDuration <= expectedDuration) {
- slaCalc.setEventStatus(EventStatus.DURATION_MET);
- eventHandler.queueEvent(new SLACalcStatus(slaCalc));
- }
+ processDurationSLA(expectedDuration, actualDuration, slaCalc);
eventProc += 2;
slaCalc.setEventProcessed(eventProc);
}
-
if (eventProc < 4) {
slaCalc.setEventStatus(EventStatus.END_MISS);
slaCalc.setSLAStatus(SLAStatus.MISS);
eventProc += 4;
slaCalc.setEventProcessed(eventProc);
- eventHandler.queueEvent(new SLACalcStatus(slaCalc));
+ eventHandler.queueEvent(new SLACalcStatus(slaCalc));
}
return getSLASummaryBean(slaCalc);
}
@@ -643,9 +636,109 @@ public class SLACalculatorMemory impleme
slaSummaryBean.setSLAStatus(slaCalc.getSLAStatus());
slaSummaryBean.setEventStatus(slaCalc.getEventStatus());
slaSummaryBean.setEventProcessed(slaCalc.getEventProcessed());
- slaSummaryBean.setJobId(slaCalc.getId());
+ slaSummaryBean.setId(slaCalc.getId());
slaSummaryBean.setJobStatus(slaCalc.getJobStatus());
return slaSummaryBean;
}
+ private void processDurationSLA(long expected, long actual, SLACalcStatus slaCalc) {
+ if (expected != -1 && actual > expected) {
+ slaCalc.setEventStatus(EventStatus.DURATION_MISS);
+ eventHandler.queueEvent(new SLACalcStatus(slaCalc));
+ }
+ else if (expected != -1 && actual <= expected) {
+ slaCalc.setEventStatus(EventStatus.DURATION_MET);
+ eventHandler.queueEvent(new SLACalcStatus(slaCalc));
+ }
+ }
+
+ /*
+ * Confirm END_MISS alert against source of truth - DB
+ */
+ private void confirmWithDB(SLACalcStatus slaCalc) {
+ boolean isMiss = false, ended = false;
+ try {
+ switch (slaCalc.getAppType()) {
+ case WORKFLOW_JOB:
+ WorkflowJobBean wf = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(slaCalc.getId()));
+ if (wf.getEndTime() == null) {
+ isMiss = true;
+ }
+ else {
+ ended = true;
+ if (wf.getStatus() == WorkflowJob.Status.KILLED || wf.getStatus() == WorkflowJob.Status.FAILED
+ || wf.getEndTime().getTime() > slaCalc.getExpectedEnd().getTime()) {
+ isMiss = true;
+ }
+ }
+ slaCalc.setActualStart(wf.getStartTime());
+ slaCalc.setActualEnd(wf.getEndTime());
+ slaCalc.setJobStatus(wf.getStatusStr());
+ break;
+ case WORKFLOW_ACTION:
+ WorkflowActionBean wa = jpaService.execute(new WorkflowActionGetForSLAJPAExecutor(slaCalc.getId()));
+ if (wa.getEndTime() == null) {
+ isMiss = true;
+ }
+ else {
+ ended = true;
+ if (wa.isTerminalWithFailure()
+ || wa.getEndTime().getTime() > slaCalc.getExpectedEnd().getTime()) {
+ isMiss = true;
+ }
+ }
+ slaCalc.setActualStart(wa.getStartTime());
+ slaCalc.setActualEnd(wa.getEndTime());
+ slaCalc.setJobStatus(wa.getStatusStr());
+ break;
+ case COORDINATOR_ACTION:
+ CoordinatorActionBean ca = jpaService.execute(new CoordActionGetForSLAJPAExecutor(slaCalc.getId()));
+ if (ca.isTerminalWithFailure()) {
+ isMiss = ended = true;
+ slaCalc.setActualStart(null);
+ slaCalc.setActualEnd(ca.getLastModifiedTime());
+ }
+ if (ca.getExternalId() != null) {
+ wf = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(ca.getExternalId()));
+ if (wf.getEndTime() == null) {
+ isMiss = true;
+ }
+ else {
+ ended = true;
+ if (wf.getEndTime().getTime() > slaCalc.getExpectedEnd().getTime()) {
+ isMiss = true;
+ }
+ }
+ slaCalc.setActualEnd(wf.getEndTime());
+ slaCalc.setActualStart(wf.getStartTime());
+ }
+ slaCalc.setJobStatus(ca.getStatusStr());
+ break;
+ default:
+ XLog.getLog(SLAService.class).debug("Unsupported App-type for SLA - " + slaCalc.getAppType());
+ }
+ if (ended) {
+ slaCalc.setActualDuration(slaCalc.getActualEnd().getTime() - slaCalc.getActualStart().getTime());
+ processDurationSLA(slaCalc.getExpectedDuration(), slaCalc.getActualDuration(), slaCalc);
+ slaCalc.setEventProcessed(8);
+ }
+ if (isMiss) {
+ slaCalc.setEventStatus(EventStatus.END_MISS);
+ slaCalc.setSLAStatus(SLAStatus.MISS);
+ }
+ else {
+ slaCalc.setEventStatus(EventStatus.END_MET);
+ slaCalc.setSLAStatus(SLAStatus.MET);
+ }
+ }
+ catch (Exception e) {
+ XLog.getLog(SLAService.class).warn(
+ "Error while confirming End_miss against DB: " + e
+ + ". Setting END_MISS since time limit has been exceeded");
+ slaCalc.setEventStatus(EventStatus.END_MISS);
+ slaCalc.setSLAStatus(SLAStatus.MISS);
+ slaCalc.setEventProcessed(8);
+ }
+ }
+
}
Modified: oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/sla/SLAOperations.java
URL: http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/sla/SLAOperations.java?rev=1497996&r1=1497995&r2=1497996&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/sla/SLAOperations.java (original)
+++ oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/sla/SLAOperations.java Sat Jun 29 18:30:15 2013
@@ -132,7 +132,7 @@ public class SLAOperations {
sla.setUpstreamApps(getTagElement(eSla, "upstream-apps"));
// Oozie defined
- sla.setJobId(jobId);
+ sla.setId(jobId);
sla.setAppType(appType);
sla.setAppName(appName);
sla.setUser(user);
Modified: oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/sla/SLARegistrationBean.java
URL: http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/sla/SLARegistrationBean.java?rev=1497996&r1=1497995&r2=1497996&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/sla/SLARegistrationBean.java (original)
+++ oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/sla/SLARegistrationBean.java Sat Jun 29 18:30:15 2013
@@ -128,7 +128,7 @@ public class SLARegistrationBean impleme
return jobId;
}
- public void setJobId(String jobId) {
+ public void setId(String jobId) {
this.jobId = jobId;
}
Modified: oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java
URL: http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java?rev=1497996&r1=1497995&r2=1497996&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java (original)
+++ oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java Sat Jun 29 18:30:15 2013
@@ -135,7 +135,7 @@ public class SLASummaryBean implements J
public SLASummaryBean(SLACalcStatus slaCalc) {
SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
- setJobId(slaCalc.getId());
+ setId(slaCalc.getId());
setAppName(reg.getAppName());
setAppType(reg.getAppType());
setNominalTime(reg.getNominalTime());
@@ -154,11 +154,11 @@ public class SLASummaryBean implements J
setActualStart(slaCalc.getActualStart());
}
- public String getJobId() {
+ public String getId() {
return jobId;
}
- public void setJobId(String jobId) {
+ public void setId(String jobId) {
this.jobId = jobId;
}
Modified: oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java
URL: http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java?rev=1497996&r1=1497995&r2=1497996&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java (original)
+++ oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java Sat Jun 29 18:30:15 2013
@@ -69,6 +69,7 @@ import org.apache.oozie.executor.jpa.Coo
import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowActionInsertJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor;
@@ -77,6 +78,7 @@ import org.apache.oozie.service.ActionSe
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
+import org.apache.oozie.service.UUIDService;
import org.apache.oozie.test.XDataTestCase;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.IOUtils;
@@ -102,6 +104,7 @@ public class TestEventGeneration extends
EventQueue queue;
Services services;
EventHandlerService ehs;
+ JPAService jpaService;
@Override
@Before
@@ -113,6 +116,7 @@ public class TestEventGeneration extends
services.init();
ehs = services.get(EventHandlerService.class);
queue = ehs.getEventQueue();
+ jpaService = services.get(JPAService.class);
}
@Override
@@ -126,7 +130,6 @@ public class TestEventGeneration extends
public void testWorkflowJobEvent() throws Exception {
assertEquals(0, queue.size());
WorkflowJobBean job = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
- JPAService jpaService = services.get(JPAService.class);
// Starting job
new StartXCommand(job.getId()).call();
@@ -221,7 +224,6 @@ public class TestEventGeneration extends
CoordinatorJobBean coord = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, false,
false, 0);
modifyCoordForRunning(coord);
- final JPAService jpaService = services.get(JPAService.class);
// Action WAITING on materialization
new CoordMaterializeTransitionXCommand(coord.getId(), 3600).call();
@@ -356,7 +358,6 @@ public class TestEventGeneration extends
WorkflowJobBean job = this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING);
WorkflowActionBean action = this.addRecordToWfActionTable(job.getId(), "1", WorkflowAction.Status.PREP, true);
WorkflowActionGetJPAExecutor wfActionGetCmd = new WorkflowActionGetJPAExecutor(action.getId());
- JPAService jpaService = Services.get().get(JPAService.class);
// Starting job
new ActionStartXCommand(action.getId(), "map-reduce").call();
@@ -441,7 +442,6 @@ public class TestEventGeneration extends
CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0);
WorkflowJobBean wjb = new WorkflowJobBean();
wjb.setId(action.getExternalId());
- JPAService jpaService = services.get(JPAService.class);
jpaService.execute(new WorkflowJobUpdateJPAExecutor(wjb));
CoordinatorXCommand<Void> myCmd = new CoordActionCheckXCommand(action.getId(), 0) {
@@ -493,7 +493,6 @@ public class TestEventGeneration extends
final String jobId1 = engine.submitJob(conf, true);
final WorkflowJobGetJPAExecutor readCmd = new WorkflowJobGetJPAExecutor(jobId1);
- final JPAService jpaService = services.get(JPAService.class);
waitFor(1 * 100, new Predicate() {
@Override
@@ -511,7 +510,7 @@ public class TestEventGeneration extends
Date endTime = DateUtils.parseDateOozieTZ("2009-02-02T23:59Z");
CoordinatorJobBean coord = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, false,
false, 0);
- _modifyCoordForFailureAction(coord);
+ _modifyCoordForFailureAction(coord, "wf-invalid-fork.xml");
new CoordMaterializeTransitionXCommand(coord.getId(), 3600).call();
final CoordJobGetJPAExecutor readCmd1 = new CoordJobGetJPAExecutor(coord.getId());
waitFor(1 * 100, new Predicate() {
@@ -525,6 +524,40 @@ public class TestEventGeneration extends
assertEquals(2, queue.size());
assertEquals(EventStatus.WAITING, ((JobEvent)queue.poll()).getEventStatus());
assertEquals(EventStatus.FAILURE, ((JobEvent)queue.poll()).getEventStatus());
+
+ // test coordinator action events (failure from ActionStartX)
+ ehs.getAppTypes().add("workflow_action");
+ coord = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, false, false, 0);
+ CoordinatorActionBean action = addRecordToCoordActionTable(coord.getId(), 1, CoordinatorAction.Status.RUNNING,
+ "coord-action-sla1.xml", 0);
+ WorkflowJobBean wf = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING,
+ action.getId());
+ action.setExternalId(wf.getId());
+ jpaService.execute(new CoordActionUpdateJPAExecutor(action));
+
+ String waId = _createWorkflowAction(wf.getId(), "wf-action");
+ new ActionStartXCommand(waId, action.getType()).call();
+
+ final WorkflowJobGetJPAExecutor readCmd2 = new WorkflowJobGetJPAExecutor(jobId1);
+ waitFor(1 * 100, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return jpaService.execute(readCmd2).getStatus() == WorkflowJob.Status.KILLED;
+ }
+ });
+ assertEquals(3, queue.size());
+ JobEvent wfActionEvent = (JobEvent) queue.poll();
+ assertEquals(EventStatus.FAILURE, wfActionEvent.getEventStatus());
+ assertEquals(waId, wfActionEvent.getId());
+ assertEquals(AppType.WORKFLOW_ACTION, wfActionEvent.getAppType());
+ JobEvent wfJobEvent = (JobEvent) queue.poll();
+ assertEquals(EventStatus.FAILURE, wfJobEvent.getEventStatus());
+ assertEquals(wf.getId(), wfJobEvent.getId());
+ assertEquals(AppType.WORKFLOW_JOB, wfJobEvent.getAppType());
+ JobEvent coordActionEvent = (JobEvent) queue.poll();
+ assertEquals(EventStatus.FAILURE, coordActionEvent.getEventStatus());
+ assertEquals(action.getId(), coordActionEvent.getId());
+ assertEquals(AppType.COORDINATOR_ACTION, coordActionEvent.getAppType());
}
private class ActionCheckXCommandForTest extends ActionCheckXCommand {
@@ -573,7 +606,6 @@ public class TestEventGeneration extends
WorkflowInstance.Status.PREP);
String executionPath = "/";
- JPAService jpaService = services.get(JPAService.class);
assertNotNull(jpaService);
WorkflowJobInsertJPAExecutor wfInsertCmd = new WorkflowJobInsertJPAExecutor(workflow);
jpaService.execute(wfInsertCmd);
@@ -586,12 +618,33 @@ public class TestEventGeneration extends
return workflow;
}
- private void _modifyCoordForFailureAction(CoordinatorJobBean coord) throws Exception {
- String wfXml = IOUtils.getResourceAsString("wf-invalid-fork.xml", -1);
+ private void _modifyCoordForFailureAction(CoordinatorJobBean coord, String resourceXml) throws Exception {
+ String wfXml = IOUtils.getResourceAsString(resourceXml, -1);
writeToFile(wfXml, getFsTestCaseDir(), "workflow.xml");
String coordXml = coord.getJobXml();
coord.setJobXml(coordXml.replace("hdfs:///tmp/workflows/", getFsTestCaseDir() + "/workflow.xml"));
- services.get(JPAService.class).execute(new CoordJobUpdateJPAExecutor(coord));
+ jpaService.execute(new CoordJobUpdateJPAExecutor(coord));
+ }
+
+ private String _createWorkflowAction(String wfId, String actionName) throws JPAExecutorException {
+ WorkflowActionBean action = new WorkflowActionBean();
+ action.setName(actionName);
+ action.setId(Services.get().get(UUIDService.class).generateChildId(wfId, actionName));
+ action.setJobId(wfId);
+ action.setType("java");
+ action.setTransition("transition");
+ action.setStatus(WorkflowAction.Status.PREP);
+ action.setStartTime(new Date());
+ action.setEndTime(new Date());
+ action.setLastCheckTime(new Date());
+ action.setCred("null");
+ action.setPendingOnly();
+
+ String actionXml = "<java>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+ + getNameNodeUri() + "</name-node>" + "<main-class>" + "${dummy}" + "</java>";
+ action.setConf(actionXml);
+ jpaService.execute(new WorkflowActionInsertJPAExecutor(action));
+ return action.getId();
}
}
Modified: oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/jms/TestJMSSLAEventListener.java
URL: http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/jms/TestJMSSLAEventListener.java?rev=1497996&r1=1497995&r2=1497996&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/jms/TestJMSSLAEventListener.java (original)
+++ oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/jms/TestJMSSLAEventListener.java Sat Jun 29 18:30:15 2013
@@ -84,7 +84,7 @@ public class TestJMSSLAEventListener ext
SLACalcStatus startMiss = new SLACalcStatus(new SLARegistrationBean());
SLARegistrationBean startMissBean = startMiss.getSLARegistrationBean();
Date startDate = DateUtils.parseDateUTC("2013-01-01T00:00Z");
- startMiss.setJobId("0000000-000000000000001-oozie-wrkf-C@1");
+ startMiss.setId("0000000-000000000000001-oozie-wrkf-C@1");
startMissBean.setParentId("0000000-000000000000001-oozie-wrkf-C");
startMissBean.setAppName("Test-SLA-Start-Miss");
startMissBean.setUser("dummyuser");
@@ -124,7 +124,7 @@ public class TestJMSSLAEventListener ext
SLARegistrationBean endMissBean = endMiss.getSLARegistrationBean();
Date expectedEndDate = DateUtils.parseDateUTC("2013-01-01T00:00Z");
Date actualEndDate = DateUtils.parseDateUTC("2013-01-01T01:00Z");
- endMiss.setJobId("0000000-000000000000001-oozie-wrkf-C@1");
+ endMiss.setId("0000000-000000000000001-oozie-wrkf-C@1");
endMissBean.setParentId("0000000-000000000000001-oozie-wrkf-C");
endMissBean.setAppName("Test-SLA-End-Miss");
endMiss.setEventStatus(EventStatus.END_MISS);
@@ -167,7 +167,7 @@ public class TestJMSSLAEventListener ext
Date expectedEndDate = DateUtils.parseDateUTC("2013-01-01T12:00Z");
Date actualEndDate = DateUtils.parseDateUTC("2013-01-01T14:00Z");
long expectedDuration = ( expectedEndDate.getTime() - actualStartDate.getTime() ) / (1000 * 60);
- durationMiss.setJobId("0000000-000000000000001-oozie-wrkf-C@1");
+ durationMiss.setId("0000000-000000000000001-oozie-wrkf-C@1");
durationMissBean.setParentId("0000000-000000000000001-oozie-wrkf-C");
durationMissBean.setAppName("Test-SLA-Duration-Miss");
durationMiss.setEventStatus(EventStatus.DURATION_MISS);
@@ -214,7 +214,7 @@ public class TestJMSSLAEventListener ext
slaListener.init(conf);
SLACalcStatus startMiss = new SLACalcStatus(new SLARegistrationBean());
SLARegistrationBean startMissBean = startMiss.getSLARegistrationBean();
- startMiss.setJobId("0000000-000000000000001-oozie-wrkf-C@1");
+ startMiss.setId("0000000-000000000000001-oozie-wrkf-C@1");
startMissBean.setAppName("Test-SLA-Start-Miss");
startMissBean.setAppType(AppType.COORDINATOR_ACTION);
startMissBean.setUser("dummyuser");
@@ -242,7 +242,7 @@ public class TestJMSSLAEventListener ext
SLACalcStatus startMiss = new SLACalcStatus(new SLARegistrationBean());
SLARegistrationBean startMissBean = startMiss.getSLARegistrationBean();
- startMiss.setJobId("0000000-000000000000001-oozie-wrkf-C@1");
+ startMiss.setId("0000000-000000000000001-oozie-wrkf-C@1");
startMissBean.setAppName("Test-SLA-Start-Miss");
startMissBean.setAppType(AppType.COORDINATOR_ACTION);
startMissBean.setUser("dummyuser");
@@ -273,7 +273,7 @@ public class TestJMSSLAEventListener ext
startMet.setEventStatus(EventStatus.START_MET);
startMet.setSLAStatus(SLAStatus.IN_PROCESS);
startMetBean.setAppType(AppType.COORDINATOR_ACTION);
- startMet.setJobId("0000000-000000000000001-oozie-wrkf-C@1");
+ startMet.setId("0000000-000000000000001-oozie-wrkf-C@1");
startMetBean.setParentId("0000000-000000000000001-oozie-wrkf-C");
startMetBean.setUser("dummyuser");
startMetBean.setNotificationMsg("notification of start miss");
@@ -308,7 +308,7 @@ public class TestJMSSLAEventListener ext
SLARegistrationBean endMetBean = endMet.getSLARegistrationBean();
Date expectedEndDate = DateUtils.parseDateUTC("2013-01-01T12:00Z");
Date actualEndDate = DateUtils.parseDateUTC("2013-01-01T11:00Z");
- endMet.setJobId("0000000-000000000000001-oozie-wrkf-C@1");
+ endMet.setId("0000000-000000000000001-oozie-wrkf-C@1");
endMetBean.setParentId("0000000-000000000000001-oozie-wrkf-C");
endMetBean.setAppName("Test-SLA-End-Met");
endMet.setEventStatus(EventStatus.END_MET);
@@ -352,7 +352,7 @@ public class TestJMSSLAEventListener ext
Date expectedEndDate = DateUtils.parseDateUTC("2013-01-01T12:00Z");
Date actualEndDate = DateUtils.parseDateUTC("2013-01-01T14:00Z");
long expectedDuration = ( expectedEndDate.getTime() - actualStartDate.getTime() ) / (1000 * 60);
- durationMet.setJobId("0000000-000000000000001-oozie-wrkf-C@1");
+ durationMet.setId("0000000-000000000000001-oozie-wrkf-C@1");
durationMetBean.setParentId("0000000-000000000000001-oozie-wrkf-C");
durationMetBean.setAppName("Test-SLA-Duration-Met");
durationMet.setEventStatus(EventStatus.DURATION_MET);
Modified: oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/servlet/TestV2SLAServlet.java
URL: http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/servlet/TestV2SLAServlet.java?rev=1497996&r1=1497995&r2=1497996&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/servlet/TestV2SLAServlet.java (original)
+++ oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/servlet/TestV2SLAServlet.java Sat Jun 29 18:30:15 2013
@@ -167,7 +167,7 @@ public class TestV2SLAServlet extends Da
Calendar actualEnd = (Calendar) expectedEnd.clone();
actualEnd.add(Calendar.MINUTE, i);
SLASummaryBean bean = new SLASummaryBean();
- bean.setJobId(jobIDPrefix + i + jobIDSuffix);
+ bean.setId(jobIDPrefix + i + jobIDSuffix);
bean.setParentId(parentId);
bean.setAppName(appName);
bean.setAppType(appType);
Modified: oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/sla/TestSLACalculationJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/sla/TestSLACalculationJPAExecutor.java?rev=1497996&r1=1497995&r2=1497996&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/sla/TestSLACalculationJPAExecutor.java (original)
+++ oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/sla/TestSLACalculationJPAExecutor.java Sat Jun 29 18:30:15 2013
@@ -92,7 +92,7 @@ public class TestSLACalculationJPAExecut
SLASummaryGetJPAExecutor readCmd2 = new SLASummaryGetJPAExecutor(wfId);
SLASummaryBean sBean = jpaService.execute(readCmd2);
- assertEquals(wfId, sBean.getJobId());
+ assertEquals(wfId, sBean.getId());
assertEquals("RUNNING", sBean.getJobStatus());
assertEquals(EventStatus.START_MISS, sBean.getEventStatus());
assertEquals(expStart, sBean.getExpectedStart());
@@ -144,7 +144,7 @@ public class TestSLACalculationJPAExecut
SLASummaryGetJPAExecutor readCmd2 = new SLASummaryGetJPAExecutor(wfId);
SLASummaryBean sBean = jpaService.execute(readCmd2);
// check updated + original fields
- assertEquals(wfId, sBean.getJobId());
+ assertEquals(wfId, sBean.getId());
assertEquals(EventStatus.DURATION_MISS, sBean.getEventStatus());
assertEquals(expStart, sBean.getExpectedStart());
assertEquals(expEnd, sBean.getExpectedEnd());
@@ -179,7 +179,7 @@ public class TestSLACalculationJPAExecut
// update existing record and insert another
Date newDate = new Date();
bean1 = new SLASummaryBean();
- bean1.setJobId(wfId1);
+ bean1.setId(wfId1);
bean1.setActualEnd(newDate);
List<JsonBean> updateList = new ArrayList<JsonBean>();
updateList.add(bean1);
@@ -221,7 +221,7 @@ public class TestSLACalculationJPAExecut
private SLASummaryBean _createSLASummaryBean(String jobId, String status, EventStatus slaType, Date eStart,
Date eEnd, long eDur, Date aStart, Date aEnd, long aDur, int slaProc, Date lastMod) {
SLASummaryBean bean = new SLASummaryBean();
- bean.setJobId(jobId);
+ bean.setId(jobId);
bean.setJobStatus(status);
bean.setEventStatus(slaType);
bean.setExpectedStart(eStart);
Modified: oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java
URL: http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java?rev=1497996&r1=1497995&r2=1497996&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java (original)
+++ oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java Sat Jun 29 18:30:15 2013
@@ -44,6 +44,7 @@ import org.apache.oozie.service.EventHan
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.workflow.WorkflowInstance;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -158,7 +159,7 @@ public class TestSLACalculatorMemory ext
assertEquals(5, calc.getEventProcessed());
assertEquals(6, slaCalcMemory.get(jobId2).getEventProcessed());
// jobId3 should be in history set as eventprocessed is 7 (111)
- assertNull(slaCalcMemory.get(jobId3));
+ assertEquals(2, slaCalcMemory.size()); // 2 out of 3 jobs in map
slaCalcMemory.addJobStatus(jobId3, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS,
sdf.parse("2011-03-09"), sdf.parse("2011-04-09"));
SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId3));
@@ -207,7 +208,7 @@ public class TestSLACalculatorMemory ext
// As job succeeded, it should not be in memory
assertEquals(0, slaCalcMemory.size());
slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId1));
- assertEquals("job-1", slaSummary.getJobId());
+ assertEquals("job-1", slaSummary.getId());
assertEquals(8, slaSummary.getEventProcessed());
assertEquals(AppType.WORKFLOW_JOB, slaSummary.getAppType());
assertEquals("SUCCEEDED", slaSummary.getJobStatus());
@@ -305,7 +306,7 @@ public class TestSLACalculatorMemory ext
// As job succeeded, it should not be in memory
assertEquals(0, slaCalcMemory.size());
SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId1));
- assertEquals("job-1", slaSummary.getJobId());
+ assertEquals("job-1", slaSummary.getId());
assertEquals(8, slaSummary.getEventProcessed());
assertEquals(AppType.WORKFLOW_ACTION, slaSummary.getAppType());
assertEquals("OK", slaSummary.getJobStatus());
@@ -357,7 +358,7 @@ public class TestSLACalculatorMemory ext
// As job succeeded, it should not be in memory
assertEquals(0, slaCalcMemory.size());
SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId1));
- assertEquals("job-1", slaSummary.getJobId());
+ assertEquals("job-1", slaSummary.getId());
assertEquals(8, slaSummary.getEventProcessed());
assertEquals(AppType.COORDINATOR_ACTION, slaSummary.getAppType());
assertEquals("FAILED", slaSummary.getJobStatus());
@@ -373,18 +374,18 @@ public class TestSLACalculatorMemory ext
SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
EventHandlerService ehs = Services.get().get(EventHandlerService.class);
slaCalcMemory.init(new Configuration(false));
- SLARegistrationBean slaRegBean = _createSLARegistration("job-1", AppType.WORKFLOW_JOB);
- slaRegBean.setExpectedStart(new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000)); // 1
+ WorkflowJobBean job1 = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
+ SLARegistrationBean slaRegBean = _createSLARegistration(job1.getId(), AppType.WORKFLOW_JOB);
+ slaRegBean.setExpectedStart(new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000)); // 1 hour
slaRegBean.setExpectedDuration(2 * 3600 * 1000);
- slaRegBean.setExpectedEnd(new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000)); // 1
- // hour
+ slaRegBean.setExpectedEnd(new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000)); // 1 hour
String jobId = slaRegBean.getId();
slaCalcMemory.addRegistration(jobId, slaRegBean);
assertEquals(1, slaCalcMemory.size());
SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
assertEquals(SLAStatus.NOT_STARTED, slaSummary.getSLAStatus());
assertEquals("PREP", slaSummary.getJobStatus());
- slaCalcMemory.updateSlaStatus(jobId);
+ slaCalcMemory.updateJobSla(jobId);
assertEquals(2, ehs.getEventQueue().size());
slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
// both start miss and end miss (101)
@@ -431,7 +432,7 @@ public class TestSLACalculatorMemory ext
String jobId = slaRegBean.getId();
slaCalcMemory.addRegistration(jobId, slaRegBean);
assertEquals(1, slaCalcMemory.size());
- slaCalcMemory.updateSlaStatus(jobId);
+ slaCalcMemory.updateJobSla(jobId);
SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
// Duration bit should be processed as expected duration is not set
assertEquals(3, slaSummary.getEventProcessed());
@@ -497,13 +498,13 @@ public class TestSLACalculatorMemory ext
// ahead
String jobId = slaRegBean.getId();
slaCalcMemory.addRegistration(slaRegBean.getId(), slaRegBean);
- slaCalcMemory.updateSlaStatus(jobId);
+ slaCalcMemory.updateJobSla(jobId);
SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
assertEquals(1, slaSummary.getEventProcessed());
assertEquals(SLAStatus.NOT_STARTED, slaSummary.getSLAStatus());
slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.RUNNING.toString(), EventStatus.STARTED,
new Date(System.currentTimeMillis()), null);
- slaCalcMemory.updateSlaStatus(jobId);
+ slaCalcMemory.updateJobSla(jobId);
slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
assertEquals(1, slaSummary.getEventProcessed());
assertEquals(SLAStatus.IN_PROCESS, slaSummary.getSLAStatus());
@@ -516,7 +517,8 @@ public class TestSLACalculatorMemory ext
EventHandlerService ehs = Services.get().get(EventHandlerService.class);
SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
slaCalcMemory.init(new Configuration(false));
- SLARegistrationBean slaRegBean = _createSLARegistration("job-1", AppType.WORKFLOW_JOB);
+ WorkflowJobBean job1 = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING);
+ SLARegistrationBean slaRegBean = _createSLARegistration(job1.getId(), AppType.WORKFLOW_JOB);
Date startTime = new Date(System.currentTimeMillis() + 1 * 1 * 3600 * 1000); // 1 hour ahead
slaRegBean.setExpectedStart(startTime);
slaRegBean.setExpectedDuration(3600 * 1000);
@@ -525,11 +527,11 @@ public class TestSLACalculatorMemory ext
// back
String jobId = slaRegBean.getId();
slaCalcMemory.addRegistration(slaRegBean.getId(), slaRegBean);
- slaCalcMemory.updateSlaStatus(jobId);
+ slaCalcMemory.updateJobSla(jobId);
SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
// Only end sla should be processed (100)
assertEquals(4, slaSummary.getEventProcessed());
- slaCalcMemory.updateSlaStatus(jobId);
+ slaCalcMemory.updateJobSla(jobId);
slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
assertEquals(4, slaSummary.getEventProcessed());
assertEquals(SLAStatus.MISS, slaSummary.getSLAStatus());
@@ -556,17 +558,18 @@ public class TestSLACalculatorMemory ext
EventHandlerService ehs = Services.get().get(EventHandlerService.class);
SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
slaCalcMemory.init(new Configuration(false));
- SLARegistrationBean slaRegBean = _createSLARegistration("job-1", AppType.WORKFLOW_JOB);
+ WorkflowJobBean job1 = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
+ SLARegistrationBean slaRegBean = _createSLARegistration(job1.getId(), AppType.WORKFLOW_JOB);
Date startTime = new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000);
slaRegBean.setExpectedStart(startTime); // 1 hour back
slaRegBean.setExpectedDuration(1000);
slaRegBean.setExpectedEnd(new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000));
String jobId = slaRegBean.getId();
slaCalcMemory.addRegistration(slaRegBean.getId(), slaRegBean);
- slaCalcMemory.updateSlaStatus(jobId);
+ slaCalcMemory.updateJobSla(jobId);
slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.RUNNING.toString(), EventStatus.STARTED, new Date(
System.currentTimeMillis() - 3600 * 1000), null);
- slaCalcMemory.updateSlaStatus(jobId);
+ slaCalcMemory.updateJobSla(jobId);
SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
// The actual end times are not stored, but sla's processed so (111)
assertEquals(7, slaSummary.getEventProcessed());
@@ -584,7 +587,7 @@ public class TestSLACalculatorMemory ext
private SLARegistrationBean _createSLARegistration(String jobId, AppType appType) {
SLARegistrationBean bean = new SLARegistrationBean();
- bean.setJobId(jobId);
+ bean.setId(jobId);
bean.setAppType(appType);
return bean;
}
Modified: oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/sla/TestSLAEmailEventListener.java
URL: http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/sla/TestSLAEmailEventListener.java?rev=1497996&r1=1497995&r2=1497996&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/sla/TestSLAEmailEventListener.java (original)
+++ oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/sla/TestSLAEmailEventListener.java Sat Jun 29 18:30:15 2013
@@ -89,7 +89,7 @@ public class TestSLAEmailEventListener e
Date actualstartDate = DateUtils.parseDateUTC("2013-01-01T01:00Z");
event.setEventStatus(EventStatus.START_MISS);
event.setJobStatus(JobEvent.EventStatus.STARTED.toString());
- event.setJobId("0000000-000000000000001-oozie-wrkf-C@1");
+ event.setId("0000000-000000000000001-oozie-wrkf-C@1");
eventBean.setParentId("0000000-000000000000001-oozie-wrkf-C");
eventBean.setAppName("Test-SLA-Start-Miss");
eventBean.setUser("dummyuser");
@@ -146,7 +146,7 @@ public class TestSLAEmailEventListener e
Date actualStartDate = DateUtils.parseDateUTC("2013-01-01T01:00Z");
Date expectedEndDate = DateUtils.parseDateUTC("2013-01-01T12:00Z");
Date actualEndDate = DateUtils.parseDateUTC("2013-01-01T13:00Z");
- event.setJobId("0000000-000000000000001-oozie-wrkf-C@1");
+ event.setId("0000000-000000000000001-oozie-wrkf-C@1");
eventBean.setParentId("0000000-000000000000001-oozie-wrkf-C");
event.setEventStatus(EventStatus.END_MISS);
event.setJobStatus(JobEvent.EventStatus.SUCCESS.toString());
@@ -215,7 +215,7 @@ public class TestSLAEmailEventListener e
Date actualEndDate = DateUtils.parseDateUTC("2013-01-01T00:40Z");
long expectedDuration = (expectedEndDate.getTime() - expectedStartDate.getTime()) / (1000 * 60);
long actualDuration = (actualEndDate.getTime() - actualStartDate.getTime()) / (1000 * 60);
- event.setJobId("0000000-000000000000001-oozie-wrkf-C@1");
+ event.setId("0000000-000000000000001-oozie-wrkf-C@1");
eventBean.setParentId("0000000-000000000000001-oozie-wrkf-C");
event.setEventStatus(EventStatus.DURATION_MISS);
event.setJobStatus(JobEvent.EventStatus.SUCCESS.toString());
@@ -288,7 +288,7 @@ public class TestSLAEmailEventListener e
Date startDate = DateUtils.parseDateUTC("2013-01-01T00:00Z");
Date actualstartDate = DateUtils.parseDateUTC("2013-01-01T01:00Z");
event.setEventStatus(EventStatus.START_MISS);
- event.setJobId("0000000-000000000000001-oozie-wrkf-C@1");
+ event.setId("0000000-000000000000001-oozie-wrkf-C@1");
eventBean.setAppName("Test-SLA-Start-Miss");
eventBean.setUser("dummyuser");
eventBean.setNominalTime(startDate);
@@ -323,7 +323,7 @@ public class TestSLAEmailEventListener e
// set invalid address as alert contact
eventBean.setAlertContact("invalidAddress");
event.setEventStatus(EventStatus.START_MISS);
- event.setJobId("0000000-000000000000001-oozie-wrkf-C@1");
+ event.setId("0000000-000000000000001-oozie-wrkf-C@1");
eventBean.setAppType(AppType.COORDINATOR_ACTION);
eventBean.setAppName("Test-SLA-Start-Miss");
eventBean.setUser("dummyuser");
@@ -345,7 +345,7 @@ public class TestSLAEmailEventListener e
// set multiple addresses as alert contact
eventBean.setAlertContact("alert-receiver1@oozie.com, alert-receiver2@oozie.com");
event.setEventStatus(EventStatus.START_MISS);
- event.setJobId("0000000-000000000000001-oozie-wrkf-C@1");
+ event.setId("0000000-000000000000001-oozie-wrkf-C@1");
eventBean.setAppType(AppType.COORDINATOR_ACTION);
eventBean.setAppName("Test-SLA-Start-Miss");
eventBean.setUser("dummyuser");
Modified: oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java
URL: http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java?rev=1497996&r1=1497995&r2=1497996&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java (original)
+++ oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java Sat Jun 29 18:30:15 2013
@@ -33,11 +33,13 @@ import org.apache.oozie.client.Coordinat
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.event.JobEvent;
import org.apache.oozie.client.event.SLAEvent.SLAStatus;
import org.apache.oozie.client.event.SLAEvent.EventStatus;
import org.apache.oozie.client.rest.RestConstants;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.coord.CoordActionStartXCommand;
+import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
import org.apache.oozie.command.coord.CoordKillXCommand;
import org.apache.oozie.command.coord.CoordRerunXCommand;
import org.apache.oozie.command.coord.CoordSubmitXCommand;
@@ -45,10 +47,12 @@ import org.apache.oozie.command.wf.KillX
import org.apache.oozie.command.wf.ReRunXCommand;
import org.apache.oozie.command.wf.StartXCommand;
import org.apache.oozie.command.wf.SubmitXCommand;
+import org.apache.oozie.event.CoordinatorActionEvent;
import org.apache.oozie.event.listener.JobEventListener;
import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
import org.apache.oozie.service.EventHandlerService;
@@ -328,66 +332,6 @@ public class TestSLAEventGeneration exte
_testWorkflowJobCommands(conf, ehs, slas, false);
}
- private void _testWorkflowJobCommands(Configuration conf, EventHandlerService ehs, SLAService slas, boolean isNew)
- throws CommandException, InterruptedException {
- cal.setTime(new Date());
- cal.add(Calendar.MINUTE, -20); // for start_miss
- Date nominal = cal.getTime();
- String nominalTime = DateUtils.formatDateOozieTZ(nominal);
- conf.set("nominal_time", nominalTime);
- cal.setTime(nominal);
- cal.add(Calendar.MINUTE, 10); // as per the sla xml
- String expectedStart = DateUtils.formatDateOozieTZ(cal.getTime());
- cal.setTime(nominal);
- cal.add(Calendar.MINUTE, 30); // as per the sla xml
- String expectedEnd = DateUtils.formatDateOozieTZ(cal.getTime());
-
- // testing creation of new sla registration via Submit command
- SubmitXCommand sc = new SubmitXCommand(conf);
- String jobId = sc.call();
- SLACalcStatus slaEvent = slas.getSLACalculator().get(jobId);
- assertEquals(jobId, slaEvent.getId());
- assertEquals("test-wf-job-sla", slaEvent.getAppName());
- assertEquals(AppType.WORKFLOW_JOB, slaEvent.getAppType());
- assertEquals(nominalTime, DateUtils.formatDateOozieTZ(slaEvent.getNominalTime()));
- assertEquals(expectedStart, DateUtils.formatDateOozieTZ(slaEvent.getExpectedStart()));
- assertEquals(expectedEnd, DateUtils.formatDateOozieTZ(slaEvent.getExpectedEnd()));
- if (isNew) {
- assertEquals(30 * 60 * 1000, slaEvent.getExpectedDuration());
- assertEquals(alert_events, slaEvent.getAlertEvents());
- }
- slas.runSLAWorker();
- slaEvent = (SLACalcStatus) ehs.getEventQueue().poll();
- assertEquals(SLAStatus.NOT_STARTED, slaEvent.getSLAStatus());
- assertEquals(EventStatus.START_MISS, slaEvent.getEventStatus());
-
- // test that sla processes the Job Event from Start command
- new StartXCommand(jobId).call();
- slaEvent = slas.getSLACalculator().get(jobId);
- slaEvent.setEventProcessed(0); //resetting to receive sla events
- ehs.new EventWorker().run();
- Thread.sleep(300); // time for listeners to run
- slaEvent = (SLACalcStatus) ehs.getEventQueue().poll();
- assertEquals(jobId, slaEvent.getId());
- assertNotNull(slaEvent.getActualStart());
- assertEquals(SLAStatus.IN_PROCESS, slaEvent.getSLAStatus());
- assertEquals(WorkflowJob.Status.RUNNING.name(), slaEvent.getJobStatus());
- ehs.getEventQueue().clear();
-
- // test that sla processes the Job Event from Kill command
- new KillXCommand(jobId).call();
- ehs.getEventQueue().poll(); //ignore the wf-action event generated
- ehs.new EventWorker().run();
- Thread.sleep(300); // time for listeners to run
- ehs.getEventQueue().poll(); // ignore duration event
- slaEvent = (SLACalcStatus) ehs.getEventQueue().poll();
- assertEquals(jobId, slaEvent.getId());
- assertNotNull(slaEvent.getActualEnd());
- assertEquals(EventStatus.END_MISS, slaEvent.getEventStatus());
- assertEquals(SLAStatus.MISS, slaEvent.getSLAStatus());
- assertEquals(WorkflowJob.Status.KILLED.name(), slaEvent.getJobStatus());
- }
-
/**
* Test for SLA Events generated through Coordinator Action commands
*
@@ -475,4 +419,134 @@ public class TestSLAEventGeneration exte
assertEquals(WorkflowJob.Status.KILLED.name(), slaEvent.getJobStatus());
}
+ /**
+ * Test Coord action KILLED from WAITING generates corresponding events Job
+ * - FAILURE and SLA - END_MISS
+ *
+ * @throws Exception
+ */
+ public void testFailureAndMissEventsOnKill() throws Exception {
+ EventHandlerService ehs = services.get(EventHandlerService.class);
+ assertEquals(0, ehs.getEventQueue().size());
+ // CASE 1: Coord Job status - RUNNING (similar to RunningWithError,Paused and PausedWithError for
+ // this test's purpose)
+ CoordinatorJobBean job = this.addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
+ CoordinatorActionBean action = addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.WAITING,
+ "coord-action-sla1.xml", 0);
+ // reset dummy externalId set by above test method
+ action.setExternalId(null);
+ jpa.execute(new CoordActionUpdateJPAExecutor(action));
+ services.get(SLAService.class).addRegistrationEvent(
+ TestSLAService._createSLARegistration(action.getId(), AppType.COORDINATOR_ACTION));
+
+ new CoordKillXCommand(job.getId()).call();
+ assertEquals(1, ehs.getEventQueue().size());
+
+ CoordinatorActionEvent jobEvent = (CoordinatorActionEvent) ehs.getEventQueue().peek();
+ assertEquals(AppType.COORDINATOR_ACTION, jobEvent.getAppType());
+ assertEquals(JobEvent.EventStatus.FAILURE, jobEvent.getEventStatus());
+ assertEquals(action.getId(), jobEvent.getId());
+
+ ehs.new EventWorker().run();
+ SLACalcStatus slaEvent = (SLACalcStatus) ehs.getEventQueue().poll();
+ assertEquals(EventStatus.END_MISS, slaEvent.getEventStatus());
+ assertEquals(SLAStatus.MISS, slaEvent.getSLAStatus());
+ assertEquals(CoordinatorAction.Status.KILLED.name(), slaEvent.getJobStatus());
+ assertEquals(action.getId(), slaEvent.getId());
+ assertNotNull(slaEvent.getActualEnd());
+
+ // CASE 2: Coord Job status - PAUSED - Should not create event via CoordKill
+ // but via CoordActionUpdate
+ assertEquals(0, ehs.getEventQueue().size());
+ job = this.addRecordToCoordJobTable(CoordinatorJob.Status.PAUSED, false, false);
+ action = addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.RUNNING, "coord-action-sla1.xml",
+ 0);
+ services.get(SLAService.class).addRegistrationEvent(
+ TestSLAService._createSLARegistration(action.getId(), AppType.COORDINATOR_ACTION));
+
+ new CoordKillXCommand(job.getId()).call();
+ assertEquals(0, ehs.getEventQueue().size());
+
+ WorkflowJobBean wf = new WorkflowJobBean();
+ wf.setId(action.getExternalId());
+ wf.setStatus(WorkflowJob.Status.KILLED);
+ jpa.execute(new WorkflowJobInsertJPAExecutor(wf));
+ new CoordActionUpdateXCommand(wf).call();
+ assertEquals(1, ehs.getEventQueue().size());
+
+ jobEvent = (CoordinatorActionEvent) ehs.getEventQueue().peek();
+ assertEquals(AppType.COORDINATOR_ACTION, jobEvent.getAppType());
+ assertEquals(JobEvent.EventStatus.FAILURE, jobEvent.getEventStatus());
+ assertEquals(action.getId(), jobEvent.getId());
+
+ ehs.new EventWorker().run();
+ slaEvent = (SLACalcStatus) ehs.getEventQueue().poll();
+ assertEquals(EventStatus.END_MISS, slaEvent.getEventStatus());
+ assertEquals(SLAStatus.MISS, slaEvent.getSLAStatus());
+ assertEquals(CoordinatorAction.Status.KILLED.name(), slaEvent.getJobStatus());
+ assertEquals(action.getId(), slaEvent.getId());
+ assertNotNull(slaEvent.getActualEnd());
+
+ }
+
+ private void _testWorkflowJobCommands(Configuration conf, EventHandlerService ehs, SLAService slas, boolean isNew)
+ throws Exception {
+ cal.setTime(new Date());
+ cal.add(Calendar.MINUTE, -20); // for start_miss
+ Date nominal = cal.getTime();
+ String nominalTime = DateUtils.formatDateOozieTZ(nominal);
+ conf.set("nominal_time", nominalTime);
+ cal.setTime(nominal);
+ cal.add(Calendar.MINUTE, 10); // as per the sla xml
+ String expectedStart = DateUtils.formatDateOozieTZ(cal.getTime());
+ cal.setTime(nominal);
+ cal.add(Calendar.MINUTE, 30); // as per the sla xml
+ String expectedEnd = DateUtils.formatDateOozieTZ(cal.getTime());
+
+ // testing creation of new sla registration via Submit command
+ SubmitXCommand sc = new SubmitXCommand(conf);
+ String jobId = sc.call();
+ SLACalcStatus slaEvent = slas.getSLACalculator().get(jobId);
+ assertEquals(jobId, slaEvent.getId());
+ assertEquals("test-wf-job-sla", slaEvent.getAppName());
+ assertEquals(AppType.WORKFLOW_JOB, slaEvent.getAppType());
+ assertEquals(nominalTime, DateUtils.formatDateOozieTZ(slaEvent.getNominalTime()));
+ assertEquals(expectedStart, DateUtils.formatDateOozieTZ(slaEvent.getExpectedStart()));
+ assertEquals(expectedEnd, DateUtils.formatDateOozieTZ(slaEvent.getExpectedEnd()));
+ if (isNew) {
+ assertEquals(30 * 60 * 1000, slaEvent.getExpectedDuration());
+ assertEquals(alert_events, slaEvent.getAlertEvents());
+ }
+ slas.runSLAWorker();
+ slaEvent = (SLACalcStatus) ehs.getEventQueue().poll();
+ assertEquals(SLAStatus.NOT_STARTED, slaEvent.getSLAStatus());
+ assertEquals(EventStatus.START_MISS, slaEvent.getEventStatus());
+
+ // test that sla processes the Job Event from Start command
+ new StartXCommand(jobId).call();
+ slaEvent = slas.getSLACalculator().get(jobId);
+ slaEvent.setEventProcessed(0); //resetting to receive sla events
+ ehs.new EventWorker().run();
+ Thread.sleep(300); // time for listeners to run
+ slaEvent = (SLACalcStatus) ehs.getEventQueue().poll();
+ assertEquals(jobId, slaEvent.getId());
+ assertNotNull(slaEvent.getActualStart());
+ assertEquals(SLAStatus.IN_PROCESS, slaEvent.getSLAStatus());
+ assertEquals(WorkflowJob.Status.RUNNING.name(), slaEvent.getJobStatus());
+ ehs.getEventQueue().clear();
+
+ // test that sla processes the Job Event from Kill command
+ new KillXCommand(jobId).call();
+ ehs.getEventQueue().poll(); //ignore the wf-action event generated
+ ehs.new EventWorker().run();
+ Thread.sleep(300); // time for listeners to run
+ ehs.getEventQueue().poll(); // ignore duration event
+ slaEvent = (SLACalcStatus) ehs.getEventQueue().poll();
+ assertEquals(jobId, slaEvent.getId());
+ assertNotNull(slaEvent.getActualEnd());
+ assertEquals(EventStatus.END_MISS, slaEvent.getEventStatus());
+ assertEquals(SLAStatus.MISS, slaEvent.getSLAStatus());
+ assertEquals(WorkflowJob.Status.KILLED.name(), slaEvent.getJobStatus());
+ }
+
}
Modified: oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java
URL: http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java?rev=1497996&r1=1497995&r2=1497996&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java (original)
+++ oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java Sat Jun 29 18:30:15 2013
@@ -140,7 +140,7 @@ public class TestSLAJobEventListener ext
private SLARegistrationBean _createSLARegBean(String jobId, AppType appType) {
SLARegistrationBean reg = new SLARegistrationBean();
- reg.setJobId(jobId);
+ reg.setId(jobId);
reg.setAppType(appType);
return reg;
}
Modified: oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/sla/TestSLARegistrationGetJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/sla/TestSLARegistrationGetJPAExecutor.java?rev=1497996&r1=1497995&r2=1497996&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/sla/TestSLARegistrationGetJPAExecutor.java (original)
+++ oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/sla/TestSLARegistrationGetJPAExecutor.java Sat Jun 29 18:30:15 2013
@@ -69,7 +69,7 @@ public class TestSLARegistrationGetJPAEx
private void _addRecordToSLARegistrationTable(String jobId, AppType appType, Date start, Date end,
String alertEvent, String alertContact) throws Exception {
SLARegistrationBean reg = new SLARegistrationBean();
- reg.setJobId(jobId);
+ reg.setId(jobId);
reg.setAppType(appType);
reg.setExpectedStart(start);
reg.setExpectedEnd(end);
Modified: oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/sla/TestSLARegistrationGetRecordsOnRestartJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/sla/TestSLARegistrationGetRecordsOnRestartJPAExecutor.java?rev=1497996&r1=1497995&r2=1497996&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/sla/TestSLARegistrationGetRecordsOnRestartJPAExecutor.java (original)
+++ oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/sla/TestSLARegistrationGetRecordsOnRestartJPAExecutor.java Sat Jun 29 18:30:15 2013
@@ -50,7 +50,7 @@ public class TestSLARegistrationGetRecor
Date current = new Date();
final String jobId = "0000000-" + current.getTime() + "-TestSLARegGetRestartJPAExecutor-W";
SLARegistrationBean reg = new SLARegistrationBean();
- reg.setJobId(jobId);
+ reg.setId(jobId);
reg.setNotificationMsg("dummyMessage");
reg.setUpstreamApps("upApps");
reg.setAlertEvents("miss");