You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2013/06/21 04:05:57 UTC
svn commit: r1495272 [1/2] - in /oozie/trunk: ./
core/src/main/java/org/apache/oozie/
core/src/main/java/org/apache/oozie/command/
core/src/main/java/org/apache/oozie/command/coord/
core/src/main/java/org/apache/oozie/command/wf/ core/src/main/java/org...
Author: virag
Date: Fri Jun 21 02:05:56 2013
New Revision: 1495272
URL: http://svn.apache.org/r1495272
Log:
OOZIE-1424 Improve SLA reliability on restart, fix bugs related to SLA and event generation (virag)
Added:
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForSLAJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionGetForSLAJPAExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobGetForSLAJPAExecutor.java
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
oozie/trunk/core/src/main/java/org/apache/oozie/DagEngine.java
oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowActionBean.java
oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowJobBean.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/TransitionXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionTimeOutXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordinatorXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/StartXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/event/EventQueue.java
oozie/trunk/core/src/main/java/org/apache/oozie/event/MemoryEventQueue.java
oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSJobEventListener.java
oozie/trunk/core/src/main/java/org/apache/oozie/service/EventHandlerService.java
oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLARegistrationBean.java
oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java
oozie/trunk/core/src/main/java/org/apache/oozie/sla/listener/SLAJobEventListener.java
oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java
oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java
oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java
oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAService.java
oozie/trunk/release-log.txt
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java Fri Jun 21 02:05:56 2013
@@ -74,6 +74,8 @@ import org.apache.openjpa.persistence.jd
// Select query used only by test cases
@NamedQuery(name = "GET_COORD_ACTION", query = "select OBJECT(a) from CoordinatorActionBean a where a.id = :id"),
+ // Select query used by SLAService on restart
+ @NamedQuery(name = "GET_COORD_ACTION_FOR_SLA", query = "select a.id, a.jobId, a.status, a.externalId, a.lastModifiedTimestamp from CoordinatorActionBean a where a.id = :id"),
// Select query used by ActionInfo command
@NamedQuery(name = "GET_COORD_ACTION_FOR_INFO", query = "select a.id, a.jobId, a.actionNumber, a.consoleUrl, a.errorCode, a.errorMessage, a.externalId, a.externalStatus, a.trackerUri, a.createdTimestamp, a.nominalTimestamp, a.status, a.lastModifiedTimestamp, a.missingDependencies, a.pushMissingDependencies from CoordinatorActionBean a where a.id = :id"),
// Select Query used by Timeout command
@@ -266,6 +268,14 @@ public class CoordinatorActionBean exten
return Status.valueOf(status);
}
+ /**
+ * Return the status as the raw string held in the status field, without converting it to a Status enum
+ * @return the status field value
+ */
+ public String getStatusStr() {
+ return status;
+ }
+
@Override
public void setStatus(Status status) {
super.setStatus(status);
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/DagEngine.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/DagEngine.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/DagEngine.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/DagEngine.java Fri Jun 21 02:05:56 2013
@@ -117,6 +117,27 @@ public class DagEngine extends BaseEngin
}
/**
+ * Submit and then start a workflow on behalf of a coordinator. It validates configuration properties.
+ * @param conf job conf
+ * @param parentId parent of workflow; handed to StartXCommand together with the new job id
+ * @return the job id returned by SubmitXCommand
+ * @throws DagEngineException wrapping any CommandException raised by the submit or start command
+ */
+ public String submitJobFromCoordinator(Configuration conf, String parentId) throws DagEngineException {
+ validateSubmitConfiguration(conf);
+ try {
+ String jobId;
+ SubmitXCommand submit = new SubmitXCommand(conf);
+ jobId = submit.call();
+ new StartXCommand(jobId, parentId).call();
+ return jobId;
+ }
+ catch (CommandException ex) {
+ throw new DagEngineException(ex);
+ }
+ }
+
+ /**
* Submit a pig/hive/mapreduce job through HTTP.
* <p/>
* It validates configuration properties.
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowActionBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowActionBean.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowActionBean.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowActionBean.java Fri Jun 21 02:05:56 2013
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -59,6 +59,8 @@ import org.apache.openjpa.persistence.jd
@NamedQuery(name = "GET_ACTION_FOR_UPDATE", query = "select OBJECT(a) from WorkflowActionBean a where a.id = :id"),
+ @NamedQuery(name = "GET_ACTION_FOR_SLA", query = "select a.id, a.status, a.startTimestamp, a.endTimestamp from WorkflowActionBean a where a.id = :id"),
+
@NamedQuery(name = "GET_ACTIONS_FOR_WORKFLOW", query = "select OBJECT(a) from WorkflowActionBean a where a.wfId = :wfId order by a.startTimestamp"),
@NamedQuery(name = "GET_ACTIONS_OF_WORKFLOW_FOR_UPDATE", query = "select OBJECT(a) from WorkflowActionBean a where a.wfId = :wfId order by a.startTimestamp"),
@@ -219,6 +221,26 @@ public class WorkflowActionBean extends
}
/**
+ * Return whether the workflow action is in a terminal state, i.e. its status is ERROR, FAILED, KILLED or OK
+ *
+ * @return true if the status is ERROR, FAILED, KILLED or OK; false for every other status
+ */
+ public boolean inTerminalState() {
+ boolean isTerminalState = false;
+ switch (WorkflowAction.Status.valueOf(status)) {
+ case ERROR:
+ case FAILED:
+ case KILLED:
+ case OK:
+ isTerminalState = true;
+ break;
+ default:
+ break;
+ }
+ return isTerminalState;
+ }
+
+ /**
* Return if the action execution is complete.
*
* @return if the action start is complete.
@@ -238,7 +260,7 @@ public class WorkflowActionBean extends
return (getStatus() == WorkflowAction.Status.START_RETRY || getStatus() == WorkflowAction.Status.START_MANUAL
|| getStatus() == WorkflowAction.Status.END_RETRY || getStatus() == WorkflowAction.Status.END_MANUAL);
}
-
+
/**
* Return true if the action is USER_RETRY
*
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowJobBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowJobBean.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowJobBean.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/WorkflowJobBean.java Fri Jun 21 02:05:56 2013
@@ -63,6 +63,8 @@ import org.apache.openjpa.persistence.jd
@NamedQuery(name = "GET_WORKFLOW_FOR_UPDATE", query = "select OBJECT(w) from WorkflowJobBean w where w.id = :id"),
+ @NamedQuery(name = "GET_WORKFLOW_FOR_SLA", query = "select w.id, w.status, w.startTimestamp, w.endTimestamp from WorkflowJobBean w where w.id = :id"),
+
@NamedQuery(name = "GET_WORKFLOW_ID_FOR_EXTERNAL_ID", query = "select w.id from WorkflowJobBean w where w.externalId = :externalId"),
@NamedQuery(name = "GET_WORKFLOWS_COUNT_WITH_STATUS", query = "select count(w) from WorkflowJobBean w where w.status = :status"),
@@ -192,6 +194,20 @@ public class WorkflowJobBean extends Jso
setProtoActionConf(protoActionConf);
}
+ public boolean inTerminalState() { // true only when status is FAILED, KILLED or SUCCEEDED
+ boolean inTerminalState = false;
+ switch (WorkflowJob.Status.valueOf(status)) {
+ case FAILED:
+ case KILLED:
+ case SUCCEEDED:
+ inTerminalState = true;
+ break;
+ default: // every other status is reported as non-terminal
+ break;
+ }
+ return inTerminalState;
+ }
+
public String getLogToken() {
return logToken;
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/TransitionXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/TransitionXCommand.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/TransitionXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/TransitionXCommand.java Fri Jun 21 02:05:56 2013
@@ -82,7 +82,7 @@ public abstract class TransitionXCommand
if (actionBean instanceof CoordinatorActionBean) {
CoordinatorActionBean caBean = (CoordinatorActionBean) actionBean;
caBean.setJobId(coordJob.getId());
- CoordinatorXCommand.generateEvent(caBean, coordJob.getUser(), coordJob.getAppName());
+ CoordinatorXCommand.generateEvent(caBean, coordJob.getUser(), coordJob.getAppName(), null);
}
// TODO generate Coord Job event
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionCheckXCommand.java Fri Jun 21 02:05:56 2013
@@ -45,6 +45,8 @@ import org.apache.oozie.command.Precondi
import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStatusJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionGetForCheckJPAExecutor;
import org.apache.oozie.executor.jpa.CoordinatorJobGetForUserAppnameJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowActionGetForSLAJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobGetForSLAJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
/**
@@ -55,6 +57,8 @@ public class CoordActionCheckXCommand ex
private String actionId;
private int actionCheckDelay;
private CoordinatorActionBean coordAction = null;
+ private CoordinatorJobBean coordJob;
+ private WorkflowJobBean workflowJob;
private JPAService jpaService = null;
private List<JsonBean> updateList = new ArrayList<JsonBean>();
private List<JsonBean> insertList = new ArrayList<JsonBean>();
@@ -122,9 +126,7 @@ public class CoordActionCheckXCommand ex
jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, insertList));
CoordinatorAction.Status endStatus = coordAction.getStatus();
if (endStatus != initialStatus && EventHandlerService.isEnabled()) {
- CoordinatorJobBean coordJob = jpaService.execute(new CoordinatorJobGetForUserAppnameJPAExecutor(
- coordAction.getJobId()));
- generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName());
+ generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), workflowJob.getStartTime());
}
}
catch (XException ex) {
@@ -160,6 +162,9 @@ public class CoordActionCheckXCommand ex
if (jpaService != null) {
coordAction = jpaService.execute(new CoordActionGetForCheckJPAExecutor(actionId));
+ coordJob = jpaService.execute(new CoordinatorJobGetForUserAppnameJPAExecutor(
+ coordAction.getJobId()));
+ workflowJob = jpaService.execute (new WorkflowJobGetForSLAJPAExecutor(coordAction.getExternalId()));
LogUtils.setLogInfo(coordAction, logInfo);
}
else {
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java Fri Jun 21 02:05:56 2013
@@ -215,7 +215,7 @@ public class CoordActionInputCheckXComma
if (EventHandlerService.isEnabled()
&& coordAction.getStatus() != CoordinatorAction.Status.READY) {
//since event is not to be generated unless action RUNNING via StartX
- generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName());
+ generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), null);
}
}
else {
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java Fri Jun 21 02:05:56 2013
@@ -165,7 +165,6 @@ public class CoordActionStartXCommand ex
// XmlUtils.prettyPrint(runConf).toString());
DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user);
try {
- boolean startJob = true;
Configuration conf = new XConfiguration(new StringReader(coordAction.getRunConf()));
SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), coordAction.getId(), Status.STARTED,
SlaAppType.COORDINATOR_ACTION, log);
@@ -175,7 +174,7 @@ public class CoordActionStartXCommand ex
// Normalize workflow appPath here;
JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME), conf.get(OozieClient.GROUP_NAME), conf);
- String wfId = dagEngine.submitJob(conf, startJob);
+ String wfId = dagEngine.submitJobFromCoordinator(conf, actionId);
coordAction.setStatus(CoordinatorAction.Status.RUNNING);
coordAction.setExternalId(wfId);
coordAction.incrementAndGetPending();
@@ -192,7 +191,7 @@ public class CoordActionStartXCommand ex
try {
jpaService.execute(new BulkUpdateInsertForCoordActionStartJPAExecutor(updateList, insertList));
if (EventHandlerService.isEnabled()) {
- generateEvent(coordAction, user, appName);
+ generateEvent(coordAction, user, appName, wfJob.getStartTime());
}
}
catch (JPAExecutorException je) {
@@ -248,7 +247,7 @@ public class CoordActionStartXCommand ex
// call JPAExecutor to do the bulk writes
jpaService.execute(new BulkUpdateInsertForCoordActionStartJPAExecutor(updateList, insertList));
if (EventHandlerService.isEnabled()) {
- generateEvent(coordAction, user, appName);
+ generateEvent(coordAction, user, appName, null);
}
}
catch (JPAExecutorException je) {
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionTimeOutXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionTimeOutXCommand.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionTimeOutXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionTimeOutXCommand.java Fri Jun 21 02:05:56 2013
@@ -60,7 +60,7 @@ public class CoordActionTimeOutXCommand
actionBean.setLastModifiedTime(new Date());
jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateStatusJPAExecutor(actionBean));
if (EventHandlerService.isEnabled()) {
- generateEvent(actionBean, user, appName);
+ generateEvent(actionBean, user, appName, null);
}
}
catch (JPAExecutorException e) {
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java Fri Jun 21 02:05:56 2013
@@ -48,6 +48,7 @@ import org.apache.oozie.executor.jpa.JPA
public class CoordActionUpdateXCommand extends CoordinatorXCommand<Void> {
private WorkflowJobBean workflow;
private CoordinatorActionBean coordAction = null;
+ private CoordinatorJobBean coordJob;
private JPAService jpaService = null;
private int maxRetries = 1;
private List<JsonBean> updateList = new ArrayList<JsonBean>();
@@ -69,7 +70,6 @@ public class CoordActionUpdateXCommand e
try {
LOG.debug("STARTED CoordActionUpdateXCommand for wfId=" + workflow.getId());
Status slaStatus = null;
- CoordinatorAction.Status preCoordStatus = coordAction.getStatus();
if (workflow.getStatus() == WorkflowJob.Status.SUCCEEDED) {
coordAction.setStatus(CoordinatorAction.Status.SUCCEEDED);
coordAction.setPending(0);
@@ -109,7 +109,7 @@ public class CoordActionUpdateXCommand e
return null;
}
- LOG.info("Updating Coordintaor action id :" + coordAction.getId() + " status from " + preCoordStatus
+ LOG.info("Updating Coordintaor action id :" + coordAction.getId() + " status "
+ " to " + coordAction.getStatus() + ", pending = " + coordAction.getPending());
coordAction.setLastModifiedTime(new Date());
@@ -134,10 +134,8 @@ public class CoordActionUpdateXCommand e
}
jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, insertList));
- if (preCoordStatus != coordAction.getStatus() && EventHandlerService.isEnabled()) {
- CoordinatorJobBean coordJob = jpaService.execute(new CoordinatorJobGetForUserAppnameJPAExecutor(
- coordAction.getJobId()));
- generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName());
+ if (EventHandlerService.isEnabled()) {
+ generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), workflow.getStartTime());
}
LOG.debug("ENDED CoordActionUpdateXCommand for wfId=" + workflow.getId());
@@ -180,6 +178,8 @@ public class CoordActionUpdateXCommand e
try {
coordAction = jpaService.execute(new CoordActionGetForExternalIdJPAExecutor(workflow.getId()));
if (coordAction != null) {
+ coordJob = jpaService.execute(new CoordinatorJobGetForUserAppnameJPAExecutor(
+ coordAction.getJobId()));
break;
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordKillXCommand.java Fri Jun 21 02:05:56 2013
@@ -169,9 +169,6 @@ public class CoordKillXCommand extends K
public void performWrites() throws CommandException {
try {
jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, null));
- if (EventHandlerService.isEnabled()) {
- generateEvents(coordJob);
- }
}
catch (JPAExecutorException e) {
throw new CommandException(e);
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java Fri Jun 21 02:05:56 2013
@@ -116,7 +116,7 @@ public class CoordMaterializeTransitionX
if (actionBean instanceof CoordinatorActionBean) {
CoordinatorActionBean coordAction = (CoordinatorActionBean) actionBean;
if (EventHandlerService.isEnabled()) {
- CoordinatorXCommand.generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName());
+ CoordinatorXCommand.generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), null);
}
if (coordAction.getPushMissingDependencies() != null) {
// TODO: Delay in catchup mode?
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java Fri Jun 21 02:05:56 2013
@@ -256,7 +256,7 @@ public class CoordPushDependencyCheckXCo
if (EventHandlerService.isEnabled()
&& coordAction.getStatus() != CoordinatorAction.Status.READY) {
//since event is not to be generated unless action RUNNING via StartX
- generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName());
+ generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), null);
}
}
else {
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordRerunXCommand.java Fri Jun 21 02:05:56 2013
@@ -399,7 +399,6 @@ public class CoordRerunXCommand extends
coordJob.resetPending();
}
}
-
updateList.add(coordJob);
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordResumeXCommand.java Fri Jun 21 02:05:56 2013
@@ -181,9 +181,6 @@ public class CoordResumeXCommand extends
public void performWrites() throws CommandException {
try {
jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, null));
- if (EventHandlerService.isEnabled()) {
- generateEvents(coordJob);
- }
}
catch (JPAExecutorException e) {
throw new CommandException(e);
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordSuspendXCommand.java Fri Jun 21 02:05:56 2013
@@ -186,9 +186,6 @@ public class CoordSuspendXCommand extend
public void performWrites() throws CommandException {
try {
jpaService.execute(new BulkUpdateInsertForCoordActionStatusJPAExecutor(updateList, null));
- if (EventHandlerService.isEnabled()) {
- generateEvents(coordJob);
- }
}
catch (JPAExecutorException jex) {
throw new CommandException(jex);
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordinatorXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordinatorXCommand.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordinatorXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordinatorXCommand.java Fri Jun 21 02:05:56 2013
@@ -17,6 +17,8 @@
*/
package org.apache.oozie.command.coord;
+import java.util.Date;
+
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.AppType;
@@ -53,7 +55,7 @@ public abstract class CoordinatorXComman
super(name, type, priority, dryrun);
}
- public static void generateEvent(CoordinatorActionBean coordAction, String user, String appName) {
+ public static void generateEvent(CoordinatorActionBean coordAction, String user, String appName, Date startTime) {
if (eventService.isSupportedApptype(AppType.COORDINATOR_ACTION.name())) {
String missDep = coordAction.getMissingDependencies();
if (missDep != null && missDep.length() > 0) {
@@ -66,7 +68,7 @@ public abstract class CoordinatorXComman
String deps = missDep == null ? (pushMissDep == null ? null : pushMissDep) : (pushMissDep == null ? missDep
: missDep + CoordELFunctions.INSTANCE_SEPARATOR + pushMissDep);
CoordinatorActionEvent event = new CoordinatorActionEvent(coordAction.getId(), coordAction.getJobId(),
- coordAction.getStatus(), user, appName, coordAction.getNominalTime(), coordAction.getCreatedTime(),
+ coordAction.getStatus(), user, appName, coordAction.getNominalTime(), startTime,
deps);
event.setErrorCode(coordAction.getErrorCode());
event.setErrorMessage(coordAction.getErrorMessage());
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java Fri Jun 21 02:05:56 2013
@@ -71,6 +71,8 @@ public class SignalXCommand extends Work
private JPAService jpaService = null;
private String jobId;
private String actionId;
+ private String parentId;
+ private CoordinatorActionBean coordAction;
private WorkflowJobBean wfJob;
private WorkflowActionBean wfAction;
private List<JsonBean> updateList = new ArrayList<JsonBean>();
@@ -80,13 +82,14 @@ public class SignalXCommand extends Work
private String wfJobErrorMsg;
- public SignalXCommand(String name, int priority, String jobId) {
+ public SignalXCommand(String name, int priority, String jobId, String parentId) {
super(name, name, priority);
this.jobId = ParamChecker.notEmpty(jobId, "jobId");
+ this.parentId = parentId;
}
public SignalXCommand(String jobId, String actionId) {
- this("signal", 1, jobId);
+ this("signal", 1, jobId, null);
this.actionId = ParamChecker.notEmpty(actionId, "actionId");
}
@@ -109,6 +112,8 @@ public class SignalXCommand extends Work
LogUtils.setLogInfo(wfJob, logInfo);
if (actionId != null) {
this.wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(actionId));
+ coordAction = jpaService.execute(new CoordActionGetForExternalIdJPAExecutor(wfJob
+ .getId()));
LogUtils.setLogInfo(wfAction, logInfo);
}
}
@@ -326,11 +331,12 @@ public class SignalXCommand extends Work
// call JPAExecutor to do the bulk writes
jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
if (generateEvent && EventHandlerService.isEnabled()) {
- CoordinatorActionBean coordAction = jpaService.execute(new CoordActionGetForExternalIdJPAExecutor(wfJob
- .getId()));
if (coordAction != null) {
wfJob.setParentId(coordAction.getId());
}
+ else {
+ wfJob.setParentId(parentId);
+ }
generateEvent(wfJob, wfJobErrorCode, wfJobErrorMsg);
}
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/StartXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/StartXCommand.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/StartXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/StartXCommand.java Fri Jun 21 02:05:56 2013
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -25,7 +25,11 @@ import org.apache.oozie.util.InstrumentU
public class StartXCommand extends SignalXCommand {
public StartXCommand(String id) {
- super("start", 1, id);
+ this(id, null); // no parent id supplied: delegate with a null parentId
+ }
+
+ public StartXCommand(String jobId, String parentId) {
+ super("start", 1, jobId, parentId);
InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
}
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/event/EventQueue.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/event/EventQueue.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/event/EventQueue.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/event/EventQueue.java Fri Jun 21 02:05:56 2013
@@ -18,7 +18,7 @@
package org.apache.oozie.event;
import java.io.Serializable;
-import java.util.Set;
+import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.client.event.Event;
@@ -54,7 +54,7 @@ public interface EventQueue {
* Fetch events from queue in batch
- * @return events set
+ * @return events list
*/
- public Set<Event> pollBatch();
+ public List<Event> pollBatch();
/**
* Fetch single event from queue
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/event/MemoryEventQueue.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/event/MemoryEventQueue.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/event/MemoryEventQueue.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/event/MemoryEventQueue.java Fri Jun 21 02:05:56 2013
@@ -18,8 +18,8 @@
package org.apache.oozie.event;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
@@ -74,9 +74,9 @@ public class MemoryEventQueue implements
}
@Override
- public Set<Event> pollBatch() {
+ public List<Event> pollBatch() {
// batch drain
- Set<Event> eventBatch = new HashSet<Event>();
+ List<Event> eventBatch = new ArrayList<Event>();
for (int i = 0; i < batchSize; i++) {
EventQueueElement polled = eventQueue.poll();
if (polled != null) {
Added: oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForSLAJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForSLAJPAExecutor.java?rev=1495272&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForSLAJPAExecutor.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForSLAJPAExecutor.java Fri Jun 21 02:05:56 2013
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.sql.Timestamp;
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.ParamChecker;
+
+/**
+ * JPAExecutor to get attributes of CoordinatorActionBean required by SLAService on restart.
+ * <p>
+ * Runs the named query GET_COORD_ACTION_FOR_SLA for one coordinator action id and copies the
+ * returned columns into a new, partially populated CoordinatorActionBean.
+ */
+public class CoordActionGetForSLAJPAExecutor implements JPAExecutor<CoordinatorActionBean> {
+
+ private final String coordActionId;
+
+ /**
+ * @param coordActionId id of the coordinator action to look up; validated with ParamChecker.notNull
+ */
+ public CoordActionGetForSLAJPAExecutor(String coordActionId) {
+ ParamChecker.notNull(coordActionId, "coordActionId");
+ this.coordActionId = coordActionId;
+ }
+
+ @Override
+ public String getName() {
+ return "CoordActionGetForSLAJPAExecutor";
+ }
+
+ /**
+ * Executes the named query and converts its single result row into a bean.
+ *
+ * @param em entity manager used to create the named query
+ * @return bean holding the columns returned by the query
+ * @throws JPAExecutorException with ErrorCode.E0603 wrapping any exception raised by the query or the conversion
+ */
+ @Override
+ public CoordinatorActionBean execute(EntityManager em) throws JPAExecutorException {
+ try {
+ Query q = em.createNamedQuery("GET_COORD_ACTION_FOR_SLA");
+ q.setParameter("id", coordActionId);
+ Object[] obj = (Object[]) q.getSingleResult();
+ return getBeanFromArray(obj);
+ }
+ catch (Exception e) {
+ throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
+ }
+
+ }
+
+ /**
+ * Builds a bean from a result row; null columns are left unset on the bean.
+ *
+ * @param arr row read as: [0] id, [1] job id, [2] status name, [3] external id, [4] last modified timestamp
+ * @return the populated bean
+ */
+ private CoordinatorActionBean getBeanFromArray(Object[] arr) {
+ CoordinatorActionBean bean = new CoordinatorActionBean();
+ if (arr[0] != null) {
+ bean.setId((String) arr[0]);
+ }
+ if (arr[1] != null) {
+ bean.setJobId((String) arr[1]);
+ }
+ if (arr[2] != null) {
+ bean.setStatus(CoordinatorAction.Status.valueOf((String) arr[2]));
+ }
+ if (arr[3] != null) {
+ bean.setExternalId((String) arr[3]);
+ }
+ if (arr[4] != null) {
+ bean.setLastModifiedTime(DateUtils.toDate((Timestamp) arr[4]));
+ }
+ return bean;
+ }
+}
Added: oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionGetForSLAJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionGetForSLAJPAExecutor.java?rev=1495272&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionGetForSLAJPAExecutor.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionGetForSLAJPAExecutor.java Fri Jun 21 02:05:56 2013
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.sql.Timestamp;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.ParamChecker;
+
+/**
+ * Retrieve the workflow action bean for sla service.
+ * <p>
+ * Runs the named query GET_ACTION_FOR_SLA for one workflow action id and copies the returned
+ * columns into a new, partially populated WorkflowActionBean.
+ */
+public class WorkflowActionGetForSLAJPAExecutor implements JPAExecutor<WorkflowActionBean> {
+
+ private final String wfActionId;
+
+ /**
+ * @param wfActionId id of the workflow action to look up; validated with ParamChecker.notNull
+ */
+ public WorkflowActionGetForSLAJPAExecutor(String wfActionId) {
+ ParamChecker.notNull(wfActionId, "wfActionId");
+ this.wfActionId = wfActionId;
+ }
+
+ @Override
+ public String getName() {
+ return "WorkflowActionGetForSLAJPAExecutor";
+ }
+
+ /**
+ * Executes the named query and converts its single result row into a bean.
+ *
+ * @param em entity manager used to create the named query
+ * @return bean holding the columns returned by the query
+ * @throws JPAExecutorException with ErrorCode.E0603 wrapping any exception raised by the query or the conversion
+ */
+ @Override
+ public WorkflowActionBean execute(EntityManager em) throws JPAExecutorException {
+ try {
+ Query q = em.createNamedQuery("GET_ACTION_FOR_SLA");
+ q.setParameter("id", wfActionId);
+ Object[] obj = (Object[]) q.getSingleResult();
+ return getBeanFromArray(obj);
+ }
+ catch (Exception e) {
+ throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Builds a bean from a result row; null columns are left unset on the bean.
+ *
+ * @param arr row read as: [0] id, [1] status name, [2] start timestamp, [3] end timestamp
+ * @return the populated bean
+ */
+ private WorkflowActionBean getBeanFromArray(Object[] arr) {
+ WorkflowActionBean wab = new WorkflowActionBean();
+ if (arr[0] != null) {
+ wab.setId((String) arr[0]);
+ }
+ if (arr[1] != null) {
+ wab.setStatus(WorkflowAction.Status.valueOf((String) arr[1]));
+ }
+ if (arr[2] != null) {
+ wab.setStartTime(DateUtils.toDate((Timestamp) arr[2]));
+ }
+ if (arr[3] != null) {
+ wab.setEndTime(DateUtils.toDate((Timestamp) arr[3]));
+ }
+ return wab;
+ }
+}
\ No newline at end of file
Added: oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobGetForSLAJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobGetForSLAJPAExecutor.java?rev=1495272&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobGetForSLAJPAExecutor.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobGetForSLAJPAExecutor.java Fri Jun 21 02:05:56 2013
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.sql.Timestamp;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.ParamChecker;
+
+/**
+ * Retrieve the workflow job bean for sla service.
+ * <p>
+ * Runs the named query GET_WORKFLOW_FOR_SLA for one workflow job id and copies the returned
+ * columns into a new, partially populated WorkflowJobBean.
+ */
+public class WorkflowJobGetForSLAJPAExecutor implements JPAExecutor<WorkflowJobBean> {
+
+ private final String wfJobId;
+
+ /**
+ * @param wfJobId id of the workflow job to look up; validated with ParamChecker.notNull
+ */
+ public WorkflowJobGetForSLAJPAExecutor(String wfJobId) {
+ ParamChecker.notNull(wfJobId, "wfJobId");
+ this.wfJobId = wfJobId;
+ }
+
+ @Override
+ public String getName() {
+ return "WorkflowJobGetForSLAJPAExecutor";
+ }
+
+ /**
+ * Executes the named query and converts its single result row into a bean.
+ *
+ * @param em entity manager used to create the named query
+ * @return bean holding the columns returned by the query
+ * @throws JPAExecutorException with ErrorCode.E0603 wrapping any exception raised by the query or the conversion
+ */
+ @Override
+ public WorkflowJobBean execute(EntityManager em) throws JPAExecutorException {
+ try {
+ Query q = em.createNamedQuery("GET_WORKFLOW_FOR_SLA");
+ q.setParameter("id", wfJobId);
+ Object[] obj = (Object[]) q.getSingleResult();
+ return getBeanFromArray(obj);
+ }
+ catch (Exception e) {
+ throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Builds a bean from a result row; null columns are left unset on the bean.
+ *
+ * @param arr row read as: [0] id, [1] status name, [2] start timestamp, [3] end timestamp
+ * @return the populated bean
+ */
+ private WorkflowJobBean getBeanFromArray(Object[] arr) {
+ WorkflowJobBean wjb = new WorkflowJobBean();
+ if (arr[0] != null) {
+ wjb.setId((String) arr[0]);
+ }
+ if (arr[1] != null) {
+ wjb.setStatus(WorkflowJob.Status.valueOf((String) arr[1]));
+ }
+ if (arr[2] != null) {
+ wjb.setStartTime(DateUtils.toDate((Timestamp) arr[2]));
+ }
+ if (arr[3] != null) {
+ wjb.setEndTime(DateUtils.toDate((Timestamp) arr[3]));
+ }
+ return wjb;
+ }
+}
\ No newline at end of file
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSJobEventListener.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSJobEventListener.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSJobEventListener.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/jms/JMSJobEventListener.java Fri Jun 21 02:05:56 2013
@@ -84,7 +84,8 @@ public class JMSJobEventListener extends
textMessage.setStringProperty(property.getKey(), property.getValue());
}
textMessage.setStringProperty(JMSHeaderConstants.MESSAGE_FORMAT, messageFormat);
- LOG.trace("Event related JMS message [{0}]", textMessage.toString());
+ LOG.trace("Event related JMS text body [{0}]", textMessage.getText());
+ LOG.trace("Event related JMS entire message [{0}]", textMessage.toString());
MessageProducer producer = jmsContext.createProducer(session, topicName);
producer.setDeliveryMode(jmsDeliveryMode);
producer.setTimeToLive(jmsExpirationDate);
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/service/EventHandlerService.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/service/EventHandlerService.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/service/EventHandlerService.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/service/EventHandlerService.java Fri Jun 21 02:05:56 2013
@@ -224,7 +224,7 @@ public class EventHandlerService impleme
}
try {
if (!eventQueue.isEmpty()) {
- Set<Event> work = eventQueue.pollBatch();
+ List<Event> work = eventQueue.pollBatch();
for (Event event : work) {
MessageType msgType = event.getMsgType();
List<?> listeners = listenerMap.get(msgType);
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java Fri Jun 21 02:05:56 2013
@@ -266,7 +266,7 @@ public class SLACalcStatus extends SLAEv
+ /**
+ * @return a one-line summary of this status: id, SLA status, event status and app type
+ */
@Override
public String toString() {
- return "ID: " + getId() + " SLAStatus: " + slaStatus + " EventProcessed: "+eventProcessed;
+ return "ID: " + getId() + " SLAStatus: " + slaStatus + " EventStatus: " + eventStatus + " AppType: " + getAppType();
}
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java Fri Jun 21 02:05:56 2013
@@ -30,11 +30,22 @@ import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.AppType;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.event.JobEvent;
import org.apache.oozie.client.event.SLAEvent.EventStatus;
import org.apache.oozie.client.event.SLAEvent.SLAStatus;
import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.executor.jpa.CoordActionGetForSLAJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.WorkflowActionGetForSLAJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobGetForSLAJPAExecutor;
+
import org.apache.oozie.executor.jpa.sla.SLACalculationInsertUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.sla.SLARegistrationGetOnRestartJPAExecutor;
import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
@@ -48,6 +59,7 @@ import org.apache.oozie.service.Services
import org.apache.oozie.sla.service.SLAService;
import org.apache.oozie.util.XLog;
+
/**
* Implementation class for SLACalculator that calculates SLA related to
* start/end/duration of jobs using a memory-based map
@@ -77,32 +89,159 @@ public class SLACalculatorMemory impleme
}
+ /**
+ * Reloads SLA summary records on restart. For each record, the stored job status is first
+ * reconciled with the current job/action record (persisting the summary when it changed);
+ * then the record is either added to the history set (eventProcessed == 7) or put back into
+ * the in-memory map (eventProcessed < 7). A failure on one record is logged and does not stop
+ * the remaining records from being processed.
+ */
private void loadOnRestart() {
try {
List<SLASummaryBean> summaryBeans = jpaService.execute(new SLASummaryGetRecordsOnRestartJPAExecutor(
modifiedAfter));
for (SLASummaryBean summaryBean : summaryBeans) {
String jobId = summaryBean.getJobId();
- if (summaryBean.getEventProcessed() == 7) {
- historySet.add(jobId);
+ // declared per record: an app type that hits the default branch must not inherit
+ // the "modified" result of the previous record
+ boolean isJobModified = false;
+ try {
+ switch (summaryBean.getAppType()) {
+ case COORDINATOR_ACTION:
+ isJobModified = processSummaryBeanForCoordAction(summaryBean, jobId);
+ break;
+ case WORKFLOW_ACTION:
+ isJobModified = processSummaryBeanForWorkflowAction(summaryBean, jobId);
+ break;
+ case WORKFLOW_JOB:
+ isJobModified = processSummaryBeanForWorkflowJob(summaryBean, jobId);
+ break;
+ default:
+ break;
+ }
+ if (isJobModified) {
+ jpaService.execute(new SLASummaryUpdateForSLAStatusActualTimesJPAExecutor(summaryBean));
+ }
}
- else {
- try {
+ catch (Exception e) {
+ XLog.getLog(SLAService.class).warn("Failed to load records for " + jobId, e);
+ }
+ try {
+ if (summaryBean.getEventProcessed() == 7) {
+ historySet.add(jobId);
+ }
+ else if (summaryBean.getEventProcessed() < 7) {
SLARegistrationBean slaRegBean = jpaService.execute(new SLARegistrationGetOnRestartJPAExecutor(
jobId));
SLACalcStatus slaCalcStatus = new SLACalcStatus(summaryBean, slaRegBean);
slaMap.put(jobId, slaCalcStatus);
}
- catch (JPAExecutorException e) {
- XLog.getLog(SLAService.class).warn("Cannot retrieve registration record for " + jobId, e);
- }
}
+ catch (Exception e) {
+ XLog.getLog(SLAService.class).warn("Failed to fetch/update records for " + jobId, e);
+ }
+
}
+
}
- catch (JPAExecutorException e) {
+ catch (Exception e) {
XLog.getLog(SLAService.class).warn("Failed to retrieve SLASummary records on restart", e);
}
}
+ /**
+ * Reconciles a coordinator-action summary record with the current coordinator action.
+ * When the statuses differ, the summary's job status is refreshed and, if a workflow was
+ * launched for the action, actual start/end information is taken from that workflow job.
+ *
+ * @param summaryBean summary record to update in place
+ * @param jobId id of the coordinator action
+ * @return true if the summary record was changed and needs to be persisted
+ * @throws JPAExecutorException if the coordinator action or workflow job cannot be read
+ */
+ private boolean processSummaryBeanForCoordAction(SLASummaryBean summaryBean, String jobId)
+ throws JPAExecutorException {
+ CoordinatorActionBean coordAction = jpaService.execute(new CoordActionGetForSLAJPAExecutor(jobId));
+ if (coordAction.getStatusStr().equals(summaryBean.getJobStatus())) {
+ return false;
+ }
+ XLog.getLog(SLAService.class).trace(
+ "Coordinator action status is " + coordAction.getStatusStr() + " and summary bean status is "
+ + summaryBean.getJobStatus());
+ summaryBean.setJobStatus(coordAction.getStatusStr());
+ String wfJobId = coordAction.getExternalId();
+ if (wfJobId == null) {
+ // No workflow id recorded for this action (presumably it never launched one), so there
+ // are no actual times to copy. WorkflowJobGetForSLAJPAExecutor validates its id with
+ // ParamChecker.notNull, so attempting the lookup would fail and the refreshed status
+ // would never be persisted; report the status change only.
+ return true;
+ }
+ if (coordAction.isTerminalStatus()) {
+ WorkflowJobBean wfJob = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(wfJobId));
+ setSLASummaryBean(summaryBean, wfJob.getStartTime(), coordAction.getLastModifiedTime(),
+ coordAction.getStatusStr());
+ }
+ else if (coordAction.getStatus() != CoordinatorAction.Status.WAITING) {
+ WorkflowJobBean wfJob = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(wfJobId));
+ setSLASummaryBean(summaryBean, summaryBean.getEventProcessed(), wfJob.getStartTime());
+ }
+ return true;
+ }
+
+ /**
+ * Reconciles a workflow-action summary record with the current workflow action. When the
+ * statuses differ, the summary's job status is refreshed and the actual start (and, for a
+ * terminal action, end) information is copied from the action.
+ *
+ * @param summaryBean summary record to update in place
+ * @param jobId id of the workflow action
+ * @return true if the summary record was changed and needs to be persisted
+ * @throws JPAExecutorException if the workflow action cannot be read
+ */
+ private boolean processSummaryBeanForWorkflowAction(SLASummaryBean summaryBean, String jobId)
+ throws JPAExecutorException {
+ WorkflowActionBean wfAction = jpaService.execute(new WorkflowActionGetForSLAJPAExecutor(jobId));
+ if (wfAction.getStatusStr().equals(summaryBean.getJobStatus())) {
+ return false;
+ }
+ XLog.getLog(SLAService.class).trace(
+ "Workflow action status is " + wfAction.getStatusStr() + " and summary bean status is "
+ + summaryBean.getJobStatus());
+ summaryBean.setJobStatus(wfAction.getStatusStr());
+ if (wfAction.inTerminalState()) {
+ setSLASummaryBean(summaryBean, wfAction.getStartTime(), wfAction.getEndTime(), wfAction.getStatusStr());
+ }
+ else if (wfAction.getStatus() != WorkflowAction.Status.PREP) {
+ setSLASummaryBean(summaryBean, summaryBean.getEventProcessed(), wfAction.getStartTime());
+ }
+ return true;
+ }
+
+ /**
+ * Reconciles a workflow-job summary record with the current workflow job. When the statuses
+ * differ, the summary's job status is refreshed and the actual start (and, for a terminal
+ * job, end) information is copied from the job.
+ *
+ * @param summaryBean summary record to update in place
+ * @param jobId id of the workflow job
+ * @return true if the summary record was changed and needs to be persisted
+ * @throws JPAExecutorException if the workflow job cannot be read
+ */
+ private boolean processSummaryBeanForWorkflowJob(SLASummaryBean summaryBean, String jobId)
+ throws JPAExecutorException {
+ WorkflowJobBean wfJob = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(jobId));
+ if (wfJob.getStatusStr().equals(summaryBean.getJobStatus())) {
+ return false;
+ }
+ XLog.getLog(SLAService.class).trace(
+ "Workflow job status is " + wfJob.getStatusStr() + " and summary bean status is "
+ + summaryBean.getJobStatus());
+ summaryBean.setJobStatus(wfJob.getStatusStr());
+ if (wfJob.inTerminalState()) {
+ setSLASummaryBean(summaryBean, wfJob.getStartTime(), wfJob.getEndTime(), wfJob.getStatusStr());
+ }
+ else if (wfJob.getStatus() != WorkflowJob.Status.PREP) {
+ setSLASummaryBean(summaryBean, summaryBean.getEventProcessed(), wfJob.getStartTime());
+ }
+ return true;
+ }
+
+ /**
+ * Updates a summary record for a job that has reached a terminal state: sets eventProcessed
+ * to 8, records the actual start/end and, when both are known, the actual duration. If the
+ * previous eventProcessed value was below 4, the SLA status is set to MET only for a
+ * successful job that ended no later than the expected end, and to MISS otherwise.
+ *
+ * @param summaryBean summary record to update in place
+ * @param startTime actual start; may be null when the job ended without starting
+ * @param endTime actual end; may be null when the job record carries no end time
+ * @param status terminal status name of the job or action
+ */
+ private void setSLASummaryBean(SLASummaryBean summaryBean, Date startTime, Date endTime, String status) {
+ byte eventProc = summaryBean.getEventProcessed();
+ summaryBean.setEventProcessed(8);
+ summaryBean.setActualStart(startTime);
+ summaryBean.setActualEnd(endTime);
+ // a duration only exists when both ends are known; dereferencing a missing time would abort
+ // the whole update for this record
+ if (startTime != null && endTime != null) {
+ summaryBean.setActualDuration(endTime.getTime() - startTime.getTime());
+ }
+ if (eventProc < 4) {
+ boolean succeeded = status.equals(WorkflowJob.Status.SUCCEEDED.name())
+ || status.equals(WorkflowAction.Status.OK.name())
+ || status.equals(CoordinatorAction.Status.SUCCEEDED.name());
+ // without an end time the SLA cannot be shown to be met, so it counts as a miss
+ boolean met = succeeded && endTime != null
+ && endTime.getTime() <= summaryBean.getExpectedEnd().getTime();
+ summaryBean.setSLAStatus(met ? SLAStatus.MET : SLAStatus.MISS);
+ }
+
+ }
+
+ /**
+ * Updates a summary record for a job that has started but not ended: sets the lowest bit of
+ * eventProcessed if it is not yet set, moves the SLA status from NOT_STARTED to IN_PROCESS,
+ * and records the actual start time.
+ *
+ * @param summaryBean summary record to update in place
+ * @param eventProc current eventProcessed value of the record
+ * @param startTime actual start time to record
+ */
+ private void setSLASummaryBean(SLASummaryBean summaryBean, byte eventProc, Date startTime) {
+ if ((eventProc & 1) == 0) {
+ eventProc += 1;
+ summaryBean.setEventProcessed(eventProc);
+ }
+ // constant-first comparison so a record without an SLA status does not fail here
+ if (SLAStatus.NOT_STARTED.equals(summaryBean.getSLAStatus())) {
+ summaryBean.setSLAStatus(SLAStatus.IN_PROCESS);
+ }
+ summaryBean.setActualStart(startTime);
+ }
+
@Override
public int size() {
return slaMap.size();
@@ -161,7 +300,11 @@ public class SLACalculatorMemory impleme
}
if (((eventProc >> 1) & 1) == 0) { // check if second bit (duration-processed) is unset
- if (slaCalc.getActualStart() != null) {
+ if (reg.getExpectedDuration() == -1) {
+ eventProc += 2;
+ change = true;
+ }
+ else if (slaCalc.getActualStart() != null) {
if (reg.getExpectedDuration() + jobEventLatency < Calendar.getInstance(TimeZone.getTimeZone("UTC"))
.getTimeInMillis() - slaCalc.getActualStart().getTime()) {
slaCalc.setEventStatus(EventStatus.DURATION_MISS);
@@ -226,6 +369,7 @@ public class SLACalculatorMemory impleme
if (slaMap.size() < capacity) {
SLACalcStatus slaCalc = new SLACalcStatus(reg);
slaCalc.setSLAStatus(SLAStatus.NOT_STARTED);
+ slaCalc.setJobStatus(getJobStatus(reg.getAppType()));
slaMap.put(jobId, slaCalc);
List<JsonBean> insertList = new ArrayList<JsonBean>();
insertList.add(reg);
@@ -246,6 +390,24 @@ public class SLACalculatorMemory impleme
return false;
}
+ /**
+ * Returns the initial job status name recorded for a newly tracked job of the given type.
+ *
+ * @param appType type of the registered job
+ * @return WAITING for coordinator actions, PREP for workflow actions and workflow jobs,
+ * null for any other type
+ */
+ private String getJobStatus(AppType appType) {
+ switch (appType) {
+ case COORDINATOR_ACTION:
+ return CoordinatorAction.Status.WAITING.name();
+ case WORKFLOW_ACTION:
+ return WorkflowAction.Status.PREP.name();
+ case WORKFLOW_JOB:
+ return WorkflowJob.Status.PREP.name();
+ default:
+ return null;
+ }
+ }
+
/**
* Update job into the map for SLA tracking
*/
@@ -255,6 +417,7 @@ public class SLACalculatorMemory impleme
if (slaMap.size() < capacity) {
SLACalcStatus slaCalc = new SLACalcStatus(reg);
slaCalc.setSLAStatus(SLAStatus.NOT_STARTED);
+ slaCalc.setJobStatus(getJobStatus(reg.getAppType()));
slaMap.put(jobId, slaCalc);
List<JsonBean> updateList = new ArrayList<JsonBean>();
updateList.add(reg);
@@ -299,8 +462,8 @@ public class SLACalculatorMemory impleme
slaInfo = processJobEndFailureSLA(slaCalc, startTime, endTime);
break;
default:
- XLog.getLog(SLAService.class).debug("Unknown Job Status [{0}]", jobEventStatus);
- return false;
+ XLog.getLog(SLAService.class).debug("Unknown Job Status for SLA purpose[{0}]", jobEventStatus);
+ slaInfo = getSLASummaryBean(slaCalc);
}
if (slaCalc.getEventProcessed() == 7) {
@@ -382,15 +545,16 @@ public class SLACalculatorMemory impleme
//check event proc
byte eventProc = slaCalc.getEventProcessed();
if (((eventProc >> 1) & 1) == 0) {
- if (actualDuration > expectedDuration) {
+ if (expectedDuration != -1 && actualDuration > expectedDuration) {
slaCalc.setEventStatus(EventStatus.DURATION_MISS);
+ eventHandler.queueEvent(new SLACalcStatus(slaCalc));
}
- else {
+ else if (expectedDuration != -1 && actualDuration <= expectedDuration) {
slaCalc.setEventStatus(EventStatus.DURATION_MET);
+ eventHandler.queueEvent(new SLACalcStatus(slaCalc));
}
eventProc += 2;
slaCalc.setEventProcessed(eventProc);
- eventHandler.queueEvent(new SLACalcStatus(slaCalc));
}
if (eventProc < 4) {
@@ -421,6 +585,15 @@ public class SLACalculatorMemory impleme
* @throws JPAExecutorException
*/
private SLASummaryBean processJobEndFailureSLA(SLACalcStatus slaCalc, Date actualStart, Date actualEnd) throws JPAExecutorException {
+ if (actualStart == null) {
+ // job failed before starting
+ slaCalc.setEventProcessed(7);
+ slaCalc.setActualEnd(actualEnd);
+ slaCalc.setEventStatus(EventStatus.END_MISS);
+ slaCalc.setSLAStatus(SLAStatus.MISS);
+ eventHandler.queueEvent(new SLACalcStatus(slaCalc));
+ return getSLASummaryBean(slaCalc);
+ }
slaCalc.setActualStart(actualStart);
slaCalc.setActualEnd(actualEnd);
SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
@@ -430,16 +603,18 @@ public class SLACalculatorMemory impleme
byte eventProc = slaCalc.getEventProcessed();
if (((eventProc >> 1) & 1) == 0) {
- if (actualDuration > expectedDuration) {
+ if (expectedDuration != -1 && actualDuration > expectedDuration) {
slaCalc.setEventStatus(EventStatus.DURATION_MISS);
+ eventHandler.queueEvent(new SLACalcStatus(slaCalc));
}
- else {
+ else if (expectedDuration != -1 && actualDuration <= expectedDuration) {
slaCalc.setEventStatus(EventStatus.DURATION_MET);
+ eventHandler.queueEvent(new SLACalcStatus(slaCalc));
}
eventProc += 2;
slaCalc.setEventProcessed(eventProc);
- eventHandler.queueEvent(new SLACalcStatus(slaCalc));
}
+
if (eventProc < 4) {
slaCalc.setEventStatus(EventStatus.END_MISS);
slaCalc.setSLAStatus(SLAStatus.MISS);
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLARegistrationBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLARegistrationBean.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLARegistrationBean.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLARegistrationBean.java Fri Jun 21 02:05:56 2013
@@ -83,7 +83,7 @@ public class SLARegistrationBean impleme
@Basic
@Column(name = "expected_duration")
- private long expectedDuration = 0;
+ private long expectedDuration = -1;
@Basic
@Column(name = "user_name")
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java Fri Jun 21 02:05:56 2013
@@ -94,7 +94,7 @@ public class SLASummaryBean implements J
@Basic
@Column(name = "expected_duration")
- private long expectedDuration;
+ private long expectedDuration = -1;
@Basic
@Column(name = "actual_start")
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/sla/listener/SLAJobEventListener.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/sla/listener/SLAJobEventListener.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/sla/listener/SLAJobEventListener.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/sla/listener/SLAJobEventListener.java Fri Jun 21 02:05:56 2013
@@ -68,17 +68,14 @@ public class SLAJobEventListener extends
}
+ /**
+ * Forwards a job event to the SLAService as a status event, passing the event's id, the
+ * given status, the event status and the event's start and end times. Every status is
+ * forwarded; the earlier filter on WAITING and SUSPENDED is removed by this change.
+ * A ServiceException from the SLAService is logged and not rethrown.
+ *
+ * @param event job event whose id, event status, start time and end time are forwarded
+ * @param status job status name forwarded along with the event
+ */
private void sendEventToSLAService(JobEvent event, String status) {
- if (!status.equals(CoordinatorAction.Status.WAITING.name())
- && !status.equals(CoordinatorAction.Status.SUSPENDED.name())) {
- Date startTime = event.getStartTime();
- Date endTime = event.getEndTime();
- try {
- Services.get().get(SLAService.class)
- .addStatusEvent(event.getId(), status, event.getEventStatus(), startTime, endTime);
- }
- catch (ServiceException se) {
- XLog.getLog(SLAService.class).error("Exception happened while sending Job-Status event for SLA", se);
- }
+ Date startTime = event.getStartTime();
+ Date endTime = event.getEndTime();
+ try {
+ Services.get().get(SLAService.class)
+ .addStatusEvent(event.getId(), status, event.getEventStatus(), startTime, endTime);
+ }
+ catch (ServiceException se) {
+ XLog.getLog(SLAService.class).error("Exception happened while sending Job-Status event for SLA", se);
}
}
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java?rev=1495272&r1=1495271&r2=1495272&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java Fri Jun 21 02:05:56 2013
@@ -24,6 +24,7 @@ import java.io.Writer;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
+import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -40,6 +41,7 @@ import org.apache.oozie.client.Coordinat
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.event.Event;
import org.apache.oozie.client.event.JobEvent;
import org.apache.oozie.client.event.JobEvent.EventStatus;
import org.apache.oozie.client.rest.RestConstants;
@@ -215,7 +217,7 @@ public class TestEventGeneration extends
ehs.setAppTypes(new HashSet<String>(Arrays.asList("coordinator_action")));
assertEquals(queue.size(), 0);
Date startTime = DateUtils.parseDateOozieTZ("2013-01-01T10:00Z");
- Date endTime = DateUtils.parseDateOozieTZ("2013-01-01T10:14Z");
+ Date endTime = DateUtils.parseDateOozieTZ("2013-01-01T10:01Z");
CoordinatorJobBean coord = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, false,
false, 0);
modifyCoordForRunning(coord);
@@ -234,7 +236,7 @@ public class TestEventGeneration extends
assertEquals(action.getId(), event.getId());
assertEquals(action.getJobId(), event.getParentId());
assertEquals(action.getNominalTime(), ((CoordinatorActionEvent) event).getNominalTime());
- assertEquals(action.getCreatedTime(), event.getStartTime());
+ assertNull(event.getStartTime());
assertEquals(coord.getUser(), event.getUser());
assertEquals(coord.getAppName(), event.getAppName());
assertEquals(0, queue.size());
@@ -251,13 +253,15 @@ public class TestEventGeneration extends
}
});
+ action = jpaService.execute(coordGetCmd);
event = (JobEvent) queue.poll();
assertEquals(EventStatus.STARTED, event.getEventStatus());
assertEquals(AppType.COORDINATOR_ACTION, event.getAppType());
assertEquals(action.getId(), event.getId());
assertEquals(action.getJobId(), event.getParentId());
assertEquals(action.getNominalTime(), ((CoordinatorActionEvent) event).getNominalTime());
- assertEquals(action.getCreatedTime(), event.getStartTime());
+ WorkflowJobBean wjb = jpaService.execute(new WorkflowJobGetJPAExecutor(action.getExternalId()));
+ assertEquals(wjb.getStartTime(), event.getStartTime());
assertEquals(coord.getUser(), event.getUser());
assertEquals(coord.getAppName(), event.getAppName());
@@ -266,17 +270,19 @@ public class TestEventGeneration extends
WorkflowJobBean wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(action.getExternalId()));
wfJob.setStatus(WorkflowJob.Status.SUCCEEDED);
jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
+ action.setStatus(CoordinatorAction.Status.RUNNING);
+ jpaService.execute(new CoordActionUpdateJPAExecutor(action));
new CoordActionCheckXCommand(action.getId(), 0).call();
- Thread.sleep(300);
action = jpaService.execute(coordGetCmd);
assertEquals(CoordinatorAction.Status.SUCCEEDED, action.getStatus());
- event = (JobEvent) queue.poll();
+ List<Event> list = queue.pollBatch();
+ event = (JobEvent)list.get(list.size()-1);
assertEquals(EventStatus.SUCCESS, event.getEventStatus());
assertEquals(AppType.COORDINATOR_ACTION, event.getAppType());
assertEquals(action.getId(), event.getId());
assertEquals(action.getJobId(), event.getParentId());
assertEquals(action.getNominalTime(), ((CoordinatorActionEvent) event).getNominalTime());
- assertEquals(action.getCreatedTime(), event.getStartTime());
+ assertEquals(wfJob.getStartTime(), event.getStartTime());
assertEquals(coord.getUser(), event.getUser());
assertEquals(coord.getAppName(), event.getAppName());
@@ -294,7 +300,7 @@ public class TestEventGeneration extends
assertEquals(action.getId(), event.getId());
assertEquals(action.getJobId(), event.getParentId());
assertEquals(action.getNominalTime(), ((CoordinatorActionEvent) event).getNominalTime());
- assertEquals(action.getCreatedTime(), event.getStartTime());
+ assertEquals(wfJob.getStartTime(), event.getStartTime());
assertEquals(coord.getUser(), event.getUser());
assertEquals(coord.getAppName(), event.getAppName());
@@ -303,18 +309,25 @@ public class TestEventGeneration extends
jpaService.execute(new CoordJobUpdateJPAExecutor(coord));
action.setStatus(CoordinatorAction.Status.SUSPENDED);
jpaService.execute(new CoordActionUpdateJPAExecutor(action));
+ wfJob.setStatus(WorkflowJob.Status.SUSPENDED);
+ WorkflowInstance wfInstance = wfJob.getWorkflowInstance();
+ ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.SUSPENDED);
+ wfJob.setWorkflowInstance(wfInstance);
+ jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
new CoordResumeXCommand(coord.getId()).call();
+ Thread.sleep(5000);
CoordinatorActionEvent cevent = (CoordinatorActionEvent) queue.poll();
assertEquals(EventStatus.STARTED, cevent.getEventStatus());
assertEquals(AppType.COORDINATOR_ACTION, cevent.getAppType());
assertEquals(action.getId(), cevent.getId());
assertEquals(action.getJobId(), cevent.getParentId());
assertEquals(action.getNominalTime(), cevent.getNominalTime());
- assertEquals(action.getCreatedTime(), cevent.getStartTime());
+ assertEquals(wfJob.getStartTime(), cevent.getStartTime());
// Action going to WAITING on Coord Rerun
action.setStatus(CoordinatorAction.Status.SUCCEEDED);
jpaService.execute(new CoordActionUpdateJPAExecutor(action));
+ queue.clear();
new CoordRerunXCommand(coord.getId(), RestConstants.JOB_COORD_RERUN_ACTION, "1", false, true)
.call();
waitFor(3 * 100, new Predicate() {
@@ -329,7 +342,7 @@ public class TestEventGeneration extends
assertEquals(action.getId(), cevent.getId());
assertEquals(action.getJobId(), cevent.getParentId());
assertEquals(action.getNominalTime(), cevent.getNominalTime());
- assertEquals(action.getCreatedTime(), event.getStartTime());
+ assertEquals(wfJob.getStartTime(), event.getStartTime());
assertNotNull(cevent.getMissingDeps());
}
@@ -426,12 +439,15 @@ public class TestEventGeneration extends
final CoordinatorJobBean coord = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
final CoordinatorActionBean action = addRecordToCoordActionTable(coord.getId(), 1,
CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0);
+ WorkflowJobBean wjb = new WorkflowJobBean();
+ wjb.setId(action.getExternalId());
JPAService jpaService = services.get(JPAService.class);
+ jpaService.execute(new WorkflowJobUpdateJPAExecutor(wjb));
CoordinatorXCommand<Void> myCmd = new CoordActionCheckXCommand(action.getId(), 0) {
@Override
protected Void execute() {
- CoordinatorXCommand.generateEvent(action, coord.getUser(), coord.getAppName());
+ CoordinatorXCommand.generateEvent(action, coord.getUser(), coord.getAppName(), null);
return null;
}
};