You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by pu...@apache.org on 2016/07/01 20:51:04 UTC
oozie git commit: OOZIE-2436 Fork/join workflow fails with
oozie.action.yarn.tag must not be null
Repository: oozie
Updated Branches:
refs/heads/master 1c4d56164 -> 2322d496c
OOZIE-2436 Fork/join workflow fails with oozie.action.yarn.tag must not be null
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/2322d496
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/2322d496
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/2322d496
Branch: refs/heads/master
Commit: 2322d496c73dac43859771a9564776c098289e5e
Parents: 1c4d561
Author: Purshotam Shah <pu...@yahoo-inc.com>
Authored: Fri Jul 1 13:50:53 2016 -0700
Committer: Purshotam Shah <pu...@yahoo-inc.com>
Committed: Fri Jul 1 13:50:53 2016 -0700
----------------------------------------------------------------------
.../org/apache/oozie/action/ActionExecutor.java | 26 ++++++++
.../oozie/action/hadoop/JavaActionExecutor.java | 43 ++++++-------
.../action/oozie/SubWorkflowActionExecutor.java | 9 ++-
.../oozie/command/wf/ActionStartXCommand.java | 30 ++++-----
.../apache/oozie/command/wf/ActionXCommand.java | 40 +++++++++++-
.../command/wf/ForkedActionStartXCommand.java | 7 +++
.../apache/oozie/command/wf/SignalXCommand.java | 65 +++++++++++++-------
.../oozie/workflow/lite/ControlNodeHandler.java | 6 ++
.../action/hadoop/TestJavaActionExecutor.java | 6 ++
release-log.txt | 1 +
10 files changed, 162 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/2322d496/core/src/main/java/org/apache/oozie/action/ActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/ActionExecutor.java b/core/src/main/java/org/apache/oozie/action/ActionExecutor.java
index 2be4549..3f978fd 100644
--- a/core/src/main/java/org/apache/oozie/action/ActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/ActionExecutor.java
@@ -56,6 +56,9 @@ public abstract class ActionExecutor {
public static final String ACTION_RETRY_POLICY = CONF_PREFIX + "retry.policy";
+ public static final String OOZIE_ACTION_YARN_TAG = "oozie.action.yarn.tag";
+
+
/**
* Error code used by {@link #convertException} when there is not register error information for an exception.
*/
@@ -581,4 +584,27 @@ public abstract class ActionExecutor {
public boolean supportsConfigurationJobXML() {
return false;
}
+
+ /**
+ * Builds the YARN tag that is forwarded for an action. The tag is useful during repeat attempts of the
+ * Launcher, to ensure only one child job is running. It is resolved in this order:
+ * <ol>
+ * <li>if {@code oozie.action.yarn.tag} is set in {@code conf}: that value + "@" + action name</li>
+ * <li>else if the workflow job has a parent id: parent id + "@" + action name</li>
+ * <li>else: the action id</li>
+ * </ol>
+ * This yields action-id for a plain workflow job, coord-action-id@action-name for a coordinator-launched
+ * workflow, and coord-action-id@subflow-action-name@action-name inside a sub-workflow.
+ *
+ * @param conf the configuration that is checked for an already propagated tag
+ * @param wfJob the workflow job the action belongs to
+ * @param action the action the tag is built for
+ * @return the action yarn tag
+ */
+ public String getActionYarnTag(Configuration conf, WorkflowJob wfJob, WorkflowAction action) {
+ String inheritedTag = conf.get(OOZIE_ACTION_YARN_TAG);
+ if (inheritedTag != null) {
+ return inheritedTag + "@" + action.getName();
+ }
+ String parentId = wfJob.getParentId();
+ if (parentId != null) {
+ return parentId + "@" + action.getName();
+ }
+ return action.getId();
+ }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/2322d496/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
index 639003e..f2273d6 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
@@ -62,7 +62,6 @@ import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.command.coord.CoordActionStartXCommand;
-import org.apache.oozie.command.wf.ActionStartXCommand;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.HadoopAccessorService;
@@ -125,6 +124,8 @@ public class JavaActionExecutor extends ActionExecutor {
public static final String HADOOP_USER_CLASSPATH_FIRST = "mapreduce.user.classpath.first";
public static final String OOZIE_CREDENTIALS_SKIP = "oozie.credentials.skip";
+ public XConfiguration workflowConf = null;
+
static {
DISALLOWED_PROPERTIES.add(HADOOP_USER);
DISALLOWED_PROPERTIES.add(HADOOP_JOB_TRACKER);
@@ -852,7 +853,7 @@ public class JavaActionExecutor extends ActionExecutor {
throws ActionExecutorException {
XConfiguration wfJobConf = null;
try {
- wfJobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf()));
+ wfJobConf = getWorkflowConf(context);
}
catch (IOException ioe) {
throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "It should never happen",
@@ -915,14 +916,6 @@ public class JavaActionExecutor extends ActionExecutor {
launcherJobConf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", true);
setupLauncherConf(launcherJobConf, actionXml, appPathRoot, context);
- String launcherTag = null;
- // Extracting tag and appending action name to maintain the uniqueness.
- if (context.getVar(ActionStartXCommand.OOZIE_ACTION_YARN_TAG) != null) {
- launcherTag = context.getVar(ActionStartXCommand.OOZIE_ACTION_YARN_TAG);
- } else { //Keeping it to maintain backward compatibly with test cases.
- launcherTag = action.getId();
- }
-
// Properties for when a launcher job's AM gets restarted
if (ConfigurationService.getBoolean(HADOOP_YARN_KILL_CHILD_JOBS_ON_AMRESTART)) {
// launcher time filter is required to prune the search of launcher tag.
@@ -930,14 +923,16 @@ public class JavaActionExecutor extends ActionExecutor {
// time. Workflow created time is good enough when workflow is running independently or workflow is
// rerunning from failed node.
long launcherTime = System.currentTimeMillis();
- String coordActionNominalTime = context.getProtoActionConf()
- .get(CoordActionStartXCommand.OOZIE_COORD_ACTION_NOMINAL_TIME);
+ String coordActionNominalTime = context.getProtoActionConf().get(
+ CoordActionStartXCommand.OOZIE_COORD_ACTION_NOMINAL_TIME);
if (coordActionNominalTime != null) {
launcherTime = Long.parseLong(coordActionNominalTime);
- } else if (context.getWorkflow().getCreatedTime() != null) {
+ }
+ else if (context.getWorkflow().getCreatedTime() != null) {
launcherTime = context.getWorkflow().getCreatedTime().getTime();
}
- LauncherMapperHelper.setupYarnRestartHandling(launcherJobConf, actionConf, launcherTag, launcherTime);
+ String actionYarnTag = getActionYarnTag(getWorkflowConf(context), context.getWorkflow(), action);
+ LauncherMapperHelper.setupYarnRestartHandling(launcherJobConf, actionConf, actionYarnTag, launcherTime);
}
else {
LOG.info(MessageFormat.format("{0} is set to false, not setting YARN restart properties",
@@ -1237,13 +1232,7 @@ public class JavaActionExecutor extends ActionExecutor {
HashMap<String, CredentialsProperties> credPropertiesMap = null;
if (context != null && action != null) {
if (!"true".equals(actionConf.get(OOZIE_CREDENTIALS_SKIP))) {
- XConfiguration wfJobConf = null;
- try {
- wfJobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf()));
- } catch (IOException ioe) {
- throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "It should never happen",
- ioe.getMessage());
- }
+ XConfiguration wfJobConf = getWorkflowConf(context);
if ("false".equals(actionConf.get(OOZIE_CREDENTIALS_SKIP)) ||
!wfJobConf.getBoolean(OOZIE_CREDENTIALS_SKIP, ConfigurationService.getBoolean(OOZIE_CREDENTIALS_SKIP))) {
credPropertiesMap = getActionCredentialsProperties(context, action);
@@ -1327,7 +1316,7 @@ public class JavaActionExecutor extends ActionExecutor {
throws Exception {
CredentialsProperties credProp = null;
String workflowXml = ((WorkflowJobBean) context.getWorkflow()).getWorkflowInstance().getApp().getDefinition();
- XConfiguration wfjobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf()));
+ XConfiguration wfjobConf = getWorkflowConf(context);
Element elementJob = XmlUtils.parseXml(workflowXml);
Element credentials = elementJob.getChild("credentials", elementJob.getNamespace());
if (credentials != null) {
@@ -1675,7 +1664,7 @@ public class JavaActionExecutor extends ActionExecutor {
String[] names = conf.getStrings(ACTION_SHARELIB_FOR + getType());
if (names == null || names.length == 0) {
try {
- XConfiguration jobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf()));
+ XConfiguration jobConf = getWorkflowConf(context);
names = jobConf.getStrings(ACTION_SHARELIB_FOR + getType());
if (names == null || names.length == 0) {
names = Services.get().getConf().getStrings(ACTION_SHARELIB_FOR + getType());
@@ -1745,4 +1734,12 @@ public class JavaActionExecutor extends ActionExecutor {
public boolean supportsConfigurationJobXML() {
return true;
}
+
+ /**
+ * Returns the workflow job configuration, parsing it from the context's workflow on first use.
+ * The parsed configuration is cached in {@code workflowConf}; once it is set, later calls return the
+ * cached instance without looking at {@code context} again.
+ *
+ * @param context the executor context whose workflow supplies the configuration XML
+ * @return the cached or newly parsed workflow configuration
+ * @throws IOException if the configuration XML cannot be read
+ */
+ private XConfiguration getWorkflowConf(Context context) throws IOException {
+ if (workflowConf != null) {
+ return workflowConf;
+ }
+ workflowConf = new XConfiguration(new StringReader(context.getWorkflow().getConf()));
+ return workflowConf;
+ }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/2322d496/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java b/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java
index f77e52c..1ea7097 100644
--- a/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java
@@ -58,6 +58,8 @@ public class SubWorkflowActionExecutor extends ActionExecutor {
public static final String SUBWORKFLOW_RERUN = "oozie.action.subworkflow.rerun";
private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
+ public XLog LOG = XLog.getLog(getClass());
+
static {
String[] badUserProps = {PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES,
@@ -220,11 +222,7 @@ public class SubWorkflowActionExecutor extends ActionExecutor {
JobUtils.normalizeAppPath(context.getWorkflow().getUser(), context.getWorkflow().getGroup(),
subWorkflowConf);
- // pushing the tag to conf for using by Launcher.
- if(context.getVar(ActionStartXCommand.OOZIE_ACTION_YARN_TAG) != null) {
- subWorkflowConf.set(ActionStartXCommand.OOZIE_ACTION_YARN_TAG,
- context.getVar(ActionStartXCommand.OOZIE_ACTION_YARN_TAG));
- }
+ subWorkflowConf.set(OOZIE_ACTION_YARN_TAG, getActionYarnTag(parentConf, context.getWorkflow(), action));
// if the rerun failed node option is provided during the time of rerun command, old subworkflow will
// rerun again.
@@ -247,6 +245,7 @@ public class SubWorkflowActionExecutor extends ActionExecutor {
}
}
catch (Exception ex) {
+ LOG.error(ex);
throw convertException(ex);
}
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/2322d496/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
index 8b0be9c..41f4430 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
@@ -67,11 +67,10 @@ public class ActionStartXCommand extends ActionXCommand<org.apache.oozie.command
public static final String COULD_NOT_START = "COULD_NOT_START";
public static final String START_DATA_MISSING = "START_DATA_MISSING";
public static final String EXEC_DATA_MISSING = "EXEC_DATA_MISSING";
- public static final String OOZIE_ACTION_YARN_TAG = "oozie.action.yarn.tag";
private String jobId = null;
protected String actionId = null;
- private WorkflowJobBean wfJob = null;
+ protected WorkflowJobBean wfJob = null;
protected WorkflowActionBean wfAction = null;
private JPAService jpaService = null;
private ActionExecutor executor = null;
@@ -185,7 +184,7 @@ public class ActionStartXCommand extends ActionXCommand<org.apache.oozie.command
isUserRetry = true;
prepareForRetry(wfAction);
}
- context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
+ context = getContext(isRetry, isUserRetry);
boolean caught = false;
try {
if (!(executor instanceof ControlNodeActionExecutor)) {
@@ -230,21 +229,6 @@ public class ActionStartXCommand extends ActionXCommand<org.apache.oozie.command
Instrumentation.Cron cron = new Instrumentation.Cron();
cron.start();
context.setStartTime();
- /*
- Creating and forwarding the tag, It will be useful during repeat attempts of Launcher, to ensure only
- one child job is running. Tag is formed as follows:
- For workflow job, tag = action-id
- For Coord job, tag = coord-action-id@action-name (if not part of sub flow), else
- coord-action-id@subflow-action-name@action-name.
- */
- if (conf.get(OOZIE_ACTION_YARN_TAG) != null) {
- context.setVar(OOZIE_ACTION_YARN_TAG, conf.get(OOZIE_ACTION_YARN_TAG) + "@" + wfAction.getName());
- } else if (wfJob.getParentId() != null) {
- context.setVar(OOZIE_ACTION_YARN_TAG, wfJob.getParentId() + "@" + wfAction.getName());
- } else {
- context.setVar(OOZIE_ACTION_YARN_TAG, wfAction.getId());
- }
-
executor.start(context, wfAction);
cron.stop();
FaultInjection.activate("org.apache.oozie.command.SkipCommitFaultInjection");
@@ -356,6 +340,16 @@ public class ActionStartXCommand extends ActionXCommand<org.apache.oozie.command
new ActionEndXCommand(wfAction.getId(), wfAction.getType()).call();
}
+ /**
+ * Creates the action executor context used while starting this action.
+ *
+ * @param isRetry retry flag handed to the new context
+ * @param isUserRetry user-retry flag handed to the new context
+ * @return a new {@link ActionXCommand.ActionExecutorContext} for this command's workflow job and action
+ */
+ protected ActionExecutorContext getContext(boolean isRetry, boolean isUserRetry) {
+ ActionExecutorContext executorContext = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry,
+ isUserRetry);
+ return executorContext;
+ }
+
protected void updateJobLastModified(){
wfJob.setLastModifiedTime(new Date());
updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob));
http://git-wip-us.apache.org/repos/asf/oozie/blob/2322d496/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java
index 525ef94..e65c3bf 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java
@@ -23,6 +23,8 @@ import java.io.StringReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Properties;
import java.util.Set;
@@ -277,9 +279,9 @@ public abstract class ActionXCommand<T> extends WorkflowXCommand<T> {
*
*/
public static class ActionExecutorContext implements ActionExecutor.Context {
- private final WorkflowJobBean workflow;
+ protected final WorkflowJobBean workflow;
private Configuration protoConf;
- private final WorkflowActionBean action;
+ protected final WorkflowActionBean action;
private final boolean isRetry;
private final boolean isUserRetry;
private boolean started;
@@ -353,6 +355,15 @@ public abstract class ActionXCommand<T> extends WorkflowXCommand<T> {
* @see org.apache.oozie.action.ActionExecutor.Context#setVar(java.lang.String, java.lang.String)
*/
public void setVar(String name, String value) {
+ setVarToWorkflow(name, value);
+ }
+
+ /**
+ * This is not thread safe; do not use it if the workflow job is shared among multiple action commands.
+ * @param name
+ * @param value
+ */
+ public void setVarToWorkflow(String name, String value) {
name = action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + name;
WorkflowInstance wfInstance = workflow.getWorkflowInstance();
wfInstance.setVar(name, value);
@@ -520,7 +531,32 @@ public abstract class ActionXCommand<T> extends WorkflowXCommand<T> {
public void setJobStatus(Job.Status jobStatus) {
this.jobStatus = jobStatus;
}
+ }
+
+ /**
+ * Executor context for an action that is started as part of a fork.
+ * <p>
+ * Variables set through {@link #setVar(String, String)} are kept in a map private to this context instead
+ * of being written to the workflow instance. The collected variables are exposed through
+ * {@link #getContextMap()} so the caller can apply them to the workflow itself.
+ */
+ public static class ForkedActionExecutorContext extends ActionExecutorContext {
+ private final Map<String, String> contextVariableMap = new HashMap<String, String>();
+
+ public ForkedActionExecutorContext(WorkflowJobBean workflow, WorkflowActionBean action, boolean isRetry,
+ boolean isUserRetry) {
+ super(workflow, action, isRetry, isUserRetry);
+ }
+
+ /**
+ * Records a variable in this context's local map.
+ *
+ * @param name the variable name
+ * @param value the variable value; {@code null} removes any value previously stored under {@code name}
+ */
+ @Override
+ public void setVar(String name, String value) {
+ // A null value means "unset": drop the entry. Any other value must be stored, otherwise
+ // getVar() and getContextMap() would never see what the action set.
+ if (value == null) {
+ contextVariableMap.remove(name);
+ }
+ else {
+ contextVariableMap.put(name, value);
+ }
+ }
+
+ /**
+ * Looks up a variable in this context's local map.
+ *
+ * @param name the variable name
+ * @return the stored value, or {@code null} if nothing is stored under {@code name}
+ */
+ @Override
+ public String getVar(String name) {
+ return contextVariableMap.get(name);
+ }
+
+ /**
+ * @return the live map of variables set on this context (not a copy)
+ */
+ public Map<String, String> getContextMap() {
+ return contextVariableMap;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/2322d496/core/src/main/java/org/apache/oozie/command/wf/ForkedActionStartXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ForkedActionStartXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ForkedActionStartXCommand.java
index 47dca75..91da0b8 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ForkedActionStartXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ForkedActionStartXCommand.java
@@ -25,6 +25,7 @@ import org.apache.oozie.client.Job;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.XCommand;
+import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;
public class ForkedActionStartXCommand extends ActionStartXCommand {
@@ -96,4 +97,10 @@ public class ForkedActionStartXCommand extends ActionStartXCommand {
protected void callActionEnd() {
queue(new ActionEndXCommand(wfAction.getId(), wfAction.getType()));
}
+
+ /**
+ * Supplies a {@link ActionXCommand.ForkedActionExecutorContext} for this command's workflow job and action.
+ */
+ @Override
+ protected ActionExecutorContext getContext(boolean isRetry, boolean isUserRetry) {
+ ActionExecutorContext forkedContext = new ActionXCommand.ForkedActionExecutorContext(wfJob, wfAction, isRetry,
+ isUserRetry);
+ return forkedContext;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/2322d496/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
index d2bb403..e95a60a 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
@@ -39,6 +39,7 @@ import org.apache.oozie.XException;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;
+import org.apache.oozie.command.wf.ActionXCommand.ForkedActionExecutorContext;
import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
import org.apache.oozie.executor.jpa.BatchQueryExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
@@ -448,8 +449,7 @@ public class SignalXCommand extends WorkflowXCommand<Void> {
}
// Changing to synchronous call from asynchronous queuing to prevent
// undue delay from between end of previous and start of next action
- if (wfJob.getStatus() != WorkflowJob.Status.RUNNING
- && wfJob.getStatus() != WorkflowJob.Status.SUSPENDED) {
+ if (wfJob.getStatus() != WorkflowJob.Status.RUNNING && wfJob.getStatus() != WorkflowJob.Status.SUSPENDED) {
// only for asynchronous actions, parent coord action's external id will
// persisted and following update will succeed.
updateParentIfNecessary(wfJob);
@@ -458,7 +458,7 @@ public class SignalXCommand extends WorkflowXCommand<Void> {
else if (syncAction != null) {
new ActionStartXCommand(wfJob, syncAction.getId(), syncAction.getType()).call();
}
- else if (!workflowActionBeanListForForked.isEmpty() && !checkForSuspendNode(workflowActionBeanListForForked)){
+ else if (!workflowActionBeanListForForked.isEmpty() && !checkForSuspendNode(workflowActionBeanListForForked)) {
startForkedActions(workflowActionBeanListForForked);
}
LOG.debug("ENDED SignalCommand for jobid=" + jobId + ", actionId=" + actionId);
@@ -467,9 +467,12 @@ public class SignalXCommand extends WorkflowXCommand<Void> {
public void startForkedActions(List<WorkflowActionBean> workflowActionBeanListForForked) throws CommandException {
- List<CallableWrapper<ActionExecutorContext>> tasks =
- new ArrayList<CallableWrapper<ActionExecutorContext>>();
- boolean updateLastModified = true;
+ List<CallableWrapper<ActionExecutorContext>> tasks = new ArrayList<CallableWrapper<ActionExecutorContext>>();
+ List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
+ List<JsonBean> insertList = new ArrayList<JsonBean>();
+
+ boolean endWorkflow = false;
+ boolean submitJobByQueuing = false;
for (WorkflowActionBean workflowActionBean : workflowActionBeanListForForked) {
LOG.debug("Starting forked actions parallely : " + workflowActionBean.getId());
tasks.add(Services.get().get(CallableQueueService.class).new CallableWrapper<ActionExecutorContext>(
@@ -477,18 +480,26 @@ public class SignalXCommand extends WorkflowXCommand<Void> {
}
try {
- List<Future<ActionExecutorContext>> futures = Services.get().get(CallableQueueService.class).invokeAll(tasks);
+ List<Future<ActionExecutorContext>> futures = Services.get().get(CallableQueueService.class)
+ .invokeAll(tasks);
for (Future<ActionExecutorContext> result : futures) {
+ if (result == null) {
+ submitJobByQueuing = true;
+ continue;
+ }
ActionExecutorContext context = result.get();
+ Map<String, String> contextVariableMap = ((ForkedActionExecutorContext) context).getContextMap();
+ LOG.debug("contextVariableMap size of action " + context.getAction().getId() + " is " + contextVariableMap.size());
+ for (String key : contextVariableMap.keySet()) {
+ context.setVarToWorkflow(key, contextVariableMap.get(key));
+ }
if (context.getJobStatus() != null && context.getJobStatus().equals(Job.Status.FAILED)) {
LOG.warn("Action has failed, failing job" + context.getAction().getId());
new ActionStartXCommand(context.getAction().getId(), null).failJob(context);
updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START,
(WorkflowActionBean) context.getAction()));
if (context.isShouldEndWF()) {
- endWF();
- updateLastModified = false;
- break;
+ endWorkflow = true;
}
}
if (context.getJobStatus() != null && context.getJobStatus().equals(Job.Status.SUSPENDED)) {
@@ -498,20 +509,34 @@ public class SignalXCommand extends WorkflowXCommand<Void> {
updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START,
(WorkflowActionBean) context.getAction()));
if (context.isShouldEndWF()) {
- endWF();
- updateLastModified = false;
- break;
+ endWorkflow = true;
}
}
}
+ if (endWorkflow) {
+ endWF(insertList);
+ }
+
}
catch (Exception e) {
LOG.error("Error running forked jobs parallely", e);
startForkedActionsByQueuing(workflowActionBeanListForForked);
+ submitJobByQueuing = false;
+ }
+ if (submitJobByQueuing && !endWorkflow) {
+ LOG.error("There is error in running forked jobs parallely");
+ startForkedActionsByQueuing(workflowActionBeanListForForked);
+ }
+ wfJob.setLastModifiedTime(new Date());
+ updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED,
+ wfJob));
+ try {
+ BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
}
- if (updateLastModified) {
- updateJobLastModified();
+ catch (JPAExecutorException e) {
+ throw new CommandException(e);
}
+
LOG.debug("forked actions submitted parallely");
}
@@ -519,17 +544,11 @@ public class SignalXCommand extends WorkflowXCommand<Void> {
//queuing all jobs, submitted job will fail in precondition
for (WorkflowActionBean workflowActionBean : workflowActionBeanListForForked) {
LOG.debug("Queuing fork action " + workflowActionBean.getId());
- queue(new ActionStartXCommand(wfJob, workflowActionBean.getId(), workflowActionBean.getType()));
+ queue(new ActionStartXCommand(workflowActionBean.getId(), workflowActionBean.getType()));
}
}
- protected void updateJobLastModified() {
- wfJob.setLastModifiedTime(new Date());
- updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED,
- wfJob));
- }
-
- protected void endWF() throws CommandException {
+ private void endWF(List<JsonBean> insertList) throws CommandException {
updateParentIfNecessary(wfJob, 3);
new WfEndXCommand(wfJob).call(); // To delete the WF temp dir
SLAEventBean slaEvent2 = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), wfJob.getId(), Status.FAILED,
http://git-wip-us.apache.org/repos/asf/oozie/blob/2322d496/core/src/main/java/org/apache/oozie/workflow/lite/ControlNodeHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/workflow/lite/ControlNodeHandler.java b/core/src/main/java/org/apache/oozie/workflow/lite/ControlNodeHandler.java
index c1f7cb1..8da8f03 100644
--- a/core/src/main/java/org/apache/oozie/workflow/lite/ControlNodeHandler.java
+++ b/core/src/main/java/org/apache/oozie/workflow/lite/ControlNodeHandler.java
@@ -19,6 +19,7 @@
package org.apache.oozie.workflow.lite;
import org.apache.oozie.ErrorCode;
+import org.apache.oozie.util.XLog;
import org.apache.oozie.workflow.WorkflowException;
import java.util.ArrayList;
@@ -31,6 +32,8 @@ import java.util.List;
public abstract class ControlNodeHandler extends NodeHandler {
public static final String FORK_COUNT_PREFIX = "workflow.fork.";
+ public XLog LOG = XLog.getLog(getClass());
+
/**
* Called by {@link #enter(Context)} when returning TRUE.
@@ -62,6 +65,7 @@ public abstract class ControlNodeHandler extends NodeHandler {
else if (nodeClass.equals(JoinNodeDef.class)) {
String parentExecutionPath = context.getParentExecutionPath(context.getExecutionPath());
String forkCount = context.getVar(FORK_COUNT_PREFIX + parentExecutionPath);
+
if (forkCount == null) {
throw new WorkflowException(ErrorCode.E0720, context.getNodeDef().getName());
}
@@ -73,6 +77,8 @@ public abstract class ControlNodeHandler extends NodeHandler {
else {
context.setVar(FORK_COUNT_PREFIX + parentExecutionPath, null);
}
+ LOG.debug("count = " + count + " for parent execution path " + parentExecutionPath);
+
doTouch = (count == 0);
}
else {
http://git-wip-us.apache.org/repos/asf/oozie/blob/2322d496/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
index 85bb993..5f9e29a 100644
--- a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
@@ -1256,6 +1256,12 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
+ "<value>java-job-conf</value>" + "</property>"
+ "</configuration>";
wfBean.setConf(jobConf);
+ ae = new JavaActionExecutor() {
+ @Override
+ protected String getDefaultShareLibName(Element actionXml) {
+ return "java-action-executor";
+ }
+ };
Assert.assertArrayEquals(new String[] { "java-job-conf" },
ae.getShareLibNames(context, new Element("java"), actionConf));
http://git-wip-us.apache.org/repos/asf/oozie/blob/2322d496/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 8612854..ab44c24 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 4.3.0 release (trunk - unreleased)
+OOZIE-2436 Fork/join workflow fails with "oozie.action.yarn.tag must not be null" (puru)
OOZIE-2578 Oozie example distcp job fails to run within an encrypted zone with checksum match error (pbacsko via rkanter)
OOZIE-2362 SQL injection in BulkJPAExecutor (pbacsko via rkanter)
OOZIE-2577 Flaky tests TestCoordActionInputCheckXCommand.testTimeout and testTimeoutWithException (pbacsko via rkanter)