You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/09/12 22:09:40 UTC
git commit: TEZ-419. Change SimpleOutput to work with the new engine
APIs. (hitesh)
Updated Branches:
refs/heads/TEZ-398 8731540a9 -> 3d64024d9
TEZ-419. Change SimpleOutput to work with the new engine APIs. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/3d64024d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/3d64024d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/3d64024d
Branch: refs/heads/TEZ-398
Commit: 3d64024d962816edea3cf19eab8921f37c928455
Parents: 8731540
Author: Hitesh Shah <hi...@apache.org>
Authored: Thu Sep 12 13:09:19 2013 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Thu Sep 12 13:09:19 2013 -0700
----------------------------------------------------------------------
.../apache/tez/mapreduce/hadoop/MRConfig.java | 27 +--
.../newmapreduce/TaskAttemptContextImpl.java | 9 +-
.../tez/mapreduce/newoutput/SimpleOutput.java | 214 +++++++++++++++++++
3 files changed, 234 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d64024d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRConfig.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRConfig.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRConfig.java
index cb1d59b..c93c675 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRConfig.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRConfig.java
@@ -21,8 +21,8 @@ import org.apache.hadoop.classification.InterfaceAudience;
/**
* Place holder for cluster level configuration keys.
- *
- * The keys should have "mapreduce.cluster." as the prefix.
+ *
+ * The keys should have "mapreduce.cluster." as the prefix.
*
*/
@InterfaceAudience.Private
@@ -32,7 +32,7 @@ public interface MRConfig {
public static final String TEMP_DIR = "mapreduce.cluster.temp.dir";
public static final String LOCAL_DIR = "mapreduce.cluster.local.dir";
public static final String MAPMEMORY_MB = "mapreduce.cluster.mapmemory.mb";
- public static final String REDUCEMEMORY_MB =
+ public static final String REDUCEMEMORY_MB =
"mapreduce.cluster.reducememory.mb";
public static final String MR_ACLS_ENABLED = "mapreduce.cluster.acls.enabled";
public static final String MR_ADMINS =
@@ -42,26 +42,26 @@ public interface MRConfig {
"mapreduce.cluster.permissions.supergroup";
//Delegation token related keys
- public static final String DELEGATION_KEY_UPDATE_INTERVAL_KEY =
+ public static final String DELEGATION_KEY_UPDATE_INTERVAL_KEY =
"mapreduce.cluster.delegation.key.update-interval";
- public static final long DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT =
+ public static final long DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT =
24*60*60*1000; // 1 day
- public static final String DELEGATION_TOKEN_RENEW_INTERVAL_KEY =
+ public static final String DELEGATION_TOKEN_RENEW_INTERVAL_KEY =
"mapreduce.cluster.delegation.token.renew-interval";
- public static final long DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT =
+ public static final long DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT =
24*60*60*1000; // 1 day
- public static final String DELEGATION_TOKEN_MAX_LIFETIME_KEY =
+ public static final String DELEGATION_TOKEN_MAX_LIFETIME_KEY =
"mapreduce.cluster.delegation.token.max-lifetime";
- public static final long DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT =
+ public static final long DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT =
7*24*60*60*1000; // 7 days
-
+
public static final String RESOURCE_CALCULATOR_PROCESS_TREE =
"mapreduce.job.process-tree.class";
- public static final String STATIC_RESOLUTIONS =
+ public static final String STATIC_RESOLUTIONS =
"mapreduce.job.net.static.resolutions";
public static final String MASTER_ADDRESS = "mapreduce.jobtracker.address";
- public static final String MASTER_USER_NAME =
+ public static final String MASTER_USER_NAME =
"mapreduce.jobtracker.kerberos.principal";
public static final String FRAMEWORK_NAME = "mapreduce.framework.name";
@@ -101,4 +101,7 @@ public interface MRConfig {
public static final int DEFAULT_MAPRED_IFILE_READAHEAD_BYTES =
4 * 1024 * 1024;
+
+ public static final String IS_MAP_PROCESSOR =
+ "tez.mapreduce.is_map_processor";
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d64024d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapreduce/TaskAttemptContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapreduce/TaskAttemptContextImpl.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapreduce/TaskAttemptContextImpl.java
index fa9d770..67aa628 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapreduce/TaskAttemptContextImpl.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapreduce/TaskAttemptContextImpl.java
@@ -43,12 +43,13 @@ public class TaskAttemptContextImpl
super(conf, new TaskAttemptID(
new TaskID(String.valueOf(taskContext.getApplicationId()
.getClusterTimestamp()), taskContext.getApplicationId().getId(),
- TaskType.MAP, taskContext.getTaskIndex()),
- taskContext.getAttemptNumber()));
+ isMap ? TaskType.MAP : TaskType.REDUCE,
+ taskContext.getTaskIndex()),
+ taskContext.getAttemptNumber()));
this.taskContext = taskContext;
-
+
}
-
+
@Override
public float getProgress() {
// TODO NEWTEZ Will this break anything ?
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d64024d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newoutput/SimpleOutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newoutput/SimpleOutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newoutput/SimpleOutput.java
new file mode 100644
index 0000000..6634429
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newoutput/SimpleOutput.java
@@ -0,0 +1,214 @@
+package org.apache.tez.mapreduce.newoutput;
+
+import java.io.IOException;
+import java.text.NumberFormat;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.engine.newapi.Event;
+import org.apache.tez.engine.newapi.KVWriter;
+import org.apache.tez.engine.newapi.LogicalOutput;
+import org.apache.tez.engine.newapi.TezOutputContext;
+import org.apache.tez.engine.newapi.Writer;
+import org.apache.tez.mapreduce.common.Utils;
+import org.apache.tez.mapreduce.hadoop.MRConfig;
+import org.apache.tez.mapreduce.hadoop.newmapred.MRReporter;
+import org.apache.tez.mapreduce.hadoop.newmapreduce.TaskAttemptContextImpl;
+
+/**
+ * {@link LogicalOutput} that writes key/value pairs through a Hadoop
+ * {@code OutputFormat}. It uses either the new
+ * ({@code org.apache.hadoop.mapreduce}) or the old
+ * ({@code org.apache.hadoop.mapred}) API, selected by
+ * {@link JobConf#getUseNewMapper()}.
+ *
+ * <p>The job configuration is deserialized from the output context's user
+ * payload. Each written record increments either
+ * {@link TaskCounter#MAP_OUTPUT_RECORDS} or
+ * {@link TaskCounter#REDUCE_OUTPUT_RECORDS}, depending on
+ * {@link MRConfig#IS_MAP_PROCESSOR}. Bytes written are tracked in
+ * {@link FileOutputFormatCounter#BYTES_WRITTEN} from FileSystem statistics.
+ * They are only tracked when the OutputFormat is a FileOutputFormat.
+ */
+public class SimpleOutput implements LogicalOutput {
+
+  private static final Log LOG = LogFactory.getLog(SimpleOutput.class);
+
+  /**
+   * Formats the task index as a zero-padded number with at least 5 digits
+   * and no grouping. The result is used for old-API output file names,
+   * e.g. {@code part-00007}.
+   * NOTE(review): NumberFormat is not thread-safe. This shared instance
+   * assumes getOutputName() is never called concurrently.
+   */
+  private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
+  static {
+    NUMBER_FORMAT.setMinimumIntegerDigits(5);
+    NUMBER_FORMAT.setGroupingUsed(false);
+  }
+
+  private TezOutputContext outputContext;
+
+  private JobConf jobConf;
+
+  /** True when the new (mapreduce) API is used, false for the old (mapred) API. */
+  boolean useNewApi;
+
+  @SuppressWarnings("rawtypes")
+  org.apache.hadoop.mapreduce.OutputFormat newOutputFormat;
+  @SuppressWarnings("rawtypes")
+  org.apache.hadoop.mapreduce.RecordWriter newRecordWriter;
+
+  @SuppressWarnings("rawtypes")
+  org.apache.hadoop.mapred.OutputFormat oldOutputFormat;
+  @SuppressWarnings("rawtypes")
+  org.apache.hadoop.mapred.RecordWriter oldRecordWriter;
+
+  private TezCounter outputRecordCounter;
+  private TezCounter fileOutputByteCounter;
+  /** FileSystem statistics for the output path. Null for non-file outputs. */
+  private List<Statistics> fsStats;
+
+  /** Only created on the new-API path. */
+  private org.apache.hadoop.mapreduce.TaskAttemptContext taskAttemptContext;
+
+  private boolean isMapperOutput;
+
+  /**
+   * Reads the job configuration, sets up counters, and creates the
+   * OutputFormat and its RecordWriter.
+   *
+   * @param outputContext context supplying the user payload and counters
+   * @return always {@code null}; this output emits no events
+   * @throws IOException if the OutputFormat class cannot be loaded, the
+   *         record writer cannot be created, or creation is interrupted
+   */
+  @Override
+  public List<Event> initialize(TezOutputContext outputContext)
+      throws IOException {
+    LOG.info("Initializing Simple Output");
+    this.outputContext = outputContext;
+    Configuration conf = TezUtils.createConfFromUserPayload(
+        outputContext.getUserPayload());
+    this.jobConf = new JobConf(conf);
+    this.useNewApi = this.jobConf.getUseNewMapper();
+    this.isMapperOutput = jobConf.getBoolean(MRConfig.IS_MAP_PROCESSOR,
+        false);
+
+    // Map-side and reduce-side outputs report written records under
+    // different TaskCounters, following MapReduce conventions.
+    outputRecordCounter = outputContext.getCounters().findCounter(
+        isMapperOutput
+            ? TaskCounter.MAP_OUTPUT_RECORDS
+            : TaskCounter.REDUCE_OUTPUT_RECORDS);
+    fileOutputByteCounter = outputContext.getCounters().findCounter(
+        FileOutputFormatCounter.BYTES_WRITTEN);
+
+    if (useNewApi) {
+      initializeNewApiWriter();
+    } else {
+      initializeOldApiWriter();
+    }
+
+    LOG.info("Initialized Simple Output"
+        + ", using_new_api: " + useNewApi);
+    return null;
+  }
+
+  /**
+   * Creates the new-API task attempt context, OutputFormat, and RecordWriter.
+   */
+  private void initializeNewApiWriter() throws IOException {
+    taskAttemptContext = createTaskAttemptContext();
+    try {
+      newOutputFormat = ReflectionUtils.newInstance(
+          taskAttemptContext.getOutputFormatClass(), jobConf);
+    } catch (ClassNotFoundException cnfe) {
+      throw new IOException(cnfe);
+    }
+
+    // FileSystem statistics are only gathered for FileOutputFormat
+    // subclasses. Other formats leave fsStats null, so getOutputBytes()
+    // reports 0 for them.
+    List<Statistics> matchedStats = null;
+    if (newOutputFormat instanceof
+        org.apache.hadoop.mapreduce.lib.output.FileOutputFormat) {
+      matchedStats = Utils.getFsStatistics(
+          org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
+              .getOutputPath(taskAttemptContext),
+          jobConf);
+    }
+    fsStats = matchedStats;
+
+    long bytesOutPrev = getOutputBytes();
+    try {
+      newRecordWriter = newOutputFormat.getRecordWriter(taskAttemptContext);
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag so callers can still observe it.
+      Thread.currentThread().interrupt();
+      throw new IOException("Interrupted while creating record writer", e);
+    }
+    long bytesOutCurr = getOutputBytes();
+    fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
+  }
+
+  /**
+   * Creates the old-API OutputFormat and a RecordWriter for the file named
+   * by {@link #getOutputName()}.
+   */
+  private void initializeOldApiWriter() throws IOException {
+    oldOutputFormat = jobConf.getOutputFormat();
+
+    // FileSystem statistics are only gathered for FileOutputFormat
+    // subclasses. Other formats leave fsStats null.
+    List<Statistics> matchedStats = null;
+    if (oldOutputFormat instanceof org.apache.hadoop.mapred.FileOutputFormat) {
+      matchedStats = Utils.getFsStatistics(
+          org.apache.hadoop.mapred.FileOutputFormat.getOutputPath(jobConf),
+          jobConf);
+    }
+    fsStats = matchedStats;
+
+    FileSystem fs = FileSystem.get(jobConf);
+    String finalName = getOutputName();
+
+    long bytesOutPrev = getOutputBytes();
+    oldRecordWriter = oldOutputFormat.getRecordWriter(
+        fs, jobConf, finalName, new MRReporter(outputContext));
+    long bytesOutCurr = getOutputBytes();
+    fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
+  }
+
+  private TaskAttemptContext createTaskAttemptContext() {
+    return new TaskAttemptContextImpl(this.jobConf, outputContext,
+        isMapperOutput);
+  }
+
+  /**
+   * Sums bytes written across the matched FileSystem statistics.
+   *
+   * @return total bytes written so far, or 0 when no statistics were matched
+   */
+  private long getOutputBytes() {
+    if (fsStats == null) {
+      return 0;
+    }
+    long bytesWritten = 0;
+    for (Statistics stat : fsStats) {
+      bytesWritten = bytesWritten + stat.getBytesWritten();
+    }
+    return bytesWritten;
+  }
+
+  /** Returns the old-API output file name, e.g. {@code part-00003}. */
+  private String getOutputName() {
+    return "part-" + NUMBER_FORMAT.format(outputContext.getTaskIndex());
+  }
+
+  /**
+   * Returns a {@link KVWriter} that forwards each pair to the RecordWriter
+   * created in {@link #initialize}. Each write also updates the record and
+   * byte counters. Expects initialize() to have completed first.
+   */
+  @Override
+  public Writer getWriter() throws IOException {
+    return new KVWriter() {
+      private final boolean useNewWriter = useNewApi;
+
+      @SuppressWarnings("unchecked")
+      @Override
+      public void write(Object key, Object value) throws IOException {
+        long bytesOutPrev = getOutputBytes();
+        if (useNewWriter) {
+          try {
+            newRecordWriter.write(key, value);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException("Interrupted while writing next key-value",e);
+          }
+        } else {
+          oldRecordWriter.write(key, value);
+        }
+
+        long bytesOutCurr = getOutputBytes();
+        fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
+        outputRecordCounter.increment(1);
+      }
+    };
+  }
+
+  /** Ignores all events; this output does not currently expect any. */
+  @Override
+  public void handleEvents(List<Event> outputEvents) {
+    // Not expecting any events at the moment.
+  }
+
+  /**
+   * Closes the underlying RecordWriter and records any bytes written while
+   * flushing.
+   *
+   * @return always {@code null}; this output emits no events
+   * @throws IOException if closing fails or is interrupted
+   */
+  @Override
+  public List<Event> close() throws IOException {
+    LOG.info("Closing Simple Output");
+    long bytesOutPrev = getOutputBytes();
+    if (useNewApi) {
+      try {
+        newRecordWriter.close(taskAttemptContext);
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag so callers can still observe it.
+        Thread.currentThread().interrupt();
+        throw new IOException("Interrupted while closing record writer", e);
+      }
+    } else {
+      // Pass a real reporter, the same kind used when the writer was
+      // created, instead of null. Writers that report progress while
+      // flushing would otherwise throw an NPE.
+      oldRecordWriter.close(new MRReporter(outputContext));
+    }
+    long bytesOutCurr = getOutputBytes();
+    fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
+    LOG.info("Closed Simple Output");
+    return null;
+  }
+
+  /** No-op; the number of physical outputs is not used by this output. */
+  @Override
+  public void setNumPhysicalOutputs(int numOutputs) {
+    // Nothing to do for now
+  }
+
+}