You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/04/22 20:29:00 UTC

svn commit: r1470653 [1/2] - in /incubator/tez/branches/TEZ-1: tez-common/src/main/java/org/apache/tez/common/ tez-dag/src/main/java/org/apache/hadoop/mapred/ tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ tez-dag/src/main/java/org/apache/tez/d...

Author: sseth
Date: Mon Apr 22 18:28:58 2013
New Revision: 1470653

URL: http://svn.apache.org/r1470653
Log:
TEZ-49. Split TezTask into a piece for static data, and one for Runtime access. (sseth)

Added:
    incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezTaskContext.java
    incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/RunningTaskContext.java
    incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java
Removed:
    incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezTask.java
    incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/TezEngineTask.java
Modified:
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java
    incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/ContainerTask.java
    incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java
    incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
    incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/server/ShuffleHandler.java
    incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/SortingOutput.java
    incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
    incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java
    incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java
    incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/InMemoryShuffleSorter.java
    incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java
    incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
    incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java
    incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java
    incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java
    incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/InputFactory.java
    incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/OutputFactory.java
    incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/ProcessorFactory.java
    incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/TezEngineFactory.java
    incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/TezEngineFactoryImpl.java
    incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
    incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInput.java
    incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/SimpleOutput.java
    incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
    incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/TezTaskReporterImpl.java
    incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
    incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
    incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestTaskModules.java
    incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
    incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java

Added: incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezTaskContext.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezTaskContext.java?rev=1470653&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezTaskContext.java (added)
+++ incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezTaskContext.java Mon Apr 22 18:28:58 2013
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.common;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.engine.records.TezDAGID;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+
+public abstract class TezTaskContext implements Writable {
+  // Serializable description of one task attempt: attempt id, user, job name and vertex name.
+  // Serialized fields; write() and readFields() handle them in this order.
+  private TezTaskAttemptID taskAttemptId;
+  private String user;
+  private String jobName;
+  private String vertexName;
+
+  public TezTaskContext() {
+  }
+
+  public TezTaskContext(TezTaskAttemptID taskAttemptID, String user, String jobName,
+      String vertexName) {
+    this.taskAttemptId = taskAttemptID;
+    this.user = user;
+    this.jobName = jobName;
+    this.vertexName = vertexName;
+  }
+
+  public TezTaskAttemptID getTaskAttemptId() {
+    return taskAttemptId;
+  }
+
+  // Walks attempt id -> task id -> vertex id -> DAG id; taskAttemptId must already be set.
+  public TezDAGID getDAGID() {
+    return taskAttemptId.getTaskID().getVertexID().getDAGId();
+  }
+
+  public String getUser() {
+    return user;
+  }
+
+  public String getJobName() {
+    return jobName;
+  }
+
+  public String getVertexName() {
+    return this.vertexName;
+  }
+
+  // Empty body in this base class; does nothing when called.
+  public void statusUpdate() throws IOException, InterruptedException {
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    taskAttemptId.write(out);
+    Text.writeString(out, user);
+    Text.writeString(out, jobName);
+    Text.writeString(out, vertexName);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    taskAttemptId = TezTaskAttemptID.read(in);
+    user = Text.readString(in);
+    jobName = Text.readString(in);
+    vertexName = Text.readString(in);
+  }
+}

Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java Mon Apr 22 18:28:58 2013
@@ -65,7 +65,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.log4j.LogManager;
 import org.apache.tez.common.ContainerTask;
-import org.apache.tez.common.TezEngineTask;
+import org.apache.tez.common.TezEngineTaskContext;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.engine.api.Task;
 import org.apache.tez.engine.records.TezTaskAttemptID;
@@ -139,7 +139,7 @@ public class YarnTezDagChild {
     if (LOG.isDebugEnabled()) {
       LOG.debug("PID, containerId: " + pid + ", " + containerId);
     }
-    TezEngineTask taskContext = null;
+    TezEngineTaskContext taskContext = null;
     ContainerTask containerTask = null;
     UserGroupInformation childUGI = null;
     TezTaskAttemptID taskAttemptId = null;
@@ -402,7 +402,7 @@ public class YarnTezDagChild {
   }
 
   private static Task createAndConfigureTezTask(
-      TezEngineTask taskContext,
+      TezEngineTaskContext taskContext,
       TezTaskUmbilicalProtocol master, 
       Credentials credentials, Token<JobTokenIdentifier> jt) 
       throws IOException, InterruptedException {

Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java Mon Apr 22 18:28:58 2013
@@ -56,8 +56,8 @@ import org.apache.hadoop.yarn.state.Stat
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.RackResolver;
 import org.apache.hadoop.yarn.util.Records;
-import org.apache.tez.common.TezEngineTask;
-import org.apache.tez.common.TezTask;
+import org.apache.tez.common.TezEngineTaskContext;
+import org.apache.tez.common.TezTaskContext;
 import org.apache.tez.common.counters.DAGCounter;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
@@ -300,11 +300,11 @@ public class TaskAttemptImpl implements 
     return attemptId;
   }
   
-  TezTask createRemoteTask() {
+  TezTaskContext createRemoteTask() {
     Vertex vertex = getTask().getVertex();
 
     // TODO  TEZ-50 user and jobname
-    return new TezEngineTask(getID(), "user", "jobname", getTask()
+    return new TezEngineTaskContext(getID(), "user", "jobname", getTask()
         .getVertex().getName(), mrxModuleClassName,
         vertex.getInputSpecList(), vertex.getOutputSpecList());
   }
@@ -866,7 +866,7 @@ public class TaskAttemptImpl implements 
       // recovery.
 
       // Create the remote task.
-      TezTask remoteTaskContext = ta.createRemoteTask();
+      TezTaskContext remoteTaskContext = ta.createRemoteTask();
       // Create startTaskRequest
 
       String[] hostArray = new String[0];
@@ -1285,4 +1285,4 @@ public class TaskAttemptImpl implements 
   public Map<String, String> getEnvironment() {
     return this.environment;
   }
-}
\ No newline at end of file
+}

Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java Mon Apr 22 18:28:58 2013
@@ -24,7 +24,7 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.tez.common.TezTask;
+import org.apache.tez.common.TezTaskContext;
 import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.engine.common.security.JobTokenIdentifier;
 import org.apache.tez.engine.records.TezTaskAttemptID;
@@ -37,7 +37,7 @@ public class AMSchedulerEventTALaunchReq
   private final TezTaskAttemptID attemptId;
   private final Resource capability;
   private final Map<String, LocalResource> localResources;
-  private final TezTask remoteTaskContext;
+  private final TezTaskContext remoteTaskContext;
   private final TaskAttempt taskAttempt;
   private final Credentials credentials;
   private Token<JobTokenIdentifier> jobToken;
@@ -50,7 +50,7 @@ public class AMSchedulerEventTALaunchReq
   public AMSchedulerEventTALaunchRequest(TezTaskAttemptID attemptId,
       Resource capability,
       Map<String, LocalResource> localResources,
-      TezTask remoteTaskContext, TaskAttempt ta,
+      TezTaskContext remoteTaskContext, TaskAttempt ta,
       Credentials credentials, Token<JobTokenIdentifier> jobToken,
       String[] hosts, String[] racks, Priority priority,
       Map<String, String> environment) {
@@ -88,7 +88,7 @@ public class AMSchedulerEventTALaunchReq
     return priority;
   }
   
-  public TezTask getRemoteTaskContext() {
+  public TezTaskContext getRemoteTaskContext() {
     return remoteTaskContext;
   }
   

Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java Mon Apr 22 18:28:58 2013
@@ -18,23 +18,23 @@
 package org.apache.tez.dag.app.rm.container;
 
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.tez.common.TezTask;
+import org.apache.tez.common.TezTaskContext;
 import org.apache.tez.engine.records.TezTaskAttemptID;
 
 public class AMContainerEventAssignTA extends AMContainerEvent {
 
   private final TezTaskAttemptID attemptId;
   // TODO Maybe have tht TAL pull the remoteTask from the TaskAttempt itself ?
-  private final TezTask remoteTaskContext;
+  private final TezTaskContext remoteTaskContext;
   
   public AMContainerEventAssignTA(ContainerId containerId,
       TezTaskAttemptID attemptId, Object remoteTaskContext) {
     super(containerId, AMContainerEventType.C_ASSIGN_TA);
     this.attemptId = attemptId;
-    this.remoteTaskContext = (TezTask)remoteTaskContext;
+    this.remoteTaskContext = (TezTaskContext)remoteTaskContext;
   }
   
-  public TezTask getRemoteTaskContext() {
+  public TezTaskContext getRemoteTaskContext() {
     return this.remoteTaskContext;
   }
   

Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java Mon Apr 22 18:28:58 2013
@@ -42,7 +42,7 @@ import org.apache.hadoop.yarn.state.Mult
 import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
-import org.apache.tez.common.TezTask;
+import org.apache.tez.common.TezTaskContext;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerHeartbeatHandler;
 import org.apache.tez.dag.app.TaskAttemptListener;
@@ -75,8 +75,8 @@ public class AMContainerImpl implements 
   private final List<TezTaskAttemptID> completedAttempts = new LinkedList<TezTaskAttemptID>();
 
   // TODO Maybe this should be pulled from the TaskAttempt.s
-  private final Map<TezTaskAttemptID, TezTask> remoteTaskMap =
-      new HashMap<TezTaskAttemptID, TezTask>();
+  private final Map<TezTaskAttemptID, TezTaskContext> remoteTaskMap =
+      new HashMap<TezTaskAttemptID, TezTaskContext>();
   
   // TODO ?? Convert to list and hash.
   

Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java Mon Apr 22 18:28:58 2013
@@ -18,13 +18,13 @@
 
 package org.apache.tez.dag.app.rm.container;
 
-import org.apache.tez.common.TezTask;
+import org.apache.tez.common.TezTaskContext;
 
 public class AMContainerTask {
   private final boolean shouldDie;
-  private final TezTask tezTask;
+  private final TezTaskContext tezTask;
 
-  public AMContainerTask(boolean shouldDie, TezTask tezTask) {
+  public AMContainerTask(boolean shouldDie, TezTaskContext tezTask) {
     this.shouldDie = shouldDie;
     this.tezTask = tezTask;
   }
@@ -33,7 +33,7 @@ public class AMContainerTask {
     return this.shouldDie;
   }
 
-  public TezTask getTask() {
+  public TezTaskContext getTask() {
     return this.tezTask;
   }
 }

Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/ContainerTask.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/ContainerTask.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/ContainerTask.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/ContainerTask.java Mon Apr 22 18:28:58 2013
@@ -25,18 +25,18 @@ import org.apache.hadoop.io.Writable;
 
 public class ContainerTask implements Writable {
 
-  TezEngineTask tezEngineTask;
+  TezEngineTaskContext tezEngineTask;
   boolean shouldDie;
 
   public ContainerTask() {
   }
 
-  public ContainerTask(TezTask tezTaskContext, boolean shouldDie) {
-    this.tezEngineTask = (TezEngineTask)tezTaskContext;
+  public ContainerTask(TezTaskContext tezTaskContext, boolean shouldDie) {
+    this.tezEngineTask = (TezEngineTaskContext)tezTaskContext;
     this.shouldDie = shouldDie;
   }
 
-  public TezEngineTask getTezEngineTaskContext() {
+  public TezEngineTaskContext getTezEngineTaskContext() {
     return tezEngineTask;
   }
 
@@ -60,7 +60,7 @@ public class ContainerTask implements Wr
     shouldDie = in.readBoolean();
     boolean taskComing = in.readBoolean();
     if (taskComing) {
-      tezEngineTask = new TezEngineTask();
+      tezEngineTask = new TezEngineTaskContext();
       tezEngineTask.readFields(in);
     }
   }

Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/RunningTaskContext.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/RunningTaskContext.java?rev=1470653&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/RunningTaskContext.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/RunningTaskContext.java Mon Apr 22 18:28:58 2013
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.common;
+
+import java.io.IOException;
+
+import javax.crypto.SecretKey;
+
+import org.apache.hadoop.util.Progress;
+import org.apache.tez.engine.api.Partitioner;
+import org.apache.tez.engine.api.Processor;
+
+public class RunningTaskContext {
+  // Runtime-side task state. Only jobTokenSecret has a setter here; the rest is protected.
+  protected SecretKey jobTokenSecret;
+  protected TezTaskReporter reporter;
+  protected Partitioner partitioner;
+  protected Processor combineProcessor;
+  protected TezTaskStatus status;
+  protected Progress progress = new Progress();
+
+  public Progress getProgress() {
+    return this.progress;
+  }
+
+  public TezTaskStatus getStatus() {
+    return this.status;
+  }
+
+  public TezTaskReporter getTaskReporter() {
+    return this.reporter;
+  }
+
+  // TODO Does not belong in this class.
+  public Processor getCombineProcessor() {
+    return this.combineProcessor;
+  }
+
+  // TODO Does not belong in this class.
+  public Partitioner getPartitioner() {
+    return this.partitioner;
+  }
+
+  // TODO Does not belong in this class.
+  public SecretKey getJobTokenSecret() {
+    return this.jobTokenSecret;
+  }
+
+  public void setJobTokenSecret(SecretKey jobTokenSecret) {
+    this.jobTokenSecret = jobTokenSecret;
+  }
+
+  public void statusUpdate() throws IOException, InterruptedException {
+  }
+}

Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java?rev=1470653&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java Mon Apr 22 18:28:58 2013
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.Text;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+
+public class TezEngineTaskContext extends TezTaskContext {
+  // Adds the task module class name and the input/output specs to the base task context.
+  // These two could be replaced by a TezConfiguration / DagSpec.
+  private List<InputSpec> inputSpecList;
+  private List<OutputSpec> outputSpecList;
+  private String taskModuleClassName;
+
+  public TezEngineTaskContext() {
+    super();
+  }
+
+  public TezEngineTaskContext(TezTaskAttemptID taskAttemptID, String user,
+      String jobName, String vertexName, String moduleClassName,
+      List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList) {
+    super(taskAttemptID, user, jobName, vertexName);
+    // A null spec list is stored as an empty list, so write() can always call size().
+    this.inputSpecList =
+        (inputSpecList != null) ? inputSpecList : new ArrayList<InputSpec>(0);
+    this.outputSpecList =
+        (outputSpecList != null) ? outputSpecList : new ArrayList<OutputSpec>(0);
+    this.taskModuleClassName = moduleClassName;
+  }
+
+  public String getTaskModuleClassName() {
+    return taskModuleClassName;
+  }
+
+  public List<InputSpec> getInputSpecList() {
+    return this.inputSpecList;
+  }
+
+  public List<OutputSpec> getOutputSpecList() {
+    return this.outputSpecList;
+  }
+
+  // Writes base-class fields, the module class name, then each spec list as count + specs.
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    Text.writeString(out, taskModuleClassName);
+
+    out.writeInt(inputSpecList.size());
+    for (InputSpec spec : inputSpecList) {
+      spec.write(out);
+    }
+
+    out.writeInt(outputSpecList.size());
+    for (OutputSpec spec : outputSpecList) {
+      spec.write(out);
+    }
+  }
+
+  // Reads the fields back in the same order write() emits them.
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    taskModuleClassName = Text.readString(in);
+
+    final int inputCount = in.readInt();
+    inputSpecList = new ArrayList<InputSpec>(inputCount);
+    for (int remaining = inputCount; remaining > 0; remaining--) {
+      InputSpec spec = new InputSpec();
+      spec.readFields(in);
+      inputSpecList.add(spec);
+    }
+
+    final int outputCount = in.readInt();
+    outputSpecList = new ArrayList<OutputSpec>(outputCount);
+    for (int remaining = outputCount; remaining > 0; remaining--) {
+      OutputSpec spec = new OutputSpec();
+      spec.readFields(in);
+      outputSpecList.add(spec);
+    }
+  }
+}

Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java Mon Apr 22 18:28:58 2013
@@ -29,11 +29,12 @@ import org.apache.hadoop.io.RawComparato
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.RunningTaskContext;
+import org.apache.tez.common.TezEngineTaskContext;
 import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.TezTask;
 import org.apache.tez.common.TezTaskReporter;
-import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.engine.common.ConfigUtils;
 import org.apache.tez.engine.common.sort.impl.TezMerger;
 import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
@@ -43,7 +44,8 @@ import org.apache.tez.engine.common.task
 @SuppressWarnings({"rawtypes"})
 public class LocalShuffle {
 
-  private final TezTask task;
+  private final TezEngineTaskContext taskContext;
+  private final RunningTaskContext runningTaskContext;
   private final Configuration conf;
   private final int tasksInDegree;
 
@@ -58,11 +60,13 @@ public class LocalShuffle {
   private final CompressionCodec codec;
   private final TezTaskOutput mapOutputFile;
 
-  public LocalShuffle(TezTask task, 
+  public LocalShuffle(TezEngineTaskContext taskContext, 
+      RunningTaskContext runningTaskContext, 
       Configuration conf,
       TezTaskReporter reporter
       ) throws IOException {
-    this.task = task;
+    this.taskContext = taskContext;
+    this.runningTaskContext = runningTaskContext;
     this.conf = conf;
     this.keyClass = ConfigUtils.getMapOutputKeyClass(conf);
     this.valClass = ConfigUtils.getMapOutputValueClass(conf);
@@ -87,10 +91,7 @@ public class LocalShuffle {
       this.codec = null;
     }
 
-    this.tasksInDegree = 
-        conf.getInt(
-            TezJobConfig.TEZ_ENGINE_TASK_INDEGREE, 
-            TezJobConfig.DEFAULT_TEZ_ENGINE_TASK_INDEGREE);
+    this.tasksInDegree = taskContext.getInputSpecList().get(0).getNumInputs();
 
     // Always local
     this.mapOutputFile = new TezLocalTaskOutputFiles();
@@ -100,7 +101,7 @@ public class LocalShuffle {
   
   public TezRawKeyValueIterator run() throws IOException {
     // Copy is complete, obviously! 
-    this.task.getProgress().addPhase("copy", 0.33f).complete();
+    this.runningTaskContext.getProgress().addPhase("copy", 0.33f).complete();
 
     // Merge
     return TezMerger.merge(conf, rfs, 
@@ -109,9 +110,9 @@ public class LocalShuffle {
         getMapFiles(),
         false, 
         sortFactor,
-        new Path(task.getTaskAttemptId().toString()), 
+        new Path(taskContext.getTaskAttemptId().toString()), 
         comparator,
-        task.getTaskReporter(), spilledRecordsCounter, null, null);
+        runningTaskContext.getTaskReporter(), spilledRecordsCounter, null, null);
   }
   
   private Path[] getMapFiles() 

Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java Mon Apr 22 18:28:58 2013
@@ -27,8 +27,9 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.util.Progress;
+import org.apache.tez.common.RunningTaskContext;
+import org.apache.tez.common.TezEngineTaskContext;
 import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.TezTask;
 import org.apache.tez.common.TezTaskReporter;
 import org.apache.tez.common.TezTaskStatus;
 import org.apache.tez.common.counters.TezCounter;
@@ -46,7 +47,8 @@ public class Shuffle implements Exceptio
   private static final int MIN_EVENTS_TO_FETCH = 100;
   private static final int MAX_RPC_OUTSTANDING_EVENTS = 3000000;
 
-  private final TezTask task;
+  private final TezEngineTaskContext taskContext;
+  private final RunningTaskContext runningTaskContext;
   private final Configuration conf;
   private final TezTaskReporter reporter;
   private final ShuffleClientMetrics metrics;
@@ -59,28 +61,31 @@ public class Shuffle implements Exceptio
   private final Progress mergePhase;
   private final int tasksInDegree;
   
-  public Shuffle(TezTask task, 
+  public Shuffle(TezEngineTaskContext taskContext,
+                 RunningTaskContext runningTaskContext,
                  Configuration conf,
                  int tasksInDegree,
                  TezTaskReporter reporter,
                  Processor combineProcessor
                  ) throws IOException {
-    this.task = task;
+    this.taskContext = taskContext;
+    this.runningTaskContext = runningTaskContext;
     this.conf = conf;
     this.reporter = reporter;
     this.metrics = 
         new ShuffleClientMetrics(
-            task.getTaskAttemptId(), this.conf, 
-            this.task.getUser(), this.task.getJobName());
+            taskContext.getTaskAttemptId(), this.conf, 
+            this.taskContext.getUser(), this.taskContext.getJobName());
     this.tasksInDegree = tasksInDegree;
     
     FileSystem localFS = FileSystem.getLocal(this.conf);
     LocalDirAllocator localDirAllocator = 
         new LocalDirAllocator(TezJobConfig.LOCAL_DIR);
     
-    copyPhase = this.task.getProgress().addPhase("copy", 0.33f);
-    mergePhase = this.task.getProgress().addPhase("merge", 0.66f);
+    copyPhase = this.runningTaskContext.getProgress().addPhase("copy", 0.33f);
+    mergePhase = this.runningTaskContext.getProgress().addPhase("merge", 0.66f);
 
+    // TODO TEZ Get rid of Map / Reduce references.
     TezCounter shuffledMapsCounter = 
         reporter.getCounter(TaskCounter.SHUFFLED_MAPS);
     TezCounter reduceShuffleBytes =
@@ -95,12 +100,12 @@ public class Shuffle implements Exceptio
         reporter.getCounter(TaskCounter.MERGED_MAP_OUTPUTS);
     
     scheduler = 
-      new ShuffleScheduler(this.conf, tasksInDegree, task.getStatus(), 
+      new ShuffleScheduler(this.conf, tasksInDegree, runningTaskContext.getStatus(), 
                                 this, copyPhase, 
                                 shuffledMapsCounter, 
                                 reduceShuffleBytes, 
                                 failedShuffleCounter);
-    merger = new MergeManager(this.task.getTaskAttemptId(), 
+    merger = new MergeManager(this.taskContext.getTaskAttemptId(), 
                                     this.conf, localFS, 
                                     localDirAllocator, reporter, 
                                     combineProcessor, 
@@ -120,7 +125,7 @@ public class Shuffle implements Exceptio
 
     // Start the map-completion events fetcher thread
     final EventFetcher eventFetcher = 
-      new EventFetcher(task.getTaskAttemptId(), reporter, scheduler, this,
+      new EventFetcher(taskContext.getTaskAttemptId(), reporter, scheduler, this,
           maxEventsToFetch);
     eventFetcher.start();
     
@@ -131,10 +136,10 @@ public class Shuffle implements Exceptio
             TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES);
     Fetcher[] fetchers = new Fetcher[numFetchers];
     for (int i=0; i < numFetchers; ++i) {
-      fetchers[i] = new Fetcher(conf, task.getTaskAttemptId(), 
+      fetchers[i] = new Fetcher(conf, taskContext.getTaskAttemptId(), 
                                      scheduler, merger, 
                                      reporter, metrics, this, 
-                                     task.getJobTokenSecret());
+                                     runningTaskContext.getJobTokenSecret());
       fetchers[i].start();
     }
     
@@ -163,9 +168,9 @@ public class Shuffle implements Exceptio
     scheduler.close();
 
     copyPhase.complete(); // copy is already complete
-    task.getStatus().setPhase(TezTaskStatus.Phase.SORT);
+    runningTaskContext.getStatus().setPhase(TezTaskStatus.Phase.SORT);
     
-    task.statusUpdate();
+    runningTaskContext.statusUpdate();
     
     // Finish the on-going merges...
     TezRawKeyValueIterator kvIter = null;

Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/server/ShuffleHandler.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/server/ShuffleHandler.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/server/ShuffleHandler.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/server/ShuffleHandler.java Mon Apr 22 18:28:58 2013
@@ -62,8 +62,8 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices;
 import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.tez.common.RunningTaskContext;
 import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.TezTask;
 import org.apache.tez.engine.common.security.JobTokenIdentifier;
 import org.apache.tez.engine.common.security.JobTokenSecretManager;
 import org.apache.tez.engine.common.security.SecureShuffleUtils;
@@ -241,7 +241,7 @@ public class ShuffleHandler extends Abst
     userRsrc.remove(appId.toString());
   }
 
-  public synchronized void init(Configuration conf, TezTask task) {
+  public synchronized void init(Configuration conf, RunningTaskContext task) {
     this.init(conf);
     tokenSecret = task.getJobTokenSecret();
   }

Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/SortingOutput.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/SortingOutput.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/SortingOutput.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/SortingOutput.java Mon Apr 22 18:28:58 2013
@@ -17,7 +17,7 @@
  */
 package org.apache.tez.engine.common.sort;
 
-import org.apache.tez.common.TezTask;
+import org.apache.tez.common.RunningTaskContext;
 import org.apache.tez.engine.api.Output;
 
 /**
@@ -26,6 +26,7 @@ import org.apache.tez.engine.api.Output;
  */
 public interface SortingOutput extends Output {
   
-  public void setTask(TezTask task);
+  // TODO PreCommit rename
+  public void setTask(RunningTaskContext runningTaskContext);
   
 }

Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java Mon Apr 22 18:28:58 2013
@@ -40,10 +40,10 @@ import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.QuickSort;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.Constants;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.TezTask;
-import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.RunningTaskContext;
+import org.apache.tez.common.TezEngineTaskContext;
 import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.engine.api.Master;
 import org.apache.tez.engine.api.Partitioner;
 import org.apache.tez.engine.api.Processor;
@@ -71,7 +71,8 @@ public abstract class ExternalSorter {
 
   protected Processor combineProcessor;
   protected Partitioner partitioner;
-  protected TezTask task;
+  protected TezEngineTaskContext task;
+  protected RunningTaskContext runningTaskContext;
   protected Configuration job;
   protected FileSystem rfs;
   protected TezTaskOutput mapOutputFile;
@@ -102,10 +103,11 @@ public abstract class ExternalSorter {
     LOG.info("TEZ_ENGINE_TASK_ATTEMPT_ID: " + 
         job.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID));
 
-    partitions = 
-        job.getInt(
-            TezJobConfig.TEZ_ENGINE_TASK_OUTDEGREE, 
-            TezJobConfig.DEFAULT_TEZ_ENGINE_TASK_OUTDEGREE);
+    partitions = task.getOutputSpecList().get(0).getNumOutputs();
+//    partitions = 
+//        job.getInt(
+//            TezJobConfig.TEZ_ENGINE_TASK_OUTDEGREE, 
+//            TezJobConfig.DEFAULT_TEZ_ENGINE_TASK_OUTDEGREE);
     rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();
     
     // sorter
@@ -123,14 +125,14 @@ public abstract class ExternalSorter {
     
     //    counters
     mapOutputByteCounter = 
-        task.getTaskReporter().getCounter(TaskCounter.MAP_OUTPUT_BYTES);
+        runningTaskContext.getTaskReporter().getCounter(TaskCounter.MAP_OUTPUT_BYTES);
     mapOutputRecordCounter =
-      task.getTaskReporter().getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
+      runningTaskContext.getTaskReporter().getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
     fileOutputByteCounter = 
-        task.getTaskReporter().
+        runningTaskContext.getTaskReporter().
             getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);
     spilledRecordsCounter = 
-        task.getTaskReporter().getCounter(TaskCounter.SPILLED_RECORDS);
+        runningTaskContext.getTaskReporter().getCounter(TaskCounter.SPILLED_RECORDS);
     // compression
     if (ConfigUtils.getCompressMapOutput(job)) {
       Class<? extends CompressionCodec> codecClass =
@@ -149,7 +151,7 @@ public abstract class ExternalSorter {
 //    LOG.info("XXX mapOutputFile: " + mapOutputFile.getClass());
     
     // sortPhase
-    sortPhase  = task.getProgress().addPhase("sort", 0.333f);
+    sortPhase  = runningTaskContext.getProgress().addPhase("sort", 0.333f);
   }
 
   /**
@@ -163,8 +165,8 @@ public abstract class ExternalSorter {
     }
   }
 
-  public void setTask(TezTask task) {
-    this.task = task;
+  public void setTask(RunningTaskContext task) {
+    this.runningTaskContext = task;
     this.combineProcessor = task.getCombineProcessor();
     this.partitioner = task.getPartitioner();
   }
@@ -182,10 +184,10 @@ public abstract class ExternalSorter {
       Writer writer) throws IOException, InterruptedException {
 
     CombineInput combineIn = new CombineInput(kvIter);
-    combineIn.initialize(job, task.getTaskReporter());
+    combineIn.initialize(job, runningTaskContext.getTaskReporter());
 
     CombineOutput combineOut = new CombineOutput(writer);
-    combineOut.initialize(job, task.getTaskReporter());
+    combineOut.initialize(job, runningTaskContext.getTaskReporter());
 
     combineProcessor.process(combineIn, combineOut);
 
@@ -216,8 +218,12 @@ public abstract class ExternalSorter {
 //    LOG.info("XXX sameVolRename src=" + src + ", dst=" + dst);
   }
 
-  public ExternalSorter() {
-    super();
+//  public ExternalSorter() {
+//    super();
+//  }
+  
+  public ExternalSorter(TezEngineTaskContext tezEngineTask) {
+    this.task = tezEngineTask;
   }
 
   public InputStream getSortedStream(int partition) {

Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java Mon Apr 22 18:28:58 2013
@@ -43,8 +43,8 @@ import org.apache.hadoop.io.RawComparato
 import org.apache.hadoop.util.IndexedSortable;
 import org.apache.hadoop.util.IndexedSorter;
 import org.apache.hadoop.util.Progress;
+import org.apache.tez.common.TezEngineTaskContext;
 import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.TezTask;
 import org.apache.tez.engine.api.Master;
 import org.apache.tez.engine.common.ConfigUtils;
 import org.apache.tez.engine.common.sort.SortingOutput;
@@ -98,8 +98,9 @@ public class PipelinedSorter extends Ext
 
   @Inject
   public PipelinedSorter(
-      @Assisted TezTask task
+      @Assisted TezEngineTaskContext task
       ) throws IOException {
+    super(task);
   }
 
   public void initialize(Configuration conf, Master master) 
@@ -211,7 +212,7 @@ public class PipelinedSorter extends Ext
    */
   synchronized void collect(Object key, Object value, final int partition
                                    ) throws IOException {
-    task.getTaskReporter().progress();
+    runningTaskContext.getTaskReporter().progress();
     if (key.getClass() != keyClass) {
       throw new IOException("Type mismatch in key from map: expected "
                             + keyClass.getName() + ", received "
@@ -267,7 +268,7 @@ public class PipelinedSorter extends Ext
     }
     mapOutputRecordCounter.increment(1);
     mapOutputByteCounter.increment(valend - keystart);
-    task.getTaskReporter().progress();
+    runningTaskContext.getTaskReporter().progress();
   }
 
   public void spill() throws IOException { 
@@ -392,7 +393,7 @@ public class PipelinedSorter extends Ext
                      segmentList, mergeFactor,
                      new Path(mapId.toString()),
                      (RawComparator)ConfigUtils.getOutputKeyComparator(job), 
-                     task.getTaskReporter(), sortSegments,
+                     runningTaskContext.getTaskReporter(), sortSegments,
                      null, spilledRecordsCounter, sortPhase.phase());
 
       //write merged output to disk
@@ -402,7 +403,7 @@ public class PipelinedSorter extends Ext
                            spilledRecordsCounter);
       writer.setRLE(merger.needsRLE());
       if (combineProcessor == null || numSpills < minSpillsForCombine) {
-        TezMerger.writeFile(kvIter, writer, task.getTaskReporter(), job);
+        TezMerger.writeFile(kvIter, writer, runningTaskContext.getTaskReporter(), job);
       } else {
     	runCombineProcessor(kvIter, writer);
       }
@@ -525,7 +526,7 @@ public class PipelinedSorter extends Ext
       kj = new byte[keymax];
       LOG.info("begin sorting Span"+index + " ("+length()+")");
       if(length() > 1) {
-        sorter.sort(this, 0, length(), task.getTaskReporter());
+        sorter.sort(this, 0, length(), runningTaskContext.getTaskReporter());
       }
       LOG.info("done sorting Span"+index);
       return new SpanIterator(this);
@@ -958,4 +959,5 @@ public class PipelinedSorter extends Ext
   public OutputContext getOutputContext() {
     return null;
   }
+
 }

Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java Mon Apr 22 18:28:58 2013
@@ -39,8 +39,9 @@ import org.apache.hadoop.io.RawComparato
 import org.apache.hadoop.util.IndexedSortable;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.tez.common.TezEngineTaskContext;
 import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.TezTask;
+import org.apache.tez.common.TezTaskContext;
 import org.apache.tez.engine.api.Master;
 import org.apache.tez.engine.common.ConfigUtils;
 import org.apache.tez.engine.common.sort.impl.ExternalSorter;
@@ -116,8 +117,10 @@ public class DefaultSorter extends Exter
 
   @Inject
   public DefaultSorter(
-      @Assisted TezTask task
+      @Assisted TezTaskContext task
       ) throws IOException {
+    // Does this assisted inject work ?
+    super((TezEngineTaskContext)task);
   }
 
   @Override
@@ -209,7 +212,7 @@ public class DefaultSorter extends Exter
    */
   synchronized void collect(Object key, Object value, final int partition
                                    ) throws IOException {
-    task.getTaskReporter().progress();
+    runningTaskContext.getTaskReporter().progress();
     if (key.getClass() != keyClass) {
       throw new IOException("Type mismatch in key from map: expected "
                             + keyClass.getName() + ", received "
@@ -574,7 +577,7 @@ public class DefaultSorter extends Exter
               // wait for spill
               try {
                 while (spillInProgress) {
-                  task.getTaskReporter().progress();
+                  runningTaskContext.getTaskReporter().progress();
                   spillDone.await();
                 }
               } catch (InterruptedException e) {
@@ -606,7 +609,7 @@ public class DefaultSorter extends Exter
     spillLock.lock();
     try {
       while (spillInProgress) {
-        task.getTaskReporter().progress();
+        runningTaskContext.getTaskReporter().progress();
         spillDone.await();
       }
       checkSpillException();
@@ -702,7 +705,7 @@ public class DefaultSorter extends Exter
       if (lspillException instanceof Error) {
         final String logMsg = "Task " + task.getTaskAttemptId() + " failed : " +
           StringUtils.stringifyException(lspillException);
-        task.getTaskReporter().reportFatalError(
+        runningTaskContext.getTaskReporter().reportFatalError(
             task.getTaskAttemptId(), lspillException, logMsg);
       }
       throw new IOException("Spill failed", lspillException);
@@ -741,7 +744,7 @@ public class DefaultSorter extends Exter
       throws IOException, InterruptedException {
     final int mstart = getMetaStart();
     final int mend = getMetaEnd();
-    sorter.sort(this, mstart, mend, task.getTaskReporter());
+    sorter.sort(this, mstart, mend, runningTaskContext.getTaskReporter());
     spill(mstart, mend); 
   }
   
@@ -1091,7 +1094,7 @@ public class DefaultSorter extends Exter
                        segmentList, mergeFactor,
                        new Path(mapId.toString()),
                        (RawComparator)ConfigUtils.getOutputKeyComparator(job), 
-                       task.getTaskReporter(), sortSegments,
+                       runningTaskContext.getTaskReporter(), sortSegments,
                        null, spilledRecordsCounter, 
                        sortPhase.phase());
 
@@ -1101,7 +1104,7 @@ public class DefaultSorter extends Exter
             new Writer(job, finalOut, keyClass, valClass, codec,
                              spilledRecordsCounter);
         if (combineProcessor == null || numSpills < minSpillsForCombine) {
-          TezMerger.writeFile(kvIter, writer, task.getTaskReporter(), job);
+          TezMerger.writeFile(kvIter, writer, runningTaskContext.getTaskReporter(), job);
           writer.close();
         } else {
           runCombineProcessor(kvIter, writer);

Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/InMemoryShuffleSorter.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/InMemoryShuffleSorter.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/InMemoryShuffleSorter.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/InMemoryShuffleSorter.java Mon Apr 22 18:28:58 2013
@@ -29,7 +29,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.util.DataChecksum;
-import org.apache.tez.common.TezTask;
+import org.apache.tez.common.TezTaskContext;
 import org.apache.tez.engine.api.Master;
 import org.apache.tez.engine.common.shuffle.impl.ShuffleHeader;
 import org.apache.tez.engine.common.shuffle.server.ShuffleHandler;
@@ -57,7 +57,7 @@ public class InMemoryShuffleSorter exten
   
   @Inject
   public InMemoryShuffleSorter(
-      @Assisted TezTask task
+      @Assisted TezTaskContext task
       ) throws IOException {
     super(task);
   }
@@ -66,7 +66,7 @@ public class InMemoryShuffleSorter exten
   public void initialize(Configuration conf, Master master) throws IOException,
       InterruptedException {
     super.initialize(conf, master);
-    shuffleHandler.init(conf, task);
+    shuffleHandler.init(conf, runningTaskContext);
   }
 
   @Override

Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java Mon Apr 22 18:28:58 2013
@@ -20,7 +20,7 @@ package org.apache.tez.engine.lib.input;
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.TezTask;
+import org.apache.tez.common.TezEngineTaskContext;
 import org.apache.tez.common.TezTaskReporter;
 import org.apache.tez.engine.api.Input;
 import org.apache.tez.engine.api.Master;
@@ -38,18 +38,15 @@ import com.google.inject.assistedinject.
 public class LocalMergedInput extends ShuffledMergedInput {
 
   TezRawKeyValueIterator rIter = null;
-
-  private final TezTask task;
   
   private Configuration conf;
   private CombineInput raw;
 
   @Inject
   public LocalMergedInput(
-      @Assisted TezTask task
+      @Assisted TezEngineTaskContext task
       ) {
     super(task);
-    this.task = task;
   }
 
   public void initialize(Configuration conf, Master master) throws IOException,
@@ -57,7 +54,7 @@ public class LocalMergedInput extends Sh
     this.conf = conf;
 
     LocalShuffle shuffle =
-        new LocalShuffle(task, this.conf, (TezTaskReporter)master);
+        new LocalShuffle(task, runningTaskContext, this.conf, (TezTaskReporter)master);
     rIter = shuffle.run();
     raw = new CombineInput(rIter);
   }

Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java Mon Apr 22 18:28:58 2013
@@ -22,8 +22,8 @@ import java.io.IOException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.TezTask;
+import org.apache.tez.common.RunningTaskContext;
+import org.apache.tez.common.TezEngineTaskContext;
 import org.apache.tez.common.TezTaskReporter;
 import org.apache.tez.engine.api.Input;
 import org.apache.tez.engine.api.Master;
@@ -43,19 +43,21 @@ public class ShuffledMergedInput impleme
   static final Log LOG = LogFactory.getLog(ShuffledMergedInput.class);
   TezRawKeyValueIterator rIter = null;
 
-  private TezTask task;
+  protected TezEngineTaskContext task;
+  protected RunningTaskContext runningTaskContext;
   
   private Configuration conf;
   private CombineInput raw;
 
   @Inject
   public ShuffledMergedInput(
-      @Assisted TezTask task
+      @Assisted TezEngineTaskContext task
       ) {
+    this.task = task;
   }
 
-  public void setTask(TezTask task) {
-    this.task = task;
+  public void setTask(RunningTaskContext runningTaskContext) {
+    this.runningTaskContext = runningTaskContext;
   }
   
   public void initialize(Configuration conf, Master master) throws IOException,
@@ -64,12 +66,10 @@ public class ShuffledMergedInput impleme
     
     Shuffle shuffle = 
       new Shuffle(
-          task, this.conf,      
-          this.conf.getInt(
-              TezJobConfig.TEZ_ENGINE_TASK_INDEGREE, 
-              TezJobConfig.DEFAULT_TEZ_ENGINE_TASK_INDEGREE),
+          task, runningTaskContext, this.conf, 
+          task.getInputSpecList().get(0).getNumInputs(),
           (TezTaskReporter)master, 
-          task.getCombineProcessor());
+          runningTaskContext.getCombineProcessor());
     rIter = shuffle.run();
     
     raw = new CombineInput(rIter);

Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java Mon Apr 22 18:28:58 2013
@@ -20,7 +20,8 @@ package org.apache.tez.engine.lib.output
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.TezTask;
+import org.apache.tez.common.RunningTaskContext;
+import org.apache.tez.common.TezEngineTaskContext;
 import org.apache.tez.engine.api.Master;
 import org.apache.tez.engine.api.Output;
 import org.apache.tez.engine.common.sort.SortingOutput;
@@ -40,7 +41,7 @@ public class InMemorySortedOutput implem
   
   @Inject
   public InMemorySortedOutput(
-      @Assisted TezTask task
+      @Assisted TezEngineTaskContext task
       ) throws IOException {
     sorter = new InMemoryShuffleSorter(task);
   }
@@ -50,7 +51,7 @@ public class InMemorySortedOutput implem
     sorter.initialize(conf, master);
   }
 
-  public void setTask(TezTask task) {
+  public void setTask(RunningTaskContext task) {
     sorter.setTask(task);
   }
   

Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java Mon Apr 22 18:28:58 2013
@@ -20,12 +20,11 @@ package org.apache.tez.engine.lib.output
 
 import java.io.IOException;
 
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.tez.common.TezTask;
+import org.apache.tez.common.TezEngineTaskContext;
 import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
 
 import com.google.inject.Inject;
@@ -37,7 +36,7 @@ public class LocalOnFileSorterOutput ext
   
   @Inject
   public LocalOnFileSorterOutput(
-      @Assisted TezTask task
+      @Assisted TezEngineTaskContext task
       ) throws IOException {
     super(task);
   }

Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java Mon Apr 22 18:28:58 2013
@@ -20,7 +20,8 @@ package org.apache.tez.engine.lib.output
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.TezTask;
+import org.apache.tez.common.RunningTaskContext;
+import org.apache.tez.common.TezEngineTaskContext;
 import org.apache.tez.engine.api.Master;
 import org.apache.tez.engine.api.Output;
 import org.apache.tez.engine.common.sort.SortingOutput;
@@ -41,7 +42,7 @@ public class OnFileSortedOutput implemen
   
   @Inject
   public OnFileSortedOutput(
-      @Assisted TezTask task
+      @Assisted TezEngineTaskContext task
       ) throws IOException {
     sorter = new DefaultSorter(task);
   }
@@ -51,7 +52,8 @@ public class OnFileSortedOutput implemen
     sorter.initialize(conf, master);
   }
 
-  public void setTask(TezTask task) {
+  @Override
+  public void setTask(RunningTaskContext task) {
     sorter.setTask(task);
   }
   

Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/InputFactory.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/InputFactory.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/InputFactory.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/InputFactory.java Mon Apr 22 18:28:58 2013
@@ -18,11 +18,11 @@
 
 package org.apache.tez.engine.runtime;
 
-import org.apache.tez.common.TezTask;
+import org.apache.tez.common.TezEngineTaskContext;
 import org.apache.tez.engine.api.Input;
 
 public interface InputFactory {
   
-  Input create(TezTask task);
+  Input create(TezEngineTaskContext task);
   
 }

Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/OutputFactory.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/OutputFactory.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/OutputFactory.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/OutputFactory.java Mon Apr 22 18:28:58 2013
@@ -18,11 +18,11 @@
 
 package org.apache.tez.engine.runtime;
 
-import org.apache.tez.common.TezTask;
+import org.apache.tez.common.TezEngineTaskContext;
 import org.apache.tez.engine.api.Output;
 
 public interface OutputFactory {
   
-  Output create(TezTask task);
+  Output create(TezEngineTaskContext task);
   
 }

Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/ProcessorFactory.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/ProcessorFactory.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/ProcessorFactory.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/ProcessorFactory.java Mon Apr 22 18:28:58 2013
@@ -18,11 +18,11 @@
 
 package org.apache.tez.engine.runtime;
 
-import org.apache.tez.common.TezTask;
+import org.apache.tez.common.TezEngineTaskContext;
 import org.apache.tez.engine.api.Processor;
 
 public interface ProcessorFactory {
   
-  Processor create(TezTask task);
+  Processor create(TezEngineTaskContext task);
   
 }

Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/TezEngineFactory.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/TezEngineFactory.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/TezEngineFactory.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/TezEngineFactory.java Mon Apr 22 18:28:58 2013
@@ -18,11 +18,11 @@
 
 package org.apache.tez.engine.runtime;
 
-import org.apache.tez.common.TezTask;
+import org.apache.tez.common.TezEngineTaskContext;
 import org.apache.tez.engine.api.Task;
 
 public interface TezEngineFactory {
   
-  public Task createTask(TezTask taskContext);
+  public Task createTask(TezEngineTaskContext taskContext);
 
 }

Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/TezEngineFactoryImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/TezEngineFactoryImpl.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/TezEngineFactoryImpl.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/runtime/TezEngineFactoryImpl.java Mon Apr 22 18:28:58 2013
@@ -18,7 +18,7 @@
 
 package org.apache.tez.engine.runtime;
 
-import org.apache.tez.common.TezTask;
+import org.apache.tez.common.TezEngineTaskContext;
 import org.apache.tez.engine.api.Input;
 import org.apache.tez.engine.api.Output;
 import org.apache.tez.engine.api.Processor;
@@ -47,11 +47,10 @@ implements TezEngineFactory {
     this.taskFactory = taskFactory;
   }
   
-  public Task createTask(TezTask taskContext) {
+  public Task createTask(TezEngineTaskContext taskContext) {
     Input in = inputFactory.create(taskContext);
     Output out = outputFactory.create(taskContext);
     Processor processor = processorFactory.create(taskContext);
     return taskFactory.create(in, processor, out);
-  }
-  
+  }  
 }

Modified: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java Mon Apr 22 18:28:58 2013
@@ -63,7 +63,7 @@ import org.apache.hadoop.util.Reflection
 import org.apache.tez.common.Constants;
 import org.apache.tez.common.ContainerTask;
 import org.apache.tez.common.InputSpec;
-import org.apache.tez.common.TezEngineTask;
+import org.apache.tez.common.TezEngineTaskContext;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.TezTaskStatus;
 import org.apache.tez.common.counters.TezCounters;
@@ -240,8 +240,8 @@ public class LocalJobRunner implements C
           TezTaskAttemptID tezMapId =
               IDConverter.fromMRTaskAttemptId(mapId);
           mapIds.add(mapId);
-          TezEngineTask taskContext = 
-              new TezEngineTask(
+          TezEngineTaskContext taskContext = 
+              new TezEngineTaskContext(
                   tezMapId, user, localConf.getJobName(), "TODO_vertexName",
                   InitialTask.class.getName(), null, null);
           Injector injector = Guice.createInjector(new InitialTask());
@@ -444,7 +444,7 @@ public class LocalJobRunner implements C
             
             
             
-            TezEngineTask taskContext = new TezEngineTask(
+            TezEngineTaskContext taskContext = new TezEngineTaskContext(
                 IDConverter.fromMRTaskAttemptId(reduceId), user,
                 localConf.getJobName(), "TODO_vertexName", LocalFinalTask.class.getName(),
                 Collections.singletonList(new InputSpec("TODO_srcVertexName",

Modified: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInput.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInput.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInput.java (original)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInput.java Mon Apr 22 18:28:58 2013
@@ -40,9 +40,9 @@ import org.apache.hadoop.mapreduce.Input
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.TezTask;
-import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.TezEngineTaskContext;
 import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.engine.api.Input;
 import org.apache.tez.engine.api.Master;
 import org.apache.tez.mapreduce.processor.MRTask;
@@ -87,7 +87,7 @@ public class SimpleInput implements Inpu
 
   @Inject
   public SimpleInput(
-      @Assisted TezTask task
+      @Assisted TezEngineTaskContext task
       ) 
   {}
   

Modified: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/SimpleOutput.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/SimpleOutput.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/SimpleOutput.java (original)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/SimpleOutput.java Mon Apr 22 18:28:58 2013
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.TezTask;
+import org.apache.tez.common.TezEngineTaskContext;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.engine.api.Master;
 import org.apache.tez.engine.api.Output;
@@ -65,7 +65,7 @@ public class SimpleOutput implements Out
   
   @Inject
   public SimpleOutput(
-      @Assisted TezTask task
+      @Assisted TezEngineTaskContext task
       ) {
   }
   

Modified: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java (original)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java Mon Apr 22 18:28:58 2013
@@ -41,24 +41,20 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapred.RawKeyValueIterator;
 import org.apache.hadoop.mapred.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
 import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
-import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
-import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
-import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez;
 import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
 import org.apache.tez.common.Constants;
-import org.apache.tez.common.TezEngineTask;
-import org.apache.tez.common.TezTask;
+import org.apache.tez.common.RunningTaskContext;
+import org.apache.tez.common.TezEngineTaskContext;
 import org.apache.tez.common.TezTaskStatus;
 import org.apache.tez.common.TezTaskStatus.Phase;
 import org.apache.tez.common.TezTaskStatus.State;
@@ -74,14 +70,13 @@ import org.apache.tez.mapreduce.hadoop.I
 import org.apache.tez.mapreduce.hadoop.MRConfig;
 import org.apache.tez.mapreduce.hadoop.MRTaskStatus;
 import org.apache.tez.mapreduce.hadoop.TezTaskUmbilicalProtocol;
-import org.apache.tez.mapreduce.hadoop.TezTypeConverters;
 import org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl;
 import org.apache.tez.mapreduce.hadoop.mapreduce.JobContextImpl;
 import org.apache.tez.mapreduce.hadoop.records.ProceedToCompletionResponse;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
 
 public abstract class MRTask 
-extends org.apache.tez.common.TezTask {
+extends RunningTaskContext {
 
   static final Log LOG = LogFactory.getLog(MRTask.class);
   
@@ -95,7 +90,8 @@ extends org.apache.tez.common.TezTask {
   protected GcTimeUpdater gcUpdater;
   private ResourceCalculatorProcessTree pTree;
   private long initCpuCumulativeTime = 0;
-  protected TezEngineTask tezTaskContext;
+  protected TezEngineTaskContext tezTaskContext;
+  protected TezTaskAttemptID taskAttemptId;
   
   /* flag to track whether task is done */
   AtomicBoolean taskDone = new AtomicBoolean(false);
@@ -123,18 +119,16 @@ extends org.apache.tez.common.TezTask {
   private Map<String, FileSystemStatisticUpdater> statisticUpdaters =
      new HashMap<String, FileSystemStatisticUpdater>();
 
-  public MRTask(TezTask context) {
-    super(context.getTaskAttemptId(), context.getUser(), context.getJobName(),
-        ((TezEngineTask)context).getTaskModuleClassName());
-    
-    tezTaskContext = (TezEngineTask) context;
+  public MRTask(TezEngineTaskContext context) {
+    tezTaskContext = context;
+    this.taskAttemptId = context.getTaskAttemptId();
     // TODO TEZAM4 Figure out initialization / run sequence of Input, Process,
     // Output. Phase is MR specific.
     status =
         new MRTaskStatus(
-            getTaskAttemptId(),
+            taskAttemptId,
             counters,
-            (getTaskAttemptId().getTaskID().getVertexID().getId() == 0 ? 
+            (taskAttemptId.getTaskID().getVertexID().getId() == 0 ? 
                 Phase.MAP : Phase.SHUFFLE)
         );
     gcUpdater = new GcTimeUpdater(counters);
@@ -153,11 +147,11 @@ extends org.apache.tez.common.TezTask {
     ((TezTaskReporterImpl)reporter).startCommunicationThread();
     
     jobConf.set(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID, 
-        getTaskAttemptId().toString());
+        taskAttemptId.toString());
     
     initResourceCalculatorPlugin();
     
-    LOG.info("MRTask.inited: taskAttemptId = " + getTaskAttemptId().toString());
+    LOG.info("MRTask.inited: taskAttemptId = " + taskAttemptId.toString());
   }
 
   private void initResourceCalculatorPlugin() {
@@ -184,7 +178,7 @@ extends org.apache.tez.common.TezTask {
     this.jobConf = job;
     this.jobContext = new JobContextImpl(job, dagId, mrReporter);
     this.taskAttemptContext =
-        new TaskAttemptContextImpl(job, getTaskAttemptId(), mrReporter);
+        new TaskAttemptContextImpl(job, taskAttemptId, mrReporter);
     this.mrReporter = mrReporter;
 
     if (getState() == State.UNASSIGNED) {
@@ -299,7 +293,7 @@ extends org.apache.tez.common.TezTask {
 
   @Private
   public synchronized String getOutputName() {
-    return "part-" + NUMBER_FORMAT.format(getTaskAttemptId().getTaskID().getId());
+    return "part-" + NUMBER_FORMAT.format(taskAttemptId.getTaskID().getId());
   }
  
   public void waitBeforeCompletion(MRTaskReporter reporter) throws IOException,
@@ -310,7 +304,7 @@ extends org.apache.tez.common.TezTask {
     while (!readyToProceed) {
       try {
         ProceedToCompletionResponse response =
-            umbilical.proceedToCompletion(getTaskAttemptId());
+            umbilical.proceedToCompletion(taskAttemptId);
         if (LOG.isDebugEnabled()) {
           LOG.debug("Got readyToProceed: " + response);
         }
@@ -336,7 +330,7 @@ extends org.apache.tez.common.TezTask {
   public void outputReady(MRTaskReporter reporter, OutputContext outputContext)
       throws IOException,
       InterruptedException {
-    LOG.info("Task: " + getTaskAttemptId() + " reporting outputReady");
+    LOG.info("Task: " + taskAttemptId + " reporting outputReady");
     updateCounters();
     statusUpdate();
 
@@ -344,8 +338,8 @@ extends org.apache.tez.common.TezTask {
     int retries = MAX_RETRIES;
     while (true) {
       try {
-        umbilical.outputReady(getTaskAttemptId(), outputContext);
-        LOG.info("Task '" + getTaskAttemptId() + "' reported outputReady.");
+        umbilical.outputReady(taskAttemptId, outputContext);
+        LOG.info("Task '" + taskAttemptId + "' reported outputReady.");
         return;
       } catch (IOException ie) {
         LOG.warn("Failure signalling outputReady: " +
@@ -364,7 +358,7 @@ extends org.apache.tez.common.TezTask {
     updateCounters();
     if (outputContext != null) {
       LOG.info("Task: "
-          + getTaskAttemptId()
+          + taskAttemptId
           + " is done."
           + " And is in the process of sending output-context with shuffle port: "
           + outputContext.getShufflePort());
@@ -372,7 +366,7 @@ extends org.apache.tez.common.TezTask {
       waitBeforeCompletion(reporter);
     }
 
-    LOG.info("Task:" + getTaskAttemptId() + " is done."
+    LOG.info("Task:" + taskAttemptId + " is done."
         + " And is in the process of committing");
     TezTaskUmbilicalProtocol umbilical = getUmbilical();
     // TODO TEZ Interaciton between Commit and OutputReady. Merge ?
@@ -383,7 +377,7 @@ extends org.apache.tez.common.TezTask {
       // TODO TEZAM2 - Why is the commitRequired check missing ?
       while (true) {
         try {
-          umbilical.commitPending(getTaskAttemptId(), status);
+          umbilical.commitPending(taskAttemptId, status);
           break;
         } catch (InterruptedException ie) {
           // ignore
@@ -423,8 +417,8 @@ extends org.apache.tez.common.TezTask {
     int retries = MAX_RETRIES;
     while (true) {
       try {
-        if (!getUmbilical().statusUpdate(getTaskAttemptId(), status)) {
-          LOG.warn("Parent died.  Exiting " + getTaskAttemptId());
+        if (!getUmbilical().statusUpdate(taskAttemptId, status)) {
+          LOG.warn("Parent died.  Exiting " + taskAttemptId);
           System.exit(66);
         }
         status.clearStatus();
@@ -460,7 +454,7 @@ extends org.apache.tez.common.TezTask {
     int retries = MAX_RETRIES;
     while (true) {
       try {
-        while (!umbilical.canCommit(getTaskAttemptId())) {
+        while (!umbilical.canCommit(taskAttemptId)) {
           // This will loop till the AM asks for the task to be killed. As
           // against, the AM sending a signal to the task to kill itself
           // gracefully.
@@ -485,7 +479,7 @@ extends org.apache.tez.common.TezTask {
 
     // task can Commit now  
     try {
-      LOG.info("Task " + getTaskAttemptId() + " is allowed to commit now");
+      LOG.info("Task " + taskAttemptId + " is allowed to commit now");
       committer.commitTask(taskAttemptContext);
       return;
     } catch (IOException iee) {
@@ -512,8 +506,8 @@ extends org.apache.tez.common.TezTask {
     int retries = MAX_RETRIES;
     while (true) {
       try {
-        umbilical.done(getTaskAttemptId());
-        LOG.info("Task '" + getTaskAttemptId() + "' done.");
+        umbilical.done(taskAttemptId);
+        LOG.info("Task '" + taskAttemptId + "' done.");
         return;
       } catch (IOException ie) {
         LOG.warn("Failure signalling completion: " + 
@@ -683,11 +677,11 @@ extends org.apache.tez.common.TezTask {
 
   public void localizeConfiguration(JobConf jobConf) 
       throws IOException, InterruptedException {
-    jobConf.set(JobContext.TASK_ID, getTaskAttemptId().getTaskID().toString()); 
-    jobConf.set(JobContext.TASK_ATTEMPT_ID, getTaskAttemptId().toString());
+    jobConf.set(JobContext.TASK_ID, taskAttemptId.getTaskID().toString()); 
+    jobConf.set(JobContext.TASK_ATTEMPT_ID, taskAttemptId.toString());
     jobConf.setInt(JobContext.TASK_PARTITION, 
-        getTaskAttemptId().getTaskID().getId());
-    jobConf.set(JobContext.ID, getTaskAttemptId().getTaskID().getVertexID().getDAGId().toString());
+        taskAttemptId.getTaskID().getId());
+    jobConf.set(JobContext.ID, taskAttemptId.getTaskID().getVertexID().getDAGId().toString());
   }
 
   public abstract TezCounter getOutputRecordsCounter();
@@ -713,4 +707,8 @@ extends org.apache.tez.common.TezTask {
   public JobContext getJobContext() {
     return jobContext;
   }
+  
+  public TezTaskAttemptID getTaskAttemptId() {
+    return taskAttemptId;
+  }
 }

Modified: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/TezTaskReporterImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/TezTaskReporterImpl.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/TezTaskReporterImpl.java (original)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/TezTaskReporterImpl.java Mon Apr 22 18:28:58 2013
@@ -29,7 +29,6 @@ import org.apache.hadoop.ipc.ProtocolSig
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.engine.records.TezTaskAttemptID;
 import org.apache.tez.engine.records.TezTaskDependencyCompletionEventsUpdate;

Modified: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java?rev=1470653&r1=1470652&r2=1470653&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java (original)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java Mon Apr 22 18:28:58 2013
@@ -36,7 +36,7 @@ import org.apache.hadoop.mapreduce.split
 import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.TezTask;
+import org.apache.tez.common.TezEngineTaskContext;
 import org.apache.tez.common.TezTaskStatus;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
@@ -64,7 +64,7 @@ public class MapProcessor extends MRTask
 
   @Inject
   public MapProcessor(
-      @Assisted TezTask context
+      @Assisted TezEngineTaskContext context
       ) throws IOException {
     super(context);
   }
@@ -89,7 +89,8 @@ public class MapProcessor extends MRTask
           throws IOException, InterruptedException {
     MRTaskReporter reporter = new MRTaskReporter(getTaskReporter());
     boolean useNewApi = jobConf.getUseNewMapper();
-    initTask(jobConf, getDAGID(), reporter, useNewApi);
+    initTask(jobConf, taskAttemptId.getTaskID().getVertexID().getDAGId(),
+        reporter, useNewApi);
 
     if (in instanceof SimpleInput) {
       ((SimpleInput)in).setTask(this);
@@ -176,7 +177,7 @@ public class MapProcessor extends MRTask
       ) throws IOException, InterruptedException {
     // make a task context so we can get the classes
     org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
-        new TaskAttemptContextImpl(job, getTaskAttemptId(), reporter);
+        new TaskAttemptContextImpl(job, taskAttemptId, reporter);
 
     // make a mapper
     org.apache.hadoop.mapreduce.Mapper mapper;
@@ -202,7 +203,7 @@ public class MapProcessor extends MRTask
     org.apache.hadoop.mapreduce.MapContext 
     mapContext = 
     new org.apache.tez.mapreduce.hadoop.mapreduce.MapContextImpl(
-        job, IDConverter.toMRTaskAttemptId(getTaskAttemptId()), 
+        job, IDConverter.toMRTaskAttemptId(taskAttemptId), 
         input, output, 
         getCommitter(), 
         reporter, split);