Posted to common-issues@hadoop.apache.org by GitBox <gi...@apache.org> on 2022/03/16 11:09:48 UTC

[GitHub] [hadoop] mehakmeet commented on a change in pull request #2971: MAPREDUCE-7341. Intermediate Manifest Committer

mehakmeet commented on a change in pull request #2971:
URL: https://github.com/apache/hadoop/pull/2971#discussion_r827831884



##########
File path: hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/AbstractAbfsClusterITest.java
##########
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+
+import org.junit.AfterClass;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.azure.integration.AzureTestConstants;
+import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
+import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.AbstractManifestCommitterTest;
+import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.util.DurationInfo;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.assumeScaleTestsEnabled;
+import static org.apache.hadoop.io.IOUtils.closeStream;
+
+/**
+ * Tests which create a yarn minicluster.
+ * These are all considered scale tests; the probe for
+ * scale tests being enabled is executed before the cluster
+ * is set up to avoid wasting time on non-scale runs.
+ */
+public abstract class AbstractAbfsClusterITest extends
+    AbstractManifestCommitterTest {
+
+  public static final int NO_OF_NODEMANAGERS = 2;
+
+  private final ABFSContractTestBinding binding;
+
+
+  /**
+   * The static cluster binding with the lifecycle of this test; served
+   * through instance-level methods for sharing across methods in the
+   * suite.
+   */
+  @SuppressWarnings("StaticNonFinalField")
+  private static ClusterBinding clusterBinding;
+
+  protected AbstractAbfsClusterITest() throws Exception {
+    binding = new ABFSContractTestBinding();
+  }
+
+  @Override
+  protected int getTestTimeoutMillis() {
+    return AzureTestConstants.SCALE_TEST_TIMEOUT_MILLIS;
+  }
+
+  @Override
+  public void setup() throws Exception {
+    binding.setup();
+    super.setup();
+    requireScaleTestsEnabled();
+    if (getClusterBinding() == null) {
+      clusterBinding = demandCreateClusterBinding();
+    }
+    assertNotNull("cluster is not bound", getClusterBinding());
+  }
+
+  @AfterClass
+  public static void teardownClusters() throws IOException {
+    terminateCluster(clusterBinding);
+    clusterBinding = null;
+  }
+
+  @Override
+  protected AbstractFSContract createContract(final Configuration conf) {
+    return new AbfsFileSystemContract(conf, binding.isSecureMode());
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    return AbfsCommitTestHelper.prepareTestConfiguration(binding);
+  }
+
+  /**
+   * This is the cluster binding which every subclass must create.
+   */
+  protected static final class ClusterBinding {
+
+    private String clusterName;
+
+    private final MiniMRYarnCluster yarn;
+
+    public ClusterBinding(
+        final String clusterName,
+        final MiniMRYarnCluster yarn) {
+      this.clusterName = clusterName;
+      this.yarn = requireNonNull(yarn);
+    }
+
+
+    /**
+     * Get the cluster FS, which will either be HDFS or the local FS.
+     * @return a filesystem.
+     * @throws IOException failure
+     */
+    public FileSystem getClusterFS() throws IOException {
+      return FileSystem.getLocal(yarn.getConfig());
+    }
+
+    public MiniMRYarnCluster getYarn() {
+      return yarn;
+    }
+
+    public Configuration getConf() {
+      return getYarn().getConfig();
+    }
+
+    public String getClusterName() {
+      return clusterName;
+    }
+
+    public void terminate() {
+      closeStream(getYarn());
+    }
+  }
+
+  /**
+   * Create the cluster binding.
+   * The configuration will be patched by propagating down options
+   * from the maven build (S3Guard binding etc) and turning off unwanted
+   * YARN features.
+   *
+   * If an HDFS cluster is requested,
+   * the HDFS and YARN clusters will share the same configuration, so
+   * the HDFS cluster binding is implicitly propagated to YARN.
+   * If one is not requested, the local filesystem is used as the cluster FS.
+   * @param conf configuration to start with.
+   * @param useHDFS should an HDFS cluster be instantiated.

Review comment:
       `useHDFS` is not a param of this method; the stale `@param` tag should be dropped.
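       A minimal sketch of how the tag block could read once the stale tag is
       dropped, keeping only the parameter the method actually takes:

           * @param conf configuration to start with.
           * @return the cluster binding.
           * @throws IOException failure.
           */
          protected static ClusterBinding createCluster(
              final JobConf conf) throws IOException {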

##########
File path: hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/AbfsCommitTestHelper.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
+
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_STORE_OPERATIONS_CLASS;
+
+/**
+ * Helper methods for committer tests on ABFS.
+ */
+final class AbfsCommitTestHelper {
+  private AbfsCommitTestHelper() {
+  }
+
+  /**
+   * Prepare the test configuration.
+   * @param contractTestBinding test binding
+   * @return an extraced and patched configuration.

Review comment:
       typo: "extracted"

##########
File path: hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsManifestManifestStoreOperations.java
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import java.nio.charset.StandardCharsets;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
+import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.AbstractManifestCommitterTest;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperations;
+
+import static org.apache.hadoop.fs.CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME;
+import static org.apache.hadoop.fs.azurebfs.commit.AbfsCommitTestHelper.prepareTestConfiguration;
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * Test {@link AbfsManifestStoreOperations}.
+ * As this looks at etag handling through FS operations, it's actually testing how etags work
+ * in ABFS (preservation across renames) and in the client (are they consisten
+ * in LIST and HEAD calls)
+ */
+public class ITestAbfsManifestManifestStoreOperations extends AbstractManifestCommitterTest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestAbfsManifestManifestStoreOperations.class);
+
+  private final ABFSContractTestBinding binding;
+
+  public ITestAbfsManifestManifestStoreOperations() throws Exception {
+    binding = new ABFSContractTestBinding();
+  }
+
+  @Override
+  public void setup() throws Exception {
+    binding.setup();
+    super.setup();
+
+    // skip tests on non-HNS stores
+    assumeTrue("Resilient rename not available",
+        getFileSystem().hasPathCapability(getContract().getTestPath(),
+            ETAGS_PRESERVED_IN_RENAME));
+
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    return enableManifestCommitter(prepareTestConfiguration(binding));
+  }
+
+  @Override
+  protected AbstractFSContract createContract(final Configuration conf) {
+    return new AbfsFileSystemContract(conf, binding.isSecureMode());
+  }
+
+  /**
+   * basic consistency across operations, as well as being non-empty.
+   */
+  @Test
+  public void testEtagConsistencyAcrossListAndHead() throws Throwable {
+    describe("Etag values must be non-empty and consistent across LIST and HEAD Calls.");
+    final Path path = methodPath();
+    final FileSystem fs = getFileSystem();
+    ContractTestUtils.touch(fs, path);
+    final ManifestStoreOperations operations = createManifestStoreOperations();
+    Assertions.assertThat(operations)
+        .describedAs("Store operations class loaded via Configuration")
+        .isInstanceOf(AbfsManifestStoreOperations.class);
+
+    final FileStatus st = operations.getFileStatus(path);
+    final String etag = operations.getEtag(st);
+    Assertions.assertThat(etag)
+        .describedAs("Etag of %s", st)
+        .isNotBlank();
+    LOG.info("etag of empty file is \"{}\"", etag);
+
+    final FileStatus[] statuses = fs.listStatus(path);
+    Assertions.assertThat(statuses)
+        .describedAs("List(%s)", path)
+        .hasSize(1);
+    final FileStatus lsStatus = statuses[0];
+    Assertions.assertThat(operations.getEtag(lsStatus))
+        .describedAs("etag of list status (%s) compared to HEAD value of %s", lsStatus, st)
+        .isEqualTo(etag);
+  }
+
+  /**
+   * Ovew

Review comment:
       Fix the Javadoc: the sentence is cut off mid-word ("Ovew").

##########
File path: hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsManifestManifestStoreOperations.java
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import java.nio.charset.StandardCharsets;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
+import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.AbstractManifestCommitterTest;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperations;
+
+import static org.apache.hadoop.fs.CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME;
+import static org.apache.hadoop.fs.azurebfs.commit.AbfsCommitTestHelper.prepareTestConfiguration;
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * Test {@link AbfsManifestStoreOperations}.
+ * As this looks at etag handling through FS operations, it's actually testing how etags work
+ * in ABFS (preservation across renames) and in the client (are they consisten

Review comment:
       typo: "consistent"

##########
File path: hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/AbstractAbfsClusterITest.java
##########
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.commit;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+
+import org.junit.AfterClass;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.azure.integration.AzureTestConstants;
+import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
+import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.AbstractManifestCommitterTest;
+import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.util.DurationInfo;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.assumeScaleTestsEnabled;
+import static org.apache.hadoop.io.IOUtils.closeStream;
+
+/**
+ * Tests which create a yarn minicluster.
+ * These are all considered scale tests; the probe for
+ * scale tests being enabled is executed before the cluster
+ * is set up to avoid wasting time on non-scale runs.
+ */
+public abstract class AbstractAbfsClusterITest extends
+    AbstractManifestCommitterTest {
+
+  public static final int NO_OF_NODEMANAGERS = 2;
+
+  private final ABFSContractTestBinding binding;
+
+
+  /**
+   * The static cluster binding with the lifecycle of this test; served
+   * through instance-level methods for sharing across methods in the
+   * suite.
+   */
+  @SuppressWarnings("StaticNonFinalField")
+  private static ClusterBinding clusterBinding;
+
+  protected AbstractAbfsClusterITest() throws Exception {
+    binding = new ABFSContractTestBinding();
+  }
+
+  @Override
+  protected int getTestTimeoutMillis() {
+    return AzureTestConstants.SCALE_TEST_TIMEOUT_MILLIS;
+  }
+
+  @Override
+  public void setup() throws Exception {
+    binding.setup();
+    super.setup();
+    requireScaleTestsEnabled();
+    if (getClusterBinding() == null) {
+      clusterBinding = demandCreateClusterBinding();
+    }
+    assertNotNull("cluster is not bound", getClusterBinding());
+  }
+
+  @AfterClass
+  public static void teardownClusters() throws IOException {
+    terminateCluster(clusterBinding);
+    clusterBinding = null;
+  }
+
+  @Override
+  protected AbstractFSContract createContract(final Configuration conf) {
+    return new AbfsFileSystemContract(conf, binding.isSecureMode());
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    return AbfsCommitTestHelper.prepareTestConfiguration(binding);
+  }
+
+  /**
+   * This is the cluster binding which every subclass must create.
+   */
+  protected static final class ClusterBinding {
+
+    private String clusterName;
+
+    private final MiniMRYarnCluster yarn;
+
+    public ClusterBinding(
+        final String clusterName,
+        final MiniMRYarnCluster yarn) {
+      this.clusterName = clusterName;
+      this.yarn = requireNonNull(yarn);
+    }
+
+
+    /**
+     * Get the cluster FS, which will either be HDFS or the local FS.
+     * @return a filesystem.
+     * @throws IOException failure
+     */
+    public FileSystem getClusterFS() throws IOException {
+      return FileSystem.getLocal(yarn.getConfig());
+    }
+
+    public MiniMRYarnCluster getYarn() {
+      return yarn;
+    }
+
+    public Configuration getConf() {
+      return getYarn().getConfig();
+    }
+
+    public String getClusterName() {
+      return clusterName;
+    }
+
+    public void terminate() {
+      closeStream(getYarn());
+    }
+  }
+
+  /**
+   * Create the cluster binding.
+   * The configuration will be patched by propagating down options
+   * from the maven build (S3Guard binding etc) and turning off unwanted
+   * YARN features.
+   *
+   * If an HDFS cluster is requested,
+   * the HDFS and YARN clusters will share the same configuration, so
+   * the HDFS cluster binding is implicitly propagated to YARN.
+   * If one is not requested, the local filesystem is used as the cluster FS.
+   * @param conf configuration to start with.
+   * @param useHDFS should an HDFS cluster be instantiated.
+   * @return the cluster binding.
+   * @throws IOException failure.
+   */
+  protected static ClusterBinding createCluster(
+      final JobConf conf) throws IOException {
+    try (DurationInfo d = new DurationInfo(LOG, "Creating YARN MiniCluster")) {
+      conf.setBoolean(JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, false);
+      // create a unique cluster name based on the current time in millis.
+      String timestamp = LocalDateTime.now().format(
+          DateTimeFormatter.ofPattern("yyyy-MM-dd-HH.mm.ss.SS"));
+      String clusterName = "yarn-" + timestamp;
+      MiniMRYarnCluster yarnCluster =
+          new MiniMRYarnCluster(clusterName, NO_OF_NODEMANAGERS);
+      yarnCluster.init(conf);
+      yarnCluster.start();
+      return new ClusterBinding(clusterName, yarnCluster);
+    }
+  }
+
+  /**
+   * Terminate the cluster if it is not null.
+   * @param cluster the cluster
+   */
+  protected static void terminateCluster(ClusterBinding cluster) {
+    if (cluster != null) {
+      cluster.terminate();
+    }
+  }
+
+  /**
+   * Get the cluster binding for this subclass.
+   * @return the cluster binding
+   */
+  protected ClusterBinding getClusterBinding() {
+    return clusterBinding;
+  }
+
+  protected MiniMRYarnCluster getYarn() {
+    return getClusterBinding().getYarn();
+  }
+
+
+  /**
+   * We stage work into a temporary directory rather than directly under
+   * the user's home directory, as that is often rejected by CI test
+   * runners.
+   */
+  @Rule
+  public final TemporaryFolder stagingFilesDir = new TemporaryFolder();
+
+
+  /**
+   * binding on demand rather than in a BeforeClass static method.
+   * Subclasses can override this to change the binding options.
+   * @return the cluster binding
+   */
+  protected ClusterBinding demandCreateClusterBinding() throws Exception {
+    return createCluster(new JobConf());
+  }
+
+  /**
+   * Create a job configuration.
+   * This creates a new job conf from the yarn
+   * cluster configuration then calls
+   * {@link #applyCustomConfigOptions(JobConf)} to allow it to be customized.
+   * @return the new job configuration.
+   * @throws IOException failure
+   */
+  protected JobConf newJobConf() throws IOException {
+    JobConf jobConf = new JobConf(getYarn().getConfig());
+    jobConf.addResource(getConfiguration());
+    applyCustomConfigOptions(jobConf);
+    return jobConf;
+  }
+
+
+  protected Job createJob(Configuration jobConf) throws IOException {

Review comment:
       We never use this method

##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestCreateOutputDirectoriesStage.java
##########
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.DirEntry;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.EntryStatus;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.CreateOutputDirectoriesStage;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.SetupJobStage;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.StageConfig;
+import org.apache.hadoop.util.Lists;
+
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupCounterStatistic;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_DELETE;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_GET_FILE_STATUS;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_IS_FILE;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_MKDIRS;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_FAILURES;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_DELETE_FILE_UNDER_DESTINATION;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_PREPARE_DIR_ANCESTORS;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Test directory creation.
+ * As the directory creation phase relies on input from the task manifest to
+ * determine which directories to explicitly create, delete files at
+ * etc, these tests build up manifests and assert that the output
+ * of the directory creation stage matches that of the combination
+ * of the manifest and the filesystem state.
+ */
+public class TestCreateOutputDirectoriesStage extends AbstractManifestCommitterTest {
+
+  /**
+   * Deep tree width, subclasses (including in external projects)
+   * may change.
+   */
+  protected static final int DEEP_TREE_WIDTH = 4;
+
+  /**
+   * The number of directories created in test setup; this must be
+   * added to all assertions of the value of OP_MKDIRS.
+   */
+  private static final int DIRECTORIES_CREATED_IN_SETUP = 2;
+
+  private Path destDir;
+  private CreateOutputDirectoriesStage mkdirStage;
+  private StageConfig stageConfig;
+  private IOStatisticsStore iostats;
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    destDir = methodPath();
+    // clean up dest dir completely
+    destDir.getFileSystem(getConfiguration()).delete(destDir, true);
+    setStoreOperations(createManifestStoreOperations());
+    stageConfig = createStageConfigForJob(JOB1, destDir)
+        .withDeleteTargetPaths(true);
+    setJobStageConfig(stageConfig);
+    // creates the job directories.
+    new SetupJobStage(stageConfig).apply(true);
+    mkdirStage = new CreateOutputDirectoriesStage(stageConfig);
+    iostats = stageConfig.getIOStatistics();
+    // assert original count of dirs created == 2 : job and task manifest
+    verifyStatisticCounterValue(iostats, OP_MKDIRS,
+        DIRECTORIES_CREATED_IN_SETUP);
+    // reset the value to simplify future work
+    iostats.getCounterReference(OP_MKDIRS).set(0);
+  }
+
+  @Test
+  public void testPrepareSomeDirs() throws Throwable {
+
+    final long initialFileStatusCount = lookupCounterStatistic(iostats, OP_GET_FILE_STATUS);
+    final int dirCount = 8;
+    final List<Path> dirs = subpaths(destDir, dirCount);
+    final List<DirEntry> dirEntries = dirEntries(dirs, 1, EntryStatus.not_found);
+
+    // two manifests with duplicate entries
+    final List<TaskManifest> manifests = Lists.newArrayList(
+        manifestWithDirsToCreate(dirEntries),
+        manifestWithDirsToCreate(dirEntries));
+    final CreateOutputDirectoriesStage.Result result = mkdirStage.apply(manifests);
+    Assertions.assertThat(result.getCreatedDirectories())
+        .describedAs("output of %s", mkdirStage)
+        .containsExactlyInAnyOrderElementsOf(dirs);
+
+    LOG.info("Job Statistics\n{}", ioStatisticsToPrettyString(iostats));
+
+    // now dirCount new dirs are added.
+    verifyStatisticCounterValue(iostats, OP_MKDIRS, dirCount);
+
+    // now rerun the same preparation sequence, but this
+    // time declare that the directories exist (as they do)
+    final CreateOutputDirectoriesStage s2 =
+        new CreateOutputDirectoriesStage(stageConfig);
+    final CreateOutputDirectoriesStage.Result r2 = s2.apply(
+        Lists.newArrayList(
+            manifestWithDirsToCreate(dirEntries(dirs, 1, EntryStatus.dir))));
+
+    // no directories are now created.
+    Assertions.assertThat(r2.getCreatedDirectories())
+        .describedAs("output of %s", s2)
+        .isEmpty();
+    LOG.info("Job Statistics after second pass\n{}", ioStatisticsToPrettyString(iostats));
+
+    // second run probed no dest dirs
+    verifyStatisticCounterValue(iostats, OP_GET_FILE_STATUS, initialFileStatusCount);
+    // and no new mkdir calls were made
+    verifyStatisticCounterValue(iostats, OP_MKDIRS, dirCount);
+    verifyStatisticCounterValue(iostats, OP_DELETE_FILE_UNDER_DESTINATION, 0);
+    verifyStatisticCounterValue(iostats, OP_IS_FILE, 0);
+  }
+
+  /**
+   * Given a list of paths, build a list of DirEntry entries.
+   * @param paths list of paths
+   * @param level Level in the treewalk.
+   * @param entryStatus status of dirs
+   * @return list of entries with  the given level and entry status.
+   */
+  protected List<DirEntry> dirEntries(Collection<Path> paths,
+      int level,
+      EntryStatus entryStatus) {
+    return paths.stream()
+        .map(p -> DirEntry.dirEntry(p, entryStatus, level))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Create a manifest with the list of directory entries added.
+   * Job commit requires the entries to have been probed for, and
+   * for the entire tree under the dest path to be included.
+   * @param dirEntries list of directory entries.
+   * @return the manifest.
+   */
+  protected TaskManifest manifestWithDirsToCreate(List<DirEntry> dirEntries) {
+    final TaskManifest taskManifest = new TaskManifest();
+    taskManifest.getDestDirectories().addAll(dirEntries);
+    return taskManifest;
+  }
+
+  /**
+   * Assert the directory map status of a path.
+   * @param result stage result
+   * @param path path to look up
+   * @param expected expected value.
+   */
+  private static void assertDirMapStatus(
+      CreateOutputDirectoriesStage.Result result,
+      Path path,
+      CreateOutputDirectoriesStage.DirMapState expected) {
+    Assertions.assertThat(result.getDirMap())
+        .describedAs("Directory Map entry for %s", path)
+        .isNotNull()
+        .containsKey(path)
+        .containsEntry(path, expected);
+  }
+
+  /**
+   * Prepare a deep tree {@code c ^ 3} of entries.

Review comment:
       `@code` ?
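       If {@code c} is meant to refer to the tree width, spelling out the constant
       may read better; a possible rewording (an assumption about the intended meaning):

           /**
            * Prepare a deep tree of entries, {@code DEEP_TREE_WIDTH ^ 3} in size.
            */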

##########
File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
##########
@@ -505,9 +530,73 @@ public AbfsRestOperation renamePath(String source, final String destination,
             HTTP_METHOD_PUT,
             url,
             requestHeaders);
-    // no attempt at recovery using timestamps as it was not reliable.
-    op.execute(tracingContext);
-    return op;
+    try {
+      op.execute(tracingContext);
+      return Pair.of(op, false);
+    } catch (AzureBlobFileSystemException e) {
+        // If we have no HTTP response, throw the original exception.
+        if (!op.hasResult()) {
+          throw e;
+        }
+        boolean etagCheckSucceeded = renameIdempotencyCheckOp(
+            source,
+            sourceEtag, op, destination, tracingContext);
+        if (!etagCheckSucceeded) {
+          // idempotency did not return different result
+          // throw back the exception
+          throw e;
+        }
+      return Pair.of(op, true);
+    }
+  }
+
+  /**
+   * Check if the rename request failure is post a retry and if earlier rename
+   * request might have succeeded at back-end.
+   *
+   * If a source etag was passed in, and the error was 404, get the
+   * etag of any file at the destination.
+   * If it matches the source etag, then the rename is considered
+   * a success.
+   * Exceptions raised in the probe of the destination are swallowed,
+   * so that they do not interfere with the original rename failures.
+   * @param source source path
+   * @param op Rename request REST operation response with non-null HTTP response
+   * @param destination rename destination path
+   * @param sourceEtag etag of source file. may be null or empty
+   * @param tracingContext Tracks identifiers for request header
+   * @return true if the file was successfully copied
+   */
+  public boolean renameIdempotencyCheckOp(
+      final String source,
+      final String sourceEtag,
+      final AbfsRestOperation op,
+      final String destination,
+      TracingContext tracingContext) {
+    Preconditions.checkArgument(op.hasResult(), "Operations has null HTTP response");
+
+    if ((op.isARetriedRequest())
+        && (op.getResult().getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND)
+        && isNotEmpty(sourceEtag)) {
+
+      // Server has returned HTTP 404, which means rename source no longer
+      // exists. Check on destination status and if its etag matches
+      // that of the source, consider it to be a success.
+      LOG.debug("rename {} to {} failed, checking etag of destination",
+          source, destination);
+
+      try {
+        final AbfsRestOperation destStatusOp = getPathStatus(destination,
+            false, tracingContext);
+        final AbfsHttpOperation result = destStatusOp.getResult();
+
+        return result.getStatusCode() == HttpURLConnection.HTTP_OK
+            && sourceEtag.equals(extractEtagHeader(result));
+      } catch (AzureBlobFileSystemException ignored) {
+        // GetFileStatus on the destination failed, the rename did not take place

Review comment:
       Can we add Debug logs here for this scenario?
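       A minimal sketch of what that could look like, reusing the existing LOG and the
       names already in scope in this method (the message text is only a suggestion):

             } catch (AzureBlobFileSystemException ignored) {
               // GetFileStatus on the destination failed, the rename did not take place.
               // Log at debug so the original rename failure still dominates.
               LOG.debug("Failed to get status of rename destination {}",
                   destination, ignored);
             }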




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


