You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by ge...@apache.org on 2017/05/26 09:27:55 UTC
[03/10] oozie git commit: OOZIE-1770 Create Oozie Application Master
for YARN (asasvari, pbacsko, rkanter, gezapeti)
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/hcatalog/pom.xml
----------------------------------------------------------------------
diff --git a/sharelib/hcatalog/pom.xml b/sharelib/hcatalog/pom.xml
index 6eb88ef..fea277f 100644
--- a/sharelib/hcatalog/pom.xml
+++ b/sharelib/hcatalog/pom.xml
@@ -297,18 +297,6 @@
<outputFile>${project.build.directory}/classpath</outputFile>
</configuration>
</execution>
- <execution>
- <id>create-mrapp-generated-classpath</id>
- <phase>generate-test-resources</phase>
- <goals>
- <goal>build-classpath</goal>
- </goals>
- <configuration>
- <!-- needed to run the unit test for DS to generate the required classpath
- that is required in the env of the launch container in the mini mr/yarn cluster -->
- <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile>
- </configuration>
- </execution>
</executions>
</plugin>
<plugin>
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/hive/pom.xml
----------------------------------------------------------------------
diff --git a/sharelib/hive/pom.xml b/sharelib/hive/pom.xml
index 1331219..7268fa9 100644
--- a/sharelib/hive/pom.xml
+++ b/sharelib/hive/pom.xml
@@ -122,18 +122,10 @@
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <scope>provided</scope>
- </dependency>
-
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minicluster</artifactId>
</dependency>
-
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
@@ -150,8 +142,8 @@
<scope>provided</scope>
</dependency>
<dependency>
- <groupId>org.apache.oozie</groupId>
- <artifactId>oozie-hadoop-utils</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
@@ -184,18 +176,6 @@
<outputFile>${project.build.directory}/classpath</outputFile>
</configuration>
</execution>
- <execution>
- <id>create-mrapp-generated-classpath</id>
- <phase>generate-test-resources</phase>
- <goals>
- <goal>build-classpath</goal>
- </goals>
- <configuration>
- <!-- needed to run the unit test for DS to generate the required classpath
- that is required in the env of the launch container in the mini mr/yarn cluster -->
- <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile>
- </configuration>
- </execution>
</executions>
</plugin>
<plugin>
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/hive/src/main/java/org/apache/oozie/action/hadoop/HiveMain.java
----------------------------------------------------------------------
diff --git a/sharelib/hive/src/main/java/org/apache/oozie/action/hadoop/HiveMain.java b/sharelib/hive/src/main/java/org/apache/oozie/action/hadoop/HiveMain.java
index 6a600fa..1f88c85 100644
--- a/sharelib/hive/src/main/java/org/apache/oozie/action/hadoop/HiveMain.java
+++ b/sharelib/hive/src/main/java/org/apache/oozie/action/hadoop/HiveMain.java
@@ -233,8 +233,11 @@ public class HiveMain extends LauncherMain {
File localDir = new File("dummy").getAbsoluteFile().getParentFile();
System.out.println("Current (local) dir = " + localDir.getAbsolutePath());
System.out.println("------------------------");
- for (String file : localDir.list()) {
- System.out.println(" " + file);
+ String[] files = localDir.list();
+ if (files != null) {
+ for (String file : files) {
+ System.out.println(" " + file);
+ }
}
System.out.println("------------------------");
System.out.println();
@@ -264,7 +267,7 @@ public class HiveMain extends LauncherMain {
}
// Pass any parameters to Hive via arguments
- String[] params = MapReduceMain.getStrings(hiveConf, HiveActionExecutor.HIVE_PARAMS);
+ String[] params = ActionUtils.getStrings(hiveConf, HiveActionExecutor.HIVE_PARAMS);
if (params.length > 0) {
System.out.println("Parameters:");
System.out.println("------------------------");
@@ -284,7 +287,7 @@ public class HiveMain extends LauncherMain {
System.out.println();
}
- String[] hiveArgs = MapReduceMain.getStrings(hiveConf, HiveActionExecutor.HIVE_ARGS);
+ String[] hiveArgs = ActionUtils.getStrings(hiveConf, HiveActionExecutor.HIVE_ARGS);
for (String hiveArg : hiveArgs) {
if (DISALLOWED_HIVE_OPTIONS.contains(hiveArg)) {
throw new RuntimeException("Error: Hive argument " + hiveArg + " is not supported");
@@ -298,7 +301,7 @@ public class HiveMain extends LauncherMain {
}
System.out.println();
- LauncherMainHadoopUtils.killChildYarnJobs(hiveConf);
+ LauncherMain.killChildYarnJobs(hiveConf);
System.out.println("=================================================================");
System.out.println();
@@ -309,13 +312,6 @@ public class HiveMain extends LauncherMain {
try {
runHive(arguments.toArray(new String[arguments.size()]));
}
- catch (SecurityException ex) {
- if (LauncherSecurityManager.getExitInvoked()) {
- if (LauncherSecurityManager.getExitCode() != 0) {
- throw ex;
- }
- }
- }
finally {
System.out.println("\n<<< Invocation of Hive command completed <<<\n");
writeExternalChildIDs(logFile, HIVE_JOB_IDS_PATTERNS, "Hive");
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveActionExecutor.java
----------------------------------------------------------------------
diff --git a/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveActionExecutor.java b/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveActionExecutor.java
index 12e1e91..71ee641 100644
--- a/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveActionExecutor.java
+++ b/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveActionExecutor.java
@@ -22,7 +22,6 @@ import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
-import java.io.StringReader;
import java.io.Writer;
import java.text.MessageFormat;
import java.util.Arrays;
@@ -32,22 +31,14 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.RunningJob;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.service.ConfigurationService;
-import org.apache.oozie.service.HadoopAccessorService;
-import org.apache.oozie.service.Services;
import org.apache.oozie.service.WorkflowAppService;
import org.apache.oozie.util.ClassUtils;
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.XConfiguration;
-import org.apache.oozie.util.XmlUtils;
-import org.jdom.Element;
import org.jdom.Namespace;
public class TestHiveActionExecutor extends ActionExecutorTestCase {
@@ -163,19 +154,13 @@ public class TestHiveActionExecutor extends ActionExecutorTestCase {
dataWriter.close();
Context context = createContext(getActionScriptXml());
Namespace ns = Namespace.getNamespace("uri:oozie:hive-action:0.2");
- final RunningJob launcherJob = submitAction(context, ns);
- String launcherId = context.getAction().getExternalId();
- waitFor(200 * 1000, new Predicate() {
- public boolean evaluate() throws Exception {
- return launcherJob.isComplete();
- }
- });
- assertTrue(launcherJob.isSuccessful());
+ final String launcherId = submitAction(context, ns);
+ waitUntilYarnAppDoneAndAssertSuccess(launcherId);
Configuration conf = new XConfiguration();
conf.set("user.name", getTestUser());
- Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
+ Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(),
conf);
- assertFalse(LauncherMapperHelper.hasIdSwap(actionData));
+ assertFalse(LauncherHelper.hasIdSwap(actionData));
HiveActionExecutor ae = new HiveActionExecutor();
ae.check(context, context.getAction());
assertTrue(launcherId.equals(context.getAction().getExternalId()));
@@ -192,19 +177,13 @@ public class TestHiveActionExecutor extends ActionExecutorTestCase {
{
Context context = createContext(getActionQueryXml(hiveScript));
Namespace ns = Namespace.getNamespace("uri:oozie:hive-action:0.6");
- final RunningJob launcherJob = submitAction(context, ns);
- String launcherId = context.getAction().getExternalId();
- waitFor(200 * 1000, new Predicate() {
- public boolean evaluate() throws Exception {
- return launcherJob.isComplete();
- }
- });
- assertTrue(launcherJob.isSuccessful());
+ final String launcherId = submitAction(context, ns);
+ waitUntilYarnAppDoneAndAssertSuccess(launcherId);
Configuration conf = new XConfiguration();
conf.set("user.name", getTestUser());
- Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
+ Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(),
conf);
- assertFalse(LauncherMapperHelper.hasIdSwap(actionData));
+ assertFalse(LauncherHelper.hasIdSwap(actionData));
HiveActionExecutor ae = new HiveActionExecutor();
ae.check(context, context.getAction());
assertTrue(launcherId.equals(context.getAction().getExternalId()));
@@ -220,7 +199,7 @@ public class TestHiveActionExecutor extends ActionExecutorTestCase {
}
}
- private RunningJob submitAction(Context context, Namespace ns) throws Exception {
+ private String submitAction(Context context, Namespace ns) throws Exception {
HiveActionExecutor ae = new HiveActionExecutor();
WorkflowAction action = context.getAction();
@@ -234,22 +213,9 @@ public class TestHiveActionExecutor extends ActionExecutorTestCase {
assertNotNull(jobId);
assertNotNull(jobTracker);
assertNotNull(consoleUrl);
- Element e = XmlUtils.parseXml(action.getConf());
- XConfiguration conf =
- new XConfiguration(new StringReader(XmlUtils.prettyPrint(e.getChild("configuration", ns)).toString()));
- conf.set("mapred.job.tracker", e.getChildTextTrim("job-tracker", ns));
- conf.set("fs.default.name", e.getChildTextTrim("name-node", ns));
- conf.set("user.name", context.getProtoActionConf().get("user.name"));
- conf.set("group.name", getTestGroup());
- JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker);
- XConfiguration.copy(conf, jobConf);
- String user = jobConf.get("user.name");
- String group = jobConf.get("group.name");
- JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf);
- final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId));
- assertNotNull(runningJob);
- return runningJob;
+
+ return jobId;
}
private String copyJar(String targetFile, Class<?> anyContainedClass)
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveMain.java
----------------------------------------------------------------------
diff --git a/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveMain.java b/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveMain.java
index 2ba0da7..bbd6246 100644
--- a/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveMain.java
+++ b/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveMain.java
@@ -110,10 +110,10 @@ public class TestHiveMain extends MainTestCase {
SharelibUtils.addToDistributedCache("hive", fs, getFsTestCaseDir(), jobConf);
jobConf.set(HiveActionExecutor.HIVE_SCRIPT, script.toString());
- MapReduceMain.setStrings(jobConf, HiveActionExecutor.HIVE_PARAMS, new String[]{
+ ActionUtils.setStrings(jobConf, HiveActionExecutor.HIVE_PARAMS, new String[]{
"IN=" + inputDir.toUri().getPath(),
"OUT=" + outputDir.toUri().getPath()});
- MapReduceMain.setStrings(jobConf, HiveActionExecutor.HIVE_ARGS,
+ ActionUtils.setStrings(jobConf, HiveActionExecutor.HIVE_ARGS,
new String[]{ "-v" });
File actionXml = new File(getTestCaseDir(), "action.xml");
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/hive2/pom.xml
----------------------------------------------------------------------
diff --git a/sharelib/hive2/pom.xml b/sharelib/hive2/pom.xml
index e81bfbe..be51cd1 100644
--- a/sharelib/hive2/pom.xml
+++ b/sharelib/hive2/pom.xml
@@ -171,11 +171,7 @@
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.apache.oozie</groupId>
- <artifactId>oozie-hadoop-utils</artifactId>
- <scope>provided</scope>
- </dependency>
+
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
@@ -221,18 +217,6 @@
<outputFile>${project.build.directory}/classpath</outputFile>
</configuration>
</execution>
- <execution>
- <id>create-mrapp-generated-classpath</id>
- <phase>generate-test-resources</phase>
- <goals>
- <goal>build-classpath</goal>
- </goals>
- <configuration>
- <!-- needed to run the unit test for DS to generate the required classpath
- that is required in the env of the launch container in the mini mr/yarn cluster -->
- <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile>
- </configuration>
- </execution>
</executions>
</plugin>
<plugin>
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/hive2/src/main/java/org/apache/oozie/action/hadoop/Hive2Main.java
----------------------------------------------------------------------
diff --git a/sharelib/hive2/src/main/java/org/apache/oozie/action/hadoop/Hive2Main.java b/sharelib/hive2/src/main/java/org/apache/oozie/action/hadoop/Hive2Main.java
index a3a07bd..e626dbb 100644
--- a/sharelib/hive2/src/main/java/org/apache/oozie/action/hadoop/Hive2Main.java
+++ b/sharelib/hive2/src/main/java/org/apache/oozie/action/hadoop/Hive2Main.java
@@ -152,8 +152,11 @@ public class Hive2Main extends LauncherMain {
File localDir = new File("dummy").getAbsoluteFile().getParentFile();
System.out.println("Current (local) dir = " + localDir.getAbsolutePath());
System.out.println("------------------------");
- for (String file : localDir.list()) {
- System.out.println(" " + file);
+ String[] files = localDir.list();
+ if (files != null) {
+ for (String file : files) {
+ System.out.println(" " + file);
+ }
}
System.out.println("------------------------");
System.out.println();
@@ -183,7 +186,7 @@ public class Hive2Main extends LauncherMain {
}
// Pass any parameters to Beeline via arguments
- String[] params = MapReduceMain.getStrings(actionConf, Hive2ActionExecutor.HIVE2_PARAMS);
+ String[] params = ActionUtils.getStrings(actionConf, Hive2ActionExecutor.HIVE2_PARAMS);
if (params.length > 0) {
System.out.println("Parameters:");
System.out.println("------------------------");
@@ -208,7 +211,7 @@ public class Hive2Main extends LauncherMain {
arguments.add("-a");
arguments.add("delegationToken");
- String[] beelineArgs = MapReduceMain.getStrings(actionConf, Hive2ActionExecutor.HIVE2_ARGS);
+ String[] beelineArgs = ActionUtils.getStrings(actionConf, Hive2ActionExecutor.HIVE2_ARGS);
for (String beelineArg : beelineArgs) {
if (DISALLOWED_BEELINE_OPTIONS.contains(beelineArg)) {
throw new RuntimeException("Error: Beeline argument " + beelineArg + " is not supported");
@@ -233,7 +236,7 @@ public class Hive2Main extends LauncherMain {
}
System.out.println();
- LauncherMainHadoopUtils.killChildYarnJobs(actionConf);
+ LauncherMain.killChildYarnJobs(actionConf);
System.out.println("=================================================================");
System.out.println();
@@ -244,13 +247,6 @@ public class Hive2Main extends LauncherMain {
try {
runBeeline(arguments.toArray(new String[arguments.size()]), logFile);
}
- catch (SecurityException ex) {
- if (LauncherSecurityManager.getExitInvoked()) {
- if (LauncherSecurityManager.getExitCode() != 0) {
- throw ex;
- }
- }
- }
finally {
System.out.println("\n<<< Invocation of Beeline command completed <<<\n");
writeExternalChildIDs(logFile, HIVE2_JOB_IDS_PATTERNS, "Beeline");
@@ -269,6 +265,7 @@ public class Hive2Main extends LauncherMain {
BeeLine beeLine = new BeeLine();
beeLine.setErrorStream(new PrintStream(new TeeOutputStream(System.err, new FileOutputStream(logFile))));
int status = beeLine.begin(args, null);
+ beeLine.close();
if (status != 0) {
System.exit(status);
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/hive2/src/test/java/org/apache/oozie/action/hadoop/TestHive2ActionExecutor.java
----------------------------------------------------------------------
diff --git a/sharelib/hive2/src/test/java/org/apache/oozie/action/hadoop/TestHive2ActionExecutor.java b/sharelib/hive2/src/test/java/org/apache/oozie/action/hadoop/TestHive2ActionExecutor.java
index 4818bb6..2127eb0 100644
--- a/sharelib/hive2/src/test/java/org/apache/oozie/action/hadoop/TestHive2ActionExecutor.java
+++ b/sharelib/hive2/src/test/java/org/apache/oozie/action/hadoop/TestHive2ActionExecutor.java
@@ -19,7 +19,6 @@
package org.apache.oozie.action.hadoop;
import java.io.OutputStreamWriter;
-import java.io.StringReader;
import java.io.Writer;
import java.text.MessageFormat;
import java.util.ArrayList;
@@ -29,15 +28,9 @@ import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.RunningJob;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.client.WorkflowAction;
-import org.apache.oozie.service.HadoopAccessorService;
-import org.apache.oozie.service.Services;
import org.apache.oozie.service.WorkflowAppService;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XmlUtils;
@@ -69,10 +62,9 @@ public class TestHive2ActionExecutor extends ActionExecutorTestCase {
setSystemProperty("oozie.service.ActionService.executor.classes", Hive2ActionExecutor.class.getName());
}
- @SuppressWarnings("unchecked")
public void testSetupMethodsForScript() throws Exception {
Hive2ActionExecutor ae = new Hive2ActionExecutor();
- List<Class> classes = new ArrayList<Class>();
+ List<Class<?>> classes = new ArrayList<>();
classes.add(Hive2Main.class);
assertEquals(classes, ae.getLauncherClasses());
@@ -110,10 +102,9 @@ public class TestHive2ActionExecutor extends ActionExecutorTestCase {
assertEquals("--dee", conf.get("oozie.hive2.args.1"));
}
- @SuppressWarnings("unchecked")
public void testSetupMethodsForQuery() throws Exception {
Hive2ActionExecutor ae = new Hive2ActionExecutor();
- List<Class> classes = new ArrayList<Class>();
+ List<Class<?>> classes = new ArrayList<>();
classes.add(Hive2Main.class);
assertEquals(classes, ae.getLauncherClasses());
@@ -192,7 +183,6 @@ public class TestHive2ActionExecutor extends ActionExecutorTestCase {
"<query>" + query + "</query>" + "</hive2>";
}
- @SuppressWarnings("deprecation")
public void testHive2Action() throws Exception {
setupHiveServer2();
Path inputDir = new Path(getFsTestCaseDir(), INPUT_DIRNAME);
@@ -205,21 +195,14 @@ public class TestHive2ActionExecutor extends ActionExecutorTestCase {
dataWriter.write(SAMPLE_DATA_TEXT);
dataWriter.close();
Context context = createContext(getQueryActionXml(query));
- final RunningJob launcherJob = submitAction(context,
+ final String launcherId = submitAction(context,
Namespace.getNamespace("uri:oozie:hive2-action:0.2"));
- String launcherId = context.getAction().getExternalId();
- waitFor(200 * 1000, new Predicate() {
- @Override
- public boolean evaluate() throws Exception {
- return launcherJob.isComplete();
- }
- });
- assertTrue(launcherJob.isSuccessful());
+ waitUntilYarnAppDoneAndAssertSuccess(launcherId);
Configuration conf = new XConfiguration();
conf.set("user.name", getTestUser());
- Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
+ Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(),
conf);
- assertFalse(LauncherMapperHelper.hasIdSwap(actionData));
+ assertFalse(LauncherHelper.hasIdSwap(actionData));
Hive2ActionExecutor ae = new Hive2ActionExecutor();
ae.check(context, context.getAction());
assertTrue(launcherId.equals(context.getAction().getExternalId()));
@@ -241,21 +224,14 @@ public class TestHive2ActionExecutor extends ActionExecutorTestCase {
dataWriter.write(SAMPLE_DATA_TEXT);
dataWriter.close();
Context context = createContext(getScriptActionXml());
- final RunningJob launcherJob = submitAction(context,
+ final String launcherId = submitAction(context,
Namespace.getNamespace("uri:oozie:hive2-action:0.1"));
- String launcherId = context.getAction().getExternalId();
- waitFor(200 * 1000, new Predicate() {
- @Override
- public boolean evaluate() throws Exception {
- return launcherJob.isComplete();
- }
- });
- assertTrue(launcherJob.isSuccessful());
+ waitUntilYarnAppDoneAndAssertSuccess(launcherId);
Configuration conf = new XConfiguration();
conf.set("user.name", getTestUser());
- Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
+ Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(),
conf);
- assertFalse(LauncherMapperHelper.hasIdSwap(actionData));
+ assertFalse(LauncherHelper.hasIdSwap(actionData));
Hive2ActionExecutor ae = new Hive2ActionExecutor();
ae.check(context, context.getAction());
assertTrue(launcherId.equals(context.getAction().getExternalId()));
@@ -267,35 +243,33 @@ public class TestHive2ActionExecutor extends ActionExecutorTestCase {
assertTrue(fs.exists(outputDir));
assertTrue(fs.isDirectory(outputDir));
}
- // Negative testcase with incorrect hive-query.
- {
- String query = getHive2BadScript(inputDir.toString(), outputDir.toString());
- Writer dataWriter = new OutputStreamWriter(fs.create(new Path(inputDir, DATA_FILENAME)));
- dataWriter.write(SAMPLE_DATA_TEXT);
- dataWriter.close();
- Context context = createContext(getQueryActionXml(query));
- final RunningJob launcherJob = submitAction(context, Namespace.getNamespace("uri:oozie:hive2-action:0.2"));
- String launcherId = context.getAction().getExternalId();
- waitFor(200 * 1000, new Predicate() {
- @Override
- public boolean evaluate() throws Exception {
- return launcherJob.isComplete();
- }
- });
- assertTrue(launcherJob.isSuccessful());
- Configuration conf = new XConfiguration();
- conf.set("user.name", getTestUser());
- Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
- conf);
- assertFalse(LauncherMapperHelper.hasIdSwap(actionData));
- Hive2ActionExecutor ae = new Hive2ActionExecutor();
- ae.check(context, context.getAction());
- assertTrue(launcherId.equals(context.getAction().getExternalId()));
- assertEquals("FAILED/KILLED", context.getAction().getExternalStatus());
- ae.end(context, context.getAction());
- assertEquals(WorkflowAction.Status.ERROR, context.getAction().getStatus());
- assertNull(context.getExternalChildIDs());
- }
+ }
+
+ public void testHive2ActionFails() throws Exception {
+ setupHiveServer2();
+ Path inputDir = new Path(getFsTestCaseDir(), INPUT_DIRNAME);
+ Path outputDir = new Path(getFsTestCaseDir(), OUTPUT_DIRNAME);
+ FileSystem fs = getFileSystem();
+
+ String query = getHive2BadScript(inputDir.toString(), outputDir.toString());
+ Writer dataWriter = new OutputStreamWriter(fs.create(new Path(inputDir, DATA_FILENAME)));
+ dataWriter.write(SAMPLE_DATA_TEXT);
+ dataWriter.close();
+ Context context = createContext(getQueryActionXml(query));
+ final String launcherId = submitAction(context, Namespace.getNamespace("uri:oozie:hive2-action:0.2"));
+ waitUntilYarnAppDoneAndAssertSuccess(launcherId);
+ Configuration conf = new XConfiguration();
+ conf.set("user.name", getTestUser());
+ Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(),
+ conf);
+ assertFalse(LauncherHelper.hasIdSwap(actionData));
+ Hive2ActionExecutor ae = new Hive2ActionExecutor();
+ ae.check(context, context.getAction());
+ assertTrue(launcherId.equals(context.getAction().getExternalId()));
+ assertEquals("FAILED/KILLED", context.getAction().getExternalStatus());
+ ae.end(context, context.getAction());
+ assertEquals(WorkflowAction.Status.ERROR, context.getAction().getStatus());
+ assertNull(context.getExternalChildIDs());
}
private String getHive2BadScript(String inputPath, String outputPath) {
@@ -311,7 +285,7 @@ public class TestHive2ActionExecutor extends ActionExecutorTestCase {
return buffer.toString();
}
- private RunningJob submitAction(Context context, Namespace ns) throws Exception {
+ private String submitAction(Context context, Namespace ns) throws Exception {
Hive2ActionExecutor ae = new Hive2ActionExecutor();
WorkflowAction action = context.getAction();
@@ -325,21 +299,7 @@ public class TestHive2ActionExecutor extends ActionExecutorTestCase {
assertNotNull(jobId);
assertNotNull(jobTracker);
assertNotNull(consoleUrl);
- Element e = XmlUtils.parseXml(action.getConf());
- XConfiguration conf =
- new XConfiguration(new StringReader(XmlUtils.prettyPrint(e.getChild("configuration", ns)).toString()));
- conf.set("mapred.job.tracker", e.getChildTextTrim("job-tracker", ns));
- conf.set("fs.default.name", e.getChildTextTrim("name-node", ns));
- conf.set("user.name", context.getProtoActionConf().get("user.name"));
- conf.set("group.name", getTestGroup());
-
- JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker);
- XConfiguration.copy(conf, jobConf);
- String user = jobConf.get("user.name");
- JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf);
- final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId));
- assertNotNull(runningJob);
- return runningJob;
+ return jobId;
}
private Context createContext(String actionXml) throws Exception {
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/pom.xml
----------------------------------------------------------------------
diff --git a/sharelib/oozie/pom.xml b/sharelib/oozie/pom.xml
index f3ea071..12f5cdd 100644
--- a/sharelib/oozie/pom.xml
+++ b/sharelib/oozie/pom.xml
@@ -61,17 +61,17 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
-
- <dependency>
- <groupId>org.apache.oozie</groupId>
- <artifactId>oozie-hadoop-utils</artifactId>
- <scope>compile</scope>
- </dependency>
</dependencies>
<build>
@@ -97,18 +97,6 @@
<outputFile>${project.build.directory}/classpath</outputFile>
</configuration>
</execution>
- <execution>
- <id>create-mrapp-generated-classpath</id>
- <phase>generate-test-resources</phase>
- <goals>
- <goal>build-classpath</goal>
- </goals>
- <configuration>
- <!-- needed to run the unit test for DS to generate the required classpath
- that is required in the env of the launch container in the mini mr/yarn cluster -->
- <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile>
- </configuration>
- </execution>
</executions>
</plugin>
<plugin>
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMCallBackHandler.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMCallBackHandler.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMCallBackHandler.java
new file mode 100644
index 0000000..e6c9d04
--- /dev/null
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMCallBackHandler.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+
+// Note: methods which modify/read the state of errorHolder are synchronized to avoid data races when LauncherAM invokes getError()
+public class AMRMCallBackHandler implements AMRMClientAsync.CallbackHandler {
+ private ErrorHolder errorHolder;
+
+ @Override
+ public void onContainersCompleted(List<ContainerStatus> containerStatuses) {
+ //noop
+ }
+
+ @Override
+ public void onContainersAllocated(List<Container> containers) {
+ //noop
+ }
+
+ @Override
+ public synchronized void onShutdownRequest() {
+ System.out.println("Resource manager requested AM Shutdown");
+ errorHolder = new ErrorHolder();
+ errorHolder.setErrorCode(0);
+ errorHolder.setErrorMessage("ResourceManager requested AM Shutdown");
+ }
+
+ @Override
+ public void onNodesUpdated(List<NodeReport> nodeReports) {
+ //noop
+ }
+
+ @Override
+ public float getProgress() {
+ return 0.5f;
+ }
+
+ @Override
+ public synchronized void onError(final Throwable ex) {
+ System.out.println("Received asynchronous error");
+ ex.printStackTrace();
+ errorHolder = new ErrorHolder();
+ errorHolder.setErrorCode(0);
+ errorHolder.setErrorMessage(ex.getMessage());
+ errorHolder.setErrorCause(ex);
+ }
+
+ public synchronized ErrorHolder getError() {
+ return errorHolder;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMClientAsyncFactory.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMClientAsyncFactory.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMClientAsyncFactory.java
new file mode 100644
index 0000000..b4cbb4b
--- /dev/null
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMClientAsyncFactory.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+
+public class AMRMClientAsyncFactory {
+
+ public AMRMClientAsync<?> createAMRMClientAsync(int intervalMs) {
+ AMRMClient<?> amRmClient = AMRMClient.createAMRMClient();
+ AMRMCallBackHandler callBackHandler = new AMRMCallBackHandler();
+ AMRMClientAsync<?> amRmClientAsync = AMRMClientAsync.createAMRMClientAsync(amRmClient, intervalMs, callBackHandler);
+
+ return amRmClientAsync;
+ }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ActionUtils.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ActionUtils.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ActionUtils.java
new file mode 100644
index 0000000..3002ad5
--- /dev/null
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ActionUtils.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+
+public final class ActionUtils {
+
+ private ActionUtils() {
+ // no instances
+ }
+
+ public static void setStrings(Configuration conf, String key, String[] values) {
+ if (values != null) {
+ conf.setInt(key + ".size", values.length);
+ for (int i = 0; i < values.length; i++) {
+ conf.set(key + "." + i, values[i]);
+ }
+ }
+ }
+
+ public static String[] getStrings(Configuration conf, String key) {
+ String[] values = new String[conf.getInt(key + ".size", 0)];
+ for (int i = 0; i < values.length; i++) {
+ values[i] = conf.get(key + "." + i);
+ if (values[i] == null) {
+ values[i] = "";
+ }
+ }
+ return values;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ErrorHolder.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ErrorHolder.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ErrorHolder.java
new file mode 100644
index 0000000..6a755db
--- /dev/null
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ErrorHolder.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
/**
 * Mutable holder for error details (code, message, cause) collected while the
 * Launcher AM runs. The {@code populated} flag flips to {@code true} as soon as
 * any setter is invoked, letting callers distinguish "no error recorded yet"
 * from an error whose individual fields happen to be defaults.
 */
public class ErrorHolder {
    private int errorCode;
    private Throwable errorCause;
    private String errorMessage;
    private boolean populated;

    public int getErrorCode() {
        return errorCode;
    }

    public void setErrorCode(int errorCode) {
        this.errorCode = errorCode;
        this.populated = true;
    }

    public Throwable getErrorCause() {
        return errorCause;
    }

    public void setErrorCause(Throwable errorCause) {
        this.errorCause = errorCause;
        this.populated = true;
    }

    public String getErrorMessage() {
        return errorMessage;
    }

    public void setErrorMessage(String errorMessage) {
        this.errorMessage = errorMessage;
        this.populated = true;
    }

    /** @return {@code true} once any of the setters has been called */
    public boolean isPopulated() {
        return populated;
    }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/HdfsOperations.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/HdfsOperations.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/HdfsOperations.java
new file mode 100644
index 0000000..874d371
--- /dev/null
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/HdfsOperations.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.security.PrivilegedExceptionAction;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import com.google.common.base.Preconditions;
+
/**
 * HDFS file operations used by the Launcher AM. Every operation is executed inside
 * {@code ugi.doAs(...)}, i.e. with the credentials of the supplied user rather than
 * those of the AM process itself.
 */
public class HdfsOperations {
    private static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
    // factory is injected so tests can substitute a mock SequenceFile writer
    private final SequenceFileWriterFactory seqFileWriterFactory;
    private final UserGroupInformation ugi;

    public HdfsOperations(SequenceFileWriterFactory seqFileWriterFactory, UserGroupInformation ugi) {
        this.seqFileWriterFactory = Preconditions.checkNotNull(seqFileWriterFactory, "seqFileWriterFactory should not be null");
        this.ugi = Preconditions.checkNotNull(ugi, "ugi should not be null");
    }

    /**
     * Creates a Sequence file which contains the output from an action and uploads it to HDFS.
     *
     * @param launcherJobConf configuration used to resolve the target FileSystem
     * @param actionDir HDFS directory of the action; the file is written as
     *        {@code actionDir/}{@link LauncherAM#ACTION_DATA_SEQUENCE_FILE}
     * @param actionData key/value pairs appended to the sequence file as Text/Text records
     * @throws IOException if the writer cannot be created (is {@code null}) or appending fails
     * @throws InterruptedException if the doAs execution is interrupted
     */
    public void uploadActionDataToHDFS(final Configuration launcherJobConf, final Path actionDir,
            final Map<String, String> actionData) throws IOException, InterruptedException {
        ugi.doAs(new PrivilegedExceptionAction<Void>() {
            @Override
            public Void run() throws Exception {
                Path finalPath = new Path(actionDir, LauncherAM.ACTION_DATA_SEQUENCE_FILE);
                // upload into sequence file
                System.out.println("Oozie Launcher, uploading action data to HDFS sequence file: " + finalPath.toUri());

                try (SequenceFile.Writer wr =
                        seqFileWriterFactory.createSequenceFileWriter(launcherJobConf, finalPath, Text.class, Text.class)) {

                    if (wr != null) {
                        for (Entry<String, String> entry : actionData.entrySet()) {
                            wr.append(new Text(entry.getKey()), new Text(entry.getValue()));
                        }
                    } else {
                        throw new IOException("SequenceFile.Writer is null for " + finalPath);
                    }
                }

                return null;
            }
        });
    }

    /**
     * Checks, as the configured user, whether {@code path} exists on the FileSystem
     * resolved from the path's URI and {@code launcherJobConf}.
     */
    public boolean fileExists(final Path path, final Configuration launcherJobConf) throws IOException, InterruptedException {
        return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
            @Override
            public Boolean run() throws Exception {
                FileSystem fs = FileSystem.get(path.toUri(), launcherJobConf);
                return fs.exists(path);
            }
        });
    }

    /**
     * Creates (or overwrites) {@code path} and writes {@code contents} to it in UTF-8.
     * NOTE(review): the FileSystem is closed by try-with-resources here; with the default
     * FileSystem cache this closes a shared instance — verify callers do not reuse it.
     */
    public void writeStringToFile(final Path path, final Configuration conf, final String contents)
            throws IOException, InterruptedException {
        ugi.doAs(new PrivilegedExceptionAction<Void>() {
            @Override
            public Void run() throws Exception {
                try (FileSystem fs = FileSystem.get(path.toUri(), conf);
                        java.io.Writer writer = new OutputStreamWriter(fs.create(path), DEFAULT_CHARSET)) {
                    writer.write(contents);
                }

                return null;
            }
        });
    }

    /**
     * Reads {@code path} as UTF-8 text and returns its contents with line
     * terminators removed (lines are concatenated without separators).
     */
    public String readFileContents(final Path path, final Configuration conf) throws IOException, InterruptedException {
        return ugi.doAs(new PrivilegedExceptionAction<String>() {
            @Override
            public String run() throws Exception {
                StringBuilder sb = new StringBuilder();

                try (FileSystem fs = FileSystem.get(path.toUri(), conf);
                        InputStream is = fs.open(path);
                        BufferedReader reader = new BufferedReader(new InputStreamReader(is, DEFAULT_CHARSET))) {

                    String contents;
                    while ((contents = reader.readLine()) != null) {
                        sb.append(contents);
                    }
                }

                return sb.toString();
            }
        });
    }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/JavaMain.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/JavaMain.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/JavaMain.java
index 30d68e2..c3e3d3f 100644
--- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/JavaMain.java
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/JavaMain.java
@@ -44,11 +44,11 @@ public class JavaMain extends LauncherMain {
setApplicationTags(actionConf, TEZ_APPLICATION_TAGS);
setApplicationTags(actionConf, SPARK_YARN_TAGS);
- LauncherMainHadoopUtils.killChildYarnJobs(actionConf);
+ LauncherMain.killChildYarnJobs(actionConf);
Class<?> klass = actionConf.getClass(JAVA_MAIN_CLASS, Object.class);
- System.out.println("Main class : " + klass.getName());
- LauncherMapper.printArgs("Arguments :", args);
+ System.out.println("Java action main class : " + klass.getName());
+ printArgs("Java action arguments :", args);
System.out.println();
Method mainMethod = klass.getMethod("main", String[].class);
try {
@@ -60,4 +60,13 @@ public class JavaMain extends LauncherMain {
}
+ /**
+ * Used by JavaMain to wrap a Throwable when an Exception occurs
+ */
+ @SuppressWarnings("serial")
+ static class JavaMainException extends Exception {
+ public JavaMainException(Throwable t) {
+ super(t);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
new file mode 100644
index 0000000..4f252d1
--- /dev/null
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
@@ -0,0 +1,614 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.security.Permission;
+import java.security.PrivilegedExceptionAction;
+import java.text.MessageFormat;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.StringTokenizer;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+
+public class LauncherAM {
+ private static final String OOZIE_ACTION_CONF_XML = "oozie.action.conf.xml";
+ private static final String OOZIE_LAUNCHER_JOB_ID = "oozie.launcher.job.id";
+
+ public static final String JAVA_CLASS_PATH = "java.class.path";
+ public static final String OOZIE_ACTION_ID = "oozie.action.id";
+ public static final String OOZIE_JOB_ID = "oozie.job.id";
+ public static final String ACTION_PREFIX = "oozie.action.";
+ static final String OOZIE_ACTION_RECOVERY_ID = ACTION_PREFIX + "recovery.id";
+ public static final String CONF_OOZIE_ACTION_MAX_OUTPUT_DATA = ACTION_PREFIX + "max.output.data";
+ public static final String CONF_OOZIE_ACTION_MAIN_ARG_PREFIX = ACTION_PREFIX + "main.arg.";
+ public static final String CONF_OOZIE_ACTION_MAIN_ARG_COUNT = CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + "count";
+ public static final String CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE = "oozie.external.stats.max.size";
+ public static final String OOZIE_ACTION_DIR_PATH = ACTION_PREFIX + "dir.path";
+ public static final String ACTION_PREPARE_XML = ACTION_PREFIX + "prepare.xml";
+ public static final String ACTION_DATA_SEQUENCE_FILE = "action-data.seq"; // COMBO FILE
+ public static final String ACTION_DATA_EXTERNAL_CHILD_IDS = "externalChildIDs";
+ public static final String ACTION_DATA_OUTPUT_PROPS = "output.properties";
+ public static final String ACTION_DATA_STATS = "stats.properties";
+ public static final String ACTION_DATA_NEW_ID = "newId";
+ public static final String ACTION_DATA_ERROR_PROPS = "error.properties";
+ public static final String CONF_OOZIE_ACTION_MAIN_CLASS = "oozie.launcher.action.main.class";
+
+ // TODO: OYA: more unique file names? action.xml may be stuck for backwards compat though
+ public static final String LAUNCHER_JOB_CONF_XML = "launcher.xml";
+ public static final String ACTION_CONF_XML = "action.xml";
+ public static final String ACTION_DATA_FINAL_STATUS = "final.status";
+
+ private final UserGroupInformation ugi;
+ private final AMRMCallBackHandler callbackHandler;
+ private final AMRMClientAsyncFactory amRmClientAsyncFactory;
+ private final HdfsOperations hdfsOperations;
+ private final LocalFsOperations localFsOperations;
+ private final PrepareActionsHandler prepareHandler;
+ private final LauncherAMCallbackNotifierFactory callbackNotifierFactory;
+ private final LauncherSecurityManager launcherSecurityManager;
+ private final ContainerId containerId;
+
+ private Configuration launcherJobConf;
+ private AMRMClientAsync<?> amRmClientAsync;
+ private Path actionDir;
+ private Map<String, String> actionData = new HashMap<String,String>();
+
    /**
     * Creates a LauncherAM with every collaborator injected (constructor injection
     * keeps the class unit-testable; production wiring happens in {@link #main}).
     *
     * @param ugi user the action runs as
     * @param amRmClientAsyncFactory creates the async AM-RM client used to (un)register
     * @param callbackHandler queried at the end of {@link #run} for asynchronous RM errors
     * @param hdfsOperations HDFS access (action data upload, recovery file)
     * @param localFsOperations local FS access (launcher conf, captured output files)
     * @param prepareHandler executes the action's prepare XML (mkdir/delete)
     * @param callbackNotifierFactory creates the notifier that pings the Oozie server URL
     * @param launcherSecurityManager intercepts System.exit from the action main
     * @param containerId this AM's container id string, parsed to derive the application id
     * @throws NullPointerException if any argument is null
     * @throws IllegalArgumentException if containerId cannot be parsed (ContainerId.fromString)
     */
    public LauncherAM(UserGroupInformation ugi,
            AMRMClientAsyncFactory amRmClientAsyncFactory,
            AMRMCallBackHandler callbackHandler,
            HdfsOperations hdfsOperations,
            LocalFsOperations localFsOperations,
            PrepareActionsHandler prepareHandler,
            LauncherAMCallbackNotifierFactory callbackNotifierFactory,
            LauncherSecurityManager launcherSecurityManager,
            String containerId) {
        this.ugi = Preconditions.checkNotNull(ugi, "ugi should not be null");
        this.amRmClientAsyncFactory = Preconditions.checkNotNull(amRmClientAsyncFactory,
                "amRmClientAsyncFactory should not be null");
        this.callbackHandler = Preconditions.checkNotNull(callbackHandler, "callbackHandler should not be null");
        this.hdfsOperations = Preconditions.checkNotNull(hdfsOperations, "hdfsOperations should not be null");
        this.localFsOperations = Preconditions.checkNotNull(localFsOperations, "localFsOperations should not be null");
        this.prepareHandler = Preconditions.checkNotNull(prepareHandler, "prepareHandler should not be null");
        this.callbackNotifierFactory = Preconditions.checkNotNull(callbackNotifierFactory,
                "callbackNotifierFactory should not be null");
        this.launcherSecurityManager = Preconditions.checkNotNull(launcherSecurityManager,
                "launcherSecurityManager should not be null");
        this.containerId = ContainerId.fromString(Preconditions.checkNotNull(containerId, "containerId should not be null"));
    }
+
    /**
     * Entry point of the Launcher Application Master process. Determines the UGI to run
     * as (the workflow submitter, taken from the {@code submitter.user} system property),
     * wires up the production collaborators and runs the launcher lifecycle.
     *
     * @throws IllegalArgumentException if {@code submitter.user} is missing or blank
     */
    public static void main(String[] args) throws Exception {
        UserGroupInformation ugi = null;
        String submitterUser = System.getProperty("submitter.user", "").trim();
        Preconditions.checkArgument(!submitterUser.isEmpty(), "Submitter user is undefined");
        System.out.println("Submitter user is: " + submitterUser);

        // We don't need remote/proxy user if the current login user is the workflow submitter
        // Otherwise we have to create a remote user
        if (UserGroupInformation.getLoginUser().getShortUserName().equals(submitterUser)) {
            System.out.println("Using login user for UGI");
            ugi = UserGroupInformation.getLoginUser();
        } else {
            // remote user inherits the login user's credentials (e.g. delegation tokens)
            ugi = UserGroupInformation.createRemoteUser(submitterUser);
            ugi.addCredentials(UserGroupInformation.getLoginUser().getCredentials());
        }

        AMRMClientAsyncFactory amRmClientAsyncFactory = new AMRMClientAsyncFactory();
        AMRMCallBackHandler callbackHandler = new AMRMCallBackHandler();
        HdfsOperations hdfsOperations = new HdfsOperations(new SequenceFileWriterFactory(), ugi);
        LocalFsOperations localFSOperations = new LocalFsOperations();
        PrepareActionsHandler prepareHandler = new PrepareActionsHandler();
        LauncherAMCallbackNotifierFactory callbackNotifierFactory = new LauncherAMCallbackNotifierFactory();
        LauncherSecurityManager launcherSecurityManager = new LauncherSecurityManager();

        // CONTAINER_ID is set in the AM's environment by YARN
        LauncherAM launcher = new LauncherAM(ugi,
                amRmClientAsyncFactory,
                callbackHandler,
                hdfsOperations,
                localFSOperations,
                prepareHandler,
                callbackNotifierFactory,
                launcherSecurityManager,
                System.getenv("CONTAINER_ID"));

        launcher.run();
    }
+
    /**
     * Drives the full launcher lifecycle: load the launcher configuration, register with the
     * ResourceManager, run prepare actions, invoke the action's main class, then — in all
     * cases, via nested finally blocks — upload the collected action data to HDFS, unregister
     * from the RM, and notify the Oozie server's callback URL with the final status.
     *
     * @throws Exception if any stage fails; the error is recorded in the uploaded action
     *         data before the exception propagates
     */
    public void run() throws Exception {
        final ErrorHolder errorHolder = new ErrorHolder();
        OozieActionResult actionResult = OozieActionResult.FAILED;
        boolean launcherExecutedProperly = false;
        boolean backgroundAction = false;

        try {
            try {
                launcherJobConf = localFsOperations.readLauncherConf();
                System.out.println("Launcher AM configuration loaded");
            } catch (Exception ex) {
                errorHolder.setErrorMessage("Could not load the Launcher AM configuration file");
                errorHolder.setErrorCause(ex);
                throw ex;
            }
            actionDir = new Path(launcherJobConf.get(OOZIE_ACTION_DIR_PATH));

            registerWithRM();
            executePrepare(ugi, errorHolder);
            final String[] mainArgs = getMainArguments(launcherJobConf);
            printDebugInfo();
            setupMainConfiguration();
            launcherExecutedProperly = runActionMain(mainArgs, errorHolder, ugi);

            if (launcherExecutedProperly) {
                handleActionData();
                // captured output present -> echo it so it shows up in the AM logs
                if (actionData.get(ACTION_DATA_OUTPUT_PROPS) != null) {
                    System.out.println();
                    System.out.println("Oozie Launcher, capturing output data:");
                    System.out.println("=======================");
                    System.out.println(actionData.get(ACTION_DATA_OUTPUT_PROPS));
                    System.out.println();
                    System.out.println("=======================");
                    System.out.println();
                }
                // a new child job id means the action keeps running in the background
                if (actionData.get(ACTION_DATA_NEW_ID) != null) {
                    System.out.println();
                    System.out.println("Oozie Launcher, propagating new Hadoop job id to Oozie");
                    System.out.println("=======================");
                    System.out.println(actionData.get(ACTION_DATA_NEW_ID));
                    System.out.println("=======================");
                    System.out.println();
                    backgroundAction = true;
                }
            }
        } catch (Exception e) {
            System.out.println("Launcher AM execution failed");
            System.err.println("Launcher AM execution failed");
            e.printStackTrace(System.out);
            e.printStackTrace(System.err);
            launcherExecutedProperly = false;
            if (!errorHolder.isPopulated()) {
                errorHolder.setErrorCause(e);
                errorHolder.setErrorMessage(e.getMessage());
            }
            throw e;
        } finally {
            try {
                // asynchronous error reported by the AM-RM callback handler, if any
                ErrorHolder callbackErrorHolder = callbackHandler.getError();

                if (launcherExecutedProperly) {
                    actionResult = backgroundAction ? OozieActionResult.RUNNING : OozieActionResult.SUCCEEDED;
                }

                if (!launcherExecutedProperly) {
                    updateActionDataWithFailure(errorHolder, actionData);
                } else if (callbackErrorHolder != null) { // async error from the callback
                    actionResult = OozieActionResult.FAILED;
                    updateActionDataWithFailure(callbackErrorHolder, actionData);
                }

                actionData.put(ACTION_DATA_FINAL_STATUS, actionResult.toString());
                hdfsOperations.uploadActionDataToHDFS(launcherJobConf, actionDir, actionData);
            } finally {
                try {
                    unregisterWithRM(actionResult, errorHolder.getErrorMessage());
                } finally {
                    // the Oozie server is notified even if RM unregistration fails
                    LauncherAMCallbackNotifier cn = callbackNotifierFactory.createCallbackNotifier(launcherJobConf);
                    cn.notifyURL(actionResult);
                }
            }
        }
    }
+
    /** @return the action data collected so far (output props, new child id, stats, final status) — exposed for tests */
    @VisibleForTesting
    Map<String, String> getActionData() {
        return actionData;
    }
+
    /**
     * Dumps diagnostic information to stdout before the action main is invoked:
     * container working-directory contents, job/action ids, classpath, main class,
     * output-size limit, JVM system properties and environment variables.
     */
    private void printDebugInfo() throws IOException {
        localFsOperations.printContentsOfDir(new File("."));

        System.out.println();
        System.out.println("Oozie Launcher Application Master configuration");
        System.out.println("===============================================");
        System.out.println("Workflow job id   : " + launcherJobConf.get(OOZIE_JOB_ID));
        System.out.println("Workflow action id: " + launcherJobConf.get(OOZIE_ACTION_ID));
        System.out.println();
        System.out.println("Classpath         :");
        System.out.println("------------------------");
        // classpath entries are ':'-separated on the launch platform
        StringTokenizer st = new StringTokenizer(System.getProperty(JAVA_CLASS_PATH), ":");
        while (st.hasMoreTokens()) {
            System.out.println("  " + st.nextToken());
        }
        System.out.println("------------------------");
        System.out.println();
        String mainClass = launcherJobConf.get(CONF_OOZIE_ACTION_MAIN_CLASS);
        System.out.println("Main class        : " + mainClass);
        System.out.println();
        System.out.println("Maximum output    : "
                + launcherJobConf.getInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 2 * 1024));
        System.out.println();

        System.out.println();
        System.out.println("Java System Properties:");
        System.out.println("------------------------");
        System.getProperties().store(System.out, "");
        System.out.println("------------------------");
        System.out.println();

        System.out.println("Environment variables");
        Map<String, String> env = System.getenv();
        System.out.println("------------------------");
        for (Map.Entry<String, String> entry : env.entrySet()) {
            System.out.println(entry.getKey() + "=" + entry.getValue());
        }
        System.out.println("------------------------");
        System.out.println("=================================================================");
        System.out.println();
        System.out.println(">>> Invoking Main class now >>>");
        System.out.println();
    }
+
    /**
     * Creates, initializes and starts the async AM-RM client, then registers this AM
     * with the ResourceManager (hostname and tracking URL are determined automatically).
     *
     * NOTE(review): AMRMClientAsyncFactory.createAMRMClientAsync() constructs its own
     * AMRMCallBackHandler internally, so the {@code callbackHandler} injected into this
     * class — and polled in run()'s finally block — is never registered with the client;
     * asynchronous RM errors may therefore go unnoticed. Verify and pass the injected
     * handler to the factory.
     */
    private void registerWithRM() throws IOException, YarnException {
        // TODO: OYA: make heartbeat interval configurable & make interval higher to put less load on RM, but lower than timeout
        amRmClientAsync = amRmClientAsyncFactory.createAMRMClientAsync(60000);
        amRmClientAsync.init(new Configuration(launcherJobConf));
        amRmClientAsync.start();

        // hostname and tracking url are determined automatically
        amRmClientAsync.registerApplicationMaster("", 0, "");
    }
+
    /**
     * Unregisters this AM from the ResourceManager with the final status mapped from
     * {@code actionResult}, then stops the async client. No-op when registration never
     * happened ({@code amRmClientAsync == null}).
     *
     * @param actionResult final result; its YARN status is reported to the RM
     * @param message diagnostic message for the RM; {@code null} is converted to ""
     */
    private void unregisterWithRM(OozieActionResult actionResult, String message) throws YarnException, IOException {
        if (amRmClientAsync != null) {
            System.out.println("Stopping AM");
            try {
                message = (message == null) ? "" : message;
                // tracking url is determined automatically
                amRmClientAsync.unregisterApplicationMaster(actionResult.getYarnStatus(), message, "");
            } catch (Exception ex) {
                System.out.println("Error un-registering AM client");
                throw ex;
            } finally {
                // always stop the heartbeat thread, even if unregistration failed
                amRmClientAsync.stop();
            }
        }
    }
+
    /**
     * Executes the action's prepare steps (the XML stored under {@code ACTION_PREPARE_XML}
     * in the launcher configuration) as the submitter user. An empty prepare XML is a no-op;
     * a missing one skips preparation entirely. On failure, records the error in
     * {@code errorHolder} and rethrows.
     */
    private void executePrepare(UserGroupInformation ugi, ErrorHolder errorHolder) throws Exception {
        try {
            System.out.println("\nStarting the execution of prepare actions");
            ugi.doAs(new PrivilegedExceptionAction<Void>() {
                @Override
                public Void run() throws Exception {
                    String prepareXML = launcherJobConf.get(ACTION_PREPARE_XML);
                    if (prepareXML != null) {
                        if (prepareXML.length() != 0) {
                            // prepare runs against the action configuration layered over the launcher conf
                            Configuration actionConf = new Configuration(launcherJobConf);
                            actionConf.addResource(ACTION_CONF_XML);
                            prepareHandler.prepareAction(prepareXML, actionConf);
                        } else {
                            System.out.println("There are no prepare actions to execute.");
                        }
                    }
                    return null;
                }
            });
            System.out.println("Completed the execution of prepare actions successfully");
        } catch (Exception ex) {
            errorHolder.setErrorMessage("Prepare execution in the Launcher AM has failed");
            errorHolder.setErrorCause(ex);
            throw ex;
        }
    }
+
    /**
     * Publishes the configuration the action main classes expect as JVM system properties:
     * job/action ids, the path of the action configuration XML, and the absolute paths of
     * the local files where the action main writes its output (external child ids, stats,
     * new job id, output and error properties).
     */
    private void setupMainConfiguration() throws IOException {
        System.setProperty(OOZIE_LAUNCHER_JOB_ID, launcherJobConf.get(OOZIE_JOB_ID));
        System.setProperty(OOZIE_JOB_ID, launcherJobConf.get(OOZIE_JOB_ID));
        System.setProperty(OOZIE_ACTION_ID, launcherJobConf.get(OOZIE_ACTION_ID));
        System.setProperty(OOZIE_ACTION_CONF_XML, new File(ACTION_CONF_XML).getAbsolutePath());
        System.setProperty(ACTION_PREFIX + ACTION_DATA_EXTERNAL_CHILD_IDS,
                new File(ACTION_DATA_EXTERNAL_CHILD_IDS).getAbsolutePath());
        System.setProperty(ACTION_PREFIX + ACTION_DATA_STATS, new File(ACTION_DATA_STATS).getAbsolutePath());
        System.setProperty(ACTION_PREFIX + ACTION_DATA_NEW_ID, new File(ACTION_DATA_NEW_ID).getAbsolutePath());
        System.setProperty(ACTION_PREFIX + ACTION_DATA_OUTPUT_PROPS, new File(ACTION_DATA_OUTPUT_PROPS).getAbsolutePath());
        System.setProperty(ACTION_PREFIX + ACTION_DATA_ERROR_PROPS, new File(ACTION_DATA_ERROR_PROPS).getAbsolutePath());

        System.setProperty("oozie.job.launch.time", String.valueOf(System.currentTimeMillis()));
    }
+
+ private boolean runActionMain(final String[] mainArgs, final ErrorHolder eHolder, UserGroupInformation ugi) throws Exception {
+ // using AtomicBoolean because we want to modify it inside run()
+ final AtomicBoolean actionMainExecutedProperly = new AtomicBoolean(false);
+
+ ugi.doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ try {
+ setRecoveryId();
+ Class<?> klass = launcherJobConf.getClass(CONF_OOZIE_ACTION_MAIN_CLASS, null);
+ Preconditions.checkNotNull(klass, "Launcher class should not be null");
+ System.out.println("Launcher class: " + klass.toString());
+ Method mainMethod = klass.getMethod("main", String[].class);
+ // Enable LauncherSecurityManager to catch System.exit calls
+ launcherSecurityManager.enable();
+ mainMethod.invoke(null, (Object) mainArgs);
+
+ System.out.println();
+ System.out.println("<<< Invocation of Main class completed <<<");
+ System.out.println();
+ actionMainExecutedProperly.set(true);
+ } catch (InvocationTargetException ex) {
+ ex.printStackTrace(System.out);
+ // Get what actually caused the exception
+ Throwable cause = ex.getCause();
+ // If we got a JavaMainException from JavaMain, then we need to unwrap it
+ if (JavaMain.JavaMainException.class.isInstance(cause)) {
+ cause = cause.getCause();
+ }
+ if (LauncherMainException.class.isInstance(cause)) {
+ int errorCode = ((LauncherMainException) ex.getCause()).getErrorCode();
+ String mainClass = launcherJobConf.get(CONF_OOZIE_ACTION_MAIN_CLASS);
+ eHolder.setErrorMessage("Main Class [" + mainClass + "], exit code [" +
+ errorCode + "]");
+ eHolder.setErrorCode(errorCode);
+ } else if (SecurityException.class.isInstance(cause)) {
+ if (launcherSecurityManager.getExitInvoked()) {
+ final int exitCode = launcherSecurityManager.getExitCode();
+ System.out.println("Intercepting System.exit(" + exitCode + ")");
+ // if 0 main() method finished successfully
+ // ignoring
+ eHolder.setErrorCode(exitCode);
+ if (exitCode != 0) {
+ String mainClass = launcherJobConf.get(CONF_OOZIE_ACTION_MAIN_CLASS);
+ eHolder.setErrorMessage("Main Class [" + mainClass + "],"
+ + " exit code [" + eHolder.getErrorCode() + "]");
+ } else {
+ actionMainExecutedProperly.set(true);
+ }
+ } else {
+ // just SecurityException, no exit was invoked
+ eHolder.setErrorCode(0);
+ eHolder.setErrorCause(cause);
+ eHolder.setErrorMessage(cause.getMessage());
+ }
+ } else {
+ eHolder.setErrorMessage(cause.getMessage());
+ eHolder.setErrorCause(cause);
+ }
+ } catch (Throwable t) {
+ t.printStackTrace();
+ eHolder.setErrorMessage(t.getMessage());
+ eHolder.setErrorCause(t);
+ } finally {
+ // Disable LauncherSecurityManager
+ launcherSecurityManager.disable();
+ }
+
+ return null;
+ }
+ });
+
+ return actionMainExecutedProperly.get();
+ }
+
    /**
     * Persists this AM's YARN application id into the recovery file (named by the
     * configured recovery id, under the action directory) so a re-launched action can be
     * detected. If the file already exists, the stored id must match this AM's id;
     * a mismatch means another attempt owns the action and a LauncherException is thrown.
     *
     * @throws LauncherException on id mismatch, on missing recovery id in the config,
     *         or when wrapping an I/O or interruption failure
     */
    private void setRecoveryId() throws LauncherException {
        try {
            ApplicationId applicationId = containerId.getApplicationAttemptId().getApplicationId();
            String applicationIdStr = applicationId.toString();

            String recoveryId = Preconditions.checkNotNull(launcherJobConf.get(OOZIE_ACTION_RECOVERY_ID),
                        "RecoveryID should not be null");

            Path path = new Path(actionDir, recoveryId);
            if (!hdfsOperations.fileExists(path, launcherJobConf)) {
                // first attempt: claim the action by writing our application id
                hdfsOperations.writeStringToFile(path, launcherJobConf, applicationIdStr);
            } else {
                String id = hdfsOperations.readFileContents(path, launcherJobConf);

                if (!applicationIdStr.equals(id)) {
                    throw new LauncherException(MessageFormat.format(
                            "YARN Id mismatch, action file [{0}] declares Id [{1}] current Id [{2}]", path, id,
                            applicationIdStr));
                }
            }
        } catch (RuntimeException | InterruptedException | IOException ex) {
            throw new LauncherException("IO error", ex);
        }
    }
+
    /**
     * Collects the files written by the action main into the {@code actionData} map:
     * external child ids (unlimited), stats (size-limited by config), output properties
     * (size-limited by config, default 2048), and the propagated new job id (unlimited).
     */
    private void handleActionData() throws IOException {
        // external child IDs
        processActionData(ACTION_PREFIX + ACTION_DATA_EXTERNAL_CHILD_IDS, null,
                ACTION_DATA_EXTERNAL_CHILD_IDS, -1);

        // external stats
        processActionData(ACTION_PREFIX + ACTION_DATA_STATS, CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE,
                ACTION_DATA_STATS, Integer.MAX_VALUE);

        // output data
        processActionData(ACTION_PREFIX + ACTION_DATA_OUTPUT_PROPS, CONF_OOZIE_ACTION_MAX_OUTPUT_DATA,
                ACTION_DATA_OUTPUT_PROPS, 2048);

        // id swap
        processActionData(ACTION_PREFIX + ACTION_DATA_NEW_ID, null,
                ACTION_DATA_NEW_ID, -1);
    }
+
+ private void processActionData(String propertyName, String maxSizePropertyName, String actionDataPropertyName,
+ int maxSizeDefault) throws IOException {
+ String propValue = System.getProperty(propertyName);
+ int maxSize = maxSizeDefault;
+
+ if (maxSizePropertyName != null) {
+ maxSize = launcherJobConf.getInt(maxSizePropertyName, maxSizeDefault);
+ }
+
+ if (propValue != null) {
+ File actionDataFile = new File(propValue);
+ if (localFsOperations.fileExists(actionDataFile)) {
+ actionData.put(actionDataPropertyName, localFsOperations.getLocalFileContentAsString(actionDataFile,
+ actionDataPropertyName, maxSize));
+ }
+ }
+ }
+
+ private void updateActionDataWithFailure(ErrorHolder eHolder, Map<String, String> actionData) {
+ if (eHolder.getErrorCause() != null && eHolder.getErrorCause().getMessage() != null) {
+ if (Objects.equal(eHolder.getErrorMessage(), eHolder.getErrorCause().getMessage())) {
+ eHolder.setErrorMessage(eHolder.getErrorMessage());
+ } else {
+ eHolder.setErrorMessage(eHolder.getErrorMessage() + ", " + eHolder.getErrorCause().getMessage());
+ }
+ }
+
+ Properties errorProps = new Properties();
+ errorProps.setProperty("error.code", Integer.toString(eHolder.getErrorCode()));
+ String errorMessage = eHolder.getErrorMessage() == null ? "<empty>" : eHolder.getErrorMessage();
+ errorProps.setProperty("error.reason", errorMessage);
+ if (eHolder.getErrorCause() != null) {
+ if (eHolder.getErrorCause().getMessage() != null) {
+ errorProps.setProperty("exception.message", eHolder.getErrorCause().getMessage());
+ }
+ StringWriter sw = new StringWriter();
+ PrintWriter pw = new PrintWriter(sw);
+ eHolder.getErrorCause().printStackTrace(pw);
+ pw.close();
+ errorProps.setProperty("exception.stacktrace", sw.toString());
+ }
+
+ StringWriter sw = new StringWriter();
+ try {
+ errorProps.store(sw, "");
+ sw.close();
+ actionData.put(LauncherAM.ACTION_DATA_ERROR_PROPS, sw.toString());
+
+ // external child IDs
+ String externalChildIdsProp = System.getProperty(LauncherAM.ACTION_PREFIX + LauncherAM.ACTION_DATA_EXTERNAL_CHILD_IDS);
+ if (externalChildIdsProp != null) {
+ File externalChildIDs = new File(externalChildIdsProp);
+ if (localFsOperations.fileExists(externalChildIDs)) {
+ actionData.put(LauncherAM.ACTION_DATA_EXTERNAL_CHILD_IDS,
+ localFsOperations.getLocalFileContentAsString(externalChildIDs, ACTION_DATA_EXTERNAL_CHILD_IDS, -1));
+ }
+ }
+ } catch (IOException ioe) {
+ System.out.println("A problem occured trying to fail the launcher");
+ ioe.printStackTrace();
+ } finally {
+ System.out.print("Failing Oozie Launcher, " + eHolder.getErrorMessage() + "\n");
+ if (eHolder.getErrorCause() != null) {
+ eHolder.getErrorCause().printStackTrace(System.out);
+ }
+ }
+ }
+
/**
 * Retrieves the arguments for the action's main class from the given configuration.
 * Delegates to {@link LauncherMapper#getMainArguments} so the extraction logic stays
 * shared with the mapper-based launcher.
 *
 * @param conf the launcher job configuration
 * @return the arguments to pass to the action's main class
 */
private String[] getMainArguments(Configuration conf) {
    return LauncherMapper.getMainArguments(conf);
}
+
+ public static class LauncherSecurityManager extends SecurityManager {
+ private boolean exitInvoked;
+ private int exitCode;
+ private SecurityManager originalSecurityManager;
+
+ public LauncherSecurityManager() {
+ exitInvoked = false;
+ exitCode = 0;
+ originalSecurityManager = System.getSecurityManager();
+ }
+
+ @Override
+ public void checkPermission(Permission perm, Object context) {
+ if (originalSecurityManager != null) {
+ // check everything with the original SecurityManager
+ originalSecurityManager.checkPermission(perm, context);
+ }
+ }
+
+ @Override
+ public void checkPermission(Permission perm) {
+ if (originalSecurityManager != null) {
+ // check everything with the original SecurityManager
+ originalSecurityManager.checkPermission(perm);
+ }
+ }
+
+ @Override
+ public void checkExit(int status) throws SecurityException {
+ exitInvoked = true;
+ exitCode = status;
+ throw new SecurityException("Intercepted System.exit(" + status + ")");
+ }
+
+ public boolean getExitInvoked() {
+ return exitInvoked;
+ }
+
+ public int getExitCode() {
+ return exitCode;
+ }
+
+ public void enable() {
+ if (System.getSecurityManager() != this) {
+ System.setSecurityManager(this);
+ }
+ }
+
+ public void disable() {
+ if (System.getSecurityManager() == this) {
+ System.setSecurityManager(originalSecurityManager);
+ }
+ }
+ }
+
+ public enum OozieActionResult {
+ SUCCEEDED(FinalApplicationStatus.SUCCEEDED),
+ FAILED(FinalApplicationStatus.FAILED),
+ RUNNING(FinalApplicationStatus.SUCCEEDED);
+
+ // YARN-equivalent status
+ private FinalApplicationStatus yarnStatus;
+
+ OozieActionResult(FinalApplicationStatus yarnStatus) {
+ this.yarnStatus = yarnStatus;
+ }
+
+ public FinalApplicationStatus getYarnStatus() {
+ return yarnStatus;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifier.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifier.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifier.java
new file mode 100644
index 0000000..2972658
--- /dev/null
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifier.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.action.hadoop.LauncherAM.OozieActionResult;
+
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.net.Proxy;
+import java.net.URL;
+
+// Adapted from org.apache.hadoop.mapreduce.v2.app.JobEndNotifier
+/**
+ * This call sends back an HTTP GET callback to the configured URL. It is meant for the {@link LauncherAM} to notify the
+ * Oozie Server that it has finished.
+ */
+public class LauncherAMCallbackNotifier {
+ private static final String OOZIE_LAUNCHER_CALLBACK = "oozie.launcher.callback.";
+ private static final int OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL_MAX = 5000;
+
+ public static final String OOZIE_LAUNCHER_CALLBACK_RETRY_ATTEMPTS = OOZIE_LAUNCHER_CALLBACK + "retry.attempts";
+ public static final String OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL = OOZIE_LAUNCHER_CALLBACK + "retry.interval";
+ public static final String OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS = OOZIE_LAUNCHER_CALLBACK + "max.attempts";
+ public static final String OOZIE_LAUNCHER_CALLBACK_TIMEOUT = OOZIE_LAUNCHER_CALLBACK + "timeout";
+ public static final String OOZIE_LAUNCHER_CALLBACK_URL = OOZIE_LAUNCHER_CALLBACK + "url";
+ public static final String OOZIE_LAUNCHER_CALLBACK_PROXY = OOZIE_LAUNCHER_CALLBACK + "proxy";
+ public static final String OOZIE_LAUNCHER_CALLBACK_JOBSTATUS_TOKEN = "$jobStatus";
+
+ protected String userUrl;
+ protected String proxyConf;
+ protected int numTries; //Number of tries to attempt notification
+ protected int waitInterval; //Time (ms) to wait between retrying notification
+ protected int timeout; // Timeout (ms) on the connection and notification
+ protected URL urlToNotify; //URL to notify read from the config
+ protected Proxy proxyToUse = Proxy.NO_PROXY; //Proxy to use for notification
+
+
+ /**
+ * Parse the URL that needs to be notified of the end of the job, along
+ * with the number of retries in case of failure, the amount of time to
+ * wait between retries and proxy settings
+ * @param conf the configuration
+ */
+ public LauncherAMCallbackNotifier(Configuration conf) {
+ numTries = Math.min(conf.getInt(OOZIE_LAUNCHER_CALLBACK_RETRY_ATTEMPTS, 0) + 1,
+ conf.getInt(OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS, 1));
+
+ waitInterval = Math.min(conf.getInt(OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL, OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL_MAX),
+ OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL_MAX);
+ waitInterval = (waitInterval < 0) ? OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL_MAX : waitInterval;
+
+ timeout = conf.getInt(OOZIE_LAUNCHER_CALLBACK_TIMEOUT, OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL_MAX);
+
+ userUrl = conf.get(OOZIE_LAUNCHER_CALLBACK_URL);
+
+ proxyConf = conf.get(OOZIE_LAUNCHER_CALLBACK_PROXY);
+
+ //Configure the proxy to use if its set. It should be set like
+ //proxyType@proxyHostname:port
+ if(proxyConf != null && !proxyConf.equals("") &&
+ proxyConf.lastIndexOf(":") != -1) {
+ int typeIndex = proxyConf.indexOf("@");
+ Proxy.Type proxyType = Proxy.Type.HTTP;
+ if(typeIndex != -1 &&
+ proxyConf.substring(0, typeIndex).compareToIgnoreCase("socks") == 0) {
+ proxyType = Proxy.Type.SOCKS;
+ }
+ String hostname = proxyConf.substring(typeIndex + 1,
+ proxyConf.lastIndexOf(":"));
+ String portConf = proxyConf.substring(proxyConf.lastIndexOf(":") + 1);
+ try {
+ int port = Integer.parseInt(portConf);
+ proxyToUse = new Proxy(proxyType,
+ new InetSocketAddress(hostname, port));
+ System.out.println("Callback notification using proxy type \"" + proxyType +
+ "\" hostname \"" + hostname + "\" and port \"" + port + "\"");
+ } catch(NumberFormatException nfe) {
+ System.err.println("Callback notification couldn't parse configured proxy's port "
+ + portConf + ". Not going to use a proxy");
+ }
+ }
+
+ }
+
+ /**
+ * Notify the URL just once. Use best effort.
+ */
+ protected boolean notifyURLOnce() {
+ boolean success = false;
+ HttpURLConnection conn = null;
+ try {
+ System.out.println("Callback notification trying " + urlToNotify);
+ conn = (HttpURLConnection) urlToNotify.openConnection(proxyToUse);
+ conn.setConnectTimeout(timeout);
+ conn.setReadTimeout(timeout);
+ conn.setAllowUserInteraction(false);
+ if(conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
+ System.err.println("Callback notification to " + urlToNotify +" failed with code: "
+ + conn.getResponseCode() + " and message \"" + conn.getResponseMessage()
+ +"\"");
+ }
+ else {
+ success = true;
+ System.out.println("Callback notification to " + urlToNotify + " succeeded");
+ }
+ } catch(IOException ioe) {
+ System.err.println("Callback notification to " + urlToNotify + " failed");
+ ioe.printStackTrace();
+ } finally {
+ if (conn != null) {
+ conn.disconnect();
+ }
+ }
+ return success;
+ }
+
+ /**
+ * Notify a server of the completion of a submitted job.
+ * @param actionResult The Action Result (failed/succeeded/running)
+ *
+ * @throws InterruptedException
+ */
+ public void notifyURL(OozieActionResult actionResult) throws InterruptedException {
+ // Do we need job-end notification?
+ if (userUrl == null) {
+ System.out.println("Callback notification URL not set, skipping.");
+ return;
+ }
+
+ //Do string replacements for final status
+ if (userUrl.contains(OOZIE_LAUNCHER_CALLBACK_JOBSTATUS_TOKEN)) {
+ userUrl = userUrl.replace(OOZIE_LAUNCHER_CALLBACK_JOBSTATUS_TOKEN, actionResult.toString());
+ }
+
+ // Create the URL, ensure sanity
+ try {
+ urlToNotify = new URL(userUrl);
+ } catch (MalformedURLException mue) {
+ System.err.println("Callback notification couldn't parse " + userUrl);
+ mue.printStackTrace();
+ return;
+ }
+
+ // Send notification
+ boolean success = false;
+ while (numTries-- > 0 && !success) {
+ System.out.println("Callback notification attempts left " + numTries);
+ success = notifyURLOnce();
+ if (!success) {
+ Thread.sleep(waitInterval);
+ }
+ }
+ if (!success) {
+ System.err.println("Callback notification failed to notify : " + urlToNotify);
+ } else {
+ System.out.println("Callback notification succeeded");
+ }
+ }
+}