You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by pb...@apache.org on 2016/12/13 13:19:52 UTC

[21/48] oozie git commit: OOZIE-2591 Fix recovery handling

OOZIE-2591 Fix recovery handling

Change-Id: I7501411b2bdcdc1962e5ac77082a71c96b205902


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/3b6daff5
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/3b6daff5
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/3b6daff5

Branch: refs/heads/oya
Commit: 3b6daff59a58c5b5b30a123c4ca75d6c20b4e30d
Parents: ba68347
Author: Peter Bacsko <pb...@cloudera.com>
Authored: Fri Nov 18 11:13:00 2016 +0100
Committer: Peter Bacsko <pb...@cloudera.com>
Committed: Mon Nov 21 14:28:36 2016 +0100

----------------------------------------------------------------------
 .../oozie/action/hadoop/JavaActionExecutor.java | 81 ++++++++-----------
 .../oozie/action/hadoop/LauncherMainTester.java |  1 +
 .../oozie/service/TestRecoveryService.java      | 26 ++-----
 .../oozie/action/hadoop/HdfsOperations.java     | 50 ++++++++++++
 .../apache/oozie/action/hadoop/LauncherAM.java  | 78 +++++++++++--------
 .../oozie/action/hadoop/TestLauncherAM.java     | 82 +++++++++++++++++++-
 6 files changed, 211 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/3b6daff5/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
index 284690b..2ec5266 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.AccessControlException;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobID;
@@ -103,6 +104,7 @@ import org.jdom.JDOMException;
 import org.jdom.Namespace;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.io.Closeables;
 
 
 public class JavaActionExecutor extends ActionExecutor {
@@ -958,6 +960,7 @@ public class JavaActionExecutor extends ActionExecutor {
     public void submitLauncher(FileSystem actionFs, final Context context, WorkflowAction action) throws ActionExecutorException {
         JobClient jobClient = null;
         boolean exception = false;
+        YarnClient yarnClient = null;
         try {
             Path appPathRoot = new Path(context.getWorkflow().getAppPath());
 
@@ -1014,23 +1017,23 @@ public class JavaActionExecutor extends ActionExecutor {
             }
             JobConf launcherJobConf = createLauncherConf(actionFs, context, action, actionXml, actionConf);
 
-            boolean alreadyRunning = false;
-            String launcherId = null;
-            String consoleUrl = null;
-            // TODO: OYA: equivalent of this? (recovery, alreadyRunning)  When does this happen?
-//            LOG.debug("Creating Job Client for action " + action.getId());
-//            jobClient = createJobClient(context, launcherJobConf);
-//            launcherId = LauncherMapperHelper.getRecoveryId(launcherJobConf, context.getActionDir(), context
-//                    .getRecoveryId());
-//            alreadyRunning = launcherId != null;
-            RunningJob runningJob;
+            String consoleUrl;
+            String launcherId = LauncherMapperHelper.getRecoveryId(launcherJobConf, context.getActionDir(), context
+                    .getRecoveryId());
+            boolean alreadyRunning = launcherId != null;
 
             // if user-retry is on, always submit new launcher
             boolean isUserRetry = ((WorkflowActionBean)action).isUserRetry();
+            yarnClient = createYarnClient(context, launcherJobConf);
 
             if (alreadyRunning && !isUserRetry) {
-                runningJob = jobClient.getJob(JobID.forName(launcherId));
-                if (runningJob == null) {
+                try {
+                    ApplicationId appId = ConverterUtils.toApplicationId(launcherId);
+                    ApplicationReport report = yarnClient.getApplicationReport(appId);
+                    consoleUrl = report.getTrackingUrl();
+                } catch (RemoteException e) {
+                    // caught when the application id does not exist
+                    LOG.error("Got RemoteException from YARN", e);
                     String jobTracker = launcherJobConf.get(HADOOP_YARN_RM);
                     throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017",
                             "unknown job [{0}@{1}], cannot recover", launcherId, jobTracker);
@@ -1070,32 +1073,18 @@ public class JavaActionExecutor extends ActionExecutor {
                     LOG.info("No need to inject credentials.");
                 }
 
-                YarnClient yarnClient = null;
-                try {
-                    String user = context.getWorkflow().getUser();
-
-                    // Create application
-                    yarnClient = createYarnClient(context, launcherJobConf);
-                    YarnClientApplication newApp = yarnClient.createApplication();
-                    ApplicationId appId = newApp.getNewApplicationResponse().getApplicationId();
-
-                    // Create launch context for app master
-                    ApplicationSubmissionContext appContext =
-                            createAppSubmissionContext(appId, launcherJobConf, user, context, actionConf);
-
-                    // Submit the launcher AM
-                    yarnClient.submitApplication(appContext);
-
-                    launcherId = appId.toString();
-                    LOG.debug("After submission get the launcherId [{0}]", launcherId);
-                    ApplicationReport appReport = yarnClient.getApplicationReport(appId);
-                    consoleUrl = appReport.getTrackingUrl();
-                } finally {
-                    if (yarnClient != null) {
-                        yarnClient.close();
-                        yarnClient = null;
-                    }
-                }
+                String user = context.getWorkflow().getUser();
+
+                YarnClientApplication newApp = yarnClient.createApplication();
+                ApplicationId appId = newApp.getNewApplicationResponse().getApplicationId();
+                ApplicationSubmissionContext appContext =
+                        createAppSubmissionContext(appId, launcherJobConf, user, context, actionConf);
+                yarnClient.submitApplication(appContext);
+
+                launcherId = appId.toString();
+                LOG.debug("After submission get the launcherId [{0}]", launcherId);
+                ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+                consoleUrl = appReport.getTrackingUrl();
             }
 
             String jobTracker = launcherJobConf.get(HADOOP_YARN_RM);
@@ -1106,6 +1095,10 @@ public class JavaActionExecutor extends ActionExecutor {
             throw convertException(ex);
         }
         finally {
+            if (yarnClient != null) {
+                Closeables.closeQuietly(yarnClient);
+            }
+ 
             if (jobClient != null) {
                 try {
                     jobClient.close();
@@ -1126,26 +1119,16 @@ public class JavaActionExecutor extends ActionExecutor {
                                                                     Context context, Configuration actionConf)
             throws IOException, HadoopAccessorException, URISyntaxException {
 
-        // Create launch context for app master
         ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
 
-        // set the application id
         appContext.setApplicationId(appId);
-
-        // set the application name
         appContext.setApplicationName(launcherJobConf.getJobName());
         appContext.setApplicationType("Oozie Launcher");
-
-        // Set the priority for the application master
         Priority pri = Records.newRecord(Priority.class);
         int priority = 0; // TODO: OYA: Add a constant or a config
         pri.setPriority(priority);
         appContext.setPriority(pri);
-
-        // Set the queue to which this application is to be submitted in the RM
         appContext.setQueue(launcherJobConf.getQueueName());
-
-        // Set up the container launch context for the application master
         ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
 
         // Set the resources to localize
@@ -1193,7 +1176,7 @@ public class JavaActionExecutor extends ActionExecutor {
         vargs.add("-Dhadoop.root.logger=INFO,CLA");
         vargs.add("-Dhadoop.root.logfile=" + TaskLog.LogName.SYSLOG);
         vargs.add("-Dsubmitter.user=" + context.getWorkflow().getUser());
-        vargs.add("org.apache.oozie.action.hadoop.LauncherAM");  // note: using string temporarily so we don't have to depend on sharelib-oozie
+        vargs.add(LauncherAM.class.getCanonicalName());
         vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
                 Path.SEPARATOR + ApplicationConstants.STDOUT);
         vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +

http://git-wip-us.apache.org/repos/asf/oozie/blob/3b6daff5/core/src/test/java/org/apache/oozie/action/hadoop/LauncherMainTester.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/LauncherMainTester.java b/core/src/test/java/org/apache/oozie/action/hadoop/LauncherMainTester.java
index 4baed6e..c2aae4c 100644
--- a/core/src/test/java/org/apache/oozie/action/hadoop/LauncherMainTester.java
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/LauncherMainTester.java
@@ -30,6 +30,7 @@ public class LauncherMainTester {
         if (args.length == 0) {
             System.out.println("Hello World!");
         }
+ 
         if (args.length == 1) {
             if (args[0].equals("throwable")) {
                 throw new Throwable("throwing throwable");

http://git-wip-us.apache.org/repos/asf/oozie/blob/3b6daff5/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java b/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
index 8fd0c2d..a3270e9 100644
--- a/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
@@ -21,10 +21,7 @@ package org.apache.oozie.service;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.RunningJob;
 import org.apache.oozie.BundleActionBean;
 import org.apache.oozie.BundleJobBean;
 import org.apache.oozie.CoordinatorActionBean;
@@ -250,20 +247,10 @@ public class TestRecoveryService extends XDataTestCase {
         ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(job1, action1, false, false);
         MapReduceActionExecutor actionExecutor = new MapReduceActionExecutor();
         JobConf conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action1.getConf()));
-        String user = conf.get("user.name");
-        String group = conf.get("group.name");
-        JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf);
-
         String launcherId = action1.getExternalId();
 
-        final RunningJob launcherJob = jobClient.getJob(JobID.forName(launcherId));
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
 
-        waitFor(240 * 1000, new Predicate() {
-            public boolean evaluate() throws Exception {
-                return launcherJob.isComplete();
-            }
-        });
-        assertTrue(launcherJob.isSuccessful());
         Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
                 conf);
         assertTrue(LauncherMapperHelper.hasIdSwap(actionData));
@@ -274,10 +261,8 @@ public class TestRecoveryService extends XDataTestCase {
      * @throws Exception
      */
     public void testBundleRecoveryCoordCreate() throws Exception {
-        final BundleActionBean bundleAction;
-        final BundleJobBean bundle;
-        bundle = addRecordToBundleJobTable(Job.Status.RUNNING, false);
-        bundleAction = addRecordToBundleActionTable(bundle.getId(), "coord1", 1, Job.Status.PREP);
+        final BundleJobBean bundle = addRecordToBundleJobTable(Job.Status.RUNNING, false);
+        addRecordToBundleActionTable(bundle.getId(), "coord1", 1, Job.Status.PREP);
         final JPAService jpaService = Services.get().get(JPAService.class);
 
         sleep(3000);
@@ -290,7 +275,7 @@ public class TestRecoveryService extends XDataTestCase {
                         jpaService.execute(new BundleActionGetJPAExecutor(bundle.getId(), "coord1"));
                 try {
                     if (mybundleAction.getCoordId() != null) {
-                        CoordinatorJobBean coord = jpaService.execute(new CoordJobGetJPAExecutor(mybundleAction.getCoordId()));
+                        jpaService.execute(new CoordJobGetJPAExecutor(mybundleAction.getCoordId()));
                         return true;
                     }
                 } catch (Exception e) {
@@ -345,12 +330,11 @@ public class TestRecoveryService extends XDataTestCase {
      * @throws Exception
      */
     public void testBundleRecoveryCoordExists() throws Exception {
-        final BundleActionBean bundleAction;
         final BundleJobBean bundle;
         final CoordinatorJob coord;
         bundle = addRecordToBundleJobTable(Job.Status.RUNNING, false);
         coord = addRecordToCoordJobTable(Job.Status.PREP, false, false);
-        bundleAction = addRecordToBundleActionTable(bundle.getId(), coord.getId(), "coord1", 1, Job.Status.PREP);
+        addRecordToBundleActionTable(bundle.getId(), coord.getId(), "coord1", 1, Job.Status.PREP);
         final JPAService jpaService = Services.get().get(JPAService.class);
 
         sleep(3000);

http://git-wip-us.apache.org/repos/asf/oozie/blob/3b6daff5/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/HdfsOperations.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/HdfsOperations.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/HdfsOperations.java
index 593de00..6f354a8 100644
--- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/HdfsOperations.java
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/HdfsOperations.java
@@ -17,12 +17,18 @@
  */
 package org.apache.oozie.action.hadoop;
 
+import java.io.BufferedReader;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
 import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
@@ -85,4 +91,48 @@ public class HdfsOperations {
             throw ioe;
         }
     }
+
+    public boolean fileExists(final Path path, final Configuration launcherJobConf) throws IOException, InterruptedException {
+        return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
+            @Override
+            public Boolean run() throws Exception {
+                // resolve the file system from the path's own URI and probe it inside ugi.doAs
+                return FileSystem.get(path.toUri(), launcherJobConf).exists(path);
+            }
+        });
+    }
+
+    public void writeStringToFile(final Path path, final Configuration conf, final String contents) throws IOException, InterruptedException {
+        ugi.doAs(new PrivilegedExceptionAction<Void>() {
+            @Override
+            public Void run() throws Exception {
+                // try-with-resources closes the stream even if write() throws; explicit UTF-8 avoids the platform charset
+                try (java.io.Writer writer = new OutputStreamWriter(FileSystem.get(path.toUri(), conf).create(path), "UTF-8")) {
+                    writer.write(contents);
+                }
+                return null;
+            }
+        });
+    }
+
+    public String readFileContents(final Path path, final Configuration conf) throws IOException, InterruptedException {
+        return ugi.doAs(new PrivilegedExceptionAction<String>() {
+            @Override
+            public String run() throws Exception {
+                FileSystem fs = FileSystem.get(path.toUri(), conf);
+                StringBuilder sb = new StringBuilder();
+
+                // try-with-resources closes the reader (and the stream it wraps) even if a read fails.
+                // readLine() strips line terminators, so a multi-line file is returned concatenated.
+                try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(path), "UTF-8"))) {
+                    String line;
+                    while ((line = reader.readLine()) != null) {
+                        sb.append(line);
+                    }
+                }
+
+                return sb.toString();
+            }
+        });
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/3b6daff5/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
index 89357ad..881fa72 100644
--- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
@@ -25,6 +25,7 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.security.Permission;
 import java.security.PrivilegedAction;
+import java.text.MessageFormat;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
@@ -34,7 +35,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -81,6 +83,7 @@ public class LauncherAM {
     private final PrepareActionsHandler prepareHandler;
     private final LauncherAMCallbackNotifierFactory callbackNotifierFactory;
     private final LauncherSecurityManager launcherSecurityManager;
+    private final ContainerId containerId;
 
     private Configuration launcherJobConf;
     private AMRMClientAsync<?> amRmClientAsync;
@@ -94,7 +97,8 @@ public class LauncherAM {
             LocalFsOperations localFsOperations,
             PrepareActionsHandler prepareHandler,
             LauncherAMCallbackNotifierFactory callbackNotifierFactory,
-            LauncherSecurityManager launcherSecurityManager) {
+            LauncherSecurityManager launcherSecurityManager,
+            String containerId) {
         this.ugi = Preconditions.checkNotNull(ugi, "ugi should not be null");
         this.amRmClientAsyncFactory = Preconditions.checkNotNull(amRmClientAsyncFactory, "amRmClientAsyncFactory should not be null");
         this.callbackHandler = Preconditions.checkNotNull(callbackHandler, "callbackHandler should not be null");
@@ -103,6 +107,7 @@ public class LauncherAM {
         this.prepareHandler = Preconditions.checkNotNull(prepareHandler, "prepareHandler should not be null");
         this.callbackNotifierFactory = Preconditions.checkNotNull(callbackNotifierFactory, "callbackNotifierFactory should not be null");
         this.launcherSecurityManager = Preconditions.checkNotNull(launcherSecurityManager, "launcherSecurityManager should not be null");
+        this.containerId = ContainerId.fromString(Preconditions.checkNotNull(containerId, "containerId should not be null"));
     }
 
     public static void main(String[] args) throws Exception {
@@ -134,29 +139,16 @@ public class LauncherAM {
                 localFSOperations,
                 prepareHandler,
                 callbackNotifierFactory,
-                launcherSecurityManager);
+                launcherSecurityManager,
+                System.getenv("CONTAINER_ID"));
 
         launcher.run();
     }
 
-    // TODO: OYA: rethink all print messages and formatting
     public void run() throws Exception {
         final ErrorHolder errorHolder = new ErrorHolder();
         OozieActionResult actionResult = OozieActionResult.FAILED;
         boolean launcerExecutedProperly = false;
-
-        String jobUserName = System.getenv(ApplicationConstants.Environment.USER.name());
-
-        // DEBUG - will be removed
-        UserGroupInformation login = UserGroupInformation.getLoginUser();
-        System.out.println("Login: " + login.getUserName());
-        System.out.println("SecurityEnabled:" + UserGroupInformation.isSecurityEnabled());
-        System.out.println("Login keytab based:" + UserGroupInformation.isLoginKeytabBased());
-        System.out.println("Login from keytab: " + login.isFromKeytab());
-        System.out.println("Login has kerberos credentials: " + login.hasKerberosCredentials());
-        System.out.println("Login authMethod: " + login.getAuthenticationMethod());
-        System.out.println("JobUserName:" + jobUserName);
-
         boolean backgroundAction = false;
 
         try {
@@ -288,15 +280,20 @@ public class LauncherAM {
         System.out.println("Java System Properties:");
         System.out.println("------------------------");
         System.getProperties().store(System.out, "");
-        System.out.flush();
         System.out.println("------------------------");
         System.out.println();
 
+        System.out.println("Environment variables");
+        Map<String, String> env = System.getenv();
+        System.out.println("------------------------");
+        for (Map.Entry<String, String> entry : env.entrySet()) {
+            System.out.println(entry.getKey() + "=" + entry.getValue());
+        }
+        System.out.println("------------------------");
         System.out.println("=================================================================");
         System.out.println();
         System.out.println(">>> Invoking Main class now >>>");
         System.out.println();
-        System.out.flush();
     }
 
     private void registerWithRM() throws IOException, YarnException {
@@ -317,7 +314,7 @@ public class LauncherAM {
                 // tracking url is determined automatically
                 amRmClientAsync.unregisterApplicationMaster(actionResult.getYarnStatus(), message, "");
             } catch (Exception ex) {
-                System.err.println("Error un-registering AM client");
+                System.out.println("Error un-registering AM client");
                 throw ex;
             } finally {
                 amRmClientAsync.stop();
@@ -366,12 +363,7 @@ public class LauncherAM {
         System.setProperty(ACTION_PREFIX + ACTION_DATA_OUTPUT_PROPS, new File(ACTION_DATA_OUTPUT_PROPS).getAbsolutePath());
         System.setProperty(ACTION_PREFIX + ACTION_DATA_ERROR_PROPS, new File(ACTION_DATA_ERROR_PROPS).getAbsolutePath());
 
-        // FIXME - make sure it's always set
-        if (launcherJobConf.get("oozie.job.launch.time") != null) {
-            System.setProperty("oozie.job.launch.time", launcherJobConf.get("oozie.job.launch.time"));
-        } else {
-            System.setProperty("oozie.job.launch.time", String.valueOf(System.currentTimeMillis()));
-        }
+        System.setProperty("oozie.job.launch.time", String.valueOf(System.currentTimeMillis()));
     }
 
     private boolean runActionMain(final String[] mainArgs, final ErrorHolder eHolder, UserGroupInformation ugi) {
@@ -382,9 +374,9 @@ public class LauncherAM {
             @Override
             public Void run() {
                 try {
+                    setRecoveryId();
                     Class<?> klass = launcherJobConf.getClass(CONF_OOZIE_ACTION_MAIN_CLASS, Object.class);
                     System.out.println("Launcher class: " + klass.toString());
-                    System.out.flush();
                     Method mainMethod = klass.getMethod("main", String[].class);
                     // Enable LauncherSecurityManager to catch System.exit calls
                     launcherSecurityManager.set();
@@ -412,7 +404,6 @@ public class LauncherAM {
                         if (launcherSecurityManager.getExitInvoked()) {
                             final int exitCode = launcherSecurityManager.getExitCode();
                             System.out.println("Intercepting System.exit(" + exitCode + ")");
-                            System.err.println("Intercepting System.exit(" + exitCode + ")");
                             // if 0 main() method finished successfully
                             // ignoring
                             eHolder.setErrorCode(exitCode);
@@ -438,8 +429,6 @@ public class LauncherAM {
                     eHolder.setErrorMessage(t.getMessage());
                     eHolder.setErrorCause(t);
                 } finally {
-                    System.out.flush();
-                    System.err.flush();
                     // Disable LauncherSecurityManager
                     launcherSecurityManager.unset();
                 }
@@ -451,6 +440,31 @@ public class LauncherAM {
         return actionMainExecutedProperly.get();
     }
 
+    private void setRecoveryId() throws LauncherException {
+        // Stores this container's YARN application id under actionDir/<recoveryId>, or compares it with the stored one.
+        try {
+            String currentAppId = containerId.getApplicationAttemptId().getApplicationId().toString();
+            String recoveryId = Preconditions.checkNotNull(launcherJobConf.get(LauncherMapper.OOZIE_ACTION_RECOVERY_ID),
+                            "RecoveryID should not be null");
+            Path recoveryFile = new Path(actionDir, recoveryId);
+
+            if (hdfsOperations.fileExists(recoveryFile, launcherJobConf)) {
+                String storedAppId = hdfsOperations.readFileContents(recoveryFile, launcherJobConf);
+                boolean sameApplication = currentAppId.equals(storedAppId);
+                if (!sameApplication) {
+                    String mismatch = MessageFormat.format(
+                            "YARN Id mismatch, action file [{0}] declares Id [{1}] current Id [{2}]",
+                            recoveryFile, storedAppId, currentAppId);
+                    throw new LauncherException(mismatch);
+                }
+            } else {
+                hdfsOperations.writeStringToFile(recoveryFile, launcherJobConf, currentAppId);
+            }
+        } catch (Exception ex) { // also wraps the mismatch and missing-id failures raised above; the cause is kept
+            throw new LauncherException("IO error", ex);
+        }
+    }
+
     private void handleActionData() throws IOException {
         // external child IDs
         processActionData(ACTION_PREFIX + ACTION_DATA_EXTERNAL_CHILD_IDS, null, ACTION_DATA_EXTERNAL_CHILD_IDS, -1, ACTIONOUTPUTTYPE_EXT_CHILD_ID);
@@ -516,14 +530,12 @@ public class LauncherAM {
                 }
             }
         } catch (IOException ioe) {
-            System.err.println("A problem occured trying to fail the launcher");
+            System.out.println("A problem occured trying to fail the launcher");
             ioe.printStackTrace();
         } finally {
             System.out.print("Failing Oozie Launcher, " + eHolder.getErrorMessage() + "\n");
-            System.err.print("Failing Oozie Launcher, " + eHolder.getErrorMessage() + "\n");
             if (eHolder.getErrorCause() != null) {
                 eHolder.getErrorCause().printStackTrace(System.out);
-                eHolder.getErrorCause().printStackTrace(System.err);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/oozie/blob/3b6daff5/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java b/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java
index 30441ea..052673d 100644
--- a/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java
+++ b/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.oozie.action.hadoop.LauncherAM.LauncherSecurityManager;
 import org.apache.oozie.action.hadoop.LauncherAM.OozieActionResult;
 import org.junit.Before;
@@ -72,7 +73,6 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
-import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.runners.MockitoJUnitRunner;
@@ -80,6 +80,7 @@ import org.mockito.stubbing.Answer;
 
 @RunWith(MockitoJUnitRunner.class)
 public class TestLauncherAM {
+    private static final String DEFAULT_CONTAINER_ID = "container_1479473450392_0001_01_000001";
     private static final String ACTIONDATA_ERROR_PROPERTIES = "error.properties";
     private static final String ACTIONDATA_FINAL_STATUS_PROPERTY = "final.status";
     private static final String ERROR_CODE_PROPERTY = "error.code";
@@ -108,7 +109,7 @@ public class TestLauncherAM {
     private AMRMCallBackHandler callbackHandlerMock;
 
     @Mock
-    private HdfsOperations fsOperationsMock;
+    private HdfsOperations hdfsOperationsMock;
 
     @Mock
     private LocalFsOperations localFsOperationsMock;
@@ -127,7 +128,10 @@ public class TestLauncherAM {
 
     private Configuration launcherJobConfig = new Configuration();
 
-    @InjectMocks
+    private String containerId = DEFAULT_CONTAINER_ID;
+
+    private String applicationId = ConverterUtils.toContainerId(containerId).getApplicationAttemptId().getApplicationId().toString();
+
     private LauncherAM launcherAM;
 
     private ExpectedFailureDetails failureDetails = new ExpectedFailureDetails();
@@ -135,6 +139,8 @@ public class TestLauncherAM {
     @Before
     public void setup() throws IOException {
         configureMocksForHappyPath();
+        launcherJobConfig.set(LauncherMapper.OOZIE_ACTION_RECOVERY_ID, "1");
+        instantiateLauncher();
     }
 
     @Test
@@ -379,6 +385,73 @@ public class TestLauncherAM {
         }
     }
 
+    @Test
+    public void testRecoveryIdNotSet() throws Exception { // a launcher without a configured recovery id must fail
+        launcherJobConfig.unset(LauncherMapper.OOZIE_ACTION_RECOVERY_ID); // remove the id that setup() configured
+        instantiateLauncher(); // replaces the launcherAM instance created in setup()
+
+        executeLauncher();
+
+        failureDetails.expectedExceptionMessage("IO error") // message of the exception setRecoveryId wraps every failure in
+            .expectedErrorCode(EXIT_CODE_0)
+            .expectedErrorReason("IO error, IO error")
+            .withStackTrace();
+
+        assertFailedExecution();
+    }
+
+    @Test
+    public void testRecoveryIdExistsAndRecoveryIsdMatch() throws Exception { // NOTE(review): "Isd" looks like a typo for "Ids"
+        given(hdfsOperationsMock.fileExists(any(Path.class), eq(launcherJobConfig))).willReturn(true); // recovery file already exists
+        given(hdfsOperationsMock.readFileContents(any(Path.class), eq(launcherJobConfig))).willReturn(applicationId); // stored id equals the current one
+
+        executeLauncher();
+
+        verify(hdfsOperationsMock).readFileContents(any(Path.class), eq(launcherJobConfig)); // the stored id was read back
+    }
+
+    @Test
+    public void testRecoveryIdExistsAndRecoveryIdsDoNotMatch() throws Exception { // a stored id that differs from the current one must fail the launcher
+        given(hdfsOperationsMock.fileExists(any(Path.class), eq(launcherJobConfig))).willReturn(true); // recovery file already exists
+        given(hdfsOperationsMock.readFileContents(any(Path.class), eq(launcherJobConfig))).willReturn("not_matching_appid"); // stored id differs
+
+        executeLauncher();
+
+        failureDetails.expectedExceptionMessage("IO error") // the mismatch exception is re-wrapped by setRecoveryId's catch-all
+            .expectedErrorCode(EXIT_CODE_0)
+            .expectedErrorReason("IO error, IO error")
+            .withStackTrace();
+
+        verify(hdfsOperationsMock).readFileContents(any(Path.class), eq(launcherJobConfig));
+        assertFailedExecution();
+    }
+
+    @Test
+    public void testReadingRecoveryIdFails() throws Exception { // NOTE(review): despite the name, the stubbed failure is on the write path
+        willThrow(new IOException()).given(hdfsOperationsMock).writeStringToFile(any(Path.class), eq(launcherJobConfig), eq(applicationId)); // writing the recovery file fails
+
+        executeLauncher();
+
+        failureDetails.expectedExceptionMessage("IO error") // the IOException is wrapped by setRecoveryId
+            .expectedErrorCode(EXIT_CODE_0)
+            .expectedErrorReason("IO error, IO error")
+            .withStackTrace();
+
+        assertFailedExecution();
+    }
+
+    private void instantiateLauncher() { // (re)creates launcherAM from the mock collaborators and the containerId field
+        launcherAM = new LauncherAM(ugiMock,
+                amRMClientAsyncFactoryMock,
+                callbackHandlerMock,
+                hdfsOperationsMock,
+                localFsOperationsMock,
+                prepareHandlerMock,
+                launcherCallbackNotifierFactoryMock,
+                launcherSecurityManagerMock,
+                containerId); // passed as a String; the LauncherAM constructor parses it with ContainerId.fromString
+    }
+
     @SuppressWarnings("unchecked")
     private void configureMocksForHappyPath() throws IOException {
         launcherJobConfig.set(LauncherAM.OOZIE_ACTION_DIR_PATH, "dummy");
@@ -426,9 +499,10 @@ public class TestLauncherAM {
         verify(amRmAsyncClientMock).unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, EMPTY_STRING, EMPTY_STRING);
         verify(amRmAsyncClientMock).stop();
         verify(ugiMock, times(2)).doAs(any(PrivilegedAction.class)); // prepare & action main
-        verify(fsOperationsMock).uploadActionDataToHDFS(any(Configuration.class), any(Path.class), any(Map.class));
+        verify(hdfsOperationsMock).uploadActionDataToHDFS(any(Configuration.class), any(Path.class), any(Map.class));
         verify(launcherCallbackNotifierFactoryMock).createCallbackNotifier(any(Configuration.class));
         verify(launcherCallbackNotifierMock).notifyURL(actionResult);
+        verify(hdfsOperationsMock).writeStringToFile(any(Path.class), any(Configuration.class), any(String.class));
 
         Map<String, String> actionData = launcherAM.getActionData();
         verifyFinalStatus(actionData, actionResult);