You are viewing a plain-text version of this content. The canonical link to it ("here" on the original page) was not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2016/10/04 15:19:41 UTC
[10/50] [abbrv] hadoop git commit: MAPREDUCE-6638. Do not attempt to
recover progress from previous job attempts if spill encryption is enabled.
(Haibo Chen via kasha)
MAPREDUCE-6638. Do not attempt to recover progress from previous job attempts if spill encryption is enabled. (Haibo Chen via kasha)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/de7a0a92
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/de7a0a92
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/de7a0a92
Branch: refs/heads/YARN-3368
Commit: de7a0a92ca1983b35ca4beb7ab712fd700a9e6e0
Parents: 7442084
Author: Karthik Kambatla <ka...@cloudera.com>
Authored: Mon Oct 3 10:30:22 2016 -0700
Committer: Karthik Kambatla <ka...@cloudera.com>
Committed: Mon Oct 3 10:30:22 2016 -0700
----------------------------------------------------------------------
.../hadoop/mapreduce/v2/app/MRAppMaster.java | 90 ++++++++++++++------
.../hadoop/mapreduce/v2/app/TestRecovery.java | 66 ++++++++++++++
2 files changed, 129 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de7a0a92/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
index d94f8a5..4a8a90e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
@@ -149,7 +149,6 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.log4j.LogManager;
@@ -1303,44 +1302,77 @@ public class MRAppMaster extends CompositeService {
}
private void processRecovery() throws IOException{
- if (appAttemptID.getAttemptId() == 1) {
- return; // no need to recover on the first attempt
+ boolean attemptRecovery = shouldAttemptRecovery();
+ boolean recoverySucceeded = true;
+ if (attemptRecovery) {
+ LOG.info("Attempting to recover.");
+ try {
+ parsePreviousJobHistory();
+ } catch (IOException e) {
+ LOG.warn("Unable to parse prior job history, aborting recovery", e);
+ recoverySucceeded = false;
+ }
+ }
+
+ if (!isFirstAttempt() && (!attemptRecovery || !recoverySucceeded)) {
+ amInfos.addAll(readJustAMInfos());
+ }
+ }
+
+ private boolean isFirstAttempt() {
+ return appAttemptID.getAttemptId() == 1;
+ }
+
+ /**
+ * Returns whether the current job attempt should try to recover from
+ * previous job attempts, if there are any.
+ */
+ private boolean shouldAttemptRecovery() throws IOException {
+ if (isFirstAttempt()) {
+ return false; // no need to recover on the first attempt
}
boolean recoveryEnabled = getConfig().getBoolean(
MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE,
MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE_DEFAULT);
+ if (!recoveryEnabled) {
+ LOG.info("Not attempting to recover. Recovery disabled. To enable " +
+ "recovery, set " + MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE);
+ return false;
+ }
boolean recoverySupportedByCommitter = isRecoverySupported();
+ if (!recoverySupportedByCommitter) {
+ LOG.info("Not attempting to recover. Recovery is not supported by " +
+ committer.getClass() + ". Use an OutputCommitter that supports" +
+ " recovery.");
+ return false;
+ }
- // If a shuffle secret was not provided by the job client then this app
- // attempt will generate one. However that disables recovery if there
- // are reducers as the shuffle secret would be app attempt specific.
- int numReduceTasks = getConfig().getInt(MRJobConfig.NUM_REDUCES, 0);
+ int reducerCount = getConfig().getInt(MRJobConfig.NUM_REDUCES, 0);
+
+ // If the job client did not provide a shuffle secret, this job attempt
+ // generates one. That disables recovery when there are reducers, because
+ // the secret would then be specific to this job attempt.
boolean shuffleKeyValidForRecovery =
TokenCache.getShuffleSecretKey(jobCredentials) != null;
+ if (reducerCount > 0 && !shuffleKeyValidForRecovery) {
+ LOG.info("Not attempting to recover. The shuffle key is invalid for " +
+ "recovery.");
+ return false;
+ }
- if (recoveryEnabled && recoverySupportedByCommitter
- && (numReduceTasks <= 0 || shuffleKeyValidForRecovery)) {
- LOG.info("Recovery is enabled. "
- + "Will try to recover from previous life on best effort basis.");
- try {
- parsePreviousJobHistory();
- } catch (IOException e) {
- LOG.warn("Unable to parse prior job history, aborting recovery", e);
- // try to get just the AMInfos
- amInfos.addAll(readJustAMInfos());
- }
- } else {
- LOG.info("Will not try to recover. recoveryEnabled: "
- + recoveryEnabled + " recoverySupportedByCommitter: "
- + recoverySupportedByCommitter + " numReduceTasks: "
- + numReduceTasks + " shuffleKeyValidForRecovery: "
- + shuffleKeyValidForRecovery + " ApplicationAttemptID: "
- + appAttemptID.getAttemptId());
- // Get the amInfos anyways whether recovery is enabled or not
- amInfos.addAll(readJustAMInfos());
+ // If intermediate (spill) data is encrypted, recovering the job requires
+ // access to the encryption key. Until that key is persisted, do not
+ // attempt recovery for jobs that have reducers.
+ boolean spillEncrypted = CryptoUtils.isEncryptedSpillEnabled(getConfig());
+ if (reducerCount > 0 && spillEncrypted) {
+ LOG.info("Not attempting to recover. Intermediate spill encryption" +
+ " is enabled.");
+ return false;
}
+
+ return true;
}
private static FSDataInputStream getPreviousJobHistoryStream(
@@ -1440,6 +1472,10 @@ public class MRAppMaster extends CompositeService {
return amInfos;
}
+ public boolean recovered() { // true iff recoveredJobStartTime > 0; presumably set only by a successful recovery -- confirm
+ return recoveredJobStartTime > 0;
+ }
+
/**
* This can be overridden to instantiate multiple jobs and create a
* workflow.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de7a0a92/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
index 9d5f0ae..071575a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
@@ -579,6 +579,72 @@ public class TestRecovery {
app.verifyCompleted();
}
+ @Test
+ public void testRecoveryWithSpillEncryption() throws Exception {
+ int runCount = 0;
+ MRApp app = new MRAppWithHistory(1, 1, false, this.getClass().getName(),
+ true, ++runCount) {
+ };
+ Configuration conf = new Configuration();
+ conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+ conf.setBoolean("mapred.mapper.new-api", true);
+ conf.setBoolean("mapred.reducer.new-api", true);
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+ conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+ conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
+
+ // run the MR job in the first attempt
+ Job jobAttempt1 = app.submit(conf);
+ app.waitForState(jobAttempt1, JobState.RUNNING);
+
+ Iterator<Task> tasks = jobAttempt1.getTasks().values().iterator();
+
+ // finish the map task but not the reduce task
+ Task mapper = tasks.next();
+ app.waitForState(mapper, TaskState.RUNNING);
+ TaskAttempt mapAttempt = mapper.getAttempts().values().iterator().next();
+ app.waitForState(mapAttempt, TaskAttemptState.RUNNING);
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(mapAttempt.getID(), TaskAttemptEventType.TA_DONE));
+ app.waitForState(mapper, TaskState.SUCCEEDED);
+
+ // simulate a crash of the first job attempt by stopping the app
+ app.stop();
+
+ // run the MR job again in the second attempt
+ app = new MRAppWithHistory(1, 1, false, this.getClass().getName(), false,
+ ++runCount);
+ Job jobAttempt2 = app.submit(conf);
+ Assert.assertTrue("Recovery from previous job attempt is processed even " +
+ "though intermediate data encryption is enabled.", !app.recovered());
+
+ // The map task that succeeded in the previous attempt is not recovered
+ // because intermediate data (spill) encryption is enabled, so finish the
+ // job in the second attempt and verify that it completes.
+ app.waitForState(jobAttempt2, JobState.RUNNING);
+ tasks = jobAttempt2.getTasks().values().iterator();
+ mapper = tasks.next();
+ Task reducer = tasks.next();
+
+ // finish the map task first
+ app.waitForState(mapper, TaskState.RUNNING);
+ mapAttempt = mapper.getAttempts().values().iterator().next();
+ app.waitForState(mapAttempt, TaskAttemptState.RUNNING);
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(mapAttempt.getID(), TaskAttemptEventType.TA_DONE));
+ app.waitForState(mapper, TaskState.SUCCEEDED);
+
+ // then finish the reduce task
+ TaskAttempt redAttempt = reducer.getAttempts().values().iterator().next();
+ app.waitForState(redAttempt, TaskAttemptState.RUNNING);
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(redAttempt.getID(), TaskAttemptEventType.TA_DONE));
+ app.waitForState(reducer, TaskState.SUCCEEDED);
+
+ // verify that the job succeeds in the 2nd attempt
+ app.waitForState(jobAttempt2, JobState.SUCCEEDED);
+ }
+
/**
* This test case primarily verifies if the recovery is controlled through config
* property. In this case, recover is turned OFF. AM with 3 maps and 0 reduce.
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org