Posted to commits@tez.apache.org by hi...@apache.org on 2013/09/12 22:09:40 UTC

git commit: TEZ-419. Change SimpleOutput to work with the new engine APIs. (hitesh)

Updated Branches:
  refs/heads/TEZ-398 8731540a9 -> 3d64024d9


TEZ-419. Change SimpleOutput to work with the new engine APIs. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/3d64024d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/3d64024d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/3d64024d

Branch: refs/heads/TEZ-398
Commit: 3d64024d962816edea3cf19eab8921f37c928455
Parents: 8731540
Author: Hitesh Shah <hi...@apache.org>
Authored: Thu Sep 12 13:09:19 2013 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Thu Sep 12 13:09:19 2013 -0700

----------------------------------------------------------------------
 .../apache/tez/mapreduce/hadoop/MRConfig.java   |  27 +--
 .../newmapreduce/TaskAttemptContextImpl.java    |   9 +-
 .../tez/mapreduce/newoutput/SimpleOutput.java   | 214 +++++++++++++++++++
 3 files changed, 234 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d64024d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRConfig.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRConfig.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRConfig.java
index cb1d59b..c93c675 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRConfig.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRConfig.java
@@ -21,8 +21,8 @@ import org.apache.hadoop.classification.InterfaceAudience;
 
 /**
  * Place holder for cluster level configuration keys.
- * 
- * The keys should have "mapreduce.cluster." as the prefix. 
+ *
+ * The keys should have "mapreduce.cluster." as the prefix.
  *
  */
 @InterfaceAudience.Private
@@ -32,7 +32,7 @@ public interface MRConfig {
   public static final String TEMP_DIR = "mapreduce.cluster.temp.dir";
   public static final String LOCAL_DIR = "mapreduce.cluster.local.dir";
   public static final String MAPMEMORY_MB = "mapreduce.cluster.mapmemory.mb";
-  public static final String REDUCEMEMORY_MB = 
+  public static final String REDUCEMEMORY_MB =
     "mapreduce.cluster.reducememory.mb";
   public static final String MR_ACLS_ENABLED = "mapreduce.cluster.acls.enabled";
   public static final String MR_ADMINS =
@@ -42,26 +42,26 @@ public interface MRConfig {
     "mapreduce.cluster.permissions.supergroup";
 
   //Delegation token related keys
-  public static final String  DELEGATION_KEY_UPDATE_INTERVAL_KEY = 
+  public static final String  DELEGATION_KEY_UPDATE_INTERVAL_KEY =
     "mapreduce.cluster.delegation.key.update-interval";
-  public static final long    DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT = 
+  public static final long    DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT =
     24*60*60*1000; // 1 day
-  public static final String  DELEGATION_TOKEN_RENEW_INTERVAL_KEY = 
+  public static final String  DELEGATION_TOKEN_RENEW_INTERVAL_KEY =
     "mapreduce.cluster.delegation.token.renew-interval";
-  public static final long    DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT = 
+  public static final long    DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT =
     24*60*60*1000;  // 1 day
-  public static final String  DELEGATION_TOKEN_MAX_LIFETIME_KEY = 
+  public static final String  DELEGATION_TOKEN_MAX_LIFETIME_KEY =
     "mapreduce.cluster.delegation.token.max-lifetime";
-  public static final long    DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT = 
+  public static final long    DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT =
     7*24*60*60*1000; // 7 days
-  
+
   public static final String RESOURCE_CALCULATOR_PROCESS_TREE =
     "mapreduce.job.process-tree.class";
-  public static final String STATIC_RESOLUTIONS = 
+  public static final String STATIC_RESOLUTIONS =
     "mapreduce.job.net.static.resolutions";
 
   public static final String MASTER_ADDRESS  = "mapreduce.jobtracker.address";
-  public static final String MASTER_USER_NAME = 
+  public static final String MASTER_USER_NAME =
     "mapreduce.jobtracker.kerberos.principal";
 
   public static final String FRAMEWORK_NAME  = "mapreduce.framework.name";
@@ -101,4 +101,7 @@ public interface MRConfig {
 
   public static final int DEFAULT_MAPRED_IFILE_READAHEAD_BYTES =
     4 * 1024 * 1024;
+
+  public static final String IS_MAP_PROCESSOR =
+      "tez.mapreduce.is_map_processor";
 }
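
The new IS_MAP_PROCESSOR key is what SimpleOutput (below) reads to decide whether the task attempt should be typed as a map or a reduce. A minimal sketch of setting it when assembling the output's configuration; the setup code itself is illustrative and not part of this commit:

    // Illustrative wiring only: build the JobConf that SimpleOutput later
    // recovers from its user payload in initialize().
    JobConf jobConf = new JobConf();
    jobConf.setBoolean(MRConfig.IS_MAP_PROCESSOR, true); // map-side output
    // A serialized form of this jobConf would then be handed to the output
    // as its user payload (e.g. via a TezUtils helper).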

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d64024d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapreduce/TaskAttemptContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapreduce/TaskAttemptContextImpl.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapreduce/TaskAttemptContextImpl.java
index fa9d770..67aa628 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapreduce/TaskAttemptContextImpl.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapreduce/TaskAttemptContextImpl.java
@@ -43,12 +43,13 @@ public class TaskAttemptContextImpl
     super(conf, new TaskAttemptID(
         new TaskID(String.valueOf(taskContext.getApplicationId()
             .getClusterTimestamp()), taskContext.getApplicationId().getId(),
-            TaskType.MAP, taskContext.getTaskIndex()),
-        taskContext.getAttemptNumber()));
+            isMap ? TaskType.MAP : TaskType.REDUCE,
+            taskContext.getTaskIndex()),
+            taskContext.getAttemptNumber()));
     this.taskContext = taskContext;
-    
+
   }
-  
+
   @Override
   public float getProgress() {
     // TODO NEWTEZ Will this break anything ?
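
The constructor change above threads an isMap flag into the TaskAttemptID, so reduce-side outputs are no longer unconditionally labeled TaskType.MAP. In isolation, the new behavior is (sketch):

    // The chosen TaskType determines the "m"/"r" component of the attempt
    // ID string, e.g. attempt_..._m_000000_0 versus attempt_..._r_000000_0.
    TaskType taskType = isMap ? TaskType.MAP : TaskType.REDUCE;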

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d64024d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newoutput/SimpleOutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newoutput/SimpleOutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newoutput/SimpleOutput.java
new file mode 100644
index 0000000..6634429
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newoutput/SimpleOutput.java
@@ -0,0 +1,214 @@
+package org.apache.tez.mapreduce.newoutput;
+
+import java.io.IOException;
+import java.text.NumberFormat;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.engine.newapi.Event;
+import org.apache.tez.engine.newapi.KVWriter;
+import org.apache.tez.engine.newapi.LogicalOutput;
+import org.apache.tez.engine.newapi.TezOutputContext;
+import org.apache.tez.engine.newapi.Writer;
+import org.apache.tez.mapreduce.common.Utils;
+import org.apache.tez.mapreduce.hadoop.MRConfig;
+import org.apache.tez.mapreduce.hadoop.newmapred.MRReporter;
+import org.apache.tez.mapreduce.hadoop.newmapreduce.TaskAttemptContextImpl;
+
+public class SimpleOutput implements LogicalOutput {
+
+  private static final Log LOG = LogFactory.getLog(SimpleOutput.class);
+
+  private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
+  static {
+    NUMBER_FORMAT.setMinimumIntegerDigits(5);
+    NUMBER_FORMAT.setGroupingUsed(false);
+  }
+
+  private TezOutputContext outputContext;
+
+  private JobConf jobConf;
+
+  boolean useNewApi;
+
+  @SuppressWarnings("rawtypes")
+  org.apache.hadoop.mapreduce.OutputFormat newOutputFormat;
+  @SuppressWarnings("rawtypes")
+  org.apache.hadoop.mapreduce.RecordWriter newRecordWriter;
+
+  @SuppressWarnings("rawtypes")
+  org.apache.hadoop.mapred.OutputFormat oldOutputFormat;
+  @SuppressWarnings("rawtypes")
+  org.apache.hadoop.mapred.RecordWriter oldRecordWriter;
+
+  private TezCounter outputRecordCounter;
+  private TezCounter fileOutputByteCounter;
+  private List<Statistics> fsStats;
+
+  private org.apache.hadoop.mapreduce.TaskAttemptContext taskAttemptContext;
+
+  private boolean isMapperOutput;
+
+  @Override
+  public List<Event> initialize(TezOutputContext outputContext)
+      throws IOException {
+    LOG.info("Initializing Simple Output");
+    this.outputContext = outputContext;
+    Configuration conf = TezUtils.createConfFromUserPayload(
+        outputContext.getUserPayload());
+    this.jobConf = new JobConf(conf);
+    this.useNewApi = this.jobConf.getUseNewMapper();
+    this.isMapperOutput = jobConf.getBoolean(MRConfig.IS_MAP_PROCESSOR,
+        false);
+
+    outputRecordCounter = outputContext.getCounters().findCounter(
+        TaskCounter.MAP_OUTPUT_RECORDS);
+    fileOutputByteCounter = outputContext.getCounters().findCounter(
+        FileOutputFormatCounter.BYTES_WRITTEN);
+
+    if (useNewApi) {
+      taskAttemptContext = createTaskAttemptContext();
+      try {
+        newOutputFormat =
+            ReflectionUtils.newInstance(
+                taskAttemptContext.getOutputFormatClass(), jobConf);
+      } catch (ClassNotFoundException cnfe) {
+        throw new IOException(cnfe);
+      }
+
+      List<Statistics> matchedStats = null;
+      if (newOutputFormat instanceof
+          org.apache.hadoop.mapreduce.lib.output.FileOutputFormat) {
+        matchedStats =
+            Utils.getFsStatistics(
+                org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
+                    .getOutputPath(taskAttemptContext),
+                jobConf);
+      }
+      fsStats = matchedStats;
+
+      long bytesOutPrev = getOutputBytes();
+      try {
+        newRecordWriter =
+            newOutputFormat.getRecordWriter(taskAttemptContext);
+      } catch (InterruptedException e) {
+        throw new IOException("Interrupted while creating record writer", e);
+      }
+      long bytesOutCurr = getOutputBytes();
+      fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
+    } else {
+      oldOutputFormat = jobConf.getOutputFormat();
+
+      List<Statistics> matchedStats = null;
+      if (oldOutputFormat instanceof org.apache.hadoop.mapred.FileOutputFormat) {
+        matchedStats =
+            Utils.getFsStatistics(
+                org.apache.hadoop.mapred.FileOutputFormat.getOutputPath(
+                    jobConf),
+                jobConf);
+      }
+      fsStats = matchedStats;
+
+      FileSystem fs = FileSystem.get(jobConf);
+      String finalName = getOutputName();
+
+      long bytesOutPrev = getOutputBytes();
+      oldRecordWriter =
+          oldOutputFormat.getRecordWriter(
+              fs, jobConf, finalName, new MRReporter(outputContext));
+      long bytesOutCurr = getOutputBytes();
+      fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
+    }
+
+    LOG.info("Initialized Simple Output"
+        + ", using_new_api" + useNewApi);
+    return null;
+  }
+
+  private TaskAttemptContext createTaskAttemptContext() {
+    return new TaskAttemptContextImpl(this.jobConf, outputContext,
+        isMapperOutput);
+  }
+
+  private long getOutputBytes() {
+    if (fsStats == null) return 0;
+    long bytesWritten = 0;
+    for (Statistics stat: fsStats) {
+      bytesWritten = bytesWritten + stat.getBytesWritten();
+    }
+    return bytesWritten;
+  }
+
+  private String getOutputName() {
+    return "part-" + NUMBER_FORMAT.format(outputContext.getTaskIndex());
+  }
+
+  @Override
+  public Writer getWriter() throws IOException {
+    return new KVWriter() {
+      private final boolean useNewWriter = useNewApi;
+
+      @SuppressWarnings("unchecked")
+      @Override
+      public void write(Object key, Object value) throws IOException {
+        long bytesOutPrev = getOutputBytes();
+        if (useNewWriter) {
+          try {
+            newRecordWriter.write(key, value);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException("Interrupted while writing next key-value",e);
+          }
+        } else {
+          oldRecordWriter.write(key, value);
+        }
+
+        long bytesOutCurr = getOutputBytes();
+        fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
+        outputRecordCounter.increment(1);
+      }
+    };
+  }
+
+  @Override
+  public void handleEvents(List<Event> outputEvents) {
+    // Not expecting any events at the moment.
+  }
+
+  @Override
+  public List<Event> close() throws IOException {
+    LOG.info("Closing Simple Output");
+    long bytesOutPrev = getOutputBytes();
+    if (useNewApi) {
+      try {
+        newRecordWriter.close(taskAttemptContext);
+      } catch (InterruptedException e) {
+        throw new IOException("Interrupted while closing record writer", e);
+      }
+    } else {
+      oldRecordWriter.close(null);
+    }
+    long bytesOutCurr = getOutputBytes();
+    fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
+    LOG.info("Closed Simple Output");
+    return null;
+  }
+
+  @Override
+  public void setNumPhysicalOutputs(int numOutputs) {
+    // Nothing to do for now
+  }
+
+}
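
Taken together, the new class follows the LogicalOutput lifecycle: initialize() rebuilds the JobConf from the user payload and instantiates the configured OutputFormat, getWriter() hands back a KVWriter over the underlying RecordWriter, and close() flushes it and updates the byte counter. A sketch of how a processor might drive it; outputContext, key and value are assumed to be supplied by the framework:

    // Illustrative driver only; the engine normally performs these calls.
    SimpleOutput output = new SimpleOutput();
    output.initialize(outputContext);             // JobConf + OutputFormat setup
    KVWriter writer = (KVWriter) output.getWriter();
    writer.write(key, value);                     // bumps MAP_OUTPUT_RECORDS and
                                                  // BYTES_WRITTEN as it goes
    output.close();                               // closes the RecordWriter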