You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by ge...@apache.org on 2017/05/26 09:28:01 UTC
[09/10] oozie git commit: OOZIE-1770 Create Oozie Application Master
for YARN (asasvari, pbacsko, rkanter, gezapeti)
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
index 06ae5fd..f4c1127 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
@@ -25,21 +25,22 @@ import java.net.ConnectException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
-import java.security.PrivilegedExceptionAction;
+import java.nio.ByteBuffer;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Objects;
import java.util.Properties;
import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
+import org.apache.commons.io.IOUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.Ints;
import org.apache.hadoop.conf.Configuration;
@@ -48,23 +49,41 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.AccessControlException;
-import org.apache.oozie.hadoop.utils.HadoopShims;
+import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.TaskLog;
+import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.Apps;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowAction;
-import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.command.coord.CoordActionStartXCommand;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.HadoopAccessorException;
@@ -72,8 +91,8 @@ import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.ShareLibService;
import org.apache.oozie.service.URIHandlerService;
-import org.apache.oozie.service.UserGroupInformationService;
import org.apache.oozie.service.WorkflowAppService;
+import org.apache.oozie.util.ClasspathUtils;
import org.apache.oozie.util.ELEvaluationException;
import org.apache.oozie.util.ELEvaluator;
import org.apache.oozie.util.JobUtils;
@@ -86,18 +105,22 @@ import org.jdom.Element;
import org.jdom.JDOMException;
import org.jdom.Namespace;
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.Closeables;
+
public class JavaActionExecutor extends ActionExecutor {
- protected static final String HADOOP_USER = "user.name";
- public static final String HADOOP_JOB_TRACKER = "mapred.job.tracker";
- public static final String HADOOP_JOB_TRACKER_2 = "mapreduce.jobtracker.address";
+ public static final String RUNNING = "RUNNING";
+ public static final String SUCCEEDED = "SUCCEEDED";
+ public static final String KILLED = "KILLED";
+ public static final String FAILED = "FAILED";
+ public static final String FAILED_KILLED = "FAILED/KILLED";
public static final String HADOOP_YARN_RM = "yarn.resourcemanager.address";
public static final String HADOOP_NAME_NODE = "fs.default.name";
- private static final String HADOOP_JOB_NAME = "mapred.job.name";
public static final String OOZIE_COMMON_LIBDIR = "oozie";
- private static final Set<String> DISALLOWED_PROPERTIES = new HashSet<String>();
- public final static String MAX_EXTERNAL_STATS_SIZE = "oozie.external.stats.max.size";
+
+ public static final String MAX_EXTERNAL_STATS_SIZE = "oozie.external.stats.max.size";
public static final String ACL_VIEW_JOB = "mapreduce.job.acl-view-job";
public static final String ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job";
public static final String HADOOP_YARN_TIMELINE_SERVICE_ENABLED = "yarn.timeline-service.enabled";
@@ -111,34 +134,32 @@ public class JavaActionExecutor extends ActionExecutor {
public static final String HADOOP_REDUCE_JAVA_OPTS = "mapreduce.reduce.java.opts";
public static final String HADOOP_CHILD_JAVA_ENV = "mapred.child.env";
public static final String HADOOP_MAP_JAVA_ENV = "mapreduce.map.env";
+ public static final String HADOOP_JOB_CLASSLOADER = "mapreduce.job.classloader";
+ public static final String HADOOP_USER_CLASSPATH_FIRST = "mapreduce.user.classpath.first";
+ public static final String OOZIE_CREDENTIALS_SKIP = "oozie.credentials.skip";
public static final String YARN_AM_RESOURCE_MB = "yarn.app.mapreduce.am.resource.mb";
public static final String YARN_AM_COMMAND_OPTS = "yarn.app.mapreduce.am.command-opts";
public static final String YARN_AM_ENV = "yarn.app.mapreduce.am.env";
- private static final String JAVA_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.JavaMain";
public static final int YARN_MEMORY_MB_MIN = 512;
+
+ private static final String JAVA_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.JavaMain";
+ private static final String HADOOP_JOB_NAME = "mapred.job.name";
+ private static final Set<String> DISALLOWED_PROPERTIES = new HashSet<String>();
+
private static int maxActionOutputLen;
private static int maxExternalStatsSize;
private static int maxFSGlobMax;
- private static final String SUCCEEDED = "SUCCEEDED";
- private static final String KILLED = "KILLED";
- private static final String FAILED = "FAILED";
- private static final String FAILED_KILLED = "FAILED/KILLED";
+
+ protected static final String HADOOP_USER = "user.name";
+
protected XLog LOG = XLog.getLog(getClass());
- private static final Pattern heapPattern = Pattern.compile("-Xmx(([0-9]+)[mMgG])");
private static final String JAVA_TMP_DIR_SETTINGS = "-Djava.io.tmpdir=";
- public static final String CONF_HADOOP_YARN_UBER_MODE = OOZIE_ACTION_LAUNCHER_PREFIX + HADOOP_YARN_UBER_MODE;
- public static final String HADOOP_JOB_CLASSLOADER = "mapreduce.job.classloader";
- public static final String HADOOP_USER_CLASSPATH_FIRST = "mapreduce.user.classpath.first";
- public static final String OOZIE_CREDENTIALS_SKIP = "oozie.credentials.skip";
- private static final LauncherInputFormatClassLocator launcherInputFormatClassLocator = new LauncherInputFormatClassLocator();
public XConfiguration workflowConf = null;
static {
DISALLOWED_PROPERTIES.add(HADOOP_USER);
- DISALLOWED_PROPERTIES.add(HADOOP_JOB_TRACKER);
DISALLOWED_PROPERTIES.add(HADOOP_NAME_NODE);
- DISALLOWED_PROPERTIES.add(HADOOP_JOB_TRACKER_2);
DISALLOWED_PROPERTIES.add(HADOOP_YARN_RM);
}
@@ -150,20 +171,17 @@ public class JavaActionExecutor extends ActionExecutor {
super(type);
}
- public static List<Class> getCommonLauncherClasses() {
- List<Class> classes = new ArrayList<Class>();
- classes.add(LauncherMapper.class);
- classes.add(launcherInputFormatClassLocator.locateOrGet());
- classes.add(OozieLauncherOutputFormat.class);
- classes.add(OozieLauncherOutputCommitter.class);
- classes.add(LauncherMainHadoopUtils.class);
- classes.add(HadoopShims.class);
+ public static List<Class<?>> getCommonLauncherClasses() {
+ List<Class<?>> classes = new ArrayList<Class<?>>();
+ classes.add(LauncherMain.class);
classes.addAll(Services.get().get(URIHandlerService.class).getClassesForLauncher());
+ classes.add(LauncherAM.class);
+ classes.add(LauncherAMCallbackNotifier.class);
return classes;
}
- public List<Class> getLauncherClasses() {
- List<Class> classes = new ArrayList<Class>();
+ public List<Class<?>> getLauncherClasses() {
+ List<Class<?>> classes = new ArrayList<Class<?>>();
try {
classes.add(Class.forName(JAVA_MAIN_CLASS_NAME));
}
@@ -176,7 +194,7 @@ public class JavaActionExecutor extends ActionExecutor {
@Override
public void initActionType() {
super.initActionType();
- maxActionOutputLen = ConfigurationService.getInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA);
+ maxActionOutputLen = ConfigurationService.getInt(LauncherAM.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA);
//Get the limit for the maximum allowed size of action stats
maxExternalStatsSize = ConfigurationService.getInt(JavaActionExecutor.MAX_EXTERNAL_STATS_SIZE);
maxExternalStatsSize = (maxExternalStatsSize == -1) ? Integer.MAX_VALUE : maxExternalStatsSize;
@@ -217,31 +235,32 @@ public class JavaActionExecutor extends ActionExecutor {
}
}
- public JobConf createBaseHadoopConf(Context context, Element actionXml) {
+ public Configuration createBaseHadoopConf(Context context, Element actionXml) {
return createBaseHadoopConf(context, actionXml, true);
}
- protected JobConf createBaseHadoopConf(Context context, Element actionXml, boolean loadResources) {
+ protected Configuration createBaseHadoopConf(Context context, Element actionXml, boolean loadResources) {
+
Namespace ns = actionXml.getNamespace();
String jobTracker = actionXml.getChild("job-tracker", ns).getTextTrim();
String nameNode = actionXml.getChild("name-node", ns).getTextTrim();
- JobConf conf = null;
+ Configuration conf = null;
if (loadResources) {
conf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker);
}
else {
- conf = new JobConf(false);
+ conf = new Configuration(false);
}
+
conf.set(HADOOP_USER, context.getProtoActionConf().get(WorkflowAppService.HADOOP_USER));
- conf.set(HADOOP_JOB_TRACKER, jobTracker);
- conf.set(HADOOP_JOB_TRACKER_2, jobTracker);
conf.set(HADOOP_YARN_RM, jobTracker);
conf.set(HADOOP_NAME_NODE, nameNode);
conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "true");
+
return conf;
}
- protected JobConf loadHadoopDefaultResources(Context context, Element actionXml) {
+ protected Configuration loadHadoopDefaultResources(Context context, Element actionXml) {
return createBaseHadoopConf(context, actionXml);
}
@@ -266,7 +285,7 @@ public class JavaActionExecutor extends ActionExecutor {
XConfiguration launcherConf = new XConfiguration();
// Inject action defaults for launcher
HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
- XConfiguration actionDefaultConf = has.createActionDefaultConf(conf.get(HADOOP_JOB_TRACKER), getType());
+ XConfiguration actionDefaultConf = has.createActionDefaultConf(conf.get(HADOOP_YARN_RM), getType());
injectLauncherProperties(actionDefaultConf, launcherConf);
// Inject <job-xml> and <configuration> for launcher
try {
@@ -276,15 +295,8 @@ public class JavaActionExecutor extends ActionExecutor {
} catch (URISyntaxException ex) {
throw convertException(ex);
}
- // Inject use uber mode for launcher
- injectLauncherUseUberMode(launcherConf);
XConfiguration.copy(launcherConf, conf);
checkForDisallowedProps(launcherConf, "launcher configuration");
- // Inject config-class for launcher to use for action
- Element e = actionXml.getChild("config-class", ns);
- if (e != null) {
- conf.set(LauncherMapper.OOZIE_ACTION_CONFIG_CLASS, e.getTextTrim());
- }
return conf;
}
catch (IOException ex) {
@@ -292,25 +304,6 @@ public class JavaActionExecutor extends ActionExecutor {
}
}
- void injectLauncherUseUberMode(Configuration launcherConf) {
- // Set Uber Mode for the launcher (YARN only, ignored by MR1)
- // Priority:
- // 1. action's <configuration>
- // 2. oozie.action.#action-type#.launcher.mapreduce.job.ubertask.enable
- // 3. oozie.action.launcher.mapreduce.job.ubertask.enable
- if (launcherConf.get(HADOOP_YARN_UBER_MODE) == null) {
- if (ConfigurationService.get(getActionTypeLauncherPrefix() + HADOOP_YARN_UBER_MODE).length() > 0) {
- if (ConfigurationService.getBoolean(getActionTypeLauncherPrefix() + HADOOP_YARN_UBER_MODE)) {
- launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, true);
- }
- } else {
- if (ConfigurationService.getBoolean(OOZIE_ACTION_LAUNCHER_PREFIX + HADOOP_YARN_UBER_MODE)) {
- launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, true);
- }
- }
- }
- }
-
void injectLauncherTimelineServiceEnabled(Configuration launcherConf, Configuration actionConf) {
// Getting delegation token for ATS. If tez-site.xml is present in distributed cache, turn on timeline service.
if (actionConf.get("oozie.launcher." + HADOOP_YARN_TIMELINE_SERVICE_ENABLED) == null
@@ -322,151 +315,6 @@ public class JavaActionExecutor extends ActionExecutor {
}
}
- void updateConfForUberMode(Configuration launcherConf) {
-
- // child.env
- boolean hasConflictEnv = false;
- String launcherMapEnv = launcherConf.get(HADOOP_MAP_JAVA_ENV);
- if (launcherMapEnv == null) {
- launcherMapEnv = launcherConf.get(HADOOP_CHILD_JAVA_ENV);
- }
- String amEnv = launcherConf.get(YARN_AM_ENV);
- StringBuffer envStr = new StringBuffer();
- HashMap<String, List<String>> amEnvMap = null;
- HashMap<String, List<String>> launcherMapEnvMap = null;
- if (amEnv != null) {
- envStr.append(amEnv);
- amEnvMap = populateEnvMap(amEnv);
- }
- if (launcherMapEnv != null) {
- launcherMapEnvMap = populateEnvMap(launcherMapEnv);
- if (amEnvMap != null) {
- Iterator<String> envKeyItr = launcherMapEnvMap.keySet().iterator();
- while (envKeyItr.hasNext()) {
- String envKey = envKeyItr.next();
- if (amEnvMap.containsKey(envKey)) {
- List<String> amValList = amEnvMap.get(envKey);
- List<String> launcherValList = launcherMapEnvMap.get(envKey);
- Iterator<String> valItr = launcherValList.iterator();
- while (valItr.hasNext()) {
- String val = valItr.next();
- if (!amValList.contains(val)) {
- hasConflictEnv = true;
- break;
- }
- else {
- valItr.remove();
- }
- }
- if (launcherValList.isEmpty()) {
- envKeyItr.remove();
- }
- }
- }
- }
- }
- if (hasConflictEnv) {
- launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, false);
- }
- else {
- if (launcherMapEnvMap != null) {
- for (String key : launcherMapEnvMap.keySet()) {
- List<String> launcherValList = launcherMapEnvMap.get(key);
- for (String val : launcherValList) {
- if (envStr.length() > 0) {
- envStr.append(",");
- }
- envStr.append(key).append("=").append(val);
- }
- }
- }
-
- launcherConf.set(YARN_AM_ENV, envStr.toString());
-
- // memory.mb
- int launcherMapMemoryMB = launcherConf.getInt(HADOOP_MAP_MEMORY_MB, 1536);
- int amMemoryMB = launcherConf.getInt(YARN_AM_RESOURCE_MB, 1536);
- // YARN_MEMORY_MB_MIN to provide buffer.
- // suppose launcher map aggressively use high memory, need some
- // headroom for AM
- int memoryMB = Math.max(launcherMapMemoryMB, amMemoryMB) + YARN_MEMORY_MB_MIN;
- // limit to 4096 in case of 32 bit
- if (launcherMapMemoryMB < 4096 && amMemoryMB < 4096 && memoryMB > 4096) {
- memoryMB = 4096;
- }
- launcherConf.setInt(YARN_AM_RESOURCE_MB, memoryMB);
-
- // We already made mapred.child.java.opts and
- // mapreduce.map.java.opts equal, so just start with one of them
- String launcherMapOpts = launcherConf.get(HADOOP_MAP_JAVA_OPTS, "");
- String amChildOpts = launcherConf.get(YARN_AM_COMMAND_OPTS);
- StringBuilder optsStr = new StringBuilder();
- int heapSizeForMap = extractHeapSizeMB(launcherMapOpts);
- int heapSizeForAm = extractHeapSizeMB(amChildOpts);
- int heapSize = Math.max(heapSizeForMap, heapSizeForAm) + YARN_MEMORY_MB_MIN;
- // limit to 3584 in case of 32 bit
- if (heapSizeForMap < 4096 && heapSizeForAm < 4096 && heapSize > 3584) {
- heapSize = 3584;
- }
- if (amChildOpts != null) {
- optsStr.append(amChildOpts);
- }
- optsStr.append(" ").append(launcherMapOpts.trim());
- if (heapSize > 0) {
- // append calculated total heap size to the end
- optsStr.append(" ").append("-Xmx").append(heapSize).append("m");
- }
- launcherConf.set(YARN_AM_COMMAND_OPTS, optsStr.toString().trim());
- }
- }
-
- void updateConfForJavaTmpDir(Configuration conf) {
- String amChildOpts = conf.get(YARN_AM_COMMAND_OPTS);
- String oozieJavaTmpDirSetting = "-Djava.io.tmpdir=./tmp";
- if (amChildOpts != null && !amChildOpts.contains(JAVA_TMP_DIR_SETTINGS)) {
- conf.set(YARN_AM_COMMAND_OPTS, amChildOpts + " " + oozieJavaTmpDirSetting);
- }
- }
-
- private HashMap<String, List<String>> populateEnvMap(String input) {
- HashMap<String, List<String>> envMaps = new HashMap<String, List<String>>();
- String[] envEntries = input.split(",");
- for (String envEntry : envEntries) {
- String[] envKeyVal = envEntry.split("=");
- String envKey = envKeyVal[0].trim();
- List<String> valList = envMaps.get(envKey);
- if (valList == null) {
- valList = new ArrayList<String>();
- }
- valList.add(envKeyVal[1].trim());
- envMaps.put(envKey, valList);
- }
- return envMaps;
- }
-
- public int extractHeapSizeMB(String input) {
- int ret = 0;
- if(input == null || input.equals(""))
- return ret;
- Matcher m = heapPattern.matcher(input);
- String heapStr = null;
- String heapNum = null;
- // Grabs the last match which takes effect (in case that multiple Xmx options specified)
- while (m.find()) {
- heapStr = m.group(1);
- heapNum = m.group(2);
- }
- if (heapStr != null) {
- // when Xmx specified in Gigabyte
- if(heapStr.endsWith("g") || heapStr.endsWith("G")) {
- ret = Integer.parseInt(heapNum) * 1024;
- } else {
- ret = Integer.parseInt(heapNum);
- }
- }
- return ret;
- }
-
public static void parseJobXmlAndConfiguration(Context context, Element element, Path appPath, Configuration conf)
throws IOException, ActionExecutorException, HadoopAccessorException, URISyntaxException {
parseJobXmlAndConfiguration(context, element, appPath, conf, false);
@@ -475,6 +323,7 @@ public class JavaActionExecutor extends ActionExecutor {
public static void parseJobXmlAndConfiguration(Context context, Element element, Path appPath, Configuration conf,
boolean isLauncher) throws IOException, ActionExecutorException, HadoopAccessorException, URISyntaxException {
Namespace ns = element.getNamespace();
+ @SuppressWarnings("unchecked")
Iterator<Element> it = element.getChildren("job-xml", ns).iterator();
HashMap<String, FileSystem> filesystemsMap = new HashMap<String, FileSystem>();
HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
@@ -535,8 +384,8 @@ public class JavaActionExecutor extends ActionExecutor {
throws ActionExecutorException {
try {
HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
- XConfiguration actionDefaults = has.createActionDefaultConf(actionConf.get(HADOOP_JOB_TRACKER), getType());
- XConfiguration.copy(actionDefaults, actionConf);
+ XConfiguration actionDefaults = has.createActionDefaultConf(actionConf.get(HADOOP_YARN_RM), getType());
+ XConfiguration.injectDefaults(actionDefaults, actionConf);
has.checkSupportedFilesystem(appPath.toUri());
// Set the Java Main Class for the Java action to give to the Java launcher
@@ -546,7 +395,6 @@ public class JavaActionExecutor extends ActionExecutor {
// set cancel.delegation.token in actionConf that child job doesn't cancel delegation token
actionConf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false);
- updateConfForJavaTmpDir(actionConf);
setRootLoggerLevel(actionConf);
return actionConf;
}
@@ -634,8 +482,8 @@ public class JavaActionExecutor extends ActionExecutor {
}
catch (Exception ex) {
LOG.debug(
- "Errors when add to DistributedCache. Path=" + uri.toString() + ", archive=" + archive + ", conf="
- + XmlUtils.prettyPrint(conf).toString());
+ "Errors when add to DistributedCache. Path=" + Objects.toString(uri, "<null>") + ", archive="
+ + archive + ", conf=" + XmlUtils.prettyPrint(conf).toString());
throw convertException(ex);
}
}
@@ -758,7 +606,7 @@ public class JavaActionExecutor extends ActionExecutor {
if (shareLibService != null) {
try {
List<Path> listOfPaths = shareLibService.getSystemLibJars(JavaActionExecutor.OOZIE_COMMON_LIBDIR);
- if (listOfPaths == null || listOfPaths.isEmpty()) {
+ if (listOfPaths.isEmpty()) {
throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "EJ001",
"Could not locate Oozie sharelib");
}
@@ -768,7 +616,7 @@ public class JavaActionExecutor extends ActionExecutor {
DistributedCache.createSymlink(conf);
}
listOfPaths = shareLibService.getSystemLibJars(getType());
- if (listOfPaths != null) {
+ if (!listOfPaths.isEmpty()) {
for (Path actionLibPath : listOfPaths) {
JobUtils.addFileToClassPath(actionLibPath, conf, fs);
DistributedCache.createSymlink(conf);
@@ -885,7 +733,7 @@ public class JavaActionExecutor extends ActionExecutor {
protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
- return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, JavaMain.class.getName());
+ return launcherConf.get(LauncherAM.CONF_OOZIE_ACTION_MAIN_CLASS, JavaMain.class.getName());
}
private void setJavaMain(Configuration actionConf, Element actionXml) {
@@ -907,8 +755,8 @@ public class JavaActionExecutor extends ActionExecutor {
}
@SuppressWarnings("unchecked")
- JobConf createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml, Configuration actionConf)
- throws ActionExecutorException {
+ Configuration createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml,
+ Configuration actionConf) throws ActionExecutorException {
try {
// app path could be a file
@@ -918,7 +766,7 @@ public class JavaActionExecutor extends ActionExecutor {
}
// launcher job configuration
- JobConf launcherJobConf = createBaseHadoopConf(context, actionXml);
+ Configuration launcherJobConf = createBaseHadoopConf(context, actionXml);
// cancel delegation token on a launcher job which stays alive till child job(s) finishes
// otherwise (in mapred action), doesn't cancel not to disturb running child job
launcherJobConf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", true);
@@ -940,7 +788,7 @@ public class JavaActionExecutor extends ActionExecutor {
launcherTime = context.getWorkflow().getCreatedTime().getTime();
}
String actionYarnTag = getActionYarnTag(getWorkflowConf(context), context.getWorkflow(), action);
- LauncherMapperHelper.setupYarnRestartHandling(launcherJobConf, actionConf, actionYarnTag, launcherTime);
+ LauncherHelper.setupYarnRestartHandling(launcherJobConf, actionConf, actionYarnTag, launcherTime);
}
else {
LOG.info(MessageFormat.format("{0} is set to false, not setting YARN restart properties",
@@ -953,15 +801,6 @@ public class JavaActionExecutor extends ActionExecutor {
}
setLibFilesArchives(context, actionXml, appPathRoot, launcherJobConf);
- String jobName = launcherJobConf.get(HADOOP_JOB_NAME);
- if (jobName == null || jobName.isEmpty()) {
- jobName = XLog.format(
- "oozie:launcher:T={0}:W={1}:A={2}:ID={3}", getType(),
- context.getWorkflow().getAppName(), action.getName(),
- context.getWorkflow().getId());
- launcherJobConf.setJobName(jobName);
- }
-
// Inject Oozie job information if enabled.
injectJobInfo(launcherJobConf, actionConf, context, action);
@@ -981,23 +820,22 @@ public class JavaActionExecutor extends ActionExecutor {
prepareXML = XmlUtils.prettyPrint(prepareElement).toString().trim();
}
}
- LauncherMapperHelper.setupLauncherInfo(launcherJobConf, jobId, actionId, actionDir, recoveryId, actionConf,
+ LauncherHelper.setupLauncherInfo(launcherJobConf, jobId, actionId, actionDir, recoveryId, actionConf,
prepareXML);
// Set the launcher Main Class
- LauncherMapperHelper.setupMainClass(launcherJobConf, getLauncherMain(launcherJobConf, actionXml));
- LauncherMapperHelper.setupLauncherURIHandlerConf(launcherJobConf);
-
- LauncherMapperHelper.setupMaxOutputData(launcherJobConf, getMaxOutputData(actionConf));
- LauncherMapperHelper.setupMaxExternalStatsSize(launcherJobConf, maxExternalStatsSize);
- LauncherMapperHelper.setupMaxFSGlob(launcherJobConf, maxFSGlobMax);
+ LauncherHelper.setupMainClass(launcherJobConf, getLauncherMain(launcherJobConf, actionXml));
+ LauncherHelper.setupLauncherURIHandlerConf(launcherJobConf);
+ LauncherHelper.setupMaxOutputData(launcherJobConf, getMaxOutputData(actionConf));
+ LauncherHelper.setupMaxExternalStatsSize(launcherJobConf, maxExternalStatsSize);
+ LauncherHelper.setupMaxFSGlob(launcherJobConf, maxFSGlobMax);
List<Element> list = actionXml.getChildren("arg", ns);
String[] args = new String[list.size()];
for (int i = 0; i < list.size(); i++) {
args[i] = list.get(i).getTextTrim();
}
- LauncherMapperHelper.setupMainArguments(launcherJobConf, args);
+ LauncherHelper.setupMainArguments(launcherJobConf, args);
// backward compatibility flag - see OOZIE-2872
if (ConfigurationService.getBoolean(LauncherMapper.CONF_OOZIE_NULL_ARGS_ALLOWED)) {
launcherJobConf.setBoolean(LauncherMapper.CONF_OOZIE_NULL_ARGS_ALLOWED, true);
@@ -1022,16 +860,6 @@ public class JavaActionExecutor extends ActionExecutor {
launcherJobConf.set(HADOOP_CHILD_JAVA_OPTS, opts.toString().trim());
launcherJobConf.set(HADOOP_MAP_JAVA_OPTS, opts.toString().trim());
- // setting for uber mode
- if (launcherJobConf.getBoolean(HADOOP_YARN_UBER_MODE, false)) {
- if (checkPropertiesToDisableUber(launcherJobConf)) {
- launcherJobConf.setBoolean(HADOOP_YARN_UBER_MODE, false);
- }
- else {
- updateConfForUberMode(launcherJobConf);
- }
- }
- updateConfForJavaTmpDir(launcherJobConf);
injectLauncherTimelineServiceEnabled(launcherJobConf, actionConf);
// properties from action that are needed by the launcher (e.g. QUEUE NAME, ACLs)
@@ -1055,23 +883,9 @@ public class JavaActionExecutor extends ActionExecutor {
return maxActionOutputLen;
}
- private boolean checkPropertiesToDisableUber(Configuration launcherConf) {
- boolean disable = false;
- if (launcherConf.getBoolean(HADOOP_JOB_CLASSLOADER, false)) {
- disable = true;
- }
- else if (launcherConf.getBoolean(HADOOP_USER_CLASSPATH_FIRST, false)) {
- disable = true;
- }
- return disable;
- }
-
protected void injectCallback(Context context, Configuration conf) {
- String callback = context.getCallbackUrl("$jobStatus");
- if (conf.get("job.end.notification.url") != null) {
- LOG.warn("Overriding the action job end notification URI");
- }
- conf.set("job.end.notification.url", callback);
+ String callback = context.getCallbackUrl(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_JOBSTATUS_TOKEN);
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_URL, callback);
}
void injectActionCallback(Context context, Configuration actionConf) {
@@ -1082,7 +896,7 @@ public class JavaActionExecutor extends ActionExecutor {
injectCallback(context, launcherConf);
}
- private void actionConfToLauncherConf(Configuration actionConf, JobConf launcherConf) {
+ private void actionConfToLauncherConf(Configuration actionConf, Configuration launcherConf) {
for (String name : SPECIAL_PROPERTIES) {
if (actionConf.get(name) != null && launcherConf.get("oozie.launcher." + name) == null) {
launcherConf.set(name, actionConf.get(name));
@@ -1090,9 +904,8 @@ public class JavaActionExecutor extends ActionExecutor {
}
}
- public void submitLauncher(FileSystem actionFs, Context context, WorkflowAction action) throws ActionExecutorException {
- JobClient jobClient = null;
- boolean exception = false;
+ public void submitLauncher(FileSystem actionFs, final Context context, WorkflowAction action) throws ActionExecutorException {
+ YarnClient yarnClient = null;
try {
Path appPathRoot = new Path(context.getWorkflow().getAppPath());
@@ -1109,14 +922,6 @@ public class JavaActionExecutor extends ActionExecutor {
LOG.debug("Setting LibFilesArchives ");
setLibFilesArchives(context, actionXml, appPathRoot, actionConf);
- String jobName = actionConf.get(HADOOP_JOB_NAME);
- if (jobName == null || jobName.isEmpty()) {
- jobName = XLog.format("oozie:action:T={0}:W={1}:A={2}:ID={3}",
- getType(), context.getWorkflow().getAppName(),
- action.getName(), context.getWorkflow().getId());
- actionConf.set(HADOOP_JOB_NAME, jobName);
- }
-
injectActionCallback(context, actionConf);
if(actionConf.get(ACL_MODIFY_JOB) == null || actionConf.get(ACL_MODIFY_JOB).trim().equals("")) {
@@ -1130,15 +935,17 @@ public class JavaActionExecutor extends ActionExecutor {
}
// Setting the credential properties in launcher conf
- JobConf credentialsConf = null;
+ Configuration credentialsConf = null;
+
HashMap<String, CredentialsProperties> credentialsProperties = setCredentialPropertyToActionConf(context,
action, actionConf);
+ Credentials credentials = null;
if (credentialsProperties != null) {
-
+ credentials = new Credentials();
// Adding if action need to set more credential tokens
- credentialsConf = new JobConf(false);
+ credentialsConf = new Configuration(false);
XConfiguration.copy(actionConf, credentialsConf);
- setCredentialTokens(credentialsConf, context, action, credentialsProperties);
+ setCredentialTokens(credentials, credentialsConf, context, action, credentialsProperties);
// insert conf to action conf from credentialsConf
for (Entry<String, String> entry : credentialsConf) {
@@ -1147,49 +954,56 @@ public class JavaActionExecutor extends ActionExecutor {
}
}
}
+ Configuration launcherJobConf = createLauncherConf(actionFs, context, action, actionXml, actionConf);
- JobConf launcherJobConf = createLauncherConf(actionFs, context, action, actionXml, actionConf);
-
- LOG.debug("Creating Job Client for action " + action.getId());
- jobClient = createJobClient(context, launcherJobConf);
- String launcherId = LauncherMapperHelper.getRecoveryId(launcherJobConf, context.getActionDir(), context
+ String consoleUrl;
+ String launcherId = LauncherHelper.getRecoveryId(launcherJobConf, context.getActionDir(), context
.getRecoveryId());
boolean alreadyRunning = launcherId != null;
- RunningJob runningJob;
// if user-retry is on, always submit new launcher
boolean isUserRetry = ((WorkflowActionBean)action).isUserRetry();
+ LOG.debug("Creating yarnClient for action {0}", action.getId());
+ yarnClient = createYarnClient(context, launcherJobConf);
if (alreadyRunning && !isUserRetry) {
- runningJob = jobClient.getJob(JobID.forName(launcherId));
- if (runningJob == null) {
- String jobTracker = launcherJobConf.get(HADOOP_JOB_TRACKER);
+ try {
+ ApplicationId appId = ConverterUtils.toApplicationId(launcherId);
+ ApplicationReport report = yarnClient.getApplicationReport(appId);
+ consoleUrl = report.getTrackingUrl();
+ } catch (RemoteException e) {
+ // caught when the application id does not exist
+ LOG.error("Got RemoteException from YARN", e);
+ String jobTracker = launcherJobConf.get(HADOOP_YARN_RM);
throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017",
"unknown job [{0}@{1}], cannot recover", launcherId, jobTracker);
}
}
else {
- LOG.debug("Submitting the job through Job Client for action " + action.getId());
-
- // setting up propagation of the delegation token.
- Services.get().get(HadoopAccessorService.class).addRMDelegationToken(jobClient, launcherJobConf);
+ // TODO: OYA: do we actually need an MR token? IIRC, it's issued by the JHS
+// // setting up propagation of the delegation token.
+// Token<DelegationTokenIdentifier> mrdt = null;
+// HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
+// mrdt = jobClient.getDelegationToken(has
+// .getMRDelegationTokenRenewer(launcherJobConf));
+// launcherJobConf.getCredentials().addToken(HadoopAccessorService.MR_TOKEN_ALIAS, mrdt);
// insert credentials tokens to launcher job conf if needed
- if (needInjectCredentials() && credentialsConf != null) {
- for (Token<? extends TokenIdentifier> tk : credentialsConf.getCredentials().getAllTokens()) {
+ if (credentialsConf != null) {
+ for (Token<? extends TokenIdentifier> tk :credentials.getAllTokens()) {
Text fauxAlias = new Text(tk.getKind() + "_" + tk.getService());
LOG.debug("ADDING TOKEN: " + fauxAlias);
- launcherJobConf.getCredentials().addToken(fauxAlias, tk);
+ credentials.addToken(fauxAlias, tk);
}
- if (credentialsConf.getCredentials().numberOfSecretKeys() > 0) {
+ if (credentials.numberOfSecretKeys() > 0) {
for (Entry<String, CredentialsProperties> entry : credentialsProperties.entrySet()) {
CredentialsProperties credProps = entry.getValue();
if (credProps != null) {
Text credName = new Text(credProps.getName());
- byte[] secKey = credentialsConf.getCredentials().getSecretKey(credName);
+ byte[] secKey = credentials.getSecretKey(credName);
if (secKey != null) {
LOG.debug("ADDING CREDENTIAL: " + credProps.getName());
- launcherJobConf.getCredentials().addSecretKey(credName, secKey);
+ credentials.addSecretKey(credName, secKey);
}
}
}
@@ -1198,55 +1012,129 @@ public class JavaActionExecutor extends ActionExecutor {
else {
LOG.info("No need to inject credentials.");
}
- runningJob = jobClient.submitJob(launcherJobConf);
- if (runningJob == null) {
- throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017",
- "Error submitting launcher for action [{0}]", action.getId());
- }
- launcherId = runningJob.getID().toString();
- LOG.debug("After submission get the launcherId " + launcherId);
+
+ String user = context.getWorkflow().getUser();
+
+ YarnClientApplication newApp = yarnClient.createApplication();
+ ApplicationId appId = newApp.getNewApplicationResponse().getApplicationId();
+ ApplicationSubmissionContext appContext =
+ createAppSubmissionContext(appId, launcherJobConf, user, context, actionConf, action.getName(),
+ credentials);
+ yarnClient.submitApplication(appContext);
+
+ launcherId = appId.toString();
+ LOG.debug("After submission get the launcherId [{0}]", launcherId);
+ ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+ consoleUrl = appReport.getTrackingUrl();
}
- String jobTracker = launcherJobConf.get(HADOOP_JOB_TRACKER);
- String consoleUrl = runningJob.getTrackingURL();
+ String jobTracker = launcherJobConf.get(HADOOP_YARN_RM);
context.setStartData(launcherId, jobTracker, consoleUrl);
}
catch (Exception ex) {
- exception = true;
throw convertException(ex);
}
finally {
- if (jobClient != null) {
- try {
- jobClient.close();
- }
- catch (Exception e) {
- if (exception) {
- LOG.error("JobClient error: ", e);
- }
- else {
- throw convertException(e);
- }
- }
+ if (yarnClient != null) {
+ Closeables.closeQuietly(yarnClient);
}
}
}
- private boolean needInjectCredentials() {
- boolean methodExists = true;
- Class klass;
- try {
- klass = Class.forName("org.apache.hadoop.mapred.JobConf");
- klass.getMethod("getCredentials");
- }
- catch (ClassNotFoundException ex) {
- methodExists = false;
- }
- catch (NoSuchMethodException ex) {
- methodExists = false;
+ private ApplicationSubmissionContext createAppSubmissionContext(ApplicationId appId, Configuration launcherJobConf,
+ String user, Context context, Configuration actionConf, String actionName,
+ Credentials credentials)
+ throws IOException, HadoopAccessorException, URISyntaxException {
+
+ ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
+
+ String jobName = XLog.format(
+ "oozie:launcher:T={0}:W={1}:A={2}:ID={3}", getType(),
+ context.getWorkflow().getAppName(), actionName,
+ context.getWorkflow().getId());
+
+ appContext.setApplicationId(appId);
+ appContext.setApplicationName(jobName);
+ appContext.setApplicationType("Oozie Launcher");
+ Priority pri = Records.newRecord(Priority.class);
+ int priority = 0; // TODO: OYA: Add a constant or a config
+ pri.setPriority(priority);
+ appContext.setPriority(pri);
+ appContext.setQueue("default"); // TODO: will be possible to set in <launcher>
+ ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
+
+ // Set the resources to localize
+ Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+ ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(launcherJobConf);
+ MRApps.setupDistributedCache(launcherJobConf, localResources);
+ // Add the Launcher and Action configs as Resources
+ HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
+ LocalResource launcherJobConfLR = has.createLocalResourceForConfigurationFile(LauncherAM.LAUNCHER_JOB_CONF_XML, user,
+ launcherJobConf, context.getAppFileSystem().getUri(), context.getActionDir());
+ localResources.put(LauncherAM.LAUNCHER_JOB_CONF_XML, launcherJobConfLR);
+ LocalResource actionConfLR = has.createLocalResourceForConfigurationFile(LauncherAM.ACTION_CONF_XML, user, actionConf,
+ context.getAppFileSystem().getUri(), context.getActionDir());
+ localResources.put(LauncherAM.ACTION_CONF_XML, actionConfLR);
+ amContainer.setLocalResources(localResources);
+
+ // Set the environment variables
+ Map<String, String> env = new HashMap<String, String>();
+ // This adds the Hadoop jars to the classpath in the Launcher JVM
+ ClasspathUtils.setupClasspath(env, launcherJobConf);
+
+ if (needToAddMapReduceToClassPath()) {
+ ClasspathUtils.addMapReduceToClasspath(env, launcherJobConf);
+ }
+
+ addActionSpecificEnvVars(env);
+ amContainer.setEnvironment(Collections.unmodifiableMap(env));
+
+ // Set the command
+ List<String> vargs = new ArrayList<String>(6);
+ vargs.add(Apps.crossPlatformify(ApplicationConstants.Environment.JAVA_HOME.toString())
+ + "/bin/java");
+
+ vargs.add("-Dlog4j.configuration=container-log4j.properties");
+ vargs.add("-Dlog4j.debug=true");
+ vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_DIR + "=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR);
+ vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_SIZE + "=" + 0);
+ vargs.add("-Dhadoop.root.logger=INFO,CLA");
+ vargs.add("-Dhadoop.root.logfile=" + TaskLog.LogName.SYSLOG);
+ vargs.add("-Dsubmitter.user=" + context.getWorkflow().getUser());
+
+ Path amTmpDir = new Path(Apps.crossPlatformify(ApplicationConstants.Environment.PWD.toString()),
+ YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
+ vargs.add("-Djava.io.tmpdir=" + amTmpDir);
+
+ vargs.add(LauncherAM.class.getCanonicalName());
+ vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
+ Path.SEPARATOR + ApplicationConstants.STDOUT);
+ vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
+ Path.SEPARATOR + ApplicationConstants.STDERR);
+ StringBuilder mergedCommand = new StringBuilder();
+ for (CharSequence str : vargs) {
+ mergedCommand.append(str).append(" ");
+ }
+
+ List<String> vargsFinal = ImmutableList.of(mergedCommand.toString());
+ LOG.debug("Command to launch container for ApplicationMaster is: {0}", mergedCommand);
+ amContainer.setCommands(vargsFinal);
+ appContext.setAMContainerSpec(amContainer);
+
+ // Set tokens
+ if (credentials != null) {
+ DataOutputBuffer dob = new DataOutputBuffer();
+ credentials.writeTokenStorageToStream(dob);
+ amContainer.setTokens(ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
}
- return methodExists;
+ // Set Resources
+ // TODO: OYA: make resources allocated for the AM configurable and choose good defaults (memory MB, vcores)
+ Resource resource = Resource.newInstance(2048, 1);
+ appContext.setResource(resource);
+ appContext.setCancelTokensWhenComplete(true);
+
+ return appContext;
}
protected HashMap<String, CredentialsProperties> setCredentialPropertyToActionConf(Context context,
@@ -1258,15 +1146,16 @@ public class JavaActionExecutor extends ActionExecutor {
if ("false".equals(actionConf.get(OOZIE_CREDENTIALS_SKIP)) ||
!wfJobConf.getBoolean(OOZIE_CREDENTIALS_SKIP, ConfigurationService.getBoolean(OOZIE_CREDENTIALS_SKIP))) {
credPropertiesMap = getActionCredentialsProperties(context, action);
- if (credPropertiesMap != null) {
- for (String key : credPropertiesMap.keySet()) {
- CredentialsProperties prop = credPropertiesMap.get(key);
- if (prop != null) {
+ if (!credPropertiesMap.isEmpty()) {
+ for (Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet()) {
+ if (entry.getValue() != null) {
+ CredentialsProperties prop = entry.getValue();
LOG.debug("Credential Properties set for action : " + action.getId());
- for (String property : prop.getProperties().keySet()) {
- actionConf.set(property, prop.getProperties().get(property));
- LOG.debug("property : '" + property + "', value : '" + prop.getProperties().get(property)
- + "'");
+ for (Entry<String, String> propEntry : prop.getProperties().entrySet()) {
+ String key = propEntry.getKey();
+ String value = propEntry.getValue();
+ actionConf.set(key, value);
+ LOG.debug("property : '" + key + "', value : '" + value + "'");
}
}
}
@@ -1285,20 +1174,20 @@ public class JavaActionExecutor extends ActionExecutor {
return credPropertiesMap;
}
- protected void setCredentialTokens(JobConf jobconf, Context context, WorkflowAction action,
+ protected void setCredentialTokens(Credentials credentials, Configuration jobconf, Context context, WorkflowAction action,
HashMap<String, CredentialsProperties> credPropertiesMap) throws Exception {
if (context != null && action != null && credPropertiesMap != null) {
// Make sure we're logged into Kerberos; if not, or near expiration, it will relogin
- CredentialsProvider.ensureKerberosLogin();
+ CredentialsProviderFactory.ensureKerberosLogin();
for (Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet()) {
String credName = entry.getKey();
CredentialsProperties credProps = entry.getValue();
if (credProps != null) {
- CredentialsProvider credProvider = new CredentialsProvider(credProps.getType());
- Credentials credentialObject = credProvider.createCredentialObject();
- if (credentialObject != null) {
- credentialObject.addtoJobConf(jobconf, credProps, context);
+ CredentialsProvider tokenProvider = CredentialsProviderFactory.getInstance()
+ .createCredentialsProvider(credProps.getType());
+ if (tokenProvider != null) {
+ tokenProvider.updateCredentials(credentials, jobconf, credProps, context);
LOG.debug("Retrieved Credential '" + credName + "' for action " + action.getId());
}
else {
@@ -1310,7 +1199,6 @@ public class JavaActionExecutor extends ActionExecutor {
}
}
}
-
}
protected HashMap<String, CredentialsProperties> getActionCredentialsProperties(Context context,
@@ -1424,19 +1312,22 @@ public class JavaActionExecutor extends ActionExecutor {
* @return JobClient
* @throws HadoopAccessorException
*/
- protected JobClient createJobClient(Context context, JobConf jobConf) throws HadoopAccessorException {
+ protected JobClient createJobClient(Context context, Configuration jobConf) throws HadoopAccessorException {
String user = context.getWorkflow().getUser();
- String group = context.getWorkflow().getGroup();
return Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf);
}
- protected RunningJob getRunningJob(Context context, WorkflowAction action, JobClient jobClient) throws Exception{
- String externalId = action.getExternalId();
- RunningJob runningJob = null;
- if (externalId != null) {
- runningJob = jobClient.getJob(JobID.forName(externalId));
- }
- return runningJob;
+ /**
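+ * Create a YARN client on behalf of the workflow's user.
+ *
+ * @param context executor context; the workflow user is read from it
+ * @param jobConf configuration passed to HadoopAccessorService when creating the client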
+ * @return YarnClient
+ * @throws HadoopAccessorException
+ */
+ protected YarnClient createYarnClient(Context context, Configuration jobConf) throws HadoopAccessorException {
+ String user = context.getWorkflow().getUser();
+ return Services.get().get(HadoopAccessorService.class).createYarnClient(user, jobConf);
}
/**
@@ -1448,142 +1339,141 @@ public class JavaActionExecutor extends ActionExecutor {
return action.getExternalId();
}
+ /**
+ * Tells whether the Hadoop MR jars have to be added to the classpath.
+ * Subclasses should override this method if necessary. By default we don't add
+ * MR jars to the classpath.
+ * @return true if the MR jars must be added to the classpath; false by default
+ */
+ protected boolean needToAddMapReduceToClassPath() {
+ return false;
+ }
+
+ /**
+ * Adds action-specific environment variables. Default implementation is no-op.
+ * Subclasses should override this method if necessary.
+ * @param env environment variable map of the launcher container; entries may be added to it
+ */
+ protected void addActionSpecificEnvVars(Map<String, String> env) {
+ // nop
+ }
+
@Override
public void check(Context context, WorkflowAction action) throws ActionExecutorException {
- JobClient jobClient = null;
- boolean exception = false;
+ boolean fallback = false;
+ LOG = XLog.resetPrefix(LOG);
LogUtils.setLogInfo(action);
+ YarnClient yarnClient = null;
try {
Element actionXml = XmlUtils.parseXml(action.getConf());
+ Configuration jobConf = createBaseHadoopConf(context, actionXml);
FileSystem actionFs = context.getAppFileSystem();
- JobConf jobConf = createBaseHadoopConf(context, actionXml);
- jobClient = createJobClient(context, jobConf);
- RunningJob runningJob = getRunningJob(context, action, jobClient);
- if (runningJob == null) {
- context.setExecutionData(FAILED, null);
- throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
- "Could not lookup launched hadoop Job ID [{0}] which was associated with " +
- " action [{1}]. Failing this action!", getActualExternalId(action), action.getId());
- }
- if (runningJob.isComplete()) {
+ yarnClient = createYarnClient(context, jobConf);
+ FinalApplicationStatus appStatus = null;
+ try {
+ ApplicationReport appReport =
+ yarnClient.getApplicationReport(ConverterUtils.toApplicationId(action.getExternalId()));
+ YarnApplicationState appState = appReport.getYarnApplicationState();
+ if (appState == YarnApplicationState.FAILED || appState == YarnApplicationState.FINISHED
+ || appState == YarnApplicationState.KILLED) {
+ appStatus = appReport.getFinalApplicationStatus();
+ }
+
+ } catch (Exception ye) {
+ LOG.warn("Exception occurred while checking Launcher AM status; will try checking action data file instead ", ye);
+ // Fallback to action data file if we can't find the Launcher AM (maybe it got purged)
+ fallback = true;
+ }
+ if (appStatus != null || fallback) {
Path actionDir = context.getActionDir();
- String newId = null;
// load sequence file into object
- Map<String, String> actionData = LauncherMapperHelper.getActionData(actionFs, actionDir, jobConf);
- if (actionData.containsKey(LauncherMapper.ACTION_DATA_NEW_ID)) {
- newId = actionData.get(LauncherMapper.ACTION_DATA_NEW_ID);
- String launcherId = action.getExternalId();
- runningJob = jobClient.getJob(JobID.forName(newId));
- if (runningJob == null) {
- context.setExternalStatus(FAILED);
+ Map<String, String> actionData = LauncherHelper.getActionData(actionFs, actionDir, jobConf);
+ if (fallback) {
+ String finalStatus = actionData.get(LauncherAM.ACTION_DATA_FINAL_STATUS);
+ if (finalStatus != null) {
+ appStatus = FinalApplicationStatus.valueOf(finalStatus);
+ } else {
+ context.setExecutionData(FAILED, null);
throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
- "Unknown hadoop job [{0}] associated with action [{1}]. Failing this action!", newId,
- action.getId());
+ "Unknown hadoop job [{0}] associated with action [{1}] and couldn't determine status from" +
+ " action data. Failing this action!", action.getExternalId(), action.getId());
}
- context.setExternalChildIDs(newId);
- LOG.info(XLog.STD, "External ID swap, old ID [{0}] new ID [{1}]", launcherId,
- newId);
}
- else {
- String externalIDs = actionData.get(LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS);
- if (externalIDs != null) {
- context.setExternalChildIDs(externalIDs);
- LOG.info(XLog.STD, "Hadoop Jobs launched : [{0}]", externalIDs);
+
+ String externalID = actionData.get(LauncherAM.ACTION_DATA_NEW_ID); // MapReduce was launched
+ if (externalID != null) {
+ context.setExternalChildIDs(externalID);
+ LOG.info(XLog.STD, "Hadoop Job was launched : [{0}]", externalID);
+ }
+
+ // Multiple child IDs - Pig or Hive action
+ String externalIDs = actionData.get(LauncherAM.ACTION_DATA_EXTERNAL_CHILD_IDS);
+ if (externalIDs != null) {
+ context.setExternalChildIDs(externalIDs);
+ LOG.info(XLog.STD, "External Child IDs : [{0}]", externalIDs);
+
+ }
+
+ LOG.info(XLog.STD, "action completed, external ID [{0}]", action.getExternalId());
+ context.setExecutionData(appStatus.toString(), null);
+ if (appStatus == FinalApplicationStatus.SUCCEEDED) {
+ if (getCaptureOutput(action) && LauncherHelper.hasOutputData(actionData)) {
+ context.setExecutionData(SUCCEEDED, PropertiesUtils.stringToProperties(actionData
+ .get(LauncherAM.ACTION_DATA_OUTPUT_PROPS)));
+ LOG.info(XLog.STD, "action produced output");
}
- else if (LauncherMapperHelper.hasOutputData(actionData)) {
- // Load stored Hadoop jobs ids and promote them as external child ids
- // This is for jobs launched with older release during upgrade to Oozie 4.3
- Properties props = PropertiesUtils.stringToProperties(actionData
- .get(LauncherMapper.ACTION_DATA_OUTPUT_PROPS));
- if (props.get(LauncherMain.HADOOP_JOBS) != null) {
- externalIDs = (String) props.get(LauncherMain.HADOOP_JOBS);
- context.setExternalChildIDs(externalIDs);
- LOG.info(XLog.STD, "Hadoop Jobs launched : [{0}]", externalIDs);
- }
+ else {
+ context.setExecutionData(SUCCEEDED, null);
}
- }
- if (runningJob.isComplete()) {
- // fetching action output and stats for the Map-Reduce action.
- if (newId != null) {
- actionData = LauncherMapperHelper.getActionData(actionFs, context.getActionDir(), jobConf);
+ if (LauncherHelper.hasStatsData(actionData)) {
+ context.setExecutionStats(actionData.get(LauncherAM.ACTION_DATA_STATS));
+ LOG.info(XLog.STD, "action produced stats");
}
- LOG.info(XLog.STD, "action completed, external ID [{0}]",
- action.getExternalId());
- if (LauncherMapperHelper.isMainSuccessful(runningJob)) {
- if (getCaptureOutput(action) && LauncherMapperHelper.hasOutputData(actionData)) {
- context.setExecutionData(SUCCEEDED, PropertiesUtils.stringToProperties(actionData
- .get(LauncherMapper.ACTION_DATA_OUTPUT_PROPS)));
- LOG.info(XLog.STD, "action produced output");
+ getActionData(actionFs, action, context);
+ }
+ else {
+ String errorReason;
+ if (actionData.containsKey(LauncherAM.ACTION_DATA_ERROR_PROPS)) {
+ Properties props = PropertiesUtils.stringToProperties(actionData
+ .get(LauncherAM.ACTION_DATA_ERROR_PROPS));
+ String errorCode = props.getProperty("error.code");
+ if ("0".equals(errorCode)) {
+ errorCode = "JA018";
}
- else {
- context.setExecutionData(SUCCEEDED, null);
+ if ("-1".equals(errorCode)) {
+ errorCode = "JA019";
}
- if (LauncherMapperHelper.hasStatsData(actionData)) {
- context.setExecutionStats(actionData.get(LauncherMapper.ACTION_DATA_STATS));
- LOG.info(XLog.STD, "action produced stats");
+ errorReason = props.getProperty("error.reason");
+ LOG.warn("Launcher ERROR, reason: {0}", errorReason);
+ String exMsg = props.getProperty("exception.message");
+ String errorInfo = (exMsg != null) ? exMsg : errorReason;
+ context.setErrorInfo(errorCode, errorInfo);
+ String exStackTrace = props.getProperty("exception.stacktrace");
+ if (exMsg != null) {
+ LOG.warn("Launcher exception: {0}{E}{1}", exMsg, exStackTrace);
}
- getActionData(actionFs, runningJob, action, context);
}
else {
- String errorReason;
- if (actionData.containsKey(LauncherMapper.ACTION_DATA_ERROR_PROPS)) {
- Properties props = PropertiesUtils.stringToProperties(actionData
- .get(LauncherMapper.ACTION_DATA_ERROR_PROPS));
- String errorCode = props.getProperty("error.code");
- if ("0".equals(errorCode)) {
- errorCode = "JA018";
- }
- if ("-1".equals(errorCode)) {
- errorCode = "JA019";
- }
- errorReason = props.getProperty("error.reason");
- LOG.warn("Launcher ERROR, reason: {0}", errorReason);
- String exMsg = props.getProperty("exception.message");
- String errorInfo = (exMsg != null) ? exMsg : errorReason;
- context.setErrorInfo(errorCode, errorInfo);
- String exStackTrace = props.getProperty("exception.stacktrace");
- if (exMsg != null) {
- LOG.warn("Launcher exception: {0}{E}{1}", exMsg, exStackTrace);
- }
- }
- else {
- errorReason = XLog.format("LauncherMapper died, check Hadoop LOG for job [{0}:{1}]", action
- .getTrackerUri(), action.getExternalId());
- LOG.warn(errorReason);
- }
- context.setExecutionData(FAILED_KILLED, null);
+ errorReason = XLog.format("Launcher AM died, check Hadoop LOG for job [{0}:{1}]", action
+ .getTrackerUri(), action.getExternalId());
+ LOG.warn(errorReason);
}
- }
- else {
- context.setExternalStatus("RUNNING");
- LOG.info(XLog.STD, "checking action, hadoop job ID [{0}] status [RUNNING]",
- runningJob.getID());
+ context.setExecutionData(FAILED_KILLED, null);
}
}
else {
- context.setExternalStatus("RUNNING");
+ context.setExternalStatus(YarnApplicationState.RUNNING.toString());
LOG.info(XLog.STD, "checking action, hadoop job ID [{0}] status [RUNNING]",
- runningJob.getID());
+ action.getExternalId());
}
}
catch (Exception ex) {
LOG.warn("Exception in check(). Message[{0}]", ex.getMessage(), ex);
- exception = true;
throw convertException(ex);
}
finally {
- if (jobClient != null) {
- try {
- jobClient.close();
- }
- catch (Exception e) {
- if (exception) {
- LOG.error("JobClient error: ", e);
- }
- else {
- throw convertException(e);
- }
- }
+ if (yarnClient != null) {
+ IOUtils.closeQuietly(yarnClient);
}
}
}
@@ -1591,14 +1481,12 @@ public class JavaActionExecutor extends ActionExecutor {
/**
* Get the output data of an action. Subclasses should override this method
* to get action specific output data.
- *
* @param actionFs the FileSystem object
- * @param runningJob the runningJob
* @param action the Workflow action
* @param context executor context
*
*/
- protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context)
+ protected void getActionData(FileSystem actionFs, WorkflowAction action, Context context)
throws HadoopAccessorException, JDOMException, IOException, URISyntaxException {
}
@@ -1611,55 +1499,39 @@ public class JavaActionExecutor extends ActionExecutor {
@Override
public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
- JobClient jobClient = null;
- boolean exception = false;
+ YarnClient yarnClient = null;
try {
Element actionXml = XmlUtils.parseXml(action.getConf());
- final JobConf jobConf = createBaseHadoopConf(context, actionXml);
- WorkflowJob wfJob = context.getWorkflow();
- Configuration conf = null;
- if ( wfJob.getConf() != null ) {
- conf = new XConfiguration(new StringReader(wfJob.getConf()));
- }
- String launcherTag = LauncherMapperHelper.getActionYarnTag(conf, wfJob.getParentId(), action);
- jobConf.set(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS, LauncherMapperHelper.getTag(launcherTag));
- jobConf.set(LauncherMainHadoopUtils.OOZIE_JOB_LAUNCH_TIME, Long.toString(action.getStartTime().getTime()));
- UserGroupInformation ugi = Services.get().get(UserGroupInformationService.class)
- .getProxyUser(context.getWorkflow().getUser());
- ugi.doAs(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- LauncherMainHadoopUtils.killChildYarnJobs(jobConf);
- return null;
+
+ final Configuration jobConf = createBaseHadoopConf(context, actionXml);
+ String launcherTag = LauncherHelper.getActionYarnTag(jobConf, context.getWorkflow().getParentId(), action);
+ jobConf.set(LauncherMain.CHILD_MAPREDUCE_JOB_TAGS, LauncherHelper.getTag(launcherTag));
+ yarnClient = createYarnClient(context, jobConf);
+ if(action.getExternalId() != null) {
+ yarnClient.killApplication(ConverterUtils.toApplicationId(action.getExternalId()));
+ }
+ for(ApplicationId id : LauncherMain.getChildYarnJobs(jobConf, ApplicationsRequestScope.ALL,
+ action.getStartTime().getTime())){
+ try {
+ yarnClient.killApplication(id);
+ } catch (Exception e) {
+ LOG.warn("Could not kill child of {0}, {1}", action.getExternalId(), id);
}
- });
- jobClient = createJobClient(context, jobConf);
- RunningJob runningJob = getRunningJob(context, action, jobClient);
- if (runningJob != null) {
- runningJob.killJob();
}
+
context.setExternalStatus(KILLED);
context.setExecutionData(KILLED, null);
- }
- catch (Exception ex) {
- exception = true;
+ } catch (Exception ex) {
+ LOG.error("Error when killing YARN application", ex);
throw convertException(ex);
- }
- finally {
+ } finally {
try {
FileSystem actionFs = context.getAppFileSystem();
cleanUpActionDir(actionFs, context);
- if (jobClient != null) {
- jobClient.close();
- }
- }
- catch (Exception ex) {
- if (exception) {
- LOG.error("Error: ", ex);
- }
- else {
- throw convertException(ex);
- }
+ Closeables.closeQuietly(yarnClient);
+ } catch (Exception ex) {
+ LOG.error("Error when cleaning up action dir", ex);
+ throw convertException(ex);
}
}
}
@@ -1754,7 +1626,7 @@ public class JavaActionExecutor extends ActionExecutor {
HadoopAccessorException, URISyntaxException {
}
- private void injectJobInfo(JobConf launcherJobConf, Configuration actionConf, Context context, WorkflowAction action) {
+ private void injectJobInfo(Configuration launcherJobConf, Configuration actionConf, Context context, WorkflowAction action) {
if (OozieJobInfo.isJobInfoEnabled()) {
try {
OozieJobInfo jobInfo = new OozieJobInfo(actionConf, context, action);
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/action/hadoop/LauncherHelper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/LauncherHelper.java b/core/src/main/java/org/apache/oozie/action/hadoop/LauncherHelper.java
new file mode 100644
index 0000000..f80141c
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/LauncherHelper.java
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.math.BigInteger;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.service.HadoopAccessorException;
+import org.apache.oozie.service.HadoopAccessorService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.URIHandlerService;
+import org.apache.oozie.service.UserGroupInformationService;
+import org.apache.oozie.util.IOUtils;
+import org.apache.oozie.util.PropertiesUtils;
+
+public class LauncherHelper {
+
+ // Configuration key read by getActionYarnTag(); when present, its value becomes the base of the action's YARN tag.
+ public static final String OOZIE_ACTION_YARN_TAG = "oozie.action.yarn.tag";
+
+ // NOTE(review): not referenced by any method of this class - confirm whether this field is still needed.
+ private static final LauncherInputFormatClassLocator launcherInputFormatClassLocator = new LauncherInputFormatClassLocator();
+
+ /**
+  * Reads the first line of the recovery file stored under the action directory.
+  *
+  * @param launcherConf launcher configuration; its "user.name" entry selects the user the file system is created for
+  * @param actionDir directory that contains the recovery file
+  * @param recoveryId name of the recovery file inside {@code actionDir}
+  * @return the first line of the recovery file, or {@code null} if the file does not exist or has no content
+  * @throws HadoopAccessorException declared for the file system creation
+  * @throws IOException if the file cannot be checked, opened, read or closed
+  */
+ public static String getRecoveryId(Configuration launcherConf, Path actionDir, String recoveryId)
+         throws HadoopAccessorException, IOException {
+     String jobId = null;
+     Path recoveryFile = new Path(actionDir, recoveryId);
+     FileSystem fs = Services.get().get(HadoopAccessorService.class)
+             .createFileSystem(launcherConf.get("user.name"), recoveryFile.toUri(), launcherConf);
+
+     if (fs.exists(recoveryFile)) {
+         BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(recoveryFile)));
+         try {
+             jobId = reader.readLine();
+         } finally {
+             // Close in finally so the underlying stream is released even when readLine() fails.
+             reader.close();
+         }
+     }
+     return jobId;
+ }
+
+ /**
+  * Stores the action's main class in the launcher configuration.
+  * <p>
+  * A {@code null} or empty value is ignored; this way the user can override the action's main class via a
+  * {@code <configuration>} property.
+  *
+  * @param launcherConf the oozie launcher configuration
+  * @param javaMainClass the main class name, may be {@code null} or empty
+  */
+ public static void setupMainClass(Configuration launcherConf, String javaMainClass) {
+     if (javaMainClass == null || javaMainClass.isEmpty()) {
+         // Nothing to record: leave any value already present in the configuration untouched.
+         return;
+     }
+     launcherConf.set(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, javaMainClass);
+ }
+
+ /**
+  * Copies every entry of the {@link URIHandlerService} launcher configuration into the launcher configuration.
+  *
+  * @param launcherConf the oozie launcher configuration that receives the entries
+  */
+ public static void setupLauncherURIHandlerConf(Configuration launcherConf) {
+     URIHandlerService uriHandlerService = Services.get().get(URIHandlerService.class);
+     for (Map.Entry<String, String> property : uriHandlerService.getLauncherConfig()) {
+         String name = property.getKey();
+         String value = property.getValue();
+         launcherConf.set(name, value);
+     }
+ }
+
+ /**
+  * Stores the main class arguments in the launcher configuration: first the argument count, then one entry per
+  * argument keyed by {@code LauncherMapper.CONF_OOZIE_ACTION_MAIN_ARG_PREFIX} followed by the zero-based position.
+  *
+  * @param launcherConf the oozie launcher configuration
+  * @param args the arguments, in order; must not be {@code null}
+  */
+ public static void setupMainArguments(Configuration launcherConf, String[] args) {
+     final int argCount = args.length;
+     launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_MAIN_ARG_COUNT, argCount);
+     int position = 0;
+     for (String arg : args) {
+         launcherConf.set(LauncherMapper.CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + position, arg);
+         position++;
+     }
+ }
+
+ /**
+  * Set the maximum size of output data
+  *
+  * @param launcherConf the oozie launcher configuration
+  * @param maxOutputData the limit stored under {@code LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA}
+  */
+ public static void setupMaxOutputData(Configuration launcherConf, int maxOutputData) {
+     final String limitKey = LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA;
+     launcherConf.setInt(limitKey, maxOutputData);
+ }
+
+ /**
+  * Records the upper bound for the size of external stats data in the launcher configuration.
+  *
+  * @param launcherConf the oozie launcher configuration
+  * @param maxStatsData the maximum allowed size of stats data
+  */
+ public static void setupMaxExternalStatsSize(Configuration launcherConf, int maxStatsData) {
+     final String limitKey = LauncherMapper.CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE;
+     launcherConf.setInt(limitKey, maxStatsData);
+ }
+
+ /**
+  * Records the upper bound for the number of globbed files/dirs in the launcher configuration.
+  *
+  * @param launcherConf the oozie launcher configuration
+  * @param fsGlobMax the maximum number of files/dirs for FS operation
+  */
+ public static void setupMaxFSGlob(Configuration launcherConf, int fsGlobMax) {
+     final String limitKey = LauncherMapper.CONF_OOZIE_ACTION_FS_GLOB_MAX;
+     launcherConf.setInt(limitKey, fsGlobMax);
+ }
+
+ /**
+  * Publishes the job and action identifiers to the launcher and action configurations.
+  * <p>
+  * The launcher configuration additionally receives the action directory, the recovery id and the prepare XML.
+  * When the server-side "oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache" switch is on, the
+  * "mapreduce.job.cache.files" entries of the action configuration are reduced to those containing a '#', and the
+  * switch is copied into the launcher configuration.
+  *
+  * @param launcherConf the oozie launcher configuration
+  * @param jobId id of the job
+  * @param actionId id of the action
+  * @param actionDir action directory
+  * @param recoveryId recovery id of the action
+  * @param actionConf the action configuration
+  * @param prepareXML the prepare XML of the action
+  * @throws IOException declared by the signature
+  * @throws HadoopAccessorException declared by the signature
+  */
+ public static void setupLauncherInfo(Configuration launcherConf, String jobId, String actionId, Path actionDir,
+         String recoveryId, Configuration actionConf, String prepareXML) throws IOException, HadoopAccessorException {
+     launcherConf.set(LauncherMapper.OOZIE_JOB_ID, jobId);
+     launcherConf.set(LauncherMapper.OOZIE_ACTION_ID, actionId);
+     launcherConf.set(LauncherMapper.OOZIE_ACTION_DIR_PATH, actionDir.toString());
+     launcherConf.set(LauncherMapper.OOZIE_ACTION_RECOVERY_ID, recoveryId);
+     launcherConf.set(LauncherMapper.ACTION_PREPARE_XML, prepareXML);
+
+     actionConf.set(LauncherMapper.OOZIE_JOB_ID, jobId);
+     actionConf.set(LauncherMapper.OOZIE_ACTION_ID, actionId);
+
+     final String cacheWorkaroundKey = "oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache";
+     if (!Services.get().getConf().getBoolean(cacheWorkaroundKey, false)) {
+         return;
+     }
+
+     // Workaround enabled: keep only the cache file entries that contain a '#'.
+     final String cacheFilesKey = "mapreduce.job.cache.files";
+     List<String> keptEntries = new ArrayList<String>();
+     for (String cacheFile : actionConf.getStringCollection(cacheFilesKey)) {
+         if (cacheFile.contains("#")) {
+             keptEntries.add(cacheFile);
+         }
+     }
+     String[] keptArray = keptEntries.toArray(new String[keptEntries.size()]);
+     actionConf.setStrings(cacheFilesKey, keptArray);
+     launcherConf.setBoolean(cacheWorkaroundKey, true);
+ }
+
+ /**
+  * Records the launch time in the launcher configuration and the hashed child job tag in the action configuration.
+  *
+  * @param launcherJobConf the oozie launcher configuration; receives the launch time
+  * @param actionConf the action configuration; receives the child job tag
+  * @param launcherTag the tag to hash with {@link #getTag(String)}
+  * @param launcherTime the launch time of the job
+  * @throws NoSuchAlgorithmException if the digest used by {@link #getTag(String)} is not available
+  */
+ public static void setupYarnRestartHandling(Configuration launcherJobConf, Configuration actionConf, String launcherTag,
+         long launcherTime)
+         throws NoSuchAlgorithmException {
+     launcherJobConf.setLong(LauncherMain.OOZIE_JOB_LAUNCH_TIME, launcherTime);
+     // The tag is hashed because tags are limited to 100 chars while the actionId has no maximum length.
+     // It is stored under oozie.child.mapreduce.job.tags rather than mapreduce.job.tags to avoid killing the
+     // launcher itself; mapreduce.job.tags should only go to the child job launched by the launcher.
+     actionConf.set(LauncherMain.CHILD_MAPREDUCE_JOB_TAGS, getTag(launcherTag));
+ }
+
+ /**
+  * Builds the tag for a launcher by hashing the given string with MD5.
+  * <p>
+  * The whole UTF-8 encoding of {@code launcherTag} is hashed, so the result does not depend on the platform default
+  * charset and multi-byte characters are not cut off. The digest is rendered as hexadecimal without leading zeros,
+  * so the part after the prefix has at most 32 characters.
+  *
+  * @param launcherTag the string to hash; must not be {@code null}
+  * @return {@code "oozie-"} followed by the hexadecimal MD5 digest of {@code launcherTag}
+  * @throws NoSuchAlgorithmException if the MD5 algorithm is not available
+  */
+ public static String getTag(String launcherTag) throws NoSuchAlgorithmException {
+     MessageDigest digest = MessageDigest.getInstance("MD5");
+     // Use the encoded byte count, not the char count: they differ as soon as the tag is not plain ASCII.
+     byte[] tagBytes = launcherTag.getBytes(java.nio.charset.Charset.forName("UTF-8"));
+     digest.update(tagBytes, 0, tagBytes.length);
+     return "oozie-" + new BigInteger(1, digest.digest()).toString(16);
+ }
+
+ /**
+  * Determine whether the given job has completed
+  *
+  * @param runningJob the job to check
+  * @return the value reported by {@code runningJob.isComplete()}
+  * @throws IOException if the job state cannot be retrieved
+  */
+ public static boolean isMainDone(RunningJob runningJob) throws IOException {
+     final boolean complete = runningJob.isComplete();
+     return complete;
+ }
+
+ /**
+  * Determine whether the given job succeeded without a launcher error
+  * <p>
+  * A job that reports failure is unsuccessful. A job that reports success is successful unless its launcher error
+  * counter is non-zero; when the counters or the launcher counter group are unavailable, the job's own result
+  * stands.
+  *
+  * @param runningJob the job to check
+  * @return true/false
+  * @throws IOException if the job state or counters cannot be retrieved
+  */
+ public static boolean isMainSuccessful(RunningJob runningJob) throws IOException {
+     if (!runningJob.isSuccessful()) {
+         return false;
+     }
+     Counters counters = runningJob.getCounters();
+     if (counters == null) {
+         return true;
+     }
+     Counters.Group launcherGroup = counters.getGroup(LauncherMapper.COUNTER_GROUP);
+     if (launcherGroup == null) {
+         return true;
+     }
+     return launcherGroup.getCounter(LauncherMapper.COUNTER_LAUNCHER_ERROR) == 0;
+ }
+
+ /**
+  * Determine whether action has external child jobs or not
+  *
+  * @param actionData the action data map, e.g. as returned by {@code getActionData}
+  * @return true if the map has an entry for {@code LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS}, false otherwise
+  * @throws IOException declared by the signature
+  */
+ public static boolean hasExternalChildJobs(Map<String, String> actionData) throws IOException {
+     final String dataKey = LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS;
+     return actionData.containsKey(dataKey);
+ }
+
+ /**
+  * Determine whether action has output data or not
+  *
+  * @param actionData the action data map, e.g. as returned by {@code getActionData}
+  * @return true if the map has an entry for {@code LauncherMapper.ACTION_DATA_OUTPUT_PROPS}, false otherwise
+  * @throws IOException declared by the signature
+  */
+ public static boolean hasOutputData(Map<String, String> actionData) throws IOException {
+     final String dataKey = LauncherMapper.ACTION_DATA_OUTPUT_PROPS;
+     return actionData.containsKey(dataKey);
+ }
+
+ /**
+  * Determine whether action has external stats or not
+  *
+  * @param actionData the action data map, e.g. as returned by {@code getActionData}
+  * @return true if the map has an entry for {@code LauncherMapper.ACTION_DATA_STATS}, false otherwise
+  * @throws IOException declared by the signature
+  */
+ public static boolean hasStatsData(Map<String, String> actionData) throws IOException {
+     final String dataKey = LauncherMapper.ACTION_DATA_STATS;
+     return actionData.containsKey(dataKey);
+ }
+
+ /**
+  * Determine whether action has new id (id swap) or not
+  *
+  * @param actionData the action data map, e.g. as returned by {@code getActionData}
+  * @return true if the map has an entry for {@code LauncherMapper.ACTION_DATA_NEW_ID}, false otherwise
+  * @throws IOException declared by the signature
+  */
+ public static boolean hasIdSwap(Map<String, String> actionData) throws IOException {
+     final String dataKey = LauncherMapper.ACTION_DATA_NEW_ID;
+     return actionData.containsKey(dataKey);
+ }
+
+ /**
+  * Get the sequence file path storing all action data
+  *
+  * @param actionDir the action directory
+  * @return the path of {@code LauncherMapper.ACTION_DATA_SEQUENCE_FILE} resolved against {@code actionDir}
+  */
+ public static Path getActionDataSequenceFilePath(Path actionDir) {
+     final String sequenceFileName = LauncherMapper.ACTION_DATA_SEQUENCE_FILE;
+     return new Path(actionDir, sequenceFileName);
+ }
+
+ /**
+  * Utility function to load the contents of the action data into a memory object.
+  * <p>
+  * The work runs as the proxy user named by {@link OozieClient#USER_NAME} in {@code conf}. If the action data
+  * sequence file exists, every key/value pair stored in it is returned. Otherwise the individual files in
+  * {@code actionDir} (external child ids, new id, output properties, stats, error properties) are read to maintain
+  * backward compatibility. Every reader opened here is closed before returning, also when reading fails.
+  *
+  * @param fs Action Filesystem
+  * @param actionDir Path
+  * @param conf Configuration
+  * @return Map action data; empty when nothing was found
+  * @throws IOException if the action data cannot be listed, opened, read or closed
+  * @throws InterruptedException propagated from {@code UserGroupInformation.doAs}
+  */
+ public static Map<String, String> getActionData(final FileSystem fs, final Path actionDir, final Configuration conf)
+         throws IOException, InterruptedException {
+     UserGroupInformationService ugiService = Services.get().get(UserGroupInformationService.class);
+     UserGroupInformation ugi = ugiService.getProxyUser(conf.get(OozieClient.USER_NAME));
+
+     return ugi.doAs(new PrivilegedExceptionAction<Map<String, String>>() {
+         @Override
+         public Map<String, String> run() throws IOException {
+             Map<String, String> ret = new HashMap<String, String>();
+             Path seqFilePath = getActionDataSequenceFilePath(actionDir);
+             if (fs.exists(seqFilePath)) {
+                 SequenceFile.Reader seqFile = new SequenceFile.Reader(fs, seqFilePath, conf);
+                 try {
+                     Text key = new Text();
+                     Text value = new Text();
+                     while (seqFile.next(key, value)) {
+                         ret.put(key.toString(), value.toString());
+                     }
+                 } finally {
+                     // Release the file handle even when reading a record fails.
+                     seqFile.close();
+                 }
+                 return ret;
+             }
+
+             // maintain backward-compatibility. to be deprecated
+             org.apache.hadoop.fs.FileStatus[] files = fs.listStatus(actionDir);
+             if (files == null) {
+                 return ret;
+             }
+             Path externalChildIdsFile = new Path(actionDir, "externalChildIds.properties");
+             Path newIdFile = new Path(actionDir, "newId.properties");
+             Path outputPropsFile = new Path(actionDir, LauncherMapper.ACTION_DATA_OUTPUT_PROPS);
+             Path statsFile = new Path(actionDir, LauncherMapper.ACTION_DATA_STATS);
+             Path errorPropsFile = new Path(actionDir, LauncherMapper.ACTION_DATA_ERROR_PROPS);
+             for (org.apache.hadoop.fs.FileStatus status : files) {
+                 Path file = status.getPath();
+                 BufferedReader reader = null;
+                 try {
+                     if (file.equals(externalChildIdsFile)) {
+                         reader = new BufferedReader(new InputStreamReader(fs.open(file)));
+                         ret.put(LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS,
+                                 IOUtils.getReaderAsString(reader, -1));
+                     }
+                     else if (file.equals(newIdFile)) {
+                         reader = new BufferedReader(new InputStreamReader(fs.open(file)));
+                         Properties props = PropertiesUtils.readProperties(reader, -1);
+                         ret.put(LauncherMapper.ACTION_DATA_NEW_ID, props.getProperty("id"));
+                     }
+                     else if (file.equals(outputPropsFile)) {
+                         int maxOutputData = conf.getInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA,
+                                 2 * 1024);
+                         reader = new BufferedReader(new InputStreamReader(fs.open(file)));
+                         ret.put(LauncherMapper.ACTION_DATA_OUTPUT_PROPS, PropertiesUtils
+                                 .propertiesToString(PropertiesUtils.readProperties(reader, maxOutputData)));
+                     }
+                     else if (file.equals(statsFile)) {
+                         int statsMaxOutputData = conf.getInt(LauncherMapper.CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE,
+                                 Integer.MAX_VALUE);
+                         reader = new BufferedReader(new InputStreamReader(fs.open(file)));
+                         ret.put(LauncherMapper.ACTION_DATA_STATS, PropertiesUtils
+                                 .propertiesToString(PropertiesUtils.readProperties(reader, statsMaxOutputData)));
+                     }
+                     else if (file.equals(errorPropsFile)) {
+                         reader = new BufferedReader(new InputStreamReader(fs.open(file)));
+                         ret.put(LauncherMapper.ACTION_DATA_ERROR_PROPS, IOUtils.getReaderAsString(reader, -1));
+                     }
+                 } finally {
+                     // The helpers may already have closed the reader on success (TODO confirm); closing a
+                     // BufferedReader twice is a no-op, and this also covers the failure paths.
+                     if (reader != null) {
+                         reader.close();
+                     }
+                 }
+             }
+             return ret;
+         }
+     });
+ }
+
+ /**
+  * Builds the YARN tag of an action.
+  * <p>
+  * The first available base is used: the {@link #OOZIE_ACTION_YARN_TAG} value from {@code conf}, then
+  * {@code parentId}; either one is suffixed with {@code "@"} and the action name. Without a base the action id is
+  * returned as is.
+  *
+  * @param conf configuration that may carry {@link #OOZIE_ACTION_YARN_TAG}; may be {@code null}
+  * @param parentId id of the parent; may be {@code null}
+  * @param wfAction the workflow action the tag is built for
+  * @return the YARN tag of the action
+  */
+ public static String getActionYarnTag(Configuration conf, String parentId, WorkflowAction wfAction) {
+     String configuredTag = (conf == null) ? null : conf.get(OOZIE_ACTION_YARN_TAG);
+     if (configuredTag != null) {
+         return configuredTag + "@" + wfAction.getName();
+     }
+     if (parentId != null) {
+         return parentId + "@" + wfAction.getName();
+     }
+     return wfAction.getId();
+ }
+
+}