You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by ge...@apache.org on 2017/05/26 09:27:54 UTC
[02/10] oozie git commit: OOZIE-1770 Create Oozie Application Master
for YARN (asasvari, pbacsko, rkanter, gezapeti)
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifierFactory.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifierFactory.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifierFactory.java
new file mode 100644
index 0000000..688424b
--- /dev/null
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifierFactory.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+
+public class LauncherAMCallbackNotifierFactory {
+
+ public LauncherAMCallbackNotifier createCallbackNotifier(Configuration conf) {
+ return new LauncherAMCallbackNotifier(conf);
+ }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
index 9a411ac..0236e1b 100644
--- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
@@ -30,7 +30,10 @@ import java.io.StringWriter;
import java.net.URL;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.LinkedHashSet;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
@@ -45,7 +48,15 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
public abstract class LauncherMain {
@@ -56,6 +67,10 @@ public abstract class LauncherMain {
public static final String OUTPUT_PROPERTIES = ACTION_PREFIX + "output.properties";
public static final String HADOOP_JOBS = "hadoopJobs";
public static final String MAPREDUCE_JOB_TAGS = "mapreduce.job.tags";
+
+ public static final String CHILD_MAPREDUCE_JOB_TAGS = "oozie.child.mapreduce.job.tags";
+ public static final String OOZIE_JOB_LAUNCH_TIME = "oozie.job.launch.time";
+
public static final String TEZ_APPLICATION_TAGS = "tez.application.tags";
public static final String SPARK_YARN_TAGS = "spark.yarn.tags";
protected static String[] HADOOP_SITE_FILES = new String[]
@@ -170,6 +185,81 @@ public abstract class LauncherMain {
}
}
+ public static Set<ApplicationId> getChildYarnJobs(Configuration actionConf) {
+ return getChildYarnJobs(actionConf, ApplicationsRequestScope.OWN);
+ }
+
+ public static Set<ApplicationId> getChildYarnJobs(Configuration actionConf, ApplicationsRequestScope scope,
+ long startTime) {
+ Set<ApplicationId> childYarnJobs = new HashSet<ApplicationId>();
+ String tag = actionConf.get(CHILD_MAPREDUCE_JOB_TAGS);
+ if (tag == null) {
+ System.out.print("Could not find Yarn tags property " + CHILD_MAPREDUCE_JOB_TAGS);
+ return childYarnJobs;
+ }
+ System.out.println("tag id : " + tag);
+ GetApplicationsRequest gar = GetApplicationsRequest.newInstance();
+ gar.setScope(scope);
+ gar.setApplicationTags(Collections.singleton(tag));
+ long endTime = System.currentTimeMillis();
+ if (startTime > endTime) {
+ System.out.println("WARNING: Clock skew between the Oozie server host and this host detected. Please fix this. " +
+ "Attempting to work around...");
+ // We don't know which one is wrong (relative to the RM), so to be safe, let's assume they're both wrong and add an
+ // offset in both directions
+ long diff = 2 * (startTime - endTime);
+ startTime = startTime - diff;
+ endTime = endTime + diff;
+ }
+ gar.setStartRange(startTime, endTime);
+ try {
+ ApplicationClientProtocol proxy = ClientRMProxy.createRMProxy(actionConf, ApplicationClientProtocol.class);
+ GetApplicationsResponse apps = proxy.getApplications(gar);
+ List<ApplicationReport> appsList = apps.getApplicationList();
+ for(ApplicationReport appReport : appsList) {
+ childYarnJobs.add(appReport.getApplicationId());
+ }
+ } catch (YarnException | IOException ioe) {
+ throw new RuntimeException("Exception occurred while finding child jobs", ioe);
+ }
+
+ System.out.println("Child yarn jobs are found - " + StringUtils.join(childYarnJobs, ","));
+ return childYarnJobs;
+ }
+ public static Set<ApplicationId> getChildYarnJobs(Configuration actionConf, ApplicationsRequestScope scope) {
+ System.out.println("Fetching child yarn jobs");
+
+ long startTime = 0L;
+ try {
+ startTime = Long.parseLong(System.getProperty(OOZIE_JOB_LAUNCH_TIME));
+ } catch(NumberFormatException nfe) {
+ throw new RuntimeException("Could not find Oozie job launch time", nfe);
+ }
+ return getChildYarnJobs(actionConf, scope, startTime);
+ }
+
+ public static void killChildYarnJobs(Configuration actionConf) {
+ try {
+ Set<ApplicationId> childYarnJobs = getChildYarnJobs(actionConf);
+ if (!childYarnJobs.isEmpty()) {
+ System.out.println();
+ System.out.println("Found [" + childYarnJobs.size() + "] Map-Reduce jobs from this launcher");
+ System.out.println("Killing existing jobs and starting over:");
+ YarnClient yarnClient = YarnClient.createYarnClient();
+ yarnClient.init(actionConf);
+ yarnClient.start();
+ for (ApplicationId app : childYarnJobs) {
+ System.out.print("Killing job [" + app + "] ... ");
+ yarnClient.killApplication(app);
+ System.out.println("Done");
+ }
+ System.out.println();
+ }
+ } catch (IOException | YarnException ye) {
+ throw new RuntimeException("Exception occurred while killing child job(s)", ye);
+ }
+ }
+
protected abstract void run(String[] args) throws Exception;
/**
@@ -181,12 +271,13 @@ public abstract class LauncherMain {
* @param conf Configuration/Properties object to dump to STDOUT
* @throws IOException thrown if an IO error ocurred.
*/
- @SuppressWarnings("unchecked")
- protected static void logMasking(String header, Collection<String> maskSet, Iterable conf) throws IOException {
+
+ protected static void logMasking(String header, Collection<String> maskSet, Iterable<Map.Entry<String,String>> conf)
+ throws IOException {
StringWriter writer = new StringWriter();
writer.write(header + "\n");
writer.write("--------------------\n");
- for (Map.Entry entry : (Iterable<Map.Entry>) conf) {
+ for (Map.Entry<String, String> entry : conf) {
String name = (String) entry.getKey();
String value = (String) entry.getValue();
for (String mask : maskSet) {
@@ -221,30 +312,6 @@ public abstract class LauncherMain {
}
/**
- * Will run the user specified OozieActionConfigurator subclass (if one is provided) to update the action configuration.
- *
- * @param actionConf The action configuration to update
- * @throws OozieActionConfiguratorException
- */
- protected static void runConfigClass(JobConf actionConf) throws OozieActionConfiguratorException {
- String configClass = System.getProperty(LauncherMapper.OOZIE_ACTION_CONFIG_CLASS);
- if (configClass != null) {
- try {
- Class<?> klass = Class.forName(configClass);
- Class<? extends OozieActionConfigurator> actionConfiguratorKlass = klass.asSubclass(OozieActionConfigurator.class);
- OozieActionConfigurator actionConfigurator = actionConfiguratorKlass.newInstance();
- actionConfigurator.configure(actionConf);
- } catch (ClassNotFoundException e) {
- throw new OozieActionConfiguratorException("An Exception occurred while instantiating the action config class", e);
- } catch (InstantiationException e) {
- throw new OozieActionConfiguratorException("An Exception occurred while instantiating the action config class", e);
- } catch (IllegalAccessException e) {
- throw new OozieActionConfiguratorException("An Exception occurred while instantiating the action config class", e);
- }
- }
- }
-
- /**
* Read action configuration passes through action xml file.
*
* @return action Configuration
@@ -268,13 +335,13 @@ public abstract class LauncherMain {
}
protected static void setYarnTag(Configuration actionConf) {
- if(actionConf.get(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS) != null) {
+ if(actionConf.get(CHILD_MAPREDUCE_JOB_TAGS) != null) {
// in case the user set their own tags, appending the launcher tag.
if(actionConf.get(MAPREDUCE_JOB_TAGS) != null) {
actionConf.set(MAPREDUCE_JOB_TAGS, actionConf.get(MAPREDUCE_JOB_TAGS) + ","
- + actionConf.get(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS));
+ + actionConf.get(CHILD_MAPREDUCE_JOB_TAGS));
} else {
- actionConf.set(MAPREDUCE_JOB_TAGS, actionConf.get(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS));
+ actionConf.set(MAPREDUCE_JOB_TAGS, actionConf.get(CHILD_MAPREDUCE_JOB_TAGS));
}
}
}
@@ -331,6 +398,27 @@ public abstract class LauncherMain {
}
copyFileMultiplex(actionXmlFile, dstFiles);
}
+ /**
+ * Print arguments to standard output stream. Mask out argument values to option with name 'password' in them.
+ * @param banner source banner
+ * @param args arguments to be printed
+ */
+ void printArgs(String banner, String[] args) {
+ System.out.println(banner);
+ boolean maskNextArg = false;
+ for (String arg : args) {
+ if (maskNextArg) {
+ System.out.println(" " + "********");
+ maskNextArg = false;
+ }
+ else {
+ System.out.println(" " + arg);
+ if (arg.toLowerCase().contains("password")) {
+ maskNextArg = true;
+ }
+ }
+ }
+ }
}
class LauncherMainException extends Exception {
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
index 8657c67..912eba2 100644
--- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
@@ -34,8 +34,8 @@ import java.security.Permission;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Properties;
-import java.util.Set;
import java.util.StringTokenizer;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -49,9 +49,12 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
+import org.xml.sax.SAXException;
import com.google.common.base.Strings;
+import javax.xml.parsers.ParserConfigurationException;
+// TODO: OYA: Delete :)
public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, Runnable {
static final String CONF_OOZIE_ACTION_MAIN_CLASS = "oozie.launcher.action.main.class";
@@ -238,7 +241,7 @@ public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, R
// Get what actually caused the exception
Throwable cause = ex.getCause();
// If we got a JavaMainException from JavaMain, then we need to unwrap it
- if (JavaMainException.class.isInstance(cause)) {
+ if (JavaMain.JavaMainException.class.isInstance(cause)) {
cause = cause.getCause();
}
if (LauncherMainException.class.isInstance(cause)) {
@@ -348,9 +351,9 @@ public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, R
// loading action conf prepared by Oozie
Configuration actionConf = LauncherMain.loadActionConf();
- if(actionConf.get(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS) != null) {
+ if(actionConf.get(LauncherMain.CHILD_MAPREDUCE_JOB_TAGS) != null) {
propagationConf.set(LauncherMain.MAPREDUCE_JOB_TAGS,
- actionConf.get(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS));
+ actionConf.get(LauncherMain.CHILD_MAPREDUCE_JOB_TAGS));
}
propagationConf.writeXml(new FileWriter(PROPAGATION_CONF_XML));
@@ -432,9 +435,8 @@ public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, R
try {
wr = SequenceFile.createWriter(fs, getJobConf(), finalPath, Text.class, Text.class);
if (wr != null) {
- Set<String> keys = actionData.keySet();
- for (String propsKey : keys) {
- wr.append(new Text(propsKey), new Text(actionData.get(propsKey)));
+ for (Entry<String, String> entry : actionData.entrySet()) {
+ wr.append(new Text(entry.getKey()), new Text(entry.getValue()));
}
}
else {
@@ -469,9 +471,9 @@ public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, R
System.setProperty(ACTION_PREFIX + ACTION_DATA_NEW_ID, new File(ACTION_DATA_NEW_ID).getAbsolutePath());
System.setProperty(ACTION_PREFIX + ACTION_DATA_OUTPUT_PROPS, new File(ACTION_DATA_OUTPUT_PROPS).getAbsolutePath());
System.setProperty(ACTION_PREFIX + ACTION_DATA_ERROR_PROPS, new File(ACTION_DATA_ERROR_PROPS).getAbsolutePath());
- if (getJobConf().get(LauncherMainHadoopUtils.OOZIE_JOB_LAUNCH_TIME) != null) {
- System.setProperty(LauncherMainHadoopUtils.OOZIE_JOB_LAUNCH_TIME,
- getJobConf().get(LauncherMainHadoopUtils.OOZIE_JOB_LAUNCH_TIME));
+ if (getJobConf().get(LauncherMain.OOZIE_JOB_LAUNCH_TIME) != null) {
+ System.setProperty(LauncherMain.OOZIE_JOB_LAUNCH_TIME,
+ getJobConf().get(LauncherMain.OOZIE_JOB_LAUNCH_TIME));
}
String actionConfigClass = getJobConf().get(OOZIE_ACTION_CONFIG_CLASS);
@@ -481,7 +483,7 @@ public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, R
}
// Method to execute the prepare actions
- private void executePrepare() throws IOException, LauncherException {
+ private void executePrepare() throws IOException, LauncherException, ParserConfigurationException, SAXException {
String prepareXML = getJobConf().get(ACTION_PREPARE_XML);
if (prepareXML != null) {
if (!prepareXML.equals("")) {
@@ -601,20 +603,26 @@ public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, R
System.out.println("======================");
File[] listOfFiles = folder.listFiles();
- for (File fileName : listOfFiles) {
- if (fileName.isFile()) {
- System.out.println("File: " + fileName.getName());
- }
- else if (fileName.isDirectory()) {
- System.out.println("Dir: " + fileName.getName());
- File subDir = new File(fileName.getName());
- File[] moreFiles = subDir.listFiles();
- for (File subFileName : moreFiles) {
- if (subFileName.isFile()) {
- System.out.println(" File: " + subFileName.getName());
- }
- else if (subFileName.isDirectory()) {
- System.out.println(" Dir: " + subFileName.getName());
+
+ if (listOfFiles != null) {
+ for (File fileName : listOfFiles) {
+ if (fileName.isFile()) {
+ System.out.println("File: " + fileName.getName());
+ }
+ else if (fileName.isDirectory()) {
+ System.out.println("Dir: " + fileName.getName());
+ File subDir = new File(fileName.getName());
+ File[] moreFiles = subDir.listFiles();
+
+ if (moreFiles != null) {
+ for (File subFileName : moreFiles) {
+ if (subFileName.isFile()) {
+ System.out.println(" File: " + subFileName.getName());
+ }
+ else if (subFileName.isDirectory()) {
+ System.out.println(" Dir: " + subFileName.getName());
+ }
+ }
}
}
}
@@ -709,12 +717,3 @@ class LauncherSecurityManager extends SecurityManager {
}
}
-/**
- * Used by JavaMain to wrap a Throwable when an Exception occurs
- */
-@SuppressWarnings("serial")
-class JavaMainException extends Exception {
- public JavaMainException(Throwable t) {
- super(t);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LocalFsOperations.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LocalFsOperations.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LocalFsOperations.java
new file mode 100644
index 0000000..011ce93
--- /dev/null
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LocalFsOperations.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.FileVisitOption;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.EnumSet;
+
+import org.apache.hadoop.conf.Configuration;
+
+public class LocalFsOperations {
+ private static final int WALK_DEPTH = 2;
+
+ /**
+ * Reads the launcher configuration "launcher.xml"
+ * @return Configuration object
+ */
+ public Configuration readLauncherConf() {
+ File confFile = new File(LauncherAM.LAUNCHER_JOB_CONF_XML);
+ Configuration conf = new Configuration(false);
+ conf.addResource(new org.apache.hadoop.fs.Path(confFile.getAbsolutePath()));
+ return conf;
+ }
+
+ /**
+ * Print files and directories in current directory. Will list files in the sub-directory (only 2 level deep)
+ * @throws IOException
+ */
+ public void printContentsOfDir(File folder) throws IOException {
+ System.out.println();
+ System.out.println("Files in current dir:" + folder.getAbsolutePath());
+ System.out.println("======================");
+
+ final Path root = folder.toPath();
+ Files.walkFileTree(root, EnumSet.of(FileVisitOption.FOLLOW_LINKS), WALK_DEPTH, new SimpleFileVisitor<Path>() {
+ @Override
+ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+ if (attrs.isRegularFile()) {
+ System.out.println(" File: " + root.relativize(file));
+ } else if (attrs.isDirectory()) {
+ System.out.println(" Dir: " + root.relativize(file) + "/");
+ }
+
+ return FileVisitResult.CONTINUE;
+ }
+ });
+ }
+
+ /**
+ * Returns the contents of a file as string.
+ *
+ * @param file the File object which represents the file to be read
+ * @param type Type of the file
+ * @param maxLen Maximum allowed length
+ * @return The file contents as string
+ * @throws IOException if the file is bigger than maxLen or there is any I/O error
+ * @throws FileNotFoundException if the file does not exist
+ */
+ public String getLocalFileContentAsString(File file, String type, int maxLen) throws IOException {
+ if (file.exists()) {
+ if (maxLen > -1 && file.length() > maxLen) {
+ throw new IOException(type + " data exceeds its limit [" + maxLen + "]");
+ }
+
+ return com.google.common.io.Files.toString(file, StandardCharsets.UTF_8);
+ } else {
+ throw new FileNotFoundException("File not found: " + file.toPath().toAbsolutePath());
+ }
+ }
+
+ /**
+ * Checks if a given File exists or not. This method helps writing unit tests.
+ */
+ public boolean fileExists(File file) {
+ return file.exists();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java
index d376057..e0974e8 100644
--- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java
@@ -52,7 +52,7 @@ public class MapReduceMain extends LauncherMain {
JobConf jobConf = new JobConf();
addActionConf(jobConf, actionConf);
- LauncherMainHadoopUtils.killChildYarnJobs(jobConf);
+ LauncherMain.killChildYarnJobs(jobConf);
// Run a config class if given to update the job conf
runConfigClass(jobConf);
@@ -132,31 +132,27 @@ public class MapReduceMain extends LauncherMain {
return runJob;
}
- @SuppressWarnings("unchecked")
protected JobClient createJobClient(JobConf jobConf) throws IOException {
return new JobClient(jobConf);
}
- // allows any character in the value, the conf.setStrings() does not allow
- // commas
- public static void setStrings(Configuration conf, String key, String[] values) {
- if (values != null) {
- conf.setInt(key + ".size", values.length);
- for (int i = 0; i < values.length; i++) {
- conf.set(key + "." + i, values[i]);
- }
- }
- }
-
- public static String[] getStrings(Configuration conf, String key) {
- String[] values = new String[conf.getInt(key + ".size", 0)];
- for (int i = 0; i < values.length; i++) {
- values[i] = conf.get(key + "." + i);
- if (values[i] == null) {
- values[i] = "";
+ /**
+ * Will run the user specified OozieActionConfigurator subclass (if one is provided) to update the action configuration.
+ *
+ * @param actionConf The action configuration to update
+ * @throws OozieActionConfiguratorException
+ */
+ private static void runConfigClass(JobConf actionConf) throws OozieActionConfiguratorException {
+ String configClass = actionConf.get(LauncherMapper.OOZIE_ACTION_CONFIG_CLASS);
+ if (configClass != null) {
+ try {
+ Class<?> klass = Class.forName(configClass);
+ Class<? extends OozieActionConfigurator> actionConfiguratorKlass = klass.asSubclass(OozieActionConfigurator.class);
+ OozieActionConfigurator actionConfigurator = actionConfiguratorKlass.newInstance();
+ actionConfigurator.configure(actionConf);
+ } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+ throw new OozieActionConfiguratorException("An Exception occurred while instantiating the action config class", e);
}
}
- return values;
}
-
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java
index 21ae456..cb5b1ac 100644
--- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java
@@ -38,7 +38,9 @@ import javax.xml.parsers.ParserConfigurationException;
* Utility class to perform operations on the prepare block of Workflow
*
*/
+@Deprecated
public class PrepareActionsDriver {
+ private static final PrepareActionsHandler prepareHandler = new PrepareActionsHandler();
/**
* Method to parse the prepare XML and execute the corresponding prepare actions
@@ -46,52 +48,9 @@ public class PrepareActionsDriver {
* @param prepareXML Prepare XML block in string format
* @throws LauncherException
*/
- static void doOperations(String prepareXML, Configuration conf) throws LauncherException {
- try {
- Document doc = getDocumentFromXML(prepareXML);
- doc.getDocumentElement().normalize();
-
- // Get the list of child nodes, basically, each one corresponding to a separate action
- NodeList nl = doc.getDocumentElement().getChildNodes();
- LauncherURIHandlerFactory factory = new LauncherURIHandlerFactory(conf);
-
- for (int i = 0; i < nl.getLength(); ++i) {
- Node n = nl.item(i);
- String operation = n.getNodeName();
- if (n.getAttributes() == null || n.getAttributes().getNamedItem("path") == null) {
- continue;
- }
- String pathStr = n.getAttributes().getNamedItem("path").getNodeValue().trim();
- // use Path to avoid URIsyntax error caused by square bracket in glob
- URI uri = new Path(pathStr).toUri();
- LauncherURIHandler handler = factory.getURIHandler(uri);
- execute(operation, uri, handler, conf);
- }
- } catch (IOException ioe) {
- throw new LauncherException(ioe.getMessage(), ioe);
- } catch (SAXException saxe) {
- throw new LauncherException(saxe.getMessage(), saxe);
- } catch (ParserConfigurationException pce) {
- throw new LauncherException(pce.getMessage(), pce);
- } catch (IllegalArgumentException use) {
- throw new LauncherException(use.getMessage(), use);
- }
- }
-
- /**
- * Method to execute the prepare actions based on the command
- *
- * @param n Child node of the prepare XML
- * @throws LauncherException
- */
- private static void execute(String operation, URI uri, LauncherURIHandler handler, Configuration conf)
- throws LauncherException {
- if (operation.equals("delete")) {
- handler.delete(uri, conf);
- }
- else if (operation.equals("mkdir")) {
- handler.create(uri, conf);
- }
+ static void doOperations(String prepareXML, Configuration conf)
+ throws IOException, SAXException, ParserConfigurationException, LauncherException {
+ prepareHandler.prepareAction(prepareXML, conf);
}
// Method to return the document from the prepare XML block
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsHandler.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsHandler.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsHandler.java
new file mode 100644
index 0000000..b5377b1
--- /dev/null
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsHandler.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.w3c.dom.Document;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.xml.sax.SAXException;
+
+public class PrepareActionsHandler {
+
+ /**
+ * Method to parse the prepare XML and execute the corresponding prepare actions
+ *
+ * @param prepareXML Prepare XML block in string format
+ * @throws LauncherException
+ */
+ public void prepareAction(String prepareXML, Configuration conf)
+ throws IOException, SAXException, ParserConfigurationException, LauncherException {
+ Document doc = getDocumentFromXML(prepareXML);
+ doc.getDocumentElement().normalize();
+
+ // Get the list of child nodes, basically, each one corresponding to a separate action
+ NodeList nl = doc.getDocumentElement().getChildNodes();
+ LauncherURIHandlerFactory factory = new LauncherURIHandlerFactory(conf);
+
+ for (int i = 0; i < nl.getLength(); ++i) {
+ Node n = nl.item(i);
+ String operation = n.getNodeName();
+ if (n.getAttributes() == null || n.getAttributes().getNamedItem("path") == null) {
+ continue;
+ }
+ String pathStr = n.getAttributes().getNamedItem("path").getNodeValue().trim();
+ // use Path to avoid URIsyntax error caused by square bracket in glob
+ URI uri = new Path(pathStr).toUri();
+ LauncherURIHandler handler = factory.getURIHandler(uri);
+ execute(operation, uri, handler, conf);
+ }
+ }
+
+ private void execute(String operation, URI uri, LauncherURIHandler handler, Configuration conf)
+ throws LauncherException {
+
+ switch (operation) {
+ case "delete":
+ handler.delete(uri, conf);
+ break;
+
+ case "mkdir":
+ handler.create(uri, conf);
+ break;
+
+ default:
+ System.out.println("Warning: unknown prepare operation " + operation + " -- skipping");
+ }
+ }
+
+ // Method to return the document from the prepare XML block
+ static Document getDocumentFromXML(String prepareXML) throws ParserConfigurationException, SAXException,
+ IOException {
+ DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory.newInstance();
+ docBuilderFactory.setNamespaceAware(true);
+ // support for includes in the xml file
+ docBuilderFactory.setXIncludeAware(true);
+ // ignore all comments inside the xml file
+ docBuilderFactory.setIgnoringComments(true);
+ docBuilderFactory.setExpandEntityReferences(false);
+ docBuilderFactory.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true);
+ DocumentBuilder docBuilder = docBuilderFactory.newDocumentBuilder();
+ InputStream is = new ByteArrayInputStream(prepareXML.getBytes("UTF-8"));
+ return docBuilder.parse(is);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/SequenceFileWriterFactory.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/SequenceFileWriterFactory.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/SequenceFileWriterFactory.java
new file mode 100644
index 0000000..8d986af
--- /dev/null
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/SequenceFileWriterFactory.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+
+public class SequenceFileWriterFactory {
+
+ public SequenceFile.Writer createSequenceFileWriter(Configuration launcherJobConf, Path finalPath,
+ Class<?> keyClass, Class<?> valueClass) throws IOException {
+ return SequenceFile.createWriter(launcherJobConf,
+ SequenceFile.Writer.file(finalPath),
+ SequenceFile.Writer.keyClass(keyClass),
+ SequenceFile.Writer.valueClass(valueClass));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ShellMain.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ShellMain.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ShellMain.java
index f109318..0ee35e8 100644
--- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ShellMain.java
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ShellMain.java
@@ -24,10 +24,10 @@ import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
-import java.io.PrintWriter;
-import java.io.StringReader;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -186,7 +186,7 @@ public class ShellMain extends LauncherMain {
*/
private Map<String, String> getEnvMap(Map<String, String> envp, Configuration actionConf) {
// Adding user-specified environments
- String[] envs = MapReduceMain.getStrings(actionConf, CONF_OOZIE_SHELL_ENVS);
+ String[] envs = ActionUtils.getStrings(actionConf, CONF_OOZIE_SHELL_ENVS);
for (String env : envs) {
String[] varValue = env.split("=",2); // Error case is handled in
// ShellActionExecutor
@@ -339,7 +339,7 @@ public class ShellMain extends LauncherMain {
*/
protected List<String> getShellArguments(Configuration actionConf) {
List<String> arguments = new ArrayList<String>();
- String[] scrArgs = MapReduceMain.getStrings(actionConf, CONF_OOZIE_SHELL_ARGS);
+ String[] scrArgs = ActionUtils.getStrings(actionConf, CONF_OOZIE_SHELL_ARGS);
for (String scrArg : scrArgs) {
arguments.add(scrArg);
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/LauncherAMTestMainClass.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/LauncherAMTestMainClass.java b/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/LauncherAMTestMainClass.java
new file mode 100644
index 0000000..718bf64
--- /dev/null
+++ b/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/LauncherAMTestMainClass.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+public class LauncherAMTestMainClass {
+ public static final String SECURITY_EXCEPTION = "security";
+ public static final String LAUNCHER_EXCEPTION = "launcher";
+ public static final String JAVA_EXCEPTION = "java";
+ public static final String THROWABLE = "throwable";
+
+ public static final String JAVA_EXCEPTION_MESSAGE = "Java Exception";
+ public static final String SECURITY_EXCEPTION_MESSAGE = "Security Exception";
+ public static final String THROWABLE_MESSAGE = "Throwable";
+ public static final int LAUNCHER_ERROR_CODE = 1234;
+
+ public static void main(String args[]) throws Throwable {
+ System.out.println("Invocation of TestMain");
+
+ if (args != null && args.length == 1) {
+ switch (args[0]){
+ case JAVA_EXCEPTION:
+ throw new JavaMain.JavaMainException(new RuntimeException(JAVA_EXCEPTION_MESSAGE));
+ case LAUNCHER_EXCEPTION:
+ throw new LauncherMainException(LAUNCHER_ERROR_CODE);
+ case SECURITY_EXCEPTION:
+ throw new SecurityException(SECURITY_EXCEPTION_MESSAGE);
+ case THROWABLE:
+ throw new Throwable(THROWABLE_MESSAGE);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestHdfsOperations.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestHdfsOperations.java b/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestHdfsOperations.java
new file mode 100644
index 0000000..68c0f4b
--- /dev/null
+++ b/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestHdfsOperations.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import static org.junit.Assert.assertEquals;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.BDDMockito.willThrow;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestHdfsOperations {
+ @Mock
+ private SequenceFileWriterFactory seqFileWriterFactoryMock;
+
+ @Mock
+ private SequenceFile.Writer writerMock;
+
+ @Mock
+ private UserGroupInformation ugiMock;
+
+ @Mock
+ private Configuration configurationMock;
+
+ private Path path = new Path(".");
+
+ private Map<String, String> actionData = new HashMap<>();
+
+ @InjectMocks
+ private HdfsOperations hdfsOperations;
+
+ @Before
+ public void setup() throws Exception {
+ configureMocksForHappyPath();
+ actionData.put("testKey", "testValue");
+ }
+
+ @Test
+ public void testActionDataUploadToHdfsSucceeds() throws Exception {
+ hdfsOperations.uploadActionDataToHDFS(configurationMock, path, actionData);
+
+ verify(seqFileWriterFactoryMock).createSequenceFileWriter(eq(configurationMock),
+ any(Path.class), eq(Text.class), eq(Text.class));
+ ArgumentCaptor<Text> keyCaptor = ArgumentCaptor.forClass(Text.class);
+ ArgumentCaptor<Text> valueCaptor = ArgumentCaptor.forClass(Text.class);
+ verify(writerMock).append(keyCaptor.capture(), valueCaptor.capture());
+ assertEquals("testKey", keyCaptor.getValue().toString());
+ assertEquals("testValue", valueCaptor.getValue().toString());
+ }
+
+ @Test(expected = IOException.class)
+ public void testActionDataUploadToHdfsFailsWhenAppendingToWriter() throws Exception {
+ willThrow(new IOException()).given(writerMock).append(any(Text.class), any(Text.class));
+
+ hdfsOperations.uploadActionDataToHDFS(configurationMock, path, actionData);
+ }
+
+ @Test(expected = IOException.class)
+ public void testActionDataUploadToHdfsFailsWhenWriterIsNull() throws Exception {
+ given(seqFileWriterFactoryMock.createSequenceFileWriter(eq(configurationMock),
+ any(Path.class), eq(Text.class), eq(Text.class))).willReturn(null);
+
+ hdfsOperations.uploadActionDataToHDFS(configurationMock, path, actionData);
+ }
+
+ @SuppressWarnings("unchecked")
+ private void configureMocksForHappyPath() throws Exception {
+ given(ugiMock.doAs(any(PrivilegedExceptionAction.class))).willAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ PrivilegedExceptionAction<?> action = (PrivilegedExceptionAction<?>) invocation.getArguments()[0];
+ return action.run();
+ }
+ });
+
+ given(seqFileWriterFactoryMock.createSequenceFileWriter(eq(configurationMock),
+ any(Path.class), eq(Text.class), eq(Text.class))).willReturn(writerMock);
+ }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java b/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java
new file mode 100644
index 0000000..9cdedb7
--- /dev/null
+++ b/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java
@@ -0,0 +1,641 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import static org.apache.oozie.action.hadoop.LauncherAM.ACTION_DATA_EXTERNAL_CHILD_IDS;
+import static org.apache.oozie.action.hadoop.LauncherAM.ACTION_DATA_NEW_ID;
+import static org.apache.oozie.action.hadoop.LauncherAM.ACTION_DATA_OUTPUT_PROPS;
+import static org.apache.oozie.action.hadoop.LauncherAM.ACTION_DATA_STATS;
+import static org.apache.oozie.action.hadoop.LauncherAMTestMainClass.JAVA_EXCEPTION;
+import static org.apache.oozie.action.hadoop.LauncherAMTestMainClass.JAVA_EXCEPTION_MESSAGE;
+import static org.apache.oozie.action.hadoop.LauncherAMTestMainClass.LAUNCHER_ERROR_CODE;
+import static org.apache.oozie.action.hadoop.LauncherAMTestMainClass.LAUNCHER_EXCEPTION;
+import static org.apache.oozie.action.hadoop.LauncherAMTestMainClass.SECURITY_EXCEPTION;
+import static org.apache.oozie.action.hadoop.LauncherAMTestMainClass.SECURITY_EXCEPTION_MESSAGE;
+import static org.apache.oozie.action.hadoop.LauncherAMTestMainClass.THROWABLE;
+import static org.apache.oozie.action.hadoop.LauncherAMTestMainClass.THROWABLE_MESSAGE;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasKey;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+
+import static org.mockito.BDDMockito.given;
+import static org.mockito.BDDMockito.willReturn;
+import static org.mockito.BDDMockito.willThrow;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringReader;
+import java.security.PrivilegedExceptionAction;
+import java.text.MessageFormat;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.oozie.action.hadoop.LauncherAM.LauncherSecurityManager;
+import org.apache.oozie.action.hadoop.LauncherAM.OozieActionResult;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestLauncherAM {
+ private static final String DEFAULT_CONTAINER_ID = "container_1479473450392_0001_01_000001";
+ private static final String ACTIONDATA_ERROR_PROPERTIES = "error.properties";
+ private static final String ACTIONDATA_FINAL_STATUS_PROPERTY = "final.status";
+ private static final String ERROR_CODE_PROPERTY = "error.code";
+ private static final String EXCEPTION_STACKTRACE_PROPERTY = "exception.stacktrace";
+ private static final String EXCEPTION_MESSAGE_PROPERTY = "exception.message";
+ private static final String ERROR_REASON_PROPERTY = "error.reason";
+
+ private static final String EMPTY_STRING = "";
+ private static final String EXIT_CODE_1 = "1";
+ private static final String EXIT_CODE_0 = "0";
+ private static final String DUMMY_XML = "<dummy>dummyXml</dummy>";
+
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ @Mock
+ private UserGroupInformation ugiMock;
+
+ @Mock
+ private AMRMClientAsyncFactory amRMClientAsyncFactoryMock;
+
+ @Mock
+ private AMRMClientAsync<?> amRmAsyncClientMock;
+
+ @Mock
+ private AMRMCallBackHandler callbackHandlerMock;
+
+ @Mock
+ private HdfsOperations hdfsOperationsMock;
+
+ @Mock
+ private LocalFsOperations localFsOperationsMock;
+
+ @Mock
+ private PrepareActionsHandler prepareHandlerMock;
+
+ @Mock
+ private LauncherAMCallbackNotifierFactory launcherCallbackNotifierFactoryMock;
+
+ @Mock
+ private LauncherAMCallbackNotifier launcherCallbackNotifierMock;
+
+ @Mock
+ private LauncherSecurityManager launcherSecurityManagerMock;
+
+ private Configuration launcherJobConfig = new Configuration();
+
+ private String containerId = DEFAULT_CONTAINER_ID;
+
+ private String applicationId = ConverterUtils.toContainerId(containerId)
+ .getApplicationAttemptId().getApplicationId().toString();
+
+ private LauncherAM launcherAM;
+
+ private ExpectedFailureDetails failureDetails = new ExpectedFailureDetails();
+
+ @Before
+ public void setup() throws Exception {
+ configureMocksForHappyPath();
+ launcherJobConfig.set(LauncherMapper.OOZIE_ACTION_RECOVERY_ID, "1");
+ instantiateLauncher();
+ }
+
+ @Test
+ public void testMainIsSuccessfullyInvokedWithActionData() throws Exception {
+ setupActionOutputContents();
+
+ executeLauncher();
+
+ verifyZeroInteractions(prepareHandlerMock);
+ assertSuccessfulExecution(OozieActionResult.RUNNING);
+ assertActionOutputDataPresentAndCorrect();
+ }
+
+ @Test
+ public void testMainIsSuccessfullyInvokedWithoutActionData() throws Exception {
+ executeLauncher();
+
+ verifyZeroInteractions(prepareHandlerMock);
+ assertSuccessfulExecution(OozieActionResult.SUCCEEDED);
+ assertNoActionOutputData();
+ }
+
+ @Test
+ public void testActionHasPrepareXML() throws Exception {
+ launcherJobConfig.set(LauncherAM.ACTION_PREPARE_XML, DUMMY_XML);
+
+ executeLauncher();
+
+ verify(prepareHandlerMock).prepareAction(eq(DUMMY_XML), any(Configuration.class));
+ assertSuccessfulExecution(OozieActionResult.SUCCEEDED);
+ }
+
+ @Test
+ public void testActionHasEmptyPrepareXML() throws Exception {
+ launcherJobConfig.set(LauncherAM.ACTION_PREPARE_XML, EMPTY_STRING);
+
+ executeLauncher();
+
+ verifyZeroInteractions(prepareHandlerMock);
+ assertSuccessfulExecution(OozieActionResult.SUCCEEDED);
+ assertNoActionOutputData();
+ }
+
+ @Test
+ public void testLauncherClassNotDefined() throws Exception {
+ launcherJobConfig.unset(LauncherAM.CONF_OOZIE_ACTION_MAIN_CLASS);
+
+ executeLauncher();
+
+ failureDetails.expectedExceptionMessage("Launcher class should not be null")
+ .expectedErrorCode(EXIT_CODE_0)
+ .expectedErrorReason("Launcher class should not be null")
+ .withStackTrace();
+
+ assertFailedExecution();
+ }
+
+ @Test
+ public void testMainIsSuccessfullyInvokedAndAsyncErrorReceived() throws Exception {
+ ErrorHolder errorHolder = new ErrorHolder();
+ errorHolder.setErrorCode(6);
+ errorHolder.setErrorMessage("dummy error");
+ errorHolder.setErrorCause(new Exception());
+ given(callbackHandlerMock.getError()).willReturn(errorHolder);
+
+ executeLauncher();
+
+ failureDetails.expectedExceptionMessage(null)
+ .expectedErrorCode("6")
+ .expectedErrorReason("dummy error")
+ .withStackTrace();
+
+ assertFailedExecution();
+ }
+
+ @Test
+ public void testMainClassNotFound() throws Exception {
+ launcherJobConfig.set(LauncherAM.CONF_OOZIE_ACTION_MAIN_CLASS, "org.apache.non.existing.Klass");
+
+ executeLauncher();
+
+ failureDetails.expectedExceptionMessage(ClassNotFoundException.class.getCanonicalName())
+ .expectedErrorCode(EXIT_CODE_0)
+ .expectedErrorReason(ClassNotFoundException.class.getCanonicalName())
+ .withStackTrace();
+
+ assertFailedExecution();
+ }
+
+ @Test
+ public void testLauncherJobConfCannotBeLoaded() throws Exception {
+ given(localFsOperationsMock.readLauncherConf()).willThrow(new RuntimeException());
+ thrown.expect(RuntimeException.class);
+
+ try {
+ executeLauncher();
+ } finally {
+ failureDetails.expectedExceptionMessage(null)
+ .expectedErrorCode(EXIT_CODE_0)
+ .expectedErrorReason("Could not load the Launcher AM configuration file")
+ .withStackTrace();
+
+ assertFailedExecution();
+ }
+ }
+
+ @Test
+ public void testActionPrepareFails() throws Exception {
+ launcherJobConfig.set(LauncherAM.ACTION_PREPARE_XML, DUMMY_XML);
+ willThrow(new IOException()).given(prepareHandlerMock).prepareAction(anyString(), any(Configuration.class));
+ thrown.expect(IOException.class);
+
+ try {
+ executeLauncher();
+ } finally {
+ failureDetails.expectedExceptionMessage(null)
+ .expectedErrorCode(EXIT_CODE_0)
+ .expectedErrorReason("Prepare execution in the Launcher AM has failed")
+ .withStackTrace();
+
+ assertFailedExecution();
+ }
+ }
+
+ @Test
+ public void testActionThrowsJavaMainException() throws Exception {
+ setupArgsForMainClass(JAVA_EXCEPTION);
+
+ executeLauncher();
+
+ failureDetails.expectedExceptionMessage(JAVA_EXCEPTION_MESSAGE)
+ .expectedErrorCode(EXIT_CODE_0)
+ .expectedErrorReason(JAVA_EXCEPTION_MESSAGE)
+ .withStackTrace();
+
+ assertFailedExecution();
+ }
+
+ @Test
+ public void testActionThrowsLauncherException() throws Exception {
+ setupArgsForMainClass(LAUNCHER_EXCEPTION);
+
+ executeLauncher();
+
+ failureDetails.expectedExceptionMessage(null)
+ .expectedErrorCode(String.valueOf(LAUNCHER_ERROR_CODE))
+ .expectedErrorReason("exit code [" + LAUNCHER_ERROR_CODE + "]")
+ .withoutStackTrace();
+
+ assertFailedExecution();
+ }
+
+ @Test
+ public void testActionThrowsSecurityExceptionWithExitCode0() throws Exception {
+ setupArgsForMainClass(SECURITY_EXCEPTION);
+ given(launcherSecurityManagerMock.getExitInvoked()).willReturn(true);
+ given(launcherSecurityManagerMock.getExitCode()).willReturn(0);
+
+ executeLauncher();
+
+ assertSuccessfulExecution(OozieActionResult.SUCCEEDED);
+ }
+
+ @Test
+ public void testActionThrowsSecurityExceptionWithExitCode1() throws Exception {
+ setupArgsForMainClass(SECURITY_EXCEPTION);
+ given(launcherSecurityManagerMock.getExitInvoked()).willReturn(true);
+ given(launcherSecurityManagerMock.getExitCode()).willReturn(1);
+
+ executeLauncher();
+
+ failureDetails.expectedExceptionMessage(null)
+ .expectedErrorCode(EXIT_CODE_1)
+ .expectedErrorReason("exit code ["+ EXIT_CODE_1 + "]")
+ .withoutStackTrace();
+
+ assertFailedExecution();
+ }
+
+ @Test
+ public void testActionThrowsSecurityExceptionWithoutSystemExit() throws Exception {
+ setupArgsForMainClass(SECURITY_EXCEPTION);
+ given(launcherSecurityManagerMock.getExitInvoked()).willReturn(false);
+
+ executeLauncher();
+
+ failureDetails.expectedExceptionMessage(SECURITY_EXCEPTION_MESSAGE)
+ .expectedErrorCode(EXIT_CODE_0)
+ .expectedErrorReason(SECURITY_EXCEPTION_MESSAGE)
+ .withStackTrace();
+
+ assertFailedExecution();
+ }
+
+ @Test
+ public void testActionThrowsThrowable() throws Exception {
+ setupArgsForMainClass(THROWABLE);
+
+ executeLauncher();
+
+ failureDetails.expectedExceptionMessage(THROWABLE_MESSAGE)
+ .expectedErrorCode(EXIT_CODE_0)
+ .expectedErrorReason(THROWABLE_MESSAGE)
+ .withStackTrace();
+
+ assertFailedExecution();
+ }
+
+ @Test
+ public void testActionThrowsThrowableAndAsyncErrorReceived() throws Exception {
+ setupArgsForMainClass(THROWABLE);
+ ErrorHolder errorHolder = new ErrorHolder();
+ errorHolder.setErrorCode(6);
+ errorHolder.setErrorMessage("dummy error");
+ errorHolder.setErrorCause(new Exception());
+ given(callbackHandlerMock.getError()).willReturn(errorHolder);
+
+ executeLauncher();
+
+ // sync problem overrides async problem
+ failureDetails.expectedExceptionMessage(THROWABLE_MESSAGE)
+ .expectedErrorCode(EXIT_CODE_0)
+ .expectedErrorReason(THROWABLE_MESSAGE)
+ .withStackTrace();
+
+ assertFailedExecution();
+ }
+
+ @Test
+ public void testYarnUnregisterFails() throws Exception {
+ willThrow(new IOException()).given(amRmAsyncClientMock).unregisterApplicationMaster(any(FinalApplicationStatus.class),
+ anyString(), anyString());
+ thrown.expect(IOException.class);
+
+ try {
+ executeLauncher();
+ } finally {
+ // TODO: check if this behaviour is correct (url callback: successful, but unregister fails)
+ assertSuccessfulExecution(OozieActionResult.SUCCEEDED);
+ }
+ }
+
+ @Test
+ public void testUpdateActionDataFailsWithActionError() throws Exception {
+ setupActionOutputContents();
+ given(localFsOperationsMock.getLocalFileContentAsString(any(File.class), eq(ACTION_DATA_EXTERNAL_CHILD_IDS), anyInt()))
+ .willThrow(new IOException());
+ thrown.expect(IOException.class);
+
+ try {
+ executeLauncher();
+ } finally {
+ Map<String, String> actionData = launcherAM.getActionData();
+ assertThat(actionData, not(hasKey(ACTION_DATA_EXTERNAL_CHILD_IDS)));
+ verify(launcherCallbackNotifierMock).notifyURL(OozieActionResult.FAILED);
+ }
+ }
+
+ @Test
+ public void testRecoveryIdNotSet() throws Exception {
+ launcherJobConfig.unset(LauncherMapper.OOZIE_ACTION_RECOVERY_ID);
+ instantiateLauncher();
+
+ executeLauncher();
+
+ failureDetails.expectedExceptionMessage("IO error")
+ .expectedErrorCode(EXIT_CODE_0)
+ .expectedErrorReason("IO error")
+ .withStackTrace();
+
+ assertFailedExecution();
+ }
+
+ @Test
+ public void testRecoveryIdExistsAndRecoveryIsdMatch() throws Exception {
+ given(hdfsOperationsMock.fileExists(any(Path.class), eq(launcherJobConfig))).willReturn(true);
+ given(hdfsOperationsMock.readFileContents(any(Path.class), eq(launcherJobConfig))).willReturn(applicationId);
+
+ executeLauncher();
+
+ verify(hdfsOperationsMock).readFileContents(any(Path.class), eq(launcherJobConfig));
+ }
+
+ @Test
+ public void testRecoveryIdExistsAndRecoveryIdsDoNotMatch() throws Exception {
+ String newAppId = "not_matching_appid";
+ given(hdfsOperationsMock.fileExists(any(Path.class), eq(launcherJobConfig))).willReturn(true);
+ given(hdfsOperationsMock.readFileContents(any(Path.class), eq(launcherJobConfig))).willReturn(newAppId);
+
+ executeLauncher();
+
+ String errorMessage = MessageFormat.format(
+ "YARN Id mismatch, action file [{0}] declares Id [{1}] current Id [{2}]", "dummy/1",
+ newAppId,
+ applicationId);
+
+ failureDetails.expectedExceptionMessage(errorMessage)
+ .expectedErrorCode(EXIT_CODE_0)
+ .expectedErrorReason(errorMessage)
+ .withStackTrace();
+
+ verify(hdfsOperationsMock).readFileContents(any(Path.class), eq(launcherJobConfig));
+ assertFailedExecution();
+ }
+
+ @Test
+ public void testReadingRecoveryIdFails() throws Exception {
+ willThrow(new IOException()).given(hdfsOperationsMock)
+ .writeStringToFile(any(Path.class), eq(launcherJobConfig), eq(applicationId));
+
+ executeLauncher();
+
+ failureDetails.expectedExceptionMessage("IO error")
+ .expectedErrorCode(EXIT_CODE_0)
+ .expectedErrorReason("IO error")
+ .withStackTrace();
+
+ assertFailedExecution();
+ }
+
+ private void instantiateLauncher() {
+ launcherAM = new LauncherAM(ugiMock,
+ amRMClientAsyncFactoryMock,
+ callbackHandlerMock,
+ hdfsOperationsMock,
+ localFsOperationsMock,
+ prepareHandlerMock,
+ launcherCallbackNotifierFactoryMock,
+ launcherSecurityManagerMock,
+ containerId);
+ }
+
+ @SuppressWarnings("unchecked")
+ private void configureMocksForHappyPath() throws Exception {
+ launcherJobConfig.set(LauncherAM.OOZIE_ACTION_DIR_PATH, "dummy");
+ launcherJobConfig.set(LauncherAM.OOZIE_JOB_ID, "dummy");
+ launcherJobConfig.set(LauncherAM.OOZIE_ACTION_ID, "dummy");
+ launcherJobConfig.set(LauncherAM.CONF_OOZIE_ACTION_MAIN_CLASS, LauncherAMTestMainClass.class.getCanonicalName());
+
+ given(localFsOperationsMock.readLauncherConf()).willReturn(launcherJobConfig);
+ given(localFsOperationsMock.fileExists(any(File.class))).willReturn(true);
+ willReturn(amRmAsyncClientMock).given(amRMClientAsyncFactoryMock).createAMRMClientAsync(anyInt());
+ given(ugiMock.doAs(any(PrivilegedExceptionAction.class))).willAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ PrivilegedExceptionAction<?> action = (PrivilegedExceptionAction<?>) invocation.getArguments()[0];
+ return action.run();
+ }
+ });
+ given(launcherCallbackNotifierFactoryMock.createCallbackNotifier(any(Configuration.class)))
+ .willReturn(launcherCallbackNotifierMock);
+ }
+
+ private void setupActionOutputContents() throws IOException {
+ // output files generated by an action
+ given(localFsOperationsMock.getLocalFileContentAsString(any(File.class), eq(ACTION_DATA_EXTERNAL_CHILD_IDS), anyInt()))
+ .willReturn(ACTION_DATA_EXTERNAL_CHILD_IDS);
+
+ given(localFsOperationsMock.getLocalFileContentAsString(any(File.class), eq(ACTION_DATA_NEW_ID), anyInt()))
+ .willReturn(ACTION_DATA_NEW_ID);
+
+ given(localFsOperationsMock.getLocalFileContentAsString(any(File.class), eq(ACTION_DATA_OUTPUT_PROPS), anyInt()))
+ .willReturn(ACTION_DATA_OUTPUT_PROPS);
+
+ given(localFsOperationsMock.getLocalFileContentAsString(any(File.class), eq(ACTION_DATA_STATS), anyInt()))
+ .willReturn(ACTION_DATA_STATS);
+ }
+
+ private void setupArgsForMainClass(final String... args) {
+ launcherJobConfig.set(String.valueOf(LauncherAM.CONF_OOZIE_ACTION_MAIN_ARG_COUNT), String.valueOf(args.length));
+
+ for (int i = 0; i < args.length; i++) {
+ launcherJobConfig.set(String.valueOf(LauncherAM.CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + i), args[i]);
+ }
+ }
+
+ private void executeLauncher() throws Exception {
+ launcherAM.run();
+ }
+
+ @SuppressWarnings("unchecked")
+ private void assertSuccessfulExecution(OozieActionResult actionResult) throws Exception {
+ verify(amRmAsyncClientMock).registerApplicationMaster(anyString(), anyInt(), anyString());
+ verify(amRmAsyncClientMock).unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, EMPTY_STRING, EMPTY_STRING);
+ verify(amRmAsyncClientMock).stop();
+ verify(ugiMock, times(2)).doAs(any(PrivilegedExceptionAction.class)); // prepare & action main
+ verify(hdfsOperationsMock).uploadActionDataToHDFS(any(Configuration.class), any(Path.class), any(Map.class));
+ verify(launcherCallbackNotifierFactoryMock).createCallbackNotifier(any(Configuration.class));
+ verify(launcherCallbackNotifierMock).notifyURL(actionResult);
+ verify(hdfsOperationsMock).writeStringToFile(any(Path.class), any(Configuration.class), any(String.class));
+
+ Map<String, String> actionData = launcherAM.getActionData();
+ verifyFinalStatus(actionData, actionResult);
+ verifyNoError(actionData);
+ }
+
+ private void assertActionOutputDataPresentAndCorrect() {
+ Map<String, String> actionData = launcherAM.getActionData();
+ String extChildId = actionData.get(ACTION_DATA_EXTERNAL_CHILD_IDS);
+ String stats = actionData.get(ACTION_DATA_STATS);
+ String output = actionData.get(ACTION_DATA_OUTPUT_PROPS);
+ String idSwap = actionData.get(ACTION_DATA_NEW_ID);
+
+ assertThat("extChildID output", ACTION_DATA_EXTERNAL_CHILD_IDS, equalTo(extChildId));
+ assertThat("stats output", ACTION_DATA_STATS, equalTo(stats));
+ assertThat("action output", ACTION_DATA_OUTPUT_PROPS, equalTo(output));
+ assertThat("idSwap output", ACTION_DATA_NEW_ID, equalTo(idSwap));
+ }
+
+ private void assertNoActionOutputData() {
+ Map<String, String> actionData = launcherAM.getActionData();
+ String extChildId = actionData.get(ACTION_DATA_EXTERNAL_CHILD_IDS);
+ String stats = actionData.get(ACTION_DATA_STATS);
+ String output = actionData.get(ACTION_DATA_OUTPUT_PROPS);
+ String idSwap = actionData.get(ACTION_DATA_NEW_ID);
+
+ assertThat("extChildId", extChildId, nullValue());
+ assertThat("stats", stats, nullValue());
+ assertThat("Output", output, nullValue());
+ assertThat("idSwap", idSwap, nullValue());
+ }
+
+ private void assertFailedExecution() throws Exception {
+ Map<String, String> actionData = launcherAM.getActionData();
+ verify(launcherCallbackNotifierFactoryMock).createCallbackNotifier(any(Configuration.class));
+ verify(launcherCallbackNotifierMock).notifyURL(OozieActionResult.FAILED);
+ verifyFinalStatus(actionData, OozieActionResult.FAILED);
+
+ // Note: actionData contains properties inside a property, so we have to extract them into a new Property object
+ String fullError = actionData.get(ACTIONDATA_ERROR_PROPERTIES);
+ Properties props = new Properties();
+ props.load(new StringReader(fullError));
+
+ String errorReason = props.getProperty(ERROR_REASON_PROPERTY);
+ if (failureDetails.expectedErrorReason != null) {
+ assertThat("errorReason", errorReason, containsString(failureDetails.expectedErrorReason));
+ } else {
+ assertThat("errorReason", errorReason, nullValue());
+ }
+
+ String exceptionMessage = props.getProperty(EXCEPTION_MESSAGE_PROPERTY);
+ if (failureDetails.expectedExceptionMessage != null) {
+ assertThat("exceptionMessage", exceptionMessage, containsString(failureDetails.expectedExceptionMessage));
+ } else {
+ assertThat("exceptionMessage", exceptionMessage, nullValue());
+ }
+
+ String stackTrace = props.getProperty(EXCEPTION_STACKTRACE_PROPERTY);
+ if (failureDetails.hasStackTrace) {
+ assertThat("stackTrace", stackTrace, notNullValue());
+ } else {
+ assertThat("stackTrace", stackTrace, nullValue());
+ }
+
+ String errorCode = props.getProperty(ERROR_CODE_PROPERTY);
+ assertThat("errorCode", errorCode, equalTo(failureDetails.expectedErrorCode));
+ }
+
+ private void verifyFinalStatus(Map<String, String> actionData, OozieActionResult actionResult) {
+ String finalStatus = actionData.get(ACTIONDATA_FINAL_STATUS_PROPERTY);
+ assertThat("actionResult", actionResult.toString(), equalTo(finalStatus));
+ }
+
+ private void verifyNoError(Map<String, String> actionData) {
+ String fullError = actionData.get(ACTIONDATA_ERROR_PROPERTIES);
+ assertThat("error properties", fullError, nullValue());
+ }
+
+ private class ExpectedFailureDetails {
+ String expectedExceptionMessage;
+ String expectedErrorCode;
+ String expectedErrorReason;
+ boolean hasStackTrace;
+
+ public ExpectedFailureDetails expectedExceptionMessage(String expectedExceptionMessage) {
+ this.expectedExceptionMessage = expectedExceptionMessage;
+ return this;
+ }
+
+ public ExpectedFailureDetails expectedErrorCode(String expectedErrorCode) {
+ this.expectedErrorCode = expectedErrorCode;
+ return this;
+ }
+
+ public ExpectedFailureDetails expectedErrorReason(String expectedErrorReason) {
+ this.expectedErrorReason = expectedErrorReason;
+ return this;
+ }
+
+ public ExpectedFailureDetails withStackTrace() {
+ this.hasStackTrace = true;
+ return this;
+ }
+
+ public ExpectedFailureDetails withoutStackTrace() {
+ this.hasStackTrace = false;
+ return this;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/pig/pom.xml
----------------------------------------------------------------------
diff --git a/sharelib/pig/pom.xml b/sharelib/pig/pom.xml
index 99148d7..8f74ded 100644
--- a/sharelib/pig/pom.xml
+++ b/sharelib/pig/pom.xml
@@ -82,9 +82,9 @@
</dependency>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <scope>provided</scope>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <scope>test</scope>
</dependency>
<dependency>
@@ -103,8 +103,8 @@
<scope>provided</scope>
</dependency>
<dependency>
- <groupId>org.apache.oozie</groupId>
- <artifactId>oozie-hadoop-utils</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
@@ -170,18 +170,6 @@
<outputFile>${project.build.directory}/classpath</outputFile>
</configuration>
</execution>
- <execution>
- <id>create-mrapp-generated-classpath</id>
- <phase>generate-test-resources</phase>
- <goals>
- <goal>build-classpath</goal>
- </goals>
- <configuration>
- <!-- needed to run the unit test for DS to generate the required classpath
- that is required in the env of the launch container in the mini mr/yarn cluster -->
- <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile>
- </configuration>
- </execution>
</executions>
</plugin>
<plugin>
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMain.java
----------------------------------------------------------------------
diff --git a/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMain.java b/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMain.java
index 11cc7ee..5a9123a 100644
--- a/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMain.java
+++ b/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMain.java
@@ -131,7 +131,8 @@ public class PigMain extends LauncherMain {
pigProperties.store(os, "");
os.close();
- logMasking("pig.properties:", Arrays.asList("password"), pigProperties.entrySet());
+ logMasking("pig.properties:", Arrays.asList("password"),
+ (Iterable<Map.Entry<String, String>>)(Iterable<?>) pigProperties.entrySet());
List<String> arguments = new ArrayList<String>();
String script = actionConf.get(PigActionExecutor.PIG_SCRIPT);
@@ -148,7 +149,7 @@ public class PigMain extends LauncherMain {
arguments.add("-file");
arguments.add(script);
- String[] params = MapReduceMain.getStrings(actionConf, PigActionExecutor.PIG_PARAMS);
+ String[] params = ActionUtils.getStrings(actionConf, PigActionExecutor.PIG_PARAMS);
for (String param : params) {
arguments.add("-param");
arguments.add(param);
@@ -193,7 +194,7 @@ public class PigMain extends LauncherMain {
arguments.add("-logfile");
arguments.add(pigLog);
- String[] pigArgs = MapReduceMain.getStrings(actionConf, PigActionExecutor.PIG_ARGS);
+ String[] pigArgs = ActionUtils.getStrings(actionConf, PigActionExecutor.PIG_ARGS);
for (String pigArg : pigArgs) {
if (DISALLOWED_PIG_OPTIONS.contains(pigArg)) {
throw new RuntimeException("Error: Pig argument " + pigArg + " is not supported");
@@ -212,7 +213,7 @@ public class PigMain extends LauncherMain {
System.out.println(" " + arg);
}
- LauncherMainHadoopUtils.killChildYarnJobs(actionConf);
+ LauncherMain.killChildYarnJobs(actionConf);
System.out.println("=================================================================");
System.out.println();
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMainWithOldAPI.java
----------------------------------------------------------------------
diff --git a/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMainWithOldAPI.java b/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMainWithOldAPI.java
index 503d0eb..0ee4b0b 100644
--- a/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMainWithOldAPI.java
+++ b/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMainWithOldAPI.java
@@ -135,7 +135,7 @@ public class PigMainWithOldAPI extends LauncherMain {
arguments.add("-file");
arguments.add(script);
- String[] params = MapReduceMain.getStrings(actionConf, "oozie.pig.params");
+ String[] params = ActionUtils.getStrings(actionConf, "oozie.pig.params");
for (String param : params) {
arguments.add("-param");
arguments.add(param);
@@ -178,7 +178,7 @@ public class PigMainWithOldAPI extends LauncherMain {
arguments.add("-logfile");
arguments.add(pigLog);
- String[] pigArgs = MapReduceMain.getStrings(actionConf, "oozie.pig.args");
+ String[] pigArgs = ActionUtils.getStrings(actionConf, "oozie.pig.args");
for (String pigArg : pigArgs) {
if (DISALLOWED_PIG_OPTIONS.contains(pigArg)) {
throw new RuntimeException("Error: Pig argument " + pigArg + " is not supported");