You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/09/20 01:36:22 UTC

git commit: TEZ-468. Add log messages to new engine impl to aid in debugging. (hitesh)

Updated Branches:
  refs/heads/TEZ-398 1eea4b6a8 -> bda095c73


TEZ-468. Add log messages to new engine impl to aid in debugging. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/bda095c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/bda095c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/bda095c7

Branch: refs/heads/TEZ-398
Commit: bda095c73661d4f774697671ed5dc5c32b39d8ce
Parents: 1eea4b6
Author: Hitesh Shah <hi...@apache.org>
Authored: Thu Sep 19 16:35:43 2013 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Thu Sep 19 16:35:43 2013 -0700

----------------------------------------------------------------------
 .../org/apache/tez/common/TezJobConfig.java     |  3 --
 .../apache/hadoop/mapred/YarnTezDagChild.java   | 31 ++++++++---
 .../dag/app/TaskAttemptListenerImpTezDag.java   |  5 ++
 .../tez/engine/newapi/impl/InputSpec.java       |  7 +++
 .../tez/engine/newapi/impl/OutputSpec.java      | 16 ++++--
 .../engine/newapi/impl/TezHeartbeatRequest.java | 13 ++++-
 .../newapi/impl/TezHeartbeatResponse.java       | 14 +++--
 .../LogicalIOProcessorRuntimeTask.java          | 55 +++++++++++++++-----
 .../tez/mapreduce/newoutput/SimpleOutput.java   |  2 -
 .../tez/mapreduce/newprocessor/MRTask.java      |  2 -
 .../tez/mapreduce/task/MRRuntimeTask.java       |  2 +-
 .../processor/map/TestMapProcessor.java         |  3 +-
 .../processor/reduce/TestReduceProcessor.java   |  3 +-
 13 files changed, 117 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bda095c7/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java b/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
index af53bca..12c2b4b 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
@@ -306,7 +306,4 @@ public class TezJobConfig {
    */
   public static final String DAG_CREDENTIALS_BINARY =  "tez.dag.credentials.binary";
   
-  
-  public static final String APPLICATION_ATTEMPT_ID =
-      "tez.job.application.attempt.id";
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bda095c7/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index dd757c7..6e4e418 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -125,18 +125,13 @@ public class YarnTezDagChild {
               if(!heartbeat()) {
                 return;
               }
-            } catch (TezException e) {
-              LOG.error("Error communicating with AM: " + e.getMessage() , e);
-              heartbeatErrorException = e;
-              heartbeatError.set(true);
-              return;
             } catch (InvalidToken e) {
-              LOG.error("Error in authenticating with AM: ", e);
+              LOG.error("Heartbeat error in authenticating with AM: ", e);
               heartbeatErrorException = e;
               heartbeatError.set(true);
               return;
-            } catch (Exception e) {
-              LOG.error("Error in heartbeating with AM. ", e);
+            } catch (Throwable e) {
+              LOG.error("Heartbeat error in communicating with AM. ", e);
               heartbeatErrorException = e;
               heartbeatError.set(true);
               return;
@@ -194,8 +189,17 @@ public class YarnTezDagChild {
     long reqId = requestCounter.incrementAndGet();
     TezHeartbeatRequest request = new TezHeartbeatRequest(reqId, events,
         containerIdStr, taskAttemptID, eventCounter, eventsRange);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Sending heartbeat to AM"
+          + ", request=" + request.toString());
+    }
     TezHeartbeatResponse response = umbilical.heartbeat(request);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Received heartbeat response from AM"
+          + ", response=" + response);
+    }
     if(response.shouldDie()) {
+      LOG.info("Received should die response from AM");
       return false;
     }
     if (response.getLastRequestId() != reqId) {
@@ -207,9 +211,20 @@ public class YarnTezDagChild {
       taskLock.readLock().lock();
       if (taskAttemptID == null
           || !taskAttemptID.equals(currentTaskAttemptID)) {
+        if (response.getEvents() != null
+            && !response.getEvents().isEmpty()) {
+          LOG.warn("No current assigned task, ignoring all events in"
+              + " heartbeat response, eventCount="
+              + response.getEvents().size());
+        }
         return true;
       }
       if (currentTask != null && response.getEvents() != null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Routing events from heartbeat response to task"
+              + ", currentTaskAttemptId=" + currentTaskAttemptID
+              + ", eventCount=" + response.getEvents().size());
+        }
         currentTask.handleEvents(response.getEvents());
       }
     } finally {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bda095c7/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index c89c840..8c29fd9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -552,6 +552,11 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     ContainerId containerId = ConverterUtils.toContainerId(request
         .getContainerIdentifier());
     long requestId = request.getRequestId();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Received heartbeat from container"
+          + ", request=" + request);
+    }
+
     ContainerInfo containerInfo = registeredContainers.get(containerId);
     if(containerInfo == null) {
       TezHeartbeatResponse response = new TezHeartbeatResponse();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bda095c7/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/InputSpec.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/InputSpec.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/InputSpec.java
index c9870f1..a2b8cc8 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/InputSpec.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/InputSpec.java
@@ -78,4 +78,11 @@ public class InputSpec implements Writable {
             TezEntityDescriptorProto.parseFrom(inputDescBytes));
   }
 
+  public String toString() {
+    return "{ sourceVertexName=" + sourceVertexName
+        + ", physicalEdgeCount" + physicalEdgeCount
+        + ", inputClassName=" + inputDescriptor.getClassName()
+        + " }";
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bda095c7/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/OutputSpec.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/OutputSpec.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/OutputSpec.java
index 4e6fa64..1b34ef0 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/OutputSpec.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/OutputSpec.java
@@ -30,7 +30,7 @@ import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
 public class OutputSpec implements Writable {
 
   private String destinationVertexName;
-  private OutputDescriptor inputDescriptor;
+  private OutputDescriptor outputDescriptor;
   private int physicalEdgeCount;
 
   public OutputSpec() {
@@ -39,7 +39,7 @@ public class OutputSpec implements Writable {
   public OutputSpec(String destinationVertexName,
       OutputDescriptor inputDescriptor, int physicalEdgeCount) {
     this.destinationVertexName = destinationVertexName;
-    this.inputDescriptor = inputDescriptor;
+    this.outputDescriptor = inputDescriptor;
     this.physicalEdgeCount = physicalEdgeCount;
   }
 
@@ -48,7 +48,7 @@ public class OutputSpec implements Writable {
   }
 
   public OutputDescriptor getOutputDescriptor() {
-    return inputDescriptor;
+    return outputDescriptor;
   }
 
   public int getPhysicalEdgeCount() {
@@ -61,7 +61,7 @@ public class OutputSpec implements Writable {
     out.writeUTF(destinationVertexName);
     out.writeInt(physicalEdgeCount);
     byte[] inputDescBytes =
-        DagTypeConverters.convertToDAGPlan(inputDescriptor).toByteArray();
+        DagTypeConverters.convertToDAGPlan(outputDescriptor).toByteArray();
     out.writeInt(inputDescBytes.length);
     out.write(inputDescBytes);
   }
@@ -73,9 +73,15 @@ public class OutputSpec implements Writable {
     int inputDescLen = in.readInt();
     byte[] inputDescBytes = new byte[inputDescLen];
     in.readFully(inputDescBytes);
-    inputDescriptor =
+    outputDescriptor =
         DagTypeConverters.convertOutputDescriptorFromDAGPlan(
             TezEntityDescriptorProto.parseFrom(inputDescBytes));
   }
 
+  public String toString() {
+    return "{ destinationVertexName=" + destinationVertexName
+        + ", physicalEdgeCount" + physicalEdgeCount
+        + ", outputClassName=" + outputDescriptor.getClassName()
+        + " }";
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bda095c7/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatRequest.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatRequest.java
index 462423b..79a0968 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatRequest.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatRequest.java
@@ -56,7 +56,7 @@ public class TezHeartbeatRequest implements Writable {
   public String getContainerIdentifier() {
     return containerIdentifier;
   }
-  
+
   public List<TezEvent> getEvents() {
     return events;
   }
@@ -123,4 +123,15 @@ public class TezHeartbeatRequest implements Writable {
     containerIdentifier = Text.readString(in);
   }
 
+  @Override
+  public String toString() {
+    return "{ "
+        + " containerId=" + containerIdentifier
+        + ", requestId=" + requestId
+        + ", startIndex=" + startIndex
+        + ", maxEventsToGet=" + maxEvents
+        + ", taskAttemptId" + currentTaskAttemptID
+        + ", eventCount=" + (events != null ? events.size() : 0)
+        + " }";
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bda095c7/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatResponse.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatResponse.java
index 0f17311..addd17f 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatResponse.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatResponse.java
@@ -43,7 +43,7 @@ public class TezHeartbeatResponse implements Writable {
   public List<TezEvent> getEvents() {
     return events;
   }
-  
+
   public boolean shouldDie() {
     return shouldDie;
   }
@@ -53,13 +53,13 @@ public class TezHeartbeatResponse implements Writable {
   }
 
   public void setEvents(List<TezEvent> events) {
-    this.events = events;
+    this.events = Collections.unmodifiableList(events);
   }
 
   public void setLastRequestId(long lastRequestId ) {
     this.lastRequestId = lastRequestId;
   }
-  
+
   public void setShouldDie() {
     this.shouldDie = true;
   }
@@ -94,4 +94,12 @@ public class TezHeartbeatResponse implements Writable {
     }
   }
 
+  @Override
+  public String toString() {
+    return "{ "
+        + " lastRequestId=" + lastRequestId
+        + ", shouldDie=" + shouldDie
+        + ", eventCount=" + (events != null ? events.size() : 0)
+        + " }";
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bda095c7/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
index ef092ad..eb055b6 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
@@ -110,6 +110,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   }
 
   public void initialize() throws Exception {
+    LOG.info("Initializing LogicalProcessorIORuntimeTask");
     Preconditions.checkState(this.state == State.NEW, "Already initialized");
     this.state = State.INITED;
     inputMap = new LinkedHashMap<String, LogicalInput>(inputs.size());
@@ -188,6 +189,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
       ((LogicalInput) input).setNumPhysicalInputs(inputSpec
           .getPhysicalEdgeCount());
     }
+    LOG.info("Initializing Input using InputSpec: " + inputSpec);
     List<Event> events = input.initialize(tezInputContext);
     sendTaskGeneratedEvents(events, EventProducerConsumerType.INPUT,
         tezInputContext.getTaskVertexName(),
@@ -201,6 +203,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
       ((LogicalOutput) output).setNumPhysicalOutputs(outputSpec
           .getPhysicalEdgeCount());
     }
+    LOG.info("Initializing Output using OutputSpec: " + outputSpec);
     List<Event> events = output.initialize(tezOutputContext);
     sendTaskGeneratedEvents(events, EventProducerConsumerType.OUTPUT,
         tezOutputContext.getTaskVertexName(),
@@ -209,6 +212,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   }
 
   private void initializeLogicalIOProcessor() throws Exception {
+    LOG.info("Initializing processor"
+        + ", processorClassName=" + processorDescriptor.getClassName());
     TezProcessorContext processorContext = createProcessorContext();
     processor.initialize(processorContext);
   }
@@ -248,6 +253,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   private List<LogicalInput> createInputs(List<InputSpec> inputSpecs) {
     List<LogicalInput> inputs = new ArrayList<LogicalInput>(inputSpecs.size());
     for (InputSpec inputSpec : inputSpecs) {
+      LOG.info("Creating Input from InputSpec: "
+          + inputSpec);
       Input input = RuntimeUtils.createClazzInstance(inputSpec
           .getInputDescriptor().getClassName());
 
@@ -266,6 +273,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     List<LogicalOutput> outputs = new ArrayList<LogicalOutput>(
         outputSpecs.size());
     for (OutputSpec outputSpec : outputSpecs) {
+      LOG.info("Creating Output from OutputSpec"
+          + outputSpec);
       Output output = RuntimeUtils.createClazzInstance(outputSpec
           .getOutputDescriptor().getClassName());
       if (output instanceof LogicalOutput) {
@@ -294,19 +303,34 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   private void sendTaskGeneratedEvents(List<Event> events,
       EventProducerConsumerType generator, String taskVertexName,
       String edgeVertexName, TezTaskAttemptID taskAttemptID) {
-    if (events != null && events.size() > 0) {
-      EventMetaData eventMetaData = new EventMetaData(generator,
-          taskVertexName, edgeVertexName, taskAttemptID);
-      List<TezEvent> tezEvents = new ArrayList<TezEvent>(events.size());
-      for (Event e : events) {
-        TezEvent te = new TezEvent(e, eventMetaData);
-        tezEvents.add(te);
+    if (events == null || events.isEmpty()) {
+      return;
+    }
+    EventMetaData eventMetaData = new EventMetaData(generator,
+        taskVertexName, edgeVertexName, taskAttemptID);
+    List<TezEvent> tezEvents = new ArrayList<TezEvent>(events.size());
+    for (Event e : events) {
+      TezEvent te = new TezEvent(e, eventMetaData);
+      tezEvents.add(te);
+    }
+    if (LOG.isDebugEnabled()) {
+      for (TezEvent e : tezEvents) {
+        LOG.debug("Generated event info"
+            + ", eventMetaData=" + eventMetaData.toString()
+            + ", eventType=" + e.getEventType());
       }
-      tezUmbilical.addEvents(tezEvents);
     }
+    tezUmbilical.addEvents(tezEvents);
   }
 
   private boolean handleEvent(TezEvent e) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Handling TezEvent in task"
+          + ", taskAttemptId=" + taskSpec.getTaskAttemptID()
+          + ", eventType=" + e.getEventType()
+          + ", eventSourceInfo=" + e.getSourceInfo()
+          + ", eventDestinationInfo=" + e.getDestinationInfo());
+    }
     try {
       switch (e.getDestinationInfo().getEventGenerator()) {
       case INPUT:
@@ -352,10 +376,17 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
 
   @Override
   public synchronized void handleEvents(Collection<TezEvent> events) {
-    if (!(events == null || events.size() == 0)) {
-      eventsToBeProcessed.addAll(events);
-      eventCounter.addAndGet(events.size());
+    if (events == null || events.isEmpty()) {
+      return;
+    }
+    eventCounter.addAndGet(events.size());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Received events to be processed by task"
+          + ", taskAttemptId=" + taskSpec.getTaskAttemptID()
+          + ", eventCount=" + events.size()
+          + ", newEventCounter=" + eventCounter.get());
     }
+    eventsToBeProcessed.addAll(events);
   }
 
   private void startRouterThread() {
@@ -371,7 +402,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
             if (!handleEvent(e)) {
               LOG.warn("Stopping Event Router thread as failed to handle"
                   + " event: " + e);
-              break;
+              return;
             }
           } catch (InterruptedException e) {
             if (!isTaskDone()) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bda095c7/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newoutput/SimpleOutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newoutput/SimpleOutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newoutput/SimpleOutput.java
index 9344b53..d00ffc0 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newoutput/SimpleOutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newoutput/SimpleOutput.java
@@ -84,8 +84,6 @@ public class SimpleOutput implements LogicalOutput {
     this.useNewApi = this.jobConf.getUseNewMapper();
     this.isMapperOutput = jobConf.getBoolean(MRConfig.IS_MAP_PROCESSOR,
         false);
-    jobConf.setInt(TezJobConfig.APPLICATION_ATTEMPT_ID,
-        outputContext.getDAGAttemptNumber());
     jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
         outputContext.getDAGAttemptNumber());
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bda095c7/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTask.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTask.java
index 28bd327..2db823d 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTask.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTask.java
@@ -161,8 +161,6 @@ public abstract class MRTask {
     }
     jobConf.set(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID,
         taskAttemptId.toString());
-    jobConf.setInt(TezJobConfig.APPLICATION_ATTEMPT_ID,
-        context.getDAGAttemptNumber());
     jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
         context.getDAGAttemptNumber());
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bda095c7/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MRRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MRRuntimeTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MRRuntimeTask.java
index 917ecc0..8a4c6c1 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MRRuntimeTask.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MRRuntimeTask.java
@@ -185,7 +185,7 @@ public class MRRuntimeTask extends RuntimeTask {
     // Containers.
     // Set it in conf, so as to be able to be used the the OutputCommitter.
     job.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
-        job.getInt(TezJobConfig.APPLICATION_ATTEMPT_ID, -1));
+        job.getInt(MRJobConfig.APPLICATION_ATTEMPT_ID, -1));
 
     job.setClass(MRConfig.TASK_LOCAL_OUTPUT_CLASS, YarnOutputFiles.class,
         MapOutputFile.class); // MR

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bda095c7/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
index a9862c1..f5d0b02 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
@@ -40,6 +40,7 @@ import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles;
 import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
 import org.apache.tez.engine.lib.oldoutput.OldLocalOnFileSorterOutput;
 import org.apache.tez.mapreduce.TestUmbilicalProtocol;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
 import org.apache.tez.mapreduce.hadoop.mapreduce.TezNullOutputCommitter;
@@ -100,7 +101,7 @@ public class TestMapProcessor {
     mapOutputs.setConf(jobConf);
 
     Configuration conf = MultiStageMRConfToTezTranslator.convertMRToLinearTez(jobConf);
-    conf.setInt(TezJobConfig.APPLICATION_ATTEMPT_ID, 0);
+    conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
 
     Configuration stageConf = MultiStageMRConfigUtil.getConfForVertex(conf,
         vertexName);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bda095c7/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index 0c6a4c5..7ed18d6 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -46,6 +46,7 @@ import org.apache.tez.engine.runtime.RuntimeUtils;
 import org.apache.tez.mapreduce.TestUmbilicalProtocol;
 import org.apache.tez.mapreduce.TezTestUtils;
 import org.apache.tez.mapreduce.hadoop.IDConverter;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
 import org.apache.tez.mapreduce.newinput.SimpleInputLegacy;
@@ -106,7 +107,7 @@ public class TestReduceProcessor {
     mapOutputs.setConf(jobConf);
     
     Configuration conf = MultiStageMRConfToTezTranslator.convertMRToLinearTez(jobConf);
-    conf.setInt(TezJobConfig.APPLICATION_ATTEMPT_ID, 0);
+    conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
     Configuration mapStageConf = MultiStageMRConfigUtil.getConfForVertex(conf,
         mapVertexName);