You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-issues@hadoop.apache.org by GitBox <gi...@apache.org> on 2021/07/19 16:24:57 UTC

[GitHub] [hadoop] mukund-thakur commented on a change in pull request #2971: MAPREDUCE-7341. Intermediate Manifest Committer

mukund-thakur commented on a change in pull request #2971:
URL: https://github.com/apache/hadoop/pull/2971#discussion_r672449087



##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterConfig.java
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+
+import static org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.DEFAULT_CREATE_SUCCESSFUL_JOB_DIR_MARKER;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_IO_PROCESSORS;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_IO_PROCESSORS_DEFAULT;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_VALIDATE_OUTPUT;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_VALIDATE_OUTPUT_DEFAULT;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterSupport.buildJobUUID;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterSupport.getAppAttemptId;
+
+/**
+ * The configuration for the committer as built up from the job configuration
+ * and data passed down from the committer factory.
+ * Isolated for ease of dev/test
+ */
+final class ManifestCommitterConfig implements IOStatisticsSource {
+
+  /**
+   * Final destination of work.
+   * This is <i>unqualified</i>.
+   */
+  private final Path destinationDir;
+
+  /**
+   * Role: used in log/text messages.
+   */
+  private final String role;
+
+  /**
+   * This is the directory for all intermediate work: where the output
+   * format will write data.
+   */
+  private final Path taskAttemptDir;
+
+  /** Configuration of the job. */
+  private final Configuration conf;
+
+  /** The job context. For a task, this can be cast to a TaskContext. */
+  private final JobContext jobContext;
+
+  /** Should a job marker be created? */
+  private final boolean createJobMarker;
+
+  /**
+   * Job ID Or UUID -without any attempt suffix.
+   * This is expected/required to be unique, though
+   * Spark has had "issues" there until recently
+   * with lack of uniqueness of generated MR Job IDs.
+   */
+  private final String jobUniqueId;
+
+  /**
+   * Where did the job Unique ID come from?
+   */
+  private final String jobUniqueIdSource;
+
+  /**
+   * Number of this attempt; starts at zero.
+   */
+  private final int jobAttemptNumber;
+
+  /**
+   * Job ID + AttemptID.
+   */
+  private final String jobAttemptId;
+
+  /**
+   * Task ID: used as the filename of the manifest.
+   */
+  private final String taskId;
+
+  /**
+   * Task attempt ID. Determines the working
+   * directory for task attempts to write data into,
+   * and for the task committer to scan.
+   */
+  private final String taskAttemptId;
+
+  /** Any progressable for progress callbacks. */
+  private final Progressable progressable;
+
+  /**
+   * IOStatistics to update.
+   */
+  private final IOStatisticsStore iostatistics;
+
+  /** Should the output be validated after the commit? */
+  private final boolean validateOutput;
+
+  /**
+   * Attempt directory management.
+   */
+  private final ManifestCommitterSupport.AttemptDirectories dirs;
+
+  /**
+   * Callback when a stage is entered.
+   */
+  private final StageEventCallbacks stageEventCallbacks;
+
+  /**
+   * Constructor.
+   * @param outputPath destination path of the job.
+   * @param role role for log messages.
+   * @param context job/task context
+   * @param iostatistics IO Statistics
+   * @param stageEventCallbacks stage event callbacks.
+   */
+  ManifestCommitterConfig(
+      final Path outputPath,
+      final String role,
+      final JobContext context,
+      final IOStatisticsStore iostatistics,
+      final StageEventCallbacks stageEventCallbacks) {
+    this.role = role;
+    this.jobContext = context;
+    this.conf = context.getConfiguration();
+    this.destinationDir = outputPath;
+    this.iostatistics = iostatistics;
+    this.stageEventCallbacks = stageEventCallbacks;
+    this.createJobMarker = conf.getBoolean(
+        SUCCESSFUL_JOB_OUTPUT_DIR_MARKER,
+        DEFAULT_CREATE_SUCCESSFUL_JOB_DIR_MARKER);
+    this.validateOutput = conf.getBoolean(
+        OPT_VALIDATE_OUTPUT,
+        OPT_VALIDATE_OUTPUT_DEFAULT);
+    Pair<String, String> pair = buildJobUUID(conf, context.getJobID());
+    this.jobUniqueId = pair.getLeft();
+    this.jobUniqueIdSource = pair.getRight();
+    this.jobAttemptNumber = getAppAttemptId(context);
+    this.jobAttemptId = this.jobUniqueId + "_" + jobAttemptNumber;
+
+    // build directories
+    this.dirs = new ManifestCommitterSupport.AttemptDirectories(outputPath,
+        this.jobUniqueId, jobAttemptNumber);
+
+    // if constructed with a task attempt, build the task ID and path.
+    if (context instanceof TaskAttemptContext) {
+      // it's a task
+      final TaskAttemptContext tac = (TaskAttemptContext) context;
+      TaskAttemptID taskAttempt = Objects.requireNonNull(
+          tac.getTaskAttemptID());
+      taskAttemptId = taskAttempt.toString();
+      taskId = taskAttempt.getTaskID().toString();
+      // Task attempt dir; must be different across instances
+      taskAttemptDir = dirs.getTaskAttemptPath(taskAttemptId);
+      // the context is also the progress callback.
+      progressable = tac;
+
+    } else {

Review comment:
       I am wondering why can't we separate out Task and Job using different classes?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-issues-help@hadoop.apache.org