You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by pb...@apache.org on 2016/12/13 13:19:52 UTC
[21/48] oozie git commit: OOZIE-2591 Fix recovery handling
OOZIE-2591 Fix recovery handling
Change-Id: I7501411b2bdcdc1962e5ac77082a71c96b205902
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/3b6daff5
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/3b6daff5
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/3b6daff5
Branch: refs/heads/oya
Commit: 3b6daff59a58c5b5b30a123c4ca75d6c20b4e30d
Parents: ba68347
Author: Peter Bacsko <pb...@cloudera.com>
Authored: Fri Nov 18 11:13:00 2016 +0100
Committer: Peter Bacsko <pb...@cloudera.com>
Committed: Mon Nov 21 14:28:36 2016 +0100
----------------------------------------------------------------------
.../oozie/action/hadoop/JavaActionExecutor.java | 81 ++++++++-----------
.../oozie/action/hadoop/LauncherMainTester.java | 1 +
.../oozie/service/TestRecoveryService.java | 26 ++-----
.../oozie/action/hadoop/HdfsOperations.java | 50 ++++++++++++
.../apache/oozie/action/hadoop/LauncherAM.java | 78 +++++++++++--------
.../oozie/action/hadoop/TestLauncherAM.java | 82 +++++++++++++++++++-
6 files changed, 211 insertions(+), 107 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/3b6daff5/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
index 284690b..2ec5266 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.AccessControlException;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
@@ -103,6 +104,7 @@ import org.jdom.JDOMException;
import org.jdom.Namespace;
import com.google.common.collect.ImmutableList;
+import com.google.common.io.Closeables;
public class JavaActionExecutor extends ActionExecutor {
@@ -958,6 +960,7 @@ public class JavaActionExecutor extends ActionExecutor {
public void submitLauncher(FileSystem actionFs, final Context context, WorkflowAction action) throws ActionExecutorException {
JobClient jobClient = null;
boolean exception = false;
+ YarnClient yarnClient = null;
try {
Path appPathRoot = new Path(context.getWorkflow().getAppPath());
@@ -1014,23 +1017,23 @@ public class JavaActionExecutor extends ActionExecutor {
}
JobConf launcherJobConf = createLauncherConf(actionFs, context, action, actionXml, actionConf);
- boolean alreadyRunning = false;
- String launcherId = null;
- String consoleUrl = null;
- // TODO: OYA: equivalent of this? (recovery, alreadyRunning) When does this happen?
-// LOG.debug("Creating Job Client for action " + action.getId());
-// jobClient = createJobClient(context, launcherJobConf);
-// launcherId = LauncherMapperHelper.getRecoveryId(launcherJobConf, context.getActionDir(), context
-// .getRecoveryId());
-// alreadyRunning = launcherId != null;
- RunningJob runningJob;
+ String consoleUrl;
+ String launcherId = LauncherMapperHelper.getRecoveryId(launcherJobConf, context.getActionDir(), context
+ .getRecoveryId());
+ boolean alreadyRunning = launcherId != null;
// if user-retry is on, always submit new launcher
boolean isUserRetry = ((WorkflowActionBean)action).isUserRetry();
+ yarnClient = createYarnClient(context, launcherJobConf);
if (alreadyRunning && !isUserRetry) {
- runningJob = jobClient.getJob(JobID.forName(launcherId));
- if (runningJob == null) {
+ try {
+ ApplicationId appId = ConverterUtils.toApplicationId(launcherId);
+ ApplicationReport report = yarnClient.getApplicationReport(appId);
+ consoleUrl = report.getTrackingUrl();
+ } catch (RemoteException e) {
+ // caught when the application id does not exist
+ LOG.error("Got RemoteException from YARN", e);
String jobTracker = launcherJobConf.get(HADOOP_YARN_RM);
throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017",
"unknown job [{0}@{1}], cannot recover", launcherId, jobTracker);
@@ -1070,32 +1073,18 @@ public class JavaActionExecutor extends ActionExecutor {
LOG.info("No need to inject credentials.");
}
- YarnClient yarnClient = null;
- try {
- String user = context.getWorkflow().getUser();
-
- // Create application
- yarnClient = createYarnClient(context, launcherJobConf);
- YarnClientApplication newApp = yarnClient.createApplication();
- ApplicationId appId = newApp.getNewApplicationResponse().getApplicationId();
-
- // Create launch context for app master
- ApplicationSubmissionContext appContext =
- createAppSubmissionContext(appId, launcherJobConf, user, context, actionConf);
-
- // Submit the launcher AM
- yarnClient.submitApplication(appContext);
-
- launcherId = appId.toString();
- LOG.debug("After submission get the launcherId [{0}]", launcherId);
- ApplicationReport appReport = yarnClient.getApplicationReport(appId);
- consoleUrl = appReport.getTrackingUrl();
- } finally {
- if (yarnClient != null) {
- yarnClient.close();
- yarnClient = null;
- }
- }
+ String user = context.getWorkflow().getUser();
+
+ YarnClientApplication newApp = yarnClient.createApplication();
+ ApplicationId appId = newApp.getNewApplicationResponse().getApplicationId();
+ ApplicationSubmissionContext appContext =
+ createAppSubmissionContext(appId, launcherJobConf, user, context, actionConf);
+ yarnClient.submitApplication(appContext);
+
+ launcherId = appId.toString();
+ LOG.debug("After submission get the launcherId [{0}]", launcherId);
+ ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+ consoleUrl = appReport.getTrackingUrl();
}
String jobTracker = launcherJobConf.get(HADOOP_YARN_RM);
@@ -1106,6 +1095,10 @@ public class JavaActionExecutor extends ActionExecutor {
throw convertException(ex);
}
finally {
+ if (yarnClient != null) {
+ Closeables.closeQuietly(yarnClient);
+ }
+
if (jobClient != null) {
try {
jobClient.close();
@@ -1126,26 +1119,16 @@ public class JavaActionExecutor extends ActionExecutor {
Context context, Configuration actionConf)
throws IOException, HadoopAccessorException, URISyntaxException {
- // Create launch context for app master
ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
- // set the application id
appContext.setApplicationId(appId);
-
- // set the application name
appContext.setApplicationName(launcherJobConf.getJobName());
appContext.setApplicationType("Oozie Launcher");
-
- // Set the priority for the application master
Priority pri = Records.newRecord(Priority.class);
int priority = 0; // TODO: OYA: Add a constant or a config
pri.setPriority(priority);
appContext.setPriority(pri);
-
- // Set the queue to which this application is to be submitted in the RM
appContext.setQueue(launcherJobConf.getQueueName());
-
- // Set up the container launch context for the application master
ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
// Set the resources to localize
@@ -1193,7 +1176,7 @@ public class JavaActionExecutor extends ActionExecutor {
vargs.add("-Dhadoop.root.logger=INFO,CLA");
vargs.add("-Dhadoop.root.logfile=" + TaskLog.LogName.SYSLOG);
vargs.add("-Dsubmitter.user=" + context.getWorkflow().getUser());
- vargs.add("org.apache.oozie.action.hadoop.LauncherAM"); // note: using string temporarily so we don't have to depend on sharelib-oozie
+ vargs.add(LauncherAM.class.getCanonicalName());
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
Path.SEPARATOR + ApplicationConstants.STDOUT);
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
http://git-wip-us.apache.org/repos/asf/oozie/blob/3b6daff5/core/src/test/java/org/apache/oozie/action/hadoop/LauncherMainTester.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/LauncherMainTester.java b/core/src/test/java/org/apache/oozie/action/hadoop/LauncherMainTester.java
index 4baed6e..c2aae4c 100644
--- a/core/src/test/java/org/apache/oozie/action/hadoop/LauncherMainTester.java
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/LauncherMainTester.java
@@ -30,6 +30,7 @@ public class LauncherMainTester {
if (args.length == 0) {
System.out.println("Hello World!");
}
+
if (args.length == 1) {
if (args[0].equals("throwable")) {
throw new Throwable("throwing throwable");
http://git-wip-us.apache.org/repos/asf/oozie/blob/3b6daff5/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java b/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
index 8fd0c2d..a3270e9 100644
--- a/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
@@ -21,10 +21,7 @@ package org.apache.oozie.service;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.RunningJob;
import org.apache.oozie.BundleActionBean;
import org.apache.oozie.BundleJobBean;
import org.apache.oozie.CoordinatorActionBean;
@@ -250,20 +247,10 @@ public class TestRecoveryService extends XDataTestCase {
ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(job1, action1, false, false);
MapReduceActionExecutor actionExecutor = new MapReduceActionExecutor();
JobConf conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action1.getConf()));
- String user = conf.get("user.name");
- String group = conf.get("group.name");
- JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf);
-
String launcherId = action1.getExternalId();
- final RunningJob launcherJob = jobClient.getJob(JobID.forName(launcherId));
+ waitUntilYarnAppDoneAndAssertSuccess(launcherId);
- waitFor(240 * 1000, new Predicate() {
- public boolean evaluate() throws Exception {
- return launcherJob.isComplete();
- }
- });
- assertTrue(launcherJob.isSuccessful());
Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
conf);
assertTrue(LauncherMapperHelper.hasIdSwap(actionData));
@@ -274,10 +261,8 @@ public class TestRecoveryService extends XDataTestCase {
* @throws Exception
*/
public void testBundleRecoveryCoordCreate() throws Exception {
- final BundleActionBean bundleAction;
- final BundleJobBean bundle;
- bundle = addRecordToBundleJobTable(Job.Status.RUNNING, false);
- bundleAction = addRecordToBundleActionTable(bundle.getId(), "coord1", 1, Job.Status.PREP);
+ final BundleJobBean bundle = addRecordToBundleJobTable(Job.Status.RUNNING, false);
+ addRecordToBundleActionTable(bundle.getId(), "coord1", 1, Job.Status.PREP);
final JPAService jpaService = Services.get().get(JPAService.class);
sleep(3000);
@@ -290,7 +275,7 @@ public class TestRecoveryService extends XDataTestCase {
jpaService.execute(new BundleActionGetJPAExecutor(bundle.getId(), "coord1"));
try {
if (mybundleAction.getCoordId() != null) {
- CoordinatorJobBean coord = jpaService.execute(new CoordJobGetJPAExecutor(mybundleAction.getCoordId()));
+ jpaService.execute(new CoordJobGetJPAExecutor(mybundleAction.getCoordId()));
return true;
}
} catch (Exception e) {
@@ -345,12 +330,11 @@ public class TestRecoveryService extends XDataTestCase {
* @throws Exception
*/
public void testBundleRecoveryCoordExists() throws Exception {
- final BundleActionBean bundleAction;
final BundleJobBean bundle;
final CoordinatorJob coord;
bundle = addRecordToBundleJobTable(Job.Status.RUNNING, false);
coord = addRecordToCoordJobTable(Job.Status.PREP, false, false);
- bundleAction = addRecordToBundleActionTable(bundle.getId(), coord.getId(), "coord1", 1, Job.Status.PREP);
+ addRecordToBundleActionTable(bundle.getId(), coord.getId(), "coord1", 1, Job.Status.PREP);
final JPAService jpaService = Services.get().get(JPAService.class);
sleep(3000);
http://git-wip-us.apache.org/repos/asf/oozie/blob/3b6daff5/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/HdfsOperations.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/HdfsOperations.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/HdfsOperations.java
index 593de00..6f354a8 100644
--- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/HdfsOperations.java
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/HdfsOperations.java
@@ -17,12 +17,18 @@
*/
package org.apache.oozie.action.hadoop;
+import java.io.BufferedReader;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
@@ -85,4 +91,48 @@ public class HdfsOperations {
throw ioe;
}
}
+
+    /**
+     * Tells whether {@code path} exists, performing the file system lookup inside {@code ugi.doAs}.
+     *
+     * @param path the path to probe; its URI selects the file system
+     * @param launcherJobConf configuration used to obtain the file system
+     * @return {@code true} if the file system reports that the path exists
+     * @throws IOException declared for failures surfaced by {@code ugi.doAs}
+     * @throws InterruptedException declared for interruption surfaced by {@code ugi.doAs}
+     */
+    public boolean fileExists(final Path path, final Configuration launcherJobConf) throws IOException, InterruptedException {
+        final PrivilegedExceptionAction<Boolean> existsCheck = new PrivilegedExceptionAction<Boolean>() {
+            @Override
+            public Boolean run() throws Exception {
+                return FileSystem.get(path.toUri(), launcherJobConf).exists(path);
+            }
+        };
+
+        return ugi.doAs(existsCheck);
+    }
+
+    /**
+     * Writes {@code contents} to {@code path}, performing the file system calls inside {@code ugi.doAs}.
+     * The text is encoded as UTF-8 and the stream is closed even if the write fails.
+     *
+     * @param path the file to create; its URI selects the file system
+     * @param conf configuration used to obtain the file system
+     * @param contents the text to store in the file
+     * @throws IOException declared for failures surfaced by {@code ugi.doAs}
+     * @throws InterruptedException declared for interruption surfaced by {@code ugi.doAs}
+     */
+    public void writeStringToFile(final Path path, final Configuration conf, final String contents) throws IOException, InterruptedException {
+        ugi.doAs(new PrivilegedExceptionAction<Void>() {
+            @Override
+            public Void run() throws Exception {
+                FileSystem fs = FileSystem.get(path.toUri(), conf);
+                // Name the charset explicitly so the bytes written do not depend on the JVM's default encoding.
+                java.io.Writer writer = new OutputStreamWriter(fs.create(path), "UTF-8");
+                try {
+                    writer.write(contents);
+                } finally {
+                    // Close on every path; previously a failing write() left the output stream open.
+                    writer.close();
+                }
+                return null;
+            }
+        });
+    }
+
+    /**
+     * Reads the text stored at {@code path}, performing the file system calls inside {@code ugi.doAs}.
+     * The file is decoded as UTF-8 and read line by line; line terminators are dropped, so a
+     * multi-line file is returned as its lines concatenated and a trailing newline is not included.
+     * The stream is closed even if reading fails.
+     *
+     * @param path the file to read; its URI selects the file system
+     * @param conf configuration used to obtain the file system
+     * @return the file's lines joined without separators (empty string for an empty file)
+     * @throws IOException declared for failures surfaced by {@code ugi.doAs}
+     * @throws InterruptedException declared for interruption surfaced by {@code ugi.doAs}
+     */
+    public String readFileContents(final Path path, final Configuration conf) throws IOException, InterruptedException {
+        return ugi.doAs(new PrivilegedExceptionAction<String>() {
+            @Override
+            public String run() throws Exception {
+                FileSystem fs = FileSystem.get(path.toUri(), conf);
+                InputStream is = fs.open(path);
+                try {
+                    // Name the charset explicitly so decoding does not depend on the JVM's default encoding.
+                    BufferedReader reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
+                    StringBuilder sb = new StringBuilder();
+
+                    String line;
+                    while ((line = reader.readLine()) != null) {
+                        sb.append(line);
+                    }
+
+                    return sb.toString();
+                } finally {
+                    // Close on every path; previously a failing readLine() left the input stream open.
+                    is.close();
+                }
+            }
+        });
+    }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/3b6daff5/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
index 89357ad..881fa72 100644
--- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
@@ -25,6 +25,7 @@ import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.security.Permission;
import java.security.PrivilegedAction;
+import java.text.MessageFormat;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@@ -34,7 +35,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -81,6 +83,7 @@ public class LauncherAM {
private final PrepareActionsHandler prepareHandler;
private final LauncherAMCallbackNotifierFactory callbackNotifierFactory;
private final LauncherSecurityManager launcherSecurityManager;
+ private final ContainerId containerId;
private Configuration launcherJobConf;
private AMRMClientAsync<?> amRmClientAsync;
@@ -94,7 +97,8 @@ public class LauncherAM {
LocalFsOperations localFsOperations,
PrepareActionsHandler prepareHandler,
LauncherAMCallbackNotifierFactory callbackNotifierFactory,
- LauncherSecurityManager launcherSecurityManager) {
+ LauncherSecurityManager launcherSecurityManager,
+ String containerId) {
this.ugi = Preconditions.checkNotNull(ugi, "ugi should not be null");
this.amRmClientAsyncFactory = Preconditions.checkNotNull(amRmClientAsyncFactory, "amRmClientAsyncFactory should not be null");
this.callbackHandler = Preconditions.checkNotNull(callbackHandler, "callbackHandler should not be null");
@@ -103,6 +107,7 @@ public class LauncherAM {
this.prepareHandler = Preconditions.checkNotNull(prepareHandler, "prepareHandler should not be null");
this.callbackNotifierFactory = Preconditions.checkNotNull(callbackNotifierFactory, "callbackNotifierFactory should not be null");
this.launcherSecurityManager = Preconditions.checkNotNull(launcherSecurityManager, "launcherSecurityManager should not be null");
+ this.containerId = ContainerId.fromString(Preconditions.checkNotNull(containerId, "containerId should not be null"));
}
public static void main(String[] args) throws Exception {
@@ -134,29 +139,16 @@ public class LauncherAM {
localFSOperations,
prepareHandler,
callbackNotifierFactory,
- launcherSecurityManager);
+ launcherSecurityManager,
+ System.getenv("CONTAINER_ID"));
launcher.run();
}
- // TODO: OYA: rethink all print messages and formatting
public void run() throws Exception {
final ErrorHolder errorHolder = new ErrorHolder();
OozieActionResult actionResult = OozieActionResult.FAILED;
boolean launcerExecutedProperly = false;
-
- String jobUserName = System.getenv(ApplicationConstants.Environment.USER.name());
-
- // DEBUG - will be removed
- UserGroupInformation login = UserGroupInformation.getLoginUser();
- System.out.println("Login: " + login.getUserName());
- System.out.println("SecurityEnabled:" + UserGroupInformation.isSecurityEnabled());
- System.out.println("Login keytab based:" + UserGroupInformation.isLoginKeytabBased());
- System.out.println("Login from keytab: " + login.isFromKeytab());
- System.out.println("Login has kerberos credentials: " + login.hasKerberosCredentials());
- System.out.println("Login authMethod: " + login.getAuthenticationMethod());
- System.out.println("JobUserName:" + jobUserName);
-
boolean backgroundAction = false;
try {
@@ -288,15 +280,20 @@ public class LauncherAM {
System.out.println("Java System Properties:");
System.out.println("------------------------");
System.getProperties().store(System.out, "");
- System.out.flush();
System.out.println("------------------------");
System.out.println();
+ System.out.println("Environment variables");
+ Map<String, String> env = System.getenv();
+ System.out.println("------------------------");
+ for (Map.Entry<String, String> entry : env.entrySet()) {
+ System.out.println(entry.getKey() + "=" + entry.getValue());
+ }
+ System.out.println("------------------------");
System.out.println("=================================================================");
System.out.println();
System.out.println(">>> Invoking Main class now >>>");
System.out.println();
- System.out.flush();
}
private void registerWithRM() throws IOException, YarnException {
@@ -317,7 +314,7 @@ public class LauncherAM {
// tracking url is determined automatically
amRmClientAsync.unregisterApplicationMaster(actionResult.getYarnStatus(), message, "");
} catch (Exception ex) {
- System.err.println("Error un-registering AM client");
+ System.out.println("Error un-registering AM client");
throw ex;
} finally {
amRmClientAsync.stop();
@@ -366,12 +363,7 @@ public class LauncherAM {
System.setProperty(ACTION_PREFIX + ACTION_DATA_OUTPUT_PROPS, new File(ACTION_DATA_OUTPUT_PROPS).getAbsolutePath());
System.setProperty(ACTION_PREFIX + ACTION_DATA_ERROR_PROPS, new File(ACTION_DATA_ERROR_PROPS).getAbsolutePath());
- // FIXME - make sure it's always set
- if (launcherJobConf.get("oozie.job.launch.time") != null) {
- System.setProperty("oozie.job.launch.time", launcherJobConf.get("oozie.job.launch.time"));
- } else {
- System.setProperty("oozie.job.launch.time", String.valueOf(System.currentTimeMillis()));
- }
+ System.setProperty("oozie.job.launch.time", String.valueOf(System.currentTimeMillis()));
}
private boolean runActionMain(final String[] mainArgs, final ErrorHolder eHolder, UserGroupInformation ugi) {
@@ -382,9 +374,9 @@ public class LauncherAM {
@Override
public Void run() {
try {
+ setRecoveryId();
Class<?> klass = launcherJobConf.getClass(CONF_OOZIE_ACTION_MAIN_CLASS, Object.class);
System.out.println("Launcher class: " + klass.toString());
- System.out.flush();
Method mainMethod = klass.getMethod("main", String[].class);
// Enable LauncherSecurityManager to catch System.exit calls
launcherSecurityManager.set();
@@ -412,7 +404,6 @@ public class LauncherAM {
if (launcherSecurityManager.getExitInvoked()) {
final int exitCode = launcherSecurityManager.getExitCode();
System.out.println("Intercepting System.exit(" + exitCode + ")");
- System.err.println("Intercepting System.exit(" + exitCode + ")");
// if 0 main() method finished successfully
// ignoring
eHolder.setErrorCode(exitCode);
@@ -438,8 +429,6 @@ public class LauncherAM {
eHolder.setErrorMessage(t.getMessage());
eHolder.setErrorCause(t);
} finally {
- System.out.flush();
- System.err.flush();
// Disable LauncherSecurityManager
launcherSecurityManager.unset();
}
@@ -451,6 +440,31 @@ public class LauncherAM {
return actionMainExecutedProperly.get();
}
+    /**
+     * Records or verifies the YARN application id for this action in the recovery file.
+     * The recovery file is {@code actionDir/<recovery id>}, where the recovery id is read from
+     * the launcher configuration. If the file does not exist, the current application id
+     * (derived from {@code containerId}) is written to it. If it exists, its contents must equal
+     * the current application id.
+     *
+     * @throws LauncherException with message "IO error" for any failure raised inside the method:
+     *         a missing recovery id, a file system error, or an id mismatch (the mismatch exception
+     *         becomes the cause)
+     */
+    private void setRecoveryId() throws LauncherException {
+        try {
+            final ApplicationId currentApp = containerId.getApplicationAttemptId().getApplicationId();
+            final String currentAppId = currentApp.toString();
+
+            final String recoveryId = Preconditions.checkNotNull(
+                    launcherJobConf.get(LauncherMapper.OOZIE_ACTION_RECOVERY_ID), "RecoveryID should not be null");
+
+            final Path recoveryFile = new Path(actionDir, recoveryId);
+            if (!hdfsOperations.fileExists(recoveryFile, launcherJobConf)) {
+                // First run for this recovery id: remember which application owns it.
+                hdfsOperations.writeStringToFile(recoveryFile, launcherJobConf, currentAppId);
+                return;
+            }
+
+            // The file already exists: it must name this very application.
+            final String storedAppId = hdfsOperations.readFileContents(recoveryFile, launcherJobConf);
+            if (!currentAppId.equals(storedAppId)) {
+                throw new LauncherException(MessageFormat.format(
+                        "YARN Id mismatch, action file [{0}] declares Id [{1}] current Id [{2}]", recoveryFile,
+                        storedAppId, currentAppId));
+            }
+        } catch (Exception ex) {
+            throw new LauncherException("IO error", ex);
+        }
+    }
+
private void handleActionData() throws IOException {
// external child IDs
processActionData(ACTION_PREFIX + ACTION_DATA_EXTERNAL_CHILD_IDS, null, ACTION_DATA_EXTERNAL_CHILD_IDS, -1, ACTIONOUTPUTTYPE_EXT_CHILD_ID);
@@ -516,14 +530,12 @@ public class LauncherAM {
}
}
} catch (IOException ioe) {
- System.err.println("A problem occured trying to fail the launcher");
+ System.out.println("A problem occured trying to fail the launcher");
ioe.printStackTrace();
} finally {
System.out.print("Failing Oozie Launcher, " + eHolder.getErrorMessage() + "\n");
- System.err.print("Failing Oozie Launcher, " + eHolder.getErrorMessage() + "\n");
if (eHolder.getErrorCause() != null) {
eHolder.getErrorCause().printStackTrace(System.out);
- eHolder.getErrorCause().printStackTrace(System.err);
}
}
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/3b6daff5/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java b/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java
index 30441ea..052673d 100644
--- a/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java
+++ b/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.oozie.action.hadoop.LauncherAM.LauncherSecurityManager;
import org.apache.oozie.action.hadoop.LauncherAM.OozieActionResult;
import org.junit.Before;
@@ -72,7 +73,6 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
-import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
@@ -80,6 +80,7 @@ import org.mockito.stubbing.Answer;
@RunWith(MockitoJUnitRunner.class)
public class TestLauncherAM {
+ private static final String DEFAULT_CONTAINER_ID = "container_1479473450392_0001_01_000001";
private static final String ACTIONDATA_ERROR_PROPERTIES = "error.properties";
private static final String ACTIONDATA_FINAL_STATUS_PROPERTY = "final.status";
private static final String ERROR_CODE_PROPERTY = "error.code";
@@ -108,7 +109,7 @@ public class TestLauncherAM {
private AMRMCallBackHandler callbackHandlerMock;
@Mock
- private HdfsOperations fsOperationsMock;
+ private HdfsOperations hdfsOperationsMock;
@Mock
private LocalFsOperations localFsOperationsMock;
@@ -127,7 +128,10 @@ public class TestLauncherAM {
private Configuration launcherJobConfig = new Configuration();
- @InjectMocks
+ private String containerId = DEFAULT_CONTAINER_ID;
+
+ private String applicationId = ConverterUtils.toContainerId(containerId).getApplicationAttemptId().getApplicationId().toString();
+
private LauncherAM launcherAM;
private ExpectedFailureDetails failureDetails = new ExpectedFailureDetails();
@@ -135,6 +139,8 @@ public class TestLauncherAM {
@Before
public void setup() throws IOException {
configureMocksForHappyPath();
+ launcherJobConfig.set(LauncherMapper.OOZIE_ACTION_RECOVERY_ID, "1");
+ instantiateLauncher();
}
@Test
@@ -379,6 +385,73 @@ public class TestLauncherAM {
}
}
+ @Test // Expects a failed launcher run when the recovery ID is absent from the launcher configuration.
+ public void testRecoveryIdNotSet() throws Exception {
+ launcherJobConfig.unset(LauncherMapper.OOZIE_ACTION_RECOVERY_ID); // undo the value that setup() sets
+ instantiateLauncher();
+ // Run the launcher without a recovery ID.
+ executeLauncher();
+ // LauncherAM.setRecoveryId() wraps every exception, including the failed null check, as LauncherException("IO error").
+ failureDetails.expectedExceptionMessage("IO error")
+ .expectedErrorCode(EXIT_CODE_0)
+ .expectedErrorReason("IO error, IO error")
+ .withStackTrace();
+ // The expectations recorded above are presumably verified by assertFailedExecution() — confirm against its body.
+ assertFailedExecution();
+ }
+
+ @Test // Recovery file already exists and holds this application's id: the launcher must read it back.
+ public void testRecoveryIdExistsAndRecoveryIsdMatch() throws Exception { // NOTE(review): "RecoveryIsdMatch" looks like a typo for "RecoveryIdsMatch"
+ given(hdfsOperationsMock.fileExists(any(Path.class), eq(launcherJobConfig))).willReturn(true);
+ given(hdfsOperationsMock.readFileContents(any(Path.class), eq(launcherJobConfig))).willReturn(applicationId); // stored id equals the id derived from containerId
+ // Run the launcher with the matching recovery file in place.
+ executeLauncher();
+ // Only the read is verified here; no explicit success assertion is made in this test.
+ verify(hdfsOperationsMock).readFileContents(any(Path.class), eq(launcherJobConfig));
+ }
+
+ @Test // Recovery file exists but names a different application id: the launcher run must fail.
+ public void testRecoveryIdExistsAndRecoveryIdsDoNotMatch() throws Exception {
+ given(hdfsOperationsMock.fileExists(any(Path.class), eq(launcherJobConfig))).willReturn(true);
+ given(hdfsOperationsMock.readFileContents(any(Path.class), eq(launcherJobConfig))).willReturn("not_matching_appid");
+ // Run the launcher against the mismatching recovery file.
+ executeLauncher();
+ // The "YARN Id mismatch" LauncherException is caught and re-wrapped by setRecoveryId(), so the visible message is "IO error".
+ failureDetails.expectedExceptionMessage("IO error")
+ .expectedErrorCode(EXIT_CODE_0)
+ .expectedErrorReason("IO error, IO error")
+ .withStackTrace();
+ // The stored id must have been read, and the run must be reported as failed.
+ verify(hdfsOperationsMock).readFileContents(any(Path.class), eq(launcherJobConfig));
+ assertFailedExecution();
+ }
+
+ @Test // Expects a failed launcher run when storing the application id in the recovery file throws.
+ public void testReadingRecoveryIdFails() throws Exception { // NOTE(review): the stub below fails the WRITE, not the read — the name may be misleading
+ willThrow(new IOException()).given(hdfsOperationsMock).writeStringToFile(any(Path.class), eq(launcherJobConfig), eq(applicationId));
+ // fileExists() is left unstubbed here; presumably the mock returns false so the write path is taken — confirm.
+ executeLauncher();
+ // setRecoveryId() wraps the IOException as LauncherException("IO error").
+ failureDetails.expectedExceptionMessage("IO error")
+ .expectedErrorCode(EXIT_CODE_0)
+ .expectedErrorReason("IO error, IO error")
+ .withStackTrace();
+ // The expectations recorded above are presumably verified by assertFailedExecution() — confirm against its body.
+ assertFailedExecution();
+ }
+
+ private void instantiateLauncher() { // Builds the LauncherAM under test from the mock collaborators and the current containerId field.
+ launcherAM = new LauncherAM(ugiMock,
+ amRMClientAsyncFactoryMock,
+ callbackHandlerMock,
+ hdfsOperationsMock,
+ localFsOperationsMock,
+ prepareHandlerMock,
+ launcherCallbackNotifierFactoryMock,
+ launcherSecurityManagerMock,
+ containerId); // passed as a string; the LauncherAM constructor parses it with ContainerId.fromString
+ }
+
@SuppressWarnings("unchecked")
private void configureMocksForHappyPath() throws IOException {
launcherJobConfig.set(LauncherAM.OOZIE_ACTION_DIR_PATH, "dummy");
@@ -426,9 +499,10 @@ public class TestLauncherAM {
verify(amRmAsyncClientMock).unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, EMPTY_STRING, EMPTY_STRING);
verify(amRmAsyncClientMock).stop();
verify(ugiMock, times(2)).doAs(any(PrivilegedAction.class)); // prepare & action main
- verify(fsOperationsMock).uploadActionDataToHDFS(any(Configuration.class), any(Path.class), any(Map.class));
+ verify(hdfsOperationsMock).uploadActionDataToHDFS(any(Configuration.class), any(Path.class), any(Map.class));
verify(launcherCallbackNotifierFactoryMock).createCallbackNotifier(any(Configuration.class));
verify(launcherCallbackNotifierMock).notifyURL(actionResult);
+ verify(hdfsOperationsMock).writeStringToFile(any(Path.class), any(Configuration.class), any(String.class));
Map<String, String> actionData = launcherAM.getActionData();
verifyFinalStatus(actionData, actionResult);