You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2016/10/04 15:19:41 UTC

[10/50] [abbrv] hadoop git commit: MAPREDUCE-6638. Do not attempt to recover progress from previous job attempts if spill encryption is enabled. (Haibo Chen via kasha)

MAPREDUCE-6638. Do not attempt to recover progress from previous job attempts if spill encryption is enabled. (Haibo Chen via kasha)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/de7a0a92
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/de7a0a92
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/de7a0a92

Branch: refs/heads/YARN-3368
Commit: de7a0a92ca1983b35ca4beb7ab712fd700a9e6e0
Parents: 7442084
Author: Karthik Kambatla <ka...@cloudera.com>
Authored: Mon Oct 3 10:30:22 2016 -0700
Committer: Karthik Kambatla <ka...@cloudera.com>
Committed: Mon Oct 3 10:30:22 2016 -0700

----------------------------------------------------------------------
 .../hadoop/mapreduce/v2/app/MRAppMaster.java    | 90 ++++++++++++++------
 .../hadoop/mapreduce/v2/app/TestRecovery.java   | 66 ++++++++++++++
 2 files changed, 129 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/de7a0a92/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
index d94f8a5..4a8a90e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
@@ -149,7 +149,6 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
 import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.log4j.LogManager;
 
@@ -1303,44 +1302,77 @@ public class MRAppMaster extends CompositeService {
   }
 
   private void processRecovery() throws IOException{
-    if (appAttemptID.getAttemptId() == 1) {
-      return;  // no need to recover on the first attempt
+    boolean attemptRecovery = shouldAttemptRecovery();
+    boolean recoverySucceeded = true;
+    if (attemptRecovery) {
+      LOG.info("Attempting to recover.");
+      try {
+        parsePreviousJobHistory();
+      } catch (IOException e) {
+        LOG.warn("Unable to parse prior job history, aborting recovery", e);
+        recoverySucceeded = false;
+      }
+    }
+
+    if (!isFirstAttempt() && (!attemptRecovery || !recoverySucceeded)) {
+      amInfos.addAll(readJustAMInfos());
+    }
+  }
+
+  private boolean isFirstAttempt() {
+    return appAttemptID.getAttemptId() == 1;
+  }
+
+  /**
+   * Check if the current job attempt should try to recover from previous
+   * job attempts if any.
+   */
+  private boolean shouldAttemptRecovery() throws IOException {
+    if (isFirstAttempt()) {
+      return false;  // no need to recover on the first attempt
     }
 
     boolean recoveryEnabled = getConfig().getBoolean(
         MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE,
         MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE_DEFAULT);
+    if (!recoveryEnabled) {
+      LOG.info("Not attempting to recover. Recovery disabled. To enable " +
+          "recovery, set " + MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE);
+      return false;
+    }
 
     boolean recoverySupportedByCommitter = isRecoverySupported();
+    if (!recoverySupportedByCommitter) {
+      LOG.info("Not attempting to recover. Recovery is not supported by " +
+          committer.getClass() + ". Use an OutputCommitter that supports" +
+              " recovery.");
+      return false;
+    }
 
-    // If a shuffle secret was not provided by the job client then this app
-    // attempt will generate one.  However that disables recovery if there
-    // are reducers as the shuffle secret would be app attempt specific.
-    int numReduceTasks = getConfig().getInt(MRJobConfig.NUM_REDUCES, 0);
+    int reducerCount = getConfig().getInt(MRJobConfig.NUM_REDUCES, 0);
+
+    // If a shuffle secret was not provided by the job client, one will be
+    // generated in this job attempt. However, that disables recovery if
+    // there are reducers as the shuffle secret would be job attempt specific.
     boolean shuffleKeyValidForRecovery =
         TokenCache.getShuffleSecretKey(jobCredentials) != null;
+    if (reducerCount > 0 && !shuffleKeyValidForRecovery) {
+      LOG.info("Not attempting to recover. The shuffle key is invalid for " +
+          "recovery.");
+      return false;
+    }
 
-    if (recoveryEnabled && recoverySupportedByCommitter
-        && (numReduceTasks <= 0 || shuffleKeyValidForRecovery)) {
-      LOG.info("Recovery is enabled. "
-          + "Will try to recover from previous life on best effort basis.");
-      try {
-        parsePreviousJobHistory();
-      } catch (IOException e) {
-        LOG.warn("Unable to parse prior job history, aborting recovery", e);
-        // try to get just the AMInfos
-        amInfos.addAll(readJustAMInfos());
-      }
-    } else {
-      LOG.info("Will not try to recover. recoveryEnabled: "
-            + recoveryEnabled + " recoverySupportedByCommitter: "
-            + recoverySupportedByCommitter + " numReduceTasks: "
-            + numReduceTasks + " shuffleKeyValidForRecovery: "
-            + shuffleKeyValidForRecovery + " ApplicationAttemptID: "
-            + appAttemptID.getAttemptId());
-      // Get the amInfos anyways whether recovery is enabled or not
-      amInfos.addAll(readJustAMInfos());
+    // If the intermediate data is encrypted, recovering the job requires the
+    // access to the key. Until the encryption key is persisted, we should
+    // avoid attempts to recover.
+    boolean spillEncrypted = CryptoUtils.isEncryptedSpillEnabled(getConfig());
+    if (reducerCount > 0 && spillEncrypted) {
+      LOG.info("Not attempting to recover. Intermediate spill encryption" +
+          " is enabled.");
+      return false;
     }
+
+    return true;
   }
 
   private static FSDataInputStream getPreviousJobHistoryStream(
@@ -1440,6 +1472,10 @@ public class MRAppMaster extends CompositeService {
     return amInfos;
   }
 
+  public boolean recovered() {
+    return recoveredJobStartTime > 0;
+  }
+
   /**
    * This can be overridden to instantiate multiple jobs and create a 
    * workflow.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de7a0a92/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
index 9d5f0ae..071575a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
@@ -579,6 +579,72 @@ public class TestRecovery {
     app.verifyCompleted();
   }
 
+  @Test
+  public void testRecoveryWithSpillEncryption() throws Exception {
+    int runCount = 0;
+    MRApp app = new MRAppWithHistory(1, 1, false, this.getClass().getName(),
+        true, ++runCount) {
+    };
+    Configuration conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean("mapred.reducer.new-api", true);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
+
+    // run the MR job at the first attempt
+    Job jobAttempt1 = app.submit(conf);
+    app.waitForState(jobAttempt1, JobState.RUNNING);
+
+    Iterator<Task> tasks = jobAttempt1.getTasks().values().iterator();
+
+    // finish the map task but the reduce task
+    Task mapper = tasks.next();
+    app.waitForState(mapper, TaskState.RUNNING);
+    TaskAttempt mapAttempt = mapper.getAttempts().values().iterator().next();
+    app.waitForState(mapAttempt, TaskAttemptState.RUNNING);
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(mapAttempt.getID(), TaskAttemptEventType.TA_DONE));
+    app.waitForState(mapper, TaskState.SUCCEEDED);
+
+    // crash the first attempt of the MR job
+    app.stop();
+
+    // run the MR job again at the second attempt
+    app = new MRAppWithHistory(1, 1, false, this.getClass().getName(), false,
+        ++runCount);
+    Job jobAttempt2 = app.submit(conf);
+    Assert.assertTrue("Recovery from previous job attempt is processed even " +
+        "though intermediate data encryption is enabled.", !app.recovered());
+
+    // The map task succeeded from previous job attempt will not be recovered
+    // because the data spill encryption is enabled.
+    // Let's finish the job at the second attempt and verify its completion.
+    app.waitForState(jobAttempt2, JobState.RUNNING);
+    tasks = jobAttempt2.getTasks().values().iterator();
+    mapper = tasks.next();
+    Task reducer = tasks.next();
+
+    // finish the map task first
+    app.waitForState(mapper, TaskState.RUNNING);
+    mapAttempt = mapper.getAttempts().values().iterator().next();
+    app.waitForState(mapAttempt, TaskAttemptState.RUNNING);
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(mapAttempt.getID(), TaskAttemptEventType.TA_DONE));
+    app.waitForState(mapper, TaskState.SUCCEEDED);
+
+    // then finish the reduce task
+    TaskAttempt redAttempt = reducer.getAttempts().values().iterator().next();
+    app.waitForState(redAttempt, TaskAttemptState.RUNNING);
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(redAttempt.getID(), TaskAttemptEventType.TA_DONE));
+    app.waitForState(reducer, TaskState.SUCCEEDED);
+
+    // verify that the job succeeds at the 2rd attempt
+    app.waitForState(jobAttempt2, JobState.SUCCEEDED);
+  }
+
   /**
    * This test case primarily verifies if the recovery is controlled through config
    * property. In this case, recover is turned OFF. AM with 3 maps and 0 reduce.


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org