Posted to common-commits@hadoop.apache.org by st...@apache.org on 2020/10/12 12:42:32 UTC

[hadoop] branch branch-3.3 updated: HADOOP-17258. Magic S3Guard Committer to overwrite existing pendingSet file on task commit (#2371)

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 5032f8a  HADOOP-17258. Magic S3Guard Committer to overwrite existing pendingSet file on task commit (#2371)
5032f8a is described below

commit 5032f8abba41469e6dbd25627031726e1e3b425e
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Mon Oct 12 13:39:15 2020 +0100

    HADOOP-17258. Magic S3Guard Committer to overwrite existing pendingSet file on task commit (#2371)
    
    Contributed by Dongjoon Hyun and Steve Loughran
    
    Change-Id: Ibaf8082e60eff5298ff4e6513edc386c5bae0274
---
 .../hadoop/fs/s3a/commit/CommitConstants.java      |  3 +
 .../hadoop/fs/s3a/commit/files/PendingSet.java     |  9 +++
 .../fs/s3a/commit/files/SinglePendingCommit.java   | 10 +++
 .../fs/s3a/commit/magic/MagicS3GuardCommitter.java |  6 +-
 .../fs/s3a/commit/staging/StagingCommitter.java    |  2 +
 .../fs/s3a/commit/AbstractITCommitProtocol.java    | 86 +++++++++++++++++++++-
 6 files changed, 111 insertions(+), 5 deletions(-)
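
The gist of the change: on task commit, the magic committer now records the task attempt ID in the pendingset's extra data and saves the file with overwrite enabled, so a retried or speculative second attempt replaces the first attempt's pendingset instead of failing. A rough, self-contained sketch of that flow follows; it reuses the types touched by this patch, but the class/method names, the FileSystem/Path wiring and the surrounding error handling are illustrative only, not the committer's actual code.

    import java.io.IOException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.s3a.commit.files.PendingSet;

    import static org.apache.hadoop.fs.s3a.commit.CommitConstants.TASK_ATTEMPT_ID;

    // Sketch only: the real logic lives in MagicS3GuardCommitter's task commit
    // path (see the hunk below); this just shows the new save semantics.
    class TaskCommitSketch {
      void saveTaskCommitData(FileSystem destFs, Path taskOutcomePath,
          PendingSet pendingSet, String taskAttemptId) throws IOException {
        // New in this patch: tag the pendingset with the attempt that wrote it.
        pendingSet.putExtraData(TASK_ATTEMPT_ID, taskAttemptId);
        // New in this patch: overwrite any pendingset left by an earlier
        // attempt of the same task (previously save(..., false) was used,
        // which refused to replace an existing file).
        pendingSet.save(destFs, taskOutcomePath, true);
      }
    }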

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java
index 3e28a5d..e7c0492 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java
@@ -261,4 +261,7 @@ public final class CommitConstants {
    */
   public static final int SUCCESS_MARKER_FILE_LIMIT = 100;
 
+  /** Extra Data key for task attempt in pendingset files. */
+  public static final String TASK_ATTEMPT_ID = "task.attempt.id";
+
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/PendingSet.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/PendingSet.java
index c0d7415..4793b78 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/PendingSet.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/PendingSet.java
@@ -189,4 +189,13 @@ public class PendingSet extends PersistentCommitData {
   public void setCommits(List<SinglePendingCommit> commits) {
     this.commits = commits;
   }
+
+  /**
+   * Set/Update an extra data entry.
+   * @param key key
+   * @param value value
+   */
+  public void putExtraData(String key, String value) {
+    extraData.put(key, value);
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SinglePendingCommit.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SinglePendingCommit.java
index 596dd95..c848f80 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SinglePendingCommit.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SinglePendingCommit.java
@@ -419,6 +419,15 @@ public class SinglePendingCommit extends PersistentCommitData
   }
 
   /**
+   * Set/Update an extra data entry.
+   * @param key key
+   * @param value value
+   */
+  public void putExtraData(String key, String value) {
+    extraData.put(key, value);
+  }
+
+  /**
    * Destination file size.
    * @return size of destination object
    */
@@ -429,4 +438,5 @@ public class SinglePendingCommit extends PersistentCommitData
   public void setLength(long length) {
     this.length = length;
   }
+
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
index 9912173..30417ea 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.util.DurationInfo;
 
 import static org.apache.hadoop.fs.s3a.S3AUtils.*;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.TASK_ATTEMPT_ID;
 import static org.apache.hadoop.fs.s3a.commit.CommitUtils.*;
 import static org.apache.hadoop.fs.s3a.commit.MagicCommitPaths.*;
 import static org.apache.hadoop.fs.s3a.commit.CommitUtilsWithMR.*;
@@ -213,7 +214,7 @@ public class MagicS3GuardCommitter extends AbstractS3ACommitter {
       commit.setJobId(jobId);
       commit.setTaskId(taskId);
     }
-
+    pendingSet.putExtraData(TASK_ATTEMPT_ID, taskId);
     Path jobAttemptPath = getJobAttemptPath(context);
     TaskAttemptID taskAttemptID = context.getTaskAttemptID();
     Path taskOutcomePath = new Path(jobAttemptPath,
@@ -221,7 +222,8 @@ public class MagicS3GuardCommitter extends AbstractS3ACommitter {
         CommitConstants.PENDINGSET_SUFFIX);
     LOG.info("Saving work of {} to {}", taskAttemptID, taskOutcomePath);
     try {
-      pendingSet.save(getDestFS(), taskOutcomePath, false);
+      // Overwrite any pendingset file left by a previous attempt of this task
+      pendingSet.save(getDestFS(), taskOutcomePath, true);
     } catch (IOException e) {
       LOG.warn("Failed to save task commit data to {} ",
           taskOutcomePath, e);
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
index 91e68af..9cc932b 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
@@ -695,6 +695,8 @@ public class StagingCommitter extends AbstractS3ACommitter {
       context.progress();
 
       PendingSet pendingCommits = new PendingSet(commitCount);
+      pendingCommits.putExtraData(TASK_ATTEMPT_ID,
+          context.getTaskAttemptID().toString());
       try {
         Tasks.foreach(taskOutput)
             .stopOnFailure()
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
index cacd54d..03759a5 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.MapFile;
@@ -307,14 +308,19 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
    * @param context task
    * @throws IOException IO failure
    * @throws InterruptedException write interrupted
+   * @return the path written to
    */
-  protected void writeTextOutput(TaskAttemptContext context)
+  protected Path writeTextOutput(TaskAttemptContext context)
       throws IOException, InterruptedException {
     describe("write output");
     try (DurationInfo d = new DurationInfo(LOG,
         "Writing Text output for task %s", context.getTaskAttemptID())) {
-      writeOutput(new LoggingTextOutputFormat().getRecordWriter(context),
+      LoggingTextOutputFormat.LoggingLineRecordWriter<Object, Object>
+          recordWriter = new LoggingTextOutputFormat<>().getRecordWriter(
           context);
+      writeOutput(recordWriter,
+          context);
+      return recordWriter.getDest();
     }
   }
 
@@ -480,11 +486,17 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
         "setup job %s", jContext.getJobID())) {
       committer.setupJob(jContext);
     }
+    setupCommitter(committer, tContext);
+    describe("setup complete\n");
+  }
+
+  private void setupCommitter(
+      final AbstractS3ACommitter committer,
+      final TaskAttemptContext tContext) throws IOException {
     try (DurationInfo d = new DurationInfo(LOG,
         "setup task %s", tContext.getTaskAttemptID())) {
       committer.setupTask(tContext);
     }
-    describe("setup complete\n");
   }
 
   /**
@@ -806,6 +818,74 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
     expectFNFEonTaskCommit(committer, tContext);
   }
 
+  /**
+   * HADOOP-17258. If a second task attempt is committed, it
+   * must succeed, and the output of the first TA, even if already
+   * committed, MUST NOT be visible in the final output.
+   * <p></p>
+   * What's important is not just that only one TA must succeed,
+   * but that it must be the last one executed. Why? Because that's
+   * the one whose output the job is expected to publish.
+   */
+  @Test
+  public void testTwoTaskAttemptsCommit() throws Exception {
+    describe("Commit two task attempts;" +
+        " expect the second attempt to succeed.");
+    JobData jobData = startJob(false);
+    JobContext jContext = jobData.jContext;
+    TaskAttemptContext tContext = jobData.tContext;
+    AbstractS3ACommitter committer = jobData.committer;
+    // do commit
+    describe("\ncommitting task");
+    // write output for TA 1,
+    Path outputTA1 = writeTextOutput(tContext);
+
+    // speculatively execute committer 2.
+
+    // job conf with a different base name for its part files.
+    Configuration conf2 = jobData.conf;
+    conf2.set("mapreduce.output.basename", "attempt2");
+    String attempt2 = "attempt_" + jobId + "_m_000000_1";
+    TaskAttemptID ta2 = TaskAttemptID.forName(attempt2);
+    TaskAttemptContext tContext2 = new TaskAttemptContextImpl(
+        conf2, ta2);
+
+    AbstractS3ACommitter committer2 = standardCommitterFactory
+        .createCommitter(tContext2);
+    setupCommitter(committer2, tContext2);
+    // write output for TA 2,
+    Path outputTA2 = writeTextOutput(tContext2);
+
+    // verify the names are different.
+    String name1 = outputTA1.getName();
+    String name2 = outputTA2.getName();
+    Assertions.assertThat(name1)
+        .describedAs("name of task attempt output %s", outputTA1)
+        .isNotEqualTo(name2);
+
+    // commit task 1
+    committer.commitTask(tContext);
+
+    // then pretend that task1 didn't respond, so
+    // commit task 2
+    committer2.commitTask(tContext2);
+
+    // and the job
+    committer2.commitJob(tContext);
+
+    // validate output
+    S3AFileSystem fs = getFileSystem();
+    SuccessData successData = validateSuccessFile(outDir, "", fs, "query", 1);
+    Assertions.assertThat(successData.getFilenames())
+        .describedAs("Files committed")
+        .hasSize(1);
+
+    assertPathExists("attempt2 output", new Path(outDir, name2));
+    assertPathDoesNotExist("attempt1 output", new Path(outDir, name1));
+
+    assertNoMultipartUploadsPending(outDir);
+  }
+
   protected boolean shouldExpectSuccessMarker() {
     return true;
   }
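
To make the intent of the new test concrete: with the old overwrite=false behaviour, the second attempt's pendingset save would fail once the first attempt had written the file, so the last attempt to run could not win. A minimal hypothetical sketch of the difference at the PendingSet level (assuming the same imports as the earlier sketch, an initialised FileSystem and a destination path; the attempt ID string is a placeholder):

    // Hypothetical snippet, not part of the patch or of the test suite.
    void overwriteDemo(FileSystem fs, Path taskOutcomePath) throws IOException {
      PendingSet first = new PendingSet();
      first.save(fs, taskOutcomePath, true);      // first attempt writes the file

      PendingSet second = new PendingSet();
      second.putExtraData(TASK_ATTEMPT_ID, "attempt_..._m_000000_1");
      // Before HADOOP-17258 the committer called save(fs, taskOutcomePath, false)
      // here, which fails once the file already exists; with overwrite enabled
      // the later attempt's data simply replaces the earlier attempt's.
      second.save(fs, taskOutcomePath, true);
    }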


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org