You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by rk...@apache.org on 2016/07/26 01:25:09 UTC
[3/3] oozie git commit: OOZIE-2590 OYA: Create basic Oozie Launcher Application Master (rkanter)
OOZIE-2590 OYA: Create basic Oozie Launcher Application Master (rkanter)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/fea512cf
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/fea512cf
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/fea512cf
Branch: refs/heads/oya
Commit: fea512cf66aec92d867e13c200978fd103868ab1
Parents: a37835f
Author: Robert Kanter <rk...@cloudera.com>
Authored: Mon Jul 25 18:24:35 2016 -0700
Committer: Robert Kanter <rk...@cloudera.com>
Committed: Mon Jul 25 18:24:35 2016 -0700
----------------------------------------------------------------------
core/pom.xml | 17 -
.../action/hadoop/DistcpActionExecutor.java | 7 +-
.../action/hadoop/Hive2ActionExecutor.java | 5 +-
.../oozie/action/hadoop/HiveActionExecutor.java | 5 +-
.../oozie/action/hadoop/JavaActionExecutor.java | 593 ++++++++---------
.../action/hadoop/LauncherMapperHelper.java | 12 -
.../action/hadoop/SparkActionExecutor.java | 5 +-
.../action/hadoop/SqoopActionExecutor.java | 9 +-
.../oozie/service/HadoopAccessorService.java | 97 ++-
.../org/apache/oozie/util/ClasspathUtils.java | 145 +++++
core/src/main/resources/oozie-default.xml | 25 -
.../java/org/apache/oozie/QueryServlet.java | 40 ++
.../action/hadoop/TestJavaActionExecutor.java | 531 ++--------------
.../oozie/action/hadoop/TestLauncherAM.java | 46 ++
.../hadoop/TestLauncherAMCallbackNotifier.java | 170 +++++
.../action/hadoop/TestPrepareActionsDriver.java | 15 +-
.../action/hadoop/TestShellActionExecutor.java | 21 -
.../apache/oozie/command/wf/HangServlet.java | 19 +-
.../oozie/service/TestConfigurationService.java | 3 -
.../service/TestHadoopAccessorService.java | 121 +++-
.../java/org/apache/oozie/test/XTestCase.java | 4 +
.../apache/oozie/util/TestClasspathUtils.java | 110 ++++
release-log.txt | 1 +
sharelib/distcp/pom.xml | 12 -
sharelib/hcatalog/pom.xml | 12 -
sharelib/hive/pom.xml | 12 -
sharelib/hive2/pom.xml | 12 -
sharelib/oozie/pom.xml | 12 -
.../apache/oozie/action/hadoop/LauncherAM.java | 636 +++++++++++++++++++
.../hadoop/LauncherAMCallbackNotifier.java | 175 +++++
.../oozie/action/hadoop/LauncherMapper.java | 6 +-
.../action/hadoop/PrepareActionsDriver.java | 43 +-
sharelib/pig/pom.xml | 12 -
sharelib/spark/pom.xml | 12 -
sharelib/sqoop/pom.xml | 12 -
sharelib/streaming/pom.xml | 33 -
36 files changed, 1899 insertions(+), 1091 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index 6584af8..86feea0 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -453,23 +453,6 @@
</configuration>
</plugin>
<plugin>
- <artifactId>maven-dependency-plugin</artifactId>
- <executions>
- <execution>
- <id>create-mrapp-generated-classpath</id>
- <phase>generate-test-resources</phase>
- <goals>
- <goal>build-classpath</goal>
- </goals>
- <configuration>
- <!-- needed to run the unit test for DS to generate the required classpath
- that is required in the env of the launch container in the mini mr/yarn cluster -->
- <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
<groupId>org.apache.openjpa</groupId>
<artifactId>openjpa-maven-plugin</artifactId>
<executions>
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/java/org/apache/oozie/action/hadoop/DistcpActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/DistcpActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/DistcpActionExecutor.java
index 96726da..99652e8 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/DistcpActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/DistcpActionExecutor.java
@@ -26,13 +26,10 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.RunningJob;
import org.apache.oozie.action.ActionExecutorException;
-import org.apache.oozie.action.ActionExecutor.Context;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.HadoopAccessorException;
-import org.apache.oozie.service.Services;
import org.apache.oozie.util.XLog;
import org.jdom.Element;
import org.jdom.JDOMException;
@@ -126,9 +123,9 @@ public class DistcpActionExecutor extends JavaActionExecutor{
}
@Override
- protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context)
+ protected void getActionData(FileSystem actionFs, WorkflowAction action, Context context)
throws HadoopAccessorException, JDOMException, IOException, URISyntaxException {
- super.getActionData(actionFs, runningJob, action, context);
+ super.getActionData(actionFs, action, context);
readExternalChildIDs(action, context);
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/java/org/apache/oozie/action/hadoop/Hive2ActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/Hive2ActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/Hive2ActionExecutor.java
index b5b1bf9..9ba6318 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/Hive2ActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/Hive2ActionExecutor.java
@@ -28,7 +28,6 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.RunningJob;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.service.HadoopAccessorException;
@@ -134,9 +133,9 @@ public class Hive2ActionExecutor extends ScriptLanguageActionExecutor {
}
@Override
- protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context)
+ protected void getActionData(FileSystem actionFs, WorkflowAction action, Context context)
throws HadoopAccessorException, JDOMException, IOException, URISyntaxException {
- super.getActionData(actionFs, runningJob, action, context);
+ super.getActionData(actionFs, action, context);
readExternalChildIDs(action, context);
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/java/org/apache/oozie/action/hadoop/HiveActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/HiveActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/HiveActionExecutor.java
index c74e9e6..a850957 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/HiveActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/HiveActionExecutor.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RunningJob;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.XOozieClient;
@@ -125,9 +124,9 @@ public class HiveActionExecutor extends ScriptLanguageActionExecutor {
}
@Override
- protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context)
+ protected void getActionData(FileSystem actionFs, WorkflowAction action, Context context)
throws HadoopAccessorException, JDOMException, IOException, URISyntaxException {
- super.getActionData(actionFs, runningJob, action, context);
+ super.getActionData(actionFs, action, context);
readExternalChildIDs(action, context);
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
index 99e3344..d573fc3 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
@@ -18,42 +18,38 @@
package org.apache.oozie.action.hadoop;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.StringReader;
-import java.net.ConnectException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
+import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.AccessControlException;
+import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.action.ActionExecutor;
@@ -69,18 +65,41 @@ import org.apache.oozie.service.Services;
import org.apache.oozie.service.ShareLibService;
import org.apache.oozie.service.URIHandlerService;
import org.apache.oozie.service.WorkflowAppService;
+import org.apache.oozie.util.ClasspathUtils;
import org.apache.oozie.util.ELEvaluationException;
import org.apache.oozie.util.ELEvaluator;
-import org.apache.oozie.util.XLog;
-import org.apache.oozie.util.XConfiguration;
-import org.apache.oozie.util.XmlUtils;
import org.apache.oozie.util.JobUtils;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.PropertiesUtils;
+import org.apache.oozie.util.XConfiguration;
+import org.apache.oozie.util.XLog;
+import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;
import org.jdom.JDOMException;
import org.jdom.Namespace;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.StringReader;
+import java.net.ConnectException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
public class JavaActionExecutor extends ActionExecutor {
@@ -94,7 +113,6 @@ public class JavaActionExecutor extends ActionExecutor {
public static final String ACL_VIEW_JOB = "mapreduce.job.acl-view-job";
public static final String ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job";
public static final String HADOOP_YARN_TIMELINE_SERVICE_ENABLED = "yarn.timeline-service.enabled";
- public static final String HADOOP_YARN_UBER_MODE = "mapreduce.job.ubertask.enable";
public static final String HADOOP_YARN_KILL_CHILD_JOBS_ON_AMRESTART = "oozie.action.launcher.am.restart.kill.childjobs";
public static final String HADOOP_MAP_MEMORY_MB = "mapreduce.map.memory.mb";
public static final String HADOOP_CHILD_JAVA_OPTS = "mapred.child.java.opts";
@@ -117,7 +135,6 @@ public class JavaActionExecutor extends ActionExecutor {
protected XLog LOG = XLog.getLog(getClass());
private static final Pattern heapPattern = Pattern.compile("-Xmx(([0-9]+)[mMgG])");
private static final String JAVA_TMP_DIR_SETTINGS = "-Djava.io.tmpdir=";
- public static final String CONF_HADOOP_YARN_UBER_MODE = "oozie.action.launcher." + HADOOP_YARN_UBER_MODE;
public static final String HADOOP_JOB_CLASSLOADER = "mapreduce.job.classloader";
public static final String HADOOP_USER_CLASSPATH_FIRST = "mapreduce.user.classpath.first";
public static final String OOZIE_CREDENTIALS_SKIP = "oozie.credentials.skip";
@@ -138,10 +155,11 @@ public class JavaActionExecutor extends ActionExecutor {
public static List<Class> getCommonLauncherClasses() {
List<Class> classes = new ArrayList<Class>();
- classes.add(LauncherMapper.class);
classes.add(OozieLauncherInputFormat.class);
classes.add(LauncherMain.class);
classes.addAll(Services.get().get(URIHandlerService.class).getClassesForLauncher());
+ classes.add(LauncherAM.class);
+ classes.add(LauncherAMCallbackNotifier.class);
return classes;
}
@@ -159,7 +177,7 @@ public class JavaActionExecutor extends ActionExecutor {
@Override
public void initActionType() {
super.initActionType();
- maxActionOutputLen = ConfigurationService.getInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA);
+ maxActionOutputLen = ConfigurationService.getInt(LauncherAM.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA);
//Get the limit for the maximum allowed size of action stats
maxExternalStatsSize = ConfigurationService.getInt(JavaActionExecutor.MAX_EXTERNAL_STATS_SIZE);
maxExternalStatsSize = (maxExternalStatsSize == -1) ? Integer.MAX_VALUE : maxExternalStatsSize;
@@ -257,8 +275,6 @@ public class JavaActionExecutor extends ActionExecutor {
} catch (URISyntaxException ex) {
throw convertException(ex);
}
- // Inject use uber mode for launcher
- injectLauncherUseUberMode(launcherConf);
XConfiguration.copy(launcherConf, conf);
checkForDisallowedProps(launcherConf, "launcher configuration");
// Inject config-class for launcher to use for action
@@ -273,25 +289,6 @@ public class JavaActionExecutor extends ActionExecutor {
}
}
- void injectLauncherUseUberMode(Configuration launcherConf) {
- // Set Uber Mode for the launcher (YARN only, ignored by MR1)
- // Priority:
- // 1. action's <configuration>
- // 2. oozie.action.#action-type#.launcher.mapreduce.job.ubertask.enable
- // 3. oozie.action.launcher.mapreduce.job.ubertask.enable
- if (launcherConf.get(HADOOP_YARN_UBER_MODE) == null) {
- if (ConfigurationService.get("oozie.action." + getType() + ".launcher." + HADOOP_YARN_UBER_MODE).length() > 0) {
- if (ConfigurationService.getBoolean("oozie.action." + getType() + ".launcher." + HADOOP_YARN_UBER_MODE)) {
- launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, true);
- }
- } else {
- if (ConfigurationService.getBoolean("oozie.action.launcher." + HADOOP_YARN_UBER_MODE)) {
- launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, true);
- }
- }
- }
- }
-
void injectLauncherTimelineServiceEnabled(Configuration launcherConf, Configuration actionConf) {
// Getting delegation token for ATS. If tez-site.xml is present in distributed cache, turn on timeline service.
if (actionConf.get("oozie.launcher." + HADOOP_YARN_TIMELINE_SERVICE_ENABLED) == null
@@ -303,104 +300,6 @@ public class JavaActionExecutor extends ActionExecutor {
}
}
- void updateConfForUberMode(Configuration launcherConf) {
-
- // child.env
- boolean hasConflictEnv = false;
- String launcherMapEnv = launcherConf.get(HADOOP_MAP_JAVA_ENV);
- if (launcherMapEnv == null) {
- launcherMapEnv = launcherConf.get(HADOOP_CHILD_JAVA_ENV);
- }
- String amEnv = launcherConf.get(YARN_AM_ENV);
- StringBuffer envStr = new StringBuffer();
- HashMap<String, List<String>> amEnvMap = null;
- HashMap<String, List<String>> launcherMapEnvMap = null;
- if (amEnv != null) {
- envStr.append(amEnv);
- amEnvMap = populateEnvMap(amEnv);
- }
- if (launcherMapEnv != null) {
- launcherMapEnvMap = populateEnvMap(launcherMapEnv);
- if (amEnvMap != null) {
- Iterator<String> envKeyItr = launcherMapEnvMap.keySet().iterator();
- while (envKeyItr.hasNext()) {
- String envKey = envKeyItr.next();
- if (amEnvMap.containsKey(envKey)) {
- List<String> amValList = amEnvMap.get(envKey);
- List<String> launcherValList = launcherMapEnvMap.get(envKey);
- Iterator<String> valItr = launcherValList.iterator();
- while (valItr.hasNext()) {
- String val = valItr.next();
- if (!amValList.contains(val)) {
- hasConflictEnv = true;
- break;
- }
- else {
- valItr.remove();
- }
- }
- if (launcherValList.isEmpty()) {
- envKeyItr.remove();
- }
- }
- }
- }
- }
- if (hasConflictEnv) {
- launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, false);
- }
- else {
- if (launcherMapEnvMap != null) {
- for (String key : launcherMapEnvMap.keySet()) {
- List<String> launcherValList = launcherMapEnvMap.get(key);
- for (String val : launcherValList) {
- if (envStr.length() > 0) {
- envStr.append(",");
- }
- envStr.append(key).append("=").append(val);
- }
- }
- }
-
- launcherConf.set(YARN_AM_ENV, envStr.toString());
-
- // memory.mb
- int launcherMapMemoryMB = launcherConf.getInt(HADOOP_MAP_MEMORY_MB, 1536);
- int amMemoryMB = launcherConf.getInt(YARN_AM_RESOURCE_MB, 1536);
- // YARN_MEMORY_MB_MIN to provide buffer.
- // suppose launcher map aggressively use high memory, need some
- // headroom for AM
- int memoryMB = Math.max(launcherMapMemoryMB, amMemoryMB) + YARN_MEMORY_MB_MIN;
- // limit to 4096 in case of 32 bit
- if (launcherMapMemoryMB < 4096 && amMemoryMB < 4096 && memoryMB > 4096) {
- memoryMB = 4096;
- }
- launcherConf.setInt(YARN_AM_RESOURCE_MB, memoryMB);
-
- // We already made mapred.child.java.opts and
- // mapreduce.map.java.opts equal, so just start with one of them
- String launcherMapOpts = launcherConf.get(HADOOP_MAP_JAVA_OPTS, "");
- String amChildOpts = launcherConf.get(YARN_AM_COMMAND_OPTS);
- StringBuilder optsStr = new StringBuilder();
- int heapSizeForMap = extractHeapSizeMB(launcherMapOpts);
- int heapSizeForAm = extractHeapSizeMB(amChildOpts);
- int heapSize = Math.max(heapSizeForMap, heapSizeForAm) + YARN_MEMORY_MB_MIN;
- // limit to 3584 in case of 32 bit
- if (heapSizeForMap < 4096 && heapSizeForAm < 4096 && heapSize > 3584) {
- heapSize = 3584;
- }
- if (amChildOpts != null) {
- optsStr.append(amChildOpts);
- }
- optsStr.append(" ").append(launcherMapOpts.trim());
- if (heapSize > 0) {
- // append calculated total heap size to the end
- optsStr.append(" ").append("-Xmx").append(heapSize).append("m");
- }
- launcherConf.set(YARN_AM_COMMAND_OPTS, optsStr.toString().trim());
- }
- }
-
void updateConfForJavaTmpDir(Configuration conf) {
String amChildOpts = conf.get(YARN_AM_COMMAND_OPTS);
String oozieJavaTmpDirSetting = "-Djava.io.tmpdir=./tmp";
@@ -868,7 +767,7 @@ public class JavaActionExecutor extends ActionExecutor {
protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
- return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, JavaMain.class.getName());
+ return launcherConf.get(LauncherAM.CONF_OOZIE_ACTION_MAIN_CLASS, JavaMain.class.getName());
}
private void setJavaMain(Configuration actionConf, Element actionXml) {
@@ -1004,15 +903,6 @@ public class JavaActionExecutor extends ActionExecutor {
launcherJobConf.set(HADOOP_CHILD_JAVA_OPTS, opts.toString().trim());
launcherJobConf.set(HADOOP_MAP_JAVA_OPTS, opts.toString().trim());
- // setting for uber mode
- if (launcherJobConf.getBoolean(HADOOP_YARN_UBER_MODE, false)) {
- if (checkPropertiesToDisableUber(launcherJobConf)) {
- launcherJobConf.setBoolean(HADOOP_YARN_UBER_MODE, false);
- }
- else {
- updateConfForUberMode(launcherJobConf);
- }
- }
updateConfForJavaTmpDir(launcherJobConf);
injectLauncherTimelineServiceEnabled(launcherJobConf, actionConf);
@@ -1027,23 +917,9 @@ public class JavaActionExecutor extends ActionExecutor {
}
}
- private boolean checkPropertiesToDisableUber(Configuration launcherConf) {
- boolean disable = false;
- if (launcherConf.getBoolean(HADOOP_JOB_CLASSLOADER, false)) {
- disable = true;
- }
- else if (launcherConf.getBoolean(HADOOP_USER_CLASSPATH_FIRST, false)) {
- disable = true;
- }
- return disable;
- }
-
private void injectCallback(Context context, Configuration conf) {
- String callback = context.getCallbackUrl("$jobStatus");
- if (conf.get("job.end.notification.url") != null) {
- LOG.warn("Overriding the action job end notification URI");
- }
- conf.set("job.end.notification.url", callback);
+ String callback = context.getCallbackUrl(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_JOBSTATUS_TOKEN);
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_URL, callback);
}
void injectActionCallback(Context context, Configuration actionConf) {
@@ -1062,7 +938,7 @@ public class JavaActionExecutor extends ActionExecutor {
}
}
- public void submitLauncher(FileSystem actionFs, Context context, WorkflowAction action) throws ActionExecutorException {
+ public void submitLauncher(FileSystem actionFs, final Context context, WorkflowAction action) throws ActionExecutorException {
JobClient jobClient = null;
boolean exception = false;
try {
@@ -1119,14 +995,17 @@ public class JavaActionExecutor extends ActionExecutor {
}
}
}
-
JobConf launcherJobConf = createLauncherConf(actionFs, context, action, actionXml, actionConf);
- LOG.debug("Creating Job Client for action " + action.getId());
- jobClient = createJobClient(context, launcherJobConf);
- String launcherId = LauncherMapperHelper.getRecoveryId(launcherJobConf, context.getActionDir(), context
- .getRecoveryId());
- boolean alreadyRunning = launcherId != null;
+ boolean alreadyRunning = false;
+ String launcherId = null;
+ String consoleUrl = null;
+ // TODO: OYA: equivalent of this? (recovery, alreadyRunning) When does this happen?
+// LOG.debug("Creating Job Client for action " + action.getId());
+// jobClient = createJobClient(context, launcherJobConf);
+// launcherId = LauncherMapperHelper.getRecoveryId(launcherJobConf, context.getActionDir(), context
+// .getRecoveryId());
+// alreadyRunning = launcherId != null;
RunningJob runningJob;
// if user-retry is on, always submit new launcher
@@ -1141,13 +1020,13 @@ public class JavaActionExecutor extends ActionExecutor {
}
}
else {
- LOG.debug("Submitting the job through Job Client for action " + action.getId());
-
- // setting up propagation of the delegation token.
- HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
- Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(has
- .getMRDelegationTokenRenewer(launcherJobConf));
- launcherJobConf.getCredentials().addToken(HadoopAccessorService.MR_TOKEN_ALIAS, mrdt);
+ // TODO: OYA: do we actually need an MR token? IIRC, it's issued by the JHS
+// // setting up propagation of the delegation token.
+// Token<DelegationTokenIdentifier> mrdt = null;
+// HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
+// mrdt = jobClient.getDelegationToken(has
+// .getMRDelegationTokenRenewer(launcherJobConf));
+// launcherJobConf.getCredentials().addToken(HadoopAccessorService.MR_TOKEN_ALIAS, mrdt);
// insert credentials tokens to launcher job conf if needed
if (needInjectCredentials() && credentialsConf != null) {
@@ -1173,17 +1052,36 @@ public class JavaActionExecutor extends ActionExecutor {
else {
LOG.info("No need to inject credentials.");
}
- runningJob = jobClient.submitJob(launcherJobConf);
- if (runningJob == null) {
- throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017",
- "Error submitting launcher for action [{0}]", action.getId());
+
+ YarnClient yarnClient = null;
+ try {
+ String user = context.getWorkflow().getUser();
+
+ // Create application
+ yarnClient = createYarnClient(context, launcherJobConf);
+ YarnClientApplication newApp = yarnClient.createApplication();
+ ApplicationId appId = newApp.getNewApplicationResponse().getApplicationId();
+
+ // Create launch context for app master
+ ApplicationSubmissionContext appContext =
+ createAppSubmissionContext(appId, launcherJobConf, user, context, actionConf);
+
+ // Submit the launcher AM
+ yarnClient.submitApplication(appContext);
+
+ launcherId = appId.toString();
+ LOG.debug("After submission get the launcherId [{0}]", launcherId);
+ ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+ consoleUrl = appReport.getTrackingUrl();
+ } finally {
+ if (yarnClient != null) {
+ yarnClient.close();
+ yarnClient = null;
+ }
}
- launcherId = runningJob.getID().toString();
- LOG.debug("After submission get the launcherId " + launcherId);
}
String jobTracker = launcherJobConf.get(HADOOP_YARN_RM);
- String consoleUrl = runningJob.getTrackingURL();
context.setStartData(launcherId, jobTracker, consoleUrl);
}
catch (Exception ex) {
@@ -1206,6 +1104,91 @@ public class JavaActionExecutor extends ActionExecutor {
}
}
}
+
+ private ApplicationSubmissionContext createAppSubmissionContext(ApplicationId appId, JobConf launcherJobConf, String user,
+ Context context, Configuration actionConf)
+ throws IOException, HadoopAccessorException, URISyntaxException {
+ // Create launch context for app master
+ ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
+
+ // set the application id
+ appContext.setApplicationId(appId);
+
+ // set the application name
+ appContext.setApplicationName(launcherJobConf.getJobName());
+ appContext.setApplicationType("Oozie Launcher");
+
+ // Set the priority for the application master
+ Priority pri = Records.newRecord(Priority.class);
+ int priority = 0; // TODO: OYA: Add a constant or a config
+ pri.setPriority(priority);
+ appContext.setPriority(pri);
+
+ // Set the queue to which this application is to be submitted in the RM
+ appContext.setQueue(launcherJobConf.getQueueName());
+
+ // Set up the container launch context for the application master
+ ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
+
+ // Set the resources to localize
+ Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+ ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(launcherJobConf);
+ MRApps.setupDistributedCache(launcherJobConf, localResources);
+ // Add the Launcher and Action configs as Resources
+ HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
+ LocalResource launcherJobConfLR = has.createLocalResourceForConfigurationFile(LauncherAM.LAUNCHER_JOB_CONF_XML, user,
+ launcherJobConf, context.getAppFileSystem().getUri(), context.getActionDir());
+ localResources.put(LauncherAM.LAUNCHER_JOB_CONF_XML, launcherJobConfLR);
+ LocalResource actionConfLR = has.createLocalResourceForConfigurationFile(LauncherAM.ACTION_CONF_XML, user, actionConf,
+ context.getAppFileSystem().getUri(), context.getActionDir());
+ localResources.put(LauncherAM.ACTION_CONF_XML, actionConfLR);
+ amContainer.setLocalResources(localResources);
+
+ // Set the environment variables
+ Map<String, String> env = new HashMap<String, String>();
+ // This adds the Hadoop jars to the classpath in the Launcher JVM
+ ClasspathUtils.setupClasspath(env, launcherJobConf);
+ if (false) { // TODO: OYA: config to add MR jars? Probably also needed for MR Action
+ ClasspathUtils.addMapReduceToClasspath(env, launcherJobConf);
+ }
+ amContainer.setEnvironment(env);
+
+ // Set the command
+ List<String> vargs = new ArrayList<String>(6);
+ vargs.add(MRApps.crossPlatformifyMREnv(launcherJobConf, ApplicationConstants.Environment.JAVA_HOME)
+ + "/bin/java");
+ // TODO: OYA: remove attach debugger to AM; useful for debugging
+// vargs.add("-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005");
+ MRApps.addLog4jSystemProperties("INFO", 1024 * 1024, 0, vargs);
+ vargs.add(LauncherAM.class.getName());
+ vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
+ Path.SEPARATOR + ApplicationConstants.STDOUT);
+ vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
+ Path.SEPARATOR + ApplicationConstants.STDERR);
+ List<String> vargsFinal = new ArrayList<String>(6);
+ StringBuilder mergedCommand = new StringBuilder();
+ for (CharSequence str : vargs) {
+ mergedCommand.append(str).append(" ");
+ }
+ vargsFinal.add(mergedCommand.toString());
+ LOG.debug("Command to launch container for ApplicationMaster is : "
+ + mergedCommand);
+ amContainer.setCommands(vargsFinal);
+ appContext.setAMContainerSpec(amContainer);
+
+ // Set tokens
+ DataOutputBuffer dob = new DataOutputBuffer();
+ launcherJobConf.getCredentials().writeTokenStorageToStream(dob);
+ amContainer.setTokens(ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
+
+ // Set Resources
+ // TODO: OYA: make resources allocated for the AM configurable and choose good defaults (memory MB, vcores)
+ Resource resource = Resource.newInstance(2048, 1);
+ appContext.setResource(resource);
+
+ return appContext;
+ }
+
private boolean needInjectCredentials() {
boolean methodExists = true;
@@ -1409,6 +1392,19 @@ public class JavaActionExecutor extends ActionExecutor {
return Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf);
}
+ /**
+ * Create yarn client object
+ *
+ * @param context
+ * @param jobConf
+ * @return YarnClient
+ * @throws HadoopAccessorException
+ */
+ protected YarnClient createYarnClient(Context context, JobConf jobConf) throws HadoopAccessorException {
+ String user = context.getWorkflow().getUser();
+ return Services.get().get(HadoopAccessorService.class).createYarnClient(user, jobConf);
+ }
+
protected RunningJob getRunningJob(Context context, WorkflowAction action, JobClient jobClient) throws Exception{
RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId()));
return runningJob;
@@ -1425,129 +1421,112 @@ public class JavaActionExecutor extends ActionExecutor {
@Override
public void check(Context context, WorkflowAction action) throws ActionExecutorException {
- JobClient jobClient = null;
- boolean exception = false;
+ boolean fallback = false;
+ LOG = XLog.resetPrefix(LOG);
LogUtils.setLogInfo(action);
+ YarnClient yarnClient = null;
try {
Element actionXml = XmlUtils.parseXml(action.getConf());
- FileSystem actionFs = context.getAppFileSystem();
JobConf jobConf = createBaseHadoopConf(context, actionXml);
- jobClient = createJobClient(context, jobConf);
- RunningJob runningJob = getRunningJob(context, action, jobClient);
- if (runningJob == null) {
- context.setExecutionData(FAILED, null);
- throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
- "Could not lookup launched hadoop Job ID [{0}] which was associated with " +
- " action [{1}]. Failing this action!", getActualExternalId(action), action.getId());
- }
- if (runningJob.isComplete()) {
+ FileSystem actionFs = context.getAppFileSystem();
+ yarnClient = createYarnClient(context, jobConf);
+ FinalApplicationStatus appStatus = null;
+ try {
+ ApplicationReport appReport =
+ yarnClient.getApplicationReport(ConverterUtils.toApplicationId(action.getExternalId()));
+ YarnApplicationState appState = appReport.getYarnApplicationState();
+ if (appState == YarnApplicationState.FAILED || appState == YarnApplicationState.FINISHED
+ || appState == YarnApplicationState.KILLED) {
+ appStatus = appReport.getFinalApplicationStatus();
+ }
+
+ } catch (Exception ye) {
+ LOG.debug("Exception occurred while checking Launcher AM status; will try checking action data file instead ", ye);
+ // Fallback to action data file if we can't find the Launcher AM (maybe it got purged)
+ fallback = true;
+ }
+ if (appStatus != null || fallback) {
Path actionDir = context.getActionDir();
String newId = null;
// load sequence file into object
Map<String, String> actionData = LauncherMapperHelper.getActionData(actionFs, actionDir, jobConf);
- if (actionData.containsKey(LauncherMapper.ACTION_DATA_NEW_ID)) {
- newId = actionData.get(LauncherMapper.ACTION_DATA_NEW_ID);
- String launcherId = action.getExternalId();
- runningJob = jobClient.getJob(JobID.forName(newId));
- if (runningJob == null) {
- context.setExternalStatus(FAILED);
+ if (fallback) {
+ String finalStatus = actionData.get(LauncherAM.ACTION_DATA_FINAL_STATUS);
+ if (finalStatus != null) {
+ appStatus = FinalApplicationStatus.valueOf(finalStatus);
+ } else {
+ context.setExecutionData(FAILED, null);
throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
- "Unknown hadoop job [{0}] associated with action [{1}]. Failing this action!", newId,
- action.getId());
+ "Unknown hadoop job [{0}] associated with action [{1}] and couldn't determine status from" +
+ " action data. Failing this action!", action.getExternalId(), action.getId());
}
- context.setExternalChildIDs(newId);
- LOG.info(XLog.STD, "External ID swap, old ID [{0}] new ID [{1}]", launcherId,
- newId);
}
- else {
- String externalIDs = actionData.get(LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS);
- if (externalIDs != null) {
- context.setExternalChildIDs(externalIDs);
- LOG.info(XLog.STD, "Hadoop Jobs launched : [{0}]", externalIDs);
- }
+ String externalIDs = actionData.get(LauncherAM.ACTION_DATA_EXTERNAL_CHILD_IDS);
+ if (externalIDs != null) {
+ context.setExternalChildIDs(externalIDs);
+ LOG.info(XLog.STD, "Hadoop Jobs launched : [{0}]", externalIDs);
}
- if (runningJob.isComplete()) {
- // fetching action output and stats for the Map-Reduce action.
- if (newId != null) {
- actionData = LauncherMapperHelper.getActionData(actionFs, context.getActionDir(), jobConf);
+ LOG.info(XLog.STD, "action completed, external ID [{0}]",
+ action.getExternalId());
+ context.setExecutionData(appStatus.toString(), null);
+ if (appStatus == FinalApplicationStatus.SUCCEEDED) {
+ if (getCaptureOutput(action) && LauncherMapperHelper.hasOutputData(actionData)) {
+ context.setExecutionData(SUCCEEDED, PropertiesUtils.stringToProperties(actionData
+ .get(LauncherAM.ACTION_DATA_OUTPUT_PROPS)));
+ LOG.info(XLog.STD, "action produced output");
}
- LOG.info(XLog.STD, "action completed, external ID [{0}]",
- action.getExternalId());
- if (LauncherMapperHelper.isMainSuccessful(runningJob)) {
- if (getCaptureOutput(action) && LauncherMapperHelper.hasOutputData(actionData)) {
- context.setExecutionData(SUCCEEDED, PropertiesUtils.stringToProperties(actionData
- .get(LauncherMapper.ACTION_DATA_OUTPUT_PROPS)));
- LOG.info(XLog.STD, "action produced output");
+ else {
+ context.setExecutionData(SUCCEEDED, null);
+ }
+ if (LauncherMapperHelper.hasStatsData(actionData)) {
+ context.setExecutionStats(actionData.get(LauncherAM.ACTION_DATA_STATS));
+ LOG.info(XLog.STD, "action produced stats");
+ }
+ getActionData(actionFs, action, context);
+ }
+ else {
+ String errorReason;
+ if (actionData.containsKey(LauncherAM.ACTION_DATA_ERROR_PROPS)) {
+ Properties props = PropertiesUtils.stringToProperties(actionData
+ .get(LauncherAM.ACTION_DATA_ERROR_PROPS));
+ String errorCode = props.getProperty("error.code");
+ if ("0".equals(errorCode)) {
+ errorCode = "JA018";
}
- else {
- context.setExecutionData(SUCCEEDED, null);
+ if ("-1".equals(errorCode)) {
+ errorCode = "JA019";
}
- if (LauncherMapperHelper.hasStatsData(actionData)) {
- context.setExecutionStats(actionData.get(LauncherMapper.ACTION_DATA_STATS));
- LOG.info(XLog.STD, "action produced stats");
+ errorReason = props.getProperty("error.reason");
+ LOG.warn("Launcher ERROR, reason: {0}", errorReason);
+ String exMsg = props.getProperty("exception.message");
+ String errorInfo = (exMsg != null) ? exMsg : errorReason;
+ context.setErrorInfo(errorCode, errorInfo);
+ String exStackTrace = props.getProperty("exception.stacktrace");
+ if (exMsg != null) {
+ LOG.warn("Launcher exception: {0}{E}{1}", exMsg, exStackTrace);
}
- getActionData(actionFs, runningJob, action, context);
}
else {
- String errorReason;
- if (actionData.containsKey(LauncherMapper.ACTION_DATA_ERROR_PROPS)) {
- Properties props = PropertiesUtils.stringToProperties(actionData
- .get(LauncherMapper.ACTION_DATA_ERROR_PROPS));
- String errorCode = props.getProperty("error.code");
- if ("0".equals(errorCode)) {
- errorCode = "JA018";
- }
- if ("-1".equals(errorCode)) {
- errorCode = "JA019";
- }
- errorReason = props.getProperty("error.reason");
- LOG.warn("Launcher ERROR, reason: {0}", errorReason);
- String exMsg = props.getProperty("exception.message");
- String errorInfo = (exMsg != null) ? exMsg : errorReason;
- context.setErrorInfo(errorCode, errorInfo);
- String exStackTrace = props.getProperty("exception.stacktrace");
- if (exMsg != null) {
- LOG.warn("Launcher exception: {0}{E}{1}", exMsg, exStackTrace);
- }
- }
- else {
- errorReason = XLog.format("LauncherMapper died, check Hadoop LOG for job [{0}:{1}]", action
- .getTrackerUri(), action.getExternalId());
- LOG.warn(errorReason);
- }
- context.setExecutionData(FAILED_KILLED, null);
+ errorReason = XLog.format("Launcher AM died, check Hadoop LOG for job [{0}:{1}]", action
+ .getTrackerUri(), action.getExternalId());
+ LOG.warn(errorReason);
}
- }
- else {
- context.setExternalStatus("RUNNING");
- LOG.info(XLog.STD, "checking action, hadoop job ID [{0}] status [RUNNING]",
- runningJob.getID());
+ context.setExecutionData(FAILED_KILLED, null);
}
}
else {
- context.setExternalStatus("RUNNING");
+ context.setExternalStatus(YarnApplicationState.RUNNING.toString());
LOG.info(XLog.STD, "checking action, hadoop job ID [{0}] status [RUNNING]",
- runningJob.getID());
+ action.getExternalId());
}
}
catch (Exception ex) {
LOG.warn("Exception in check(). Message[{0}]", ex.getMessage(), ex);
- exception = true;
throw convertException(ex);
}
finally {
- if (jobClient != null) {
- try {
- jobClient.close();
- }
- catch (Exception e) {
- if (exception) {
- LOG.error("JobClient error: ", e);
- }
- else {
- throw convertException(e);
- }
- }
+ if (yarnClient != null) {
+ IOUtils.closeQuietly(yarnClient);
}
}
}
@@ -1555,14 +1534,12 @@ public class JavaActionExecutor extends ActionExecutor {
/**
* Get the output data of an action. Subclasses should override this method
* to get action specific output data.
- *
* @param actionFs the FileSystem object
- * @param runningJob the runningJob
* @param action the Workflow action
* @param context executor context
*
*/
- protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context)
+ protected void getActionData(FileSystem actionFs, WorkflowAction action, Context context)
throws HadoopAccessorException, JDOMException, IOException, URISyntaxException {
}
@@ -1585,38 +1562,28 @@ public class JavaActionExecutor extends ActionExecutor {
@Override
public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
- JobClient jobClient = null;
- boolean exception = false;
+ YarnClient yarnClient = null;
try {
Element actionXml = XmlUtils.parseXml(action.getConf());
+ String user = context.getWorkflow().getUser();
JobConf jobConf = createBaseHadoopConf(context, actionXml);
- jobClient = createJobClient(context, jobConf);
- RunningJob runningJob = getRunningJob(context, action, jobClient);
- if (runningJob != null) {
- runningJob.killJob();
- }
+ yarnClient = createYarnClient(context, jobConf);
+ yarnClient.killApplication(ConverterUtils.toApplicationId(action.getExternalId()));
context.setExternalStatus(KILLED);
context.setExecutionData(KILLED, null);
- }
- catch (Exception ex) {
- exception = true;
+ } catch (Exception ex) {
+ LOG.error("Error: ", ex);
throw convertException(ex);
- }
- finally {
+ } finally {
try {
FileSystem actionFs = context.getAppFileSystem();
cleanUpActionDir(actionFs, context);
- if (jobClient != null) {
- jobClient.close();
- }
- }
- catch (Exception ex) {
- if (exception) {
- LOG.error("Error: ", ex);
- }
- else {
- throw convertException(ex);
+ if (yarnClient != null) {
+ yarnClient.close();
}
+ } catch (Exception ex) {
+ LOG.error("Error: ", ex);
+ throw convertException(ex);
}
}
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java b/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
index 69e1044..07d1262 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
@@ -145,18 +145,6 @@ public class LauncherMapperHelper {
launcherConf.setBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", true);
}
- FileSystem fs =
- Services.get().get(HadoopAccessorService.class).createFileSystem(launcherConf.get("user.name"),
- actionDir.toUri(), launcherConf);
- fs.mkdirs(actionDir);
-
- OutputStream os = fs.create(new Path(actionDir, LauncherMapper.ACTION_CONF_XML));
- try {
- actionConf.writeXml(os);
- } finally {
- IOUtils.closeSafely(os);
- }
-
launcherConf.setInputFormat(OozieLauncherInputFormat.class);
launcherConf.set("mapred.output.dir", new Path(actionDir, "output").toString());
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java
index 252f461..6a41235 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RunningJob;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.service.HadoopAccessorException;
@@ -157,9 +156,9 @@ public class SparkActionExecutor extends JavaActionExecutor {
}
@Override
- protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context)
+ protected void getActionData(FileSystem actionFs, WorkflowAction action, Context context)
throws HadoopAccessorException, JDOMException, IOException, URISyntaxException {
- super.getActionData(actionFs, runningJob, action, context);
+ super.getActionData(actionFs, action, context);
readExternalChildIDs(action, context);
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java
index 6813a37..82e5f0c 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java
@@ -23,7 +23,6 @@ import java.io.StringReader;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
-import java.util.Properties;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
@@ -232,17 +231,15 @@ public class SqoopActionExecutor extends JavaActionExecutor {
/**
* Get the stats and external child IDs
- *
- * @param actionFs the FileSystem object
- * @param runningJob the runningJob
+ * @param actionFs the FileSystem object
* @param action the Workflow action
* @param context executor context
*
*/
@Override
- protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context)
+ protected void getActionData(FileSystem actionFs, WorkflowAction action, Context context)
throws HadoopAccessorException, JDOMException, IOException, URISyntaxException{
- super.getActionData(actionFs, runningJob, action, context);
+ super.getActionData(actionFs, action, context);
readExternalChildIDs(action, context);
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java b/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
index 794e825..0177241 100644
--- a/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
+++ b/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
@@ -18,6 +18,7 @@
package org.apache.oozie.service;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
@@ -29,7 +30,14 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
import org.apache.oozie.ErrorCode;
+import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.action.hadoop.JavaActionExecutor;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XConfiguration;
@@ -39,6 +47,7 @@ import org.apache.oozie.workflow.lite.LiteWorkflowAppParser;
import java.io.File;
import java.io.FileInputStream;
+import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
@@ -511,9 +520,43 @@ public class HadoopAccessorService implements Service {
}
/**
- * Return a FileSystem created with the provided user for the specified URI.
+ * Return a YarnClient created with the provided user and configuration.
*
+ * @param user The username to impersonate
+ * @param conf The conf
+ * @return a YarnClient with the provided user and configuration
+ * @throws HadoopAccessorException if the client could not be created.
+ */
+ public YarnClient createYarnClient(String user, final Configuration conf) throws HadoopAccessorException {
+ ParamChecker.notEmpty(user, "user");
+ if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
+ throw new HadoopAccessorException(ErrorCode.E0903);
+ }
+ String rm = conf.get(JavaActionExecutor.HADOOP_YARN_RM);
+ validateJobTracker(rm);
+ try {
+ UserGroupInformation ugi = getUGI(user);
+ YarnClient yarnClient = ugi.doAs(new PrivilegedExceptionAction<YarnClient>() {
+ @Override
+ public YarnClient run() throws Exception {
+ YarnClient yarnClient = YarnClient.createYarnClient();
+ yarnClient.init(conf);
+ yarnClient.start();
+ return yarnClient;
+ }
+ });
+ return yarnClient;
+ } catch (InterruptedException ex) {
+ throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
+ } catch (IOException ex) {
+ throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
+ }
+ }
+
+ /**
+ * Return a FileSystem created with the provided user for the specified URI.
*
+ * @param user The username to impersonate
* @param uri file system URI.
* @param conf Configuration with all necessary information to create the FileSystem.
* @return FileSystem created with the provided user/group.
@@ -667,4 +710,56 @@ public class HadoopAccessorService implements Service {
return supportedSchemes;
}
+ /**
+ * Creates a {@link LocalResource} for the Configuration to localize it for a Yarn Container. This involves also writing it
+ * to HDFS.
+ * Example usage:
+ * <pre>
+ * {@code
+ * LocalResource res1 = createLocalResourceForConfigurationFile(filename1, user, conf, uri, dir);
+ * LocalResource res2 = createLocalResourceForConfigurationFile(filename2, user, conf, uri, dir);
+ * ...
+ * Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+ * localResources.put(filename1, res1);
+ * localResources.put(filename2, res2);
+ * ...
+ * containerLaunchContext.setLocalResources(localResources);
+ * }
+ * </pre>
+ *
+ * @param filename The filename to use on the remote filesystem and once it has been localized.
+ * @param user The user
+ * @param conf The configuration to process
+ * @param uri The URI of the remote filesystem (e.g. HDFS)
+ * @param dir The directory on the remote filesystem to write the file to
+ * @return a LocalResource describing the configuration file written to the remote filesystem
+ * @throws IOException A problem occurred writing the file
+ * @throws HadoopAccessorException A problem occurred with Hadoop
+ * @throws URISyntaxException A problem occurred parsing the URI
+ */
+ public LocalResource createLocalResourceForConfigurationFile(String filename, String user, Configuration conf, URI uri,
+ Path dir)
+ throws IOException, HadoopAccessorException, URISyntaxException {
+ File f = File.createTempFile(filename, ".tmp");
+ FileOutputStream fos = null;
+ try {
+ fos = new FileOutputStream(f);
+ conf.writeXml(fos);
+ } finally {
+ if (fos != null) {
+ fos.close();
+ }
+ }
+ FileSystem fs = createFileSystem(user, uri, conf);
+ Path dst = new Path(dir, filename);
+ fs.copyFromLocalFile(new Path(f.getAbsolutePath()), dst);
+ LocalResource localResource = Records.newRecord(LocalResource.class);
+ localResource.setType(LocalResourceType.FILE); localResource.setVisibility(LocalResourceVisibility.APPLICATION);
+ localResource.setResource(ConverterUtils.getYarnUrlFromPath(dst));
+ FileStatus destStatus = fs.getFileStatus(dst);
+ localResource.setTimestamp(destStatus.getModificationTime());
+ localResource.setSize(destStatus.getLen());
+ return localResource;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/java/org/apache/oozie/util/ClasspathUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/ClasspathUtils.java b/core/src/main/java/org/apache/oozie/util/ClasspathUtils.java
new file mode 100644
index 0000000..8533371
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/util/ClasspathUtils.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.util;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+public class ClasspathUtils {
+ private static boolean usingMiniYarnCluster = false;
+ private static final List<String> CLASSPATH_ENTRIES = Arrays.asList(
+ ApplicationConstants.Environment.PWD.$(),
+ MRJobConfig.JOB_JAR + Path.SEPARATOR + MRJobConfig.JOB_JAR,
+ MRJobConfig.JOB_JAR + Path.SEPARATOR + "classes" + Path.SEPARATOR,
+ MRJobConfig.JOB_JAR + Path.SEPARATOR + "lib" + Path.SEPARATOR + "*",
+ ApplicationConstants.Environment.PWD.$() + Path.SEPARATOR + "*"
+ );
+
+ @VisibleForTesting
+ public static void setUsingMiniYarnCluster(boolean useMiniYarnCluster) {
+ usingMiniYarnCluster = useMiniYarnCluster;
+ }
+
+ // Adapted from MRApps#setClasspath. Adds Yarn, HDFS, Common, and distributed cache jars.
+ public static void setupClasspath(Map<String, String> env, Configuration conf) throws IOException {
+ // Propagate the system classpath when using the mini cluster
+ if (usingMiniYarnCluster) {
+ MRApps.addToEnvironment(
+ env,
+ ApplicationConstants.Environment.CLASSPATH.name(),
+ System.getProperty("java.class.path"), conf);
+ }
+
+ for (String entry : CLASSPATH_ENTRIES) {
+ MRApps.addToEnvironment(env, ApplicationConstants.Environment.CLASSPATH.name(), entry, conf);
+ }
+
+ // a * in the classpath will only find a .jar, so we need to filter out
+ // all .jars and add everything else
+ addToClasspathIfNotJar(org.apache.hadoop.mapreduce.filecache.DistributedCache.getFileClassPaths(conf),
+ org.apache.hadoop.mapreduce.filecache.DistributedCache.getCacheFiles(conf),
+ conf,
+ env, ApplicationConstants.Environment.PWD.$());
+ addToClasspathIfNotJar(org.apache.hadoop.mapreduce.filecache.DistributedCache.getArchiveClassPaths(conf),
+ org.apache.hadoop.mapreduce.filecache.DistributedCache.getCacheArchives(conf),
+ conf,
+ env, ApplicationConstants.Environment.PWD.$());
+
+
+ boolean crossPlatform = conf.getBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM,
+ MRConfig.DEFAULT_MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM);
+
+ for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+ crossPlatform
+ ? YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH
+ : YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+ MRApps.addToEnvironment(env, ApplicationConstants.Environment.CLASSPATH.name(),
+ c.trim(), conf);
+ }
+ }
+
+ // Adapted from MRApps#setClasspath
+ public static void addMapReduceToClasspath(Map<String, String> env, Configuration conf) {
+ boolean crossPlatform = conf.getBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM,
+ MRConfig.DEFAULT_MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM);
+
+ for (String c : conf.getStrings(MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH,
+ crossPlatform ?
+ StringUtils.getStrings(MRJobConfig.DEFAULT_MAPREDUCE_CROSS_PLATFORM_APPLICATION_CLASSPATH)
+ : StringUtils.getStrings(MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH))) {
+ MRApps.addToEnvironment(env, ApplicationConstants.Environment.CLASSPATH.name(),
+ c.trim(), conf);
+ }
+ }
+
+ // Borrowed from MRApps#addToClasspathIfNotJar
+ private static void addToClasspathIfNotJar(Path[] paths,
+ URI[] withLinks, Configuration conf,
+ Map<String, String> environment,
+ String classpathEnvVar) throws IOException {
+ if (paths != null) {
+ HashMap<Path, String> linkLookup = new HashMap<Path, String>();
+ if (withLinks != null) {
+ for (URI u: withLinks) {
+ Path p = new Path(u);
+ FileSystem remoteFS = p.getFileSystem(conf);
+ p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
+ remoteFS.getWorkingDirectory()));
+ String name = (null == u.getFragment())
+ ? p.getName() : u.getFragment();
+ if (!name.toLowerCase(Locale.ENGLISH).endsWith(".jar")) {
+ linkLookup.put(p, name);
+ }
+ }
+ }
+
+ for (Path p : paths) {
+ FileSystem remoteFS = p.getFileSystem(conf);
+ p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
+ remoteFS.getWorkingDirectory()));
+ String name = linkLookup.get(p);
+ if (name == null) {
+ name = p.getName();
+ }
+ if(!name.toLowerCase(Locale.ENGLISH).endsWith(".jar")) {
+ MRApps.addToEnvironment(
+ environment,
+ classpathEnvVar,
+ ApplicationConstants.Environment.PWD.$() + Path.SEPARATOR + name, conf);
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index 6c2f7d8..5f4645c 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -1782,31 +1782,6 @@ will be the requeue interval for the actions which are waiting for a long time w
</property>
<property>
- <name>oozie.action.launcher.mapreduce.job.ubertask.enable</name>
- <value>true</value>
- <description>
- Enables Uber Mode for the launcher job in YARN/Hadoop 2 (no effect in Hadoop 1) for all action types by default.
- This can be overridden on a per-action-type basis by setting
- oozie.action.#action-type#.launcher.mapreduce.job.ubertask.enable in oozie-site.xml (where #action-type# is the action
- type; for example, "pig"). And that can be overridden on a per-action basis by setting
- oozie.launcher.mapreduce.job.ubertask.enable in an action's configuration section in a workflow. In summary, the
- priority is this:
- 1. action's configuration section in a workflow
- 2. oozie.action.#action-type#.launcher.mapreduce.job.ubertask.enable in oozie-site
- 3. oozie.action.launcher.mapreduce.job.ubertask.enable in oozie-site
- </description>
- </property>
-
- <property>
- <name>oozie.action.shell.launcher.mapreduce.job.ubertask.enable</name>
- <value>false</value>
- <description>
- The Shell action may have issues with the $PATH environment when using Uber Mode, and so Uber Mode is disabled by
- default for it. See oozie.action.launcher.mapreduce.job.ubertask.enable
- </description>
- </property>
-
- <property>
<name>oozie.action.shell.setup.hadoop.conf.dir</name>
<value>false</value>
<description>
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/test/java/org/apache/oozie/QueryServlet.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/QueryServlet.java b/core/src/test/java/org/apache/oozie/QueryServlet.java
new file mode 100644
index 0000000..8789438
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/QueryServlet.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.net.URLDecoder;
+
+/**
+ * Servlet that keeps track of the last query string it received
+ */
+public class QueryServlet extends HttpServlet {
+
+ public static String lastQueryString = null;
+
+ protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
+ lastQueryString = URLDecoder.decode(request.getQueryString(), "UTF-8");
+ response.setStatus(HttpServletResponse.SC_OK);
+ }
+
+}