You are viewing a plain-text version of this content. The canonical version is available at the original archive link, which was not preserved in this text.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/16 22:36:40 UTC

git commit: TEZ-449. Several bug fixes in new api implementation (part of TEZ-398). (sseth)

Updated Branches:
  refs/heads/TEZ-398 7df571a4e -> 6e66c5e7b


TEZ-449. Several bug fixes in new api implementation (part of TEZ-398).
(sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/6e66c5e7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/6e66c5e7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/6e66c5e7

Branch: refs/heads/TEZ-398
Commit: 6e66c5e7b8f75258736b0782f9a7357938a5c51a
Parents: 7df571a
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Sep 16 13:36:19 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Sep 16 13:36:19 2013 -0700

----------------------------------------------------------------------
 .../java/org/apache/tez/engine/newapi/TezTaskContext.java   | 2 +-
 .../java/org/apache/tez/engine/newapi/impl/TaskSpec.java    | 9 ++++++++-
 .../hadoop/mapreduce/split/SplitMetaInfoReaderTez.java      | 8 ++++++--
 .../hadoop/newmapreduce/TaskAttemptContextImpl.java         | 3 ++-
 .../java/org/apache/tez/mapreduce/newinput/SimpleInput.java | 3 ++-
 5 files changed, 19 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6e66c5e7/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezTaskContext.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezTaskContext.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezTaskContext.java
index 341377a..81313aa 100644
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezTaskContext.java
+++ b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezTaskContext.java
@@ -84,7 +84,7 @@ public interface TezTaskContext {
    * @return an array of work dirs
    */
   public String[] getWorkDirs();
-  
+
   /**
    * Returns an identifier which is unique to the specific Input, Processor or
    * Output

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6e66c5e7/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TaskSpec.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TaskSpec.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TaskSpec.java
index 03a26c3..8290e30 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TaskSpec.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TaskSpec.java
@@ -41,12 +41,13 @@ public class TaskSpec implements Writable {
   public TaskSpec() {
   }
 
+  // TODO NEWTEZ Remove user
   public TaskSpec(TezTaskAttemptID taskAttemptID, String user,
       String vertexName, ProcessorDescriptor processorDescriptor,
       List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList) {
     this.taskAttemptId = taskAttemptID;
-    this.user = user;
     this.vertexName = vertexName;
+    this.user = user;
     this.processorDescriptor = processorDescriptor;
     this.inputSpecList = inputSpecList;
     this.outputSpecList = outputSpecList;
@@ -78,6 +79,8 @@ public class TaskSpec implements Writable {
 
   @Override
   public void write(DataOutput out) throws IOException {
+    taskAttemptId.write(out);
+    out.writeUTF(vertexName);
     byte[] procDesc =
         DagTypeConverters.convertToDAGPlan(processorDescriptor).toByteArray();
     out.writeInt(procDesc.length);
@@ -94,6 +97,9 @@ public class TaskSpec implements Writable {
 
   @Override
   public void readFields(DataInput in) throws IOException {
+    taskAttemptId = new TezTaskAttemptID();
+    taskAttemptId.readFields(in);
+    vertexName = in.readUTF();
     int procDescLength = in.readInt();
     // TODO at least 3 buffer copies here. Need to convert this to full PB
     // TEZ-305
@@ -121,6 +127,7 @@ public class TaskSpec implements Writable {
   @Override
   public String toString() {
     StringBuffer sb = new StringBuffer();
+    sb.append("TaskAttemptID:" + taskAttemptId);
     sb.append("processorName=" + processorDescriptor.getClassName()
         + ", inputSpecListSize=" + inputSpecList.size()
         + ", outputSpecListSize=" + outputSpecList.size());

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6e66c5e7/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java
index 0fcfe65..0705796 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java
@@ -57,8 +57,12 @@ public class SplitMetaInfoReaderTez {
         MRJobConfig.SPLIT_METAINFO_MAXSIZE,
         MRJobConfig.DEFAULT_SPLIT_METAINFO_MAXSIZE);
 
+    // TODO NEWTEZ Figure out how this can be improved. i.e. access from context instead of setting in conf ?
+    String basePath = conf.get(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, ".");
+    LOG.info("Attempting to find splits in dir: " + basePath);
+    
     Path metaSplitFile = new Path(
-        conf.get(TezJobConfig.TASK_LOCAL_RESOURCE_DIR),
+        basePath,
         MRJobConfig.JOB_SPLIT_METAINFO);
     String jobSplitFile = MRJobConfig.JOB_SPLIT;
 
@@ -91,7 +95,7 @@ public class SplitMetaInfoReaderTez {
       JobSplit.SplitMetaInfo splitMetaInfo = new JobSplit.SplitMetaInfo();
       splitMetaInfo.readFields(in);
       JobSplit.TaskSplitIndex splitIndex = new JobSplit.TaskSplitIndex(
-          new Path(conf.get(TezJobConfig.TASK_LOCAL_RESOURCE_DIR), jobSplitFile)
+          new Path(basePath, jobSplitFile)
               .toUri().toString(), splitMetaInfo.getStartOffset());
       allSplitMetaInfo[i] = new JobSplit.TaskSplitMetaInfo(splitIndex,
           splitMetaInfo.getLocations(), splitMetaInfo.getInputDataLength());

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6e66c5e7/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapreduce/TaskAttemptContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapreduce/TaskAttemptContextImpl.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapreduce/TaskAttemptContextImpl.java
index 549841b..b4f673b 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapreduce/TaskAttemptContextImpl.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/newmapreduce/TaskAttemptContextImpl.java
@@ -20,9 +20,9 @@ package org.apache.tez.mapreduce.hadoop.newmapreduce;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.tez.engine.newapi.TezTaskContext;
 import org.apache.tez.mapreduce.common.Utils;
@@ -34,6 +34,7 @@ import org.apache.tez.mapreduce.common.Utils;
 @InterfaceStability.Unstable
 public class TaskAttemptContextImpl
        extends org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl {
+  
   private TezTaskContext taskContext;
 
   // FIXME we need to use DAG Id but we are using App Id

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6e66c5e7/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/SimpleInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/SimpleInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/SimpleInput.java
index 9969b81..73d8cc7 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/SimpleInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/SimpleInput.java
@@ -98,9 +98,10 @@ public class SimpleInput implements LogicalInput {
 
   @Override
   public List<Event> initialize(TezInputContext inputContext) throws IOException {
+    this.inputContext = inputContext;
     Configuration conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
     this.jobConf = new JobConf(conf);
-    
+
     // Read split information.
     TaskSplitMetaInfo[] allMetaInfo = readSplits(conf);
     TaskSplitMetaInfo thisTaskMetaInfo = allMetaInfo[inputContext.getTaskIndex()];