You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by an...@apache.org on 2018/09/11 14:21:34 UTC
oozie git commit: OOZIE-3298 [MapReduce action] External ID is not
filled properly and failing MR job is treated as SUCCEEDED (andras.piros via
pbacsko, asasvari, gezapeti)
Repository: oozie
Updated Branches:
refs/heads/master 7f59c8769 -> d5c4f3b7b
OOZIE-3298 [MapReduce action] External ID is not filled properly and failing MR job is treated as SUCCEEDED (andras.piros via pbacsko, asasvari, gezapeti)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/d5c4f3b7
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/d5c4f3b7
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/d5c4f3b7
Branch: refs/heads/master
Commit: d5c4f3b7bf3dd83b986a375016e7805a3c079086
Parents: 7f59c87
Author: Andras Piros <an...@cloudera.com>
Authored: Tue Sep 11 16:19:13 2018 +0200
Committer: Andras Piros <an...@cloudera.com>
Committed: Tue Sep 11 16:19:13 2018 +0200
----------------------------------------------------------------------
.../oozie/action/hadoop/JavaActionExecutor.java | 69 +++--
.../action/hadoop/MapReduceActionExecutor.java | 289 ++++++++++++++++++-
.../action/hadoop/ActionExecutorTestCase.java | 170 +++++++++++
.../action/hadoop/TestJavaActionExecutor.java | 5 -
.../hadoop/TestYarnApplicationIdComparator.java | 71 +++++
.../hadoop/TestYarnApplicationIdFinder.java | 242 ++++++++++++++++
.../oozie/TestSubWorkflowActionExecutor.java | 136 +--------
release-log.txt | 1 +
.../oozie/action/hadoop/LauncherMain.java | 67 +++--
.../hadoop/TestMapReduceActionExecutor.java | 171 ++++++++++-
10 files changed, 1014 insertions(+), 207 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/d5c4f3b7/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
index 05fac39..0385c77 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
@@ -977,7 +977,8 @@ public class JavaActionExecutor extends ActionExecutor {
}
}
- public void submitLauncher(FileSystem actionFs, final Context context, WorkflowAction action) throws ActionExecutorException {
+ public void submitLauncher(final FileSystem actionFs, final Context context, final WorkflowAction action)
+ throws ActionExecutorException {
YarnClient yarnClient = null;
try {
Path appPathRoot = new Path(context.getWorkflow().getAppPath());
@@ -993,15 +994,13 @@ public class JavaActionExecutor extends ActionExecutor {
// action job configuration
Configuration actionConf = loadHadoopDefaultResources(context, actionXml);
setupActionConf(actionConf, context, actionXml, appPathRoot);
- addAppNameContext(action, context);
+ addAppNameContext(context, action);
LOG.debug("Setting LibFilesArchives ");
setLibFilesArchives(context, actionXml, appPathRoot, actionConf);
String jobName = actionConf.get(HADOOP_JOB_NAME);
if (jobName == null || jobName.isEmpty()) {
- jobName = XLog.format("oozie:action:T={0}:W={1}:A={2}:ID={3}",
- getType(), context.getWorkflow().getAppName(),
- action.getName(), context.getWorkflow().getId());
+ jobName = getYarnApplicationName(context, action, "oozie:action");
actionConf.set(HADOOP_JOB_NAME, jobName);
}
@@ -1067,8 +1066,7 @@ public class JavaActionExecutor extends ActionExecutor {
YarnClientApplication newApp = yarnClient.createApplication();
ApplicationId appId = newApp.getNewApplicationResponse().getApplicationId();
ApplicationSubmissionContext appContext =
- createAppSubmissionContext(appId, launcherConf, context, actionConf, action.getName(),
- credentials, actionXml);
+ createAppSubmissionContext(appId, launcherConf, context, actionConf, action, credentials, actionXml);
yarnClient.submitApplication(appContext);
launcherId = appId.toString();
@@ -1090,6 +1088,15 @@ public class JavaActionExecutor extends ActionExecutor {
}
}
+ private String getYarnApplicationName(final Context context, final WorkflowAction action, final String prefix) {
+ return XLog.format("{0}:T={1}:W={2}:A={3}:ID={4}",
+ prefix,
+ getType(),
+ context.getWorkflow().getAppName(),
+ action.getName(),
+ context.getWorkflow().getId());
+ }
+
private void removeHBaseSettingFromOozieDefaultResource(final Configuration jobConf) {
final String[] propertySources = jobConf.getPropertySources(HbaseCredentials.HBASE_USE_DYNAMIC_JARS);
if (propertySources != null && propertySources.length > 0 &&
@@ -1100,12 +1107,8 @@ public class JavaActionExecutor extends ActionExecutor {
}
}
- protected void addAppNameContext(WorkflowAction action, Context context) {
- String oozieActionName = String.format("oozie:launcher:T=%s:W=%s:A=%s:ID=%s",
- getType(),
- context.getWorkflow().getAppName(),
- action.getName(),
- context.getWorkflow().getId());
+ private void addAppNameContext(final Context context, final WorkflowAction action) {
+ final String oozieActionName = getYarnApplicationName(context, action, "oozie:launcher");
context.setVar(OOZIE_ACTION_NAME, oozieActionName);
}
@@ -1128,7 +1131,7 @@ public class JavaActionExecutor extends ActionExecutor {
final Configuration launcherJobConf,
final Context actionContext,
final Configuration actionConf,
- final String actionName,
+ final WorkflowAction action,
final Credentials credentials,
final Element actionXml)
throws IOException, HadoopAccessorException, URISyntaxException {
@@ -1139,7 +1142,7 @@ public class JavaActionExecutor extends ActionExecutor {
setPriority(launcherJobConf, appContext);
setQueue(launcherJobConf, appContext);
appContext.setApplicationId(appId);
- setApplicationName(actionContext, actionName, appContext);
+ setApplicationName(actionContext, action, appContext);
appContext.setApplicationType("Oozie Launcher");
setMaxAttempts(launcherJobConf, appContext);
@@ -1286,10 +1289,10 @@ public class JavaActionExecutor extends ActionExecutor {
return oldJavaOpts;
}
- private void setApplicationName(Context context, String actionName, ApplicationSubmissionContext appContext) {
- String jobName = XLog.format("oozie:launcher:T={0}:W={1}:A={2}:ID={3}", getType(),
- context.getWorkflow().getAppName(), actionName,
- context.getWorkflow().getId());
+ private void setApplicationName(final Context context,
+ final WorkflowAction action,
+ final ApplicationSubmissionContext appContext) {
+ final String jobName = getYarnApplicationName(context, action, "oozie:launcher");
appContext.setApplicationName(jobName);
}
@@ -1642,15 +1645,18 @@ public class JavaActionExecutor extends ActionExecutor {
yarnClient = createYarnClient(context, jobConf);
FinalApplicationStatus appStatus = null;
try {
- ApplicationReport appReport =
- yarnClient.getApplicationReport(ConverterUtils.toApplicationId(action.getExternalId()));
- YarnApplicationState appState = appReport.getYarnApplicationState();
+ final String effectiveApplicationId = findYarnApplicationId(context, action);
+ final ApplicationId applicationId = ConverterUtils.toApplicationId(effectiveApplicationId);
+ final ApplicationReport appReport = yarnClient.getApplicationReport(applicationId);
+ final YarnApplicationState appState = appReport.getYarnApplicationState();
if (appState == YarnApplicationState.FAILED || appState == YarnApplicationState.FINISHED
|| appState == YarnApplicationState.KILLED) {
appStatus = appReport.getFinalApplicationStatus();
}
-
- } catch (Exception ye) {
+ } catch (final ActionExecutorException aae) {
+ LOG.warn("Foreseen Exception occurred while action execution; rethrowing ", aae);
+ throw aae;
+ } catch (final Exception ye) {
LOG.warn("Exception occurred while checking Launcher AM status; will try checking action data file instead ", ye);
// Fallback to action data file if we can't find the Launcher AM (maybe it got purged)
fallback = true;
@@ -1750,6 +1756,19 @@ public class JavaActionExecutor extends ActionExecutor {
}
/**
+ * For every {@link JavaActionExecutor} that is not {@link MapReduceActionExecutor}, the effective YARN application ID of the
+ * action is the one where {@link LauncherAM} is run, hence this default implementation.
+ * @param context the execution context
+ * @param action the workflow action
+ * @return a {@code String} that depicts the application ID of the launcher ApplicationMaster of this action
+ * @throws ActionExecutorException when the effective YARN application ID cannot be determined
+ */
+ protected String findYarnApplicationId(final Context context, final WorkflowAction action)
+ throws ActionExecutorException {
+ return action.getExternalId();
+ }
+
+ /**
* Get the output data of an action. Subclasses should override this method
* to get action specific output data.
* @param actionFs the FileSystem object
@@ -1861,7 +1880,7 @@ public class JavaActionExecutor extends ActionExecutor {
}
}
- private String getActionYarnTag(Context context, WorkflowAction action) {
+ protected String getActionYarnTag(Context context, WorkflowAction action) {
return LauncherHelper.getActionYarnTag(context.getProtoActionConf(), context.getWorkflow().getParentId(), action);
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/d5c4f3b7/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
index 83a23f5..a4dd13b 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
@@ -20,11 +20,22 @@ package org.apache.oozie.action.hadoop;
import java.io.IOException;
import java.io.StringReader;
+import java.net.URISyntaxException;
+import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.function.Predicate;
import com.google.common.base.Charsets;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Ordering;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -34,15 +45,22 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.service.ConfigurationService;
+import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;
+import org.jdom.JDOMException;
import org.jdom.Namespace;
+import static org.apache.oozie.action.hadoop.LauncherMain.CHILD_MAPREDUCE_JOB_TAGS;
+
public class MapReduceActionExecutor extends JavaActionExecutor {
public static final String OOZIE_ACTION_EXTERNAL_STATS_WRITE = "oozie.action.external.stats.write";
@@ -51,6 +69,7 @@ public class MapReduceActionExecutor extends JavaActionExecutor {
private static final String STREAMING_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.StreamingMain";
public static final String JOB_END_NOTIFICATION_URL = "job.end.notification.url";
private static final String MAPREDUCE_JOB_NAME = "mapreduce.job.name";
+ static final String YARN_APPLICATION_TYPE_MAPREDUCE = "MAPREDUCE";
private XLog log = XLog.getLog(getClass());
public MapReduceActionExecutor() {
@@ -341,33 +360,35 @@ public class MapReduceActionExecutor extends JavaActionExecutor {
Map<String, String> actionData;
Configuration jobConf;
+ // Need to emit jobConf and actionData for later usage
try {
- FileSystem actionFs = context.getAppFileSystem();
- Element actionXml = XmlUtils.parseXml(action.getConf());
+ final FileSystem actionFs = context.getAppFileSystem();
+ final Element actionXml = XmlUtils.parseXml(action.getConf());
jobConf = createBaseHadoopConf(context, actionXml);
- Path actionDir = context.getActionDir();
+ final Path actionDir = context.getActionDir();
actionData = LauncherHelper.getActionData(actionFs, actionDir, jobConf);
- } catch (Exception e) {
+ } catch (final Exception e) {
LOG.warn("Exception in check(). Message[{0}]", e.getMessage(), e);
throw convertException(e);
}
- final String newId = actionData.get(LauncherAMUtils.ACTION_DATA_NEW_ID);
+ final String newJobId = findNewHadoopJobId(context, action);
// check the Hadoop job if newID is defined (which should be the case here) - otherwise perform the normal check()
- if (newId != null) {
+ if (newJobId != null) {
boolean jobCompleted;
JobClient jobClient = null;
boolean exception = false;
try {
jobClient = createJobClient(context, new JobConf(jobConf));
- RunningJob runningJob = jobClient.getJob(JobID.forName(newId));
+ final JobID jobid = JobID.forName(newJobId);
+ final RunningJob runningJob = jobClient.getJob(jobid);
if (runningJob == null) {
context.setExternalStatus(FAILED);
throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
- "Unknown hadoop job [{0}] associated with action [{1}]. Failing this action!", newId,
+ "Unknown hadoop job [{0}] associated with action [{1}]. Failing this action!", newJobId,
action.getId());
}
@@ -396,7 +417,7 @@ public class MapReduceActionExecutor extends JavaActionExecutor {
super.check(context, action);
} else {
context.setExternalStatus(RUNNING);
- String externalAppId = TypeConverter.toYarn(JobID.forName(newId)).getAppId().toString();
+ final String externalAppId = TypeConverter.toYarn(JobID.forName(newJobId)).getAppId().toString();
context.setExternalChildIDs(externalAppId);
}
} else {
@@ -409,4 +430,254 @@ public class MapReduceActionExecutor extends JavaActionExecutor {
injectCallback(context, actionConf);
}
+ private String findNewHadoopJobId(final Context context, final WorkflowAction action) throws ActionExecutorException {
+ try {
+ final Configuration jobConf = createJobConfFromActionConf(context, action);
+
+ return new HadoopJobIdFinder(jobConf, context).find();
+ } catch (final HadoopAccessorException | IOException | JDOMException | URISyntaxException | InterruptedException |
+ NoSuchAlgorithmException e) {
+ LOG.warn("Exception while trying to find new Hadoop job id(). Message[{0}]", e.getMessage(), e);
+ throw convertException(e);
+ }
+ }
+
+ private Configuration createJobConfFromActionConf(final Context context, final WorkflowAction action)
+ throws JDOMException, NoSuchAlgorithmException {
+ final Element actionXml = XmlUtils.parseXml(action.getConf());
+ final Configuration jobConf = createBaseHadoopConf(context, actionXml);
+
+ final String launcherTag = getActionYarnTag(context, action);
+ jobConf.set(CHILD_MAPREDUCE_JOB_TAGS, LauncherHelper.getTag(launcherTag));
+
+ return jobConf;
+ }
+
+ /**
+ * Find YARN application ID only for {@link MapReduceActionExecutor} delegating to {@link YarnApplicationIdFinder}.
+ * @param context the execution context
+ * @param action the workflow action
+ * @return the YARN application ID as a {@code String}
+ * @throws ActionExecutorException when the YARN application ID could not be found
+ */
+ @Override
+ protected String findYarnApplicationId(final Context context, final WorkflowAction action) throws ActionExecutorException {
+ try {
+ final Configuration jobConf = createJobConfFromActionConf(context, action);
+ final HadoopJobIdFinder hadoopJobIdFinder = new HadoopJobIdFinder(jobConf, context);
+
+ return new YarnApplicationIdFinder(hadoopJobIdFinder,
+ new YarnApplicationReportReader(jobConf), (WorkflowActionBean) action).find();
+ }
+ catch (final IOException | HadoopAccessorException | JDOMException | InterruptedException | URISyntaxException |
+ NoSuchAlgorithmException e) {
+ LOG.warn("Exception while finding YARN application id. Message[{0}]", e.getMessage(), e);
+ throw convertException(e);
+ }
+ }
+
+ /**
+ * Finds a Hadoop job ID based on the {@code action-data.seq} file stored on HDFS by {@link MapReduceMain}.
+ */
+ @VisibleForTesting
+ static class HadoopJobIdFinder {
+ private final Configuration jobConf;
+ private final Context executorContext;
+
+ HadoopJobIdFinder(final Configuration jobConf, final Context executorContext) {
+ this.jobConf = jobConf;
+ this.executorContext = executorContext;
+ }
+
+ String find() throws HadoopAccessorException, IOException, URISyntaxException, InterruptedException {
+ final FileSystem actionFs = executorContext.getAppFileSystem();
+ final Path actionDir = executorContext.getActionDir();
+ final Map<String, String> actionData = LauncherHelper.getActionData(actionFs, actionDir, jobConf);
+
+ return actionData.get(LauncherAMUtils.ACTION_DATA_NEW_ID);
+ }
+ }
+
+ /**
+ * Find YARN application ID in three stages:
+ * <ul>
+ * <li>based on {@code action-data.seq} written by {@link MapReduceMain}, if already present. If present and is not the
+ * Oozie Launcher's application ID ({@link WorkflowAction#getExternalId()}), gets used. Else, fall back to following:</li>
+ * <li>if not found, look up the appropriate YARN child ID</li>
+ * <li>if an appropriate YARN application ID is not found, go with Oozie Launcher's application ID
+ * ({@link WorkflowAction#getExternalId()})</li>
+ * </ul>
+ */
+ @VisibleForTesting
+ static class YarnApplicationIdFinder {
+ private static final XLog LOG = XLog.getLog(YarnApplicationIdFinder.class);
+
+ private final HadoopJobIdFinder hadoopJobIdFinder;
+ private final YarnApplicationReportReader reader;
+ private final WorkflowActionBean workflowActionBean;
+
+ YarnApplicationIdFinder(final HadoopJobIdFinder hadoopJobIdFinder,
+ final YarnApplicationReportReader reader,
+ final WorkflowActionBean workflowActionBean) {
+ this.hadoopJobIdFinder = hadoopJobIdFinder;
+ this.reader = reader;
+ this.workflowActionBean = workflowActionBean;
+ }
+
+ String find() throws IOException, HadoopAccessorException, URISyntaxException, InterruptedException {
+ final String newJobId = hadoopJobIdFinder.find();
+ if (Strings.isNullOrEmpty(newJobId) && !isHadoopJobId(newJobId)) {
+ LOG.trace("Is not a Hadoop Job Id, falling back.");
+ return fallbackToYarnChildOrExternalId();
+ }
+
+ final String effectiveApplicationId;
+ final String newApplicationId = TypeConverter.toYarn(JobID.forName(newJobId)).getAppId().toString();
+
+ if (workflowActionBean.getExternalId().equals(newApplicationId) || newApplicationId == null) {
+ LOG.trace("New YARN application ID {0} is empty or is the same as {1}, falling back.",
+ newApplicationId, workflowActionBean.getExternalId());
+ effectiveApplicationId = fallbackToYarnChildOrExternalId();
+ }
+ else {
+ LOG.trace("New YARN application ID {0} is different, using it.", newApplicationId);
+ effectiveApplicationId = newApplicationId;
+ }
+
+ return effectiveApplicationId;
+ }
+
+ /**
+ * When a Hadoop job ID could not be found, fall back to finding the YARN child application ID, or the workflow's {@code externalId}:
+ * <ul>
+ * <li>look for YARN children of the actual {@code WorkflowActionBean}</li>
+ * <li>filter for type {@code MAPREDUCE}. Note that those will be the YARN application children of the original
+ * {@code Oozie Launcher} type. Filter also for the ones not in YARN applications' terminal states. What remains is the
+ * one we call YARN child ID</li>
+ * <li>if not found, go with {@link WorkflowActionBean#externalId}</li>
+ * <li>if the found one is not newer than the one already stored, go with {@link WorkflowActionBean#externalId}</li>
+ * <li>if found and there is no {@link WorkflowActionBean#externalId}, go with the YARN child ID</li>
+ * <li>else, go with the YARN child ID</li>
+ * </ul>
+ * @return the YARN child application's ID, or the workflow action's external ID
+ */
+ private String fallbackToYarnChildOrExternalId() {
+ final List<ApplicationReport> childYarnApplications = reader.read();
+ childYarnApplications.removeIf(new Predicate<ApplicationReport>() {
+ @Override
+ public boolean test(ApplicationReport applicationReport) {
+ return !applicationReport.getApplicationType().equals(YARN_APPLICATION_TYPE_MAPREDUCE);
+ }
+ });
+
+ if (childYarnApplications.isEmpty()) {
+ LOG.trace("No child YARN applications present, returning {0} instead", workflowActionBean.getExternalId());
+ return workflowActionBean.getExternalId();
+ }
+
+ final String yarnChildId = getLastYarnId(childYarnApplications);
+
+ if (Strings.isNullOrEmpty(yarnChildId)) {
+ LOG.trace("yarnChildId is empty, returning {0} instead", workflowActionBean.getExternalId());
+ return workflowActionBean.getExternalId();
+ }
+
+ if (Strings.isNullOrEmpty(workflowActionBean.getExternalId())) {
+ LOG.trace("workflowActionBean.externalId is empty, returning {0} instead", yarnChildId);
+ return yarnChildId;
+ }
+
+ if (new YarnApplicationIdComparator().compare(yarnChildId, workflowActionBean.getExternalId()) > 0) {
+ LOG.trace("yarnChildId is newer, returning {0}", yarnChildId);
+ return yarnChildId;
+ }
+
+ LOG.trace("yarnChildId is not newer, returning {0}", workflowActionBean.getExternalId());
+ return workflowActionBean.getExternalId();
+ }
+
+ /**
+ * Get the biggest YARN application ID given {@link YarnApplicationIdComparator}.
+ * @param yarnApplications the YARN application reports
+ * @return the biggest {@link ApplicationReport#getApplicationId()}, converted to a {@code String}
+ */
+ @VisibleForTesting
+ protected String getLastYarnId(final List<ApplicationReport> yarnApplications) {
+ Preconditions.checkNotNull(yarnApplications, "YARN application list should be filled");
+ Preconditions.checkArgument(!yarnApplications.isEmpty(), "no YARN applications in the list");
+
+ final Iterable<String> unorderedApplicationIds =
+ Iterables.transform(yarnApplications, new Function<ApplicationReport, String>() {
+ @Override
+ public String apply(final ApplicationReport input) {
+ Preconditions.checkNotNull(input, "YARN application should be filled");
+ return input.getApplicationId().toString();
+ }
+ });
+
+ return Ordering.from(new YarnApplicationIdComparator()).max(unorderedApplicationIds);
+ }
+
+ private boolean isHadoopJobId(final String jobIdCandidate) {
+ try {
+ return JobID.forName(jobIdCandidate) != null;
+ } catch (final IllegalArgumentException e) {
+ LOG.warn("Job ID candidate is not a Hadoop Job ID.", e);
+ return false;
+ }
+ }
+ }
+
+ /**
+ * Compares two YARN application IDs in the sense:
+ * <ul>
+ * <li>originating from different cluster timestamps the one with the bigger timestamp is considered greater</li>
+ * <li>originating from the same cluster timestamp the one with the higher sequence number is considered greater</li>
+ * <li>originating from the same cluster timestamp and with the same sequence number both are considered equal</li>
+ * </ul>
+ */
+ @VisibleForTesting
+ @SuppressFBWarnings(value = "SE_COMPARATOR_SHOULD_BE_SERIALIZABLE", justification = "instances will never be serialized")
+ static class YarnApplicationIdComparator implements Comparator<String> {
+ private static final String PREFIX = "application_";
+ private static final String SEPARATOR = "_";
+
+ @Override
+ public int compare(final String left, final String right) {
+ // Let's say two application IDs with different cluster timestamps are equal
+ final int middleLongPartComparisonResult = Long.compare(getMiddleLongPart(left), getMiddleLongPart(right));
+ if (middleLongPartComparisonResult != 0) {
+ return middleLongPartComparisonResult;
+ }
+
+ // Else we compare the sequence number
+ return Integer.compare(getLastIntegerPart(left), getLastIntegerPart(right));
+ }
+
+ private long getMiddleLongPart(final String applicationId) {
+ return Long.parseLong(applicationId.substring(applicationId.indexOf(PREFIX) + PREFIX.length(),
+ applicationId.lastIndexOf(SEPARATOR)));
+ }
+
+ private int getLastIntegerPart(final String applicationId) {
+ return Integer.parseInt(applicationId.substring(applicationId.lastIndexOf(SEPARATOR) + SEPARATOR.length()));
+ }
+ }
+
+ /**
+ * Encapsulates call to the static method
+ * {@link LauncherMain#getChildYarnApplications(Configuration, ApplicationsRequestScope, long)} for better testability.
+ */
+ @VisibleForTesting
+ static class YarnApplicationReportReader {
+ private final Configuration jobConf;
+
+ YarnApplicationReportReader(final Configuration jobConf) {
+ this.jobConf = jobConf;
+ }
+
+ List<ApplicationReport> read() {
+ return LauncherMain.getChildYarnApplications(jobConf, ApplicationsRequestScope.OWN, 0L);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/d5c4f3b7/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java b/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java
index f39bba2..05511e4 100644
--- a/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java
@@ -21,15 +21,25 @@ package org.apache.oozie.action.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.oozie.DagELFunctions;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.action.ActionExecutor;
+import org.apache.oozie.action.oozie.JavaSleepAction;
import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.OozieClientException;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.wf.KillXCommand;
import org.apache.oozie.service.CallbackService;
import org.apache.oozie.service.ELService;
+import org.apache.oozie.service.HadoopAccessorException;
+import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.LiteWorkflowStoreService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.UUIDService;
@@ -47,18 +57,27 @@ import org.apache.oozie.workflow.lite.EndNodeDef;
import org.apache.oozie.workflow.lite.LiteWorkflowApp;
import org.apache.oozie.workflow.lite.StartNodeDef;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileWriter;
import java.io.IOException;
+import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
import java.io.Reader;
import java.io.StringReader;
import java.io.Writer;
+import java.net.URI;
import java.net.URISyntaxException;
+import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
public abstract class ActionExecutorTestCase extends XHCatTestCase {
+ protected static final int JOB_TIMEOUT = 100_000;
@Override
protected void setUp() throws Exception {
@@ -325,4 +344,155 @@ public abstract class ActionExecutorTestCase extends XHCatTestCase {
writer.close();
}
+ protected void writeToFile(final String appXml, final String appPath) throws IOException {
+ final File wf = new File(URI.create(appPath));
+ PrintWriter out = null;
+ try {
+ out = new PrintWriter(new FileWriter(wf));
+ out.println(appXml);
+ }
+ catch (final IOException iex) {
+ throw iex;
+ }
+ finally {
+ if (out != null) {
+ out.close();
+ }
+ }
+ }
+
+ protected String submitWorkflow(final String workflowUri, final OozieClient wfClient) throws OozieClientException {
+ final Properties conf = wfClient.createConfiguration();
+ conf.setProperty(OozieClient.APP_PATH, workflowUri);
+ conf.setProperty(OozieClient.USER_NAME, getTestUser());
+ conf.setProperty("appName", "var-app-name");
+
+ final String jobId = wfClient.submit(conf);
+ wfClient.start(jobId);
+
+ return jobId;
+ }
+
+ protected ApplicationId getChildMRJobApplicationId(final Configuration conf) throws IOException {
+ final List<ApplicationId> applicationIdList = new ArrayList<>();
+ final Path inputDir = new Path(getFsTestCaseDir(), "input");
+ final Path wfIDFile = new Path(inputDir, LauncherMainTester.JOB_ID_FILE_NAME);
+ final FileSystem fs = FileSystem.get(conf);
+
+ // wait until we have the running child MR job's ID from HDFS
+ waitFor(JOB_TIMEOUT, new ApplicationIdExistsPredicate(fs, wfIDFile));
+ if (!fs.exists(wfIDFile) || !fs.isFile(wfIDFile)) {
+ throw new IOException("Workflow ID file does not exist: " + wfIDFile.toString());
+ }
+
+ try (final BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(wfIDFile)))) {
+ final String line = reader.readLine();
+ JobID.forName(line);
+ final String jobID = line;
+ final String appID = jobID.replace("job", "application");
+ final ApplicationId id = ConverterUtils.toApplicationId(appID);
+ applicationIdList.add(id);
+ }
+
+ assertTrue("Application ID should've been found. No external Child ID was found in " + wfIDFile.toString(),
+ applicationIdList.size() == 1);
+
+ return applicationIdList.get(0);
+ }
+
+ private static class ApplicationIdExistsPredicate implements Predicate {
+ private final FileSystem fs;
+ private final Path wfIDFile;
+
+ ApplicationIdExistsPredicate(final FileSystem fs, final Path wfIDFile) {
+ this.fs = fs;
+ this.wfIDFile = wfIDFile;
+ }
+
+ @Override
+ public boolean evaluate() throws Exception {
+ return fs.exists(wfIDFile) && fs.getFileStatus(wfIDFile).getLen() > 0;
+ }
+ }
+
+ protected static class WorkflowActionRunningPredicate extends WorkflowActionStatusPredicate {
+ WorkflowActionRunningPredicate(final OozieClient wfClient, final String jobId) {
+ super(wfClient, jobId, WorkflowJob.Status.RUNNING, WorkflowAction.Status.RUNNING);
+ }
+ }
+
+ protected static class WorkflowActionKilledPredicate extends WorkflowActionStatusPredicate {
+ WorkflowActionKilledPredicate(final OozieClient wfClient, final String jobId) {
+ super(wfClient, jobId, WorkflowJob.Status.KILLED, WorkflowAction.Status.KILLED);
+ }
+ }
+
+ private static abstract class WorkflowActionStatusPredicate implements Predicate {
+ private final OozieClient wfClient;
+ private final String jobId;
+ private final WorkflowJob.Status expectedWorkflowJobStatus;
+ private final WorkflowAction.Status expectedWorkflowActionStatus;
+
+ WorkflowActionStatusPredicate(final OozieClient wfClient,
+ final String jobId,
+ final WorkflowJob.Status expectedWorkflowJobStatus,
+ final WorkflowAction.Status expectedWorkflowActionStatus) {
+ this.wfClient = wfClient;
+ this.jobId = jobId;
+ this.expectedWorkflowJobStatus = expectedWorkflowJobStatus;
+ this.expectedWorkflowActionStatus = expectedWorkflowActionStatus;
+ }
+
+ @Override
+ public boolean evaluate() throws Exception {
+ final WorkflowJob.Status actualWorkflowJobStatus = wfClient.getJobInfo(jobId).getStatus();
+ final boolean isWorkflowInState = actualWorkflowJobStatus.equals(expectedWorkflowJobStatus);
+
+ final WorkflowAction.Status actualWorkflowActionStatus = wfClient.getJobInfo(jobId).getActions().get(1).getStatus();
+ final boolean isWorkflowActionInState = actualWorkflowActionStatus.equals(expectedWorkflowActionStatus);
+
+ return isWorkflowInState && isWorkflowActionInState;
+ }
+ }
+
+ protected void killWorkflow(final String jobId) throws CommandException {
+ new KillXCommand(jobId).call();
+ }
+
+ protected void waitForWorkflowToStart(final OozieClient wfClient, final String jobId) {
+ waitFor(JOB_TIMEOUT, new WorkflowActionRunningPredicate(wfClient,jobId));
+ }
+
+ protected void waitForWorkflowToKill(final OozieClient wfClient, final String jobId) {
+ waitFor(JOB_TIMEOUT, new WorkflowActionKilledPredicate(wfClient,jobId));
+ }
+
+ protected String getJavaAction(final boolean launchMRAction) {
+ final Path inputDir = new Path(getFsTestCaseDir(), "input");
+ final Path outputDir = new Path(getFsTestCaseDir(), "output");
+ final String javaActionXml = "<java>" +
+ "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
+ "<name-node>" + getNameNodeUri() + "</name-node>" +
+ "<main-class>" + JavaSleepAction.class.getName()+ "</main-class>" +
+ "</java>";
+ final String javaWithMRActionXml = "<java>" +
+ "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
+ "<name-node>" + getNameNodeUri() + "</name-node>" +
+ "<main-class>" + LauncherMainTester.class.getName()+ "</main-class>" +
+ "<arg>javamapreduce</arg>" +
+ "<arg>"+inputDir.toString()+"</arg>" +
+ "<arg>"+outputDir.toString()+"</arg>" +
+ "</java>";
+
+ return launchMRAction ? javaWithMRActionXml : javaActionXml;
+ }
+
+ void killYarnApplication(final Configuration configuration, final ApplicationId yarnApplicationId)
+ throws HadoopAccessorException, IOException, YarnException {
+ getHadoopAccessorService().createYarnClient(getTestUser(), configuration).killApplication(yarnApplicationId);
+ }
+
+ HadoopAccessorService getHadoopAccessorService() {
+ return Services.get().get(HadoopAccessorService.class);
+ }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/d5c4f3b7/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
index a31079a..784dc96 100644
--- a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
@@ -2616,11 +2616,6 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
waitUntilYarnAppDoneAndAssertSuccess(applicationId);
}
- private HadoopAccessorService getHadoopAccessorService() {
- return Services.get().get(HadoopAccessorService.class);
- }
-
-
public void testChildKill() throws Exception {
final JobConf clusterConf = createJobConf();
FileSystem fileSystem = FileSystem.get(clusterConf);
http://git-wip-us.apache.org/repos/asf/oozie/blob/d5c4f3b7/core/src/test/java/org/apache/oozie/action/hadoop/TestYarnApplicationIdComparator.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestYarnApplicationIdComparator.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestYarnApplicationIdComparator.java
new file mode 100644
index 0000000..aabb363
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestYarnApplicationIdComparator.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestYarnApplicationIdComparator {
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ private MapReduceActionExecutor.YarnApplicationIdComparator comparator;
+
+ @Before
+ public void setUp() {
+ this.comparator = new MapReduceActionExecutor.YarnApplicationIdComparator();
+ }
+
+ @Test
+ public void whenWrongParametersGivenExceptionIsThrown() {
+ // NOTE(review): the ExpectedException rule verifies only the FIRST expected
+ // throw; every statement after the first compare() call below was
+ // unreachable, so the NumberFormatException case ("application_a_b" vs
+ // "application_c_d") and the IndexOutOfBoundsException case ("a_b_c" vs
+ // "d_e_f") were never actually asserted. That dead code is removed here;
+ // cover those two inputs in dedicated @Test methods with their own rules.
+ expectedException.expect(NullPointerException.class);
+ comparator.compare(null, null);
+ }
+
+ @Test
+ public void whenDifferentTimestampsBiggerTimestampWins() {
+ assertEquals("cluster timestamps are different, the one with bigger timestamp wins",
+ -1,
+ comparator.compare("application_1534164756526_0001", "application_1534164756527_0002"));
+ }
+
+ @Test
+ public void whenSameTimestampsGreaterSequenceWins() {
+ assertEquals("cluster timestamps are the same but sequences are different, left should be greater than right",
+ 1,
+ comparator.compare("application_1534164756526_0002", "application_1534164756526_0001"));
+ }
+
+ @Test
+ public void whenSameTimestampsAndSameSequencesLeftEqualsRight() {
+ assertEquals("cluster timestamps and sequences are the same, left should equal right",
+ 0,
+ comparator.compare("application_1534164756526_0001", "application_1534164756526_0001"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/d5c4f3b7/core/src/test/java/org/apache/oozie/action/hadoop/TestYarnApplicationIdFinder.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestYarnApplicationIdFinder.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestYarnApplicationIdFinder.java
new file mode 100644
index 0000000..3fd7149
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestYarnApplicationIdFinder.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.service.HadoopAccessorException;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestYarnApplicationIdFinder {
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ @Mock
+ private MapReduceActionExecutor.HadoopJobIdFinder hadoopJobIdFinder;
+
+ @Mock
+ private MapReduceActionExecutor.YarnApplicationReportReader reader;
+
+ @Mock
+ private WorkflowActionBean workflowActionBean;
+
+ @Mock
+ private ApplicationReport applicationReport;
+
+ @Mock
+ private ApplicationId applicationId;
+
+ private MapReduceActionExecutor.YarnApplicationIdFinder yarnApplicationIdFinder;
+
+ @Before
+ public void setUp() throws Exception {
+ yarnApplicationIdFinder = new MapReduceActionExecutor.YarnApplicationIdFinder(hadoopJobIdFinder,
+ reader, workflowActionBean);
+ }
+
+ @Test
+ public void whenHadoopJobIdAndChildYarnApplicationAreNotPresentActionExternalIdIsFound() throws Exception {
+ when(hadoopJobIdFinder.find()).thenReturn(null);
+ when(reader.read()).thenReturn(Collections.emptyList());
+ when(workflowActionBean.getExternalId()).thenReturn("application_1534164756526_0000");
+
+ assertEquals("no Hadoop Job ID nor YARN applications: WorkflowActionBean.externalId should be found",
+ "application_1534164756526_0000",
+ yarnApplicationIdFinder.find());
+
+ when(applicationReport.getApplicationType()).thenReturn("Oozie Launcher");
+ when(applicationReport.getApplicationId()).thenReturn(applicationId);
+ when(applicationId.toString()).thenReturn("application_1534164756526_0001");
+ when(reader.read()).thenReturn(Lists.newArrayList(applicationReport));
+
+ assertEquals(
+ "no Hadoop Job ID nor YARN applications of MAPREDUCE type: WorkflowActionBean.externalId should be found",
+ "application_1534164756526_0000",
+ yarnApplicationIdFinder.find());
+
+ when(applicationReport.getApplicationType()).thenReturn("MAPREDUCE");
+ when(workflowActionBean.getWfId()).thenReturn("workflowId");
+
+ assertEquals(
+ "no Hadoop Job ID nor YARN applications of the same workflow: WorkflowActionBean.externalId should be found",
+ "application_1534164756526_0000",
+ yarnApplicationIdFinder.find());
+ }
+
+ @Test
+ public void whenHadoopJobIdIsNotCorrectExceptionIsThrown() throws Exception {
+ when(hadoopJobIdFinder.find()).thenReturn("notAHadoopJobId");
+ expectedException.expect(IllegalArgumentException.class);
+
+ yarnApplicationIdFinder.find();
+ }
+
+ @Test
+ public void whenHadoopJobIdIsNotPresentChildYarnApplicationIdIsFound() throws Exception {
+ when(hadoopJobIdFinder.find()).thenReturn(null);
+ when(applicationReport.getApplicationType()).thenReturn("MAPREDUCE");
+ when(workflowActionBean.getWfId()).thenReturn("workflowId");
+ when(applicationReport.getYarnApplicationState()).thenReturn(YarnApplicationState.RUNNING);
+ when(applicationId.toString()).thenReturn("application_1534164756526_0000");
+ when(applicationReport.getApplicationId()).thenReturn(applicationId);
+ when(reader.read()).thenReturn(Lists.newArrayList(applicationReport));
+
+ assertEquals("no Hadoop Job ID, but an appropriate YARN application: applicationId should be found",
+ "application_1534164756526_0000",
+ yarnApplicationIdFinder.find());
+ }
+
+ @Test
+ public void whenHadoopJobIsNotPresentAsYarnApplicationHadoopJobIdIsUsed() throws Exception {
+ setupMocks("job_1534164756526_0002", "application_1534164756526_0000", "application_1534164756526_0001");
+
+ assertEquals("Hadoop Job ID should be found when it is not present as a YARN application ID",
+ "application_1534164756526_0002",
+ yarnApplicationIdFinder.find());
+ }
+
+ private void setupMocks(final String mrJobId, final String wfExternalId, final String yarnApplicationId)
+ throws HadoopAccessorException, IOException, URISyntaxException, InterruptedException, YarnException {
+ when(hadoopJobIdFinder.find()).thenReturn(mrJobId);
+ when(applicationReport.getApplicationType()).thenReturn("MAPREDUCE");
+ when(workflowActionBean.getWfId()).thenReturn("workflowId");
+ when(workflowActionBean.getExternalId()).thenReturn(wfExternalId);
+ when(applicationReport.getYarnApplicationState()).thenReturn(YarnApplicationState.RUNNING);
+ when(applicationId.toString()).thenReturn(yarnApplicationId);
+ when(applicationReport.getApplicationId()).thenReturn(applicationId);
+ when(reader.read()).thenReturn(Lists.newArrayList(applicationReport));
+ }
+
+ @Test
+ public void whenHadoopJobIsPresentAsYarnApplicationAndDifferentFromItsUsed() throws Exception {
+ setupMocks("job_1534164756526_0002", "application_1534164756526_0001", "application_1534164756526_0003");
+
+ assertEquals("Hadoop Job ID should be found when different from the YARN application ID",
+ "application_1534164756526_0002",
+ yarnApplicationIdFinder.find());
+ }
+
+ @Test
+ public void whenHadoopJobIsPresentAsYarnApplicationAndContainWorkflowIdNotUsed() throws Exception {
+ setupMocks("job_1534164756526_0002", "application_1534164756526_0002", "application_1534164756526_0003");
+
+ assertEquals("YARN application ID should be found when greater than WorkflowActionBean.externalId",
+ "application_1534164756526_0003",
+ yarnApplicationIdFinder.find());
+ }
+
+ @Test
+ public void whenOldLauncherAndMRJobApplicationsAreFinishedAndNewLauncherPresentNewLauncherIsUsed() throws Exception {
+ final ApplicationReport oldLauncher = mock(ApplicationReport.class);
+ when(oldLauncher.getApplicationType()).thenReturn("Oozie Launcher");
+ when(oldLauncher.getYarnApplicationState()).thenReturn(YarnApplicationState.FINISHED);
+ final ApplicationId oldLauncherId = mock(ApplicationId.class);
+ when(oldLauncherId.toString()).thenReturn("application_1534164756526_0001");
+ when(oldLauncher.getApplicationId()).thenReturn(oldLauncherId);
+ final ApplicationReport oldMRJob = mock(ApplicationReport.class);
+ when(oldMRJob.getApplicationType()).thenReturn("MAPREDUCE");
+ when(oldMRJob.getYarnApplicationState()).thenReturn(YarnApplicationState.FINISHED);
+ final ApplicationId oldMRJobId = mock(ApplicationId.class);
+ when(oldMRJobId.toString()).thenReturn("application_1534164756526_0002");
+ when(oldMRJob.getApplicationId()).thenReturn(oldMRJobId);
+ final ApplicationReport newLauncher = mock(ApplicationReport.class);
+ when(newLauncher.getApplicationType()).thenReturn("Oozie Launcher");
+ when(newLauncher.getYarnApplicationState()).thenReturn(YarnApplicationState.FINISHED);
+ final ApplicationId newLauncherId = mock(ApplicationId.class);
+ when(newLauncherId.toString()).thenReturn("application_1534164756526_0003");
+ when(newLauncher.getApplicationId()).thenReturn(newLauncherId);
+ final ApplicationReport newMRJob = mock(ApplicationReport.class);
+ when(newMRJob.getApplicationType()).thenReturn("MAPREDUCE");
+ when(newMRJob.getYarnApplicationState()).thenReturn(YarnApplicationState.RUNNING);
+ final ApplicationId newMRJobId = mock(ApplicationId.class);
+ when(newMRJobId.toString()).thenReturn("application_1534164756526_0004");
+ when(newMRJob.getApplicationId()).thenReturn(newMRJobId);
+ when(reader.read()).thenReturn(Lists.newArrayList(oldLauncher, oldMRJob, newLauncher, newMRJob));
+
+ when(workflowActionBean.getExternalId()).thenReturn("application_1534164756526_0003");
+ assertEquals("newMRJob should be found", "application_1534164756526_0004", yarnApplicationIdFinder.find());
+
+ when(workflowActionBean.getExternalId()).thenReturn("application_1534164756526_0004");
+ assertEquals("newMRJob should be found", "application_1534164756526_0004", yarnApplicationIdFinder.find());
+
+ when(workflowActionBean.getExternalId()).thenReturn("application_1534164756526_0005");
+ assertEquals("workflowActionBean.externalId should be found",
+ "application_1534164756526_0005", yarnApplicationIdFinder.find());
+ }
+
+ @Test
+ public void testGetLastYarnIdOnNullThrows() {
+ expectedException.expect(NullPointerException.class);
+ yarnApplicationIdFinder.getLastYarnId(null);
+ }
+
+ @Test
+ public void testGetLastYarnIdOnEmptyListThrows() {
+ expectedException.expect(IllegalArgumentException.class);
+ yarnApplicationIdFinder.getLastYarnId(Collections.emptyList());
+ }
+
+ @Test
+ public void testGetLastYarnIdOnOneElementSuccess() {
+ when(applicationReport.getApplicationId()).thenReturn(applicationId);
+ when(applicationId.toString()).thenReturn("application_1534164756526_0000");
+
+ final String lastYarnId = yarnApplicationIdFinder.getLastYarnId(Collections.singletonList(applicationReport));
+ assertEquals("last YARN id should be the only element in the list", "application_1534164756526_0000", lastYarnId);
+ }
+
+ @Test
+ public void testGetLastYarnIdFromUnorderedListSuccess() {
+ final ApplicationReport newLauncher = mock(ApplicationReport.class);
+ when(newLauncher.getApplicationType()).thenReturn("Oozie Launcher");
+ when(newLauncher.getYarnApplicationState()).thenReturn(YarnApplicationState.FINISHED);
+ final ApplicationId newLauncherId = mock(ApplicationId.class);
+ when(newLauncherId.toString()).thenReturn("application_1534164756526_0003");
+ when(newLauncher.getApplicationId()).thenReturn(newLauncherId);
+ final ApplicationReport newMRJob = mock(ApplicationReport.class);
+ when(newMRJob.getApplicationType()).thenReturn("MAPREDUCE");
+ when(newMRJob.getYarnApplicationState()).thenReturn(YarnApplicationState.RUNNING);
+ final ApplicationId newMRJobId = mock(ApplicationId.class);
+ when(newMRJobId.toString()).thenReturn("application_1534164756526_0004");
+ when(newMRJob.getApplicationId()).thenReturn(newMRJobId);
+
+ final String lastYarnId = yarnApplicationIdFinder.getLastYarnId(Lists.newArrayList(newMRJob, newLauncher));
+ assertEquals("last YARN id should be the maximal element in the list", "application_1534164756526_0004", lastYarnId);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/oozie/blob/d5c4f3b7/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java b/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java
index 893405e..9c7f821 100644
--- a/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java
+++ b/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java
@@ -21,20 +21,14 @@ package org.apache.oozie.action.oozie;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.action.hadoop.ActionExecutorTestCase;
import org.apache.oozie.action.hadoop.LauncherMainTester;
-import org.apache.oozie.action.hadoop.OozieJobInfo;
import org.apache.oozie.client.OozieClient;
-import org.apache.oozie.client.OozieClientException;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
-import org.apache.oozie.command.CommandException;
-import org.apache.oozie.command.wf.KillXCommand;
import org.apache.oozie.command.wf.SuspendXCommand;
import org.apache.oozie.local.LocalOozie;
import org.apache.oozie.service.HadoopAccessorService;
@@ -45,22 +39,13 @@ import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileWriter;
import java.io.IOException;
-import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
import java.io.StringReader;
import java.io.Writer;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Properties;
public class TestSubWorkflowActionExecutor extends ActionExecutorTestCase {
- private static final int JOB_TIMEOUT = 100 * 1000;
public void testType() {
SubWorkflowActionExecutor subWorkflow = new SubWorkflowActionExecutor();
@@ -506,7 +491,7 @@ public class TestSubWorkflowActionExecutor extends ActionExecutorTestCase {
final OozieClient wfClient = LocalOozie.getClient();
final String jobId = submitWorkflow(workflowUri, wfClient);
- waitForSubWFtoStart(wfClient, jobId);
+ waitForWorkflowToStart(wfClient, jobId);
WorkflowJob wf = wfClient.getJobInfo(jobId);
// Suspending subworkflow
new SuspendXCommand(wf.getActions().get(1).getExternalId()).call();
@@ -529,7 +514,7 @@ public class TestSubWorkflowActionExecutor extends ActionExecutorTestCase {
final String jobId = submitWorkflow(workflowUri, wfClient);
final Configuration conf = Services.get().get(HadoopAccessorService.class).createConfiguration(getJobTrackerUri());
- waitForSubWFtoStart(wfClient, jobId);
+ waitForWorkflowToStart(wfClient, jobId);
final ApplicationId externalChildJobId = getChildMRJobApplicationId(conf);
killWorkflow(jobId);
@@ -540,70 +525,11 @@ public class TestSubWorkflowActionExecutor extends ActionExecutorTestCase {
}
- private void killWorkflow(String jobId) throws CommandException {
- new KillXCommand(jobId).call();
- }
-
- private ApplicationId getChildMRJobApplicationId(Configuration conf) throws IOException {
- final List<ApplicationId> applicationIdList = new ArrayList<>();
- final Path inputDir = new Path(getFsTestCaseDir(), "input");
- final Path wfIDFile = new Path(inputDir, LauncherMainTester.JOB_ID_FILE_NAME);
- final FileSystem fs = FileSystem.get(conf);
-
- // wait until we have the running child MR job's ID from HDFS
- waitFor(JOB_TIMEOUT, new ApplicationIdExistsPredicate(fs, wfIDFile));
-
- try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(wfIDFile)))) {
- String line = reader.readLine();
- JobID.forName(line);
- String jobID = line;
- String appID = jobID.replace("job", "application");
- ApplicationId id = ConverterUtils.toApplicationId(appID);
- applicationIdList.add(id);
- }
-
- assertTrue("Application ID should've been found. No external Child ID was found in " + wfIDFile.toString(),
- applicationIdList.size() == 1);
- return applicationIdList.get(0);
- }
-
- private void waitForSubWFtoStart(OozieClient wfClient, String jobId) {
- waitFor(JOB_TIMEOUT, new SubWorkflowActionRunningPredicate(wfClient,jobId));
- }
-
- private String submitWorkflow(String workflowUri, OozieClient wfClient) throws OozieClientException {
- Properties conf = wfClient.createConfiguration();
- conf.setProperty(OozieClient.APP_PATH, workflowUri);
- conf.setProperty(OozieClient.USER_NAME, getTestUser());
- conf.setProperty("appName", "var-app-name");
- final String jobId = wfClient.submit(conf);
- wfClient.start(jobId);
- return jobId;
- }
-
- private void writeToFile(String appXml, String appPath) throws IOException {
- // TODO Auto-generated method stub
- File wf = new File(URI.create(appPath));
- PrintWriter out = null;
- try {
- out = new PrintWriter(new FileWriter(wf));
- out.println(appXml);
- }
- catch (IOException iex) {
- throw iex;
- }
- finally {
- if (out != null) {
- out.close();
- }
- }
- }
-
public String getLazyWorkflow(boolean launchMRAction) {
return "<workflow-app xmlns='uri:oozie:workflow:0.4' name='app'>" +
"<start to='java' />" +
" <action name='java'>" +
- getAction(launchMRAction)
+ getJavaAction(launchMRAction)
+ "<ok to='end' />"
+ "<error to='fail' />"
+ "</action>"
@@ -614,26 +540,6 @@ public class TestSubWorkflowActionExecutor extends ActionExecutorTestCase {
+ "</workflow-app>";
}
- private String getAction(boolean launchMRAction) {
- Path inputDir = new Path(getFsTestCaseDir(), "input");
- Path outputDir = new Path(getFsTestCaseDir(), "output");
- String javaActionXml = "<java>" +
- "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
- "<name-node>" + getNameNodeUri() + "</name-node>" +
- "<main-class>" + JavaSleepAction.class.getName()+ "</main-class>" +
- "</java>";
- String javaWithMRActionXml = "<java>" +
- "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
- "<name-node>" + getNameNodeUri() + "</name-node>" +
- "<main-class>" + LauncherMainTester.class.getName()+ "</main-class>" +
- "<arg>javamapreduce</arg>" +
- "<arg>"+inputDir.toString()+"</arg>" +
- "<arg>"+outputDir.toString()+"</arg>" +
- "</java>";
- String actionXml = launchMRAction ? javaWithMRActionXml : javaActionXml;
- return actionXml;
- }
-
public void testSubWorkflowRerun() throws Exception {
try {
String workflowUri = createSubWorkflowWithLazyAction(false);
@@ -646,7 +552,7 @@ public class TestSubWorkflowActionExecutor extends ActionExecutorTestCase {
final String jobId = wfClient.submit(conf);
wfClient.start(jobId);
- waitForSubWFtoStart(wfClient, jobId);
+ waitForWorkflowToStart(wfClient, jobId);
String subWorkflowExternalId = wfClient.getJobInfo(jobId).getActions().get(1).getExternalId();
wfClient.kill(wfClient.getJobInfo(jobId).getActions().get(1).getExternalId());
@@ -925,38 +831,4 @@ public class TestSubWorkflowActionExecutor extends ActionExecutorTestCase {
+ "<end name='end' />"
+ "</workflow-app>";
}
-
- private static class ApplicationIdExistsPredicate implements Predicate {
-
- private final FileSystem fs;
- private final Path wfIDFile;
-
- public ApplicationIdExistsPredicate(FileSystem fs, Path wfIDFile) {
- this.fs = fs;
- this.wfIDFile = wfIDFile;
- }
-
- @Override
- public boolean evaluate() throws Exception {
- return fs.exists(wfIDFile) && fs.getFileStatus(wfIDFile).getLen() > 0;
- }
- }
-
- private static class SubWorkflowActionRunningPredicate implements Predicate {
- private final OozieClient wfClient;
- private final String jobId;
-
- public SubWorkflowActionRunningPredicate(OozieClient wfClient, String jobId) {
- this.wfClient = wfClient;
- this.jobId = jobId;
- }
-
- @Override
- public boolean evaluate() throws Exception {
- boolean isSubWfRunning = wfClient.getJobInfo(jobId).getStatus() == WorkflowJob.Status.RUNNING;
- boolean isSubWfActionRunning = wfClient.getJobInfo(jobId)
- .getActions().get(1).getStatus() == WorkflowAction.Status.RUNNING;
- return isSubWfRunning && isSubWfActionRunning;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/d5c4f3b7/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 371a60d..b835951 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 5.1.0 release (trunk - unreleased)
+OOZIE-3298 [MapReduce action] External ID is not filled properly and failing MR job is treated as SUCCEEDED (andras.piros via pbacsko, asasvari, gezapeti)
OOZIE-3317 amend [build] Fix false positive precommit reports (kmarton via andras.piros)
OOZIE-3160 PriorityDelayQueue put()/take() can cause significant CPU load due to busy waiting (pbacsko)
OOZIE-2877 Git action (clayb, andras.piros via pbacsko, gezapeti)
http://git-wip-us.apache.org/repos/asf/oozie/blob/d5c4f3b7/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
index c9e2a91..b6599f7 100644
--- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
@@ -199,37 +199,9 @@ public abstract class LauncherMain {
public static Set<ApplicationId> getChildYarnJobs(Configuration actionConf, ApplicationsRequestScope scope,
long startTime) {
- Set<ApplicationId> childYarnJobs = new HashSet<ApplicationId>();
- String tag = actionConf.get(CHILD_MAPREDUCE_JOB_TAGS);
- if (tag == null) {
- System.out.print("Could not find YARN tags property " + CHILD_MAPREDUCE_JOB_TAGS);
- return childYarnJobs;
- }
-
- System.out.println("tag id : " + tag);
- GetApplicationsRequest gar = GetApplicationsRequest.newInstance();
- gar.setScope(scope);
- gar.setApplicationTags(Collections.singleton(tag));
- long endTime = System.currentTimeMillis();
- if (startTime > endTime) {
- System.out.println("WARNING: Clock skew between the Oozie server host and this host detected. Please fix this. " +
- "Attempting to work around...");
- // We don't know which one is wrong (relative to the RM), so to be safe, let's assume they're both wrong and add an
- // offset in both directions
- long diff = 2 * (startTime - endTime);
- startTime = startTime - diff;
- endTime = endTime + diff;
- }
- gar.setStartRange(startTime, endTime);
- try {
- ApplicationClientProtocol proxy = ClientRMProxy.createRMProxy(actionConf, ApplicationClientProtocol.class);
- GetApplicationsResponse apps = proxy.getApplications(gar);
- List<ApplicationReport> appsList = apps.getApplicationList();
- for(ApplicationReport appReport : appsList) {
- childYarnJobs.add(appReport.getApplicationId());
- }
- } catch (YarnException | IOException ioe) {
- throw new RuntimeException("Exception occurred while finding child jobs", ioe);
+ final Set<ApplicationId> childYarnJobs = new HashSet<ApplicationId>();
+ for (final ApplicationReport applicationReport : getChildYarnApplications(actionConf, scope, startTime)) {
+ childYarnJobs.add(applicationReport.getApplicationId());
}
if (childYarnJobs.isEmpty()) {
@@ -254,6 +226,39 @@ public abstract class LauncherMain {
return getChildYarnJobs(actionConf, scope, startTime);
}
+ public static List<ApplicationReport> getChildYarnApplications(final Configuration actionConf,
+ final ApplicationsRequestScope scope,
+ long startTime) {
+ final String tag = actionConf.get(CHILD_MAPREDUCE_JOB_TAGS);
+ if (tag == null) {
+ System.out.print("Could not find YARN tags property " + CHILD_MAPREDUCE_JOB_TAGS);
+ return Collections.emptyList();
+ }
+
+ System.out.println("tag id : " + tag);
+ final GetApplicationsRequest gar = GetApplicationsRequest.newInstance();
+ gar.setScope(scope);
+ gar.setApplicationTags(Collections.singleton(tag));
+ long endTime = System.currentTimeMillis();
+ if (startTime > endTime) {
+ System.out.println("WARNING: Clock skew between the Oozie server host and this host detected. Please fix this. " +
+ "Attempting to work around...");
+ // We don't know which one is wrong (relative to the RM), so to be safe, let's assume they're both wrong and add an
+ // offset in both directions
+ final long diff = 2 * (startTime - endTime);
+ startTime = startTime - diff;
+ endTime = endTime + diff;
+ }
+ gar.setStartRange(startTime, endTime);
+ try {
+ final ApplicationClientProtocol proxy = ClientRMProxy.createRMProxy(actionConf, ApplicationClientProtocol.class);
+ final GetApplicationsResponse apps = proxy.getApplications(gar);
+ return apps.getApplicationList();
+ } catch (final YarnException | IOException e) {
+ throw new RuntimeException("Exception occurred while finding child jobs", e);
+ }
+ }
+
public static void killChildYarnJobs(Configuration actionConf) {
try {
Set<ApplicationId> childYarnJobs = getChildYarnJobs(actionConf);
http://git-wip-us.apache.org/repos/asf/oozie/blob/d5c4f3b7/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
----------------------------------------------------------------------
diff --git a/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java b/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
index f460b6b..68e83fa 100644
--- a/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
+++ b/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
@@ -56,9 +56,11 @@ import org.apache.hadoop.streaming.StreamJob;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowAction;
@@ -67,6 +69,8 @@ import org.apache.oozie.command.wf.StartXCommand;
import org.apache.oozie.command.wf.SubmitXCommand;
import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
+import org.apache.oozie.local.LocalOozie;
+import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.WorkflowAppService;
@@ -74,10 +78,12 @@ import org.apache.oozie.util.ClassUtils;
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.PropertiesUtils;
import org.apache.oozie.util.XConfiguration;
+import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;
public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
+ private static final XLog LOG = XLog.getLog(TestMapReduceActionExecutor.class);
private static final String PIPES = "pipes";
private static final String MAP_REDUCE = "map-reduce";
@@ -85,7 +91,8 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
@Override
protected void setSystemProps() throws Exception {
super.setSystemProps();
- setSystemProperty("oozie.service.ActionService.executor.classes", MapReduceActionExecutor.class.getName());
+ setSystemProperty("oozie.service.ActionService.executor.classes",
+ String.join(",", MapReduceActionExecutor.class.getName(), JavaActionExecutor.class.getName()));
setSystemProperty("oozie.credentials.credentialclasses", "cred=org.apache.oozie.action.hadoop.CredentialForTest");
}
@@ -803,10 +810,6 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
w.close();
}
- private HadoopAccessorService getHadoopAccessorService() {
- return Services.get().get(HadoopAccessorService.class);
- }
-
public void testMapReduceWithUberJarEnabled() throws Exception {
Services serv = Services.get();
boolean originalUberJarDisabled = serv.getConf().getBoolean("oozie.action.mapreduce.uber.jar.enable", false);
@@ -1306,4 +1309,162 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
});
}
+ /**
+ * OOZIE-3298 regression test: when the external child MapReduce job is killed, the Oozie
+ * launcher AM must also fail so the workflow ends up KILLED instead of being wrongly
+ * reported as SUCCEEDED.
+ */
+ public void testFailingMapReduceJobCausesOozieLauncherAMToFail() throws Exception {
+ final String workflowUri = createWorkflowWithMapReduceAction();
+
+ startWorkflowAndFailChildMRJob(workflowUri);
+ }
+
+ /**
+ * Writes a minimal workflow.xml containing a single {@code <map-reduce>} action to the test
+ * case directory and returns its URI.
+ * <p>
+ * The action uses {@code SleepMapperReducerForTest} as both mapper and reducer so the child
+ * MR job stays alive long enough for the test to locate and kill it.
+ *
+ * @return URI of the written workflow.xml
+ * @throws IOException when the workflow file cannot be written
+ */
+ private String createWorkflowWithMapReduceAction() throws IOException {
+ final String workflowUri = getTestCaseFileUri("workflow.xml");
+ final String appXml = "<workflow-app xmlns=\"uri:oozie:workflow:1.0\" name=\"workflow\">" +
+ " <start to=\"map-reduce\"/>" +
+ " <action name=\"map-reduce\">" +
+ " <map-reduce>" +
+ " <resource-manager>" + getJobTrackerUri() + "</resource-manager>" +
+ " <name-node>" + getNameNodeUri() + "</name-node>" +
+ " <configuration>\n" +
+ " <property>\n" +
+ " <name>mapred.job.queue.name</name>\n" +
+ " <value>default</value>\n" +
+ " </property>\n" +
+ " <property>\n" +
+ " <name>mapred.mapper.class</name>\n" +
+ " <value>org.apache.oozie.action.hadoop.SleepMapperReducerForTest</value>\n" +
+ " </property>\n" +
+ " <property>\n" +
+ " <name>mapred.reducer.class</name>\n" +
+ " <value>org.apache.oozie.action.hadoop.SleepMapperReducerForTest</value>\n" +
+ " </property>\n" +
+ " <property>\n" +
+ " <name>mapred.input.dir</name>\n" +
+ " <value>" + getFsTestCaseDir() + "/input</value>\n" +
+ " </property>\n" +
+ " <property>\n" +
+ " <name>mapred.output.dir</name>\n" +
+ " <value>" + getFsTestCaseDir() + "/output</value>\n" +
+ " </property>\n" +
+ " </configuration>\n" +
+ " </map-reduce>" +
+ " <ok to=\"end\"/>" +
+ " <error to=\"fail\"/>" +
+ " </action>" +
+ " <kill name=\"fail\">" +
+ " <message>Sub workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>" +
+ " </kill>" +
+ " <end name=\"end\"/>" +
+ "</workflow-app>";
+
+ writeToFile(appXml, workflowUri);
+
+ return workflowUri;
+ }
+
+ /**
+ * Drives the end-to-end scenario: submits the workflow, waits for the child MAPREDUCE YARN
+ * application to appear, records its job ID, kills the child application, and finally
+ * asserts the workflow itself transitions to KILLED (the OOZIE-3298 fix under test).
+ *
+ * @param workflowUri URI of the workflow.xml created by {@code createWorkflowWithMapReduceAction()}
+ */
+ private void startWorkflowAndFailChildMRJob(final String workflowUri) throws Exception {
+ try {
+ LocalOozie.start();
+ final OozieClient wfClient = LocalOozie.getClient();
+ final String workflowId = submitWorkflow(workflowUri, wfClient);
+ final Configuration conf = Services.get().get(HadoopAccessorService.class).createConfiguration(getJobTrackerUri());
+
+ final Path inputFolder = createInputFolder(conf);
+
+ // Block until the child MAPREDUCE YARN application belonging to this workflow shows up.
+ waitForWorkflowToStart(wfClient, workflowId);
+ waitForChildYarnApplication(getHadoopAccessorService().createYarnClient(getTestUser(), conf), workflowId);
+ assertAndWriteNextMRJobId(workflowId, conf, inputFolder);
+
+ final ApplicationId externalChildJobId = getChildMRJobApplicationId(conf);
+
+ // Kill the child MR job and verify the failure propagates: child KILLED, workflow KILLED.
+ killYarnApplication(conf, externalChildJobId);
+ waitUntilYarnAppKilledAndAssertSuccess(externalChildJobId.toString());
+ waitForWorkflowToKill(wfClient, workflowId);
+ } finally {
+ // Always tear down the embedded Oozie instance, even when assertions above fail.
+ LocalOozie.stop();
+ }
+ }
+
+ /**
+ * Get all YARN application IDs, select the one of type {@code MAPREDUCE} that is relevant to {@code workflowId},
+ * and write to {@code inputFolder/jobID.txt}.
+ * <p>
+ * Simulating functional parts of {@link LauncherMain#writeExternalChildIDs(String, Pattern[], String)} in order
+ * {@link MapReduceActionExecutor#check(ActionExecutor.Context, WorkflowAction)} can find it later on the call chain.
+ * <p>
+ * We need to write out an own sequence file to {@link LauncherMainTester#JOB_ID_FILE_NAME} in order
+ * {@link ActionExecutorTestCase#getChildMRJobApplicationId(Configuration)} can find it. We unfortunately cannot rely on the
+ * original sequence file written by {@link LauncherMain#writeExternalChildIDs(String, Pattern[], String)} because we don't own
+ * a reference to the original {@link ActionExecutor.Context} as in {@link MapReduceActionExecutor}.
+ * @param workflowId the workflow ID
+ * @param conf the {@link Configuration} used for Hadoop Common / YARN API calls
+ * @param inputFolder where to write the output text file
+ * @throws IOException when the output text file cannot be written
+ * @throws YarnException when the list of YARN applications cannot be queried
+ * @throws HadoopAccessorException when {@link YarnClient} cannot be created
+ */
+ private void assertAndWriteNextMRJobId(final String workflowId, final Configuration conf, final Path inputFolder)
+ throws IOException, YarnException, HadoopAccessorException {
+ final Path wfIDFile = new Path(inputFolder, LauncherMainTester.JOB_ID_FILE_NAME);
+ // NOTE(review): try-with-resources closes the FileSystem obtained via FileSystem.get(conf);
+ // Hadoop caches FileSystem instances, so this may close a shared instance — confirm the
+ // test harness disables the FS cache or tolerates this.
+ try (final FileSystem fs = FileSystem.get(conf);
+ final Writer w = new OutputStreamWriter(fs.create(wfIDFile))) {
+ final List<ApplicationReport> allApplications =
+ getHadoopAccessorService().createYarnClient(getTestUser(), conf).getApplications();
+
+ // At minimum the Oozie launcher AM and the child MR job should be present.
+ assertTrue("YARN applications number mismatch", allApplications.size() >= 2);
+
+ // Pick the MAPREDUCE application whose name references this workflow.
+ // Note: if several match, the last one in the report list wins.
+ ApplicationReport mapReduce = null;
+ for (final ApplicationReport candidate : allApplications) {
+ if (candidate.getApplicationType().equals(MapReduceActionExecutor.YARN_APPLICATION_TYPE_MAPREDUCE)
+ && candidate.getName().contains(workflowId)) {
+ mapReduce = candidate;
+ }
+ }
+ assertNotNull("MAPREDUCE YARN application not found", mapReduce);
+
+ // MR job IDs are the YARN application ID with the "application" prefix swapped for "job".
+ final String applicationId = mapReduce.getApplicationId().toString();
+ final String nextMRJobId = applicationId.replace("application", "job");
+
+ LOG.debug("Writing next MapReduce job ID: {0}", nextMRJobId);
+
+ w.write(nextMRJobId);
+ }
+ }
+
+ /**
+ * Creates the {@code input} directory under the FS test case dir and returns its path.
+ * <p>
+ * NOTE(review): the try-with-resources closes the {@link FileSystem} returned by
+ * {@code FileSystem.get(conf)}, which Hadoop caches and may share — confirm this does not
+ * invalidate the instance for later callers in the same test.
+ *
+ * @param conf Hadoop configuration used to obtain the {@link FileSystem}
+ * @return the created input directory path
+ * @throws IOException when the directory cannot be created
+ */
+ private Path createInputFolder(final Configuration conf) throws IOException {
+ final Path inputDir = new Path(getFsTestCaseDir(), "input");
+ try (final FileSystem fs = FileSystem.get(conf)) {
+ fs.mkdirs(inputDir);
+ }
+ return inputDir;
+ }
+
+ /**
+ * Polls (up to {@code JOB_TIMEOUT}) until a MAPREDUCE YARN application whose name contains
+ * {@code workflowId} is visible to the given {@link YarnClient}.
+ */
+ private void waitForChildYarnApplication(final YarnClient yarnClient, final String workflowId) {
+ waitFor(JOB_TIMEOUT, new ChildYarnApplicationPresentPredicate(yarnClient, workflowId));
+ }
+
+ /**
+ * {@code waitFor} predicate that evaluates to {@code true} once a YARN application of type
+ * MAPREDUCE whose name contains the given workflow ID is reported by the {@link YarnClient}.
+ * <p>
+ * NOTE(review): {@code yarnClient.getApplications()} is called twice per evaluation (once for
+ * the emptiness check, once for the loop) — a single call would suffice. Could also be a
+ * static nested class since it does not use the enclosing test instance.
+ */
+ private class ChildYarnApplicationPresentPredicate implements Predicate {
+ private final YarnClient yarnClient;
+ private final String workflowId;
+
+ ChildYarnApplicationPresentPredicate(final YarnClient yarnClient, final String workflowId) {
+ this.yarnClient = yarnClient;
+ this.workflowId = workflowId;
+ }
+
+ @Override
+ public boolean evaluate() throws Exception {
+ if (yarnClient.getApplications().isEmpty()) {
+ return false;
+ }
+
+ // Match on both application type and name: the launcher AM shares the workflow ID in
+ // its name but has a different application type.
+ for (final ApplicationReport applicationReport : yarnClient.getApplications()) {
+ final String name = applicationReport.getName();
+ final String type = applicationReport.getApplicationType();
+ if (type.equals(MapReduceActionExecutor.YARN_APPLICATION_TYPE_MAPREDUCE) && name.contains(workflowId)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+ }
}