You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by rk...@apache.org on 2016/07/26 01:25:07 UTC
[1/3] oozie git commit: OOZIE-2590 OYA: Create basic Oozie Launcher
Application Master (rkanter)
Repository: oozie
Updated Branches:
refs/heads/oya a37835fec -> fea512cf6
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
new file mode 100644
index 0000000..e056acc
--- /dev/null
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
@@ -0,0 +1,636 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.xml.sax.SAXException;
+
+import javax.xml.parsers.ParserConfigurationException;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.Reader;
+import java.io.StringWriter;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.security.Permission;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.StringTokenizer;
+
+public class LauncherAM {
+
+ static final String CONF_OOZIE_ACTION_MAIN_CLASS = "oozie.launcher.action.main.class";
+
+ static final String ACTION_PREFIX = "oozie.action.";
+ public static final String CONF_OOZIE_ACTION_MAX_OUTPUT_DATA = ACTION_PREFIX + "max.output.data";
+ static final String CONF_OOZIE_ACTION_MAIN_ARG_PREFIX = ACTION_PREFIX + "main.arg.";
+ static final String CONF_OOZIE_ACTION_MAIN_ARG_COUNT = ACTION_PREFIX + CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + "count";
+ static final String CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE = "oozie.external.stats.max.size";
+
+ static final String OOZIE_ACTION_DIR_PATH = ACTION_PREFIX + "dir.path";
+ static final String ACTION_PREPARE_XML = ACTION_PREFIX + "prepare.xml";
+ static final String ACTION_DATA_SEQUENCE_FILE = "action-data.seq"; // COMBO FILE
+ static final String ACTION_DATA_EXTERNAL_CHILD_IDS = "externalChildIDs";
+ static final String ACTION_DATA_OUTPUT_PROPS = "output.properties";
+ static final String ACTION_DATA_STATS = "stats.properties";
+ static final String ACTION_DATA_NEW_ID = "newId";
+ static final String ACTION_DATA_ERROR_PROPS = "error.properties";
+
+ // TODO: OYA: more unique file names? action.xml may be stuck for backwards compat though
+ public static final String LAUNCHER_JOB_CONF_XML = "launcher.xml";
+ public static final String ACTION_CONF_XML = "action.xml";
+ public static final String ACTION_DATA_FINAL_STATUS = "final.status";
+
+ private static AMRMClientAsync<AMRMClient.ContainerRequest> amRmClientAsync = null;
+ private static Configuration launcherJobConf = null;
+ private static Path actionDir;
+ private static Map<String, String> actionData = new HashMap<String,String>();
+
+ private static void printDebugInfo(String[] mainArgs) throws IOException {
+ printContentsOfCurrentDir();
+
+ System.out.println();
+ System.out.println("Oozie Launcher Application Master configuration");
+ System.out.println("===============================================");
+ System.out.println("Workflow job id : " + launcherJobConf.get("oozie.job.id"));
+ System.out.println("Workflow action id: " + launcherJobConf.get("oozie.action.id"));
+ System.out.println();
+ System.out.println("Classpath :");
+ System.out.println("------------------------");
+ StringTokenizer st = new StringTokenizer(System.getProperty("java.class.path"), ":");
+ while (st.hasMoreTokens()) {
+ System.out.println(" " + st.nextToken());
+ }
+ System.out.println("------------------------");
+ System.out.println();
+ String mainClass = launcherJobConf.get(CONF_OOZIE_ACTION_MAIN_CLASS);
+ System.out.println("Main class : " + mainClass);
+ System.out.println();
+ System.out.println("Maximum output : "
+ + launcherJobConf.getInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 2 * 1024));
+ System.out.println();
+ System.out.println("Arguments :");
+ for (String arg : mainArgs) {
+ System.out.println(" " + arg);
+ }
+
+ System.out.println();
+ System.out.println("Java System Properties:");
+ System.out.println("------------------------");
+ System.getProperties().store(System.out, "");
+ System.out.flush();
+ System.out.println("------------------------");
+ System.out.println();
+
+ System.out.println("=================================================================");
+ System.out.println();
+ System.out.println(">>> Invoking Main class now >>>");
+ System.out.println();
+ System.out.flush();
+ }
+
+ // TODO: OYA: delete me when making real Action Mains
+ public static class DummyMain {
+ public static void main(String[] args) throws Exception {
+ System.out.println("Hello World!");
+ if (launcherJobConf.get("foo", "0").equals("1")) {
+ throw new IOException("foo 1");
+ } else if (launcherJobConf.get("foo", "0").equals("2")) {
+ throw new JavaMainException(new IOException("foo 2"));
+ } else if (launcherJobConf.get("foo", "0").equals("3")) {
+ throw new LauncherMainException(3);
+ } else if (launcherJobConf.get("foo", "0").equals("4")) {
+ System.exit(0);
+ } else if (launcherJobConf.get("foo", "0").equals("5")) {
+ System.exit(1);
+ }
+ }
+ }
+
+ // TODO: OYA: rethink all print messages and formatting
+ public static void main(String[] AMargs) throws Exception {
+ ErrorHolder eHolder = new ErrorHolder();
+ FinalApplicationStatus finalStatus = FinalApplicationStatus.FAILED;
+ try {
+ try {
+ launcherJobConf = readLauncherConf();
+ System.out.println("Launcher AM configuration loaded");
+ } catch (Exception ex) {
+ eHolder.setErrorMessage("Could not load the Launcher AM configuration file");
+ eHolder.setErrorCause(ex);
+ throw ex;
+ }
+
+ registerWithRM();
+
+ actionDir = new Path(launcherJobConf.get(OOZIE_ACTION_DIR_PATH));
+
+ try {
+ System.out.println("\nStarting the execution of prepare actions");
+ executePrepare();
+ System.out.println("Completed the execution of prepare actions successfully");
+ } catch (Exception ex) {
+ eHolder.setErrorMessage("Prepare execution in the Launcher AM has failed");
+ eHolder.setErrorCause(ex);
+ throw ex;
+ }
+
+ String[] mainArgs = getMainArguments(launcherJobConf);
+
+ // TODO: OYA: should we allow turning this off?
+ // TODO: OYA: what should default be?
+ if (launcherJobConf.getBoolean("oozie.launcher.print.debug.info", true)) {
+ printDebugInfo(mainArgs);
+ }
+ finalStatus = runActionMain(mainArgs, eHolder);
+ if (finalStatus != FinalApplicationStatus.SUCCEEDED) {
+ handleActionData();
+ if (actionData.get(ACTION_DATA_OUTPUT_PROPS) != null) {
+ System.out.println();
+ System.out.println("Oozie Launcher, capturing output data:");
+ System.out.println("=======================");
+ System.out.println(actionData.get(ACTION_DATA_OUTPUT_PROPS));
+ System.out.println();
+ System.out.println("=======================");
+ System.out.println();
+ }
+ if (actionData.get(ACTION_DATA_NEW_ID) != null) {
+ System.out.println();
+ System.out.println("Oozie Launcher, propagating new Hadoop job id to Oozie");
+ System.out.println("=======================");
+ System.out.println(actionData.get(ACTION_DATA_NEW_ID));
+ System.out.println("=======================");
+ System.out.println();
+ }
+ }
+ } finally {
+ try {
+ // Store final status in case Launcher AM falls off the RM
+ actionData.put(ACTION_DATA_FINAL_STATUS, finalStatus.toString());
+ if (finalStatus != FinalApplicationStatus.SUCCEEDED) {
+ failLauncher(eHolder);
+ }
+ uploadActionDataToHDFS();
+ } finally {
+ try {
+ unregisterWithRM(finalStatus, eHolder.getErrorMessage());
+ } finally {
+ LauncherAMCallbackNotifier cn = new LauncherAMCallbackNotifier(launcherJobConf);
+ cn.notifyURL(finalStatus);
+ }
+ }
+ }
+ }
+
+ private static void registerWithRM() throws IOException, YarnException {
+ AMRMClient<AMRMClient.ContainerRequest> amRmClient = AMRMClient.createAMRMClient();
+
+ AMRMCallBackHandler callBackHandler = new AMRMCallBackHandler();
+ // TODO: OYA: make heartbeat interval configurable
+ // TODO: OYA: make heartbeat interval higher to put less load on RM, but lower than timeout
+ amRmClientAsync = AMRMClientAsync.createAMRMClientAsync(amRmClient, 60000, callBackHandler);
+ amRmClientAsync.init(launcherJobConf);
+ amRmClientAsync.start();
+
+ // hostname and tracking url are determined automatically
+ amRmClientAsync.registerApplicationMaster("", 0, "");
+ }
+
+ private static void unregisterWithRM(FinalApplicationStatus status, String message) throws YarnException, IOException {
+ if (amRmClientAsync != null) {
+ System.out.println("Stopping AM");
+ try {
+ message = (message == null) ? "" : message;
+ // tracking url is determined automatically
+ amRmClientAsync.unregisterApplicationMaster(status, message, "");
+ } catch (YarnException ex) {
+ System.err.println("Error un-registering AM client");
+ throw ex;
+ } catch (IOException ex) {
+ System.err.println("Error un-registering AM client");
+ throw ex;
+ } finally {
+ amRmClientAsync.stop();
+ amRmClientAsync = null;
+ }
+ }
+ }
+
+ // Method to execute the prepare actions
+ private static void executePrepare() throws IOException, LauncherException, ParserConfigurationException, SAXException {
+ String prepareXML = launcherJobConf.get(ACTION_PREPARE_XML);
+ if (prepareXML != null) {
+ if (prepareXML.length() != 0) {
+ Configuration actionConf = new Configuration(launcherJobConf);
+ actionConf.addResource(ACTION_CONF_XML);
+ PrepareActionsDriver.doOperations(prepareXML, actionConf);
+ } else {
+ System.out.println("There are no prepare actions to execute.");
+ }
+ }
+ }
+
+ private static FinalApplicationStatus runActionMain(String[] mainArgs, ErrorHolder eHolder) {
+ FinalApplicationStatus finalStatus = FinalApplicationStatus.FAILED;
+ LauncherSecurityManager secMan = new LauncherSecurityManager();
+ try {
+ Class<?> klass = launcherJobConf.getClass(CONF_OOZIE_ACTION_MAIN_CLASS, Object.class);
+ Method mainMethod = klass.getMethod("main", String[].class);
+ // Enable LauncherSecurityManager to catch System.exit calls
+ secMan.set();
+ // TODO: OYA: remove this line to actually run the Main class instead of this dummy
+ mainMethod = DummyMain.class.getMethod("main", String[].class);
+ mainMethod.invoke(null, (Object) mainArgs);
+
+ System.out.println();
+ System.out.println("<<< Invocation of Main class completed <<<");
+ System.out.println();
+ finalStatus = FinalApplicationStatus.SUCCEEDED;
+ } catch (InvocationTargetException ex) {
+ // Get what actually caused the exception
+ Throwable cause = ex.getCause();
+ // If we got a JavaMainException from JavaMain, then we need to unwrap it
+ if (JavaMainException.class.isInstance(cause)) {
+ cause = cause.getCause();
+ }
+ if (LauncherMainException.class.isInstance(cause)) {
+ String mainClass = launcherJobConf.get(CONF_OOZIE_ACTION_MAIN_CLASS);
+ eHolder.setErrorMessage("Main Class [" + mainClass + "], exit code [" +
+ ((LauncherMainException) ex.getCause()).getErrorCode() + "]");
+ } else if (SecurityException.class.isInstance(cause)) {
+ if (secMan.getExitInvoked()) {
+ System.out.println("Intercepting System.exit(" + secMan.getExitCode()
+ + ")");
+ System.err.println("Intercepting System.exit(" + secMan.getExitCode()
+ + ")");
+ // if 0 main() method finished successfully
+ // ignoring
+ eHolder.setErrorCode(secMan.getExitCode());
+ if (eHolder.getErrorCode() != 0) {
+ String mainClass = launcherJobConf.get(CONF_OOZIE_ACTION_MAIN_CLASS);
+ eHolder.setErrorMessage("Main Class [" + mainClass + "], exit code [" + eHolder.getErrorCode() + "]");
+ } else {
+ finalStatus = FinalApplicationStatus.SUCCEEDED;
+ }
+ }
+ } else {
+ eHolder.setErrorMessage(cause.getMessage());
+ eHolder.setErrorCause(cause);
+ }
+ } catch (Throwable t) {
+ eHolder.setErrorMessage(t.getMessage());
+ eHolder.setErrorCause(t);
+ } finally {
+ // Disable LauncherSecurityManager
+ secMan.unset();
+ }
+ return finalStatus;
+ }
+
+ private static void handleActionData() throws IOException {
+ // external child IDs
+ String externalChildIdsProp = System.getProperty(ACTION_PREFIX
+ + ACTION_DATA_EXTERNAL_CHILD_IDS);
+ if (externalChildIdsProp != null) {
+ File externalChildIDs = new File(externalChildIdsProp);
+ if (externalChildIDs.exists()) {
+ actionData.put(ACTION_DATA_EXTERNAL_CHILD_IDS, getLocalFileContentStr(externalChildIDs, "", -1));
+ }
+ }
+
+ // external stats
+ String statsProp = System.getProperty(ACTION_PREFIX + ACTION_DATA_STATS);
+ if (statsProp != null) {
+ File actionStatsData = new File(statsProp);
+ if (actionStatsData.exists()) {
+ int statsMaxOutputData = launcherJobConf.getInt(CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE,
+ Integer.MAX_VALUE);
+ actionData.put(ACTION_DATA_STATS,
+ getLocalFileContentStr(actionStatsData, "Stats", statsMaxOutputData));
+ }
+ }
+
+ // output data
+ String outputProp = System.getProperty(ACTION_PREFIX + ACTION_DATA_OUTPUT_PROPS);
+ if (outputProp != null) {
+ File actionOutputData = new File(outputProp);
+ if (actionOutputData.exists()) {
+ int maxOutputData = launcherJobConf.getInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 2 * 1024);
+ actionData.put(ACTION_DATA_OUTPUT_PROPS,
+ getLocalFileContentStr(actionOutputData, "Output", maxOutputData));
+ }
+ }
+
+ // id swap
+ String newIdProp = System.getProperty(ACTION_PREFIX + ACTION_DATA_NEW_ID);
+ if (newIdProp != null) {
+ File newId = new File(newIdProp);
+ if (newId.exists()) {
+ actionData.put(ACTION_DATA_NEW_ID, getLocalFileContentStr(newId, "", -1));
+ }
+ }
+ }
+
+ public static String getLocalFileContentStr(File file, String type, int maxLen) throws IOException {
+ StringBuilder sb = new StringBuilder();
+ Reader reader = null;
+ try {
+ reader = new BufferedReader(new FileReader(file));
+ char[] buffer = new char[2048];
+ int read;
+ int count = 0;
+ while ((read = reader.read(buffer)) > -1) {
+ count += read;
+ if (maxLen > -1 && count > maxLen) {
+ throw new IOException(type + " data exceeds its limit [" + maxLen + "]");
+ }
+ sb.append(buffer, 0, read);
+ }
+ } finally {
+ if (reader != null) {
+ reader.close();
+ }
+ }
+ return sb.toString();
+ }
+
+ private static void uploadActionDataToHDFS() throws IOException {
+ Path finalPath = new Path(actionDir, ACTION_DATA_SEQUENCE_FILE);
+ FileSystem fs = FileSystem.get(finalPath.toUri(), launcherJobConf);
+ // upload into sequence file
+ System.out.println("Oozie Launcher, uploading action data to HDFS sequence file: "
+ + new Path(actionDir, ACTION_DATA_SEQUENCE_FILE).toUri());
+
+ SequenceFile.Writer wr = null;
+ try {
+ wr = SequenceFile.createWriter(launcherJobConf,
+ SequenceFile.Writer.file(finalPath),
+ SequenceFile.Writer.keyClass(Text.class),
+ SequenceFile.Writer.valueClass(Text.class));
+ if (wr != null) {
+ Set<String> keys = actionData.keySet();
+ for (String propsKey : keys) {
+ wr.append(new Text(propsKey), new Text(actionData.get(propsKey)));
+ }
+ }
+ else {
+ throw new IOException("SequenceFile.Writer is null for " + finalPath);
+ }
+ }
+ catch(IOException e) {
+ e.printStackTrace();
+ throw e;
+ }
+ finally {
+ if (wr != null) {
+ wr.close();
+ }
+ }
+ }
+ private static void failLauncher(int errorCode, String message, Throwable ex) {
+ ErrorHolder eHolder = new ErrorHolder();
+ eHolder.setErrorCode(errorCode);
+ eHolder.setErrorMessage(message);
+ eHolder.setErrorCause(ex);
+ failLauncher(eHolder);
+ }
+
+ private static void failLauncher(ErrorHolder eHolder) {
+ if (eHolder.getErrorCause() != null) {
+ eHolder.setErrorMessage(eHolder.getErrorMessage() + ", " + eHolder.getErrorCause().getMessage());
+ }
+ Properties errorProps = new Properties();
+ errorProps.setProperty("error.code", Integer.toString(eHolder.getErrorCode()));
+ errorProps.setProperty("error.reason", eHolder.getErrorMessage());
+ if (eHolder.getErrorCause() != null) {
+ if (eHolder.getErrorCause().getMessage() != null) {
+ errorProps.setProperty("exception.message", eHolder.getErrorCause().getMessage());
+ }
+ StringWriter sw = new StringWriter();
+ PrintWriter pw = new PrintWriter(sw);
+ eHolder.getErrorCause().printStackTrace(pw);
+ pw.close();
+ errorProps.setProperty("exception.stacktrace", sw.toString());
+ }
+ StringWriter sw = new StringWriter();
+ try {
+ errorProps.store(sw, "");
+ sw.close();
+ actionData.put(ACTION_DATA_ERROR_PROPS, sw.toString());
+
+ // external child IDs
+ String externalChildIdsProp = System.getProperty(ACTION_PREFIX + ACTION_DATA_EXTERNAL_CHILD_IDS);
+ if (externalChildIdsProp != null) {
+ File externalChildIDs = new File(externalChildIdsProp);
+ if (externalChildIDs.exists()) {
+ actionData.put(ACTION_DATA_EXTERNAL_CHILD_IDS, getLocalFileContentStr(externalChildIDs, "", -1));
+ }
+ }
+ } catch (IOException ioe) {
+ System.err.println("A problem occured trying to fail the launcher");
+ ioe.printStackTrace();
+ } finally {
+ System.out.print("Failing Oozie Launcher, " + eHolder.getErrorMessage() + "\n");
+ System.err.print("Failing Oozie Launcher, " + eHolder.getErrorMessage() + "\n");
+ if (eHolder.getErrorCause() != null) {
+ eHolder.getErrorCause().printStackTrace(System.out);
+ eHolder.getErrorCause().printStackTrace(System.err);
+ }
+ }
+ }
+
+ private static class AMRMCallBackHandler implements AMRMClientAsync.CallbackHandler {
+ @Override
+ public void onContainersCompleted(List<ContainerStatus> containerStatuses) {
+ //noop
+ }
+
+ @Override
+ public void onContainersAllocated(List<Container> containers) {
+ //noop
+ }
+
+ @Override
+ public void onShutdownRequest() {
+ failLauncher(0, "ResourceManager requested AM Shutdown", null);
+ // TODO: OYA: interrupt?
+ }
+
+ @Override
+ public void onNodesUpdated(List<NodeReport> nodeReports) {
+ //noop
+ }
+
+ @Override
+ public float getProgress() {
+ return 0.5f; //TODO: OYA: maybe some action types can report better progress?
+ }
+
+ @Override
+ public void onError(final Throwable ex) {
+ failLauncher(0, ex.getMessage(), ex);
+ // TODO: OYA: interrupt?
+ }
+ }
+
+ public static String[] getMainArguments(Configuration conf) {
+ String[] args = new String[conf.getInt(CONF_OOZIE_ACTION_MAIN_ARG_COUNT, 0)];
+ for (int i = 0; i < args.length; i++) {
+ args[i] = conf.get(CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + i);
+ }
+ return args;
+ }
+
+ private static class LauncherSecurityManager extends SecurityManager {
+ private boolean exitInvoked;
+ private int exitCode;
+ private SecurityManager securityManager;
+
+ public LauncherSecurityManager() {
+ exitInvoked = false;
+ exitCode = 0;
+ securityManager = System.getSecurityManager();
+ }
+
+ @Override
+ public void checkPermission(Permission perm, Object context) {
+ if (securityManager != null) {
+ // check everything with the original SecurityManager
+ securityManager.checkPermission(perm, context);
+ }
+ }
+
+ @Override
+ public void checkPermission(Permission perm) {
+ if (securityManager != null) {
+ // check everything with the original SecurityManager
+ securityManager.checkPermission(perm);
+ }
+ }
+
+ @Override
+ public void checkExit(int status) throws SecurityException {
+ exitInvoked = true;
+ exitCode = status;
+ throw new SecurityException("Intercepted System.exit(" + status + ")");
+ }
+
+ public boolean getExitInvoked() {
+ return exitInvoked;
+ }
+
+ public int getExitCode() {
+ return exitCode;
+ }
+
+ public void set() {
+ if (System.getSecurityManager() != this) {
+ System.setSecurityManager(this);
+ }
+ }
+
+ public void unset() {
+ if (System.getSecurityManager() == this) {
+ System.setSecurityManager(securityManager);
+ }
+ }
+ }
+
+
+ /**
+ * Print files and directories in current directory. Will list files in the sub-directory (only 1 level deep)
+ */
+ protected static void printContentsOfCurrentDir() {
+ File folder = new File(".");
+ System.out.println();
+ System.out.println("Files in current dir:" + folder.getAbsolutePath());
+ System.out.println("======================");
+
+ File[] listOfFiles = folder.listFiles();
+ for (File fileName : listOfFiles) {
+ if (fileName.isFile()) {
+ System.out.println("File: " + fileName.getName());
+ } else if (fileName.isDirectory()) {
+ System.out.println("Dir: " + fileName.getName());
+ File subDir = new File(fileName.getName());
+ File[] moreFiles = subDir.listFiles();
+ for (File subFileName : moreFiles) {
+ if (subFileName.isFile()) {
+ System.out.println(" File: " + subFileName.getName());
+ } else if (subFileName.isDirectory()) {
+ System.out.println(" Dir: " + subFileName.getName());
+ }
+ }
+ }
+ }
+ }
+
+ protected static Configuration readLauncherConf() {
+ File confFile = new File(LAUNCHER_JOB_CONF_XML);
+ Configuration conf = new Configuration(false);
+ conf.addResource(new Path(confFile.getAbsolutePath()));
+ return conf;
+ }
+
+ protected static class ErrorHolder {
+ private int errorCode = 0;
+ private Throwable errorCause = null;
+ private String errorMessage = null;
+
+ public int getErrorCode() {
+ return errorCode;
+ }
+
+ public void setErrorCode(int errorCode) {
+ this.errorCode = errorCode;
+ }
+
+ public Throwable getErrorCause() {
+ return errorCause;
+ }
+
+ public void setErrorCause(Throwable errorCause) {
+ this.errorCause = errorCause;
+ }
+
+ public String getErrorMessage() {
+ return errorMessage;
+ }
+
+ public void setErrorMessage(String errorMessage) {
+ this.errorMessage = errorMessage;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifier.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifier.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifier.java
new file mode 100644
index 0000000..dbef441
--- /dev/null
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifier.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.net.Proxy;
+import java.net.URL;
+
+// Adapted from org.apache.hadoop.mapreduce.v2.app.JobEndNotifier
+/**
+ * This call sends back an HTTP GET callback to the configured URL. It is meant for the {@link LauncherAM} to notify the
+ * Oozie Server that it has finished.
+ */
+public class LauncherAMCallbackNotifier {
+ private static final String OOZIE_LAUNCHER_CALLBACK = "oozie.launcher.callback.";
+ public static final String OOZIE_LAUNCHER_CALLBACK_RETRY_ATTEMPTS = OOZIE_LAUNCHER_CALLBACK + "retry.attempts";
+ public static final String OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL = OOZIE_LAUNCHER_CALLBACK + "retry.interval";
+ static final int OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL_MAX = 5000;
+ public static final String OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS = OOZIE_LAUNCHER_CALLBACK + "max.attempts";
+ public static final String OOZIE_LAUNCHER_CALLBACK_TIMEOUT = OOZIE_LAUNCHER_CALLBACK + "timeout";
+ public static final String OOZIE_LAUNCHER_CALLBACK_URL = OOZIE_LAUNCHER_CALLBACK + "url";
+ public static final String OOZIE_LAUNCHER_CALLBACK_PROXY = OOZIE_LAUNCHER_CALLBACK + "proxy";
+ public static final String OOZIE_LAUNCHER_CALLBACK_JOBSTATUS_TOKEN = "$jobStatus";
+
+ protected String userUrl;
+ protected String proxyConf;
+ protected int numTries; //Number of tries to attempt notification
+ protected int waitInterval; //Time (ms) to wait between retrying notification
+ protected int timeout; // Timeout (ms) on the connection and notification
+ protected URL urlToNotify; //URL to notify read from the config
+ protected Proxy proxyToUse = Proxy.NO_PROXY; //Proxy to use for notification
+
+ /**
+ * Parse the URL that needs to be notified of the end of the job, along
+ * with the number of retries in case of failure, the amount of time to
+ * wait between retries and proxy settings
+ * @param conf the configuration
+ */
+ public LauncherAMCallbackNotifier(Configuration conf) {
+ numTries = Math.min(conf.getInt(OOZIE_LAUNCHER_CALLBACK_RETRY_ATTEMPTS, 0) + 1,
+ conf.getInt(OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS, 1));
+
+ waitInterval = Math.min(conf.getInt(OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL, OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL_MAX),
+ OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL_MAX);
+ waitInterval = (waitInterval < 0) ? OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL_MAX : waitInterval;
+
+ timeout = conf.getInt(OOZIE_LAUNCHER_CALLBACK_TIMEOUT, OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL_MAX);
+
+ userUrl = conf.get(OOZIE_LAUNCHER_CALLBACK_URL);
+
+ proxyConf = conf.get(OOZIE_LAUNCHER_CALLBACK_PROXY);
+
+ //Configure the proxy to use if its set. It should be set like
+ //proxyType@proxyHostname:port
+ if(proxyConf != null && !proxyConf.equals("") &&
+ proxyConf.lastIndexOf(":") != -1) {
+ int typeIndex = proxyConf.indexOf("@");
+ Proxy.Type proxyType = Proxy.Type.HTTP;
+ if(typeIndex != -1 &&
+ proxyConf.substring(0, typeIndex).compareToIgnoreCase("socks") == 0) {
+ proxyType = Proxy.Type.SOCKS;
+ }
+ String hostname = proxyConf.substring(typeIndex + 1,
+ proxyConf.lastIndexOf(":"));
+ String portConf = proxyConf.substring(proxyConf.lastIndexOf(":") + 1);
+ try {
+ int port = Integer.parseInt(portConf);
+ proxyToUse = new Proxy(proxyType,
+ new InetSocketAddress(hostname, port));
+ System.out.println("Callback notification using proxy type \"" + proxyType +
+ "\" hostname \"" + hostname + "\" and port \"" + port + "\"");
+ } catch(NumberFormatException nfe) {
+ System.err.println("Callback notification couldn't parse configured proxy's port "
+ + portConf + ". Not going to use a proxy");
+ }
+ }
+
+ }
+
+ /**
+ * Notify the URL just once. Use best effort.
+ */
+ protected boolean notifyURLOnce() {
+ boolean success = false;
+ HttpURLConnection conn = null;
+ try {
+ System.out.println("Callback notification trying " + urlToNotify);
+ conn = (HttpURLConnection) urlToNotify.openConnection(proxyToUse);
+ conn.setConnectTimeout(timeout);
+ conn.setReadTimeout(timeout);
+ conn.setAllowUserInteraction(false);
+ if(conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
+ System.err.println("Callback notification to " + urlToNotify +" failed with code: "
+ + conn.getResponseCode() + " and message \"" + conn.getResponseMessage()
+ +"\"");
+ }
+ else {
+ success = true;
+ System.out.println("Callback notification to " + urlToNotify + " succeeded");
+ }
+ } catch(IOException ioe) {
+ System.err.println("Callback notification to " + urlToNotify + " failed");
+ ioe.printStackTrace();
+ } finally {
+ if (conn != null) {
+ conn.disconnect();
+ }
+ }
+ return success;
+ }
+
+ /**
+ * Notify a server of the completion of a submitted job.
+ * @param finalStatus The Application Status
+ *
+ * @throws InterruptedException
+ */
+ public void notifyURL(FinalApplicationStatus finalStatus) throws InterruptedException {
+ // Do we need job-end notification?
+ if (userUrl == null) {
+ System.out.println("Callback notification URL not set, skipping.");
+ return;
+ }
+
+ //Do string replacements for final status
+ if (userUrl.contains(OOZIE_LAUNCHER_CALLBACK_JOBSTATUS_TOKEN)) {
+ userUrl = userUrl.replace(OOZIE_LAUNCHER_CALLBACK_JOBSTATUS_TOKEN, finalStatus.toString());
+ }
+
+ // Create the URL, ensure sanity
+ try {
+ urlToNotify = new URL(userUrl);
+ } catch (MalformedURLException mue) {
+ System.err.println("Callback notification couldn't parse " + userUrl);
+ mue.printStackTrace();
+ return;
+ }
+
+ // Send notification
+ boolean success = false;
+ while (numTries-- > 0 && !success) {
+ System.out.println("Callback notification attempts left " + numTries);
+ success = notifyURLOnce();
+ if (!success) {
+ Thread.sleep(waitInterval);
+ }
+ }
+ if (!success) {
+ System.err.println("Callback notification failed to notify : " + urlToNotify);
+ } else {
+ System.out.println("Callback notification succeeded");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
index f2cba13..eeffe81 100644
--- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
@@ -49,7 +49,11 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
+import org.xml.sax.SAXException;
+import javax.xml.parsers.ParserConfigurationException;
+
+// TODO: OYA: Delete :)
public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, Runnable {
static final String CONF_OOZIE_ACTION_MAIN_CLASS = "oozie.launcher.action.main.class";
@@ -480,7 +484,7 @@ public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, R
}
// Method to execute the prepare actions
- private void executePrepare() throws IOException, LauncherException {
+ private void executePrepare() throws IOException, LauncherException, ParserConfigurationException, SAXException {
String prepareXML = getJobConf().get(ACTION_PREPARE_XML);
if (prepareXML != null) {
if (!prepareXML.equals("")) {
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java
index 21ae456..4a51d48 100644
--- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java
@@ -46,35 +46,26 @@ public class PrepareActionsDriver {
* @param prepareXML Prepare XML block in string format
* @throws LauncherException
*/
- static void doOperations(String prepareXML, Configuration conf) throws LauncherException {
- try {
- Document doc = getDocumentFromXML(prepareXML);
- doc.getDocumentElement().normalize();
+ static void doOperations(String prepareXML, Configuration conf)
+ throws IOException, SAXException, ParserConfigurationException, LauncherException {
+ Document doc = getDocumentFromXML(prepareXML);
+ doc.getDocumentElement().normalize();
- // Get the list of child nodes, basically, each one corresponding to a separate action
- NodeList nl = doc.getDocumentElement().getChildNodes();
- LauncherURIHandlerFactory factory = new LauncherURIHandlerFactory(conf);
+ // Get the list of child nodes, basically, each one corresponding to a separate action
+ NodeList nl = doc.getDocumentElement().getChildNodes();
+ LauncherURIHandlerFactory factory = new LauncherURIHandlerFactory(conf);
- for (int i = 0; i < nl.getLength(); ++i) {
- Node n = nl.item(i);
- String operation = n.getNodeName();
- if (n.getAttributes() == null || n.getAttributes().getNamedItem("path") == null) {
- continue;
- }
- String pathStr = n.getAttributes().getNamedItem("path").getNodeValue().trim();
- // use Path to avoid URIsyntax error caused by square bracket in glob
- URI uri = new Path(pathStr).toUri();
- LauncherURIHandler handler = factory.getURIHandler(uri);
- execute(operation, uri, handler, conf);
+ for (int i = 0; i < nl.getLength(); ++i) {
+ Node n = nl.item(i);
+ String operation = n.getNodeName();
+ if (n.getAttributes() == null || n.getAttributes().getNamedItem("path") == null) {
+ continue;
}
- } catch (IOException ioe) {
- throw new LauncherException(ioe.getMessage(), ioe);
- } catch (SAXException saxe) {
- throw new LauncherException(saxe.getMessage(), saxe);
- } catch (ParserConfigurationException pce) {
- throw new LauncherException(pce.getMessage(), pce);
- } catch (IllegalArgumentException use) {
- throw new LauncherException(use.getMessage(), use);
+ String pathStr = n.getAttributes().getNamedItem("path").getNodeValue().trim();
+ // use Path to avoid URI syntax error caused by square bracket in glob
+ URI uri = new Path(pathStr).toUri();
+ LauncherURIHandler handler = factory.getURIHandler(uri);
+ execute(operation, uri, handler, conf);
}
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/sharelib/pig/pom.xml
----------------------------------------------------------------------
diff --git a/sharelib/pig/pom.xml b/sharelib/pig/pom.xml
index 562c530..ea674a1 100644
--- a/sharelib/pig/pom.xml
+++ b/sharelib/pig/pom.xml
@@ -136,18 +136,6 @@
<outputFile>${project.build.directory}/classpath</outputFile>
</configuration>
</execution>
- <execution>
- <id>create-mrapp-generated-classpath</id>
- <phase>generate-test-resources</phase>
- <goals>
- <goal>build-classpath</goal>
- </goals>
- <configuration>
- <!-- needed to run the unit test for DS to generate the required classpath
- that is required in the env of the launch container in the mini mr/yarn cluster -->
- <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile>
- </configuration>
- </execution>
</executions>
</plugin>
<plugin>
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/sharelib/spark/pom.xml
----------------------------------------------------------------------
diff --git a/sharelib/spark/pom.xml b/sharelib/spark/pom.xml
index 46c6375..6d52ab4 100644
--- a/sharelib/spark/pom.xml
+++ b/sharelib/spark/pom.xml
@@ -370,18 +370,6 @@
<outputFile>${project.build.directory}/classpath</outputFile>
</configuration>
</execution>
- <execution>
- <id>create-mrapp-generated-classpath</id>
- <phase>generate-test-resources</phase>
- <goals>
- <goal>build-classpath</goal>
- </goals>
- <configuration>
- <!-- needed to run the unit test for DS to generate the required classpath
- that is required in the env of the launch container in the mini mr/yarn cluster -->
- <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile>
- </configuration>
- </execution>
</executions>
</plugin>
<plugin>
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/sharelib/sqoop/pom.xml
----------------------------------------------------------------------
diff --git a/sharelib/sqoop/pom.xml b/sharelib/sqoop/pom.xml
index d875c93..aad13f9 100644
--- a/sharelib/sqoop/pom.xml
+++ b/sharelib/sqoop/pom.xml
@@ -239,18 +239,6 @@
<outputFile>${project.build.directory}/classpath</outputFile>
</configuration>
</execution>
- <execution>
- <id>create-mrapp-generated-classpath</id>
- <phase>generate-test-resources</phase>
- <goals>
- <goal>build-classpath</goal>
- </goals>
- <configuration>
- <!-- needed to run the unit test for DS to generate the required classpath
- that is required in the env of the launch container in the mini mr/yarn cluster -->
- <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile>
- </configuration>
- </execution>
</executions>
</plugin>
<plugin>
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/sharelib/streaming/pom.xml
----------------------------------------------------------------------
diff --git a/sharelib/streaming/pom.xml b/sharelib/streaming/pom.xml
index fd79518..783c669 100644
--- a/sharelib/streaming/pom.xml
+++ b/sharelib/streaming/pom.xml
@@ -107,39 +107,6 @@
<outputFile>${project.build.directory}/classpath</outputFile>
</configuration>
</execution>
- <execution>
- <id>create-mrapp-generated-classpath</id>
- <phase>generate-test-resources</phase>
- <goals>
- <goal>build-classpath</goal>
- </goals>
- <configuration>
- <!-- needed to run the unit test for DS to generate the required classpath
- that is required in the env of the launch container in the mini mr/yarn cluster -->
- <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-antrun-plugin</artifactId>
- <version>1.6</version>
- <executions>
- <execution>
- <configuration>
- <target>
- <!-- needed to include Main class in classpath for mini yarn cluster for unit tests -->
- <echo file="${project.build.directory}/test-classes/mrapp-generated-classpath"
- append="true"
- message=":${project.build.directory}/classes"/>
- </target>
- </configuration>
- <goals>
- <goal>run</goal>
- </goals>
- <phase>generate-test-resources</phase>
- </execution>
</executions>
</plugin>
<plugin>
[2/3] oozie git commit: OOZIE-2590 OYA: Create basic Oozie Launcher
Application Master (rkanter)
Posted by rk...@apache.org.
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
index 879bfeb..794ad81 100644
--- a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
@@ -21,6 +21,7 @@ package org.apache.oozie.action.hadoop;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
+import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
@@ -35,18 +36,22 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.action.ActionExecutor;
@@ -55,6 +60,7 @@ import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.service.ConfigurationService;
+import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.LiteWorkflowStoreService;
import org.apache.oozie.service.Services;
@@ -62,7 +68,6 @@ import org.apache.oozie.service.ShareLibService;
import org.apache.oozie.service.UUIDService;
import org.apache.oozie.service.WorkflowAppService;
import org.apache.oozie.service.WorkflowStoreService;
-import org.apache.oozie.util.FSUtils;
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XmlUtils;
@@ -336,7 +341,8 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
return new Context(wf, action);
}
- protected RunningJob submitAction(Context context, JavaActionExecutor javaActionExecutor) throws Exception {
+ // TODO: OYA: void
+ protected RunningJob submitAction(Context context, JavaActionExecutor javaActionExecutor) throws ActionExecutorException {
WorkflowAction action = context.getAction();
javaActionExecutor.prepareActionDir(getFileSystem(), context);
@@ -348,21 +354,37 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
assertNotNull(jobId);
assertNotNull(jobTracker);
assertNotNull(consoleUrl);
-
- JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker);
- jobConf.set("yarn.resourcemanager.address", jobTracker);
-
- JobClient jobClient =
- Services.get().get(HadoopAccessorService.class).createJobClient(getTestUser(), jobConf);
- final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId));
- assertNotNull(runningJob);
- return runningJob;
+ return null;
}
- protected RunningJob submitAction(Context context) throws Exception {
+ // TODO: OYA: void
+ protected RunningJob submitAction(Context context) throws ActionExecutorException {
return submitAction(context, new JavaActionExecutor());
}
+ private void waitUntilYarnAppState(String externalId, final YarnApplicationState state)
+ throws HadoopAccessorException, IOException, YarnException {
+ final ApplicationId appId = ConverterUtils.toApplicationId(externalId);
+
+ JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(getJobTrackerUri());
+ // This is needed here because we need a mutable final YarnClient
+ final MutableObject<YarnClient> yarnClientMO = new MutableObject<YarnClient>(null);
+ try {
+ yarnClientMO.setValue(Services.get().get(HadoopAccessorService.class).createYarnClient(getTestUser(), jobConf));
+ waitFor(60 * 1000, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return yarnClientMO.getValue().getApplicationReport(appId).getYarnApplicationState().equals(state);
+ }
+ });
+ } finally {
+ if (yarnClientMO.getValue() != null) {
+ yarnClientMO.getValue().close();
+ }
+ }
+ assertTrue(yarnClientMO.getValue().getApplicationReport(appId).getYarnApplicationState().equals(state));
+ }
+
public void testSimpestSleSubmitOK() throws Exception {
String actionXml = "<java>" +
"<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
@@ -370,14 +392,8 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
"<main-class>" + LauncherMainTester.class.getName() + "</main-class>" +
"</java>";
Context context = createContext(actionXml, null);
- final RunningJob runningJob = submitAction(context);
- waitFor(60 * 1000, new Predicate() {
- @Override
- public boolean evaluate() throws Exception {
- return runningJob.isComplete();
- }
- });
- assertTrue(runningJob.isSuccessful());
+ submitAction(context);
+ waitUntilYarnAppState(context.getAction().getExternalId(), YarnApplicationState.FINISHED);
ActionExecutor ae = new JavaActionExecutor();
ae.check(context, context.getAction());
assertEquals("SUCCEEDED", context.getAction().getExternalStatus());
@@ -1817,118 +1833,12 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
JavaActionExecutor jae = new JavaActionExecutor("java");
jae.setupLauncherConf(conf, xml, appPath, createContext("<java/>", null));
assertEquals(5, conf.size());
- assertEquals("true", conf.get("mapreduce.job.ubertask.enable"));
assertEquals("v1", conf.get("oozie.launcher.p1"));
assertEquals("v1", conf.get("p1"));
assertEquals("v2b", conf.get("oozie.launcher.p2"));
assertEquals("v2b", conf.get("p2"));
}
- public void testInjectLauncherUseUberMode() throws Exception {
- // default -- should set to true
- JavaActionExecutor jae = new JavaActionExecutor();
- Configuration conf = new Configuration(false);
- assertNull(conf.get("mapreduce.job.ubertask.enable"));
- jae.injectLauncherUseUberMode(conf);
- assertEquals("true", conf.get("mapreduce.job.ubertask.enable"));
-
- // action conf set to true -- should keep at true
- conf = new Configuration(false);
- assertNull(conf.get("mapreduce.job.ubertask.enable"));
- conf.setBoolean("mapreduce.job.ubertask.enable", true);
- jae.injectLauncherUseUberMode(conf);
- assertEquals("true", conf.get("mapreduce.job.ubertask.enable"));
-
- // action conf set to false -- should keep at false
- conf = new Configuration(false);
- assertNull(conf.get("mapreduce.job.ubertask.enable"));
- conf.setBoolean("mapreduce.job.ubertask.enable", false);
- jae.injectLauncherUseUberMode(conf);
- assertEquals("false", conf.get("mapreduce.job.ubertask.enable"));
-
- // disable at oozie-site level for just the "test" action
- ConfigurationService.setBoolean("oozie.action.test.launcher.mapreduce.job.ubertask.enable", false);
- JavaActionExecutor tjae = new JavaActionExecutor("test");
-
- // default -- should not set
- conf = new Configuration(false);
- assertNull(conf.get("mapreduce.job.ubertask.enable"));
- tjae.injectLauncherUseUberMode(conf);
- assertNull(conf.get("mapreduce.job.ubertask.enable"));
- // default -- should be true
- conf = new Configuration(false);
- assertNull(conf.get("mapreduce.job.ubertask.enable"));
- jae.injectLauncherUseUberMode(conf);
- assertEquals("true", conf.get("mapreduce.job.ubertask.enable"));
-
- // action conf set to true -- should keep at true
- conf = new Configuration(false);
- assertNull(conf.get("mapreduce.job.ubertask.enable"));
- conf.setBoolean("mapreduce.job.ubertask.enable", true);
- tjae.injectLauncherUseUberMode(conf);
- assertEquals("true", conf.get("mapreduce.job.ubertask.enable"));
- // action conf set to true -- should keep at true
- conf = new Configuration(false);
- assertNull(conf.get("mapreduce.job.ubertask.enable"));
- conf.setBoolean("mapreduce.job.ubertask.enable", true);
- jae.injectLauncherUseUberMode(conf);
- assertEquals("true", conf.get("mapreduce.job.ubertask.enable"));
-
- // action conf set to false -- should keep at false
- conf = new Configuration(false);
- assertNull(conf.get("mapreduce.job.ubertask.enable"));
- conf.setBoolean("mapreduce.job.ubertask.enable", false);
- tjae.injectLauncherUseUberMode(conf);
- assertEquals("false", conf.get("mapreduce.job.ubertask.enable"));
- // action conf set to false -- should keep at false
- conf = new Configuration(false);
- assertNull(conf.get("mapreduce.job.ubertask.enable"));
- conf.setBoolean("mapreduce.job.ubertask.enable", false);
- jae.injectLauncherUseUberMode(conf);
- assertEquals("false", conf.get("mapreduce.job.ubertask.enable"));
-
- // disable at oozie-site level for all actions except for the "test" action
- ConfigurationService.setBoolean("oozie.action.test.launcher.mapreduce.job.ubertask.enable", true);
- ConfigurationService.setBoolean("oozie.action.launcher.mapreduce.job.ubertask.enable", false);
-
- // default -- should be true
- conf = new Configuration(false);
- assertNull(conf.get("mapreduce.job.ubertask.enable"));
- tjae.injectLauncherUseUberMode(conf);
- assertEquals("true", conf.get("mapreduce.job.ubertask.enable"));
- // default -- should not set
- conf = new Configuration(false);
- assertNull(conf.get("mapreduce.job.ubertask.enable"));
- jae.injectLauncherUseUberMode(conf);
- assertNull(conf.get("mapreduce.job.ubertask.enable"));
-
- // action conf set to true -- should keep at true
- conf = new Configuration(false);
- assertNull(conf.get("mapreduce.job.ubertask.enable"));
- conf.setBoolean("mapreduce.job.ubertask.enable", true);
- tjae.injectLauncherUseUberMode(conf);
- assertEquals("true", conf.get("mapreduce.job.ubertask.enable"));
- // action conf set to true -- should keep at true
- conf = new Configuration(false);
- assertNull(conf.get("mapreduce.job.ubertask.enable"));
- conf.setBoolean("mapreduce.job.ubertask.enable", true);
- jae.injectLauncherUseUberMode(conf);
- assertEquals("true", conf.get("mapreduce.job.ubertask.enable"));
-
- // action conf set to false -- should keep at false
- conf = new Configuration(false);
- assertNull(conf.get("mapreduce.job.ubertask.enable"));
- conf.setBoolean("mapreduce.job.ubertask.enable", false);
- tjae.injectLauncherUseUberMode(conf);
- assertEquals("false", conf.get("mapreduce.job.ubertask.enable"));
- // action conf set to false -- should keep at false
- conf = new Configuration(false);
- assertNull(conf.get("mapreduce.job.ubertask.enable"));
- conf.setBoolean("mapreduce.job.ubertask.enable", false);
- jae.injectLauncherUseUberMode(conf);
- assertEquals("false", conf.get("mapreduce.job.ubertask.enable"));
- }
-
public void testUpdateConfForJavaTmpDir() throws Exception {
//Test UpdateCOnfForJavaTmpDir for launcherConf
@@ -1993,355 +1903,6 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
assertEquals("-Xmx2560m -XX:NewRatio=8", jobConf.get(JavaActionExecutor.HADOOP_REDUCE_JAVA_OPTS));
assertEquals("-Xmx1024m -Djava.io.tmpdir=./tmp", jobConf.get(JavaActionExecutor.YARN_AM_COMMAND_OPTS));
}
- public void testUpdateConfForUberMode() throws Exception {
- Element actionXml1 = XmlUtils
- .parseXml("<java>"
- + "<job-tracker>"
- + getJobTrackerUri()
- + "</job-tracker>"
- + "<name-node>"
- + getNameNodeUri()
- + "</name-node>"
- + "<configuration>"
- + "<property><name>oozie.launcher.mapreduce.map.memory.mb</name><value>2048</value></property>"
- + "<property><name>oozie.launcher.mapred.child.java.opts</name>"
- + "<value>-Xmx2048m -Djava.net.preferIPv4Stack=true</value></property>"
- + "<property><name>oozie.launcher.mapred.child.env</name><value>A=foo</value></property>"
- + "</configuration>" + "<main-class>MAIN-CLASS</main-class>" + "</java>");
- JavaActionExecutor ae = new JavaActionExecutor();
- XConfiguration protoConf = new XConfiguration();
- protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
-
- WorkflowJobBean wf = createBaseWorkflow(protoConf, "action");
- WorkflowActionBean action = (WorkflowActionBean) wf.getActions().get(0);
- action.setType(ae.getType());
-
- Context context = new Context(wf, action);
- JobConf launcherConf = new JobConf();
- launcherConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml1, launcherConf);
- // memoryMB (2048 + 512)
- assertEquals("2560", launcherConf.get(JavaActionExecutor.YARN_AM_RESOURCE_MB));
- // heap size in child.opts (2048 + 512)
- int heapSize = ae.extractHeapSizeMB(launcherConf.get(JavaActionExecutor.YARN_AM_COMMAND_OPTS));
- assertEquals("-Xmx2048m -Djava.net.preferIPv4Stack=true",
- launcherConf.get("mapred.child.java.opts"));
- assertEquals("-Xmx2048m -Djava.net.preferIPv4Stack=true",
- launcherConf.get("mapreduce.map.java.opts"));
-
- assertEquals("-Xmx1024m -Xmx2048m -Djava.net.preferIPv4Stack=true -Xmx2560m -Djava.io.tmpdir=./tmp",
- launcherConf.get(JavaActionExecutor.YARN_AM_COMMAND_OPTS).trim());
-
- assertEquals(2560, heapSize);
-
- // env
- assertEquals("A=foo", launcherConf.get(JavaActionExecutor.YARN_AM_ENV));
-
- Element actionXml2 = XmlUtils
- .parseXml("<java>"
- + "<job-tracker>"
- + getJobTrackerUri()
- + "</job-tracker>"
- + "<name-node>"
- + getNameNodeUri()
- + "</name-node>"
- + "<configuration>"
- + "<property><name>oozie.launcher.yarn.app.mapreduce.am.resource.mb</name><value>3072</value></property>"
- + "<property><name>oozie.launcher.mapreduce.map.memory.mb</name><value>2048</value></property>"
- + "<property><name>oozie.launcher.yarn.app.mapreduce.am.command-opts</name>"
- + "<value>-Xmx1024m -Djava.net.preferIPv4Stack=true </value></property>"
- + "<property><name>oozie.launcher.mapred.child.java.opts</name><value>-Xmx1536m</value></property>"
- + "<property><name>oozie.launcher.mapreduce.map.java.opts</name>"
- + "<value>-Xmx2560m -XX:NewRatio=8</value></property>"
- + "<property><name>oozie.launcher.yarn.app.mapreduce.am.env</name><value>A=foo</value></property>"
- + "<property><name>oozie.launcher.mapred.child.env</name><value>B=bar</value></property>"
- + "</configuration>" + "<main-class>MAIN-CLASS</main-class>" + "</java>");
-
- launcherConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml2, launcherConf);
-
- // memoryMB (3072 + 512)
- assertEquals("3584", launcherConf.get(JavaActionExecutor.YARN_AM_RESOURCE_MB));
-
- // heap size (2560 + 512)
- heapSize = ae.extractHeapSizeMB(launcherConf.get(JavaActionExecutor.YARN_AM_COMMAND_OPTS));
- assertEquals("-Xmx1536m -Xmx2560m -XX:NewRatio=8", launcherConf.get("mapred.child.java.opts"));
- assertEquals("-Xmx1536m -Xmx2560m -XX:NewRatio=8", launcherConf.get("mapreduce.map.java.opts"));
- assertEquals("-Xmx1024m -Djava.net.preferIPv4Stack=true -Xmx1536m -Xmx2560m -XX:NewRatio=8 " +
- "-Xmx3072m -Djava.io.tmpdir=./tmp", launcherConf.get(JavaActionExecutor.YARN_AM_COMMAND_OPTS).trim());
- assertEquals(3072, heapSize);
-
- // env (equqls to mapreduce.map.env + am.env)
- assertTrue(launcherConf.get(JavaActionExecutor.YARN_AM_ENV).trim().equals("A=foo,B=bar"));
-
- // Test limit is applied in case of 32 bit
- Element actionXml3 = XmlUtils
- .parseXml("<java>"
- + "<job-tracker>"
- + getJobTrackerUri()
- + "</job-tracker>"
- + "<name-node>"
- + getNameNodeUri()
- + "</name-node>"
- + "<configuration>"
- + "<property><name>oozie.launcher.yarn.app.mapreduce.am.resource.mb</name><value>3072</value></property>"
- + "<property><name>oozie.launcher.mapreduce.map.memory.mb</name><value>4000</value></property>"
- + "<property><name>oozie.launcher.yarn.app.mapreduce.am.command-opts</name>"
- + "<value>-Xmx1024m -Djava.net.preferIPv4Stack=true</value></property>"
- + "<property><name>oozie.launcher.mapred.child.java.opts</name><value>-Xmx1536m</value></property>"
- + "<property><name>oozie.launcher.mapreduce.map.java.opts</name>"
- + "<value>-Xmx4000m -XX:NewRatio=8</value></property>"
- + "<property><name>oozie.launcher.yarn.app.mapreduce.am.env</name><value>A=foo</value></property>"
- + "<property><name>oozie.launcher.mapred.child.env</name><value>B=bar</value></property>"
- + "</configuration>" + "<main-class>MAIN-CLASS</main-class>" + "</java>");
-
- launcherConf = ae.createBaseHadoopConf(context, actionXml3);
- launcherConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml3, launcherConf);
-
- // memoryMB (limit to 4096)
- assertEquals("4096", launcherConf.get(JavaActionExecutor.YARN_AM_RESOURCE_MB));
-
- // heap size (limit to 3584)
- heapSize = ae.extractHeapSizeMB(launcherConf.get(JavaActionExecutor.YARN_AM_COMMAND_OPTS));
- assertEquals("-Xmx1536m -Xmx4000m -XX:NewRatio=8", launcherConf.get("mapred.child.java.opts"));
- assertEquals("-Xmx1536m -Xmx4000m -XX:NewRatio=8", launcherConf.get("mapreduce.map.java.opts"));
- assertEquals("-Xmx1024m -Djava.net.preferIPv4Stack=true -Xmx1536m -Xmx4000m -XX:NewRatio=8 " +
- "-Xmx3584m -Djava.io.tmpdir=./tmp", launcherConf.get(JavaActionExecutor.YARN_AM_COMMAND_OPTS).trim());
- assertEquals(3584, heapSize);
-
- // env (equqls to mapreduce.map.env + am.env)
- assertEquals("A=foo,B=bar", launcherConf.get(JavaActionExecutor.YARN_AM_ENV));
- }
-
- public void testUpdateConfForUberModeWithEnvDup() throws Exception {
- Element actionXml1 = XmlUtils.parseXml("<java>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>"
- + "<name-node>" + getNameNodeUri() + "</name-node>" + "<configuration>"
- + "<property><name>oozie.launcher.yarn.app.mapreduce.am.env</name>"
- + "<value>JAVA_HOME=/home/blah/java/jdk64/current,A=foo,B=bar</value></property>"
- + "<property><name>oozie.launcher.mapreduce.map.env</name>"
- + "<value>JAVA_HOME=/home/blah/java/jdk64/latest,C=blah</value></property>" + "</configuration>"
- + "<main-class>MAIN-CLASS</main-class>" + "</java>");
- JavaActionExecutor ae = new JavaActionExecutor();
- XConfiguration protoConf = new XConfiguration();
- protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
-
- WorkflowJobBean wf = createBaseWorkflow(protoConf, "action");
- WorkflowActionBean action = (WorkflowActionBean) wf.getActions().get(0);
- action.setType(ae.getType());
-
- Context context = new Context(wf, action);
- JobConf launcherConf = new JobConf();
- launcherConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml1, launcherConf);
-
- // uber mode should be disabled since JAVA_HOME points to different paths in am.evn and map.env
- assertEquals("false", launcherConf.get(JavaActionExecutor.HADOOP_YARN_UBER_MODE));
-
- // testing complicated env setting case
- Element actionXml2 = XmlUtils.parseXml("<java>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>"
- + "<name-node>" + getNameNodeUri() + "</name-node>" + "<configuration>" + "<property>"
- + "<name>oozie.launcher.yarn.app.mapreduce.am.env</name>"
- + "<value>LD_LIBRARY_PATH=$HADOOP_HOME_1/lib/native/`$JAVA_HOME/bin/java -d32 -version;"
- + "if [ $? -eq 0 ]; then echo Linux-i386-32; else echo Linux-amd64-64;fi`</value></property>"
- + "<property>" + "<name>oozie.launcher.mapreduce.map.env</name>"
- + "<value>LD_LIBRARY_PATH=$HADOOP_HOME_2/lib/native/`$JAVA_HOME/bin/java -d32 -version;"
- + "if [ $? -eq 0 ]; then echo Linux-i386-32; else echo Linux-amd64-64;fi`</value></property>"
- + "</configuration>" + "<main-class>MAIN-CLASS</main-class>" + "</java>");
-
- launcherConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml2, launcherConf);
-
- // uber mode should be disabled since LD_LIBRARY_PATH is different in am.evn and map.env
- assertEquals("false", launcherConf.get(JavaActionExecutor.HADOOP_YARN_UBER_MODE));
-
- Element actionXml3 = XmlUtils
- .parseXml("<java>"
- + "<job-tracker>"
- + getJobTrackerUri()
- + "</job-tracker>"
- + "<name-node>"
- + getNameNodeUri()
- + "</name-node>"
- + "<configuration>"
- + "<property><name>oozie.launcher.yarn.app.mapreduce.am.env</name>"
- + "<value>JAVA_HOME=/home/blah/java/jdk64/current,PATH=A,PATH=B</value></property>"
- + "<property><name>oozie.launcher.mapreduce.map.env</name>"
- + "<value>JAVA_HOME=/home/blah/java/jdk64/current,PATH=A</value></property>"
- + "</configuration>" + "<main-class>MAIN-CLASS</main-class>" + "</java>");
-
- launcherConf = ae.createBaseHadoopConf(context, actionXml3);
- launcherConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml3, launcherConf);
-
- // uber mode should be enabled since JAVA_HOME is the same, and PATH doesn't conflict
- assertEquals("true", launcherConf.get(JavaActionExecutor.HADOOP_YARN_UBER_MODE));
-
- // JAVA_HOME, PATH=A duplication is removed
- String a = launcherConf.get(JavaActionExecutor.YARN_AM_ENV);
- assertEquals("JAVA_HOME=/home/blah/java/jdk64/current,PATH=A,PATH=B",
- launcherConf.get(JavaActionExecutor.YARN_AM_ENV));
- }
-
- public void testUpdateConfForUberModeForJavaOpts() throws Exception {
- Element actionXml1 = XmlUtils
- .parseXml("<java>"
- + "<job-tracker>"
- + getJobTrackerUri()
- + "</job-tracker>"
- + "<name-node>"
- + getNameNodeUri()
- + "</name-node>"
- + "<configuration>"
- + "<property><name>oozie.launcher.yarn.app.mapreduce.am.command-opts</name>"
- + "<value>-Xmx1024m -Djava.net.preferIPv4Stack=true </value></property>"
- + "<property><name>oozie.launcher.mapreduce.map.java.opts</name><value>-Xmx1536m</value></property>"
- + "</configuration>" + "<main-class>MAIN-CLASS</main-class>"
- + "<java-opt>-Xmx2048m</java-opt>"
- + "<java-opt>-Dkey1=val1</java-opt>"
- + "<java-opt>-Dkey2=val2</java-opt>"
- + "</java>");
- JavaActionExecutor ae = new JavaActionExecutor();
- XConfiguration protoConf = new XConfiguration();
- protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
-
- WorkflowJobBean wf = createBaseWorkflow(protoConf, "action");
- WorkflowActionBean action = (WorkflowActionBean) wf.getActions().get(0);
- action.setType(ae.getType());
-
- Context context = new Context(wf, action);
- JobConf launcherConf = new JobConf();
- launcherConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml1, launcherConf);
-
- // heap size (2048 + 512)
- int heapSize = ae.extractHeapSizeMB(launcherConf.get(JavaActionExecutor.YARN_AM_COMMAND_OPTS));
- assertEquals("-Xmx200m -Xmx1536m -Xmx2048m -Dkey1=val1 -Dkey2=val2",
- launcherConf.get("mapred.child.java.opts"));
- assertEquals("-Xmx200m -Xmx1536m -Xmx2048m -Dkey1=val1 -Dkey2=val2",
- launcherConf.get("mapreduce.map.java.opts"));
- assertEquals("-Xmx1024m -Djava.net.preferIPv4Stack=true -Xmx200m -Xmx1536m -Xmx2048m -Dkey1=val1 -Dkey2=val2 -Xmx2560m " +
- "-Djava.io.tmpdir=./tmp", launcherConf.get(JavaActionExecutor.YARN_AM_COMMAND_OPTS).trim());
- assertEquals(2560, heapSize);
-
- Element actionXml2 = XmlUtils
- .parseXml("<java>"
- + "<job-tracker>"
- + getJobTrackerUri()
- + "</job-tracker>"
- + "<name-node>"
- + getNameNodeUri()
- + "</name-node>"
- + "<configuration>"
- + "<property><name>oozie.launcher.yarn.app.mapreduce.am.command-opts</name>"
- + "<value>-Xmx1024m -Djava.net.preferIPv4Stack=true </value></property>"
- + "<property><name>oozie.launcher.mapreduce.map.java.opts</name><value>-Xmx1536m</value></property>"
- + "</configuration>" + "<main-class>MAIN-CLASS</main-class>"
- + "<java-opts>-Xmx2048m -Dkey1=val1</java-opts>"
- + "</java>");
-
- launcherConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml2, launcherConf);
-
- // heap size (2048 + 512)
- heapSize = ae.extractHeapSizeMB(launcherConf.get(JavaActionExecutor.YARN_AM_COMMAND_OPTS));
- assertEquals("-Xmx200m -Xmx1536m -Xmx2048m -Dkey1=val1",
- launcherConf.get("mapred.child.java.opts"));
- assertEquals("-Xmx200m -Xmx1536m -Xmx2048m -Dkey1=val1",
- launcherConf.get("mapreduce.map.java.opts"));
- assertEquals("-Xmx1024m -Djava.net.preferIPv4Stack=true -Xmx200m -Xmx1536m -Xmx2048m -Dkey1=val1 -Xmx2560m " +
- "-Djava.io.tmpdir=./tmp", launcherConf.get(JavaActionExecutor.YARN_AM_COMMAND_OPTS).trim());
- assertEquals(2560, heapSize);
-
- Element actionXml3 = XmlUtils
- .parseXml("<java>"
- + "<job-tracker>"
- + getJobTrackerUri()
- + "</job-tracker>"
- + "<name-node>"
- + getNameNodeUri()
- + "</name-node>"
- + "<configuration>"
- + "<property><name>oozie.launcher.yarn.app.mapreduce.am.command-opts</name>"
- + "<value>-Xmx2048m -Djava.net.preferIPv4Stack=true </value></property>"
- + "<property><name>oozie.launcher.mapreduce.map.java.opts</name><value>-Xmx3072m</value></property>"
- + "</configuration>" + "<main-class>MAIN-CLASS</main-class>"
- + "<java-opts>-Xmx1024m -Dkey1=val1</java-opts>"
- + "</java>");
-
- launcherConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml3, launcherConf);
-
- // heap size (2048 + 512)
- heapSize = ae.extractHeapSizeMB(launcherConf.get(JavaActionExecutor.YARN_AM_COMMAND_OPTS));
- assertEquals("-Xmx200m -Xmx3072m -Xmx1024m -Dkey1=val1",
- launcherConf.get("mapred.child.java.opts"));
- assertEquals("-Xmx200m -Xmx3072m -Xmx1024m -Dkey1=val1",
- launcherConf.get("mapreduce.map.java.opts"));
- assertEquals("-Xmx2048m -Djava.net.preferIPv4Stack=true -Xmx200m -Xmx3072m -Xmx1024m -Dkey1=val1 -Xmx2560m " +
- "-Djava.io.tmpdir=./tmp", launcherConf.get(JavaActionExecutor.YARN_AM_COMMAND_OPTS).trim());
- assertEquals(2560, heapSize);
- }
-
- public void testDisableUberForProperties() throws Exception {
- Element actionXml1 = XmlUtils.parseXml("<java>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>"
- + "<name-node>" + getNameNodeUri() + "</name-node>"
- + "<configuration>"
- + "<property><name>oozie.launcher.mapreduce.job.classloader</name>"
- + "<value>true</value></property>"
- + "</configuration>"
- + "<main-class>MAIN-CLASS</main-class>" + "</java>");
- JavaActionExecutor ae = new JavaActionExecutor();
- XConfiguration protoConf = new XConfiguration();
- protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
-
- WorkflowJobBean wf = createBaseWorkflow(protoConf, "action");
- WorkflowActionBean action = (WorkflowActionBean) wf.getActions().get(0);
- action.setType(ae.getType());
-
- Context context = new Context(wf, action);
- JobConf launcherConf = new JobConf();
- launcherConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml1, launcherConf);
-
- // uber mode should be disabled since oozie.launcher.mapreduce.job.classloader=true
- assertEquals("false", launcherConf.get(JavaActionExecutor.HADOOP_YARN_UBER_MODE));
-
- Element actionXml2 = XmlUtils.parseXml("<java>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>"
- + "<name-node>" + getNameNodeUri() + "</name-node>"
- + "<configuration>"
- + "<property><name>oozie.launcher.mapreduce.user.classpath.first</name>"
- + "<value>true</value></property>"
- + "</configuration>"
- + "<main-class>MAIN-CLASS</main-class>" + "</java>");
- ae = new JavaActionExecutor();
- protoConf = new XConfiguration();
- protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
-
- wf = createBaseWorkflow(protoConf, "action");
- action = (WorkflowActionBean) wf.getActions().get(0);
- action.setType(ae.getType());
-
- context = new Context(wf, action);
- launcherConf = new JobConf();
- launcherConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml2, launcherConf);
-
- // uber mode should be disabled since oozie.launcher.mapreduce.user.classpath.first=true
- assertEquals("false", launcherConf.get(JavaActionExecutor.HADOOP_YARN_UBER_MODE));
- }
-
- public void testDisableUberForUserProperties() throws Exception {
- Element actionXml1 = XmlUtils.parseXml("<java>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>"
- + "<name-node>" + getNameNodeUri() + "</name-node>"
- + "<configuration>"
- + "<property><name>oozie.launcher.mapreduce.job.ubertask.enable</name>"
- + "<value>false</value></property>"
- + "</configuration>"
- + "<main-class>MAIN-CLASS</main-class>" + "</java>");
- JavaActionExecutor ae = new JavaActionExecutor();
- XConfiguration protoConf = new XConfiguration();
- protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
-
- WorkflowJobBean wf = createBaseWorkflow(protoConf, "action");
- WorkflowActionBean action = (WorkflowActionBean) wf.getActions().get(0);
- action.setType(ae.getType());
-
- Context context = new Context(wf, action);
- JobConf launcherConf = new JobConf();
- launcherConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml1, launcherConf);
- // uber mode should be disabled since oozie.launcher.mapreduce.job.classloader=true
- assertEquals("false", launcherConf.get(JavaActionExecutor.HADOOP_YARN_UBER_MODE));
- }
public void testUpdateConfForTimeLineServiceEnabled() throws Exception {
Element actionXml = XmlUtils
@@ -2766,12 +2327,8 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
assertEquals("AA", conf.get("a"));
assertEquals("action.barbar", conf.get("oozie.launcher.action.foofoo"));
assertEquals("action.barbar", conf.get("action.foofoo"));
- assertEquals("true", conf.get("mapreduce.job.ubertask.enable"));
- if (conf.size() == 7) {
- assertEquals(getJobTrackerUri(), conf.get("yarn.resourcemanager.address"));
- } else {
- assertEquals(6, conf.size());
- }
+ assertEquals(getJobTrackerUri(), conf.get("yarn.resourcemanager.address"));
+ assertEquals(6, conf.size());
conf = new Configuration(false);
Assert.assertEquals(0, conf.size());
@@ -2780,12 +2337,8 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
assertEquals(getJobTrackerUri(), conf.get("yarn.resourcemanager.address"));
assertEquals("action.barbar", conf.get("oozie.launcher.action.foofoo"));
assertEquals("action.barbar", conf.get("action.foofoo"));
- assertEquals("true", conf.get("mapreduce.job.ubertask.enable"));
- if (conf.size() == 5) {
- assertEquals(getJobTrackerUri(), conf.get("mapreduce.jobtracker.address"));
- } else {
- assertEquals(4, conf.size());
- }
+ assertEquals(getJobTrackerUri(), conf.get("mapreduce.jobtracker.address"));
+ assertEquals(4, conf.size());
}
public void testSetRootLoggerLevel() throws Exception {
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java
new file mode 100644
index 0000000..ed29299
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.oozie.service.HadoopAccessorService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XFsTestCase;
+import org.apache.oozie.util.IOUtils;
+import org.apache.oozie.util.XConfiguration;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.Writer;
+import java.net.URI;
+import java.util.Map;
+
+public class TestLauncherAM extends XFsTestCase {
+
+
+ // TODO: OYA: write tests later
+
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAMCallbackNotifier.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAMCallbackNotifier.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAMCallbackNotifier.java
new file mode 100644
index 0000000..d0b4d5b
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAMCallbackNotifier.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.oozie.QueryServlet;
+import org.apache.oozie.command.wf.HangServlet;
+import org.apache.oozie.test.EmbeddedServletContainer;
+import org.apache.oozie.test.XTestCase;
+import org.junit.Assert;
+import org.mockito.Mockito;
+
+import java.net.Proxy;
+import java.util.HashMap;
+import java.util.Map;
+
+// A lot of this adapted from org.apache.hadoop.mapreduce.v2.app.TestJobEndNotifier and org.apache.hadoop.mapred.TestJobEndNotifier
+public class TestLauncherAMCallbackNotifier extends XTestCase {
+
+ public void testConfiguration() throws Exception {
+ Configuration conf = new Configuration(false);
+
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS, "0");
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_ATTEMPTS, "10");
+ LauncherAMCallbackNotifier cn = new LauncherAMCallbackNotifier(conf);
+ assertEquals(0, cn.numTries);
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS, "1");
+ cn = new LauncherAMCallbackNotifier(conf);
+ assertEquals(1, cn.numTries);
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS, "20");
+ cn = new LauncherAMCallbackNotifier(conf);
+ assertEquals(11, cn.numTries); //11 because number of _retries_ is 10
+
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL, "1000");
+ cn = new LauncherAMCallbackNotifier(conf);
+ assertEquals(1000, cn.waitInterval);
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL, "10000");
+ cn = new LauncherAMCallbackNotifier(conf);
+ assertEquals(5000, cn.waitInterval);
+ //Test negative numbers are set to default
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL, "-10");
+ cn = new LauncherAMCallbackNotifier(conf);
+ assertEquals(5000, cn.waitInterval);
+
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_TIMEOUT, "1000");
+ cn = new LauncherAMCallbackNotifier(conf);
+ assertEquals(1000, cn.timeout);
+
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_PROXY, "somehost");
+ cn = new LauncherAMCallbackNotifier(conf);
+ assertEquals(Proxy.Type.DIRECT, cn.proxyToUse.type());
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_PROXY, "somehost:someport");
+ cn = new LauncherAMCallbackNotifier(conf);
+ assertEquals(Proxy.Type.DIRECT, cn.proxyToUse.type());
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_PROXY, "somehost:1000");
+ cn = new LauncherAMCallbackNotifier(conf);
+ assertEquals("HTTP @ somehost:1000", cn.proxyToUse.toString());
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_PROXY, "socks@somehost:1000");
+ cn = new LauncherAMCallbackNotifier(conf);
+ assertEquals("SOCKS @ somehost:1000", cn.proxyToUse.toString());
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_PROXY, "SOCKS@somehost:1000");
+ cn = new LauncherAMCallbackNotifier(conf);
+ assertEquals("SOCKS @ somehost:1000", cn.proxyToUse.toString());
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_PROXY, "sfafn@somehost:1000");
+ cn = new LauncherAMCallbackNotifier(conf);
+ assertEquals("HTTP @ somehost:1000", cn.proxyToUse.toString());
+ }
+
+ public void testNotifyRetries() throws InterruptedException {
+ Configuration conf = new Configuration(false);
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_ATTEMPTS, "0");
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS, "1");
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_URL, "http://nonexistent");
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL, "5000");
+
+ LauncherAMCallbackNotifier cnSpy = Mockito.spy(new LauncherAMCallbackNotifier(conf));
+ long start = System.currentTimeMillis();
+ cnSpy.notifyURL(FinalApplicationStatus.SUCCEEDED);
+ long end = System.currentTimeMillis();
+ Mockito.verify(cnSpy, Mockito.times(1)).notifyURLOnce();
+ Assert.assertTrue("Should have taken more than 5 seconds but it only took " + (end - start), end - start >= 5000);
+
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS, "3");
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_ATTEMPTS, "3");
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL, "3000");
+
+ cnSpy = Mockito.spy(new LauncherAMCallbackNotifier(conf));
+ start = System.currentTimeMillis();
+ cnSpy.notifyURL(FinalApplicationStatus.SUCCEEDED);
+ end = System.currentTimeMillis();
+ Mockito.verify(cnSpy, Mockito.times(3)).notifyURLOnce();
+ Assert.assertTrue("Should have taken more than 9 seconds but it only took " + (end - start), end - start >= 9000);
+ }
+
+ public void testNotifyTimeout() throws Exception {
+ EmbeddedServletContainer container = null;
+ try {
+ container = new EmbeddedServletContainer("blah");
+ Map<String, String> params = new HashMap<String, String>();
+ params.put(HangServlet.SLEEP_TIME_MS, "1000000");
+ container.addServletEndpoint("/hang/*", HangServlet.class, params);
+ container.start();
+
+ Configuration conf = new Configuration(false);
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_ATTEMPTS, "0");
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS, "1");
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_URL, container.getServletURL("/hang/*"));
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL, "5000");
+
+ LauncherAMCallbackNotifier cnSpy = Mockito.spy(new LauncherAMCallbackNotifier(conf));
+ long start = System.currentTimeMillis();
+ cnSpy.notifyURL(FinalApplicationStatus.SUCCEEDED);
+ long end = System.currentTimeMillis();
+ Mockito.verify(cnSpy, Mockito.times(1)).notifyURLOnce();
+ Assert.assertTrue("Should have taken more than 5 seconds but it only took " + (end - start), end - start >= 5000);
+ } finally {
+ if (container != null) {
+ container.stop();
+ }
+ }
+ }
+
+ public void testNotify() throws Exception {
+ EmbeddedServletContainer container = null;
+ try {
+ container = new EmbeddedServletContainer("blah");
+ container.addServletEndpoint("/count/*", QueryServlet.class);
+ container.start();
+
+ Configuration conf = new Configuration(false);
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_ATTEMPTS, "0");
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS, "1");
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_URL, container.getServletURL("/count/?status=$jobStatus"));
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL, "5000");
+
+ LauncherAMCallbackNotifier cn = new LauncherAMCallbackNotifier(conf);
+ QueryServlet.lastQueryString = null;
+ assertNull(QueryServlet.lastQueryString);
+ cn.notifyURL(FinalApplicationStatus.SUCCEEDED);
+ waitFor(5000, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return "status=SUCCEEDED".equals(QueryServlet.lastQueryString);
+ }
+ });
+ assertEquals("status=SUCCEEDED", QueryServlet.lastQueryString);
+ } finally {
+ if (container != null) {
+ container.stop();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/test/java/org/apache/oozie/action/hadoop/TestPrepareActionsDriver.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestPrepareActionsDriver.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestPrepareActionsDriver.java
index df9e939..e940d39 100644
--- a/core/src/test/java/org/apache/oozie/action/hadoop/TestPrepareActionsDriver.java
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestPrepareActionsDriver.java
@@ -24,6 +24,9 @@ import org.apache.oozie.service.Services;
import org.apache.oozie.test.XFsTestCase;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.JobConf;
+import org.xml.sax.SAXException;
+
+import javax.xml.parsers.ParserConfigurationException;
public class TestPrepareActionsDriver extends XFsTestCase {
@@ -40,7 +43,7 @@ public class TestPrepareActionsDriver extends XFsTestCase {
}
// Test to check if prepare action is performed as expected when the prepare XML block is a valid one
- public void testDoOperationsWithValidXML() throws LauncherException, IOException {
+ public void testDoOperationsWithValidXML() throws LauncherException, IOException, ParserConfigurationException, SAXException {
Path actionDir = getFsTestCaseDir();
FileSystem fs = getFileSystem();
Path newDir = new Path(actionDir, "newDir");
@@ -57,7 +60,7 @@ public class TestPrepareActionsDriver extends XFsTestCase {
assertTrue(fs.exists(actionDir));
}
- // Test to check if LauncherException is thrown when the prepare XML block is invalid
+ // Test to check if Exception is thrown when the prepare XML block is invalid
public void testDoOperationsWithInvalidXML() throws LauncherException, IOException {
Path actionDir = getFsTestCaseDir();
FileSystem fs = getFileSystem();
@@ -75,11 +78,9 @@ public class TestPrepareActionsDriver extends XFsTestCase {
LauncherMapperHelper.setupLauncherURIHandlerConf(conf);
PrepareActionsDriver.doOperations(prepareXML, conf);
fail("Expected to catch an exception but did not encounter any");
- } catch (LauncherException le) {
- assertEquals(le.getCause().getClass(), org.xml.sax.SAXParseException.class);
- assertEquals(le.getMessage(), "Content is not allowed in prolog.");
- } catch(Exception ex){
- fail("Expected a LauncherException but received an Exception");
+ } catch (Exception ex) {
+ assertEquals(ex.getClass(), org.xml.sax.SAXParseException.class);
+ assertEquals(ex.getMessage(), "Content is not allowed in prolog.");
}
}
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java
index 6a962a1..9468fad 100644
--- a/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java
@@ -367,25 +367,4 @@ public class TestShellActionExecutor extends ActionExecutorTestCase {
assertNotNull(runningJob);
return runningJob;
}
-
- public void testShellMainPathInUber() throws Exception {
- Services.get().getConf().setBoolean("oozie.action.shell.launcher.mapreduce.job.ubertask.enable", true);
-
- Element actionXml = XmlUtils.parseXml("<shell>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>"
- + "<name-node>" + getNameNodeUri() + "</name-node>" + "<exec>script.sh</exec>"
- + "<argument>a=A</argument>" + "<argument>b=B</argument>" + "</shell>");
- ShellActionExecutor ae = new ShellActionExecutor();
- XConfiguration protoConf = new XConfiguration();
- protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
-
- WorkflowJobBean wf = createBaseWorkflow(protoConf, "action");
- WorkflowActionBean action = (WorkflowActionBean) wf.getActions().get(0);
- action.setType(ae.getType());
-
- Context context = new Context(wf, action);
- JobConf launcherConf = new JobConf();
- launcherConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml, launcherConf);
- // env
- assertEquals("PATH=.:$PATH", launcherConf.get(JavaActionExecutor.YARN_AM_ENV));
- }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/test/java/org/apache/oozie/command/wf/HangServlet.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/wf/HangServlet.java b/core/src/test/java/org/apache/oozie/command/wf/HangServlet.java
index 3344cf9..d90aeb6 100644
--- a/core/src/test/java/org/apache/oozie/command/wf/HangServlet.java
+++ b/core/src/test/java/org/apache/oozie/command/wf/HangServlet.java
@@ -18,6 +18,8 @@
package org.apache.oozie.command.wf;
+import org.apache.oozie.util.XLog;
+
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
@@ -25,14 +27,27 @@ import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
/**
- * Servlet that 'hangs' for 200 ms. Used by TestNotificationXCommand
+ * Servlet that 'hangs' for some amount of time (200 ms by default).
+ * The sleep time can be configured by setting {@link HangServlet#SLEEP_TIME_MS} as an init parameter for the servlet.
*/
public class HangServlet extends HttpServlet {
+ public static final String SLEEP_TIME_MS = "sleep_time_ms";
+
protected void doGet(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
try {
- Thread.sleep(200);
+ long time = 200;
+ String sleeptime = getInitParameter(SLEEP_TIME_MS);
+ if (sleeptime != null) {
+ try {
+ time = Long.parseLong(sleeptime);
+ } catch (NumberFormatException nfe) {
+ XLog.getLog(HangServlet.class).error("Invalid sleep time, using default (200)", nfe);
+ }
+ }
+ XLog.getLog(HangServlet.class).info("Sleeping for " + time + " ms");
+ Thread.sleep(time);
}
catch (Exception ex) {
//NOP
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/test/java/org/apache/oozie/service/TestConfigurationService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestConfigurationService.java b/core/src/test/java/org/apache/oozie/service/TestConfigurationService.java
index 2153bf1..9b48df5 100644
--- a/core/src/test/java/org/apache/oozie/service/TestConfigurationService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestConfigurationService.java
@@ -209,9 +209,6 @@ public class TestConfigurationService extends XTestCase {
assertEquals(2048, ConfigurationService.getInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA));
assertEquals("http://localhost:8080/oozie?job=", ConfigurationService.get(JobXCommand.CONF_CONSOLE_URL));
- assertEquals(true, ConfigurationService.getBoolean(JavaActionExecutor.CONF_HADOOP_YARN_UBER_MODE));
- assertEquals(false, ConfigurationService.getBoolean(
- "oozie.action.shell.launcher." + JavaActionExecutor.HADOOP_YARN_UBER_MODE));
assertEquals(false, ConfigurationService.getBoolean(HadoopAccessorService.KERBEROS_AUTH_ENABLED));
assertEquals(0, ConfigurationService.getStrings("no.defined").length);
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/test/java/org/apache/oozie/service/TestHadoopAccessorService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestHadoopAccessorService.java b/core/src/test/java/org/apache/oozie/service/TestHadoopAccessorService.java
index 96faa48..2798719 100644
--- a/core/src/test/java/org/apache/oozie/service/TestHadoopAccessorService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestHadoopAccessorService.java
@@ -18,7 +18,17 @@
package org.apache.oozie.service;
-import org.apache.oozie.test.XTestCase;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.authorize.*;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.oozie.test.XFsTestCase;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.fs.FileSystem;
@@ -34,7 +44,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.util.XConfiguration;
-public class TestHadoopAccessorService extends XTestCase {
+public class TestHadoopAccessorService extends XFsTestCase {
protected void setUp() throws Exception {
super.setUp();
@@ -136,45 +146,89 @@ public class TestHadoopAccessorService extends XTestCase {
*/
assertEquals("100", conf.get("action.testprop"));
assertEquals("1", conf.get("default.testprop"));
+ }
+
+ public void testCreateJobClient() throws Exception {
+ HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
+ JobConf conf = has.createJobConf(getJobTrackerUri());
+
+ JobClient jc = has.createJobClient(getTestUser(), conf);
+ assertNotNull(jc);
+ jc.getAllJobs();
+
+ try {
+ has.createJobClient("invalid-user", conf);
+ fail("Should have thrown exception because not allowed to impersonate 'invalid-user'");
+ }
+ catch (HadoopAccessorException ex) {
+ assertEquals(ErrorCode.E0902, ex.getErrorCode());
+ }
+ JobConf conf2 = new JobConf(false);
+ conf2.set("mapred.job.tracker", getJobTrackerUri());
+ try {
+ has.createJobClient(getTestUser(), conf2);
+ fail("Should have thrown exception because Configuration not created by HadoopAccessorService");
+ }
+ catch (HadoopAccessorException ex) {
+ assertEquals(ErrorCode.E0903, ex.getErrorCode());
+ }
}
- public void testAccessor() throws Exception {
- Services services = Services.get();
- HadoopAccessorService has = services.get(HadoopAccessorService.class);
+ public void testCreateYarnClient() throws Exception {
+ HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
JobConf conf = has.createJobConf(getJobTrackerUri());
- conf.set("mapred.job.tracker", getJobTrackerUri());
- conf.set("fs.default.name", getNameNodeUri());
- URI uri = new URI(getNameNodeUri());
+ YarnClient yc = has.createYarnClient(getTestUser(), conf);
+ assertNotNull(yc);
+ yc.getApplications();
- //valid user
- String user = getTestUser();
- String group = getTestGroup();
+ try {
+ yc = has.createYarnClient("invalid-user", conf);
+ assertNotNull(yc);
+ yc.getApplications();
+ fail("Should have thrown exception because not allowed to impersonate 'invalid-user'");
+ }
+ catch (AuthorizationException ex) {
+ }
- JobClient jc = has.createJobClient(user, conf);
- assertNotNull(jc);
- FileSystem fs = has.createFileSystem(user, new URI(getNameNodeUri()), conf);
- assertNotNull(fs);
- fs = has.createFileSystem(user, uri, conf);
- assertNotNull(fs);
+ JobConf conf2 = new JobConf(false);
+ conf2.set("yarn.resourcemanager.address", getJobTrackerUri());
+ try {
+ has.createYarnClient(getTestUser(), conf2);
+ fail("Should have thrown exception because Configuration not created by HadoopAccessorService");
+ }
+ catch (HadoopAccessorException ex) {
+ assertEquals(ErrorCode.E0903, ex.getErrorCode());
+ }
+ }
- //invalid user
+ public void testCreateFileSystem() throws Exception {
+ HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
+ JobConf conf = has.createJobConf(getJobTrackerUri());
- user = "invalid";
+ FileSystem fs = has.createFileSystem(getTestUser(), new URI(getNameNodeUri()), conf);
+ assertNotNull(fs);
+ fs.exists(new Path(getNameNodeUri(), "/foo"));
try {
- has.createJobClient(user, conf);
- fail();
+ fs = has.createFileSystem("invalid-user", new URI(getNameNodeUri()), conf);
+ assertNotNull(fs);
+ fs.exists(new Path(getNameNodeUri(), "/foo"));
+ fail("Should have thrown exception because not allowed to impersonate 'invalid-user'");
}
- catch (Throwable ex) {
+ catch (RemoteException ex) {
+ assertEquals(AuthorizationException.class.getName(), ex.getClassName());
}
+ JobConf conf2 = new JobConf(false);
+ conf2.set("fs.default.name", getNameNodeUri());
try {
- has.createFileSystem(user, uri, conf);
- fail();
+ has.createFileSystem(getTestUser(), new URI(getNameNodeUri()), conf2);
+ fail("Should have thrown exception because Configuration not created by HadoopAccessorService");
}
- catch (Throwable ex) {
+ catch (HadoopAccessorException ex) {
+ assertEquals(ErrorCode.E0903, ex.getErrorCode());
}
}
@@ -290,4 +344,21 @@ public class TestHadoopAccessorService extends XTestCase {
}
has.destroy();
}
+
+ public void testCreateLocalResourceForConfigurationFile() throws Exception {
+ HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
+ String filename = "foo.xml";
+ Configuration conf = has.createJobConf(getNameNodeUri());
+ conf.set("foo", "bar");
+ LocalResource lRes = has.createLocalResourceForConfigurationFile(filename, getTestUser(), conf, getFileSystem().getUri(),
+ getFsTestCaseDir());
+ assertNotNull(lRes);
+ assertEquals(LocalResourceType.FILE, lRes.getType());
+ assertEquals(LocalResourceVisibility.APPLICATION, lRes.getVisibility());
+ Path resPath = ConverterUtils.getPathFromYarnURL(lRes.getResource());
+ assertEquals(new Path(getFsTestCaseDir(), "foo.xml"), resPath);
+ Configuration conf2 = new Configuration(false);
+ conf2.addResource(getFileSystem().open(resPath));
+ assertEquals("bar", conf2.get("foo"));
+ }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/test/java/org/apache/oozie/test/XTestCase.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/test/XTestCase.java b/core/src/test/java/org/apache/oozie/test/XTestCase.java
index e360369..81a33fd 100644
--- a/core/src/test/java/org/apache/oozie/test/XTestCase.java
+++ b/core/src/test/java/org/apache/oozie/test/XTestCase.java
@@ -82,6 +82,7 @@ import org.apache.oozie.sla.SLASummaryBean;
import org.apache.oozie.store.StoreException;
import org.apache.oozie.test.MiniHCatServer.RUNMODE;
import org.apache.oozie.test.hive.MiniHS2;
+import org.apache.oozie.util.ClasspathUtils;
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XConfiguration;
@@ -877,6 +878,7 @@ public abstract class XTestCase extends TestCase {
private static MiniDFSCluster dfsCluster = null;
private static MiniDFSCluster dfsCluster2 = null;
+ // TODO: OYA: replace with MiniYarnCluster or MiniMRYarnCluster
private static MiniMRCluster mrCluster = null;
private static MiniHCatServer hcatServer = null;
private static MiniHS2 hiveserver2 = null;
@@ -886,6 +888,8 @@ public abstract class XTestCase extends TestCase {
if (System.getProperty("hadoop.log.dir") == null) {
System.setProperty("hadoop.log.dir", testCaseDir);
}
+ // Tell the ClasspathUtils that we're using a mini cluster
+ ClasspathUtils.setUsingMiniYarnCluster(true);
int taskTrackers = 2;
int dataNodes = 2;
String oozieUser = getOozieUser();
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/test/java/org/apache/oozie/util/TestClasspathUtils.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/util/TestClasspathUtils.java b/core/src/test/java/org/apache/oozie/util/TestClasspathUtils.java
new file mode 100644
index 0000000..3a7215b
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/util/TestClasspathUtils.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.util;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.oozie.test.XFsTestCase;
+import org.apache.oozie.test.XTestCase;
+
+import java.net.URI;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+public class TestClasspathUtils extends XFsTestCase {
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ // This is normally true, and adds the entirety of the current classpath in ClasspathUtils, which we don't want to test or
+ // worry about here. Temporarily set this back to false so it behaves normally.
+ ClasspathUtils.setUsingMiniYarnCluster(false);
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ // Make sure to turn this back on for subsequent tests
+ ClasspathUtils.setUsingMiniYarnCluster(true);
+ super.tearDown();
+ }
+
+ public void testSetupClasspath() throws Exception {
+ Configuration conf = new Configuration(false);
+ Map<String, String> env = new HashMap<String, String>();
+
+ Path p1 = new Path(getFsTestCaseDir(), "foo.xml");
+ getFileSystem().createNewFile(p1);
+ DistributedCache.addFileToClassPath(p1, conf);
+
+ Path p2 = new Path(getFsTestCaseDir(), "foo.txt");
+ getFileSystem().createNewFile(p2);
+ DistributedCache.addFileToClassPath(p2, conf);
+
+ Path p3 = new Path(getFsTestCaseDir(), "foo.zip");
+ getFileSystem().createNewFile(p3);
+ DistributedCache.addArchiveToClassPath(p3, conf);
+
+ ClasspathUtils.setupClasspath(env, conf);
+
+ assertEquals(2, env.size());
+ assertTrue(env.containsKey("CLASSPATH"));
+ String[] paths = env.get("CLASSPATH").split(":");
+ assertEquals(12, paths.length);
+ Arrays.sort(paths);
+ assertEquals("$HADOOP_COMMON_HOME/share/hadoop/common/*", paths[0]);
+ assertEquals("$HADOOP_COMMON_HOME/share/hadoop/common/lib/*", paths[1]);
+ assertEquals("$HADOOP_CONF_DIR", paths[2]);
+ assertEquals("$HADOOP_HDFS_HOME/share/hadoop/hdfs/*", paths[3]);
+ assertEquals("$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*", paths[4]);
+ assertEquals("$HADOOP_YARN_HOME/share/hadoop/yarn/*", paths[5]);
+ assertEquals("$HADOOP_YARN_HOME/share/hadoop/yarn/lib/*", paths[6]);
+ assertEquals("$PWD", paths[7]);
+ assertEquals("$PWD/*", paths[8]);
+ assertEquals("job.jar/classes/", paths[9]);
+ assertEquals("job.jar/job.jar", paths[10]);
+ assertEquals("job.jar/lib/*", paths[11]);
+
+ assertTrue(env.containsKey("$PWD"));
+ paths = env.get("$PWD").split(":");
+ assertEquals(3, paths.length);
+ Arrays.sort(paths);
+ assertEquals("$PWD/foo.txt", paths[0]);
+ assertEquals("$PWD/foo.xml", paths[1]);
+ assertEquals("$PWD/foo.zip", paths[2]);
+ }
+
+ public void testAddMapReduceToClasspath() throws Exception {
+ Configuration conf = new Configuration(false);
+ Map<String, String> env = new HashMap<String, String>();
+
+ ClasspathUtils.addMapReduceToClasspath(env, conf);
+
+ assertEquals(1, env.size());
+ assertTrue(env.containsKey("CLASSPATH"));
+ String[] paths = env.get("CLASSPATH").split(":");
+ assertEquals(2, paths.length);
+ Arrays.sort(paths);
+ assertEquals("$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*", paths[0]);
+ assertEquals("$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*", paths[1]);
+ }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 3a23cbf..c75911e 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 4.3.0 release (trunk - unreleased)
+OOZIE-2590 OYA: Create basic Oozie Launcher Application Master (rkanter)
OOZIE-2316 Drop support for Hadoop 1 and 0.23 (asasvari via rkanter)
OOZIE-2503 show ChildJobURLs to spark action (satishsaley via puru)
OOZIE-2551 Feature request: epoch timestamp generation (jtolar via puru)
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/sharelib/distcp/pom.xml
----------------------------------------------------------------------
diff --git a/sharelib/distcp/pom.xml b/sharelib/distcp/pom.xml
index c8cc47c..cb01faa 100644
--- a/sharelib/distcp/pom.xml
+++ b/sharelib/distcp/pom.xml
@@ -91,18 +91,6 @@
<outputFile>${project.build.directory}/classpath</outputFile>
</configuration>
</execution>
- <execution>
- <id>create-mrapp-generated-classpath</id>
- <phase>generate-test-resources</phase>
- <goals>
- <goal>build-classpath</goal>
- </goals>
- <configuration>
- <!-- needed to run the unit test for DS to generate the required classpath
- that is required in the env of the launch container in the mini mr/yarn cluster -->
- <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile>
- </configuration>
- </execution>
</executions>
</plugin>
<plugin>
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/sharelib/hcatalog/pom.xml
----------------------------------------------------------------------
diff --git a/sharelib/hcatalog/pom.xml b/sharelib/hcatalog/pom.xml
index 2b0c504..f4273a5 100644
--- a/sharelib/hcatalog/pom.xml
+++ b/sharelib/hcatalog/pom.xml
@@ -297,18 +297,6 @@
<outputFile>${project.build.directory}/classpath</outputFile>
</configuration>
</execution>
- <execution>
- <id>create-mrapp-generated-classpath</id>
- <phase>generate-test-resources</phase>
- <goals>
- <goal>build-classpath</goal>
- </goals>
- <configuration>
- <!-- needed to run the unit test for DS to generate the required classpath
- that is required in the env of the launch container in the mini mr/yarn cluster -->
- <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile>
- </configuration>
- </execution>
</executions>
</plugin>
<plugin>
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/sharelib/hive/pom.xml
----------------------------------------------------------------------
diff --git a/sharelib/hive/pom.xml b/sharelib/hive/pom.xml
index d10d7b8..ba49403 100644
--- a/sharelib/hive/pom.xml
+++ b/sharelib/hive/pom.xml
@@ -171,18 +171,6 @@
<outputFile>${project.build.directory}/classpath</outputFile>
</configuration>
</execution>
- <execution>
- <id>create-mrapp-generated-classpath</id>
- <phase>generate-test-resources</phase>
- <goals>
- <goal>build-classpath</goal>
- </goals>
- <configuration>
- <!-- needed to run the unit test for DS to generate the required classpath
- that is required in the env of the launch container in the mini mr/yarn cluster -->
- <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile>
- </configuration>
- </execution>
</executions>
</plugin>
<plugin>
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/sharelib/hive2/pom.xml
----------------------------------------------------------------------
diff --git a/sharelib/hive2/pom.xml b/sharelib/hive2/pom.xml
index ce967c5..329832d 100644
--- a/sharelib/hive2/pom.xml
+++ b/sharelib/hive2/pom.xml
@@ -152,18 +152,6 @@
<outputFile>${project.build.directory}/classpath</outputFile>
</configuration>
</execution>
- <execution>
- <id>create-mrapp-generated-classpath</id>
- <phase>generate-test-resources</phase>
- <goals>
- <goal>build-classpath</goal>
- </goals>
- <configuration>
- <!-- needed to run the unit test for DS to generate the required classpath
- that is required in the env of the launch container in the mini mr/yarn cluster -->
- <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile>
- </configuration>
- </execution>
</executions>
</plugin>
<plugin>
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/sharelib/oozie/pom.xml
----------------------------------------------------------------------
diff --git a/sharelib/oozie/pom.xml b/sharelib/oozie/pom.xml
index dd95b45..b2da4e2 100644
--- a/sharelib/oozie/pom.xml
+++ b/sharelib/oozie/pom.xml
@@ -85,18 +85,6 @@
<outputFile>${project.build.directory}/classpath</outputFile>
</configuration>
</execution>
- <execution>
- <id>create-mrapp-generated-classpath</id>
- <phase>generate-test-resources</phase>
- <goals>
- <goal>build-classpath</goal>
- </goals>
- <configuration>
- <!-- needed to run the unit test for DS to generate the required classpath
- that is required in the env of the launch container in the mini mr/yarn cluster -->
- <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile>
- </configuration>
- </execution>
</executions>
</plugin>
<plugin>
[3/3] oozie git commit: OOZIE-2590 OYA: Create basic Oozie Launcher
Application Master (rkanter)
Posted by rk...@apache.org.
OOZIE-2590 OYA: Create basic Oozie Launcher Application Master (rkanter)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/fea512cf
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/fea512cf
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/fea512cf
Branch: refs/heads/oya
Commit: fea512cf66aec92d867e13c200978fd103868ab1
Parents: a37835f
Author: Robert Kanter <rk...@cloudera.com>
Authored: Mon Jul 25 18:24:35 2016 -0700
Committer: Robert Kanter <rk...@cloudera.com>
Committed: Mon Jul 25 18:24:35 2016 -0700
----------------------------------------------------------------------
core/pom.xml | 17 -
.../action/hadoop/DistcpActionExecutor.java | 7 +-
.../action/hadoop/Hive2ActionExecutor.java | 5 +-
.../oozie/action/hadoop/HiveActionExecutor.java | 5 +-
.../oozie/action/hadoop/JavaActionExecutor.java | 593 ++++++++---------
.../action/hadoop/LauncherMapperHelper.java | 12 -
.../action/hadoop/SparkActionExecutor.java | 5 +-
.../action/hadoop/SqoopActionExecutor.java | 9 +-
.../oozie/service/HadoopAccessorService.java | 97 ++-
.../org/apache/oozie/util/ClasspathUtils.java | 145 +++++
core/src/main/resources/oozie-default.xml | 25 -
.../java/org/apache/oozie/QueryServlet.java | 40 ++
.../action/hadoop/TestJavaActionExecutor.java | 531 ++--------------
.../oozie/action/hadoop/TestLauncherAM.java | 46 ++
.../hadoop/TestLauncherAMCallbackNotifier.java | 170 +++++
.../action/hadoop/TestPrepareActionsDriver.java | 15 +-
.../action/hadoop/TestShellActionExecutor.java | 21 -
.../apache/oozie/command/wf/HangServlet.java | 19 +-
.../oozie/service/TestConfigurationService.java | 3 -
.../service/TestHadoopAccessorService.java | 121 +++-
.../java/org/apache/oozie/test/XTestCase.java | 4 +
.../apache/oozie/util/TestClasspathUtils.java | 110 ++++
release-log.txt | 1 +
sharelib/distcp/pom.xml | 12 -
sharelib/hcatalog/pom.xml | 12 -
sharelib/hive/pom.xml | 12 -
sharelib/hive2/pom.xml | 12 -
sharelib/oozie/pom.xml | 12 -
.../apache/oozie/action/hadoop/LauncherAM.java | 636 +++++++++++++++++++
.../hadoop/LauncherAMCallbackNotifier.java | 175 +++++
.../oozie/action/hadoop/LauncherMapper.java | 6 +-
.../action/hadoop/PrepareActionsDriver.java | 43 +-
sharelib/pig/pom.xml | 12 -
sharelib/spark/pom.xml | 12 -
sharelib/sqoop/pom.xml | 12 -
sharelib/streaming/pom.xml | 33 -
36 files changed, 1899 insertions(+), 1091 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index 6584af8..86feea0 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -453,23 +453,6 @@
</configuration>
</plugin>
<plugin>
- <artifactId>maven-dependency-plugin</artifactId>
- <executions>
- <execution>
- <id>create-mrapp-generated-classpath</id>
- <phase>generate-test-resources</phase>
- <goals>
- <goal>build-classpath</goal>
- </goals>
- <configuration>
- <!-- needed to run the unit test for DS to generate the required classpath
- that is required in the env of the launch container in the mini mr/yarn cluster -->
- <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
<groupId>org.apache.openjpa</groupId>
<artifactId>openjpa-maven-plugin</artifactId>
<executions>
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/java/org/apache/oozie/action/hadoop/DistcpActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/DistcpActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/DistcpActionExecutor.java
index 96726da..99652e8 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/DistcpActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/DistcpActionExecutor.java
@@ -26,13 +26,10 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.RunningJob;
import org.apache.oozie.action.ActionExecutorException;
-import org.apache.oozie.action.ActionExecutor.Context;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.HadoopAccessorException;
-import org.apache.oozie.service.Services;
import org.apache.oozie.util.XLog;
import org.jdom.Element;
import org.jdom.JDOMException;
@@ -126,9 +123,9 @@ public class DistcpActionExecutor extends JavaActionExecutor{
}
@Override
- protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context)
+ protected void getActionData(FileSystem actionFs, WorkflowAction action, Context context)
throws HadoopAccessorException, JDOMException, IOException, URISyntaxException {
- super.getActionData(actionFs, runningJob, action, context);
+ super.getActionData(actionFs, action, context);
readExternalChildIDs(action, context);
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/java/org/apache/oozie/action/hadoop/Hive2ActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/Hive2ActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/Hive2ActionExecutor.java
index b5b1bf9..9ba6318 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/Hive2ActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/Hive2ActionExecutor.java
@@ -28,7 +28,6 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.RunningJob;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.service.HadoopAccessorException;
@@ -134,9 +133,9 @@ public class Hive2ActionExecutor extends ScriptLanguageActionExecutor {
}
@Override
- protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context)
+ protected void getActionData(FileSystem actionFs, WorkflowAction action, Context context)
throws HadoopAccessorException, JDOMException, IOException, URISyntaxException {
- super.getActionData(actionFs, runningJob, action, context);
+ super.getActionData(actionFs, action, context);
readExternalChildIDs(action, context);
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/java/org/apache/oozie/action/hadoop/HiveActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/HiveActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/HiveActionExecutor.java
index c74e9e6..a850957 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/HiveActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/HiveActionExecutor.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RunningJob;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.XOozieClient;
@@ -125,9 +124,9 @@ public class HiveActionExecutor extends ScriptLanguageActionExecutor {
}
@Override
- protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context)
+ protected void getActionData(FileSystem actionFs, WorkflowAction action, Context context)
throws HadoopAccessorException, JDOMException, IOException, URISyntaxException {
- super.getActionData(actionFs, runningJob, action, context);
+ super.getActionData(actionFs, action, context);
readExternalChildIDs(action, context);
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
index 99e3344..d573fc3 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
@@ -18,42 +18,38 @@
package org.apache.oozie.action.hadoop;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.StringReader;
-import java.net.ConnectException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
+import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.AccessControlException;
+import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.action.ActionExecutor;
@@ -69,18 +65,41 @@ import org.apache.oozie.service.Services;
import org.apache.oozie.service.ShareLibService;
import org.apache.oozie.service.URIHandlerService;
import org.apache.oozie.service.WorkflowAppService;
+import org.apache.oozie.util.ClasspathUtils;
import org.apache.oozie.util.ELEvaluationException;
import org.apache.oozie.util.ELEvaluator;
-import org.apache.oozie.util.XLog;
-import org.apache.oozie.util.XConfiguration;
-import org.apache.oozie.util.XmlUtils;
import org.apache.oozie.util.JobUtils;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.PropertiesUtils;
+import org.apache.oozie.util.XConfiguration;
+import org.apache.oozie.util.XLog;
+import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;
import org.jdom.JDOMException;
import org.jdom.Namespace;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.StringReader;
+import java.net.ConnectException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
public class JavaActionExecutor extends ActionExecutor {
@@ -94,7 +113,6 @@ public class JavaActionExecutor extends ActionExecutor {
public static final String ACL_VIEW_JOB = "mapreduce.job.acl-view-job";
public static final String ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job";
public static final String HADOOP_YARN_TIMELINE_SERVICE_ENABLED = "yarn.timeline-service.enabled";
- public static final String HADOOP_YARN_UBER_MODE = "mapreduce.job.ubertask.enable";
public static final String HADOOP_YARN_KILL_CHILD_JOBS_ON_AMRESTART = "oozie.action.launcher.am.restart.kill.childjobs";
public static final String HADOOP_MAP_MEMORY_MB = "mapreduce.map.memory.mb";
public static final String HADOOP_CHILD_JAVA_OPTS = "mapred.child.java.opts";
@@ -117,7 +135,6 @@ public class JavaActionExecutor extends ActionExecutor {
protected XLog LOG = XLog.getLog(getClass());
private static final Pattern heapPattern = Pattern.compile("-Xmx(([0-9]+)[mMgG])");
private static final String JAVA_TMP_DIR_SETTINGS = "-Djava.io.tmpdir=";
- public static final String CONF_HADOOP_YARN_UBER_MODE = "oozie.action.launcher." + HADOOP_YARN_UBER_MODE;
public static final String HADOOP_JOB_CLASSLOADER = "mapreduce.job.classloader";
public static final String HADOOP_USER_CLASSPATH_FIRST = "mapreduce.user.classpath.first";
public static final String OOZIE_CREDENTIALS_SKIP = "oozie.credentials.skip";
@@ -138,10 +155,11 @@ public class JavaActionExecutor extends ActionExecutor {
public static List<Class> getCommonLauncherClasses() {
List<Class> classes = new ArrayList<Class>();
- classes.add(LauncherMapper.class);
classes.add(OozieLauncherInputFormat.class);
classes.add(LauncherMain.class);
classes.addAll(Services.get().get(URIHandlerService.class).getClassesForLauncher());
+ classes.add(LauncherAM.class);
+ classes.add(LauncherAMCallbackNotifier.class);
return classes;
}
@@ -159,7 +177,7 @@ public class JavaActionExecutor extends ActionExecutor {
@Override
public void initActionType() {
super.initActionType();
- maxActionOutputLen = ConfigurationService.getInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA);
+ maxActionOutputLen = ConfigurationService.getInt(LauncherAM.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA);
//Get the limit for the maximum allowed size of action stats
maxExternalStatsSize = ConfigurationService.getInt(JavaActionExecutor.MAX_EXTERNAL_STATS_SIZE);
maxExternalStatsSize = (maxExternalStatsSize == -1) ? Integer.MAX_VALUE : maxExternalStatsSize;
@@ -257,8 +275,6 @@ public class JavaActionExecutor extends ActionExecutor {
} catch (URISyntaxException ex) {
throw convertException(ex);
}
- // Inject use uber mode for launcher
- injectLauncherUseUberMode(launcherConf);
XConfiguration.copy(launcherConf, conf);
checkForDisallowedProps(launcherConf, "launcher configuration");
// Inject config-class for launcher to use for action
@@ -273,25 +289,6 @@ public class JavaActionExecutor extends ActionExecutor {
}
}
- void injectLauncherUseUberMode(Configuration launcherConf) {
- // Set Uber Mode for the launcher (YARN only, ignored by MR1)
- // Priority:
- // 1. action's <configuration>
- // 2. oozie.action.#action-type#.launcher.mapreduce.job.ubertask.enable
- // 3. oozie.action.launcher.mapreduce.job.ubertask.enable
- if (launcherConf.get(HADOOP_YARN_UBER_MODE) == null) {
- if (ConfigurationService.get("oozie.action." + getType() + ".launcher." + HADOOP_YARN_UBER_MODE).length() > 0) {
- if (ConfigurationService.getBoolean("oozie.action." + getType() + ".launcher." + HADOOP_YARN_UBER_MODE)) {
- launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, true);
- }
- } else {
- if (ConfigurationService.getBoolean("oozie.action.launcher." + HADOOP_YARN_UBER_MODE)) {
- launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, true);
- }
- }
- }
- }
-
void injectLauncherTimelineServiceEnabled(Configuration launcherConf, Configuration actionConf) {
// Getting delegation token for ATS. If tez-site.xml is present in distributed cache, turn on timeline service.
if (actionConf.get("oozie.launcher." + HADOOP_YARN_TIMELINE_SERVICE_ENABLED) == null
@@ -303,104 +300,6 @@ public class JavaActionExecutor extends ActionExecutor {
}
}
- void updateConfForUberMode(Configuration launcherConf) {
-
- // child.env
- boolean hasConflictEnv = false;
- String launcherMapEnv = launcherConf.get(HADOOP_MAP_JAVA_ENV);
- if (launcherMapEnv == null) {
- launcherMapEnv = launcherConf.get(HADOOP_CHILD_JAVA_ENV);
- }
- String amEnv = launcherConf.get(YARN_AM_ENV);
- StringBuffer envStr = new StringBuffer();
- HashMap<String, List<String>> amEnvMap = null;
- HashMap<String, List<String>> launcherMapEnvMap = null;
- if (amEnv != null) {
- envStr.append(amEnv);
- amEnvMap = populateEnvMap(amEnv);
- }
- if (launcherMapEnv != null) {
- launcherMapEnvMap = populateEnvMap(launcherMapEnv);
- if (amEnvMap != null) {
- Iterator<String> envKeyItr = launcherMapEnvMap.keySet().iterator();
- while (envKeyItr.hasNext()) {
- String envKey = envKeyItr.next();
- if (amEnvMap.containsKey(envKey)) {
- List<String> amValList = amEnvMap.get(envKey);
- List<String> launcherValList = launcherMapEnvMap.get(envKey);
- Iterator<String> valItr = launcherValList.iterator();
- while (valItr.hasNext()) {
- String val = valItr.next();
- if (!amValList.contains(val)) {
- hasConflictEnv = true;
- break;
- }
- else {
- valItr.remove();
- }
- }
- if (launcherValList.isEmpty()) {
- envKeyItr.remove();
- }
- }
- }
- }
- }
- if (hasConflictEnv) {
- launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, false);
- }
- else {
- if (launcherMapEnvMap != null) {
- for (String key : launcherMapEnvMap.keySet()) {
- List<String> launcherValList = launcherMapEnvMap.get(key);
- for (String val : launcherValList) {
- if (envStr.length() > 0) {
- envStr.append(",");
- }
- envStr.append(key).append("=").append(val);
- }
- }
- }
-
- launcherConf.set(YARN_AM_ENV, envStr.toString());
-
- // memory.mb
- int launcherMapMemoryMB = launcherConf.getInt(HADOOP_MAP_MEMORY_MB, 1536);
- int amMemoryMB = launcherConf.getInt(YARN_AM_RESOURCE_MB, 1536);
- // YARN_MEMORY_MB_MIN to provide buffer.
- // suppose launcher map aggressively use high memory, need some
- // headroom for AM
- int memoryMB = Math.max(launcherMapMemoryMB, amMemoryMB) + YARN_MEMORY_MB_MIN;
- // limit to 4096 in case of 32 bit
- if (launcherMapMemoryMB < 4096 && amMemoryMB < 4096 && memoryMB > 4096) {
- memoryMB = 4096;
- }
- launcherConf.setInt(YARN_AM_RESOURCE_MB, memoryMB);
-
- // We already made mapred.child.java.opts and
- // mapreduce.map.java.opts equal, so just start with one of them
- String launcherMapOpts = launcherConf.get(HADOOP_MAP_JAVA_OPTS, "");
- String amChildOpts = launcherConf.get(YARN_AM_COMMAND_OPTS);
- StringBuilder optsStr = new StringBuilder();
- int heapSizeForMap = extractHeapSizeMB(launcherMapOpts);
- int heapSizeForAm = extractHeapSizeMB(amChildOpts);
- int heapSize = Math.max(heapSizeForMap, heapSizeForAm) + YARN_MEMORY_MB_MIN;
- // limit to 3584 in case of 32 bit
- if (heapSizeForMap < 4096 && heapSizeForAm < 4096 && heapSize > 3584) {
- heapSize = 3584;
- }
- if (amChildOpts != null) {
- optsStr.append(amChildOpts);
- }
- optsStr.append(" ").append(launcherMapOpts.trim());
- if (heapSize > 0) {
- // append calculated total heap size to the end
- optsStr.append(" ").append("-Xmx").append(heapSize).append("m");
- }
- launcherConf.set(YARN_AM_COMMAND_OPTS, optsStr.toString().trim());
- }
- }
-
void updateConfForJavaTmpDir(Configuration conf) {
String amChildOpts = conf.get(YARN_AM_COMMAND_OPTS);
String oozieJavaTmpDirSetting = "-Djava.io.tmpdir=./tmp";
@@ -868,7 +767,7 @@ public class JavaActionExecutor extends ActionExecutor {
protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
- return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, JavaMain.class.getName());
+ return launcherConf.get(LauncherAM.CONF_OOZIE_ACTION_MAIN_CLASS, JavaMain.class.getName());
}
private void setJavaMain(Configuration actionConf, Element actionXml) {
@@ -1004,15 +903,6 @@ public class JavaActionExecutor extends ActionExecutor {
launcherJobConf.set(HADOOP_CHILD_JAVA_OPTS, opts.toString().trim());
launcherJobConf.set(HADOOP_MAP_JAVA_OPTS, opts.toString().trim());
- // setting for uber mode
- if (launcherJobConf.getBoolean(HADOOP_YARN_UBER_MODE, false)) {
- if (checkPropertiesToDisableUber(launcherJobConf)) {
- launcherJobConf.setBoolean(HADOOP_YARN_UBER_MODE, false);
- }
- else {
- updateConfForUberMode(launcherJobConf);
- }
- }
updateConfForJavaTmpDir(launcherJobConf);
injectLauncherTimelineServiceEnabled(launcherJobConf, actionConf);
@@ -1027,23 +917,9 @@ public class JavaActionExecutor extends ActionExecutor {
}
}
- private boolean checkPropertiesToDisableUber(Configuration launcherConf) {
- boolean disable = false;
- if (launcherConf.getBoolean(HADOOP_JOB_CLASSLOADER, false)) {
- disable = true;
- }
- else if (launcherConf.getBoolean(HADOOP_USER_CLASSPATH_FIRST, false)) {
- disable = true;
- }
- return disable;
- }
-
private void injectCallback(Context context, Configuration conf) {
- String callback = context.getCallbackUrl("$jobStatus");
- if (conf.get("job.end.notification.url") != null) {
- LOG.warn("Overriding the action job end notification URI");
- }
- conf.set("job.end.notification.url", callback);
+ String callback = context.getCallbackUrl(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_JOBSTATUS_TOKEN);
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_URL, callback);
}
void injectActionCallback(Context context, Configuration actionConf) {
@@ -1062,7 +938,7 @@ public class JavaActionExecutor extends ActionExecutor {
}
}
- public void submitLauncher(FileSystem actionFs, Context context, WorkflowAction action) throws ActionExecutorException {
+ public void submitLauncher(FileSystem actionFs, final Context context, WorkflowAction action) throws ActionExecutorException {
JobClient jobClient = null;
boolean exception = false;
try {
@@ -1119,14 +995,17 @@ public class JavaActionExecutor extends ActionExecutor {
}
}
}
-
JobConf launcherJobConf = createLauncherConf(actionFs, context, action, actionXml, actionConf);
- LOG.debug("Creating Job Client for action " + action.getId());
- jobClient = createJobClient(context, launcherJobConf);
- String launcherId = LauncherMapperHelper.getRecoveryId(launcherJobConf, context.getActionDir(), context
- .getRecoveryId());
- boolean alreadyRunning = launcherId != null;
+ boolean alreadyRunning = false;
+ String launcherId = null;
+ String consoleUrl = null;
+ // TODO: OYA: equivalent of this? (recovery, alreadyRunning) When does this happen?
+// LOG.debug("Creating Job Client for action " + action.getId());
+// jobClient = createJobClient(context, launcherJobConf);
+// launcherId = LauncherMapperHelper.getRecoveryId(launcherJobConf, context.getActionDir(), context
+// .getRecoveryId());
+// alreadyRunning = launcherId != null;
RunningJob runningJob;
// if user-retry is on, always submit new launcher
@@ -1141,13 +1020,13 @@ public class JavaActionExecutor extends ActionExecutor {
}
}
else {
- LOG.debug("Submitting the job through Job Client for action " + action.getId());
-
- // setting up propagation of the delegation token.
- HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
- Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(has
- .getMRDelegationTokenRenewer(launcherJobConf));
- launcherJobConf.getCredentials().addToken(HadoopAccessorService.MR_TOKEN_ALIAS, mrdt);
+ // TODO: OYA: do we actually need an MR token? IIRC, it's issued by the JHS
+// // setting up propagation of the delegation token.
+// Token<DelegationTokenIdentifier> mrdt = null;
+// HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
+// mrdt = jobClient.getDelegationToken(has
+// .getMRDelegationTokenRenewer(launcherJobConf));
+// launcherJobConf.getCredentials().addToken(HadoopAccessorService.MR_TOKEN_ALIAS, mrdt);
// insert credentials tokens to launcher job conf if needed
if (needInjectCredentials() && credentialsConf != null) {
@@ -1173,17 +1052,36 @@ public class JavaActionExecutor extends ActionExecutor {
else {
LOG.info("No need to inject credentials.");
}
- runningJob = jobClient.submitJob(launcherJobConf);
- if (runningJob == null) {
- throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017",
- "Error submitting launcher for action [{0}]", action.getId());
+
+ YarnClient yarnClient = null;
+ try {
+ String user = context.getWorkflow().getUser();
+
+ // Create application
+ yarnClient = createYarnClient(context, launcherJobConf);
+ YarnClientApplication newApp = yarnClient.createApplication();
+ ApplicationId appId = newApp.getNewApplicationResponse().getApplicationId();
+
+ // Create launch context for app master
+ ApplicationSubmissionContext appContext =
+ createAppSubmissionContext(appId, launcherJobConf, user, context, actionConf);
+
+ // Submit the launcher AM
+ yarnClient.submitApplication(appContext);
+
+ launcherId = appId.toString();
+ LOG.debug("After submission get the launcherId [{0}]", launcherId);
+ ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+ consoleUrl = appReport.getTrackingUrl();
+ } finally {
+ if (yarnClient != null) {
+ yarnClient.close();
+ yarnClient = null;
+ }
}
- launcherId = runningJob.getID().toString();
- LOG.debug("After submission get the launcherId " + launcherId);
}
String jobTracker = launcherJobConf.get(HADOOP_YARN_RM);
- String consoleUrl = runningJob.getTrackingURL();
context.setStartData(launcherId, jobTracker, consoleUrl);
}
catch (Exception ex) {
@@ -1206,6 +1104,91 @@ public class JavaActionExecutor extends ActionExecutor {
}
}
}
+
+ private ApplicationSubmissionContext createAppSubmissionContext(ApplicationId appId, JobConf launcherJobConf, String user,
+ Context context, Configuration actionConf)
+ throws IOException, HadoopAccessorException, URISyntaxException {
+ // Create launch context for app master
+ ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
+
+ // set the application id
+ appContext.setApplicationId(appId);
+
+ // set the application name
+ appContext.setApplicationName(launcherJobConf.getJobName());
+ appContext.setApplicationType("Oozie Launcher");
+
+ // Set the priority for the application master
+ Priority pri = Records.newRecord(Priority.class);
+ int priority = 0; // TODO: OYA: Add a constant or a config
+ pri.setPriority(priority);
+ appContext.setPriority(pri);
+
+ // Set the queue to which this application is to be submitted in the RM
+ appContext.setQueue(launcherJobConf.getQueueName());
+
+ // Set up the container launch context for the application master
+ ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
+
+ // Set the resources to localize
+ Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+ ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(launcherJobConf);
+ MRApps.setupDistributedCache(launcherJobConf, localResources);
+ // Add the Launcher and Action configs as Resources
+ HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
+ LocalResource launcherJobConfLR = has.createLocalResourceForConfigurationFile(LauncherAM.LAUNCHER_JOB_CONF_XML, user,
+ launcherJobConf, context.getAppFileSystem().getUri(), context.getActionDir());
+ localResources.put(LauncherAM.LAUNCHER_JOB_CONF_XML, launcherJobConfLR);
+ LocalResource actionConfLR = has.createLocalResourceForConfigurationFile(LauncherAM.ACTION_CONF_XML, user, actionConf,
+ context.getAppFileSystem().getUri(), context.getActionDir());
+ localResources.put(LauncherAM.ACTION_CONF_XML, actionConfLR);
+ amContainer.setLocalResources(localResources);
+
+ // Set the environment variables
+ Map<String, String> env = new HashMap<String, String>();
+ // This adds the Hadoop jars to the classpath in the Launcher JVM
+ ClasspathUtils.setupClasspath(env, launcherJobConf);
+ if (false) { // TODO: OYA: config to add MR jars? Probably also needed for MR Action
+ ClasspathUtils.addMapReduceToClasspath(env, launcherJobConf);
+ }
+ amContainer.setEnvironment(env);
+
+ // Set the command
+ List<String> vargs = new ArrayList<String>(6);
+ vargs.add(MRApps.crossPlatformifyMREnv(launcherJobConf, ApplicationConstants.Environment.JAVA_HOME)
+ + "/bin/java");
+ // TODO: OYA: remove attach debugger to AM; useful for debugging
+// vargs.add("-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005");
+ MRApps.addLog4jSystemProperties("INFO", 1024 * 1024, 0, vargs);
+ vargs.add(LauncherAM.class.getName());
+ vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
+ Path.SEPARATOR + ApplicationConstants.STDOUT);
+ vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
+ Path.SEPARATOR + ApplicationConstants.STDERR);
+ List<String> vargsFinal = new ArrayList<String>(6);
+ StringBuilder mergedCommand = new StringBuilder();
+ for (CharSequence str : vargs) {
+ mergedCommand.append(str).append(" ");
+ }
+ vargsFinal.add(mergedCommand.toString());
+ LOG.debug("Command to launch container for ApplicationMaster is : "
+ + mergedCommand);
+ amContainer.setCommands(vargsFinal);
+ appContext.setAMContainerSpec(amContainer);
+
+ // Set tokens
+ DataOutputBuffer dob = new DataOutputBuffer();
+ launcherJobConf.getCredentials().writeTokenStorageToStream(dob);
+ amContainer.setTokens(ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
+
+ // Set Resources
+ // TODO: OYA: make resources allocated for the AM configurable and choose good defaults (memory MB, vcores)
+ Resource resource = Resource.newInstance(2048, 1);
+ appContext.setResource(resource);
+
+ return appContext;
+ }
+
private boolean needInjectCredentials() {
boolean methodExists = true;
@@ -1409,6 +1392,19 @@ public class JavaActionExecutor extends ActionExecutor {
return Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf);
}
+ /**
+ * Create yarn client object
+ *
+ * @param context
+ * @param jobConf
+ * @return YarnClient
+ * @throws HadoopAccessorException
+ */
+ protected YarnClient createYarnClient(Context context, JobConf jobConf) throws HadoopAccessorException {
+ String user = context.getWorkflow().getUser();
+ return Services.get().get(HadoopAccessorService.class).createYarnClient(user, jobConf);
+ }
+
protected RunningJob getRunningJob(Context context, WorkflowAction action, JobClient jobClient) throws Exception{
RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId()));
return runningJob;
@@ -1425,129 +1421,112 @@ public class JavaActionExecutor extends ActionExecutor {
    /**
     * Checks the state of the action's Launcher AM and, once it has reached a terminal YARN state,
     * harvests the action data written by the launcher (external child IDs, output properties, stats,
     * error properties) and records the action's final execution data on the context.
     * If the RM no longer knows the application (e.g. it was purged), falls back to the final status
     * persisted in the action data file.
     *
     * @param context executor context used to read/write action state
     * @param action the workflow action being checked
     * @throws ActionExecutorException if the status cannot be determined or the check itself fails
     */
    @Override
    public void check(Context context, WorkflowAction action) throws ActionExecutorException {
        boolean fallback = false;
        LOG = XLog.resetPrefix(LOG);
        LogUtils.setLogInfo(action);
        YarnClient yarnClient = null;
        try {
            Element actionXml = XmlUtils.parseXml(action.getConf());
            JobConf jobConf = createBaseHadoopConf(context, actionXml);
            FileSystem actionFs = context.getAppFileSystem();
            yarnClient = createYarnClient(context, jobConf);
            // appStatus stays null while the AM is still running (non-terminal YARN state)
            FinalApplicationStatus appStatus = null;
            try {
                ApplicationReport appReport =
                        yarnClient.getApplicationReport(ConverterUtils.toApplicationId(action.getExternalId()));
                YarnApplicationState appState = appReport.getYarnApplicationState();
                if (appState == YarnApplicationState.FAILED || appState == YarnApplicationState.FINISHED
                        || appState == YarnApplicationState.KILLED) {
                    appStatus = appReport.getFinalApplicationStatus();
                }

            } catch (Exception ye) {
                LOG.debug("Exception occurred while checking Launcher AM status; will try checking action data file instead ", ye);
                // Fallback to action data file if we can't find the Launcher AM (maybe it got purged)
                fallback = true;
            }
            if (appStatus != null || fallback) {
                Path actionDir = context.getActionDir();
                // NOTE(review): newId is assigned but never used after the YARN rewrite — candidate for removal
                String newId = null;
                // load sequence file into object
                Map<String, String> actionData = LauncherMapperHelper.getActionData(actionFs, actionDir, jobConf);
                if (fallback) {
                    // RM lookup failed: derive the final status from what the launcher persisted
                    String finalStatus = actionData.get(LauncherAM.ACTION_DATA_FINAL_STATUS);
                    if (finalStatus != null) {
                        appStatus = FinalApplicationStatus.valueOf(finalStatus);
                    } else {
                        // Neither the RM nor the action data can tell us what happened: fail the action
                        context.setExecutionData(FAILED, null);
                        throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
                                "Unknown hadoop job [{0}] associated with action [{1}] and couldn't determine status from" +
                                " action data. Failing this action!", action.getExternalId(), action.getId());
                    }
                }
                // Record any child job IDs the launcher started on behalf of this action
                String externalIDs = actionData.get(LauncherAM.ACTION_DATA_EXTERNAL_CHILD_IDS);
                if (externalIDs != null) {
                    context.setExternalChildIDs(externalIDs);
                    LOG.info(XLog.STD, "Hadoop Jobs launched : [{0}]", externalIDs);
                }
                LOG.info(XLog.STD, "action completed, external ID [{0}]",
                        action.getExternalId());
                context.setExecutionData(appStatus.toString(), null);
                if (appStatus == FinalApplicationStatus.SUCCEEDED) {
                    // Success path: capture output properties and stats if the launcher produced them
                    if (getCaptureOutput(action) && LauncherMapperHelper.hasOutputData(actionData)) {
                        context.setExecutionData(SUCCEEDED, PropertiesUtils.stringToProperties(actionData
                                .get(LauncherAM.ACTION_DATA_OUTPUT_PROPS)));
                        LOG.info(XLog.STD, "action produced output");
                    }
                    else {
                        context.setExecutionData(SUCCEEDED, null);
                    }
                    if (LauncherMapperHelper.hasStatsData(actionData)) {
                        context.setExecutionStats(actionData.get(LauncherAM.ACTION_DATA_STATS));
                        LOG.info(XLog.STD, "action produced stats");
                    }
                    // Subclass hook for action-specific output data
                    getActionData(actionFs, action, context);
                }
                else {
                    // Failure path: surface the launcher's error properties if it wrote any
                    String errorReason;
                    if (actionData.containsKey(LauncherAM.ACTION_DATA_ERROR_PROPS)) {
                        Properties props = PropertiesUtils.stringToProperties(actionData
                                .get(LauncherAM.ACTION_DATA_ERROR_PROPS));
                        String errorCode = props.getProperty("error.code");
                        // Normalize launcher exit codes to Oozie error codes
                        if ("0".equals(errorCode)) {
                            errorCode = "JA018";
                        }
                        if ("-1".equals(errorCode)) {
                            errorCode = "JA019";
                        }
                        errorReason = props.getProperty("error.reason");
                        LOG.warn("Launcher ERROR, reason: {0}", errorReason);
                        String exMsg = props.getProperty("exception.message");
                        String errorInfo = (exMsg != null) ? exMsg : errorReason;
                        context.setErrorInfo(errorCode, errorInfo);
                        String exStackTrace = props.getProperty("exception.stacktrace");
                        if (exMsg != null) {
                            LOG.warn("Launcher exception: {0}{E}{1}", exMsg, exStackTrace);
                        }
                    }
                    else {
                        // AM died without writing error data; point the operator at the Hadoop logs
                        errorReason = XLog.format("Launcher AM died, check Hadoop LOG for job [{0}:{1}]", action
                                .getTrackerUri(), action.getExternalId());
                        LOG.warn(errorReason);
                    }
                    context.setExecutionData(FAILED_KILLED, null);
                }
            }
            else {
                // AM still running: just refresh the external status
                context.setExternalStatus(YarnApplicationState.RUNNING.toString());
                LOG.info(XLog.STD, "checking action, hadoop job ID [{0}] status [RUNNING]",
                        action.getExternalId());
            }
        }
        catch (Exception ex) {
            LOG.warn("Exception in check(). Message[{0}]", ex.getMessage(), ex);
            throw convertException(ex);
        }
        finally {
            if (yarnClient != null) {
                IOUtils.closeQuietly(yarnClient);
            }
        }
    }
@@ -1555,14 +1534,12 @@ public class JavaActionExecutor extends ActionExecutor {
    /**
     * Get the output data of an action. Subclasses should override this method
     * to get action specific output data. The base implementation is intentionally
     * a no-op hook; it is invoked from {@code check()} after a successful launcher run.
     *
     * @param actionFs the FileSystem object
     * @param action the Workflow action
     * @param context executor context
     * @throws HadoopAccessorException if filesystem access on behalf of the user fails
     * @throws JDOMException if action XML parsing fails in an override
     * @throws IOException if reading action data fails in an override
     * @throws URISyntaxException if a URI built by an override is malformed
     */
    protected void getActionData(FileSystem actionFs, WorkflowAction action, Context context)
            throws HadoopAccessorException, JDOMException, IOException, URISyntaxException {
    }
@@ -1585,38 +1562,28 @@ public class JavaActionExecutor extends ActionExecutor {
@Override
public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
- JobClient jobClient = null;
- boolean exception = false;
+ YarnClient yarnClient = null;
try {
Element actionXml = XmlUtils.parseXml(action.getConf());
+ String user = context.getWorkflow().getUser();
JobConf jobConf = createBaseHadoopConf(context, actionXml);
- jobClient = createJobClient(context, jobConf);
- RunningJob runningJob = getRunningJob(context, action, jobClient);
- if (runningJob != null) {
- runningJob.killJob();
- }
+ yarnClient = createYarnClient(context, jobConf);
+ yarnClient.killApplication(ConverterUtils.toApplicationId(action.getExternalId()));
context.setExternalStatus(KILLED);
context.setExecutionData(KILLED, null);
- }
- catch (Exception ex) {
- exception = true;
+ } catch (Exception ex) {
+ LOG.error("Error: ", ex);
throw convertException(ex);
- }
- finally {
+ } finally {
try {
FileSystem actionFs = context.getAppFileSystem();
cleanUpActionDir(actionFs, context);
- if (jobClient != null) {
- jobClient.close();
- }
- }
- catch (Exception ex) {
- if (exception) {
- LOG.error("Error: ", ex);
- }
- else {
- throw convertException(ex);
+ if (yarnClient != null) {
+ yarnClient.close();
}
+ } catch (Exception ex) {
+ LOG.error("Error: ", ex);
+ throw convertException(ex);
}
}
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java b/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
index 69e1044..07d1262 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
@@ -145,18 +145,6 @@ public class LauncherMapperHelper {
launcherConf.setBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", true);
}
- FileSystem fs =
- Services.get().get(HadoopAccessorService.class).createFileSystem(launcherConf.get("user.name"),
- actionDir.toUri(), launcherConf);
- fs.mkdirs(actionDir);
-
- OutputStream os = fs.create(new Path(actionDir, LauncherMapper.ACTION_CONF_XML));
- try {
- actionConf.writeXml(os);
- } finally {
- IOUtils.closeSafely(os);
- }
-
launcherConf.setInputFormat(OozieLauncherInputFormat.class);
launcherConf.set("mapred.output.dir", new Path(actionDir, "output").toString());
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java
index 252f461..6a41235 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RunningJob;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.service.HadoopAccessorException;
@@ -157,9 +156,9 @@ public class SparkActionExecutor extends JavaActionExecutor {
}
    /**
     * Gets the Spark action's output data: runs the base harvesting and then reads the
     * external child job IDs written by the launcher.
     *
     * @param actionFs the FileSystem object
     * @param action the Workflow action
     * @param context executor context
     * @throws HadoopAccessorException if filesystem access on behalf of the user fails
     * @throws JDOMException if parsing action XML fails
     * @throws IOException if reading the action data fails
     * @throws URISyntaxException if a constructed URI is malformed
     */
    @Override
    protected void getActionData(FileSystem actionFs, WorkflowAction action, Context context)
            throws HadoopAccessorException, JDOMException, IOException, URISyntaxException {
        super.getActionData(actionFs, action, context);
        readExternalChildIDs(action, context);
    }
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java
index 6813a37..82e5f0c 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java
@@ -23,7 +23,6 @@ import java.io.StringReader;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
-import java.util.Properties;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
@@ -232,17 +231,15 @@ public class SqoopActionExecutor extends JavaActionExecutor {
    /**
     * Get the stats and external child IDs for the Sqoop action: runs the base harvesting
     * and then reads the external child job IDs written by the launcher.
     *
     * @param actionFs the FileSystem object
     * @param action the Workflow action
     * @param context executor context
     * @throws HadoopAccessorException if filesystem access on behalf of the user fails
     * @throws JDOMException if parsing action XML fails
     * @throws IOException if reading the action data fails
     * @throws URISyntaxException if a constructed URI is malformed
     */
    @Override
    protected void getActionData(FileSystem actionFs, WorkflowAction action, Context context)
            throws HadoopAccessorException, JDOMException, IOException, URISyntaxException{
        super.getActionData(actionFs, action, context);
        readExternalChildIDs(action, context);
    }
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java b/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
index 794e825..0177241 100644
--- a/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
+++ b/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
@@ -18,6 +18,7 @@
package org.apache.oozie.service;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
@@ -29,7 +30,14 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
import org.apache.oozie.ErrorCode;
+import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.action.hadoop.JavaActionExecutor;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XConfiguration;
@@ -39,6 +47,7 @@ import org.apache.oozie.workflow.lite.LiteWorkflowAppParser;
import java.io.File;
import java.io.FileInputStream;
+import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
@@ -511,9 +520,43 @@ public class HadoopAccessorService implements Service {
}
/**
- * Return a FileSystem created with the provided user for the specified URI.
+ * Return a YarnClient created with the provided user and configuration.
*
+ * @param user The username to impersonate
+ * @param conf The conf
+ * @return a YarnClient with the provided user and configuration
+ * @throws HadoopAccessorException if the client could not be created.
+ */
+ public YarnClient createYarnClient(String user, final Configuration conf) throws HadoopAccessorException {
+ ParamChecker.notEmpty(user, "user");
+ if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
+ throw new HadoopAccessorException(ErrorCode.E0903);
+ }
+ String rm = conf.get(JavaActionExecutor.HADOOP_YARN_RM);
+ validateJobTracker(rm);
+ try {
+ UserGroupInformation ugi = getUGI(user);
+ YarnClient yarnClient = ugi.doAs(new PrivilegedExceptionAction<YarnClient>() {
+ @Override
+ public YarnClient run() throws Exception {
+ YarnClient yarnClient = YarnClient.createYarnClient();
+ yarnClient.init(conf);
+ yarnClient.start();
+ return yarnClient;
+ }
+ });
+ return yarnClient;
+ } catch (InterruptedException ex) {
+ throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
+ } catch (IOException ex) {
+ throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
+ }
+ }
+
+ /**
+ * Return a FileSystem created with the provided user for the specified URI.
*
+ * @param user The username to impersonate
* @param uri file system URI.
* @param conf Configuration with all necessary information to create the FileSystem.
* @return FileSystem created with the provided user/group.
@@ -667,4 +710,56 @@ public class HadoopAccessorService implements Service {
return supportedSchemes;
}
+ /**
+ * Creates a {@link LocalResource} for the Configuration to localize it for a Yarn Container. This involves also writing it
+ * to HDFS.
+ * Example usage:
+ * * <pre>
+ * {@code
+ * LocalResource res1 = createLocalResourceForConfigurationFile(filename1, user, conf, uri, dir);
+ * LocalResource res2 = createLocalResourceForConfigurationFile(filename2, user, conf, uri, dir);
+ * ...
+ * Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+ * localResources.put(filename1, res1);
+ * localResources.put(filename2, res2);
+ * ...
+ * containerLaunchContext.setLocalResources(localResources);
+ * }
+ * </pre>
+ *
+ * @param filename The filename to use on the remote filesystem and once it has been localized.
+ * @param user The user
+ * @param conf The configuration to process
+ * @param uri The URI of the remote filesystem (e.g. HDFS)
+ * @param dir The directory on the remote filesystem to write the file to
+ * @return
+ * @throws IOException A problem occurred writing the file
+ * @throws HadoopAccessorException A problem occured with Hadoop
+ * @throws URISyntaxException A problem occurred parsing the URI
+ */
+ public LocalResource createLocalResourceForConfigurationFile(String filename, String user, Configuration conf, URI uri,
+ Path dir)
+ throws IOException, HadoopAccessorException, URISyntaxException {
+ File f = File.createTempFile(filename, ".tmp");
+ FileOutputStream fos = null;
+ try {
+ fos = new FileOutputStream(f);
+ conf.writeXml(fos);
+ } finally {
+ if (fos != null) {
+ fos.close();
+ }
+ }
+ FileSystem fs = createFileSystem(user, uri, conf);
+ Path dst = new Path(dir, filename);
+ fs.copyFromLocalFile(new Path(f.getAbsolutePath()), dst);
+ LocalResource localResource = Records.newRecord(LocalResource.class);
+ localResource.setType(LocalResourceType.FILE); localResource.setVisibility(LocalResourceVisibility.APPLICATION);
+ localResource.setResource(ConverterUtils.getYarnUrlFromPath(dst));
+ FileStatus destStatus = fs.getFileStatus(dst);
+ localResource.setTimestamp(destStatus.getModificationTime());
+ localResource.setSize(destStatus.getLen());
+ return localResource;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/java/org/apache/oozie/util/ClasspathUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/ClasspathUtils.java b/core/src/main/java/org/apache/oozie/util/ClasspathUtils.java
new file mode 100644
index 0000000..8533371
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/util/ClasspathUtils.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.util;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+public class ClasspathUtils {
+ private static boolean usingMiniYarnCluster = false;
+ private static final List<String> CLASSPATH_ENTRIES = Arrays.asList(
+ ApplicationConstants.Environment.PWD.$(),
+ MRJobConfig.JOB_JAR + Path.SEPARATOR + MRJobConfig.JOB_JAR,
+ MRJobConfig.JOB_JAR + Path.SEPARATOR + "classes" + Path.SEPARATOR,
+ MRJobConfig.JOB_JAR + Path.SEPARATOR + "lib" + Path.SEPARATOR + "*",
+ ApplicationConstants.Environment.PWD.$() + Path.SEPARATOR + "*"
+ );
+
+ @VisibleForTesting
+ public static void setUsingMiniYarnCluster(boolean useMiniYarnCluster) {
+ usingMiniYarnCluster = useMiniYarnCluster;
+ }
+
+ // Adapted from MRApps#setClasspath. Adds Yarn, HDFS, Common, and distributed cache jars.
+ public static void setupClasspath(Map<String, String> env, Configuration conf) throws IOException {
+ // Propagate the system classpath when using the mini cluster
+ if (usingMiniYarnCluster) {
+ MRApps.addToEnvironment(
+ env,
+ ApplicationConstants.Environment.CLASSPATH.name(),
+ System.getProperty("java.class.path"), conf);
+ }
+
+ for (String entry : CLASSPATH_ENTRIES) {
+ MRApps.addToEnvironment(env, ApplicationConstants.Environment.CLASSPATH.name(), entry, conf);
+ }
+
+ // a * in the classpath will only find a .jar, so we need to filter out
+ // all .jars and add everything else
+ addToClasspathIfNotJar(org.apache.hadoop.mapreduce.filecache.DistributedCache.getFileClassPaths(conf),
+ org.apache.hadoop.mapreduce.filecache.DistributedCache.getCacheFiles(conf),
+ conf,
+ env, ApplicationConstants.Environment.PWD.$());
+ addToClasspathIfNotJar(org.apache.hadoop.mapreduce.filecache.DistributedCache.getArchiveClassPaths(conf),
+ org.apache.hadoop.mapreduce.filecache.DistributedCache.getCacheArchives(conf),
+ conf,
+ env, ApplicationConstants.Environment.PWD.$());
+
+
+ boolean crossPlatform = conf.getBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM,
+ MRConfig.DEFAULT_MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM);
+
+ for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+ crossPlatform
+ ? YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH
+ : YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+ MRApps.addToEnvironment(env, ApplicationConstants.Environment.CLASSPATH.name(),
+ c.trim(), conf);
+ }
+ }
+
+ // Adapted from MRApps#setClasspath
+ public static void addMapReduceToClasspath(Map<String, String> env, Configuration conf) {
+ boolean crossPlatform = conf.getBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM,
+ MRConfig.DEFAULT_MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM);
+
+ for (String c : conf.getStrings(MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH,
+ crossPlatform ?
+ StringUtils.getStrings(MRJobConfig.DEFAULT_MAPREDUCE_CROSS_PLATFORM_APPLICATION_CLASSPATH)
+ : StringUtils.getStrings(MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH))) {
+ MRApps.addToEnvironment(env, ApplicationConstants.Environment.CLASSPATH.name(),
+ c.trim(), conf);
+ }
+ }
+
+ // Borrowed from MRApps#addToClasspathIfNotJar
+ private static void addToClasspathIfNotJar(Path[] paths,
+ URI[] withLinks, Configuration conf,
+ Map<String, String> environment,
+ String classpathEnvVar) throws IOException {
+ if (paths != null) {
+ HashMap<Path, String> linkLookup = new HashMap<Path, String>();
+ if (withLinks != null) {
+ for (URI u: withLinks) {
+ Path p = new Path(u);
+ FileSystem remoteFS = p.getFileSystem(conf);
+ p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
+ remoteFS.getWorkingDirectory()));
+ String name = (null == u.getFragment())
+ ? p.getName() : u.getFragment();
+ if (!name.toLowerCase(Locale.ENGLISH).endsWith(".jar")) {
+ linkLookup.put(p, name);
+ }
+ }
+ }
+
+ for (Path p : paths) {
+ FileSystem remoteFS = p.getFileSystem(conf);
+ p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
+ remoteFS.getWorkingDirectory()));
+ String name = linkLookup.get(p);
+ if (name == null) {
+ name = p.getName();
+ }
+ if(!name.toLowerCase(Locale.ENGLISH).endsWith(".jar")) {
+ MRApps.addToEnvironment(
+ environment,
+ classpathEnvVar,
+ ApplicationConstants.Environment.PWD.$() + Path.SEPARATOR + name, conf);
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index 6c2f7d8..5f4645c 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -1782,31 +1782,6 @@ will be the requeue interval for the actions which are waiting for a long time w
</property>
<property>
- <name>oozie.action.launcher.mapreduce.job.ubertask.enable</name>
- <value>true</value>
- <description>
- Enables Uber Mode for the launcher job in YARN/Hadoop 2 (no effect in Hadoop 1) for all action types by default.
- This can be overridden on a per-action-type basis by setting
- oozie.action.#action-type#.launcher.mapreduce.job.ubertask.enable in oozie-site.xml (where #action-type# is the action
- type; for example, "pig"). And that can be overridden on a per-action basis by setting
- oozie.launcher.mapreduce.job.ubertask.enable in an action's configuration section in a workflow. In summary, the
- priority is this:
- 1. action's configuration section in a workflow
- 2. oozie.action.#action-type#.launcher.mapreduce.job.ubertask.enable in oozie-site
- 3. oozie.action.launcher.mapreduce.job.ubertask.enable in oozie-site
- </description>
- </property>
-
- <property>
- <name>oozie.action.shell.launcher.mapreduce.job.ubertask.enable</name>
- <value>false</value>
- <description>
- The Shell action may have issues with the $PATH environment when using Uber Mode, and so Uber Mode is disabled by
- default for it. See oozie.action.launcher.mapreduce.job.ubertask.enable
- </description>
- </property>
-
- <property>
<name>oozie.action.shell.setup.hadoop.conf.dir</name>
<value>false</value>
<description>
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/test/java/org/apache/oozie/QueryServlet.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/QueryServlet.java b/core/src/test/java/org/apache/oozie/QueryServlet.java
new file mode 100644
index 0000000..8789438
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/QueryServlet.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.net.URLDecoder;
+
+/**
+ * Servlet that keeps track of the last query string it recieved
+ */
+public class QueryServlet extends HttpServlet {
+
+ public static String lastQueryString = null;
+
+ protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
+ lastQueryString = URLDecoder.decode(request.getQueryString(), "UTF-8");
+ response.setStatus(HttpServletResponse.SC_OK);
+ }
+
+}