You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was lost in this plain-text version).
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/16 22:36:40 UTC
git commit: TEZ-449. Several bug fixes in new api implementation
(part of TEZ-398). (sseth)
Updated Branches:
refs/heads/TEZ-398 7df571a4e -> 6e66c5e7b
TEZ-449. Several bug fixes in new api implementation (part of TEZ-398).
(sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/6e66c5e7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/6e66c5e7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/6e66c5e7
Branch: refs/heads/TEZ-398
Commit: 6e66c5e7b8f75258736b0782f9a7357938a5c51a
Parents: 7df571a
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Sep 16 13:36:19 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Sep 16 13:36:19 2013 -0700
----------------------------------------------------------------------
.../java/org/apache/tez/engine/newapi/TezTaskContext.java | 2 +-
.../java/org/apache/tez/engine/newapi/impl/TaskSpec.java | 9 ++++++++-
.../hadoop/mapreduce/split/SplitMetaInfoReaderTez.java | 8 ++++++--
.../hadoop/newmapreduce/TaskAttemptContextImpl.java | 3 ++-
.../java/org/apache/tez/mapreduce/newinput/SimpleInput.java | 3 ++-
5 files changed, 19 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6e66c5e7/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezTaskContext.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezTaskContext.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezTaskContext.java
index 341377a..81313aa 100644
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezTaskContext.java
+++ b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezTaskContext.java
@@ -84,7 +84,7 @@ public interface TezTaskContext {
* @return an array of work dirs
*/
public String[] getWorkDirs();
-
+
/**
* Returns an identifier which is unique to the specific Input, Processor or
* Output
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6e66c5e7/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TaskSpec.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TaskSpec.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TaskSpec.java
index 03a26c3..8290e30 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TaskSpec.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TaskSpec.java
@@ -41,12 +41,13 @@ public class TaskSpec implements Writable {
public TaskSpec() {
}
+ // TODO NEWTEZ Remove user
public TaskSpec(TezTaskAttemptID taskAttemptID, String user,
String vertexName, ProcessorDescriptor processorDescriptor,
List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList) {
this.taskAttemptId = taskAttemptID;
- this.user = user;
this.vertexName = vertexName;
+ this.user = user;
this.processorDescriptor = processorDescriptor;
this.inputSpecList = inputSpecList;
this.outputSpecList = outputSpecList;
@@ -78,6 +79,8 @@ public class TaskSpec implements Writable {
@Override
public void write(DataOutput out) throws IOException {
+ taskAttemptId.write(out);
+ out.writeUTF(vertexName);
byte[] procDesc =
DagTypeConverters.convertToDAGPlan(processorDescriptor).toByteArray();
out.writeInt(procDesc.length);
@@ -94,6 +97,9 @@ public class TaskSpec implements Writable {
@Override
public void readFields(DataInput in) throws IOException {
+ taskAttemptId = new TezTaskAttemptID();
+ taskAttemptId.readFields(in);
+ vertexName = in.readUTF();
int procDescLength = in.readInt();
// TODO at least 3 buffer copies here. Need to convert this to full PB
// TEZ-305
@@ -121,6 +127,7 @@ public class TaskSpec implements Writable {
@Override
public String toString() {
StringBuffer sb = new StringBuffer();
+ sb.append("TaskAttemptID:" + taskAttemptId);
sb.append("processorName=" + processorDescriptor.getClassName()
+ ", inputSpecListSize=" + inputSpecList.size()
+ ", outputSpecListSize=" + outputSpecList.size());
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6e66c5e7/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java
index 0fcfe65..0705796 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java
@@ -57,8 +57,12 @@ public class SplitMetaInfoReaderTez {
MRJobConfig.SPLIT_METAINFO_MAXSIZE,
MRJobConfig.DEFAULT_SPLIT_METAINFO_MAXSIZE);
+ // TODO NEWTEZ Figure out how this can be improved. i.e. access from context instead of setting in conf ?
+ String basePath = conf.get(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, ".");
+ LOG.info("Attempting to find splits in dir: " + basePath);
+
Path metaSplitFile = new Path(
- conf.get(TezJobConfig.TASK_LOCAL_RESOURCE_DIR),
+ basePath,
MRJobConfig.JOB_SPLIT_METAINFO);
String jobSplitFile = MRJobConfig.JOB_SPLIT;
@@ -91,7 +95,7 @@ public class SplitMetaInfoReaderTez {
JobSplit.SplitMetaInfo splitMetaInfo = new JobSplit.SplitMetaInfo();
splitMetaInfo.readFields(in);
JobSplit.TaskSplitIndex splitIndex = new JobSplit.TaskSplitIndex(
- new Path(conf.get(TezJobConfig.TASK_LOCAL_RESOURCE_DIR), jobSplitFile)
+ new Path(basePath, jobSplitFile)
.toUri().toString(), splitMetaInfo.getStartOffset());
allSplitMetaInfo[i] = new JobSplit.TaskSplitMetaInfo(splitIndex,
splitMetaInfo.getLocations(), splitMetaInfo.getInputDataLength());
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6e66c5e7/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapreduce/TaskAttemptContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapreduce/TaskAttemptContextImpl.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapreduce/TaskAttemptContextImpl.java
index 549841b..b4f673b 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapreduce/TaskAttemptContextImpl.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapreduce/TaskAttemptContextImpl.java
@@ -20,9 +20,9 @@ package org.apache.tez.mapreduce.hadoop.newmapreduce;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.tez.engine.newapi.TezTaskContext;
import org.apache.tez.mapreduce.common.Utils;
@@ -34,6 +34,7 @@ import org.apache.tez.mapreduce.common.Utils;
@InterfaceStability.Unstable
public class TaskAttemptContextImpl
extends org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl {
+
private TezTaskContext taskContext;
// FIXME we need to use DAG Id but we are using App Id
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6e66c5e7/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/SimpleInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/SimpleInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/SimpleInput.java
index 9969b81..73d8cc7 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/SimpleInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/SimpleInput.java
@@ -98,9 +98,10 @@ public class SimpleInput implements LogicalInput {
@Override
public List<Event> initialize(TezInputContext inputContext) throws IOException {
+ this.inputContext = inputContext;
Configuration conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
this.jobConf = new JobConf(conf);
-
+
// Read split information.
TaskSplitMetaInfo[] allMetaInfo = readSplits(conf);
TaskSplitMetaInfo thisTaskMetaInfo = allMetaInfo[inputContext.getTaskIndex()];