You are viewing a plain text version of this content. The canonical link was present in the original HTML version but is not preserved in this plain-text export.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2020/10/12 12:42:32 UTC
[hadoop] branch branch-3.3 updated: HADOOP-17258. Magic S3Guard
Committer to overwrite existing pendingSet file on task commit (#2371)
This is an automated email from the ASF dual-hosted git repository.
stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 5032f8a HADOOP-17258. Magic S3Guard Committer to overwrite existing pendingSet file on task commit (#2371)
5032f8a is described below
commit 5032f8abba41469e6dbd25627031726e1e3b425e
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Mon Oct 12 13:39:15 2020 +0100
HADOOP-17258. Magic S3Guard Committer to overwrite existing pendingSet file on task commit (#2371)
Contributed by Dongjoon Hyun and Steve Loughran
Change-Id: Ibaf8082e60eff5298ff4e6513edc386c5bae0274
---
.../hadoop/fs/s3a/commit/CommitConstants.java | 3 +
.../hadoop/fs/s3a/commit/files/PendingSet.java | 9 +++
.../fs/s3a/commit/files/SinglePendingCommit.java | 10 +++
.../fs/s3a/commit/magic/MagicS3GuardCommitter.java | 6 +-
.../fs/s3a/commit/staging/StagingCommitter.java | 2 +
.../fs/s3a/commit/AbstractITCommitProtocol.java | 86 +++++++++++++++++++++-
6 files changed, 111 insertions(+), 5 deletions(-)
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java
index 3e28a5d..e7c0492 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java
@@ -261,4 +261,7 @@ public final class CommitConstants {
*/
public static final int SUCCESS_MARKER_FILE_LIMIT = 100;
+ /** Extra Data key for task attempt in pendingset files. */
+ public static final String TASK_ATTEMPT_ID = "task.attempt.id";
+
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/PendingSet.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/PendingSet.java
index c0d7415..4793b78 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/PendingSet.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/PendingSet.java
@@ -189,4 +189,13 @@ public class PendingSet extends PersistentCommitData {
public void setCommits(List<SinglePendingCommit> commits) {
this.commits = commits;
}
+
+ /**
+ * Set/Update an extra data entry.
+ * @param key key
+ * @param value value
+ */
+ public void putExtraData(String key, String value) {
+ extraData.put(key, value);
+ }
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SinglePendingCommit.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SinglePendingCommit.java
index 596dd95..c848f80 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SinglePendingCommit.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SinglePendingCommit.java
@@ -419,6 +419,15 @@ public class SinglePendingCommit extends PersistentCommitData
}
/**
+ * Set/Update an extra data entry.
+ * @param key key
+ * @param value value
+ */
+ public void putExtraData(String key, String value) {
+ extraData.put(key, value);
+ }
+
+ /**
* Destination file size.
* @return size of destination object
*/
@@ -429,4 +438,5 @@ public class SinglePendingCommit extends PersistentCommitData
public void setLength(long length) {
this.length = length;
}
+
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
index 9912173..30417ea 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.util.DurationInfo;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.TASK_ATTEMPT_ID;
import static org.apache.hadoop.fs.s3a.commit.CommitUtils.*;
import static org.apache.hadoop.fs.s3a.commit.MagicCommitPaths.*;
import static org.apache.hadoop.fs.s3a.commit.CommitUtilsWithMR.*;
@@ -213,7 +214,7 @@ public class MagicS3GuardCommitter extends AbstractS3ACommitter {
commit.setJobId(jobId);
commit.setTaskId(taskId);
}
-
+ pendingSet.putExtraData(TASK_ATTEMPT_ID, taskId);
Path jobAttemptPath = getJobAttemptPath(context);
TaskAttemptID taskAttemptID = context.getTaskAttemptID();
Path taskOutcomePath = new Path(jobAttemptPath,
@@ -221,7 +222,8 @@ public class MagicS3GuardCommitter extends AbstractS3ACommitter {
CommitConstants.PENDINGSET_SUFFIX);
LOG.info("Saving work of {} to {}", taskAttemptID, taskOutcomePath);
try {
- pendingSet.save(getDestFS(), taskOutcomePath, false);
+ // We will overwrite if there exists a pendingSet file already
+ pendingSet.save(getDestFS(), taskOutcomePath, true);
} catch (IOException e) {
LOG.warn("Failed to save task commit data to {} ",
taskOutcomePath, e);
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
index 91e68af..9cc932b 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
@@ -695,6 +695,8 @@ public class StagingCommitter extends AbstractS3ACommitter {
context.progress();
PendingSet pendingCommits = new PendingSet(commitCount);
+ pendingCommits.putExtraData(TASK_ATTEMPT_ID,
+ context.getTaskAttemptID().toString());
try {
Tasks.foreach(taskOutput)
.stopOnFailure()
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
index cacd54d..03759a5 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapFile;
@@ -307,14 +308,19 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
* @param context task
* @throws IOException IO failure
* @throws InterruptedException write interrupted
+ * @return the path written to
*/
- protected void writeTextOutput(TaskAttemptContext context)
+ protected Path writeTextOutput(TaskAttemptContext context)
throws IOException, InterruptedException {
describe("write output");
try (DurationInfo d = new DurationInfo(LOG,
"Writing Text output for task %s", context.getTaskAttemptID())) {
- writeOutput(new LoggingTextOutputFormat().getRecordWriter(context),
+ LoggingTextOutputFormat.LoggingLineRecordWriter<Object, Object>
+ recordWriter = new LoggingTextOutputFormat<>().getRecordWriter(
context);
+ writeOutput(recordWriter,
+ context);
+ return recordWriter.getDest();
}
}
@@ -480,11 +486,17 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
"setup job %s", jContext.getJobID())) {
committer.setupJob(jContext);
}
+ setupCommitter(committer, tContext);
+ describe("setup complete\n");
+ }
+
+ private void setupCommitter(
+ final AbstractS3ACommitter committer,
+ final TaskAttemptContext tContext) throws IOException {
try (DurationInfo d = new DurationInfo(LOG,
"setup task %s", tContext.getTaskAttemptID())) {
committer.setupTask(tContext);
}
- describe("setup complete\n");
}
/**
@@ -806,6 +818,74 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
expectFNFEonTaskCommit(committer, tContext);
}
+ /**
+ * HADOOP-17258. If a second task attempt is committed, it
+ * must succeed, and the output of the first TA, even if already
+ * committed, MUST NOT be visible in the final output.
+ * <p></p>
+ * What's important is not just that only one TA must succeed,
+ * but it must be the last one executed. Why? because that's
+ * the one whose output must be visible in the final job output.
+ */
+ @Test
+ public void testTwoTaskAttemptsCommit() throws Exception {
+ describe("Commit two task attempts;" +
+ " expect the second attempt to succeed.");
+ JobData jobData = startJob(false);
+ JobContext jContext = jobData.jContext;
+ TaskAttemptContext tContext = jobData.tContext;
+ AbstractS3ACommitter committer = jobData.committer;
+ // do commit
+ describe("\ncommitting task");
+ // write output for TA 1,
+ Path outputTA1 = writeTextOutput(tContext);
+
+ // speculatively execute committer 2.
+
+ // jobconf with a different base to its parts.
+ Configuration conf2 = jobData.conf;
+ conf2.set("mapreduce.output.basename", "attempt2");
+ String attempt2 = "attempt_" + jobId + "_m_000000_1";
+ TaskAttemptID ta2 = TaskAttemptID.forName(attempt2);
+ TaskAttemptContext tContext2 = new TaskAttemptContextImpl(
+ conf2, ta2);
+
+ AbstractS3ACommitter committer2 = standardCommitterFactory
+ .createCommitter(tContext2);
+ setupCommitter(committer2, tContext2);
+ // write output for TA 2,
+ Path outputTA2 = writeTextOutput(tContext2);
+
+ // verify the names are different.
+ String name1 = outputTA1.getName();
+ String name2 = outputTA2.getName();
+ Assertions.assertThat(name1)
+ .describedAs("name of task attempt output %s", outputTA1)
+ .isNotEqualTo(name2);
+
+ // commit task 1
+ committer.commitTask(tContext);
+
+ // then pretend that task1 didn't respond, so
+ // commit task 2
+ committer2.commitTask(tContext2);
+
+ // and the job
+ committer2.commitJob(tContext);
+
+ // validate output
+ S3AFileSystem fs = getFileSystem();
+ SuccessData successData = validateSuccessFile(outDir, "", fs, "query", 1);
+ Assertions.assertThat(successData.getFilenames())
+ .describedAs("Files committed")
+ .hasSize(1);
+
+ assertPathExists("attempt2 output", new Path(outDir, name2));
+ assertPathDoesNotExist("attempt1 output", new Path(outDir, name1));
+
+ assertNoMultipartUploadsPending(outDir);
+ }
+
protected boolean shouldExpectSuccessMarker() {
return true;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org