You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by ge...@apache.org on 2017/05/26 09:28:00 UTC
[08/10] oozie git commit: OOZIE-1770 Create Oozie Application Master
for YARN (asasvari, pbacsko, rkanter, gezapeti)
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java b/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
deleted file mode 100644
index 72ed2f1..0000000
--- a/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
+++ /dev/null
@@ -1,345 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.oozie.action.hadoop;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.math.BigInteger;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.Counters;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.oozie.client.OozieClient;
-import org.apache.oozie.client.WorkflowAction;
-import org.apache.oozie.service.HadoopAccessorException;
-import org.apache.oozie.service.HadoopAccessorService;
-import org.apache.oozie.service.Services;
-import org.apache.oozie.service.URIHandlerService;
-import org.apache.oozie.service.UserGroupInformationService;
-import org.apache.oozie.util.IOUtils;
-import org.apache.oozie.util.PropertiesUtils;
-
-public class LauncherMapperHelper {
-
- public static final String OOZIE_ACTION_YARN_TAG = "oozie.action.yarn.tag";
-
- private static final LauncherInputFormatClassLocator launcherInputFormatClassLocator = new LauncherInputFormatClassLocator();
-
- public static String getRecoveryId(Configuration launcherConf, Path actionDir, String recoveryId)
- throws HadoopAccessorException, IOException {
- String jobId = null;
- Path recoveryFile = new Path(actionDir, recoveryId);
- FileSystem fs = Services.get().get(HadoopAccessorService.class)
- .createFileSystem(launcherConf.get("user.name"),recoveryFile.toUri(), launcherConf);
-
- if (fs.exists(recoveryFile)) {
- InputStream is = fs.open(recoveryFile);
- BufferedReader reader = new BufferedReader(new InputStreamReader(is));
- jobId = reader.readLine();
- reader.close();
- }
- return jobId;
-
- }
-
- public static void setupMainClass(Configuration launcherConf, String javaMainClass) {
- // Only set the javaMainClass if its not null or empty string, this way the user can override the action's main class via
- // <configuration> property
- if (javaMainClass != null && !javaMainClass.equals("")) {
- launcherConf.set(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, javaMainClass);
- }
- }
-
- public static void setupLauncherURIHandlerConf(Configuration launcherConf) {
- for(Map.Entry<String, String> entry : Services.get().get(URIHandlerService.class).getLauncherConfig()) {
- launcherConf.set(entry.getKey(), entry.getValue());
- }
- }
-
- public static void setupMainArguments(Configuration launcherConf, String[] args) {
- launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_MAIN_ARG_COUNT, args.length);
- for (int i = 0; i < args.length; i++) {
- launcherConf.set(LauncherMapper.CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + i, args[i]);
- }
- }
-
- public static void setupMaxOutputData(Configuration launcherConf, int maxOutputData) {
- launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, maxOutputData);
- }
-
- /**
- * Set the maximum value of stats data
- *
- * @param launcherConf the oozie launcher configuration
- * @param maxStatsData the maximum allowed size of stats data
- */
- public static void setupMaxExternalStatsSize(Configuration launcherConf, int maxStatsData){
- launcherConf.setInt(LauncherMapper.CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE, maxStatsData);
- }
-
- /**
- * Set the maximum number of globbed files/dirs
- *
- * @param launcherConf the oozie launcher configuration
- * @param fsGlobMax the maximum number of files/dirs for FS operation
- */
- public static void setupMaxFSGlob(Configuration launcherConf, int fsGlobMax){
- launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_FS_GLOB_MAX, fsGlobMax);
- }
-
- public static void setupLauncherInfo(JobConf launcherConf, String jobId, String actionId, Path actionDir,
- String recoveryId, Configuration actionConf, String prepareXML) throws IOException, HadoopAccessorException {
-
- launcherConf.setMapperClass(LauncherMapper.class);
- launcherConf.setSpeculativeExecution(false);
- launcherConf.setNumMapTasks(1);
- launcherConf.setNumReduceTasks(0);
-
- launcherConf.set(LauncherMapper.OOZIE_JOB_ID, jobId);
- launcherConf.set(LauncherMapper.OOZIE_ACTION_ID, actionId);
- launcherConf.set(LauncherMapper.OOZIE_ACTION_DIR_PATH, actionDir.toString());
- launcherConf.set(LauncherMapper.OOZIE_ACTION_RECOVERY_ID, recoveryId);
- launcherConf.set(LauncherMapper.ACTION_PREPARE_XML, prepareXML);
-
- actionConf.set(LauncherMapper.OOZIE_JOB_ID, jobId);
- actionConf.set(LauncherMapper.OOZIE_ACTION_ID, actionId);
-
- if (Services.get().getConf().getBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", false)) {
- List<String> purgedEntries = new ArrayList<String>();
- Collection<String> entries = actionConf.getStringCollection("mapreduce.job.cache.files");
- for (String entry : entries) {
- if (entry.contains("#")) {
- purgedEntries.add(entry);
- }
- }
- actionConf.setStrings("mapreduce.job.cache.files", purgedEntries.toArray(new String[purgedEntries.size()]));
- launcherConf.setBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", true);
- }
-
- FileSystem fs =
- Services.get().get(HadoopAccessorService.class).createFileSystem(launcherConf.get("user.name"),
- actionDir.toUri(), launcherConf);
- fs.mkdirs(actionDir);
-
- OutputStream os = fs.create(new Path(actionDir, LauncherMapper.ACTION_CONF_XML));
- try {
- actionConf.writeXml(os);
- } finally {
- IOUtils.closeSafely(os);
- }
-
- launcherConf.setInputFormat(launcherInputFormatClassLocator.locateOrGet());
- launcherConf.setOutputFormat(OozieLauncherOutputFormat.class);
- launcherConf.setOutputCommitter(OozieLauncherOutputCommitter.class);
- }
-
- public static void setupYarnRestartHandling(JobConf launcherJobConf, Configuration actionConf, String launcherTag,
- long launcherTime)
- throws NoSuchAlgorithmException {
- launcherJobConf.setLong(LauncherMainHadoopUtils.OOZIE_JOB_LAUNCH_TIME, launcherTime);
- // Tags are limited to 100 chars so we need to hash them to make sure (the actionId otherwise doesn't have a max length)
- String tag = getTag(launcherTag);
- // keeping the oozie.child.mapreduce.job.tags instead of mapreduce.job.tags to avoid killing launcher itself.
- // mapreduce.job.tags should only go to child job launch by launcher.
- actionConf.set(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS, tag);
- }
-
- public static String getTag(String launcherTag) throws NoSuchAlgorithmException {
- MessageDigest digest = MessageDigest.getInstance("MD5");
- digest.update(launcherTag.getBytes(), 0, launcherTag.length());
- String md5 = "oozie-" + new BigInteger(1, digest.digest()).toString(16);
- return md5;
- }
-
- public static boolean isMainDone(RunningJob runningJob) throws IOException {
- return runningJob.isComplete();
- }
-
- public static boolean isMainSuccessful(RunningJob runningJob) throws IOException {
- boolean succeeded = runningJob.isSuccessful();
- if (succeeded) {
- Counters counters = runningJob.getCounters();
- if (counters != null) {
- Counters.Group group = counters.getGroup(LauncherMapper.COUNTER_GROUP);
- if (group != null) {
- succeeded = group.getCounter(LauncherMapper.COUNTER_LAUNCHER_ERROR) == 0;
- }
- }
- }
- return succeeded;
- }
-
- /**
- * Determine whether action has external child jobs or not
- * @param actionData
- * @return true/false
- * @throws IOException
- */
- public static boolean hasExternalChildJobs(Map<String, String> actionData) throws IOException {
- return actionData.containsKey(LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS);
- }
-
- /**
- * Determine whether action has output data or not
- * @param actionData
- * @return true/false
- * @throws IOException
- */
- public static boolean hasOutputData(Map<String, String> actionData) throws IOException {
- return actionData.containsKey(LauncherMapper.ACTION_DATA_OUTPUT_PROPS);
- }
-
- /**
- * Determine whether action has external stats or not
- * @param actionData
- * @return true/false
- * @throws IOException
- */
- public static boolean hasStatsData(Map<String, String> actionData) throws IOException{
- return actionData.containsKey(LauncherMapper.ACTION_DATA_STATS);
- }
-
- /**
- * Determine whether action has new id (id swap) or not
- * @param actionData
- * @return true/false
- * @throws IOException
- */
- public static boolean hasIdSwap(Map<String, String> actionData) throws IOException {
- return actionData.containsKey(LauncherMapper.ACTION_DATA_NEW_ID);
- }
-
- /**
- * Get the sequence file path storing all action data
- * @param actionDir
- * @return
- */
- public static Path getActionDataSequenceFilePath(Path actionDir) {
- return new Path(actionDir, LauncherMapper.ACTION_DATA_SEQUENCE_FILE);
- }
-
- /**
- * Utility function to load the contents of action data sequence file into
- * memory object
- *
- * @param fs Action Filesystem
- * @param actionDir Path
- * @param conf Configuration
- * @return Map action data
- * @throws IOException
- * @throws InterruptedException
- */
- public static Map<String, String> getActionData(final FileSystem fs, final Path actionDir, final Configuration conf)
- throws IOException, InterruptedException {
- UserGroupInformationService ugiService = Services.get().get(UserGroupInformationService.class);
- UserGroupInformation ugi = ugiService.getProxyUser(conf.get(OozieClient.USER_NAME));
-
- return ugi.doAs(new PrivilegedExceptionAction<Map<String, String>>() {
- @Override
- public Map<String, String> run() throws IOException {
- Map<String, String> ret = new HashMap<String, String>();
- Path seqFilePath = getActionDataSequenceFilePath(actionDir);
- if (fs.exists(seqFilePath)) {
- SequenceFile.Reader seqFile = new SequenceFile.Reader(fs, seqFilePath, conf);
- Text key = new Text(), value = new Text();
- while (seqFile.next(key, value)) {
- ret.put(key.toString(), value.toString());
- }
- seqFile.close();
- }
- else { // maintain backward-compatibility. to be deprecated
- org.apache.hadoop.fs.FileStatus[] files = fs.listStatus(actionDir);
- InputStream is;
- BufferedReader reader = null;
- Properties props;
- if (files != null && files.length > 0) {
- for (int x = 0; x < files.length; x++) {
- Path file = files[x].getPath();
- if (file.equals(new Path(actionDir, "externalChildIds.properties"))) {
- is = fs.open(file);
- reader = new BufferedReader(new InputStreamReader(is));
- ret.put(LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS,
- IOUtils.getReaderAsString(reader, -1));
- }
- else if (file.equals(new Path(actionDir, "newId.properties"))) {
- is = fs.open(file);
- reader = new BufferedReader(new InputStreamReader(is));
- props = PropertiesUtils.readProperties(reader, -1);
- ret.put(LauncherMapper.ACTION_DATA_NEW_ID, props.getProperty("id"));
- }
- else if (file.equals(new Path(actionDir, LauncherMapper.ACTION_DATA_OUTPUT_PROPS))) {
- int maxOutputData = conf.getInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA,
- 2 * 1024);
- is = fs.open(file);
- reader = new BufferedReader(new InputStreamReader(is));
- ret.put(LauncherMapper.ACTION_DATA_OUTPUT_PROPS, PropertiesUtils
- .propertiesToString(PropertiesUtils.readProperties(reader, maxOutputData)));
- }
- else if (file.equals(new Path(actionDir, LauncherMapper.ACTION_DATA_STATS))) {
- int statsMaxOutputData = conf.getInt(LauncherMapper.CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE,
- Integer.MAX_VALUE);
- is = fs.open(file);
- reader = new BufferedReader(new InputStreamReader(is));
- ret.put(LauncherMapper.ACTION_DATA_STATS, PropertiesUtils
- .propertiesToString(PropertiesUtils.readProperties(reader, statsMaxOutputData)));
- }
- else if (file.equals(new Path(actionDir, LauncherMapper.ACTION_DATA_ERROR_PROPS))) {
- is = fs.open(file);
- reader = new BufferedReader(new InputStreamReader(is));
- ret.put(LauncherMapper.ACTION_DATA_ERROR_PROPS, IOUtils.getReaderAsString(reader, -1));
- }
- }
- }
- }
- return ret;
- }
- });
- }
-
- public static String getActionYarnTag(Configuration conf, String parentId, WorkflowAction wfAction) {
- String tag;
- if ( conf != null && conf.get(OOZIE_ACTION_YARN_TAG) != null) {
- tag = conf.get(OOZIE_ACTION_YARN_TAG) + "@" + wfAction.getName();
- } else if (parentId != null) {
- tag = parentId + "@" + wfAction.getName();
- } else {
- tag = wfAction.getId();
- }
- return tag;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
index 55c9372..634a1cb 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
@@ -21,7 +21,11 @@ package org.apache.oozie.action.hadoop;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
+
+import javax.annotation.Nonnull;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -31,6 +35,14 @@ import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.service.ConfigurationService;
@@ -39,7 +51,11 @@ import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;
import org.jdom.Namespace;
-import org.json.simple.JSONObject;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
+import com.google.common.io.Closeables;
public class MapReduceActionExecutor extends JavaActionExecutor {
@@ -47,16 +63,16 @@ public class MapReduceActionExecutor extends JavaActionExecutor {
public static final String HADOOP_COUNTERS = "hadoop.counters";
public static final String OOZIE_MAPREDUCE_UBER_JAR_ENABLE = "oozie.action.mapreduce.uber.jar.enable";
private static final String STREAMING_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.StreamingMain";
+ public static final String JOB_END_NOTIFICATION_URL = "job.end.notification.url";
private XLog log = XLog.getLog(getClass());
public MapReduceActionExecutor() {
super("map-reduce");
}
- @SuppressWarnings("rawtypes")
@Override
- public List<Class> getLauncherClasses() {
- List<Class> classes = new ArrayList<Class>();
+ public List<Class<?>> getLauncherClasses() {
+ List<Class<?>> classes = new ArrayList<Class<?>>();
try {
classes.add(Class.forName(STREAMING_MAIN_CLASS_NAME));
}
@@ -97,9 +113,25 @@ public class MapReduceActionExecutor extends JavaActionExecutor {
}
@Override
- Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context) throws ActionExecutorException {
+ Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context)
+ throws ActionExecutorException {
super.setupLauncherConf(conf, actionXml, appPath, context);
conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false);
+
+ return conf;
+ }
+
+ private void injectConfigClass(Configuration conf, Element actionXml) {
+ // Inject config-class for launcher to use for action
+ Element e = actionXml.getChild("config-class", actionXml.getNamespace());
+ if (e != null) {
+ conf.set(LauncherMapper.OOZIE_ACTION_CONFIG_CLASS, e.getTextTrim());
+ }
+ }
+
+ @Override
+ protected Configuration createBaseHadoopConf(Context context, Element actionXml, boolean loadResources) {
+ Configuration conf = super.createBaseHadoopConf(context, actionXml, loadResources);
return conf;
}
@@ -108,6 +140,8 @@ public class MapReduceActionExecutor extends JavaActionExecutor {
Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
throws ActionExecutorException {
boolean regularMR = false;
+
+ injectConfigClass(actionConf, actionXml);
Namespace ns = actionXml.getNamespace();
if (actionXml.getChild("streaming", ns) != null) {
Element streamingXml = actionXml.getChild("streaming", ns);
@@ -193,7 +227,7 @@ public class MapReduceActionExecutor extends JavaActionExecutor {
try {
if (action.getStatus() == WorkflowAction.Status.OK) {
Element actionXml = XmlUtils.parseXml(action.getConf());
- JobConf jobConf = createBaseHadoopConf(context, actionXml);
+ Configuration jobConf = createBaseHadoopConf(context, actionXml);
jobClient = createJobClient(context, jobConf);
RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalChildIDs()));
if (runningJob == null) {
@@ -248,7 +282,8 @@ public class MapReduceActionExecutor extends JavaActionExecutor {
}
// Return the value of the specified configuration property
- private String evaluateConfigurationProperty(Element actionConf, String key, String defaultValue) throws ActionExecutorException {
+ private String evaluateConfigurationProperty(Element actionConf, String key, String defaultValue)
+ throws ActionExecutorException {
try {
String ret = defaultValue;
if (actionConf != null) {
@@ -267,26 +302,6 @@ public class MapReduceActionExecutor extends JavaActionExecutor {
}
}
- @SuppressWarnings("unchecked")
- private JSONObject counterstoJson(Counters counters) {
-
- if (counters == null) {
- return null;
- }
-
- JSONObject groups = new JSONObject();
- for (String gName : counters.getGroupNames()) {
- JSONObject group = new JSONObject();
- for (Counters.Counter counter : counters.getGroup(gName)) {
- String cName = counter.getName();
- Long cValue = counter.getCounter();
- group.put(cName, cValue);
- }
- groups.put(gName, group);
- }
- return groups;
- }
-
/**
* Return the sharelib name for the action.
*
@@ -299,25 +314,6 @@ public class MapReduceActionExecutor extends JavaActionExecutor {
return (actionXml.getChild("streaming", ns) != null) ? "mapreduce-streaming" : null;
}
- @Override
- JobConf createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml,
- Configuration actionConf) throws ActionExecutorException {
- // If the user is using a regular MapReduce job and specified an uber jar, we need to also set it for the launcher;
- // so we override createLauncherConf to call super and then to set the uber jar if specified. At this point, checking that
- // uber jars are enabled and resolving the uber jar path is already done by setupActionConf() when it parsed the actionConf
- // argument and we can just look up the uber jar in the actionConf argument.
- JobConf launcherJobConf = super.createLauncherConf(actionFs, context, action, actionXml, actionConf);
- Namespace ns = actionXml.getNamespace();
- if (actionXml.getChild("streaming", ns) == null && actionXml.getChild("pipes", ns) == null) {
- // Set for uber jar
- String uberJar = actionConf.get(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR);
- if (uberJar != null && uberJar.trim().length() > 0) {
- launcherJobConf.setJar(uberJar);
- }
- }
- return launcherJobConf;
- }
-
public static void setStreaming(Configuration conf, String mapper, String reducer, String recordReader,
String[] recordReaderMapping, String[] env) {
if (mapper != null) {
@@ -329,18 +325,93 @@ public class MapReduceActionExecutor extends JavaActionExecutor {
if (recordReader != null) {
conf.set("oozie.streaming.record-reader", recordReader);
}
- MapReduceMain.setStrings(conf, "oozie.streaming.record-reader-mapping", recordReaderMapping);
- MapReduceMain.setStrings(conf, "oozie.streaming.env", env);
+ ActionUtils.setStrings(conf, "oozie.streaming.record-reader-mapping", recordReaderMapping);
+ ActionUtils.setStrings(conf, "oozie.streaming.env", env);
}
@Override
- protected RunningJob getRunningJob(Context context, WorkflowAction action, JobClient jobClient) throws Exception{
- RunningJob runningJob = null;
- String jobId = getActualExternalId(action);
- if (jobId != null) {
- runningJob = jobClient.getJob(JobID.forName(jobId));
+ protected void injectCallback(Context context, Configuration conf) {
+ // add callback for the MapReduce job
+ String callback = context.getCallbackUrl("$jobStatus");
+ String originalCallbackURL = conf.get(JOB_END_NOTIFICATION_URL);
+ if (originalCallbackURL != null) {
+ LOG.warn("Overriding the action job end notification URI. Original value: {0}", originalCallbackURL);
+ }
+ conf.set(JOB_END_NOTIFICATION_URL, callback);
+
+ super.injectCallback(context, conf);
+ }
+
+ @Override
+ protected boolean needToAddMapReduceToClassPath() {
+ return true;
+ }
+
+ @Override
+ public void check(Context context, WorkflowAction action) throws ActionExecutorException {
+ Map<String, String> actionData = Collections.emptyMap();
+ Configuration jobConf = null;
+
+ try {
+ FileSystem actionFs = context.getAppFileSystem();
+ Element actionXml = XmlUtils.parseXml(action.getConf());
+ jobConf = createBaseHadoopConf(context, actionXml);
+ Path actionDir = context.getActionDir();
+ actionData = LauncherHelper.getActionData(actionFs, actionDir, jobConf);
+ } catch (Exception e) {
+ LOG.warn("Exception in check(). Message[{0}]", e.getMessage(), e);
+ throw convertException(e);
+ }
+
+ final String newId = actionData.get(LauncherMapper.ACTION_DATA_NEW_ID);
+
+ // check the Hadoop job if newID is defined (which should be the case here) - otherwise perform the normal check()
+ if (newId != null) {
+ boolean jobCompleted;
+ JobClient jobClient = null;
+ boolean exception = false;
+
+ try {
+ jobClient = createJobClient(context, new JobConf(jobConf));
+ RunningJob runningJob = jobClient.getJob(JobID.forName(newId));
+
+ if (runningJob == null) {
+ context.setExternalStatus(FAILED);
+ throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
+ "Unknown hadoop job [{0}] associated with action [{1}]. Failing this action!", newId,
+ action.getId());
+ }
+
+ jobCompleted = runningJob.isComplete();
+ } catch (Exception e) {
+ LOG.warn("Unable to check the state of a running MapReduce job -"
+ + " please check the health of the Job History Server!", e);
+ exception = true;
+ throw convertException(e);
+ } finally {
+ if (jobClient != null) {
+ try {
+ jobClient.close();
+ } catch (Exception e) {
+ if (exception) {
+ LOG.error("JobClient error (not re-throwing due to a previous error): ", e);
+ } else {
+ throw convertException(e);
+ }
+ }
+ }
+ }
+
+ // run original check() if the MR action is completed or there are errors - otherwise mark it as RUNNING
+ if (jobCompleted || actionData.containsKey(LauncherMapper.ACTION_DATA_ERROR_PROPS)) {
+ super.check(context, action);
+ } else {
+ context.setExternalStatus(RUNNING);
+ context.setExternalChildIDs(newId);
+ }
+ } else {
+ super.check(context, action);
}
- return runningJob;
}
@Override
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/action/hadoop/OozieJobInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/OozieJobInfo.java b/core/src/main/java/org/apache/oozie/action/hadoop/OozieJobInfo.java
index 581d3b3..d8b1f03 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/OozieJobInfo.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/OozieJobInfo.java
@@ -29,9 +29,7 @@ import org.apache.oozie.action.ActionExecutor.Context;
import org.apache.oozie.action.oozie.SubWorkflowActionExecutor;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowAction;
-import org.apache.oozie.command.wf.JobXCommand;
import org.apache.oozie.service.ConfigurationService;
-import org.apache.oozie.service.Services;
import org.apache.oozie.util.XConfiguration;
import com.google.common.annotations.VisibleForTesting;
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/action/hadoop/PigActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/PigActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/PigActionExecutor.java
index 8b2dc16..8a24ac3 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/PigActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/PigActionExecutor.java
@@ -18,25 +18,21 @@
package org.apache.oozie.action.hadoop;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.oozie.action.ActionExecutorException;
-import org.apache.oozie.action.ActionExecutor.Context;
-import org.apache.oozie.client.XOozieClient;
import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.client.XOozieClient;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.HadoopAccessorService;
-import org.apache.oozie.service.Services;
-import org.apache.oozie.service.WorkflowAppService;
import org.jdom.Element;
-import org.jdom.Namespace;
import org.jdom.JDOMException;
+import org.jdom.Namespace;
import org.json.simple.parser.JSONParser;
-import java.util.ArrayList;
-import java.util.List;
-
public class PigActionExecutor extends ScriptLanguageActionExecutor {
private static final String PIG_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.PigMain";
@@ -48,10 +44,9 @@ public class PigActionExecutor extends ScriptLanguageActionExecutor {
super("pig");
}
- @SuppressWarnings("rawtypes")
@Override
- public List<Class> getLauncherClasses() {
- List<Class> classes = new ArrayList<Class>();
+ public List<Class<?>> getLauncherClasses() {
+ List<Class<?>> classes = new ArrayList<Class<?>>();
try {
classes.add(Class.forName(PIG_MAIN_CLASS_NAME));
classes.add(JSONParser.class);
@@ -73,7 +68,6 @@ public class PigActionExecutor extends ScriptLanguageActionExecutor {
}
@Override
- @SuppressWarnings("unchecked")
Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
throws ActionExecutorException {
super.setupActionConf(actionConf, context, actionXml, appPath);
@@ -82,12 +76,14 @@ public class PigActionExecutor extends ScriptLanguageActionExecutor {
String script = actionXml.getChild("script", ns).getTextTrim();
String pigName = new Path(script).getName();
- List<Element> params = (List<Element>) actionXml.getChildren("param", ns);
+ @SuppressWarnings("unchecked")
+ List<Element> params = actionXml.getChildren("param", ns);
String[] strParams = new String[params.size()];
for (int i = 0; i < params.size(); i++) {
strParams[i] = params.get(i).getTextTrim();
}
String[] strArgs = null;
+ @SuppressWarnings("unchecked")
List<Element> eArgs = actionXml.getChildren("argument", ns);
if (eArgs != null && eArgs.size() > 0) {
strArgs = new String[eArgs.size()];
@@ -101,8 +97,8 @@ public class PigActionExecutor extends ScriptLanguageActionExecutor {
public static void setPigScript(Configuration conf, String script, String[] params, String[] args) {
conf.set(PIG_SCRIPT, script);
- MapReduceMain.setStrings(conf, PIG_PARAMS, params);
- MapReduceMain.setStrings(conf, PIG_ARGS, args);
+ ActionUtils.setStrings(conf, PIG_PARAMS, params);
+ ActionUtils.setStrings(conf, PIG_ARGS, args);
}
@@ -127,10 +123,15 @@ public class PigActionExecutor extends ScriptLanguageActionExecutor {
}
@Override
- protected JobConf loadHadoopDefaultResources(Context context, Element actionXml) {
+ protected boolean needToAddMapReduceToClassPath() {
+ return true;
+ }
+
+ @Override
+ protected Configuration loadHadoopDefaultResources(Context context, Element actionXml) {
boolean loadDefaultResources = ConfigurationService
.getBoolean(HadoopAccessorService.ACTION_CONFS_LOAD_DEFAULT_RESOURCES);
- JobConf conf = super.createBaseHadoopConf(context, actionXml, loadDefaultResources);
+ Configuration conf = super.createBaseHadoopConf(context, actionXml, loadDefaultResources);
return conf;
}
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/action/hadoop/ScriptLanguageActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/ScriptLanguageActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/ScriptLanguageActionExecutor.java
index 92e149d..196f0b7 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/ScriptLanguageActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/ScriptLanguageActionExecutor.java
@@ -37,9 +37,8 @@ public abstract class ScriptLanguageActionExecutor extends JavaActionExecutor {
super(type);
}
- @SuppressWarnings("rawtypes")
@Override
- public List<Class> getLauncherClasses() {
+ public List<Class<?>> getLauncherClasses() {
return null;
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/action/hadoop/ShellActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/ShellActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/ShellActionExecutor.java
index b9ffa7a..d44bbc5 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/ShellActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/ShellActionExecutor.java
@@ -19,10 +19,13 @@
package org.apache.oozie.action.hadoop;
+import java.io.File;
import java.util.List;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.util.Apps;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.service.ConfigurationService;
import org.jdom.Element;
@@ -30,19 +33,12 @@ import org.jdom.Namespace;
public class ShellActionExecutor extends JavaActionExecutor {
- /**
- * Config property name to set the child environment
- */
- public String OOZIE_LAUNCHER_CHILD_ENV = "mapred.child.env";
- public String OOZIE_LAUNCHER_MAP_ENV = "mapreduce.map.env";
-
public ShellActionExecutor() {
super("shell");
}
- @SuppressWarnings("rawtypes")
@Override
- public List<Class> getLauncherClasses() {
+ public List<Class<?>> getLauncherClasses() {
return null;
}
@@ -51,7 +47,6 @@ public class ShellActionExecutor extends JavaActionExecutor {
return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, ShellMain.class.getName());
}
- @SuppressWarnings("unchecked")
@Override
Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
throws ActionExecutorException {
@@ -103,6 +98,7 @@ public class ShellActionExecutor extends JavaActionExecutor {
boolean checkKeyValue) throws ActionExecutorException {
String[] strTagValue = null;
Namespace ns = actionXml.getNamespace();
+ @SuppressWarnings("unchecked")
List<Element> eTags = actionXml.getChildren(tag, ns);
if (eTags != null && eTags.size() > 0) {
strTagValue = new String[eTags.size()];
@@ -113,7 +109,7 @@ public class ShellActionExecutor extends JavaActionExecutor {
}
}
}
- MapReduceMain.setStrings(actionConf, key, strTagValue);
+ ActionUtils.setStrings(actionConf, key, strTagValue);
}
/**
@@ -130,23 +126,8 @@ public class ShellActionExecutor extends JavaActionExecutor {
}
@Override
- protected Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context)
- throws ActionExecutorException {
- super.setupLauncherConf(conf, actionXml, appPath, context);
- addDefaultChildEnv(conf);
- return conf;
- }
-
- /**
- * This method sets the PATH to current working directory for the launched
- * map task from where shell command will run.
- *
- * @param conf
- */
- protected void addDefaultChildEnv(Configuration conf) {
- String envValues = "PATH=.:$PATH";
- updateProperty(conf, OOZIE_LAUNCHER_MAP_ENV, envValues);
- updateProperty(conf, OOZIE_LAUNCHER_CHILD_ENV, envValues);
+ protected void addActionSpecificEnvVars(Map<String, String> env) {
+ Apps.setEnvFromInputString(env, "PATH=.:$PATH", File.pathSeparator);
}
/**
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java
index 1a3197a..00497a7 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java
@@ -21,7 +21,6 @@ package org.apache.oozie.action.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.service.ConfigurationService;
@@ -32,12 +31,11 @@ import org.jdom.Namespace;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
public class SparkActionExecutor extends JavaActionExecutor {
public static final String SPARK_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.SparkMain";
- public static final String TASK_USER_PRECEDENCE = "mapreduce.task.classpath.user.precedence"; // hadoop-2
- public static final String TASK_USER_CLASSPATH_PRECEDENCE = "mapreduce.user.classpath.first"; // hadoop-1
public static final String SPARK_MASTER = "oozie.spark.master";
public static final String SPARK_MODE = "oozie.spark.mode";
public static final String SPARK_OPTS = "oozie.spark.spark-opts";
@@ -78,7 +76,7 @@ public class SparkActionExecutor extends JavaActionExecutor {
StringBuilder sparkOptsSb = new StringBuilder();
if (master.startsWith("yarn")) {
- String resourceManager = actionConf.get(HADOOP_JOB_TRACKER);
+ String resourceManager = actionConf.get(HADOOP_YARN_RM);
Properties sparkConfig =
Services.get().get(SparkConfigurationService.class).getSparkConfig(resourceManager);
for (String property : sparkConfig.stringPropertyNames()) {
@@ -102,20 +100,6 @@ public class SparkActionExecutor extends JavaActionExecutor {
}
@Override
- JobConf createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml,
- Configuration actionConf) throws ActionExecutorException {
-
- JobConf launcherJobConf = super.createLauncherConf(actionFs, context, action, actionXml, actionConf);
- if (launcherJobConf.get("oozie.launcher." + TASK_USER_PRECEDENCE) == null) {
- launcherJobConf.set(TASK_USER_PRECEDENCE, "true");
- }
- if (launcherJobConf.get("oozie.launcher." + TASK_USER_CLASSPATH_PRECEDENCE) == null) {
- launcherJobConf.set(TASK_USER_CLASSPATH_PRECEDENCE, "true");
- }
- return launcherJobConf;
- }
-
- @Override
Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context)
throws ActionExecutorException {
super.setupLauncherConf(conf, actionXml, appPath, context);
@@ -136,8 +120,8 @@ public class SparkActionExecutor extends JavaActionExecutor {
}
@Override
- public List<Class> getLauncherClasses() {
- List<Class> classes = new ArrayList<Class>();
+ public List<Class<?>> getLauncherClasses() {
+ List<Class<?>> classes = new ArrayList<Class<?>>();
try {
classes.add(Class.forName(SPARK_MAIN_CLASS_NAME));
} catch (ClassNotFoundException e) {
@@ -159,6 +143,16 @@ public class SparkActionExecutor extends JavaActionExecutor {
}
@Override
+ protected boolean needToAddMapReduceToClassPath() {
+ return true;
+ }
+
+ @Override
+ protected void addActionSpecificEnvVars(Map<String, String> env) {
+ env.put("SPARK_HOME", ".");
+ }
+
+ @Override
protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, SPARK_MAIN_CLASS_NAME);
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java
index 22e2874..955f3b7 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java
@@ -18,6 +18,12 @@
package org.apache.oozie.action.hadoop;
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.StringTokenizer;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.Counters;
@@ -33,12 +39,6 @@ import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;
import org.jdom.Namespace;
-import java.io.IOException;
-import java.io.StringReader;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.StringTokenizer;
-
public class SqoopActionExecutor extends JavaActionExecutor {
public static final String OOZIE_ACTION_EXTERNAL_STATS_WRITE = "oozie.action.external.stats.write";
@@ -51,8 +51,8 @@ public class SqoopActionExecutor extends JavaActionExecutor {
}
@Override
- public List<Class> getLauncherClasses() {
- List<Class> classes = new ArrayList<Class>();
+ public List<Class<?>> getLauncherClasses() {
+ List<Class<?>> classes = new ArrayList<Class<?>>();
try {
classes.add(Class.forName(SQOOP_MAIN_CLASS_NAME));
}
@@ -68,7 +68,6 @@ public class SqoopActionExecutor extends JavaActionExecutor {
}
@Override
- @SuppressWarnings("unchecked")
Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
throws ActionExecutorException {
super.setupActionConf(actionConf, context, actionXml, appPath);
@@ -96,6 +95,7 @@ public class SqoopActionExecutor extends JavaActionExecutor {
}
}
else {
+ @SuppressWarnings("unchecked")
List<Element> eArgs = (List<Element>) actionXml.getChildren("arg", ns);
for (Element elem : eArgs) {
argList.add(elem.getTextTrim());
@@ -119,7 +119,7 @@ public class SqoopActionExecutor extends JavaActionExecutor {
}
private void setSqoopCommand(Configuration conf, String[] args) {
- MapReduceMain.setStrings(conf, SQOOP_ARGS, args);
+ ActionUtils.setStrings(conf, SQOOP_ARGS, args);
}
/**
@@ -141,7 +141,7 @@ public class SqoopActionExecutor extends JavaActionExecutor {
try {
if (action.getStatus() == WorkflowAction.Status.OK) {
Element actionXml = XmlUtils.parseXml(action.getConf());
- JobConf jobConf = createBaseHadoopConf(context, actionXml);
+ Configuration jobConf = createBaseHadoopConf(context, actionXml);
jobClient = createJobClient(context, jobConf);
// Cumulative counters for all Sqoop mapreduce jobs
@@ -236,6 +236,11 @@ public class SqoopActionExecutor extends JavaActionExecutor {
}
}
+ @Override
+ protected boolean needToAddMapReduceToClassPath() {
+ return true;
+ }
+
/**
* Return the sharelib name for the action.
*
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/client/rest/BulkResponseImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/client/rest/BulkResponseImpl.java b/core/src/main/java/org/apache/oozie/client/rest/BulkResponseImpl.java
index fb021bd..374c6ef 100644
--- a/core/src/main/java/org/apache/oozie/client/rest/BulkResponseImpl.java
+++ b/core/src/main/java/org/apache/oozie/client/rest/BulkResponseImpl.java
@@ -18,7 +18,6 @@
package org.apache.oozie.client.rest;
-import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -29,6 +28,8 @@ import org.apache.oozie.client.BulkResponse;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
+import com.google.common.collect.ImmutableSet;
+
/**
* Server-side implementation class of the client interface BulkResponse
* Declares all the bulk request specific user parameters and handling as JSON object
@@ -48,20 +49,14 @@ public class BulkResponseImpl implements BulkResponse, JsonBean {
public static final String BULK_FILTER_END_NOMINAL_EPOCH = "endscheduledtime";
public static final String BULK_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:SS'Z'";
- public static final Set<String> BULK_FILTER_NAMES = new HashSet<String>();
-
- static {
-
- BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_BUNDLE);
- BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_COORD);
- BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_LEVEL);
- BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_STATUS);
- BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_START_CREATED_EPOCH);
- BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_END_CREATED_EPOCH);
- BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_START_NOMINAL_EPOCH);
- BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_END_NOMINAL_EPOCH);
-
- }
+ public static final Set<String> BULK_FILTER_NAMES = ImmutableSet.of(BulkResponseImpl.BULK_FILTER_BUNDLE,
+ BulkResponseImpl.BULK_FILTER_COORD,
+ BulkResponseImpl.BULK_FILTER_LEVEL,
+ BulkResponseImpl.BULK_FILTER_STATUS,
+ BulkResponseImpl.BULK_FILTER_START_CREATED_EPOCH,
+ BulkResponseImpl.BULK_FILTER_END_CREATED_EPOCH,
+ BulkResponseImpl.BULK_FILTER_START_NOMINAL_EPOCH,
+ BulkResponseImpl.BULK_FILTER_END_NOMINAL_EPOCH);
/**
* Construct JSON object using the bulk request object and the associated tags
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/command/XCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/XCommand.java b/core/src/main/java/org/apache/oozie/command/XCommand.java
index bdf13f6..7b8f47c 100644
--- a/core/src/main/java/org/apache/oozie/command/XCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/XCommand.java
@@ -244,7 +244,10 @@ public abstract class XCommand<T> implements XCallable<T> {
@Override
public final T call() throws CommandException {
setLogInfo();
- if (CallableQueueService.INTERRUPT_TYPES.contains(this.getType()) && used.get()) {
+ CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class);
+ Set<String> interruptTypes = callableQueueService.getInterruptTypes();
+
+ if (interruptTypes.contains(this.getType()) && used.get()) {
LOG.debug("Command [{0}] key [{1}] already used for [{2}]", getName(), getEntityKey(), this.toString());
return null;
}
@@ -271,7 +274,7 @@ public abstract class XCommand<T> implements XCallable<T> {
}
if (!isLockRequired() || (lock != null) || this.inInterruptMode()) {
- if (CallableQueueService.INTERRUPT_TYPES.contains(this.getType())
+ if (interruptTypes.contains(this.getType())
&& !used.compareAndSet(false, true)) {
LOG.debug("Command [{0}] key [{1}] already executed for [{2}]", getName(), getEntityKey(),
this.toString());
@@ -289,7 +292,6 @@ public abstract class XCommand<T> implements XCallable<T> {
instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".execute", executeCron);
}
if (commandQueue != null) {
- CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class);
for (Map.Entry<Long, List<XCommand<?>>> entry : commandQueue.entrySet()) {
LOG.debug("Queuing [{0}] commands with delay [{1}]ms", entry.getValue().size(), entry.getKey());
if (!callableQueueService.queueSerial(entry.getValue(), entry.getKey())) {
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/command/wf/SubmitHttpXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/SubmitHttpXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/SubmitHttpXCommand.java
index d2a2742..98d0f3c 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/SubmitHttpXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/SubmitHttpXCommand.java
@@ -46,6 +46,8 @@ import org.apache.oozie.client.XOozieClient;
import org.jdom.Element;
import org.jdom.Namespace;
+import com.google.common.collect.ImmutableSet;
+
import java.util.Date;
import java.util.List;
import java.util.Map;
@@ -54,17 +56,8 @@ import java.util.HashSet;
public abstract class SubmitHttpXCommand extends WorkflowXCommand<String> {
- protected static final Set<String> MANDATORY_OOZIE_CONFS = new HashSet<String>();
- protected static final Set<String> OPTIONAL_OOZIE_CONFS = new HashSet<String>();
-
- static {
- MANDATORY_OOZIE_CONFS.add(XOozieClient.JT);
- MANDATORY_OOZIE_CONFS.add(XOozieClient.NN);
- MANDATORY_OOZIE_CONFS.add(OozieClient.LIBPATH);
-
- OPTIONAL_OOZIE_CONFS.add(XOozieClient.FILES);
- OPTIONAL_OOZIE_CONFS.add(XOozieClient.ARCHIVES);
- }
+ static final Set<String> MANDATORY_OOZIE_CONFS = ImmutableSet.of(XOozieClient.RM, XOozieClient.NN, OozieClient.LIBPATH);
+ static final Set<String> OPTIONAL_OOZIE_CONFS = ImmutableSet.of(XOozieClient.FILES, XOozieClient.ARCHIVES);
private Configuration conf;
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/command/wf/SubmitMRXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/SubmitMRXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/SubmitMRXCommand.java
index cc61d3d..05e7595 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/SubmitMRXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/SubmitMRXCommand.java
@@ -41,11 +41,10 @@ public class SubmitMRXCommand extends SubmitHttpXCommand {
static {
SKIPPED_CONFS.add(WorkflowAppService.HADOOP_USER);
- SKIPPED_CONFS.add(XOozieClient.JT);
+ SKIPPED_CONFS.add(XOozieClient.RM);
SKIPPED_CONFS.add(XOozieClient.NN);
DEPRECATE_MAP.put(XOozieClient.NN, XOozieClient.NN_2);
- DEPRECATE_MAP.put(XOozieClient.JT, XOozieClient.JT_2);
DEPRECATE_MAP.put(WorkflowAppService.HADOOP_USER, "mapreduce.job.user.name");
}
@@ -93,8 +92,7 @@ public class SubmitMRXCommand extends SubmitHttpXCommand {
protected Element generateSection(Configuration conf, Namespace ns) {
Element mapreduce = new Element("map-reduce", ns);
Element jt = new Element("job-tracker", ns);
- String newJTVal = conf.get(DEPRECATE_MAP.get(XOozieClient.JT));
- jt.addContent(newJTVal != null ? newJTVal : (conf.get(XOozieClient.JT)));
+ jt.addContent(conf.get(XOozieClient.RM));
mapreduce.addContent(jt);
Element nn = new Element("name-node", ns);
String newNNVal = conf.get(DEPRECATE_MAP.get(XOozieClient.NN));
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/command/wf/SubmitScriptLanguageXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/SubmitScriptLanguageXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/SubmitScriptLanguageXCommand.java
index 9d41305..fab4398 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/SubmitScriptLanguageXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/SubmitScriptLanguageXCommand.java
@@ -19,7 +19,7 @@
package org.apache.oozie.command.wf;
import org.apache.hadoop.conf.Configuration;
-import org.apache.oozie.action.hadoop.MapReduceMain;
+import org.apache.oozie.action.hadoop.ActionUtils;
import org.apache.oozie.client.XOozieClient;
import org.apache.oozie.command.CommandException;
import org.jdom.Element;
@@ -50,7 +50,7 @@ public abstract class SubmitScriptLanguageXCommand extends SubmitHttpXCommand {
String name = getWorkflowName();
Element ele = new Element(name, ns);
Element jt = new Element("job-tracker", ns);
- jt.addContent(conf.get(XOozieClient.JT));
+ jt.addContent(conf.get(XOozieClient.RM));
ele.addContent(jt);
Element nn = new Element("name-node", ns);
nn.addContent(conf.get(XOozieClient.NN));
@@ -58,7 +58,7 @@ public abstract class SubmitScriptLanguageXCommand extends SubmitHttpXCommand {
List<String> Dargs = new ArrayList<String>();
List<String> otherArgs = new ArrayList<String>();
- String[] args = MapReduceMain.getStrings(conf, getOptions());
+ String[] args = ActionUtils.getStrings(conf, getOptions());
for (String arg : args) {
if (arg.startsWith("-D")) {
Dargs.add(arg);
@@ -67,7 +67,7 @@ public abstract class SubmitScriptLanguageXCommand extends SubmitHttpXCommand {
otherArgs.add(arg);
}
}
- String [] params = MapReduceMain.getStrings(conf, getScriptParamters());
+ String [] params = ActionUtils.getStrings(conf, getScriptParamters());
// configuration section
if (Dargs.size() > 0) {
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/command/wf/SubmitSqoopXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/SubmitSqoopXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/SubmitSqoopXCommand.java
index 51b739e..c5574c5 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/SubmitSqoopXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/SubmitSqoopXCommand.java
@@ -22,7 +22,7 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.client.XOozieClient;
import org.apache.oozie.command.CommandException;
-import org.apache.oozie.action.hadoop.MapReduceMain;
+import org.apache.oozie.action.hadoop.ActionUtils;
import org.jdom.Namespace;
import org.jdom.Element;
@@ -50,14 +50,14 @@ public class SubmitSqoopXCommand extends SubmitHttpXCommand {
String name = "sqoop";
Element ele = new Element(name, ns);
Element jt = new Element("job-tracker", ns);
- jt.addContent(conf.get(XOozieClient.JT));
+ jt.addContent(conf.get(XOozieClient.RM));
ele.addContent(jt);
Element nn = new Element("name-node", ns);
nn.addContent(conf.get(XOozieClient.NN));
ele.addContent(nn);
List<String> Dargs = new ArrayList<String>();
- String[] args = MapReduceMain.getStrings(conf, getOptions());
+ String[] args = ActionUtils.getStrings(conf, getOptions());
for (String arg : args) {
if (arg.startsWith("-D")) {
Dargs.add(arg);
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/coord/input/dependency/AbstractCoordInputDependency.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/coord/input/dependency/AbstractCoordInputDependency.java b/core/src/main/java/org/apache/oozie/coord/input/dependency/AbstractCoordInputDependency.java
index ace120d..79355eb 100644
--- a/core/src/main/java/org/apache/oozie/coord/input/dependency/AbstractCoordInputDependency.java
+++ b/core/src/main/java/org/apache/oozie/coord/input/dependency/AbstractCoordInputDependency.java
@@ -111,15 +111,16 @@ public abstract class AbstractCoordInputDependency implements Writable, CoordInp
missingDependenciesSet = new HashMap<String, List<String>>();
availableDependenciesSet = new HashMap<String, List<String>>();
- Set<String> keySets = dependencyMap.keySet();
- for (String key : keySets) {
- for (CoordInputInstance coordInputInstance : dependencyMap.get(key))
+ for (Entry<String, List<CoordInputInstance>> entry : dependencyMap.entrySet()) {
+ String key = entry.getKey();
+ for (CoordInputInstance coordInputInstance : entry.getValue()) {
if (coordInputInstance.isAvailable()) {
addToAvailableDependencies(key, coordInputInstance);
}
else {
addToMissingDependencies(key, coordInputInstance);
}
+ }
}
}
catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java b/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java
index 6f0abf6..a8b58d5 100644
--- a/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java
+++ b/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java
@@ -492,13 +492,13 @@ public class EhcacheHCatDependencyCache implements HCatDependencyCache, CacheEve
@Override
public void removeNonWaitingCoordActions(Set<String> staleActions) {
- Iterator<String> serverItr = missingDepsByServer.keySet().iterator();
- while (serverItr.hasNext()) {
- String server = serverItr.next();
- Cache missingCache = missingDepsByServer.get(server);
+ for (Entry<String, Cache> entry : missingDepsByServer.entrySet()) {
+ Cache missingCache = entry.getValue();
+
if (missingCache == null) {
continue;
}
+
synchronized (missingCache) {
for (Object key : missingCache.getKeys()) {
Element element = missingCache.get(key);
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/CallableQueueService.java b/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
index a86a8d0..cfd208a 100644
--- a/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
+++ b/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
@@ -48,6 +48,8 @@ import org.apache.oozie.util.PriorityDelayQueue.QueueElement;
import org.apache.oozie.util.XCallable;
import org.apache.oozie.util.XLog;
+import com.google.common.collect.ImmutableSet;
+
/**
* The callable queue service queues {@link XCallable}s for asynchronous execution.
* <p>
@@ -95,9 +97,9 @@ public class CallableQueueService implements Service, Instrumentable {
private final Map<String, Date> uniqueCallables = new ConcurrentHashMap<String, Date>();
- private final ConcurrentHashMap<String, Set<XCallable<?>>> interruptCommandsMap = new ConcurrentHashMap<String, Set<XCallable<?>>>();
+ private final ConcurrentHashMap<String, Set<XCallable<?>>> interruptCommandsMap = new ConcurrentHashMap<>();
- public static final HashSet<String> INTERRUPT_TYPES = new HashSet<String>();
+ private Set<String> interruptTypes;
private int interruptMapMaxSize;
@@ -452,10 +454,12 @@ public class CallableQueueService implements Service, Instrumentable {
int threads = ConfigurationService.getInt(conf, CONF_THREADS);
boolean callableNextEligible = ConfigurationService.getBoolean(conf, CONF_CALLABLE_NEXT_ELIGIBLE);
+ interruptTypes = new HashSet<>();
for (String type : ConfigurationService.getStrings(conf, CONF_CALLABLE_INTERRUPT_TYPES)) {
log.debug("Adding interrupt type [{0}]", type);
- INTERRUPT_TYPES.add(type);
+ interruptTypes.add(type);
}
+ interruptTypes = ImmutableSet.copyOf(interruptTypes);
if (!callableNextEligible) {
queue = new PriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS, queueSize) {
@@ -720,12 +724,12 @@ public class CallableQueueService implements Service, Instrumentable {
public void checkInterruptTypes(XCallable<?> callable) {
if ((callable instanceof CompositeCallable) && (((CompositeCallable) callable).getCallables() != null)) {
for (XCallable<?> singleCallable : ((CompositeCallable) callable).getCallables()) {
- if (INTERRUPT_TYPES.contains(singleCallable.getType())) {
+ if (interruptTypes.contains(singleCallable.getType())) {
insertCallableIntoInterruptMap(singleCallable);
}
}
}
- else if (INTERRUPT_TYPES.contains(callable.getType())) {
+ else if (interruptTypes.contains(callable.getType())) {
insertCallableIntoInterruptMap(callable);
}
}
@@ -791,4 +795,8 @@ public class CallableQueueService implements Service, Instrumentable {
return executor.invokeAll(tasks);
}
+ public Set<String> getInterruptTypes() {
+ return interruptTypes;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/service/EventHandlerService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/EventHandlerService.java b/core/src/main/java/org/apache/oozie/service/EventHandlerService.java
index 22c6fb0..a68f94f 100644
--- a/core/src/main/java/org/apache/oozie/service/EventHandlerService.java
+++ b/core/src/main/java/org/apache/oozie/service/EventHandlerService.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
@@ -170,14 +171,16 @@ public class EventHandlerService implements Service {
@Override
public void destroy() {
eventsEnabled = false;
- for (MessageType type : listenerMap.keySet()) {
- Iterator<?> iter = listenerMap.get(type).iterator();
- while (iter.hasNext()) {
+
+ for (Entry<MessageType, List<?>> entry : listenerMap.entrySet()) {
+ List<?> listeners = entry.getValue();
+ MessageType type = entry.getKey();
+
+ for (Object listener : listeners) {
if (type == MessageType.JOB) {
- ((JobEventListener) iter.next()).destroy();
- }
- else if (type == MessageType.SLA) {
- ((SLAEventListener) iter.next()).destroy();
+ ((JobEventListener) listener).destroy();
+ } else if (type == MessageType.SLA) {
+ ((SLAEventListener) listener).destroy();
}
}
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java b/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
index 23a9d92..9624104 100644
--- a/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
+++ b/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
@@ -18,17 +18,22 @@
package org.apache.oozie.service;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.action.hadoop.JavaActionExecutor;
import org.apache.oozie.util.IOUtils;
@@ -43,7 +48,7 @@ import java.io.FileInputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
-import java.lang.reflect.InvocationTargetException;
+import java.io.OutputStream;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.URI;
@@ -79,19 +84,16 @@ public class HadoopAccessorService implements Service {
public static final String KERBEROS_PRINCIPAL = CONF_PREFIX + "kerberos.principal";
public static final Text MR_TOKEN_ALIAS = new Text("oozie mr token");
- protected static final String OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED = "oozie.HadoopAccessorService.created";
/** The Kerberos principal for the job tracker.*/
protected static final String JT_PRINCIPAL = "mapreduce.jobtracker.kerberos.principal";
/** The Kerberos principal for the resource manager.*/
protected static final String RM_PRINCIPAL = "yarn.resourcemanager.principal";
- protected static final String HADOOP_JOB_TRACKER = "mapred.job.tracker";
- protected static final String HADOOP_JOB_TRACKER_2 = "mapreduce.jobtracker.address";
protected static final String HADOOP_YARN_RM = "yarn.resourcemanager.address";
- private static final Map<String, Text> mrTokenRenewers = new HashMap<String, Text>();
-
- private static Configuration cachedConf;
+ private static final String OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED = "oozie.HadoopAccessorService.created";
+ private static final Map<String, Text> mrTokenRenewers = new HashMap<String, Text>();
private static final String DEFAULT_ACTIONNAME = "default";
+ private static Configuration cachedConf;
private Set<String> jobTrackerWhitelist = new HashSet<String>();
private Set<String> nameNodeWhitelist = new HashSet<String>();
@@ -406,18 +408,20 @@ public class HadoopAccessorService implements Service {
public boolean accept(File dir, String name) {
return ActionConfFileType.isSupportedFileType(name);
}});
- Arrays.sort(actionConfFiles, new Comparator<File>() {
- @Override
- public int compare(File o1, File o2) {
- return o1.getName().compareTo(o2.getName());
- }
- });
- for (File f : actionConfFiles) {
- if (f.isFile() && f.canRead()) {
- updateActionConfigWithFile(actionConf, f);
+
+ if (actionConfFiles != null) {
+ Arrays.sort(actionConfFiles, new Comparator<File>() {
+ @Override
+ public int compare(File o1, File o2) {
+ return o1.getName().compareTo(o2.getName());
+ }
+ });
+ for (File f : actionConfFiles) {
+ if (f.isFile() && f.canRead()) {
+ updateActionConfigWithFile(actionConf, f);
+ }
}
}
-
}
private Configuration readActionConfFile(File file) throws IOException {
@@ -505,7 +509,7 @@ public class HadoopAccessorService implements Service {
if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
throw new HadoopAccessorException(ErrorCode.E0903);
}
- String jobTracker = conf.get(JavaActionExecutor.HADOOP_JOB_TRACKER);
+ String jobTracker = conf.get(JavaActionExecutor.HADOOP_YARN_RM);
validateJobTracker(jobTracker);
try {
UserGroupInformation ugi = getUGI(user);
@@ -516,39 +520,60 @@ public class HadoopAccessorService implements Service {
});
return jobClient;
}
- catch (InterruptedException ex) {
- throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
- }
- catch (IOException ex) {
+ catch (IOException | InterruptedException ex) {
throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
}
}
/**
- * Get the RM delegation token using jobClient and add it to conf
+ * Return a JobClient created with the provided user/group.
*
- * @param jobClient
- * @param conf
- * @throws HadoopAccessorException
+ *
+ * @param conf Configuration with all necessary information to create the
+ * JobClient.
+ * @return JobClient created with the provided user/group.
+ * @throws HadoopAccessorException if the client could not be created.
*/
- public void addRMDelegationToken(JobClient jobClient, JobConf conf) throws HadoopAccessorException {
- Token<DelegationTokenIdentifier> mrdt;
- try {
- mrdt = jobClient.getDelegationToken(getMRDelegationTokenRenewer(conf));
- }
- catch (IOException e) {
- throw new HadoopAccessorException(ErrorCode.E0902, e.getMessage(), e);
+ public JobClient createJobClient(String user, Configuration conf) throws HadoopAccessorException {
+ return createJobClient(user, new JobConf(conf));
+ }
+
+ /**
+ * Return a YarnClient created with the provided user and configuration. The caller is responsible for closing it when done.
+ *
+ * @param user The username to impersonate
+ * @param conf The conf
+ * @return a YarnClient with the provided user and configuration
+ * @throws HadoopAccessorException if the client could not be created.
+ */
+ public YarnClient createYarnClient(String user, final Configuration conf) throws HadoopAccessorException {
+ ParamChecker.notEmpty(user, "user");
+ if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
+ throw new HadoopAccessorException(ErrorCode.E0903);
}
- catch (InterruptedException e) {
- throw new HadoopAccessorException(ErrorCode.E0902, e.getMessage(), e);
+ String rm = conf.get(JavaActionExecutor.HADOOP_YARN_RM);
+ validateJobTracker(rm);
+ try {
+ UserGroupInformation ugi = getUGI(user);
+ YarnClient yarnClient = ugi.doAs(new PrivilegedExceptionAction<YarnClient>() {
+ @Override
+ public YarnClient run() throws Exception {
+ YarnClient yarnClient = YarnClient.createYarnClient();
+ yarnClient.init(conf);
+ yarnClient.start();
+ return yarnClient;
+ }
+ });
+ return yarnClient;
+ } catch (IOException | InterruptedException ex) {
+ throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
}
- conf.getCredentials().addToken(MR_TOKEN_ALIAS, mrdt);
}
/**
* Return a FileSystem created with the provided user for the specified URI.
*
- *
+ * @param user The username to impersonate
* @param uri file system URI.
* @param conf Configuration with all necessary information to create the FileSystem.
* @return FileSystem created with the provided user/group.
@@ -556,8 +581,14 @@ public class HadoopAccessorService implements Service {
*/
public FileSystem createFileSystem(String user, final URI uri, final Configuration conf)
throws HadoopAccessorException {
+ return createFileSystem(user, uri, conf, true);
+ }
+
+ private FileSystem createFileSystem(String user, final URI uri, final Configuration conf, boolean checkAccessorProperty)
+ throws HadoopAccessorException {
ParamChecker.notEmpty(user, "user");
- if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
+
+ if (checkAccessorProperty && !conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
throw new HadoopAccessorException(ErrorCode.E0903);
}
@@ -585,10 +616,7 @@ public class HadoopAccessorService implements Service {
}
});
}
- catch (InterruptedException ex) {
- throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
- }
- catch (IOException ex) {
+ catch (IOException | InterruptedException ex) {
throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
}
}
@@ -639,10 +667,7 @@ public class HadoopAccessorService implements Service {
renewer = mrTokenRenewers.get(servicePrincipal);
if (renewer == null) {
// Mimic org.apache.hadoop.mapred.Master.getMasterPrincipal()
- String target = jobConf.get(HADOOP_YARN_RM, jobConf.get(HADOOP_JOB_TRACKER_2));
- if (target == null) {
- target = jobConf.get(HADOOP_JOB_TRACKER);
- }
+ String target = jobConf.get(HADOOP_YARN_RM);
try {
String addr = NetUtils.createSocketAddr(target).getHostName();
renewer = new Text(SecurityUtil.getServerPrincipal(servicePrincipal, addr));
@@ -705,4 +730,48 @@ public class HadoopAccessorService implements Service {
return supportedSchemes;
}
-}
+ /**
+ * Creates a {@link LocalResource} for the Configuration to localize it for a Yarn Container. This involves also writing it
+ * to HDFS.
+ * Example usage:
+ * * <pre>
+ * {@code
+ * LocalResource res1 = createLocalResourceForConfigurationFile(filename1, user, conf, uri, dir);
+ * LocalResource res2 = createLocalResourceForConfigurationFile(filename2, user, conf, uri, dir);
+ * ...
+ * Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+ * localResources.put(filename1, res1);
+ * localResources.put(filename2, res2);
+ * ...
+ * containerLaunchContext.setLocalResources(localResources);
+ * }
+ * </pre>
+ *
+ * @param filename The filename to use on the remote filesystem and once it has been localized.
+ * @param user The user
+ * @param conf The configuration to process
+ * @param uri The URI of the remote filesystem (e.g. HDFS)
+ * @param dir The directory on the remote filesystem to write the file to
+ * @return
+ * @throws IOException A problem occurred writing the file
+ * @throws HadoopAccessorException A problem occured with Hadoop
+ * @throws URISyntaxException A problem occurred parsing the URI
+ */
+ public LocalResource createLocalResourceForConfigurationFile(String filename, String user, Configuration conf, URI uri,
+ Path dir)
+ throws IOException, HadoopAccessorException, URISyntaxException {
+ Path dst = new Path(dir, filename);
+ FileSystem fs = createFileSystem(user, uri, conf, false);
+ try (OutputStream os = fs.create(dst)){
+ conf.writeXml(os);
+ }
+ LocalResource localResource = Records.newRecord(LocalResource.class);
+ localResource.setType(LocalResourceType.FILE); localResource.setVisibility(LocalResourceVisibility.APPLICATION);
+ localResource.setResource(ConverterUtils.getYarnUrlFromPath(dst));
+ FileStatus destStatus = fs.getFileStatus(dst);
+ localResource.setTimestamp(destStatus.getModificationTime());
+ localResource.setSize(destStatus.getLen());
+ return localResource;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/service/Services.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/Services.java b/core/src/main/java/org/apache/oozie/service/Services.java
index 829d5f5..7f47f88 100644
--- a/core/src/main/java/org/apache/oozie/service/Services.java
+++ b/core/src/main/java/org/apache/oozie/service/Services.java
@@ -204,7 +204,6 @@ public class Services {
*
* @throws ServiceException thrown if any of the services could not initialize.
*/
- @SuppressWarnings("unchecked")
public void init() throws ServiceException {
XLog log = new XLog(LogFactory.getLog(getClass()));
log.trace("Initializing");
@@ -255,9 +254,9 @@ public class Services {
* configuration.
* @throws ServiceException thrown if a service class could not be loaded.
*/
- private void loadServices(Class[] classes, List<Service> list) throws ServiceException {
+ private void loadServices(Class<?>[] classes, List<Service> list) throws ServiceException {
XLog log = new XLog(LogFactory.getLog(getClass()));
- for (Class klass : classes) {
+ for (Class<?> klass : classes) {
try {
Service service = (Service) klass.newInstance();
log.debug("Loading service [{0}] implementation [{1}]", service.getInterface(),
@@ -284,10 +283,10 @@ public class Services {
private void loadServices() throws ServiceException {
XLog log = new XLog(LogFactory.getLog(getClass()));
try {
- Map<Class, Service> map = new LinkedHashMap<Class, Service>();
- Class[] classes = ConfigurationService.getClasses(conf, CONF_SERVICE_CLASSES);
+ Map<Class<?>, Service> map = new LinkedHashMap<Class<?>, Service>();
+ Class<?>[] classes = ConfigurationService.getClasses(conf, CONF_SERVICE_CLASSES);
log.debug("Services list obtained from property '" + CONF_SERVICE_CLASSES + "'");
- Class[] classesExt = ConfigurationService.getClasses(conf, CONF_SERVICE_EXT_CLASSES);
+ Class<?>[] classesExt = ConfigurationService.getClasses(conf, CONF_SERVICE_EXT_CLASSES);
log.debug("Services list obtained from property '" + CONF_SERVICE_EXT_CLASSES + "'");
List<Service> list = new ArrayList<Service>();
loadServices(classes, list);
@@ -301,11 +300,12 @@ public class Services {
}
map.put(service.getInterface(), service);
}
- for (Map.Entry<Class, Service> entry : map.entrySet()) {
+ for (Map.Entry<Class<?>, Service> entry : map.entrySet()) {
setService(entry.getValue().getClass());
}
} catch (RuntimeException rex) {
- log.fatal("Runtime Exception during Services Load. Check your list of '" + CONF_SERVICE_CLASSES + "' or '" + CONF_SERVICE_EXT_CLASSES + "'");
+ log.fatal("Runtime Exception during Services Load. Check your list of '{0}' or '{1}'",
+ CONF_SERVICE_CLASSES, CONF_SERVICE_EXT_CLASSES, rex);
throw new ServiceException(ErrorCode.E0103, rex.getMessage(), rex);
}
}