Posted to common-commits@hadoop.apache.org by st...@apache.org on 2019/10/04 17:55:01 UTC
[hadoop] branch trunk updated: HADOOP-16570. S3A committers
encounter scale issues.
This is an automated email from the ASF dual-hosted git repository.
stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6574f27 HADOOP-16570. S3A committers encounter scale issues.
6574f27 is described below
commit 6574f27fa348542411bff888b184cd7ce34e5d9e
Author: Steve Loughran <st...@cloudera.com>
AuthorDate: Fri Oct 4 18:53:53 2019 +0100
HADOOP-16570. S3A committers encounter scale issues.
Contributed by Steve Loughran.
This addresses two scale issues which have surfaced in large-scale benchmarks
of the S3A Committers.
* Thread pools are not cleaned up.
This now happens, with tests.
* OOM on job commit for jobs with many thousands of tasks,
each generating tens of (very large) files.
Instead of loading all pending commits into memory as a single list, the list
of files to load is now the only list passed around; .pendingset files are
loaded and processed in isolation, and reloaded if necessary for any
abort/rollback operation.
The parallel commit/abort/revert operations now work at the .pendingset level,
rather than that of individual pending commit files. The existing parallelized
Tasks API is still used to commit those files, but with a null thread pool, so
as to serialize the operations.
Change-Id: I5c8240cd31800eaa83d112358770ca0eb2bca797
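
The "null thread pool" serialization relies on the Tasks builder treating a
null executor as a request for strictly sequential execution. Below is a
minimal, hypothetical sketch of that fallback; the real Tasks class in
org.apache.hadoop.fs.s3a.commit additionally carries abort/revert hooks and
exception suppression, so this illustrates the pattern only.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

public final class SequentialFallbackSketch {

  /** A task which may raise an IOException. */
  @FunctionalInterface
  public interface IOTask<I> {
    void run(I item) throws IOException;
  }

  /**
   * Run a task against every item: in parallel when a pool is
   * supplied, strictly in sequence when the pool is null.
   */
  public static <I> void runAll(Iterable<I> items,
      ExecutorService pool,
      IOTask<I> task) throws IOException {
    if (pool == null) {
      // the serialized path: no queueing, no extra threads
      for (I item : items) {
        task.run(item);
      }
      return;
    }
    // parallel path: submit everything, then await completion
    List<Future<?>> futures = new ArrayList<>();
    for (I item : items) {
      futures.add(pool.submit(() -> {
        task.run(item);
        return null;
      }));
    }
    for (Future<?> future : futures) {
      try {
        future.get();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IOException("interrupted waiting for task", e);
      } catch (ExecutionException e) {
        throw new IOException(e.getCause());
      }
    }
  }
}

With a null pool, the per-file commits inside one .pendingset run in order,
while the .pendingset files themselves can still be processed in parallel.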
---
.../java/org/apache/hadoop/fs/s3a/Constants.java | 6 +
.../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 11 +-
.../apache/hadoop/fs/s3a/S3AInstrumentation.java | 3 +
.../hadoop/fs/s3a/commit/AbstractS3ACommitter.java | 466 ++++++++++++++++++---
.../fs/s3a/commit/AbstractS3ACommitterFactory.java | 2 +-
.../hadoop/fs/s3a/commit/CommitConstants.java | 6 +
.../hadoop/fs/s3a/commit/files/SuccessData.java | 6 +
.../fs/s3a/commit/magic/MagicS3GuardCommitter.java | 8 +-
.../commit/staging/DirectoryStagingCommitter.java | 17 +-
.../staging/PartitionedStagingCommitter.java | 101 ++++-
.../fs/s3a/commit/staging/StagingCommitter.java | 43 +-
.../org/apache/hadoop/fs/s3a/ITestS3AClosedFS.java | 18 +-
.../org/apache/hadoop/fs/s3a/S3ATestUtils.java | 42 +-
.../fs/s3a/commit/AbstractITCommitProtocol.java | 74 +++-
.../org/apache/hadoop/fs/s3a/commit/TestTasks.java | 2 +-
.../commit/integration/ITestS3ACommitterMRJob.java | 6 +-
.../fs/s3a/commit/staging/StagingTestBase.java | 66 ++-
.../staging/TestDirectoryCommitterScale.java | 314 ++++++++++++++
.../s3a/commit/staging/TestStagingCommitter.java | 29 +-
.../TestStagingDirectoryOutputCommitter.java | 10 +-
.../staging/TestStagingPartitionedJobCommit.java | 43 +-
.../staging/TestStagingPartitionedTaskCommit.java | 46 +-
22 files changed, 1123 insertions(+), 196 deletions(-)
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index 014a494..fdbdf37 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -837,4 +837,10 @@ public final class Constants {
public static final String AWS_SERVICE_IDENTIFIER_S3 = "S3";
public static final String AWS_SERVICE_IDENTIFIER_DDB = "DDB";
public static final String AWS_SERVICE_IDENTIFIER_STS = "STS";
+
+ /**
+ * How long to wait for the thread pool to terminate when cleaning up.
+ * Value: {@value} seconds.
+ */
+ public static final int THREAD_POOL_SHUTDOWN_DELAY_SECONDS = 30;
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 9431884..26f16a7 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -154,6 +154,7 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
import static org.apache.hadoop.fs.impl.AbstractFSBuilderImpl.rejectUnknownMandatoryKeys;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
@@ -3062,6 +3063,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
transfers.shutdownNow(true);
transfers = null;
}
+ HadoopExecutors.shutdown(boundedThreadPool, LOG,
+ THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
+ boundedThreadPool = null;
+ HadoopExecutors.shutdown(unboundedThreadPool, LOG,
+ THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
+ unboundedThreadPool = null;
S3AUtils.closeAll(LOG, metadataStore, instrumentation);
metadataStore = null;
instrumentation = null;
@@ -4064,7 +4071,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
*/
@Retries.OnceRaw
void abortMultipartUpload(String destKey, String uploadId) {
- LOG.info("Aborting multipart upload {} to {}", uploadId, destKey);
+ LOG.debug("Aborting multipart upload {} to {}", uploadId, destKey);
getAmazonS3Client().abortMultipartUpload(
new AbortMultipartUploadRequest(getBucket(),
destKey,
@@ -4084,7 +4091,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
uploadId = upload.getUploadId();
if (LOG.isInfoEnabled()) {
DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- LOG.info("Aborting multipart upload {} to {} initiated by {} on {}",
+ LOG.debug("Aborting multipart upload {} to {} initiated by {} on {}",
uploadId, destKey, upload.getInitiator(),
df.format(upload.getInitiated()));
}
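
For reference, HadoopExecutors.shutdown() as used in close() above is a
bounded variant of the standard JDK shutdown sequence. A sketch of that
pattern, assuming nothing beyond java.util.concurrent:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public final class BoundedShutdown {

  /**
   * Shut down an executor, waiting a bounded time for in-flight work,
   * then interrupting whatever is left, so close() can proceed even
   * if a worker thread is wedged.
   */
  public static void shutdownWithDeadline(ExecutorService pool,
      long delay, TimeUnit unit) {
    if (pool == null) {
      return;
    }
    pool.shutdown();             // stop accepting new work
    try {
      if (!pool.awaitTermination(delay, unit)) {
        pool.shutdownNow();      // cancel queued work, interrupt stragglers
      }
    } catch (InterruptedException e) {
      pool.shutdownNow();
      Thread.currentThread().interrupt();
    }
  }
}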
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
index e9ed972..15f7390 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
@@ -611,11 +611,14 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
public void close() {
synchronized (metricsSystemLock) {
+ // it is critical to close each quantile, as they start a scheduled
+ // task in a shared thread pool.
putLatencyQuantile.stop();
throttleRateQuantile.stop();
metricsSystem.unregisterSource(metricsSourceName);
int activeSources = --metricsSourceActiveCounter;
if (activeSources == 0) {
+ LOG.debug("Shutting down metrics publisher");
metricsSystem.publishMetricsNow();
metricsSystem.shutdown();
metricsSystem = null;
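
The quantile comment above is the crux of this leak: a metric which
schedules itself on a shared pool keeps a thread alive until explicitly
stopped. An illustrative, hypothetical AutoCloseable equivalent:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class PeriodicSampler implements AutoCloseable {

  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();
  private final ScheduledFuture<?> tick;

  public PeriodicSampler(Runnable sample, long periodSeconds) {
    // like a quantile, this pins a worker thread until cancelled
    tick = scheduler.scheduleAtFixedRate(
        sample, periodSeconds, periodSeconds, TimeUnit.SECONDS);
  }

  @Override
  public void close() {
    tick.cancel(false);     // the equivalent of quantile.stop()
    scheduler.shutdown();
  }
}

Forgetting close() here leaves the scheduler thread running, which is
exactly the symptom the thread-leak tests later in this patch detect.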
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java
index a49ab52..e82fbda 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java
@@ -18,18 +18,18 @@
package org.apache.hadoop.fs.s3a.commit;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import com.amazonaws.services.s3.model.MultipartUpload;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,7 +49,9 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.DurationInfo;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import static org.apache.hadoop.fs.s3a.Constants.THREAD_POOL_SHUTDOWN_DELAY_SECONDS;
import static org.apache.hadoop.fs.s3a.Invoker.ignoreIOExceptions;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
@@ -66,11 +68,28 @@ import static org.apache.hadoop.fs.s3a.commit.CommitUtilsWithMR.*;
* to handle the creation of a committer when the destination is unknown.
*
* Requiring an output directory simplifies coding and testing.
+ *
+ * The original implementation loaded all .pendingset files
+ * before attempting any commit/abort operations.
+ * While straightforward, and guaranteeing that no changes were made to
+ * the destination until all files had been successfully loaded, it did
+ * not scale: the list grew until it exceeded the heap size.
+ *
+ * The second iteration builds up an {@link ActiveCommit} class with the
+ * list of .pendingset files to load and then commit; that can be done
+ * incrementally and in parallel.
+ * As a side effect of this change, unless/until changed,
+ * the commit/abort/revert of all files uploaded by a single task will be
+ * serialized. This may slow down these operations if there are many files
+ * created by a few tasks, <i>and</i> the HTTP connection pool in the S3A
+ * committer is large enough for all the parallel POST requests.
*/
public abstract class AbstractS3ACommitter extends PathOutputCommitter {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractS3ACommitter.class);
+ public static final String THREAD_PREFIX = "s3a-committer-pool-";
+
/**
* Thread pool for task execution.
*/
@@ -349,16 +368,11 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
* @throws IOException IO failure
*/
protected void maybeCreateSuccessMarkerFromCommits(JobContext context,
- List<SinglePendingCommit> pending) throws IOException {
+ ActiveCommit pending) throws IOException {
List<String> filenames = new ArrayList<>(pending.size());
- for (SinglePendingCommit commit : pending) {
- String key = commit.getDestinationKey();
- if (!key.startsWith("/")) {
- // fix up so that FS.makeQualified() sets up the path OK
- key = "/" + key;
- }
- filenames.add(key);
- }
+ // The list of committed objects in pending is size limited in
+ // ActiveCommit.uploadCommitted.
+ filenames.addAll(pending.committedObjects);
maybeCreateSuccessMarker(context, filenames);
}
@@ -390,22 +404,25 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
}
/**
- * Base job setup deletes the success marker.
- * TODO: Do we need this?
+ * Base job setup (optionally) deletes the success marker and
+ * always creates the destination directory.
+ * When objects are committed, that destination directory marker will
+ * inevitably be deleted; creating it now ensures there is something
+ * at the destination while the job is in progress, and, if nothing
+ * is created, that it is still there afterwards.
* @param context context
* @throws IOException IO failure
*/
-/*
@Override
public void setupJob(JobContext context) throws IOException {
- if (createJobMarker) {
- try (DurationInfo d = new DurationInfo("Deleting _SUCCESS marker")) {
+ try (DurationInfo d = new DurationInfo(LOG, "preparing destination")) {
+ if (createJobMarker) {
commitOperations.deleteSuccessMarker(getOutputPath());
}
+ getDestFS().mkdirs(getOutputPath());
}
}
-*/
@Override
public void setupTask(TaskAttemptContext context) throws IOException {
@@ -430,28 +447,152 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
}
/**
- * Commit a list of pending uploads.
+ * Commit all the pending uploads.
+ * Each file listed in the ActiveCommit instance is queued for processing
+ * in a separate thread; its contents are loaded and then (sequentially)
+ * committed.
+ * On a failure or abort of a single file's commit, all its uploads are
+ * aborted.
+ * The revert operation lists the files already committed and deletes them.
* @param context job context
- * @param pending list of pending uploads
+ * @param pending pending uploads
* @throws IOException on any failure
*/
- protected void commitPendingUploads(JobContext context,
- List<SinglePendingCommit> pending) throws IOException {
+ protected void commitPendingUploads(
+ final JobContext context,
+ final ActiveCommit pending) throws IOException {
if (pending.isEmpty()) {
LOG.warn("{}: No pending uploads to commit", getRole());
}
- LOG.debug("{}: committing the output of {} task(s)",
- getRole(), pending.size());
- try(CommitOperations.CommitContext commitContext
+ try (DurationInfo ignored = new DurationInfo(LOG,
+ "committing the output of %s task(s)", pending.size());
+ CommitOperations.CommitContext commitContext
= initiateCommitOperation()) {
- Tasks.foreach(pending)
+
+ Tasks.foreach(pending.getSourceFiles())
.stopOnFailure()
+ .suppressExceptions(false)
.executeWith(buildThreadPool(context))
+ .abortWith(path ->
+ loadAndAbort(commitContext, pending, path, true, false))
+ .revertWith(path ->
+ loadAndRevert(commitContext, pending, path))
+ .run(path ->
+ loadAndCommit(commitContext, pending, path));
+ }
+ }
+
+ /**
+ * Run a precommit check that all files are loadable.
+ * This check avoids the situation where the inability to read
+ * a file only surfaces partway through the job commit, and so
+ * results in the destination being tainted.
+ * @param context job context
+ * @param pending the pending operations
+ * @throws IOException any failure
+ */
+ protected void precommitCheckPendingFiles(
+ final JobContext context,
+ final ActiveCommit pending) throws IOException {
+
+ FileSystem sourceFS = pending.getSourceFS();
+ try (DurationInfo ignored =
+ new DurationInfo(LOG, "Preflight Load of pending files")) {
+
+ Tasks.foreach(pending.getSourceFiles())
+ .stopOnFailure()
+ .suppressExceptions(false)
+ .executeWith(buildThreadPool(context))
+ .run(path -> PendingSet.load(sourceFS, path));
+ }
+ }
+
+ /**
+ * Load a pendingset file and commit all of its contents.
+ * @param commitContext context to commit through
+ * @param activeCommit commit state
+ * @param path path to load
+ * @throws IOException failure
+ */
+ private void loadAndCommit(
+ final CommitOperations.CommitContext commitContext,
+ final ActiveCommit activeCommit,
+ final Path path) throws IOException {
+
+ try (DurationInfo ignored =
+ new DurationInfo(LOG, false, "Committing %s", path)) {
+ PendingSet pendingSet = PendingSet.load(activeCommit.getSourceFS(), path);
+ Tasks.foreach(pendingSet.getCommits())
+ .stopOnFailure()
+ .suppressExceptions(false)
+ .executeWith(singleCommitThreadPool())
.onFailure((commit, exception) ->
commitContext.abortSingleCommit(commit))
.abortWith(commitContext::abortSingleCommit)
.revertWith(commitContext::revertCommit)
- .run(commitContext::commitOrFail);
+ .run(commit -> {
+ commitContext.commitOrFail(commit);
+ activeCommit.uploadCommitted(
+ commit.getDestinationKey(), commit.getLength());
+ });
+ }
+ }
+
+ /**
+ * Load a pendingset file and revert all of its contents.
+ * @param commitContext context to commit through
+ * @param activeCommit commit state
+ * @param path path to load
+ * @throws IOException failure
+ */
+ private void loadAndRevert(
+ final CommitOperations.CommitContext commitContext,
+ final ActiveCommit activeCommit,
+ final Path path) throws IOException {
+
+ try (DurationInfo ignored =
+ new DurationInfo(LOG, false, "Committing %s", path)) {
+ PendingSet pendingSet = PendingSet.load(activeCommit.getSourceFS(), path);
+ Tasks.foreach(pendingSet.getCommits())
+ .suppressExceptions(true)
+ .run(commitContext::revertCommit);
+ }
+ }
+
+ /**
+ * Load a pendingset file and abort all of its contents.
+ * @param commitContext context to commit through
+ * @param activeCommit commit state
+ * @param path path to load
+ * @param suppressExceptions should exceptions be suppressed?
+ * @param deleteRemoteFiles should remote files be deleted?
+ * @throws IOException failure
+ */
+ private void loadAndAbort(
+ final CommitOperations.CommitContext commitContext,
+ final ActiveCommit activeCommit,
+ final Path path,
+ final boolean suppressExceptions,
+ final boolean deleteRemoteFiles) throws IOException {
+
+ try (DurationInfo ignored =
+ new DurationInfo(LOG, false, "Aborting %s", path)) {
+ PendingSet pendingSet = PendingSet.load(activeCommit.getSourceFS(),
+ path);
+ FileSystem fs = getDestFS();
+ Tasks.foreach(pendingSet.getCommits())
+ .executeWith(singleCommitThreadPool())
+ .suppressExceptions(suppressExceptions)
+ .run(commit -> {
+ try {
+ commitContext.abortSingleCommit(commit);
+ } catch (FileNotFoundException e) {
+ // Commit ID was not known; file may exist.
+ // delete it if instructed to do so.
+ if (deleteRemoteFiles) {
+ fs.delete(commit.destinationPath(), false);
+ }
+ }
+ });
}
}
@@ -466,43 +607,14 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
}
/**
- * Try to read every pendingset file and build a list of them/
- * In the case of a failure to read the file, exceptions are held until all
- * reads have been attempted.
- * @param context job context
- * @param suppressExceptions whether to suppress exceptions.
- * @param fs job attempt fs
- * @param pendingCommitFiles list of files found in the listing scan
- * @return the list of commits
- * @throws IOException on a failure when suppressExceptions is false.
- */
- protected List<SinglePendingCommit> loadPendingsetFiles(
- JobContext context,
- boolean suppressExceptions,
- FileSystem fs,
- Iterable<? extends FileStatus> pendingCommitFiles) throws IOException {
-
- final List<SinglePendingCommit> pending = Collections.synchronizedList(
- Lists.newArrayList());
- Tasks.foreach(pendingCommitFiles)
- .suppressExceptions(suppressExceptions)
- .executeWith(buildThreadPool(context))
- .run(pendingCommitFile ->
- pending.addAll(
- PendingSet.load(fs, pendingCommitFile.getPath()).getCommits())
- );
- return pending;
- }
-
- /**
* Internal Job commit operation: where the S3 requests are made
* (potentially in parallel).
* @param context job context
- * @param pending pending request
+ * @param pending pending commits
* @throws IOException any failure
*/
protected void commitJobInternal(JobContext context,
- List<SinglePendingCommit> pending)
+ ActiveCommit pending)
throws IOException {
commitPendingUploads(context, pending);
@@ -523,6 +635,9 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
* This must clean up operations; it is called when a commit fails, as
* well as in an {@link #abortJob(JobContext, JobStatus.State)} call.
* The base implementation calls {@link #cleanup(JobContext, boolean)}
+ * so cleans up the filesystems and destroys the thread pool.
+ * Subclasses must always invoke this superclass method after their
+ * own operations.
* @param context job context
* @param suppressExceptions should exceptions be suppressed?
* @throws IOException any IO problem raised when suppressExceptions is false.
@@ -536,13 +651,15 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
/**
* Abort all pending uploads to the destination directory during
* job cleanup operations.
+ * Note: this instantiates the thread pool if required -so
+ * {@link #destroyThreadPool()} must be called after this.
* @param suppressExceptions should exceptions be suppressed
* @throws IOException IO problem
*/
protected void abortPendingUploadsInCleanup(
boolean suppressExceptions) throws IOException {
Path dest = getOutputPath();
- try (DurationInfo d =
+ try (DurationInfo ignored =
new DurationInfo(LOG, "Aborting all pending commits under %s",
dest);
CommitOperations.CommitContext commitContext
@@ -565,13 +682,18 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
}
/**
- * Subclass-specific pre commit actions.
+ * Subclass-specific pre-Job-commit actions.
+ * The staging committers all load the pending files to verify that
+ * they can be loaded.
+ * The Magic committer does not, because the overhead of reading
+ * files from S3 makes it too expensive.
* @param context job context
* @param pending the pending operations
* @throws IOException any failure
*/
- protected void preCommitJob(JobContext context,
- List<SinglePendingCommit> pending) throws IOException {
+ @VisibleForTesting
+ public void preCommitJob(JobContext context,
+ ActiveCommit pending) throws IOException {
}
/**
@@ -584,7 +706,7 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
* <p>
* Commit internal: do the final commit sequence.
* <p>
- * The final commit action is to build the {@code __SUCCESS} file entry.
+ * The final commit action is to build the {@code _SUCCESS} file entry.
* </p>
* @param context job context
* @throws IOException any failure
@@ -594,7 +716,7 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
String id = jobIdString(context);
try (DurationInfo d = new DurationInfo(LOG,
"%s: commitJob(%s)", getRole(), id)) {
- List<SinglePendingCommit> pending
+ ActiveCommit pending
= listPendingUploadsToCommit(context);
preCommitJob(context, pending);
commitJobInternal(context, pending);
@@ -629,12 +751,13 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
* @return a list of pending uploads.
* @throws IOException Any IO failure
*/
- protected abstract List<SinglePendingCommit> listPendingUploadsToCommit(
+ protected abstract ActiveCommit listPendingUploadsToCommit(
JobContext context)
throws IOException;
/**
- * Cleanup the job context, including aborting anything pending.
+ * Cleanup the job context, including aborting anything pending
+ * and destroying the thread pool.
* @param context job context
* @param suppressExceptions should exceptions be suppressed?
* @throws IOException any failure if exceptions were not suppressed.
@@ -645,6 +768,7 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
"Cleanup job %s", jobIdString(context))) {
abortPendingUploadsInCleanup(suppressExceptions);
} finally {
+ destroyThreadPool();
cleanupStagingDirs();
}
}
@@ -715,7 +839,7 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
/**
* Returns an {@link ExecutorService} for parallel tasks. The number of
- * threads in the thread-pool is set by s3.multipart.committer.num-threads.
+ * threads in the thread-pool is set by fs.s3a.committer.threads.
* If num-threads is 0, this will return null.
*
* @param context the JobContext for this commit
@@ -730,10 +854,10 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
DEFAULT_COMMITTER_THREADS);
LOG.debug("{}: creating thread pool of size {}", getRole(), numThreads);
if (numThreads > 0) {
- threadPool = Executors.newFixedThreadPool(numThreads,
+ threadPool = HadoopExecutors.newFixedThreadPool(numThreads,
new ThreadFactoryBuilder()
.setDaemon(true)
- .setNameFormat("s3-committer-pool-%d")
+ .setNameFormat(THREAD_PREFIX + context.getJobID() + "-%d")
.build());
} else {
return null;
@@ -743,6 +867,40 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
}
/**
+ * Destroy any thread pools; wait for that to finish,
+ * but don't overreact if it doesn't finish in time.
+ */
+ protected synchronized void destroyThreadPool() {
+ if (threadPool != null) {
+ LOG.debug("Destroying thread pool");
+ HadoopExecutors.shutdown(threadPool, LOG,
+ THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
+ threadPool = null;
+ }
+ }
+
+ /**
+ * Get the thread pool for executing the single file commit/revert
+ * within the commit of all uploads of a single task.
+ * This is currently null; it is here to allow the Tasks class to
+ * provide the logic for execute/revert.
+ * Why not use the existing thread pool? Too much fear of deadlocking,
+ * and tasks are being committed in parallel anyway.
+ * @return null, always.
+ */
+ protected final synchronized ExecutorService singleCommitThreadPool() {
+ return null;
+ }
+
+ /**
+ * Does this committer have a thread pool?
+ * @return true if a thread pool exists.
+ */
+ public synchronized boolean hasThreadPool() {
+ return threadPool != null;
+ }
+
+ /**
* Delete the task attempt path without raising any errors.
* @param context task context
*/
@@ -755,6 +913,8 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
/**
* Abort all pending uploads in the list.
+ * This operation is used by the magic committer as part of its
+ * rollback after a failure during task commit.
* @param context job context
* @param pending pending uploads
* @param suppressExceptions should exceptions be suppressed
@@ -779,4 +939,172 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter {
}
}
+ /**
+ * Abort all pending uploads in the list.
+ * @param context job context
+ * @param pending pending uploads
+ * @param suppressExceptions should exceptions be suppressed?
+ * @param deleteRemoteFiles should remote files be deleted?
+ * @throws IOException any exception raised
+ */
+ protected void abortPendingUploads(
+ final JobContext context,
+ final ActiveCommit pending,
+ final boolean suppressExceptions,
+ final boolean deleteRemoteFiles) throws IOException {
+
+ if (pending.isEmpty()) {
+ LOG.info("{}: no pending commits to abort", getRole());
+ } else {
+ try (DurationInfo d = new DurationInfo(LOG,
+ "Aborting %s uploads", pending.size());
+ CommitOperations.CommitContext commitContext
+ = initiateCommitOperation()) {
+ Tasks.foreach(pending.getSourceFiles())
+ .executeWith(buildThreadPool(context))
+ .suppressExceptions(suppressExceptions)
+ .run(path ->
+ loadAndAbort(commitContext,
+ pending,
+ path,
+ suppressExceptions,
+ deleteRemoteFiles));
+ }
+ }
+ }
+
+ /**
+ * State of the active commit operation.
+ *
+ * It contains a list of all pendingset files to load as the source
+ * of outstanding commits to complete/abort,
+ * and tracks the files uploaded.
+ *
+ * To avoid running out of heap by loading all the source files
+ * simultaneously:
+ * <ol>
+ * <li>
+ * The list of files to load is passed round but
+ * the contents are only loaded on demand.
+ * </li>
+ * <li>
+ * The number of written files tracked for logging in
+ * the _SUCCESS file is limited to a small number, enough
+ * for testing only.
+ * </li>
+ * </ol>
+ */
+ public static class ActiveCommit {
+
+ private static final AbstractS3ACommitter.ActiveCommit EMPTY
+ = new ActiveCommit(null, new ArrayList<>());
+
+ /** All pendingset files to iterate through. */
+ private final List<Path> sourceFiles;
+
+ /**
+ * Filesystem for the source files.
+ */
+ private final FileSystem sourceFS;
+
+ /**
+ * List of committed objects; only built up until the commit limit is
+ * reached.
+ */
+ private final List<String> committedObjects = new ArrayList<>();
+
+ /**
+ * The total number of committed objects.
+ */
+ private int committedObjectCount;
+
+ /**
+ * Total number of bytes committed.
+ */
+ private long committedBytes;
+
+ /**
+ * Construct from a source FS and list of files.
+ * @param sourceFS filesystem containing the list of pending files
+ * @param sourceFiles .pendingset files to load and commit.
+ */
+ public ActiveCommit(
+ final FileSystem sourceFS,
+ final List<Path> sourceFiles) {
+ this.sourceFiles = sourceFiles;
+ this.sourceFS = sourceFS;
+ }
+
+ /**
+ * Create an active commit of the given pending files.
+ * @param pendingFS source filesystem.
+ * @param statuses list of file status or subclass to use.
+ * @return the commit
+ */
+ public static ActiveCommit fromStatusList(
+ final FileSystem pendingFS,
+ final List<? extends FileStatus> statuses) {
+ return new ActiveCommit(pendingFS,
+ statuses.stream()
+ .map(FileStatus::getPath)
+ .collect(Collectors.toList()));
+ }
+
+ /**
+ * Get the empty entry.
+ * @return an active commit with no pending files.
+ */
+ public static ActiveCommit empty() {
+ return EMPTY;
+ }
+
+ public List<Path> getSourceFiles() {
+ return sourceFiles;
+ }
+
+ public FileSystem getSourceFS() {
+ return sourceFS;
+ }
+
+ /**
+ * Note that a file was committed.
+ * Increase the counter of files and total size.
+ * If there is room in the committedFiles list, the file
+ * will be added to the list and so end up in the _SUCCESS file.
+ * @param key key of the committed object.
+ * @param size size in bytes.
+ */
+ public synchronized void uploadCommitted(String key, long size) {
+ if (committedObjects.size() < SUCCESS_MARKER_FILE_LIMIT) {
+ committedObjects.add(
+ key.startsWith("/") ? key : ("/" + key));
+ }
+ committedObjectCount++;
+ committedBytes += size;
+ }
+
+ public synchronized List<String> getCommittedObjects() {
+ return committedObjects;
+ }
+
+ public synchronized int getCommittedFileCount() {
+ return committedObjectCount;
+ }
+
+ public synchronized long getCommittedBytes() {
+ return committedBytes;
+ }
+
+ public int size() {
+ return sourceFiles.size();
+ }
+
+ public boolean isEmpty() {
+ return sourceFiles.isEmpty();
+ }
+
+ public void add(Path path) {
+ sourceFiles.add(path);
+ }
+ }
}
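
Stripped of the Tasks plumbing, the job-commit flow implemented above
reduces to the sequential sketch below (abort, revert and error handling
omitted); not the shipped code, just its shape. Only one PendingSet is
resident at a time, so heap use is bounded by the largest single task
manifest rather than by the whole job:

// inside an AbstractS3ACommitter subclass; a simplified rendering of
// commitPendingUploads() + loadAndCommit()
ActiveCommit pending = listPendingUploadsToCommit(context);
try (CommitOperations.CommitContext commitContext =
         initiateCommitOperation()) {
  for (Path path : pending.getSourceFiles()) {
    // load one .pendingset manifest
    PendingSet pendingSet = PendingSet.load(pending.getSourceFS(), path);
    for (SinglePendingCommit commit : pendingSet.getCommits()) {
      commitContext.commitOrFail(commit);
      pending.uploadCommitted(
          commit.getDestinationKey(), commit.getLength());
    }
    // pendingSet becomes garbage here, before the next file is loaded
  }
}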
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitterFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitterFactory.java
index b3bcca1..6e7a99f 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitterFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitterFactory.java
@@ -51,7 +51,7 @@ public abstract class AbstractS3ACommitterFactory
throw new PathCommitException(outputPath,
"Filesystem not supported by this committer");
}
- LOG.info("Using Commmitter {} for {}",
+ LOG.info("Using Committer {} for {}",
outputCommitter,
outputPath);
return outputCommitter;
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java
index c9b0337..3e28a5d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java
@@ -255,4 +255,10 @@ public final class CommitConstants {
public static final String FS_S3A_COMMITTER_STAGING_ABORT_PENDING_UPLOADS =
"fs.s3a.committer.staging.abort.pending.uploads";
+ /**
+ * The limit to the number of committed objects tracked during
+ * job commits and saved to the _SUCCESS file.
+ */
+ public static final int SUCCESS_MARKER_FILE_LIMIT = 100;
+
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SuccessData.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SuccessData.java
index cf84cb3..e0273fa 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SuccessData.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SuccessData.java
@@ -52,6 +52,12 @@ import org.apache.hadoop.util.JsonSerialization;
* Applications reading this data should use/check the {@link #name} field
* to differentiate from any other JSON-based manifest and to identify
* changes in the output format.
+ *
+ * Note: to deal with scale issues, the S3A committers do not include any
+ * more than the number of objects listed in
+ * {@link org.apache.hadoop.fs.s3a.commit.CommitConstants#SUCCESS_MARKER_FILE_LIMIT}.
+ * This is intended to suffice for basic integration tests.
+ * Larger tests should examine the generated files themselves.
*/
@SuppressWarnings("unused")
@InterfaceAudience.Private
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
index 969286e..9912173 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
@@ -109,11 +109,11 @@ public class MagicS3GuardCommitter extends AbstractS3ACommitter {
* @return a list of pending commits.
* @throws IOException Any IO failure
*/
- protected List<SinglePendingCommit> listPendingUploadsToCommit(
+ protected ActiveCommit listPendingUploadsToCommit(
JobContext context)
throws IOException {
FileSystem fs = getDestFS();
- return loadPendingsetFiles(context, false, fs,
+ return ActiveCommit.fromStatusList(fs,
listAndFilter(fs, getJobAttemptPath(context), false,
CommitOperations.PENDINGSET_FILTER));
}
@@ -174,6 +174,7 @@ public class MagicS3GuardCommitter extends AbstractS3ACommitter {
} finally {
// delete the task attempt so there's no possibility of a second attempt
deleteTaskAttemptPathQuietly(context);
+ destroyThreadPool();
}
getCommitOperations().taskCompleted(true);
}
@@ -181,7 +182,7 @@ public class MagicS3GuardCommitter extends AbstractS3ACommitter {
/**
* Inner routine for committing a task.
* The list of pending commits is loaded and then saved to the job attempt
- * dir.
+ * dir in a single pendingset file.
* Failure to load any file or save the final file triggers an abort of
* all known pending commits.
* @param context context
@@ -250,6 +251,7 @@ public class MagicS3GuardCommitter extends AbstractS3ACommitter {
deleteQuietly(
attemptPath.getFileSystem(context.getConfiguration()),
attemptPath, true);
+ destroyThreadPool();
}
}
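
A hedged sketch of the task-commit aggregation described in the javadoc
above: the task attempt's individual .pending manifests are merged into a
single .pendingset saved under the job attempt dir, so job commit needs one
read per task. PendingSet.add()/save() are assumed here to complement the
load() helper shown elsewhere in this patch; the variable names are
illustrative.

// fs, pendingFiles, jobAttemptFS and taskPendingSetPath are supplied
// by the surrounding task-commit code
PendingSet pendingSet = new PendingSet();
for (FileStatus status : pendingFiles) {
  // one single-file manifest per in-progress multipart upload
  pendingSet.add(SinglePendingCommit.load(fs, status.getPath()));
}
// one durable write; overwrite=false so a duplicate attempt fails fast
pendingSet.save(jobAttemptFS, taskPendingSetPath, false);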
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitter.java
index 32642c9..1a5a63c 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitter.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.fs.s3a.commit.staging;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,7 +30,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathExistsException;
import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants;
-import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -66,7 +64,6 @@ public class DirectoryStagingCommitter extends StagingCommitter {
@Override
public void setupJob(JobContext context) throws IOException {
- super.setupJob(context);
Path outputPath = getOutputPath();
FileSystem fs = getDestFS();
ConflictResolution conflictResolution = getConflictResolutionMode(
@@ -91,10 +88,10 @@ public class DirectoryStagingCommitter extends StagingCommitter {
}
} catch (FileNotFoundException ignored) {
// there is no destination path, hence, no conflict.
- // make the parent directory, which also triggers a recursive directory
- // creation operation
- fs.mkdirs(outputPath);
}
+ // make the parent directory, which also triggers a recursive directory
+ // creation operation
+ super.setupJob(context);
}
/**
@@ -106,8 +103,12 @@ public class DirectoryStagingCommitter extends StagingCommitter {
* @throws IOException any failure
*/
@Override
- protected void preCommitJob(JobContext context,
- List<SinglePendingCommit> pending) throws IOException {
+ public void preCommitJob(
+ final JobContext context,
+ final ActiveCommit pending) throws IOException {
+
+ // see if the files can be loaded.
+ super.preCommitJob(context, pending);
Path outputPath = getOutputPath();
FileSystem fs = getDestFS();
Configuration fsConf = fs.getConf();
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java
index b51bcb5..20aca3c 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java
@@ -20,10 +20,11 @@ package org.apache.hadoop.fs.s3a.commit.staging;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import java.util.Set;
-import java.util.stream.Collectors;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
-import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,11 +33,14 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.commit.PathCommitException;
+import org.apache.hadoop.fs.s3a.commit.Tasks;
+import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.DurationInfo;
-import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.COMMITTER_NAME_PARTITIONED;
/**
* Partitioned committer.
@@ -52,6 +56,9 @@ import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
* <li>REPLACE: delete the destination partition in the job commit
* (i.e. after and only if all tasks have succeeded.</li>
* </ul>
+ * To determine the paths, the precommit process actually has to read
+ * in all source files, independently of the final commit phase.
+ * This is inefficient, though some parallelization here helps.
*/
public class PartitionedStagingCommitter extends StagingCommitter {
@@ -107,6 +114,7 @@ public class PartitionedStagingCommitter extends StagingCommitter {
}
/**
* Job-side conflict resolution.
* The partition path conflict resolution actions are:
* <ol>
@@ -119,13 +127,15 @@ public class PartitionedStagingCommitter extends StagingCommitter {
* @throws IOException any failure
*/
@Override
- protected void preCommitJob(JobContext context,
- List<SinglePendingCommit> pending) throws IOException {
+ public void preCommitJob(
+ final JobContext context,
+ final ActiveCommit pending) throws IOException {
FileSystem fs = getDestFS();
// enforce conflict resolution
Configuration fsConf = fs.getConf();
+ boolean shouldPrecheckPendingFiles = true;
switch (getConflictResolutionMode(context, fsConf)) {
case FAIL:
// FAIL checking is done on the task side, so this does nothing
@@ -134,21 +144,84 @@ public class PartitionedStagingCommitter extends StagingCommitter {
// no check is needed because the output may exist for appending
break;
case REPLACE:
- Set<Path> partitions = pending.stream()
- .map(SinglePendingCommit::destinationPath)
- .map(Path::getParent)
- .collect(Collectors.toCollection(Sets::newLinkedHashSet));
- for (Path partitionPath : partitions) {
- LOG.debug("{}: removing partition path to be replaced: " +
- getRole(), partitionPath);
- fs.delete(partitionPath, true);
- }
+ // identify and replace the destination partitions
+ replacePartitions(context, pending);
+ // and so there is no need to do another check.
+ shouldPrecheckPendingFiles = false;
break;
default:
throw new PathCommitException("",
getRole() + ": unknown conflict resolution mode: "
+ getConflictResolutionMode(context, fsConf));
}
+ if (shouldPrecheckPendingFiles) {
+ precommitCheckPendingFiles(context, pending);
+ }
+ }
+
+ /**
+ * Identify all partitions which need to be replaced and then delete them.
+ * The original implementation relied on all the pending commits to be
+ * loaded so could simply enumerate them.
+ * This iteration does not do that; it has to reload all the files
+ * to build the set, after which it initiates the delete process.
+ * This is done in parallel.
+ * <pre>
+ * Set<Path> partitions = pending.stream()
+ * .map(SinglePendingCommit::destinationPath)
+ * .map(Path::getParent)
+ * .collect(Collectors.toCollection(Sets::newLinkedHashSet));
+ * for (Path partitionPath : partitions) {
+ * LOG.debug("{}: removing partition path to be replaced: " +
+ * getRole(), partitionPath);
+ * fs.delete(partitionPath, true);
+ * }
+ * </pre>
+ *
+ * @param context job context
+ * @param pending the pending operations
+ * @throws IOException any failure
+ */
+ private void replacePartitions(
+ final JobContext context,
+ final ActiveCommit pending) throws IOException {
+
+ Map<Path, String> partitions = new ConcurrentHashMap<>();
+ FileSystem sourceFS = pending.getSourceFS();
+ ExecutorService pool = buildThreadPool(context);
+ try (DurationInfo ignored =
+ new DurationInfo(LOG, "Replacing partitions")) {
+
+ // the parent directories are saved to a concurrent hash map.
+ // for a marginal optimisation, the previous parent is tracked, so
+ // if a task writes many files to the same dir, the concurrent map
+ // is updated only once.
+ Tasks.foreach(pending.getSourceFiles())
+ .stopOnFailure()
+ .suppressExceptions(false)
+ .executeWith(pool)
+ .run(path -> {
+ PendingSet pendingSet = PendingSet.load(sourceFS, path);
+ Path lastParent = null;
+ for (SinglePendingCommit commit : pendingSet.getCommits()) {
+ Path parent = commit.destinationPath().getParent();
+ if (parent != null && !parent.equals(lastParent)) {
+ partitions.put(parent, "");
+ lastParent = parent;
+ }
+ }
+ });
+ }
+ // now do the deletes
+ FileSystem fs = getDestFS();
+ Tasks.foreach(partitions.keySet())
+ .stopOnFailure()
+ .suppressExceptions(false)
+ .executeWith(pool)
+ .run(partitionPath -> {
+ LOG.debug("{}: removing partition path to be replaced: " +
+ getRole(), partitionPath);
+ fs.delete(partitionPath, true);
+ });
}
}
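
A design note on replacePartitions(): the Map<Path, String> above is
standing in for a concurrent set. Since Java 8 the JDK provides a dedicated
view for this, which would also make the lastParent optimisation purely
optional; a sketch:

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.fs.Path;

public final class PartitionSetSketch {

  /** A concurrent set: add() from many pool threads is safe. */
  public static Set<Path> newPartitionSet() {
    return ConcurrentHashMap.newKeySet();
  }

  /** Record the parent partition of one committed destination path. */
  public static void recordPartition(Set<Path> partitions, Path dest) {
    Path parent = dest.getParent();
    if (parent != null) {
      partitions.add(parent);   // thread-safe and idempotent
    }
  }
}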
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
index 833edd4..6cc9e48 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.fs.s3a.commit.staging;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Queue;
@@ -457,6 +456,7 @@ public class StagingCommitter extends AbstractS3ACommitter {
context.getConfiguration().set(
InternalCommitterConstants.FS_S3A_COMMITTER_STAGING_UUID, uuid);
wrappedCommitter.setupJob(context);
+ super.setupJob(context);
}
/**
@@ -466,7 +466,7 @@ public class StagingCommitter extends AbstractS3ACommitter {
* @throws IOException Any IO failure
*/
@Override
- protected List<SinglePendingCommit> listPendingUploadsToCommit(
+ protected ActiveCommit listPendingUploadsToCommit(
JobContext context)
throws IOException {
return listPendingUploads(context, false);
@@ -480,7 +480,7 @@ public class StagingCommitter extends AbstractS3ACommitter {
* then this may not match the actual set of pending operations
* @throws IOException shouldn't be raised, but retained for the compiler
*/
- protected List<SinglePendingCommit> listPendingUploadsToAbort(
+ protected ActiveCommit listPendingUploadsToAbort(
JobContext context) throws IOException {
return listPendingUploads(context, true);
}
@@ -493,13 +493,14 @@ public class StagingCommitter extends AbstractS3ACommitter {
* then this may not match the actual set of pending operations
* @throws IOException Any IO failure which wasn't swallowed.
*/
- protected List<SinglePendingCommit> listPendingUploads(
+ protected ActiveCommit listPendingUploads(
JobContext context, boolean suppressExceptions) throws IOException {
- try {
- Path wrappedJobAttemptPath = wrappedCommitter.getJobAttemptPath(context);
+ try (DurationInfo ignored = new DurationInfo(LOG,
+ "Listing pending uploads")) {
+ Path wrappedJobAttemptPath = getJobAttemptPath(context);
final FileSystem attemptFS = wrappedJobAttemptPath.getFileSystem(
context.getConfiguration());
- return loadPendingsetFiles(context, suppressExceptions, attemptFS,
+ return ActiveCommit.fromStatusList(attemptFS,
listAndFilter(attemptFS,
wrappedJobAttemptPath, false,
HIDDEN_FILE_FILTER));
@@ -512,7 +513,7 @@ public class StagingCommitter extends AbstractS3ACommitter {
maybeIgnore(suppressExceptions, "Listing pending uploads", e);
}
// reached iff an IOE was caught and swallowed
- return new ArrayList<>(0);
+ return ActiveCommit.empty();
}
@Override
@@ -558,8 +559,8 @@ public class StagingCommitter extends AbstractS3ACommitter {
boolean failed = false;
try (DurationInfo d = new DurationInfo(LOG,
"%s: aborting job in state %s ", r, jobIdString(context))) {
- List<SinglePendingCommit> pending = listPendingUploadsToAbort(context);
- abortPendingUploads(context, pending, suppressExceptions);
+ ActiveCommit pending = listPendingUploadsToAbort(context);
+ abortPendingUploads(context, pending, suppressExceptions, true);
} catch (FileNotFoundException e) {
// nothing to list
LOG.debug("No job directory to read uploads from");
@@ -571,6 +572,7 @@ public class StagingCommitter extends AbstractS3ACommitter {
}
}
+
/**
* Delete the working paths of a job.
* <ol>
@@ -646,6 +648,8 @@ public class StagingCommitter extends AbstractS3ACommitter {
getRole(), context.getTaskAttemptID(), e);
getCommitOperations().taskCompleted(false);
throw e;
+ } finally {
+ destroyThreadPool();
}
getCommitOperations().taskCompleted(true);
}
@@ -694,6 +698,7 @@ public class StagingCommitter extends AbstractS3ACommitter {
try {
Tasks.foreach(taskOutput)
.stopOnFailure()
+ .suppressExceptions(false)
.executeWith(buildThreadPool(context))
.run(stat -> {
Path path = stat.getPath();
@@ -779,6 +784,8 @@ public class StagingCommitter extends AbstractS3ACommitter {
LOG.error("{}: exception when aborting task {}",
getRole(), context.getTaskAttemptID(), e);
throw e;
+ } finally {
+ destroyThreadPool();
}
}
@@ -901,4 +908,20 @@ public class StagingCommitter extends AbstractS3ACommitter {
defVal).toUpperCase(Locale.ENGLISH);
}
+ /**
+ * Pre-commit actions for a job.
+ * Loads all the pending files to verify they can be loaded
+ * and parsed.
+ * @param context job context
+ * @param pending pending commits
+ * @throws IOException any failure
+ */
+ @Override
+ public void preCommitJob(
+ final JobContext context,
+ final ActiveCommit pending) throws IOException {
+
+ // see if the files can be loaded.
+ precommitCheckPendingFiles(context, pending);
+ }
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClosedFS.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClosedFS.java
index 6e81452..79772ec 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClosedFS.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClosedFS.java
@@ -19,17 +19,21 @@
package org.apache.hadoop.fs.s3a;
import java.io.IOException;
+import java.util.Set;
+import org.assertj.core.api.Assertions;
+import org.junit.AfterClass;
import org.junit.Test;
import org.apache.hadoop.fs.Path;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getCurrentThreadNames;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.listInitialThreadsForLifecycleChecks;
import static org.apache.hadoop.test.LambdaTestUtils.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.E_FS_CLOSED;
/**
- * Tests of the S3A FileSystem which is closed; just make sure
- * that that basic file Ops fail meaningfully.
+ * Tests of the S3A FileSystem which is closed.
*/
public class ITestS3AClosedFS extends AbstractS3ATestBase {
@@ -47,6 +51,16 @@ public class ITestS3AClosedFS extends AbstractS3ATestBase {
// no op, as the FS is closed
}
+ private static final Set<String> THREAD_SET =
+ listInitialThreadsForLifecycleChecks();
+
+ @AfterClass
+ public static void checkForThreadLeakage() {
+ Assertions.assertThat(getCurrentThreadNames())
+ .describedAs("The threads at the end of the test run")
+ .isSubsetOf(THREAD_SET);
+ }
+
@Test
public void testClosedGetFileStatus() throws Exception {
intercept(IOException.class, E_FS_CLOSED,
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index 1889c05..bd38b40 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -63,7 +63,10 @@ import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CREDENTIAL_PROVIDER_PATH;
@@ -598,7 +601,7 @@ public final class S3ATestUtils {
String tmpDir = conf.get(HADOOP_TMP_DIR, "target/build/test");
if (testUniqueForkId != null) {
// patch temp dir for the specific branch
- tmpDir = tmpDir + File.pathSeparatorChar + testUniqueForkId;
+ tmpDir = tmpDir + File.separator + testUniqueForkId;
conf.set(HADOOP_TMP_DIR, tmpDir);
}
conf.set(BUFFER_DIR, tmpDir);
@@ -1346,4 +1349,41 @@ public final class S3ATestUtils {
STABILIZATION_TIME, PROBE_INTERVAL_MILLIS,
() -> fs.getFileStatus(testFilePath));
}
+
+ /**
+ * This creates a set containing all current threads and some well-known
+ * thread names whose existence should not fail test runs.
+ * They are generally static cleaner threads created by various classes
+ * on instantiation.
+ * @return a set of threads to use in later assertions.
+ */
+ public static Set<String> listInitialThreadsForLifecycleChecks() {
+ Set<String> threadSet = getCurrentThreadNames();
+ // static filesystem statistics cleaner
+ threadSet.add(
+ "org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner");
+ // AWS progress callbacks
+ threadSet.add("java-sdk-progress-listener-callback-thread");
+ // another AWS thread
+ threadSet.add("java-sdk-http-connection-reaper");
+ // java.lang.UNIXProcess; perhaps created when chmod is called?
+ threadSet.add("process reaper");
+ // once a quantile has been scheduled, the mutable quantile thread pool
+ // is initialized; it has a minimum thread size of 1.
+ threadSet.add("MutableQuantiles-0");
+ // IDE?
+ threadSet.add("Attach Listener");
+ return threadSet;
+ }
+
+ /**
+ * Get a set containing the names of all active threads.
+ * @return the current set of threads.
+ */
+ public static Set<String> getCurrentThreadNames() {
+ return Thread.getAllStackTraces().keySet()
+ .stream()
+ .map(Thread::getName)
+ .collect(Collectors.toCollection(TreeSet::new));
+ }
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
index 822e361..cacd54d 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
@@ -25,7 +25,10 @@ import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.assertj.core.api.Assertions;
+import org.junit.AfterClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -179,6 +182,20 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
}
/**
+ * This only looks for leakage of committer thread pools,
+ * and not any other leaked threads, such as those from S3A FS instances.
+ */
+ @AfterClass
+ public static void checkForThreadLeakage() {
+ List<String> committerThreads = getCurrentThreadNames().stream()
+ .filter(n -> n.startsWith(AbstractS3ACommitter.THREAD_PREFIX))
+ .collect(Collectors.toList());
+ Assertions.assertThat(committerThreads)
+ .describedAs("Outstanding committer threads")
+ .isEmpty();
+ }
+
+ /**
* Add the specified job to the current list of jobs to abort in teardown.
* @param jobData job data.
*/
@@ -518,6 +535,7 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
describe("\ncommitting job");
committer.commitJob(jContext);
describe("commit complete\n");
+ verifyCommitterHasNoThreads(committer);
}
}
@@ -574,7 +592,7 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
// Commit the task. This will promote data and metadata to where
// job commits will pick it up on commit or abort.
- committer.commitTask(tContext);
+ commitTask(committer, tContext);
assertTaskAttemptPathDoesNotExist(committer, tContext);
Configuration conf2 = jobData.job.getConfiguration();
@@ -600,6 +618,7 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
committer2.abortJob(jContext2, JobStatus.State.KILLED);
// now, state of system may still have pending data
assertNoMultipartUploadsPending(outDir);
+ verifyCommitterHasNoThreads(committer2);
}
protected void assertTaskAttemptPathDoesNotExist(
@@ -742,7 +761,7 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
describe("2. Committing task");
assertTrue("No files to commit were found by " + committer,
committer.needsTaskCommit(tContext));
- committer.commitTask(tContext);
+ commitTask(committer, tContext);
// this is only task commit; there MUST be no part- files in the dest dir
waitForConsistency();
@@ -758,7 +777,7 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
describe("3. Committing job");
assertMultipartUploadsPending(outDir);
- committer.commitJob(jContext);
+ commitJob(committer, jContext);
// validate output
describe("4. Validating content");
@@ -809,7 +828,7 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
// now fail job
expectSimulatedFailureOnJobCommit(jContext, committer);
- committer.commitJob(jContext);
+ commitJob(committer, jContext);
// but the data got there, due to the order of operations.
validateContent(outDir, shouldExpectSuccessMarker());
@@ -1011,6 +1030,7 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
committer.abortJob(jobData.jContext, JobStatus.State.FAILED);
assertJobAbortCleanedUp(jobData);
+ verifyCommitterHasNoThreads(committer);
}
/**
@@ -1064,6 +1084,7 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
// try again; expect abort to be idempotent.
committer.abortJob(jContext, JobStatus.State.FAILED);
assertNoMultipartUploadsPending(outDir);
+ verifyCommitterHasNoThreads(committer);
}
public void assertPart0000DoesNotExist(Path dir) throws Exception {
@@ -1223,8 +1244,8 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
validateTaskAttemptPathAfterWrite(dest);
assertTrue("Committer does not have data to commit " + committer,
committer.needsTaskCommit(tContext));
- committer.commitTask(tContext);
- committer.commitJob(jContext);
+ commitTask(committer, tContext);
+ commitJob(committer, jContext);
// validate output
verifySuccessMarker(outDir);
}
@@ -1257,6 +1278,7 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
AbstractS3ACommitter committer2 = (AbstractS3ACommitter)
outputFormat.getOutputCommitter(newAttempt);
committer2.abortTask(tContext);
+ verifyCommitterHasNoThreads(committer2);
assertNoMultipartUploadsPending(getOutDir());
}
@@ -1306,19 +1328,19 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
// at this point, job1 and job2 both have uncommitted tasks
// commit tasks in order task 2, task 1.
- committer2.commitTask(tContext2);
- committer1.commitTask(tContext1);
+ commitTask(committer2, tContext2);
+ commitTask(committer1, tContext1);
assertMultipartUploadsPending(job1Dest);
assertMultipartUploadsPending(job2Dest);
// commit jobs in order job 1, job 2
- committer1.commitJob(jContext1);
+ commitJob(committer1, jContext1);
assertNoMultipartUploadsPending(job1Dest);
getPart0000(job1Dest);
assertMultipartUploadsPending(job2Dest);
- committer2.commitJob(jContext2);
+ commitJob(committer2, jContext2);
getPart0000(job2Dest);
assertNoMultipartUploadsPending(job2Dest);
} finally {
@@ -1379,4 +1401,36 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
TaskAttemptContext context) throws IOException {
}
+ /**
+ * Commit a task then validate the state of the committer afterwards.
+ * @param committer committer
+ * @param tContext task context
+ * @throws IOException IO failure
+ */
+ protected void commitTask(final AbstractS3ACommitter committer,
+ final TaskAttemptContext tContext) throws IOException {
+ committer.commitTask(tContext);
+ verifyCommitterHasNoThreads(committer);
+ }
+
+ /**
+ * Commit a job then validate the state of the committer afterwards.
+ * @param committer committer
+ * @param jContext job context
+ * @throws IOException IO failure
+ */
+ protected void commitJob(final AbstractS3ACommitter committer,
+ final JobContext jContext) throws IOException {
+ committer.commitJob(jContext);
+ verifyCommitterHasNoThreads(committer);
+ }
+
+ /**
+ * Verify that the committer does not have a thread pool.
+ * @param committer committer to validate.
+ */
+ protected void verifyCommitterHasNoThreads(AbstractS3ACommitter committer) {
+ assertFalse("Committer has an active thread pool",
+ committer.hasThreadPool());
+ }
}
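
The commitTask/commitJob/verifyCommitterHasNoThreads helpers above are the
heart of the new leak checks: every path that commits a task or job now also
asserts that the committer's thread pool has been destroyed. A minimal sketch
of a subclass test using them; startJob() and getCommitter() are hypothetical
stand-ins for this class's real setup helpers:

    @Test
    public void testCommitWithCleanupCheck() throws Exception {
      JobData jobData = startJob(true);            // hypothetical setup
      AbstractS3ACommitter committer = jobData.getCommitter();
      // commitTask() delegates to the committer, then asserts that
      // hasThreadPool() is false
      commitTask(committer, jobData.tContext);
      // commitJob() applies the same post-condition at job scope
      commitJob(committer, jobData.jContext);
    }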
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestTasks.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestTasks.java
index 5e6fb82..4ee39f1 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestTasks.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestTasks.java
@@ -73,7 +73,7 @@ public class TestTasks extends HadoopTestBase {
* more checks on single thread than parallel ops.
* @return a list of parameter tuples.
*/
- @Parameterized.Parameters
+ @Parameterized.Parameters(name = "threads={0}")
public static Collection<Object[]> params() {
return Arrays.asList(new Object[][]{
{0},
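
The name attribute is stock JUnit 4: @Parameterized.Parameters(name =
"threads={0}") interpolates each parameter tuple into the generated test
names, so reports show e.g. testFoo[threads=8] instead of an opaque index.
The idiom in isolation, as a self-contained sketch:

    import java.util.Arrays;
    import java.util.Collection;
    import org.junit.Assert;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.junit.runners.Parameterized;

    @RunWith(Parameterized.class)
    public class ThreadCountNamingTest {

      @Parameterized.Parameters(name = "threads={0}")
      public static Collection<Object[]> params() {
        return Arrays.asList(new Object[][]{{0}, {1}, {3}, {8}});
      }

      private final int threads;

      public ThreadCountNamingTest(int threads) {
        this.threads = threads;
      }

      @Test
      public void reportsReadableName() {
        // surefire reports this as reportsReadableName[threads=N]
        Assert.assertTrue(threads >= 0);
      }
    }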
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/integration/ITestS3ACommitterMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/integration/ITestS3ACommitterMRJob.java
index e3e4449..1045a29 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/integration/ITestS3ACommitterMRJob.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/integration/ITestS3ACommitterMRJob.java
@@ -632,10 +632,10 @@ public class ITestS3ACommitterMRJob extends AbstractYarnClusterITest {
final FileStatus st = fs.getFileStatus(magicDir);
StringBuilder result = new StringBuilder("Found magic dir which should"
+ " have been deleted at ").append(st).append('\n');
- result.append("[");
+ result.append(" [");
applyLocatedFiles(fs.listFiles(magicDir, true),
- (status) -> result.append(status.getPath()).append('\n'));
- result.append("[");
+ (status) -> result.append(" ").append(status.getPath()).append('\n'));
+ result.append("]");
return result.toString();
});
}
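
The original builder opened "[" twice and never closed the bracket; with the
fix, each surviving file is indented and the listing is terminated. Purely
illustrative, the diagnostic now renders along these lines:

    Found magic dir which should have been deleted at <filestatus>
     [ s3a://bucket/dest/__magic/task_.../part-0000
       s3a://bucket/dest/__magic/task_.../part-0001
    ]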
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/StagingTestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/StagingTestBase.java
index fd585d0..f368bf2 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/StagingTestBase.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/StagingTestBase.java
@@ -251,6 +251,12 @@ public class StagingTestBase {
verify(mockS3).getFileStatus(path);
}
+ /**
+ * Verify that mkdirs was invoked once.
+ * @param mockS3 mock
+ * @param path path to check
+ * @throws IOException from the mkdirs signature.
+ */
public static void verifyMkdirsInvoked(FileSystem mockS3, Path path)
throws IOException {
verify(mockS3).mkdirs(path);
@@ -320,12 +326,7 @@ public class StagingTestBase {
@Before
public void setupJob() throws Exception {
- this.jobConf = new JobConf();
- jobConf.set(InternalCommitterConstants.FS_S3A_COMMITTER_STAGING_UUID,
- UUID.randomUUID().toString());
- jobConf.setBoolean(
- CommitConstants.CREATE_SUCCESSFUL_JOB_OUTPUT_DIR_MARKER,
- false);
+ this.jobConf = createJobConf();
this.job = new JobContextImpl(jobConf, JOB_ID);
this.results = new StagingTestBase.ClientResults();
@@ -338,6 +339,16 @@ public class StagingTestBase {
wrapperFS.setAmazonS3Client(mockClient);
}
+ protected JobConf createJobConf() {
+ JobConf conf = new JobConf();
+ conf.set(InternalCommitterConstants.FS_S3A_COMMITTER_STAGING_UUID,
+ UUID.randomUUID().toString());
+ conf.setBoolean(
+ CommitConstants.CREATE_SUCCESSFUL_JOB_OUTPUT_DIR_MARKER,
+ false);
+ return conf;
+ }
+
public S3AFileSystem getMockS3A() {
return mockFS;
}
@@ -461,6 +472,11 @@ public class StagingTestBase {
return deletes;
}
+ public List<String> getDeletePaths() {
+ return deletes.stream().map(DeleteObjectRequest::getKey).collect(
+ Collectors.toList());
+ }
+
public void resetDeletes() {
deletes.clear();
}
@@ -478,6 +494,14 @@ public class StagingTestBase {
requests.clear();
}
+ public void addUpload(String id, String key) {
+ activeUploads.put(id, key);
+ }
+
+ public void addUploads(Map<String, String> uploadMap) {
+ activeUploads.putAll(uploadMap);
+ }
+
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(
@@ -648,8 +672,9 @@ public class StagingTestBase {
}
CompleteMultipartUploadRequest req = getArgumentAt(invocation,
0, CompleteMultipartUploadRequest.class);
+ String uploadId = req.getUploadId();
+ removeUpload(results, uploadId);
results.commits.add(req);
- results.activeUploads.remove(req.getUploadId());
return newResult(req);
}
@@ -669,14 +694,7 @@ public class StagingTestBase {
AbortMultipartUploadRequest req = getArgumentAt(invocation,
0, AbortMultipartUploadRequest.class);
String id = req.getUploadId();
- String p = results.activeUploads.remove(id);
- if (p == null) {
- // upload doesn't exist
- AmazonS3Exception ex = new AmazonS3Exception(
- "not found " + id);
- ex.setStatusCode(404);
- throw ex;
- }
+ removeUpload(results, id);
results.aborts.add(req);
return null;
}
@@ -729,6 +747,24 @@ public class StagingTestBase {
return mockClient;
}
+ /**
+ * Remove an upload from the upload map.
+ * @param results result set
+ * @param uploadId The upload ID to remove
+ * @throws AmazonS3Exception with error code 404 if the id is unknown.
+ */
+ protected static void removeUpload(final ClientResults results,
+ final String uploadId) {
+ String removed = results.activeUploads.remove(uploadId);
+ if (removed == null) {
+ // upload doesn't exist
+ AmazonS3Exception ex = new AmazonS3Exception(
+ "not found " + uploadId);
+ ex.setStatusCode(404);
+ throw ex;
+ }
+ }
+
private static CompleteMultipartUploadResult newResult(
CompleteMultipartUploadRequest req) {
return new CompleteMultipartUploadResult();
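
Both the complete-upload and abort-upload answers now funnel through
removeUpload(), so any operation naming an upload ID never registered via
addUpload() fails the way S3 itself would: with a 404. A hedged sketch of
exercising that path from a test in this package (test name and wiring are
illustrative):

    @Test
    public void testRemoveUnknownUploadRaises404() {
      StagingTestBase.ClientResults results = getMockResults();
      results.addUpload("upload-1", "dest/key-1");
      // first removal succeeds and empties the upload map
      StagingTestBase.removeUpload(results, "upload-1");
      try {
        // a second removal simulates S3's 404 for an unknown upload ID
        StagingTestBase.removeUpload(results, "upload-1");
        fail("expected an AmazonS3Exception");
      } catch (AmazonS3Exception e) {
        assertEquals(404, e.getStatusCode());
      }
    }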
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestDirectoryCommitterScale.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestDirectoryCommitterScale.java
new file mode 100644
index 0000000..6d93e5f
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestDirectoryCommitterScale.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.staging;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import com.amazonaws.services.s3.model.PartETag;
+import com.google.common.collect.Maps;
+import org.assertj.core.api.Assertions;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter;
+import org.apache.hadoop.fs.s3a.commit.CommitConstants;
+import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
+import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.DurationInfo;
+
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.CONFLICT_MODE_APPEND;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_CONFLICT_MODE;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.PENDINGSET_SUFFIX;
+import static org.apache.hadoop.fs.s3a.commit.staging.StagingTestBase.BUCKET;
+import static org.apache.hadoop.fs.s3a.commit.staging.StagingTestBase.outputPath;
+import static org.apache.hadoop.fs.s3a.commit.staging.StagingTestBase.outputPathUri;
+import static org.apache.hadoop.fs.s3a.commit.staging.StagingTestBase.pathIsDirectory;
+
+/**
+ * Scale test of the directory committer: does job commit overload
+ * when there are many, many files?
+ * This is a mock test, so as to avoid the overhead of going near S3;
+ * it does use a lot of local filesystem files, though, to better
+ * simulate a real large-scale deployment.
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class TestDirectoryCommitterScale
+ extends StagingTestBase.JobCommitterTest<DirectoryStagingCommitter> {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestDirectoryCommitterScale.class);
+
+ public static final int TASKS = 500;
+
+ public static final int FILES_PER_TASK = 10;
+
+ public static final int TOTAL_COMMIT_COUNT = FILES_PER_TASK * TASKS;
+
+ public static final int BLOCKS_PER_TASK = 1000;
+
+ private static File stagingDir;
+
+ private static LocalFileSystem localFS;
+
+ private static Path stagingPath;
+
+ private static Map<String, String> activeUploads =
+ Maps.newHashMap();
+
+ @Override
+ DirectoryCommitterForTesting newJobCommitter() throws Exception {
+ return new DirectoryCommitterForTesting(outputPath,
+ createTaskAttemptForJob());
+ }
+
+ @BeforeClass
+ public static void setupStaging() throws Exception {
+ stagingDir = File.createTempFile("staging", "");
+ stagingDir.delete();
+ stagingDir.mkdir();
+ stagingPath = new Path(stagingDir.toURI());
+ localFS = FileSystem.getLocal(new Configuration());
+ }
+
+
+ @AfterClass
+ public static void teardownStaging() throws IOException {
+ try {
+ if (stagingDir != null) {
+ FileUtils.deleteDirectory(stagingDir);
+ }
+ } catch (IOException ignored) {
+ // best-effort cleanup; failure to delete the temp dir is not a test failure
+ }
+ }
+
+ @Override
+ protected JobConf createJobConf() {
+ JobConf conf = super.createJobConf();
+ conf.setInt(
+ CommitConstants.FS_S3A_COMMITTER_THREADS,
+ 100);
+ return conf;
+ }
+
+ protected Configuration getJobConf() {
+ return getJob().getConfiguration();
+ }
+
+ @Test
+ public void test_010_createTaskFiles() throws Exception {
+ try (DurationInfo ignored =
+ new DurationInfo(LOG, "Creating %d test files in %s",
+ TOTAL_COMMIT_COUNT, stagingDir)) {
+ createTasks();
+ }
+ }
+
+ /**
+ * Create the mock uploads of the tasks and save them
+ * to .pendingset files.
+ * @throws IOException failure.
+ */
+ private void createTasks() throws IOException {
+ // create a stub multipart commit containing multiple files.
+
+ // step 1: build a list of etags.
+ // the tag value is the md5sum of the hadoop 3.2.1 tarball
+ String tag = "9062dcf18ffaee254821303bbd11c72b";
+ List<PartETag> etags = IntStream.rangeClosed(1, BLOCKS_PER_TASK + 1)
+ .mapToObj(i -> new PartETag(i, tag))
+ .collect(Collectors.toList());
+ SinglePendingCommit base = new SinglePendingCommit();
+ base.setBucket(BUCKET);
+ base.setJobId("0000");
+ base.setLength(914688000);
+ base.bindCommitData(etags);
+ // these get overwritten
+ base.setDestinationKey("/base");
+ base.setUploadId("uploadId");
+ base.setUri(outputPathUri.toString());
+
+ SinglePendingCommit[] singles = new SinglePendingCommit[FILES_PER_TASK];
+ byte[] bytes = base.toBytes();
+ for (int i = 0; i < FILES_PER_TASK; i++) {
+ singles[i] = SinglePendingCommit.serializer().fromBytes(bytes);
+ }
+ // now create the files, using this as the template
+
+ int uploadCount = 0;
+ for (int task = 0; task < TASKS; task++) {
+ PendingSet pending = new PendingSet();
+ String taskId = String.format("task-%04d", task);
+
+ for (int i = 0; i < FILES_PER_TASK; i++) {
+ String uploadId = String.format("%05d-task-%04d-file-%02d",
+ uploadCount, task, i);
+ // longer paths to take up more space.
+ Path p = new Path(outputPath,
+ "datasets/examples/testdirectoryscale/"
+ + "year=2019/month=09/day=26/hour=20/second=53"
+ + uploadId);
+ URI dest = p.toUri();
+ SinglePendingCommit commit = singles[i];
+ String key = dest.getPath();
+ commit.setDestinationKey(key);
+ commit.setUri(dest.toString());
+ commit.touch(Instant.now().toEpochMilli());
+ commit.setTaskId(taskId);
+ commit.setUploadId(uploadId);
+ pending.add(commit);
+ activeUploads.put(uploadId, key);
+ }
+ Path path = new Path(stagingPath,
+ String.format("task-%04d." + PENDINGSET_SUFFIX, task));
+ pending.save(localFS, path, true);
+ }
+ }
+
+
+ @Test
+ public void test_020_loadFilesToAttempt() throws Exception {
+ DirectoryStagingCommitter committer = newJobCommitter();
+
+ Configuration jobConf = getJobConf();
+ jobConf.set(
+ FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CONFLICT_MODE_APPEND);
+ FileSystem mockS3 = getMockS3A();
+ pathIsDirectory(mockS3, outputPath);
+ try (DurationInfo ignored =
+ new DurationInfo(LOG, "listing pending uploads")) {
+ AbstractS3ACommitter.ActiveCommit activeCommit
+ = committer.listPendingUploadsToCommit(getJob());
+ Assertions.assertThat(activeCommit.getSourceFiles())
+ .describedAs("Source files of %s", activeCommit)
+ .hasSize(TASKS);
+ }
+ }
+
+ @Test
+ public void test_030_commitFiles() throws Exception {
+ DirectoryCommitterForTesting committer = newJobCommitter();
+ StagingTestBase.ClientResults results = getMockResults();
+ results.addUploads(activeUploads);
+ Configuration jobConf = getJobConf();
+ jobConf.set(
+ FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CONFLICT_MODE_APPEND);
+ S3AFileSystem mockS3 = getMockS3A();
+ pathIsDirectory(mockS3, outputPath);
+
+ try (DurationInfo ignored =
+ new DurationInfo(LOG, "Committing Job")) {
+ committer.commitJob(getJob());
+ }
+
+ Assertions.assertThat(results.getCommits())
+ .describedAs("commit count")
+ .hasSize(TOTAL_COMMIT_COUNT);
+ AbstractS3ACommitter.ActiveCommit activeCommit = committer.activeCommit;
+ Assertions.assertThat(activeCommit.getCommittedObjects())
+ .describedAs("committed objects in active commit")
+ .hasSize(Math.min(TOTAL_COMMIT_COUNT,
+ CommitConstants.SUCCESS_MARKER_FILE_LIMIT));
+ Assertions.assertThat(activeCommit.getCommittedFileCount())
+ .describedAs("committed objects in active commit")
+ .isEqualTo(TOTAL_COMMIT_COUNT);
+
+ }
+
+ @Test
+ public void test_040_abortFiles() throws Exception {
+ DirectoryStagingCommitter committer = newJobCommitter();
+ getMockResults().addUploads(activeUploads);
+ Configuration jobConf = getJobConf();
+ jobConf.set(
+ FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CONFLICT_MODE_APPEND);
+ FileSystem mockS3 = getMockS3A();
+ pathIsDirectory(mockS3, outputPath);
+
+ committer.abortJob(getJob(), JobStatus.State.FAILED);
+ }
+
+
+ /**
+ * Committer subclassed to expose internal state for the tests.
+ */
+ private static final class DirectoryCommitterForTesting extends
+ DirectoryStagingCommitter {
+ private ActiveCommit activeCommit;
+
+ private DirectoryCommitterForTesting(Path outputPath,
+ TaskAttemptContext context) throws IOException {
+ super(outputPath, context);
+ }
+
+ @Override
+ protected void initOutput(Path out) throws IOException {
+ super.initOutput(out);
+ setOutputPath(out);
+ }
+
+ /**
+ * Returns the mock FS without checking FS type.
+ * @param out output path
+ * @param config job/task config
+ * @return a filesystem.
+ * @throws IOException failure to get the FS
+ */
+ @Override
+ protected FileSystem getDestinationFS(Path out, Configuration config)
+ throws IOException {
+ return out.getFileSystem(config);
+ }
+
+ @Override
+ public Path getJobAttemptPath(JobContext context) {
+ return stagingPath;
+ }
+
+ @Override
+ protected void commitJobInternal(final JobContext context,
+ final ActiveCommit pending)
+ throws IOException {
+ activeCommit = pending;
+ super.commitJobInternal(context, pending);
+ }
+ }
+}
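
The test_0NN prefixes plus @FixMethodOrder(MethodSorters.NAME_ASCENDING) are
what let the create/load/commit/abort phases share the statically held
staging files. Since those .pendingset files are plain serialized PendingSet
instances, their integrity can be spot-checked with a load round trip; a
hedged sketch, assuming PendingSet.load(FileSystem, Path) and getCommits()
keep their production signatures:

    // reload task 0's pendingset and confirm all its commits survived
    PendingSet reloaded = PendingSet.load(localFS,
        new Path(stagingPath,
            String.format("task-%04d." + PENDINGSET_SUFFIX, 0)));
    Assertions.assertThat(reloaded.getCommits())
        .describedAs("commits reloaded from task 0's pendingset")
        .hasSize(FILES_PER_TASK);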
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java
index 8939296..15ea754 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java
@@ -35,6 +35,7 @@ import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.google.common.collect.Sets;
+import org.assertj.core.api.Assertions;
import org.hamcrest.core.StringStartsWith;
import org.junit.After;
import org.junit.Before;
@@ -535,33 +536,31 @@ public class TestStagingCommitter extends StagingTestBase.MiniDFSTest {
return jobCommitter.toString();
});
- assertEquals("Should have succeeded to commit some uploads",
- 5, results.getCommits().size());
-
- assertEquals("Should have deleted the files that succeeded",
- 5, results.getDeletes().size());
Set<String> commits = results.getCommits()
.stream()
- .map((commit) -> commit.getBucketName() + commit.getKey())
+ .map(commit ->
+ "s3a://" + commit.getBucketName() + "/" + commit.getKey())
.collect(Collectors.toSet());
Set<String> deletes = results.getDeletes()
.stream()
- .map((delete) -> delete.getBucketName() + delete.getKey())
+ .map(delete ->
+ "s3a://" + delete.getBucketName() + "/" + delete.getKey())
.collect(Collectors.toSet());
- assertEquals("Committed and deleted objects should match",
- commits, deletes);
-
- assertEquals("Mismatch in aborted upload count",
- 7, results.getAborts().size());
+ Assertions.assertThat(commits)
+ .describedAs("Committed objects compared to deleted paths %s", results)
+ .containsExactlyInAnyOrderElementsOf(deletes);
+ Assertions.assertThat(results.getAborts())
+ .describedAs("aborted count in %s", results)
+ .hasSize(7);
Set<String> uploadIds = getCommittedIds(results.getCommits());
uploadIds.addAll(getAbortedIds(results.getAborts()));
-
- assertEquals("Should have committed/deleted or aborted all uploads",
- uploads, uploadIds);
+ Assertions.assertThat(uploadIds)
+ .describedAs("Combined commit/delete and aborted upload IDs")
+ .containsExactlyInAnyOrderElementsOf(uploads);
assertPathDoesNotExist(fs, "jobAttemptPath not deleted", jobAttemptPath);
}
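
The move from assertEquals on whole collections to AssertJ buys diagnostics:
containsExactlyInAnyOrderElementsOf reports the missing and unexpected
elements on failure rather than two opaque toString() dumps. The idiom in
isolation:

    import java.util.Arrays;
    import java.util.List;
    import org.assertj.core.api.Assertions;

    public class AssertJDemo {
      public static void main(String[] args) {
        List<String> commits = Arrays.asList("s3a://b/k1", "s3a://b/k2");
        List<String> deletes = Arrays.asList("s3a://b/k2", "s3a://b/k1");
        // order-insensitive equality; a failure lists the exact difference
        Assertions.assertThat(commits)
            .describedAs("committed vs deleted objects")
            .containsExactlyInAnyOrderElementsOf(deletes);
      }
    }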
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingDirectoryOutputCommitter.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingDirectoryOutputCommitter.java
index 994ecef..98075b8 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingDirectoryOutputCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingDirectoryOutputCommitter.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.PathExistsException;
+import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter;
import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
@@ -84,17 +85,18 @@ public class TestStagingDirectoryOutputCommitter
() -> committer.setupJob(getJob()));
// but there are no checks in job commit (HADOOP-15469)
- committer.commitJob(getJob());
+ // this is done by calling the preCommitJob method directly.
+ committer.preCommitJob(getJob(), AbstractS3ACommitter.ActiveCommit.empty());
- reset((FileSystem) getMockS3A());
+ reset(getMockS3A());
pathDoesNotExist(getMockS3A(), outputPath);
committer.setupJob(getJob());
verifyExistenceChecked(getMockS3A(), outputPath);
verifyMkdirsInvoked(getMockS3A(), outputPath);
- verifyNoMoreInteractions((FileSystem) getMockS3A());
+ verifyNoMoreInteractions(getMockS3A());
- reset((FileSystem) getMockS3A());
+ reset(getMockS3A());
pathDoesNotExist(getMockS3A(), outputPath);
committer.commitJob(getJob());
verifyCompletion(getMockS3A());
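
Because job commit performs no destination checks (HADOOP-15469), the test
now pokes the conflict logic directly through preCommitJob() with
ActiveCommit.empty(), an active commit carrying (presumably) no pendingset
files, so nothing is uploaded or committed. In miniature:

    // probe only the pre-commit conflict checks; no uploads are touched
    committer.preCommitJob(getJob(),
        AbstractS3ACommitter.ActiveCommit.empty());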
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java
index e7410e3..872097f 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java
@@ -18,20 +18,21 @@
package org.apache.hadoop.fs.s3a.commit.staging;
+import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.List;
import java.util.UUID;
-import com.google.common.collect.Lists;
import org.junit.Test;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.MockS3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.commit.PathCommitException;
+import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -59,37 +60,59 @@ public class TestStagingPartitionedJobCommit
/**
* Subclass of the Partitioned Staging committer used in the test cases.
*/
- private static final class PartitionedStagingCommitterForTesting
+ private final class PartitionedStagingCommitterForTesting
extends PartitionedCommitterForTesting {
- private boolean aborted = false;
+ private boolean aborted;
private PartitionedStagingCommitterForTesting(TaskAttemptContext context)
throws IOException {
super(StagingTestBase.outputPath, context);
}
+ /**
+ * Generate pending uploads to commit.
+ * This is quite complex as the mock pending uploads need to be saved
+ * to a filesystem for the next stage of the commit process.
+ * To simulate multiple commits, more than one .pendingset file is created.
+ * @param context job context
+ * @return an active commit containing a list of paths to valid pendingset
+ * files.
+ * @throws IOException IO failure
+ */
@Override
- protected List<SinglePendingCommit> listPendingUploadsToCommit(
+ protected ActiveCommit listPendingUploadsToCommit(
JobContext context) throws IOException {
- List<SinglePendingCommit> pending = Lists.newArrayList();
+ LocalFileSystem localFS = FileSystem.getLocal(getConf());
+ ActiveCommit activeCommit = new ActiveCommit(localFS,
+ new ArrayList<>(0));
+ // need to create some pending entries.
for (String dateint : Arrays.asList("20161115", "20161116")) {
+ PendingSet pendingSet = new PendingSet();
for (String hour : Arrays.asList("13", "14")) {
+ String uploadId = UUID.randomUUID().toString();
String key = OUTPUT_PREFIX + "/dateint=" + dateint + "/hour=" + hour +
- "/" + UUID.randomUUID().toString() + ".parquet";
+ "/" + uploadId + ".parquet";
SinglePendingCommit commit = new SinglePendingCommit();
commit.setBucket(BUCKET);
commit.setDestinationKey(key);
commit.setUri("s3a://" + BUCKET + "/" + key);
- commit.setUploadId(UUID.randomUUID().toString());
+ commit.setUploadId(uploadId);
ArrayList<String> etags = new ArrayList<>();
etags.add("tag1");
commit.setEtags(etags);
- pending.add(commit);
+ pendingSet.add(commit);
+ // register the upload so commit operations are not rejected
+ getMockResults().addUpload(uploadId, key);
}
+ File file = File.createTempFile("staging", ".pendingset");
+ file.deleteOnExit();
+ Path path = new Path(file.toURI());
+ pendingSet.save(localFS, path, true);
+ activeCommit.add(path);
}
- return pending;
+ return activeCommit;
}
@Override
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedTaskCommit.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedTaskCommit.java
index 8116b79..4b56826 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedTaskCommit.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedTaskCommit.java
@@ -27,6 +27,7 @@ import java.util.UUID;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import org.assertj.core.api.Assertions;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -114,18 +115,7 @@ public class TestStagingPartitionedTaskCommit
reset(mockS3);
committer.commitTask(getTAC());
- Set<String> files = Sets.newHashSet();
- for (InitiateMultipartUploadRequest request :
- getMockResults().getRequests().values()) {
- assertEquals(BUCKET, request.getBucketName());
- files.add(request.getKey());
- }
- assertEquals("Should have the right number of uploads",
- relativeFiles.size(), files.size());
-
- Set<String> expected = buildExpectedList(committer);
-
- assertEquals("Should have correct paths", expected, files);
+ verifyFilesCreated(committer);
}
@Test
@@ -146,18 +136,29 @@ public class TestStagingPartitionedTaskCommit
pathExists(mockS3, new Path(outputPath, relativeFiles.get(2)).getParent());
committer.commitTask(getTAC());
+ verifyFilesCreated(committer);
+ }
+
+ /**
+ * Verify that the files created match those expected.
+ * @param committer committer
+ */
+ protected void verifyFilesCreated(
+ final PartitionedStagingCommitter committer) {
Set<String> files = Sets.newHashSet();
for (InitiateMultipartUploadRequest request :
getMockResults().getRequests().values()) {
assertEquals(BUCKET, request.getBucketName());
files.add(request.getKey());
}
- assertEquals("Should have the right number of uploads",
- relativeFiles.size(), files.size());
+ Assertions.assertThat(files)
+ .describedAs("Should have the right number of uploads")
+ .hasSize(relativeFiles.size());
Set<String> expected = buildExpectedList(committer);
-
- assertEquals("Should have correct paths", expected, files);
+ Assertions.assertThat(files)
+ .describedAs("Should have correct paths")
+ .containsExactlyInAnyOrderElementsOf(expected);
}
@Test
@@ -180,18 +181,7 @@ public class TestStagingPartitionedTaskCommit
pathExists(mockS3, new Path(outputPath, relativeFiles.get(3)).getParent());
committer.commitTask(getTAC());
- Set<String> files = Sets.newHashSet();
- for (InitiateMultipartUploadRequest request :
- getMockResults().getRequests().values()) {
- assertEquals(BUCKET, request.getBucketName());
- files.add(request.getKey());
- }
- assertEquals("Should have the right number of uploads",
- relativeFiles.size(), files.size());
-
- Set<String> expected = buildExpectedList(committer);
-
- assertEquals("Should have correct paths", expected, files);
+ verifyFilesCreated(committer);
}
public Set<String> buildExpectedList(StagingCommitter committer) {