You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by ge...@apache.org on 2017/05/10 21:11:11 UTC
[9/9] oozie git commit: Merge branch 'master' into oya
Merge branch 'master' into oya
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/80854d15
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/80854d15
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/80854d15
Branch: refs/heads/oya
Commit: 80854d151d67666132a02797de4efdbf3b2af175
Parents: 40ee320 1103882
Author: Gezapeti Cseh <ge...@gmail.com>
Authored: Wed May 10 14:09:31 2017 -0700
Committer: Gezapeti Cseh <ge...@gmail.com>
Committed: Wed May 10 14:09:31 2017 -0700
----------------------------------------------------------------------
.../org/apache/oozie/client/OozieClient.java | 4 +-
.../apache/oozie/client/rest/RestConstants.java | 2 +-
.../main/java/org/apache/oozie/BaseEngine.java | 18 +-
.../org/apache/oozie/BaseLocalOozieClient.java | 601 +++++++++++++++++++
.../java/org/apache/oozie/LocalOozieClient.java | 245 +-------
.../apache/oozie/LocalOozieClientBundle.java | 93 +++
.../org/apache/oozie/LocalOozieClientCoord.java | 328 ++--------
.../oozie/OozieClientOperationHandler.java | 173 ++++++
.../java/org/apache/oozie/OozieJsonFactory.java | 55 ++
.../oozie/action/hadoop/JavaActionExecutor.java | 15 +-
.../oozie/action/hadoop/LauncherHelper.java | 2 +
.../hadoop/LauncherInputFormatClassLocator.java | 84 +++
.../command/coord/CoordSubmitXCommand.java | 25 -
.../java/org/apache/oozie/local/LocalOozie.java | 93 ++-
.../org/apache/oozie/servlet/V0JobsServlet.java | 12 +-
.../org/apache/oozie/servlet/V1JobsServlet.java | 184 ++----
core/src/main/resources/oozie-default.xml | 8 +
.../apache/oozie/TestLocalOozieClientCoord.java | 72 ++-
.../TestLauncherInputFormatClassLocator.java | 57 ++
.../command/coord/TestCoordSubmitXCommand.java | 30 -
.../resources/coord-invalid-el-function.xml | 35 --
.../resources/coord-invalid-output-instance.xml | 58 --
.../coord-multiple-input-instance4.xml | 2 +-
.../coord-multiple-input-start-instance2.xml | 2 +-
.../coord-multiple-output-instance4.xml | 2 +-
docs/src/site/twiki/AG_Install.twiki | 4 +-
docs/src/site/twiki/AG_Monitoring.twiki | 2 +-
.../site/twiki/CoordinatorFunctionalSpec.twiki | 2 +-
.../site/twiki/DG_ActionAuthentication.twiki | 4 +-
docs/src/site/twiki/DG_SLAMonitoring.twiki | 2 +-
docs/src/site/twiki/WebServicesAPI.twiki | 6 +-
.../src/site/twiki/WorkflowFunctionalSpec.twiki | 2 +-
release-log.txt | 4 +-
33 files changed, 1412 insertions(+), 814 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/80854d15/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
index f3e5e32,d60a5c7..7a762ab
--- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
@@@ -49,14 -48,13 +49,15 @@@ import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.AccessControlException;
-import org.apache.oozie.hadoop.utils.HadoopShims;
+import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.TaskLog;
+import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.security.Credentials;
+ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.DiskChecker;
@@@ -144,11 -119,18 +148,12 @@@ public class JavaActionExecutor extend
private static int maxActionOutputLen;
private static int maxExternalStatsSize;
private static int maxFSGlobMax;
- private static final String SUCCEEDED = "SUCCEEDED";
- private static final String KILLED = "KILLED";
- private static final String FAILED = "FAILED";
- private static final String FAILED_KILLED = "FAILED/KILLED";
+
+ protected static final String HADOOP_USER = "user.name";
+
protected XLog LOG = XLog.getLog(getClass());
- private static final Pattern heapPattern = Pattern.compile("-Xmx(([0-9]+)[mMgG])");
private static final String JAVA_TMP_DIR_SETTINGS = "-Djava.io.tmpdir=";
- public static final String CONF_HADOOP_YARN_UBER_MODE = OOZIE_ACTION_LAUNCHER_PREFIX + HADOOP_YARN_UBER_MODE;
- public static final String HADOOP_JOB_CLASSLOADER = "mapreduce.job.classloader";
- public static final String HADOOP_USER_CLASSPATH_FIRST = "mapreduce.user.classpath.first";
- public static final String OOZIE_CREDENTIALS_SKIP = "oozie.credentials.skip";
+ private static final LauncherInputFormatClassLocator launcherInputFormatClassLocator = new LauncherInputFormatClassLocator();
public XConfiguration workflowConf = null;
@@@ -166,15 -150,15 +171,15 @@@
super(type);
}
- public static List<Class> getCommonLauncherClasses() {
- List<Class> classes = new ArrayList<Class>();
- classes.add(LauncherMapper.class);
+ public static List<Class<?>> getCommonLauncherClasses() {
+ List<Class<?>> classes = new ArrayList<Class<?>>();
- classes.add(OozieLauncherInputFormat.class);
+ classes.add(LauncherMain.class);
+ classes.add(launcherInputFormatClassLocator.locateOrGet());
classes.add(OozieLauncherOutputFormat.class);
classes.add(OozieLauncherOutputCommitter.class);
- classes.add(LauncherMainHadoopUtils.class);
- classes.add(HadoopShims.class);
classes.addAll(Services.get().get(URIHandlerService.class).getClassesForLauncher());
+ classes.add(LauncherAM.class);
+ classes.add(LauncherAMCallbackNotifier.class);
return classes;
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/80854d15/core/src/main/java/org/apache/oozie/action/hadoop/LauncherHelper.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/oozie/action/hadoop/LauncherHelper.java
index 5ac1a16,0000000..442f9a3
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/LauncherHelper.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/LauncherHelper.java
@@@ -1,321 -1,0 +1,323 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.math.BigInteger;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.service.HadoopAccessorException;
+import org.apache.oozie.service.HadoopAccessorService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.URIHandlerService;
+import org.apache.oozie.service.UserGroupInformationService;
+import org.apache.oozie.util.IOUtils;
+import org.apache.oozie.util.PropertiesUtils;
+
+// TODO: we're no longer using Launcher Mapper -- give this class a better name
+public class LauncherHelper {
+
+ public static final String OOZIE_ACTION_YARN_TAG = "oozie.action.yarn.tag";
+
++ private static final LauncherInputFormatClassLocator launcherInputFormatClassLocator = new LauncherInputFormatClassLocator();
++
+ public static String getRecoveryId(Configuration launcherConf, Path actionDir, String recoveryId)
+ throws HadoopAccessorException, IOException {
+ String jobId = null;
+ Path recoveryFile = new Path(actionDir, recoveryId);
+ FileSystem fs = Services.get().get(HadoopAccessorService.class)
+ .createFileSystem(launcherConf.get("user.name"),recoveryFile.toUri(), launcherConf);
+
+ if (fs.exists(recoveryFile)) {
+ InputStream is = fs.open(recoveryFile);
+ BufferedReader reader = new BufferedReader(new InputStreamReader(is));
+ jobId = reader.readLine();
+ reader.close();
+ }
+ return jobId;
+
+ }
+
+ public static void setupMainClass(Configuration launcherConf, String javaMainClass) {
+ // Only set the javaMainClass if it is not null or an empty string; this way the user can override the action's main class via
+ // <configuration> property
+ if (javaMainClass != null && !javaMainClass.equals("")) {
+ launcherConf.set(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, javaMainClass);
+ }
+ }
+
+ public static void setupLauncherURIHandlerConf(Configuration launcherConf) {
+ for(Map.Entry<String, String> entry : Services.get().get(URIHandlerService.class).getLauncherConfig()) {
+ launcherConf.set(entry.getKey(), entry.getValue());
+ }
+ }
+
+ public static void setupMainArguments(Configuration launcherConf, String[] args) {
+ launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_MAIN_ARG_COUNT, args.length);
+ for (int i = 0; i < args.length; i++) {
+ launcherConf.set(LauncherMapper.CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + i, args[i]);
+ }
+ }
+
+ public static void setupMaxOutputData(Configuration launcherConf, int maxOutputData) {
+ launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, maxOutputData);
+ }
+
+ /**
+ * Set the maximum value of stats data
+ *
+ * @param launcherConf the oozie launcher configuration
+ * @param maxStatsData the maximum allowed size of stats data
+ */
+ public static void setupMaxExternalStatsSize(Configuration launcherConf, int maxStatsData){
+ launcherConf.setInt(LauncherMapper.CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE, maxStatsData);
+ }
+
+ /**
+ * Set the maximum number of globbed files/dirs
+ *
+ * @param launcherConf the oozie launcher configuration
+ * @param fsGlobMax the maximum number of files/dirs for FS operation
+ */
+ public static void setupMaxFSGlob(Configuration launcherConf, int fsGlobMax){
+ launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_FS_GLOB_MAX, fsGlobMax);
+ }
+
+ public static void setupLauncherInfo(Configuration launcherConf, String jobId, String actionId, Path actionDir,
+ String recoveryId, Configuration actionConf, String prepareXML) throws IOException, HadoopAccessorException {
+
+ launcherConf.set(LauncherMapper.OOZIE_JOB_ID, jobId);
+ launcherConf.set(LauncherMapper.OOZIE_ACTION_ID, actionId);
+ launcherConf.set(LauncherMapper.OOZIE_ACTION_DIR_PATH, actionDir.toString());
+ launcherConf.set(LauncherMapper.OOZIE_ACTION_RECOVERY_ID, recoveryId);
+ launcherConf.set(LauncherMapper.ACTION_PREPARE_XML, prepareXML);
+
+ actionConf.set(LauncherMapper.OOZIE_JOB_ID, jobId);
+ actionConf.set(LauncherMapper.OOZIE_ACTION_ID, actionId);
+
+ if (Services.get().getConf().getBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", false)) {
+ List<String> purgedEntries = new ArrayList<String>();
+ Collection<String> entries = actionConf.getStringCollection("mapreduce.job.cache.files");
+ for (String entry : entries) {
+ if (entry.contains("#")) {
+ purgedEntries.add(entry);
+ }
+ }
+ actionConf.setStrings("mapreduce.job.cache.files", purgedEntries.toArray(new String[purgedEntries.size()]));
+ launcherConf.setBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", true);
+ }
+ }
+
+ public static void setupYarnRestartHandling(Configuration launcherJobConf, Configuration actionConf, String launcherTag,
+ long launcherTime)
+ throws NoSuchAlgorithmException {
+ launcherJobConf.setLong(LauncherMain.OOZIE_JOB_LAUNCH_TIME, launcherTime);
+ // Tags are limited to 100 chars, so we hash them to guarantee they fit (the actionId otherwise doesn't have a max length)
+ String tag = getTag(launcherTag);
+ // keeping the oozie.child.mapreduce.job.tags instead of mapreduce.job.tags to avoid killing the launcher itself.
+ // mapreduce.job.tags should only go to child jobs launched by the launcher.
+ actionConf.set(LauncherMain.CHILD_MAPREDUCE_JOB_TAGS, tag);
+ }
+
+ public static String getTag(String launcherTag) throws NoSuchAlgorithmException {
+ MessageDigest digest = MessageDigest.getInstance("MD5");
+ digest.update(launcherTag.getBytes(), 0, launcherTag.length());
+ String md5 = "oozie-" + new BigInteger(1, digest.digest()).toString(16);
+ return md5;
+ }
+
+ public static boolean isMainDone(RunningJob runningJob) throws IOException {
+ return runningJob.isComplete();
+ }
+
+ public static boolean isMainSuccessful(RunningJob runningJob) throws IOException {
+ boolean succeeded = runningJob.isSuccessful();
+ if (succeeded) {
+ Counters counters = runningJob.getCounters();
+ if (counters != null) {
+ Counters.Group group = counters.getGroup(LauncherMapper.COUNTER_GROUP);
+ if (group != null) {
+ succeeded = group.getCounter(LauncherMapper.COUNTER_LAUNCHER_ERROR) == 0;
+ }
+ }
+ }
+ return succeeded;
+ }
+
+ /**
+ * Determine whether action has external child jobs or not
+ * @param actionData
+ * @return true/false
+ * @throws IOException
+ */
+ public static boolean hasExternalChildJobs(Map<String, String> actionData) throws IOException {
+ return actionData.containsKey(LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS);
+ }
+
+ /**
+ * Determine whether action has output data or not
+ * @param actionData
+ * @return true/false
+ * @throws IOException
+ */
+ public static boolean hasOutputData(Map<String, String> actionData) throws IOException {
+ return actionData.containsKey(LauncherMapper.ACTION_DATA_OUTPUT_PROPS);
+ }
+
+ /**
+ * Determine whether action has external stats or not
+ * @param actionData
+ * @return true/false
+ * @throws IOException
+ */
+ public static boolean hasStatsData(Map<String, String> actionData) throws IOException{
+ return actionData.containsKey(LauncherMapper.ACTION_DATA_STATS);
+ }
+
+ /**
+ * Determine whether action has new id (id swap) or not
+ * @param actionData
+ * @return true/false
+ * @throws IOException
+ */
+ public static boolean hasIdSwap(Map<String, String> actionData) throws IOException {
+ return actionData.containsKey(LauncherMapper.ACTION_DATA_NEW_ID);
+ }
+
+ /**
+ * Get the sequence file path storing all action data
+ * @param actionDir
+ * @return
+ */
+ public static Path getActionDataSequenceFilePath(Path actionDir) {
+ return new Path(actionDir, LauncherMapper.ACTION_DATA_SEQUENCE_FILE);
+ }
+
+ /**
+ * Utility function to load the contents of action data sequence file into
+ * memory object
+ *
+ * @param fs Action Filesystem
+ * @param actionDir Path
+ * @param conf Configuration
+ * @return Map action data
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public static Map<String, String> getActionData(final FileSystem fs, final Path actionDir, final Configuration conf)
+ throws IOException, InterruptedException {
+ UserGroupInformationService ugiService = Services.get().get(UserGroupInformationService.class);
+ UserGroupInformation ugi = ugiService.getProxyUser(conf.get(OozieClient.USER_NAME));
+
+ return ugi.doAs(new PrivilegedExceptionAction<Map<String, String>>() {
+ @Override
+ public Map<String, String> run() throws IOException {
+ Map<String, String> ret = new HashMap<String, String>();
+ Path seqFilePath = getActionDataSequenceFilePath(actionDir);
+ if (fs.exists(seqFilePath)) {
+ SequenceFile.Reader seqFile = new SequenceFile.Reader(fs, seqFilePath, conf);
+ Text key = new Text(), value = new Text();
+ while (seqFile.next(key, value)) {
+ ret.put(key.toString(), value.toString());
+ }
+ seqFile.close();
+ }
+ else { // maintain backward-compatibility. to be deprecated
+ org.apache.hadoop.fs.FileStatus[] files = fs.listStatus(actionDir);
+ InputStream is;
+ BufferedReader reader = null;
+ Properties props;
+ if (files != null && files.length > 0) {
+ for (int x = 0; x < files.length; x++) {
+ Path file = files[x].getPath();
+ if (file.equals(new Path(actionDir, "externalChildIds.properties"))) {
+ is = fs.open(file);
+ reader = new BufferedReader(new InputStreamReader(is));
+ ret.put(LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS,
+ IOUtils.getReaderAsString(reader, -1));
+ }
+ else if (file.equals(new Path(actionDir, "newId.properties"))) {
+ is = fs.open(file);
+ reader = new BufferedReader(new InputStreamReader(is));
+ props = PropertiesUtils.readProperties(reader, -1);
+ ret.put(LauncherMapper.ACTION_DATA_NEW_ID, props.getProperty("id"));
+ }
+ else if (file.equals(new Path(actionDir, LauncherMapper.ACTION_DATA_OUTPUT_PROPS))) {
+ int maxOutputData = conf.getInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA,
+ 2 * 1024);
+ is = fs.open(file);
+ reader = new BufferedReader(new InputStreamReader(is));
+ ret.put(LauncherMapper.ACTION_DATA_OUTPUT_PROPS, PropertiesUtils
+ .propertiesToString(PropertiesUtils.readProperties(reader, maxOutputData)));
+ }
+ else if (file.equals(new Path(actionDir, LauncherMapper.ACTION_DATA_STATS))) {
+ int statsMaxOutputData = conf.getInt(LauncherMapper.CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE,
+ Integer.MAX_VALUE);
+ is = fs.open(file);
+ reader = new BufferedReader(new InputStreamReader(is));
+ ret.put(LauncherMapper.ACTION_DATA_STATS, PropertiesUtils
+ .propertiesToString(PropertiesUtils.readProperties(reader, statsMaxOutputData)));
+ }
+ else if (file.equals(new Path(actionDir, LauncherMapper.ACTION_DATA_ERROR_PROPS))) {
+ is = fs.open(file);
+ reader = new BufferedReader(new InputStreamReader(is));
+ ret.put(LauncherMapper.ACTION_DATA_ERROR_PROPS, IOUtils.getReaderAsString(reader, -1));
+ }
+ }
+ }
+ }
+ return ret;
+ }
+ });
+ }
+
+ public static String getActionYarnTag(Configuration conf, String parentId, WorkflowAction wfAction) {
+ String tag;
+ if ( conf != null && conf.get(OOZIE_ACTION_YARN_TAG) != null) {
+ tag = conf.get(OOZIE_ACTION_YARN_TAG) + "@" + wfAction.getName();
+ } else if (parentId != null) {
+ tag = parentId + "@" + wfAction.getName();
+ } else {
+ tag = wfAction.getId();
+ }
+ return tag;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/80854d15/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --cc core/src/main/resources/oozie-default.xml
index 4ab7512,076401d..61b1d88
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@@ -1835,6 -1835,39 +1835,14 @@@ will be the requeue interval for the ac
</property>
<property>
- <name>oozie.action.launcher.mapreduce.job.ubertask.enable</name>
- <value>true</value>
- <description>
- Enables Uber Mode for the launcher job in YARN/Hadoop 2 (no effect in Hadoop 1) for all action types by default.
- This can be overridden on a per-action-type basis by setting
- oozie.action.#action-type#.launcher.mapreduce.job.ubertask.enable in oozie-site.xml (where #action-type# is the action
- type; for example, "pig"). And that can be overridden on a per-action basis by setting
- oozie.launcher.mapreduce.job.ubertask.enable in an action's configuration section in a workflow. In summary, the
- priority is this:
- 1. action's configuration section in a workflow
- 2. oozie.action.#action-type#.launcher.mapreduce.job.ubertask.enable in oozie-site
- 3. oozie.action.launcher.mapreduce.job.ubertask.enable in oozie-site
- </description>
- </property>
-
- <property>
- <name>oozie.action.shell.launcher.mapreduce.job.ubertask.enable</name>
- <value>false</value>
- <description>
- The Shell action may have issues with the $PATH environment when using Uber Mode, and so Uber Mode is disabled by
- default for it. See oozie.action.launcher.mapreduce.job.ubertask.enable
- </description>
- </property>
-
- <property>
+ <name>oozie.action.launcher.mapreduce.input.format.class</name>
+ <value>org.apache.oozie.action.hadoop.OozieLauncherInputFormat</value>
+ <description>
+ Make the Launcher Mapper map-only job's InputFormat class pluggable in order to provide alternative implementations.
+ </description>
+ </property>
+
+ <property>
<name>oozie.action.spark.setup.hadoop.conf.dir</name>
<value>false</value>
<description>
http://git-wip-us.apache.org/repos/asf/oozie/blob/80854d15/release-log.txt
----------------------------------------------------------------------