You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-issues@hadoop.apache.org by GitBox <gi...@apache.org> on 2022/03/15 10:35:03 UTC

[GitHub] [hadoop] mukund-thakur commented on a change in pull request #2971: MAPREDUCE-7341. Intermediate Manifest Committer

mukund-thakur commented on a change in pull request #2971:
URL: https://github.com/apache/hadoop/pull/2971#discussion_r825692207



##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer_architecture.md
##########
@@ -0,0 +1,335 @@
+<!---
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+
+# Manifest Committer Architecture
+
+This document describes the architecture and other implementation/correctness
+aspects of the [Manifest Committer](manifest_committer.html)
+
+The protocol and its correctness are covered in [Manifest Committer Protocol](manifest_committer_protocol.html).
+<!-- MACRO{toc|fromDepth=0|toDepth=2} -->
+
+The _Manifest_ committer is a committer for work which provides performance on ABFS for "real world"
+queries, and performance and correctness on GCS.
+
+This committer uses the extension point which came in for the S3A committers.
+Users can declare a new committer factory for `abfs://` and `gcs://` URLs.
+It can be used through Hadoop MapReduce and Apache Spark.
+
+## Background
+
+### Terminology
+
+| Term | Meaning|
+|------|--------|
+| Committer |  A class which can be invoked by MR/Spark to perform the task and job commit operations. |
+| Spark Driver | The spark process scheduling the work and choreographing the commit operation.|
+| Job  | In MapReduce. the entire application. In spark, this is a single stage in a chain of work |
+| Job Attempt | A single attempt at a job. MR supports multiple Job attempts with recovery on partial job failure. Spark says "start again from scratch" |
+| Task | a subsection of a job, such as processing one file, or one part of a file |
+| Task ID |  ID of the task, unique within this job. Usually starts at 0 and is used in filenames (part-0000, part-001, etc.) |
+| Task attempt (TA) | An attempt to perform a task. It may fail, in which case MR/spark will schedule another. |
+| Task Attempt ID | A unique ID for the task attempt. The Task ID + an attempt counter.|
+|  Destination directory | The final destination of work.|

Review comment:
       nit: extra space in the start.
   

##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterFactory.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory;
+
+/**
+ * This is the committer factory to register as the source of committers
+ * for the job/filesystem schema.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ManifestCommitterFactory extends PathOutputCommitterFactory {
+
+  public static final Logger LOG = LoggerFactory.getLogger(
+      ManifestCommitterFactory.class);
+
+  /**
+   * Name of this factory.
+   */
+  public static final String NAME = ManifestCommitterFactory.class.getName();
+
+  @Override
+  public ManifestCommitter createOutputCommitter(final Path outputPath,
+      final TaskAttemptContext context) throws IOException {
+    // safety check. S3A does not support this, so fail fast.
+    if ("s3a".equals(outputPath.toUri().getScheme())) {

Review comment:
       nit: hardcoding the s3a scheme? 

##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer_architecture.md
##########
@@ -0,0 +1,335 @@
+<!---
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+
+# Manifest Committer Architecture
+
+This document describes the architecture and other implementation/correctness
+aspects of the [Manifest Committer](manifest_committer.html)
+
+The protocol and its correctness are covered in [Manifest Committer Protocol](manifest_committer_protocol.html).
+<!-- MACRO{toc|fromDepth=0|toDepth=2} -->
+
+The _Manifest_ committer is a committer for work which provides performance on ABFS for "real world"
+queries, and performance and correctness on GCS.
+
+This committer uses the extension point which came in for the S3A committers.
+Users can declare a new committer factory for `abfs://` and `gcs://` URLs.
+It can be used through Hadoop MapReduce and Apache Spark.
+
+## Background
+
+### Terminology
+
+| Term | Meaning|
+|------|--------|
+| Committer |  A class which can be invoked by MR/Spark to perform the task and job commit operations. |
+| Spark Driver | The spark process scheduling the work and choreographing the commit operation.|
+| Job  | In MapReduce. the entire application. In spark, this is a single stage in a chain of work |
+| Job Attempt | A single attempt at a job. MR supports multiple Job attempts with recovery on partial job failure. Spark says "start again from scratch" |
+| Task | a subsection of a job, such as processing one file, or one part of a file |
+| Task ID |  ID of the task, unique within this job. Usually starts at 0 and is used in filenames (part-0000, part-001, etc.) |
+| Task attempt (TA) | An attempt to perform a task. It may fail, in which case MR/spark will schedule another. |
+| Task Attempt ID | A unique ID for the task attempt. The Task ID + an attempt counter.|
+|  Destination directory | The final destination of work.|
+| Job Attempt Directory | A temporary directory used by the job attempt. This is always _underneath_ the destination directory, so as to ensure it is in the same encryption zone as HDFS, storage volume in other filesystems, etc.|
+| Task Attempt directory | (also known as "Task Attempt Working Directory"). Directory exclusive for each task attempt under which files are written |
+| Task Commit | Taking the output of a Task Attempt and making it the final/exclusive result of that "successful" Task.|
+| Job Commit | aggregating all the outputs of all committed tasks and producing the final results of the job. |
+
+
+
+The purpose of a committer is to ensure that the complete output of
+a job ends up in the destination, even in the presence of failures of tasks.
+
+- _Complete:_ the output includes the work of all successful tasks.
+- _Exclusive:_ the output of unsuccessful tasks is not present.
+- _Concurrent:_ When multiple tasks are committed in parallel the output is the same as when
+  the task commits are serialized. This is not a requirement of Job Commit.
+- _Abortable:_  jobs and tasks may be aborted prior to job commit, after which their output is not visible.
+- _Continuity of correctness:_ once a job is committed, the output of any failed,
+  aborted, or unsuccessful task MUST NO appear at some point in the future.
+
+For Hive's classic hierarchical-directory-structured tables, job committing
+requires the output of all committed tasks to be put into the correct location
+in the directory tree.
+
+The committer built into `hadoop-mapreduce-client-core` module is the `FileOutputCommitter`.
+
+
+
+## The Manifest Committer: A high performance committer for Spark on Azure and Google storage.
+
+The Manifest Committera higher performance committer for ABFS and GCS storage

Review comment:
       typo: Committera-> Committer

##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer_architecture.md
##########
@@ -0,0 +1,335 @@
+<!---
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+
+# Manifest Committer Architecture
+
+This document describes the architecture and other implementation/correctness
+aspects of the [Manifest Committer](manifest_committer.html)
+
+The protocol and its correctness are covered in [Manifest Committer Protocol](manifest_committer_protocol.html).
+<!-- MACRO{toc|fromDepth=0|toDepth=2} -->
+
+The _Manifest_ committer is a committer for work which provides performance on ABFS for "real world"
+queries, and performance and correctness on GCS.
+
+This committer uses the extension point which came in for the S3A committers.
+Users can declare a new committer factory for `abfs://` and `gcs://` URLs.
+It can be used through Hadoop MapReduce and Apache Spark.
+
+## Background
+
+### Terminology
+
+| Term | Meaning|
+|------|--------|
+| Committer |  A class which can be invoked by MR/Spark to perform the task and job commit operations. |
+| Spark Driver | The spark process scheduling the work and choreographing the commit operation.|
+| Job  | In MapReduce. the entire application. In spark, this is a single stage in a chain of work |
+| Job Attempt | A single attempt at a job. MR supports multiple Job attempts with recovery on partial job failure. Spark says "start again from scratch" |
+| Task | a subsection of a job, such as processing one file, or one part of a file |
+| Task ID |  ID of the task, unique within this job. Usually starts at 0 and is used in filenames (part-0000, part-001, etc.) |
+| Task attempt (TA) | An attempt to perform a task. It may fail, in which case MR/spark will schedule another. |
+| Task Attempt ID | A unique ID for the task attempt. The Task ID + an attempt counter.|
+|  Destination directory | The final destination of work.|
+| Job Attempt Directory | A temporary directory used by the job attempt. This is always _underneath_ the destination directory, so as to ensure it is in the same encryption zone as HDFS, storage volume in other filesystems, etc.|
+| Task Attempt directory | (also known as "Task Attempt Working Directory"). Directory exclusive for each task attempt under which files are written |
+| Task Commit | Taking the output of a Task Attempt and making it the final/exclusive result of that "successful" Task.|
+| Job Commit | aggregating all the outputs of all committed tasks and producing the final results of the job. |
+
+
+
+The purpose of a committer is to ensure that the complete output of
+a job ends up in the destination, even in the presence of failures of tasks.
+
+- _Complete:_ the output includes the work of all successful tasks.
+- _Exclusive:_ the output of unsuccessful tasks is not present.
+- _Concurrent:_ When multiple tasks are committed in parallel the output is the same as when
+  the task commits are serialized. This is not a requirement of Job Commit.
+- _Abortable:_  jobs and tasks may be aborted prior to job commit, after which their output is not visible.
+- _Continuity of correctness:_ once a job is committed, the output of any failed,
+  aborted, or unsuccessful task MUST NO appear at some point in the future.
+
+For Hive's classic hierarchical-directory-structured tables, job committing
+requires the output of all committed tasks to be put into the correct location
+in the directory tree.
+
+The committer built into `hadoop-mapreduce-client-core` module is the `FileOutputCommitter`.
+
+
+
+## The Manifest Committer: A high performance committer for Spark on Azure and Google storage.
+
+The Manifest Committera higher performance committer for ABFS and GCS storage
+for jobs which create file across deep directory trees through many tasks.
+
+It will also work on `hdfs://` and indeed, `file://` URLs, but
+it is optimized to address listing and renaming performance and throttling
+issues in cloud storage.
+
+It *will not* work correctly with S3, because it relies on an atomic rename-no-overwrite
+operation to commit the manifest file. It will also have the performance
+problems of copying rather than moving all the generated data.
+
+Although it will work with MapReduce
+there is no handling of multiple job attempts with recovery from previous failed
+attempts. (Plan: fail on MR AM restart)
+
+### The Manifest
+
+A Manifest file is designed which contains (along with IOStatistics and some
+other things)
+
+1. A list of destination directories which must be created if they do not exist.
+1. A list of files to rename, recorded as (absolute source, absolute destination,
+   file-size) entries.
+
+### Task Commit
+
+Task attempts are committed by:
+
+1. Recursively listing the task attempt working dir to build
+   1. A list of directories under which files are renamed.
+   2. A list of files to rename: source, destination, size and optionally, etag.
+2. Saving this information in a manifest file in the job attempt directory with
+   a filename derived from the Task ID.
+   Note: writing to a temp file and then renaming to the final path will be used
+   to ensure the manifest creation is atomic.
+
+
+No renaming takes place —the files are left in their original location.
+
+The directory treewalk is single-threaded, then it is `O(directories)`,
+with each directory listing using one or more paged LIST calls.
+
+This is simple, and for most tasks, the scan is off the critical path of of the job.
+
+Statistics analysis may justify moving to parallel scans in future.
+
+
+### Job Commit
+
+Job Commit consists of:
+
+1. List all manifest files in the job attempt directory.
+1. Load each manifest file, create directories which do not yet exist, then
+   rename each file in the rename list.
+1. Save a JSON `_SUCCESS` file with the same format as the S3A committer (for
+   testing; use write and rename for atomic save)
+
+The job commit phase supports parallelization for many tasks and many files
+per task, specifically:
+
+1. Manifest tasks are loaded and processed in a pool of "manifest processor"
+   threads.
+2. Directory creation and file rename operations are each processed in a pool of "
+   executor" threads: many renames can execute in parallel as they use minimal
+   network IO.
+3. job cleanup can parallelize deletion of task attempt directories. This
+   is relevant as directory deletion is `O(files)` on Google cloud storage,
+   and also on ABFS when OAuth authentication is used.
+
+
+### Ancestor directory preparation
+
+Optional scan of all ancestors ...if any are files, delete.
+
+
+### Parent directory creation
+
+1. Probe shared directory map for directory existing. If found: operation is
+   complete.
+1. if the map is empty, call `getFileStatus()` on the path. Not found: create
+   directory, add entry and those of all parent paths Found and is directory:
+   add entry and those of all parent paths Found and is file: delete. then
+   create as before.
+
+Efficiently handling concurrent creation of directories (or delete+create) is going to be a
+troublespot; some effort is invested there to build the set of directories to
+create.
+
+### File Rename
+
+Files are renamed in parallel.
+
+A pre-rename check for anything being at that path (and deleting it) will be optional.
+With spark creating new UUIDs for each file, this isn't going to happen, and
+saves HTTP requests.
+
+
+### Validation
+
+Optional scan of all committed files and verify length and, if known,
+etag. For testing and diagnostics.
+
+## Benefits
+
+* Pushes the source tree list operations into the task commit phase, which is
+  generally off the critical path of execution
+* Provides an atomic task commit to GCS, as there is no expectation that
+  directory rename is atomic
+* It is possible to pass IOStatistics from workers in manifest.
+* Allows for some pre-rename operations similar to the S3A "Partitioned Staging
+  committer". This can be configured to delete all existing entries in
+  directories scheduled to be created -or fail if those partitions are
+  non-empty.
+  See [Partitioned Staging Committer](../../hadoop-aws/tools/hadoop-aws/committers.html#The_.E2.80.9CPartitioned.E2.80.9D_Staging_Committer)
+* Allows for an optional preflight validation check (verify no duplicate files created by different tasks)
+* Manifests can be viewed, size of output determined, etc, during
+  development/debugging.
+
+### Disadvantages
+
+* Needs a new manifest file format.
+* May makes task commit more complex.
+
+This solution is necessary for GCS and should be beneficial on ABFS as listing
+overheads are paid for in the task committers.
+
+# Implementation Details
+
+### Constraints
+
+A key goal is to keep the manifest committer isolated and neither
+touch the existing committer code nor other parts of the hadoop codebase.
+
+It must plug directly into MR and Spark without needing any changes
+other than already implemented for the S3A Committers
+
+* Self-contained: MUST NOT require changes to hadoop-common, etc.
+* Isolated: MUST NOT make changes to existing committers
+* Integrated: MUST bind via `PathOutputCommitterFactory`.
+
+As a result of this there's a bit of copy and paste from elsewhere,
+e.g. `org.apache.hadoop.util.functional.TaskPool`
+is based on S3ACommitter's `org.apache.hadoop.fs.s3a.commit.Tasks`.
+
+The` _SUCCESS` file MUST be compatible with the S3A JSON file.
+This is to ensure any existing test suites which validate
+S3A committer output can be retargeted at jobs executed
+by the manifest committer without any changes.
+
+
+#### Progress callbacks in job commit.
+
+When? Proposed: heartbeat until renaming finally finishes.
+
+#### Error handling and aborting in job commit.
+
+We would want to stop the entire job commit. Some atomic boolean "abort job"
+would need to be checked in the processing of each task committer thread's
+iteraton through a directory (or processing of each file?)
+Failures in listing or renaming will need to be escalated to halting the entire
+job commit. This implies that any IOE raised in asynchronous rename operation or
+in a task committer thread must:
+
+1. be caught
+1. be stored in a shared field/variable
+1. trigger the abort
+1. be rethrown at the end of the `commitJob()` call
+
+#### Avoiding deadlocks
+
+If a job commit stage is using a thread pool for per-task operations, e.g. loading
+files, that same thread pool MUST NOT be used for parallel operations within
+the per-task stage.
+
+As every `JobStage` is executed in sequence within task or job commit, it
+is safe to share the same thread pool across stages.
+
+In the current implementation, there is no parallel "per manifest" operation
+in job commit other than for actually loading the files.
+The operations to create directories and to rename files are actually
+performed without performing parallel processing of individual manifests.
+
+Directory Preparation: merge the directory lists of all manifests,
+then queue for creation the (hopefully very much smaller) set of unique
+directories.
+
+Rname: iterate through all manifests and queue their renames into a pool for

Review comment:
       typo: Rename

##########
File path: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RateLimitingFactory.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import java.time.Duration;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.RateLimiter;
+
+/**
+ * Factory for Rate Limiting.
+ * This should be only place in the code where the guava RateLimiter is imported.
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public final class RateLimitingFactory {
+
+  private static final RateLimiting UNLIMITED = new NoRateLimiting();
+
+  /**
+   * No waiting took place.
+   */
+  private static final Duration INSTANTLY = Duration.ofMillis(0);
+
+  private RateLimitingFactory() {
+  }
+
+  /**
+   * No Rate Limiting.
+   */
+  private static class NoRateLimiting implements RateLimiting {
+
+
+    @Override
+    public Duration acquire(int capacity) {
+      return INSTANTLY;
+    }
+  }
+
+  /**
+   * Rate limiting restricted to that of a google rate limiter.
+   */
+  private static final class RestrictedRateLimiting implements RateLimiting {
+    private final RateLimiter limiter;
+
+    /**
+     * Constructor.
+     * @param capacity capacity in permits/second.
+     */
+    private RestrictedRateLimiting(int capacity) {

Review comment:
       Should we change the name to maxCapacity?

##########
File path: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RateLimiting.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Minimal subset of google rate limiter class.
+ * Can be used to throttle use of object stores where excess load
+ * will trigger cluster-wide throttling, backoff etc and so collapse
+ * performance.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface RateLimiting {

Review comment:
       I think we need to experiment/test more with test before moving for all operations in FS itself.  

##########
File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/AbfsManifestStoreOperations.java
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import java.io.IOException;
+import java.time.Duration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperationsThroughFileSystem;
+
+/**
+ * Extension of StoreOperationsThroughFileSystem with ABFS awareness.
+ * Purely for use by jobs committing work through the manifest committer.
+ * The {@link AzureManifestCommitterFactory} will configure the

Review comment:
       typo: will configure twice.

##########
File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/commit/ResilientCommitByRename.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.time.Duration;
+import javax.annotation.Nullable;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+
+/**
+ * API exclusively for committing files.
+ *
+ * This is only for use by (@link {@link AbfsManifestStoreOperations},
+ * and is intended to be implemented by ABFS.
+ * To ensure that there is no need to add mapreduce JARs to the
+ * classpath just to work with ABFS, this interface
+ * MUST NOT refer to anything in the
+ * {@code org.apache.hadoop.mapreduce} package.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface ResilientCommitByRename extends IOStatisticsSource {

Review comment:
       Should we move this to hadoop-common such that other stores can also implement?

##########
File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
##########
@@ -438,24 +459,88 @@ public boolean rename(final Path src, final Path dst) throws IOException {
 
       qualifiedDstPath = makeQualified(adjustedDst);
 
-      abfsStore.rename(qualifiedSrcPath, qualifiedDstPath, tracingContext);
+      abfsStore.rename(qualifiedSrcPath, qualifiedDstPath, tracingContext, null);
       return true;
-    } catch(AzureBlobFileSystemException ex) {
+    } catch (AzureBlobFileSystemException ex) {
       LOG.debug("Rename operation failed. ", ex);
       checkException(
-              src,
-              ex,
-              AzureServiceErrorCode.PATH_ALREADY_EXISTS,
-              AzureServiceErrorCode.INVALID_RENAME_SOURCE_PATH,
-              AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND,
-              AzureServiceErrorCode.INVALID_SOURCE_OR_DESTINATION_RESOURCE_TYPE,
-              AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND,
-              AzureServiceErrorCode.INTERNAL_OPERATION_ABORT);
+          src,
+          ex,
+          AzureServiceErrorCode.PATH_ALREADY_EXISTS,
+          AzureServiceErrorCode.INVALID_RENAME_SOURCE_PATH,
+          AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND,
+          AzureServiceErrorCode.INVALID_SOURCE_OR_DESTINATION_RESOURCE_TYPE,
+          AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND,
+          AzureServiceErrorCode.INTERNAL_OPERATION_ABORT);
       return false;
     }
 
   }
 
+  /**
+   * Private method to create resilient commit support.
+   * @return a new instance
+   * @param path destination path
+   * @throws IOException problem probing store capabilities
+   * @throws UnsupportedOperationException if the store lacks this support
+   */
+  @InterfaceAudience.Private
+  public ResilientCommitByRename createResilientCommitSupport(final Path path)
+      throws IOException {
+
+    if (!hasPathCapability(path,
+        CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME)) {
+      throw new UnsupportedOperationException(
+          "Resilient commit support not available for " + path);
+    }
+    return new ResilientCommitByRenameImpl();
+  }
+
+  /**
+   * Resilient commit support.
+   * Provided as a nested class to avoid contaminating the
+   * FS instance with too many private methods which end up
+   * being used widely (as has happened to the S3A FS)
+   */
+  public class ResilientCommitByRenameImpl implements ResilientCommitByRename {
+    public Pair<Boolean, Duration> commitSingleFileByRename(

Review comment:
       nit: newline

##########
File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
##########
@@ -438,24 +459,88 @@ public boolean rename(final Path src, final Path dst) throws IOException {
 
       qualifiedDstPath = makeQualified(adjustedDst);
 
-      abfsStore.rename(qualifiedSrcPath, qualifiedDstPath, tracingContext);
+      abfsStore.rename(qualifiedSrcPath, qualifiedDstPath, tracingContext, null);
       return true;
-    } catch(AzureBlobFileSystemException ex) {
+    } catch (AzureBlobFileSystemException ex) {
       LOG.debug("Rename operation failed. ", ex);
       checkException(
-              src,
-              ex,
-              AzureServiceErrorCode.PATH_ALREADY_EXISTS,
-              AzureServiceErrorCode.INVALID_RENAME_SOURCE_PATH,
-              AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND,
-              AzureServiceErrorCode.INVALID_SOURCE_OR_DESTINATION_RESOURCE_TYPE,
-              AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND,
-              AzureServiceErrorCode.INTERNAL_OPERATION_ABORT);
+          src,
+          ex,
+          AzureServiceErrorCode.PATH_ALREADY_EXISTS,
+          AzureServiceErrorCode.INVALID_RENAME_SOURCE_PATH,
+          AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND,
+          AzureServiceErrorCode.INVALID_SOURCE_OR_DESTINATION_RESOURCE_TYPE,
+          AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND,
+          AzureServiceErrorCode.INTERNAL_OPERATION_ABORT);
       return false;
     }
 
   }
 
+  /**
+   * Private method to create resilient commit support.
+   * @return a new instance
+   * @param path destination path
+   * @throws IOException problem probing store capabilities
+   * @throws UnsupportedOperationException if the store lacks this support
+   */
+  @InterfaceAudience.Private
+  public ResilientCommitByRename createResilientCommitSupport(final Path path)
+      throws IOException {
+
+    if (!hasPathCapability(path,
+        CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME)) {
+      throw new UnsupportedOperationException(
+          "Resilient commit support not available for " + path);
+    }
+    return new ResilientCommitByRenameImpl();
+  }
+
+  /**
+   * Resilient commit support.
+   * Provided as a nested class to avoid contaminating the
+   * FS instance with too many private methods which end up
+   * being used widely (as has happened to the S3A FS)
+   */
+  public class ResilientCommitByRenameImpl implements ResilientCommitByRename {
+    public Pair<Boolean, Duration> commitSingleFileByRename(
+        final Path source,
+        final Path dest,
+        @Nullable final String sourceEtag) throws IOException {
+
+      LOG.debug("renameFileWithEtag source: {} dest: {} etag {}", source, dest, sourceEtag);
+      statIncrement(CALL_RENAME);
+
+      trailingPeriodCheck(dest);
+      Path qualifiedSrcPath = makeQualified(source);
+      Path qualifiedDstPath = makeQualified(dest);
+
+      TracingContext tracingContext = new TracingContext(clientCorrelationId,
+          fileSystemId, FSOperationType.RENAME, true, tracingHeaderFormat,
+          listener);
+
+      if (qualifiedSrcPath.equals(qualifiedDstPath)) {
+        // rename to itself is forbidden
+        throw new PathIOException(qualifiedSrcPath.toString(), "cannot rename object onto self");
+      }
+
+      // acquire one IO permit
+      final Duration waitTime = rateLimiting.acquire(1);

Review comment:
       So this ratelimiting is only used in rename not others?

##########
File path: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RateLimitingFactory.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import java.time.Duration;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.RateLimiter;
+
+/**
+ * Factory for Rate Limiting.
+ * This should be only place in the code where the guava RateLimiter is imported.
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public final class RateLimitingFactory {
+
+  private static final RateLimiting UNLIMITED = new NoRateLimiting();
+
+  /**
+   * No waiting took place.
+   */
+  private static final Duration INSTANTLY = Duration.ofMillis(0);
+
+  private RateLimitingFactory() {
+  }
+
+  /**
+   * No Rate Limiting.
+   */
+  private static class NoRateLimiting implements RateLimiting {
+
+
+    @Override
+    public Duration acquire(int capacity) {
+      return INSTANTLY;
+    }
+  }
+
+  /**
+   * Rate limiting restricted to that of a google rate limiter.
+   */
+  private static final class RestrictedRateLimiting implements RateLimiting {
+    private final RateLimiter limiter;
+
+    /**
+     * Constructor.
+     * @param capacity capacity in permits/second.
+     */
+    private RestrictedRateLimiting(int capacity) {
+      this.limiter = RateLimiter.create(capacity);
+    }
+
+    @Override
+    public Duration acquire(int capacity) {

Review comment:
       and this requestedCapacity for better understanding?

##########
File path: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/RateLimiting.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import java.time.Duration;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Minimal subset of google rate limiter class.
+ * Can be used to throttle use of object stores where excess load
+ * will trigger cluster-wide throttling, backoff etc. and so collapse
+ * performance.
+ * The time waited is returned as a Duration type.
+ * The google rate limiter implements this by allowing a caller to ask for
+ * more capacity than is available. This will be granted
+ * but the subsequent request will be blocked if the bucket of
+ * capacity hasn't let refilled to the point where there is
+ * capacity again.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface RateLimiting {
+
+  /**
+   * Acquire rate limiter capacity.
+   * If there is not enough space, the permits will be acquired,
+   * but the subsequent call will block until the capacity has been
+   * refilled.
+   * @param capacity capacity to acquire.
+   * @return time spent waiting for output.
+   */
+  Duration acquire(int capacity);

Review comment:
       Are there no operation to release? How will capacity be freed once IO is done?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-issues-help@hadoop.apache.org