You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2017/09/15 16:00:38 UTC

[1/2] hadoop git commit: MAPREDUCE-6956 FileOutputCommitter to gain abstract superclass PathOutputCommitter. Contributed by Steve Loughran

Repository: hadoop
Updated Branches:
  refs/heads/branch-3.0 ba26097ea -> b5e998235
  refs/heads/trunk 78bdf10ae -> 11390c2d1


MAPREDUCE-6956 FileOutputCommitter to gain abstract superclass PathOutputCommitter.
Contributed by Steve Loughran


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/11390c2d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/11390c2d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/11390c2d

Branch: refs/heads/trunk
Commit: 11390c2d111910b01d9c4d3e39dee49babae272f
Parents: 78bdf10
Author: Steve Loughran <st...@apache.org>
Authored: Fri Sep 15 16:59:04 2017 +0100
Committer: Steve Loughran <st...@apache.org>
Committed: Fri Sep 15 16:59:04 2017 +0100

----------------------------------------------------------------------
 .../lib/output/FileOutputCommitter.java         |  25 +-
 .../mapreduce/lib/output/FileOutputFormat.java  |  57 ++-
 .../lib/output/PathOutputCommitter.java         |  91 +++++
 .../hadoop/mapreduce/task/JobContextImpl.java   |  10 +-
 .../mapreduce/task/TaskAttemptContextImpl.java  |  13 +-
 .../lib/output/TestPathOutputCommitter.java     | 377 +++++++++++++++++++
 6 files changed, 553 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/11390c2d/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
index 9e750be..0061406 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
@@ -39,13 +39,14 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 
 /** An {@link OutputCommitter} that commits files specified 
  * in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}.
  **/
 @InterfaceAudience.Public
 @InterfaceStability.Stable
-public class FileOutputCommitter extends OutputCommitter {
+public class FileOutputCommitter extends PathOutputCommitter {
   private static final Log LOG = LogFactory.getLog(FileOutputCommitter.class);
 
   /** 
@@ -101,8 +102,11 @@ public class FileOutputCommitter extends OutputCommitter {
   public FileOutputCommitter(Path outputPath, 
                              TaskAttemptContext context) throws IOException {
     this(outputPath, (JobContext)context);
-    if (outputPath != null) {
-      workPath = getTaskAttemptPath(context, outputPath);
+    if (getOutputPath() != null) {
+      workPath = Preconditions.checkNotNull(
+          getTaskAttemptPath(context, getOutputPath()),
+          "Null task attempt path in %s and output path %s",
+          context, outputPath);
     }
   }
   
@@ -116,6 +120,7 @@ public class FileOutputCommitter extends OutputCommitter {
   @Private
   public FileOutputCommitter(Path outputPath, 
                              JobContext context) throws IOException {
+    super(outputPath, context);
     Configuration conf = context.getConfiguration();
     algorithmVersion =
         conf.getInt(FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
@@ -705,4 +710,18 @@ public class FileOutputCommitter extends OutputCommitter {
       LOG.warn("Output Path is null in recoverTask()");
     }
   }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder(
+        "FileOutputCommitter{");
+    sb.append(super.toString()).append("; ");
+    sb.append("outputPath=").append(outputPath);
+    sb.append(", workPath=").append(workPath);
+    sb.append(", algorithmVersion=").append(algorithmVersion);
+    sb.append(", skipCleanup=").append(skipCleanup);
+    sb.append(", ignoreCleanupFailures=").append(ignoreCleanupFailures);
+    sb.append('}');
+    return sb.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/11390c2d/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
index c11f8d8..0e7efa3 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.lib.output;
 import java.io.IOException;
 import java.text.NumberFormat;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -38,11 +39,15 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** A base class for {@link OutputFormat}s that read from {@link FileSystem}s.*/
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public abstract class FileOutputFormat<K, V> extends OutputFormat<K, V> {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(FileOutputFormat.class);
 
   /** Construct output file names so that, when an output directory listing is
    * sorted lexicographically, positions correspond to output partitions.*/
@@ -53,12 +58,25 @@ public abstract class FileOutputFormat<K, V> extends OutputFormat<K, V> {
     NUMBER_FORMAT.setMinimumIntegerDigits(5);
     NUMBER_FORMAT.setGroupingUsed(false);
   }
-  private FileOutputCommitter committer = null;
-public static final String COMPRESS ="mapreduce.output.fileoutputformat.compress";
-public static final String COMPRESS_CODEC = 
-"mapreduce.output.fileoutputformat.compress.codec";
-public static final String COMPRESS_TYPE = "mapreduce.output.fileoutputformat.compress.type";
-public static final String OUTDIR = "mapreduce.output.fileoutputformat.outputdir";
+  private PathOutputCommitter committer = null;
+
+  /** Configuration option: should output be compressed? {@value}. */
+  public static final String COMPRESS =
+      "mapreduce.output.fileoutputformat.compress";
+
+  /** If compression is enabled, name of codec: {@value}. */
+  public static final String COMPRESS_CODEC =
+      "mapreduce.output.fileoutputformat.compress.codec";
+  /**
+   * Type of compression {@value}: NONE, RECORD, BLOCK.
+   * Generally only used in {@code SequenceFileOutputFormat}.
+   */
+  public static final String COMPRESS_TYPE =
+      "mapreduce.output.fileoutputformat.compress.type";
+
+  /** Destination directory of work: {@value}. */
+  public static final String OUTDIR =
+      "mapreduce.output.fileoutputformat.outputdir";
 
   @Deprecated
   public enum Counter {
@@ -110,14 +128,14 @@ public static final String OUTDIR = "mapreduce.output.fileoutputformat.outputdir
    */
   public static Class<? extends CompressionCodec> 
   getOutputCompressorClass(JobContext job, 
-		                       Class<? extends CompressionCodec> defaultValue) {
+                       Class<? extends CompressionCodec> defaultValue) {
     Class<? extends CompressionCodec> codecClass = defaultValue;
     Configuration conf = job.getConfiguration();
     String name = conf.get(FileOutputFormat.COMPRESS_CODEC);
     if (name != null) {
       try {
-        codecClass = 
-        	conf.getClassByName(name).asSubclass(CompressionCodec.class);
+        codecClass =
+            conf.getClassByName(name).asSubclass(CompressionCodec.class);
       } catch (ClassNotFoundException e) {
         throw new IllegalArgumentException("Compression codec " + name + 
                                            " was not found.", e);
@@ -219,9 +237,11 @@ public static final String OUTDIR = "mapreduce.output.fileoutputformat.outputdir
   public static Path getWorkOutputPath(TaskInputOutputContext<?,?,?,?> context
                                        ) throws IOException, 
                                                 InterruptedException {
-    FileOutputCommitter committer = (FileOutputCommitter) 
+    PathOutputCommitter committer = (PathOutputCommitter)
       context.getOutputCommitter();
-    return committer.getWorkPath();
+    Path workPath = committer.getWorkPath();
+    LOG.debug("Work path is {}", workPath);
+    return workPath;
   }
 
   /**
@@ -281,10 +301,17 @@ public static final String OUTDIR = "mapreduce.output.fileoutputformat.outputdir
    */
   public Path getDefaultWorkFile(TaskAttemptContext context,
                                  String extension) throws IOException{
-    FileOutputCommitter committer = 
-      (FileOutputCommitter) getOutputCommitter(context);
-    return new Path(committer.getWorkPath(), getUniqueFile(context, 
-      getOutputName(context), extension));
+    OutputCommitter c = getOutputCommitter(context);
+    Preconditions.checkState(c instanceof PathOutputCommitter,
+        "Committer %s is not a PathOutputCommitter", c);
+    Path workPath = ((PathOutputCommitter) c).getWorkPath();
+    Preconditions.checkNotNull(workPath,
+        "Null workPath returned by committer %s", c);
+    Path workFile = new Path(workPath,
+        getUniqueFile(context, getOutputName(context), extension));
+    LOG.debug("Work file for {} extension '{}' is {}",
+        context, extension, workFile);
+    return workFile;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/11390c2d/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter.java
new file mode 100644
index 0000000..2df30ba
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output;
+
+import java.io.IOException;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * A committer which somehow commits data written to a working directory
+ * to the final directory during the commit process. The reference
+ * implementation of this is the {@link FileOutputCommitter}.
+ *
+ * There are two constructors, both of which do nothing but log and
+ * validate their arguments.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public abstract class PathOutputCommitter extends OutputCommitter {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PathOutputCommitter.class);
+
+  private final JobContext context;
+
+  /**
+   * Constructor for a task attempt.
+   * Subclasses should provide a public constructor with this signature.
+   * @param outputPath output path: may be null
+   * @param context task context
+   * @throws IOException IO problem
+   */
+  protected PathOutputCommitter(Path outputPath,
+      TaskAttemptContext context) throws IOException {
+    this.context = Preconditions.checkNotNull(context, "Null context");
+    LOG.debug("Creating committer with output path {} and task context"
+        + " {}", outputPath, context);
+  }
+
+  /**
+   * Constructor for a job attempt.
+   * Subclasses should provide a public constructor with this signature.
+   * @param outputPath output path: may be null
+   * @param context job context
+   * @throws IOException IO problem
+   */
+  protected PathOutputCommitter(Path outputPath,
+      JobContext context) throws IOException {
+    this.context = Preconditions.checkNotNull(context, "Null context");
+    LOG.debug("Creating committer with output path {} and job context"
+        + " {}", outputPath, context);
+  }
+
+  /**
+   * Get the directory that the task should write results into.
+   * Warning: there's no guarantee that this work path is on the same
+   * FS as the final output, or that it's visible across machines.
+   * @return the work directory
+   * @throws IOException IO problem
+   */
+  public abstract Path getWorkPath() throws IOException;
+
+  @Override
+  public String toString() {
+    return "PathOutputCommitter{context=" + context + '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/11390c2d/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java
index b9014ef..1696246 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java
@@ -451,5 +451,13 @@ public class JobContextImpl implements JobContext {
   public Credentials getCredentials() {
     return credentials;
   }
-  
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder(
+        "JobContextImpl{");
+    sb.append("jobId=").append(jobId);
+    sb.append('}');
+    return sb.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/11390c2d/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java
index 333f57b..a622d3a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java
@@ -118,4 +118,15 @@ public class TaskAttemptContextImpl extends JobContextImpl
   public float getProgress() {
     return reporter.getProgress();
   }
-}
\ No newline at end of file
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder(
+        "TaskAttemptContextImpl{");
+    sb.append(super.toString());
+    sb.append("; taskId=").append(taskId);
+    sb.append(", status='").append(status).append('\'');
+    sb.append('}');
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/11390c2d/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestPathOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestPathOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestPathOutputCommitter.java
new file mode 100644
index 0000000..9cff82f
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestPathOutputCommitter.java
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.security.Credentials;
+
+/**
+ * Test the path output committer binding to FileOutputFormat.
+ */
+public class TestPathOutputCommitter extends Assert {
+
+  @Test
+  public void testFileOutputCommitterOverrride() throws Throwable {
+    TaskContext context = new TaskContext();
+    Path workPath = new Path("file:///work");
+    context.setOutputCommitter(
+        new SimpleCommitter(new Path("/"), context, workPath));
+    assertEquals(workPath, FileOutputFormat.getWorkOutputPath(context));
+  }
+
+  @Test
+  public void testFileOutputCommitterNullWorkPath() throws Throwable {
+    TaskContext context = new TaskContext();
+    context.setOutputCommitter(
+        new SimpleCommitter(new Path("/"), context, null));
+    assertNull(FileOutputFormat.getWorkOutputPath(context));
+  }
+
+  private static class SimpleCommitter extends PathOutputCommitter {
+
+    private final Path workPath;
+
+    SimpleCommitter(Path outputPath,
+        TaskAttemptContext context, Path workPath) throws IOException {
+      super(outputPath, context);
+      this.workPath = workPath;
+    }
+
+    SimpleCommitter(Path outputPath,
+        JobContext context, Path workPath) throws IOException {
+      super(outputPath, context);
+      this.workPath = workPath;
+    }
+
+    @Override
+    public Path getWorkPath() throws IOException {
+      return workPath;
+    }
+
+    @Override
+    public void setupJob(JobContext jobContext) throws IOException {
+
+    }
+
+    @Override
+    public void setupTask(TaskAttemptContext taskContext) throws IOException {
+
+    }
+
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext taskContext)
+        throws IOException {
+      return false;
+    }
+
+    @Override
+    public void commitTask(TaskAttemptContext taskContext) throws IOException {
+
+    }
+
+    @Override
+    public void abortTask(TaskAttemptContext taskContext) throws IOException {
+
+    }
+  }
+
+  /**
+   * Stub task context.
+   */
+  public class TaskContext
+      implements TaskInputOutputContext<String, String, String, String> {
+
+    private OutputCommitter outputCommitter;
+
+    public void setOutputCommitter(OutputCommitter outputCommitter) {
+      this.outputCommitter = outputCommitter;
+    }
+
+    @Override
+    public OutputCommitter getOutputCommitter() {
+      return outputCommitter;
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      return false;
+    }
+
+    @Override
+    public String getCurrentKey() throws IOException, InterruptedException {
+      return null;
+    }
+
+    @Override
+    public String getCurrentValue() throws IOException, InterruptedException {
+      return null;
+    }
+
+    @Override
+    public void write(String key, String value)
+        throws IOException, InterruptedException {
+    }
+
+
+    @Override
+    public TaskAttemptID getTaskAttemptID() {
+      return null;
+    }
+
+    @Override
+    public void setStatus(String msg) {
+    }
+
+    @Override
+    public String getStatus() {
+      return null;
+    }
+
+    @Override
+    public float getProgress() {
+      return 0;
+    }
+
+    @Override
+    public Counter getCounter(Enum<?> counterName) {
+      return null;
+    }
+
+    @Override
+    public Counter getCounter(String groupName, String counterName) {
+      return null;
+    }
+
+    @Override
+    public Configuration getConfiguration() {
+      return null;
+    }
+
+    @Override
+    public Credentials getCredentials() {
+      return null;
+    }
+
+    @Override
+    public JobID getJobID() {
+      return null;
+    }
+
+    @Override
+    public int getNumReduceTasks() {
+      return 0;
+    }
+
+    @Override
+    public Path getWorkingDirectory() throws IOException {
+      return null;
+    }
+
+    @Override
+    public Class<?> getOutputKeyClass() {
+      return null;
+    }
+
+    @Override
+    public Class<?> getOutputValueClass() {
+      return null;
+    }
+
+    @Override
+    public Class<?> getMapOutputKeyClass() {
+      return null;
+    }
+
+    @Override
+    public Class<?> getMapOutputValueClass() {
+      return null;
+    }
+
+    @Override
+    public String getJobName() {
+      return null;
+    }
+
+    @Override
+    public Class<? extends InputFormat<?, ?>> getInputFormatClass()
+        throws ClassNotFoundException {
+      return null;
+    }
+
+    @Override
+    public Class<? extends Mapper<?, ?, ?, ?>> getMapperClass()
+        throws ClassNotFoundException {
+      return null;
+    }
+
+    @Override
+    public Class<? extends Reducer<?, ?, ?, ?>> getCombinerClass()
+        throws ClassNotFoundException {
+      return null;
+    }
+
+    @Override
+    public Class<? extends Reducer<?, ?, ?, ?>> getReducerClass()
+        throws ClassNotFoundException {
+      return null;
+    }
+
+    @Override
+    public Class<? extends OutputFormat<?, ?>> getOutputFormatClass()
+        throws ClassNotFoundException {
+      return null;
+    }
+
+    @Override
+    public Class<? extends Partitioner<?, ?>> getPartitionerClass()
+        throws ClassNotFoundException {
+      return null;
+    }
+
+    @Override
+    public RawComparator<?> getSortComparator() {
+      return null;
+    }
+
+    @Override
+    public String getJar() {
+      return null;
+    }
+
+    @Override
+    public RawComparator<?> getCombinerKeyGroupingComparator() {
+      return null;
+    }
+
+    @Override
+    public RawComparator<?> getGroupingComparator() {
+      return null;
+    }
+
+    @Override
+    public boolean getJobSetupCleanupNeeded() {
+      return false;
+    }
+
+    @Override
+    public boolean getTaskCleanupNeeded() {
+      return false;
+    }
+
+    @Override
+    public boolean getProfileEnabled() {
+      return false;
+    }
+
+    @Override
+    public String getProfileParams() {
+      return null;
+    }
+
+    @Override
+    public Configuration.IntegerRanges getProfileTaskRange(boolean isMap) {
+      return null;
+    }
+
+    @Override
+    public String getUser() {
+      return null;
+    }
+
+    @Override
+    public boolean getSymlink() {
+      return false;
+    }
+
+    @Override
+    public Path[] getArchiveClassPaths() {
+      return new Path[0];
+    }
+
+    @Override
+    public URI[] getCacheArchives() throws IOException {
+      return new URI[0];
+    }
+
+    @Override
+    public URI[] getCacheFiles() throws IOException {
+      return new URI[0];
+    }
+
+    @Override
+    public Path[] getLocalCacheArchives() throws IOException {
+      return new Path[0];
+    }
+
+    @Override
+    public Path[] getLocalCacheFiles() throws IOException {
+      return new Path[0];
+    }
+
+    @Override
+    public Path[] getFileClassPaths() {
+      return new Path[0];
+    }
+
+    @Override
+    public String[] getArchiveTimestamps() {
+      return new String[0];
+    }
+
+    @Override
+    public String[] getFileTimestamps() {
+      return new String[0];
+    }
+
+    @Override
+    public int getMaxMapAttempts() {
+      return 0;
+    }
+
+    @Override
+    public int getMaxReduceAttempts() {
+      return 0;
+    }
+
+    @Override
+    public void progress() {
+    }
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[2/2] hadoop git commit: MAPREDUCE-6956 FileOutputCommitter to gain abstract superclass PathOutputCommitter. Contributed by Steve Loughran

Posted by st...@apache.org.
MAPREDUCE-6956 FileOutputCommitter to gain abstract superclass PathOutputCommitter.
Contributed by Steve Loughran

(cherry picked from commit 11390c2d111910b01d9c4d3e39dee49babae272f)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b5e99823
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b5e99823
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b5e99823

Branch: refs/heads/branch-3.0
Commit: b5e9982355a9f1392ed70c4731e5ecab40ac6c45
Parents: ba26097
Author: Steve Loughran <st...@apache.org>
Authored: Fri Sep 15 17:00:16 2017 +0100
Committer: Steve Loughran <st...@apache.org>
Committed: Fri Sep 15 17:00:16 2017 +0100

----------------------------------------------------------------------
 .../lib/output/FileOutputCommitter.java         |  25 +-
 .../mapreduce/lib/output/FileOutputFormat.java  |  57 ++-
 .../lib/output/PathOutputCommitter.java         |  91 +++++
 .../hadoop/mapreduce/task/JobContextImpl.java   |  10 +-
 .../mapreduce/task/TaskAttemptContextImpl.java  |  13 +-
 .../lib/output/TestPathOutputCommitter.java     | 377 +++++++++++++++++++
 6 files changed, 553 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5e99823/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
index 9e750be..0061406 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
@@ -39,13 +39,14 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 
 /** An {@link OutputCommitter} that commits files specified 
  * in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}.
  **/
 @InterfaceAudience.Public
 @InterfaceStability.Stable
-public class FileOutputCommitter extends OutputCommitter {
+public class FileOutputCommitter extends PathOutputCommitter {
   private static final Log LOG = LogFactory.getLog(FileOutputCommitter.class);
 
   /** 
@@ -101,8 +102,11 @@ public class FileOutputCommitter extends OutputCommitter {
   public FileOutputCommitter(Path outputPath, 
                              TaskAttemptContext context) throws IOException {
     this(outputPath, (JobContext)context);
-    if (outputPath != null) {
-      workPath = getTaskAttemptPath(context, outputPath);
+    if (getOutputPath() != null) {
+      workPath = Preconditions.checkNotNull(
+          getTaskAttemptPath(context, getOutputPath()),
+          "Null task attempt path in %s and output path %s",
+          context, outputPath);
     }
   }
   
@@ -116,6 +120,7 @@ public class FileOutputCommitter extends OutputCommitter {
   @Private
   public FileOutputCommitter(Path outputPath, 
                              JobContext context) throws IOException {
+    super(outputPath, context);
     Configuration conf = context.getConfiguration();
     algorithmVersion =
         conf.getInt(FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
@@ -705,4 +710,18 @@ public class FileOutputCommitter extends OutputCommitter {
       LOG.warn("Output Path is null in recoverTask()");
     }
   }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder(
+        "FileOutputCommitter{");
+    sb.append(super.toString()).append("; ");
+    sb.append("outputPath=").append(outputPath);
+    sb.append(", workPath=").append(workPath);
+    sb.append(", algorithmVersion=").append(algorithmVersion);
+    sb.append(", skipCleanup=").append(skipCleanup);
+    sb.append(", ignoreCleanupFailures=").append(ignoreCleanupFailures);
+    sb.append('}');
+    return sb.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5e99823/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
index c11f8d8..0e7efa3 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.lib.output;
 import java.io.IOException;
 import java.text.NumberFormat;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -38,11 +39,15 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** A base class for {@link OutputFormat}s that read from {@link FileSystem}s.*/
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public abstract class FileOutputFormat<K, V> extends OutputFormat<K, V> {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(FileOutputFormat.class);
 
   /** Construct output file names so that, when an output directory listing is
    * sorted lexicographically, positions correspond to output partitions.*/
@@ -53,12 +58,25 @@ public abstract class FileOutputFormat<K, V> extends OutputFormat<K, V> {
     NUMBER_FORMAT.setMinimumIntegerDigits(5);
     NUMBER_FORMAT.setGroupingUsed(false);
   }
-  private FileOutputCommitter committer = null;
-public static final String COMPRESS ="mapreduce.output.fileoutputformat.compress";
-public static final String COMPRESS_CODEC = 
-"mapreduce.output.fileoutputformat.compress.codec";
-public static final String COMPRESS_TYPE = "mapreduce.output.fileoutputformat.compress.type";
-public static final String OUTDIR = "mapreduce.output.fileoutputformat.outputdir";
+  private PathOutputCommitter committer = null;
+
+  /** Configuration option: should output be compressed? {@value}. */
+  public static final String COMPRESS =
+      "mapreduce.output.fileoutputformat.compress";
+
+  /** If compression is enabled, name of codec: {@value}. */
+  public static final String COMPRESS_CODEC =
+      "mapreduce.output.fileoutputformat.compress.codec";
+  /**
+   * Type of compression {@value}: NONE, RECORD, BLOCK.
+   * Generally only used in {@code SequenceFileOutputFormat}.
+   */
+  public static final String COMPRESS_TYPE =
+      "mapreduce.output.fileoutputformat.compress.type";
+
+  /** Destination directory of work: {@value}. */
+  public static final String OUTDIR =
+      "mapreduce.output.fileoutputformat.outputdir";
 
   @Deprecated
   public enum Counter {
@@ -110,14 +128,14 @@ public static final String OUTDIR = "mapreduce.output.fileoutputformat.outputdir
    */
   public static Class<? extends CompressionCodec> 
   getOutputCompressorClass(JobContext job, 
-		                       Class<? extends CompressionCodec> defaultValue) {
+                       Class<? extends CompressionCodec> defaultValue) {
     Class<? extends CompressionCodec> codecClass = defaultValue;
     Configuration conf = job.getConfiguration();
     String name = conf.get(FileOutputFormat.COMPRESS_CODEC);
     if (name != null) {
       try {
-        codecClass = 
-        	conf.getClassByName(name).asSubclass(CompressionCodec.class);
+        codecClass =
+            conf.getClassByName(name).asSubclass(CompressionCodec.class);
       } catch (ClassNotFoundException e) {
         throw new IllegalArgumentException("Compression codec " + name + 
                                            " was not found.", e);
@@ -219,9 +237,11 @@ public static final String OUTDIR = "mapreduce.output.fileoutputformat.outputdir
   public static Path getWorkOutputPath(TaskInputOutputContext<?,?,?,?> context
                                        ) throws IOException, 
                                                 InterruptedException {
-    FileOutputCommitter committer = (FileOutputCommitter) 
+    PathOutputCommitter committer = (PathOutputCommitter)
       context.getOutputCommitter();
-    return committer.getWorkPath();
+    Path workPath = committer.getWorkPath();
+    LOG.debug("Work path is {}", workPath);
+    return workPath;
   }
 
   /**
@@ -281,10 +301,17 @@ public static final String OUTDIR = "mapreduce.output.fileoutputformat.outputdir
    */
   public Path getDefaultWorkFile(TaskAttemptContext context,
                                  String extension) throws IOException{
-    FileOutputCommitter committer = 
-      (FileOutputCommitter) getOutputCommitter(context);
-    return new Path(committer.getWorkPath(), getUniqueFile(context, 
-      getOutputName(context), extension));
+    OutputCommitter c = getOutputCommitter(context);
+    Preconditions.checkState(c instanceof PathOutputCommitter,
+        "Committer %s is not a PathOutputCommitter", c);
+    Path workPath = ((PathOutputCommitter) c).getWorkPath();
+    Preconditions.checkNotNull(workPath,
+        "Null workPath returned by committer %s", c);
+    Path workFile = new Path(workPath,
+        getUniqueFile(context, getOutputName(context), extension));
+    LOG.debug("Work file for {} extension '{}' is {}",
+        context, extension, workFile);
+    return workFile;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5e99823/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter.java
new file mode 100644
index 0000000..2df30ba
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output;
+
+import java.io.IOException;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * A committer which somehow commits data written to a working directory
+ * to the final directory during the commit process. The reference
+ * implementation of this is the {@link FileOutputCommitter}.
+ *
+ * There are two constructors, both of which do nothing but log and
+ * validate their arguments.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public abstract class PathOutputCommitter extends OutputCommitter {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PathOutputCommitter.class);
+
+  private final JobContext context;
+
+  /**
+   * Constructor for a task attempt.
+   * Subclasses should provide a public constructor with this signature.
+   * @param outputPath output path: may be null
+   * @param context task context
+   * @throws IOException IO problem
+   */
+  protected PathOutputCommitter(Path outputPath,
+      TaskAttemptContext context) throws IOException {
+    this.context = Preconditions.checkNotNull(context, "Null context");
+    LOG.debug("Creating committer with output path {} and task context"
+        + " {}", outputPath, context);
+  }
+
+  /**
+   * Constructor for a job attempt.
+   * Subclasses should provide a public constructor with this signature.
+   * @param outputPath output path: may be null
+   * @param context job context
+   * @throws IOException IO problem
+   */
+  protected PathOutputCommitter(Path outputPath,
+      JobContext context) throws IOException {
+    this.context = Preconditions.checkNotNull(context, "Null context");
+    LOG.debug("Creating committer with output path {} and job context"
+        + " {}", outputPath, context);
+  }
+
+  /**
+   * Get the directory that the task should write results into.
+   * Warning: there's no guarantee that this work path is on the same
+   * FS as the final output, or that it's visible across machines.
+   * @return the work directory
+   * @throws IOException IO problem
+   */
+  public abstract Path getWorkPath() throws IOException;
+
+  @Override
+  public String toString() {
+    return "PathOutputCommitter{context=" + context + '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5e99823/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java
index b9014ef..1696246 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java
@@ -451,5 +451,13 @@ public class JobContextImpl implements JobContext {
   public Credentials getCredentials() {
     return credentials;
   }
-  
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder(
+        "JobContextImpl{");
+    sb.append("jobId=").append(jobId);
+    sb.append('}');
+    return sb.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5e99823/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java
index 333f57b..a622d3a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java
@@ -118,4 +118,15 @@ public class TaskAttemptContextImpl extends JobContextImpl
   public float getProgress() {
     return reporter.getProgress();
   }
-}
\ No newline at end of file
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder(
+        "TaskAttemptContextImpl{");
+    sb.append(super.toString());
+    sb.append("; taskId=").append(taskId);
+    sb.append(", status='").append(status).append('\'');
+    sb.append('}');
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5e99823/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestPathOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestPathOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestPathOutputCommitter.java
new file mode 100644
index 0000000..9cff82f
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestPathOutputCommitter.java
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.security.Credentials;
+
+/**
+ * Test the path output committer binding to FileOutputFormat.
+ */
+public class TestPathOutputCommitter extends Assert {
+
+  @Test
+  public void testFileOutputCommitterOverrride() throws Throwable {
+    TaskContext context = new TaskContext();
+    Path workPath = new Path("file:///work");
+    context.setOutputCommitter(
+        new SimpleCommitter(new Path("/"), context, workPath));
+    assertEquals(workPath, FileOutputFormat.getWorkOutputPath(context));
+  }
+
+  @Test
+  public void testFileOutputCommitterNullWorkPath() throws Throwable {
+    TaskContext context = new TaskContext();
+    context.setOutputCommitter(
+        new SimpleCommitter(new Path("/"), context, null));
+    assertNull(FileOutputFormat.getWorkOutputPath(context));
+  }
+
+  private static class SimpleCommitter extends PathOutputCommitter {
+
+    private final Path workPath;
+
+    SimpleCommitter(Path outputPath,
+        TaskAttemptContext context, Path workPath) throws IOException {
+      super(outputPath, context);
+      this.workPath = workPath;
+    }
+
+    SimpleCommitter(Path outputPath,
+        JobContext context, Path workPath) throws IOException {
+      super(outputPath, context);
+      this.workPath = workPath;
+    }
+
+    @Override
+    public Path getWorkPath() throws IOException {
+      return workPath;
+    }
+
+    @Override
+    public void setupJob(JobContext jobContext) throws IOException {
+
+    }
+
+    @Override
+    public void setupTask(TaskAttemptContext taskContext) throws IOException {
+
+    }
+
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext taskContext)
+        throws IOException {
+      return false;
+    }
+
+    @Override
+    public void commitTask(TaskAttemptContext taskContext) throws IOException {
+
+    }
+
+    @Override
+    public void abortTask(TaskAttemptContext taskContext) throws IOException {
+
+    }
+  }
+
+  /**
+   * Stub task context.
+   */
+  public class TaskContext
+      implements TaskInputOutputContext<String, String, String, String> {
+
+    private OutputCommitter outputCommitter;
+
+    public void setOutputCommitter(OutputCommitter outputCommitter) {
+      this.outputCommitter = outputCommitter;
+    }
+
+    @Override
+    public OutputCommitter getOutputCommitter() {
+      return outputCommitter;
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      return false;
+    }
+
+    @Override
+    public String getCurrentKey() throws IOException, InterruptedException {
+      return null;
+    }
+
+    @Override
+    public String getCurrentValue() throws IOException, InterruptedException {
+      return null;
+    }
+
+    @Override
+    public void write(String key, String value)
+        throws IOException, InterruptedException {
+    }
+
+
+    @Override
+    public TaskAttemptID getTaskAttemptID() {
+      return null;
+    }
+
+    @Override
+    public void setStatus(String msg) {
+    }
+
+    @Override
+    public String getStatus() {
+      return null;
+    }
+
+    @Override
+    public float getProgress() {
+      return 0;
+    }
+
+    @Override
+    public Counter getCounter(Enum<?> counterName) {
+      return null;
+    }
+
+    @Override
+    public Counter getCounter(String groupName, String counterName) {
+      return null;
+    }
+
+    @Override
+    public Configuration getConfiguration() {
+      return null;
+    }
+
+    @Override
+    public Credentials getCredentials() {
+      return null;
+    }
+
+    @Override
+    public JobID getJobID() {
+      return null;
+    }
+
+    @Override
+    public int getNumReduceTasks() {
+      return 0;
+    }
+
+    @Override
+    public Path getWorkingDirectory() throws IOException {
+      return null;
+    }
+
+    @Override
+    public Class<?> getOutputKeyClass() {
+      return null;
+    }
+
+    @Override
+    public Class<?> getOutputValueClass() {
+      return null;
+    }
+
+    @Override
+    public Class<?> getMapOutputKeyClass() {
+      return null;
+    }
+
+    @Override
+    public Class<?> getMapOutputValueClass() {
+      return null;
+    }
+
+    @Override
+    public String getJobName() {
+      return null;
+    }
+
+    @Override
+    public Class<? extends InputFormat<?, ?>> getInputFormatClass()
+        throws ClassNotFoundException {
+      return null;
+    }
+
+    @Override
+    public Class<? extends Mapper<?, ?, ?, ?>> getMapperClass()
+        throws ClassNotFoundException {
+      return null;
+    }
+
+    @Override
+    public Class<? extends Reducer<?, ?, ?, ?>> getCombinerClass()
+        throws ClassNotFoundException {
+      return null;
+    }
+
+    @Override
+    public Class<? extends Reducer<?, ?, ?, ?>> getReducerClass()
+        throws ClassNotFoundException {
+      return null;
+    }
+
+    @Override
+    public Class<? extends OutputFormat<?, ?>> getOutputFormatClass()
+        throws ClassNotFoundException {
+      return null;
+    }
+
+    @Override
+    public Class<? extends Partitioner<?, ?>> getPartitionerClass()
+        throws ClassNotFoundException {
+      return null;
+    }
+
+    @Override
+    public RawComparator<?> getSortComparator() {
+      return null;
+    }
+
+    @Override
+    public String getJar() {
+      return null;
+    }
+
+    @Override
+    public RawComparator<?> getCombinerKeyGroupingComparator() {
+      return null;
+    }
+
+    @Override
+    public RawComparator<?> getGroupingComparator() {
+      return null;
+    }
+
+    @Override
+    public boolean getJobSetupCleanupNeeded() {
+      return false;
+    }
+
+    @Override
+    public boolean getTaskCleanupNeeded() {
+      return false;
+    }
+
+    @Override
+    public boolean getProfileEnabled() {
+      return false;
+    }
+
+    @Override
+    public String getProfileParams() {
+      return null;
+    }
+
+    @Override
+    public Configuration.IntegerRanges getProfileTaskRange(boolean isMap) {
+      return null;
+    }
+
+    @Override
+    public String getUser() {
+      return null;
+    }
+
+    @Override
+    public boolean getSymlink() {
+      return false;
+    }
+
+    @Override
+    public Path[] getArchiveClassPaths() {
+      return new Path[0];
+    }
+
+    @Override
+    public URI[] getCacheArchives() throws IOException {
+      return new URI[0];
+    }
+
+    @Override
+    public URI[] getCacheFiles() throws IOException {
+      return new URI[0];
+    }
+
+    @Override
+    public Path[] getLocalCacheArchives() throws IOException {
+      return new Path[0];
+    }
+
+    @Override
+    public Path[] getLocalCacheFiles() throws IOException {
+      return new Path[0];
+    }
+
+    @Override
+    public Path[] getFileClassPaths() {
+      return new Path[0];
+    }
+
+    @Override
+    public String[] getArchiveTimestamps() {
+      return new String[0];
+    }
+
+    @Override
+    public String[] getFileTimestamps() {
+      return new String[0];
+    }
+
+    @Override
+    public int getMaxMapAttempts() {
+      return 0;
+    }
+
+    @Override
+    public int getMaxReduceAttempts() {
+      return 0;
+    }
+
+    @Override
+    public void progress() {
+    }
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org