You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/09/20 01:36:22 UTC
git commit: TEZ-468. Add log messages to new engine impl to aid in
debugging. (hitesh)
Updated Branches:
refs/heads/TEZ-398 1eea4b6a8 -> bda095c73
TEZ-468. Add log messages to new engine impl to aid in debugging. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/bda095c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/bda095c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/bda095c7
Branch: refs/heads/TEZ-398
Commit: bda095c73661d4f774697671ed5dc5c32b39d8ce
Parents: 1eea4b6
Author: Hitesh Shah <hi...@apache.org>
Authored: Thu Sep 19 16:35:43 2013 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Thu Sep 19 16:35:43 2013 -0700
----------------------------------------------------------------------
.../org/apache/tez/common/TezJobConfig.java | 3 --
.../apache/hadoop/mapred/YarnTezDagChild.java | 31 ++++++++---
.../dag/app/TaskAttemptListenerImpTezDag.java | 5 ++
.../tez/engine/newapi/impl/InputSpec.java | 7 +++
.../tez/engine/newapi/impl/OutputSpec.java | 16 ++++--
.../engine/newapi/impl/TezHeartbeatRequest.java | 13 ++++-
.../newapi/impl/TezHeartbeatResponse.java | 14 +++--
.../LogicalIOProcessorRuntimeTask.java | 55 +++++++++++++++-----
.../tez/mapreduce/newoutput/SimpleOutput.java | 2 -
.../tez/mapreduce/newprocessor/MRTask.java | 2 -
.../tez/mapreduce/task/MRRuntimeTask.java | 2 +-
.../processor/map/TestMapProcessor.java | 3 +-
.../processor/reduce/TestReduceProcessor.java | 3 +-
13 files changed, 117 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bda095c7/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java b/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
index af53bca..12c2b4b 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
@@ -306,7 +306,4 @@ public class TezJobConfig {
*/
public static final String DAG_CREDENTIALS_BINARY = "tez.dag.credentials.binary";
-
- public static final String APPLICATION_ATTEMPT_ID =
- "tez.job.application.attempt.id";
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bda095c7/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index dd757c7..6e4e418 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -125,18 +125,13 @@ public class YarnTezDagChild {
if(!heartbeat()) {
return;
}
- } catch (TezException e) {
- LOG.error("Error communicating with AM: " + e.getMessage() , e);
- heartbeatErrorException = e;
- heartbeatError.set(true);
- return;
} catch (InvalidToken e) {
- LOG.error("Error in authenticating with AM: ", e);
+ LOG.error("Heartbeat error in authenticating with AM: ", e);
heartbeatErrorException = e;
heartbeatError.set(true);
return;
- } catch (Exception e) {
- LOG.error("Error in heartbeating with AM. ", e);
+ } catch (Throwable e) {
+ LOG.error("Heartbeat error in communicating with AM. ", e);
heartbeatErrorException = e;
heartbeatError.set(true);
return;
@@ -194,8 +189,17 @@ public class YarnTezDagChild {
long reqId = requestCounter.incrementAndGet();
TezHeartbeatRequest request = new TezHeartbeatRequest(reqId, events,
containerIdStr, taskAttemptID, eventCounter, eventsRange);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sending heartbeat to AM"
+ + ", request=" + request.toString());
+ }
TezHeartbeatResponse response = umbilical.heartbeat(request);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received heartbeat response from AM"
+ + ", response=" + response);
+ }
if(response.shouldDie()) {
+ LOG.info("Received should die response from AM");
return false;
}
if (response.getLastRequestId() != reqId) {
@@ -207,9 +211,20 @@ public class YarnTezDagChild {
taskLock.readLock().lock();
if (taskAttemptID == null
|| !taskAttemptID.equals(currentTaskAttemptID)) {
+ if (response.getEvents() != null
+ && !response.getEvents().isEmpty()) {
+ LOG.warn("No current assigned task, ignoring all events in"
+ + " heartbeat response, eventCount="
+ + response.getEvents().size());
+ }
return true;
}
if (currentTask != null && response.getEvents() != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Routing events from heartbeat response to task"
+ + ", currentTaskAttemptId=" + currentTaskAttemptID
+ + ", eventCount=" + response.getEvents().size());
+ }
currentTask.handleEvents(response.getEvents());
}
} finally {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bda095c7/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index c89c840..8c29fd9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -552,6 +552,11 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
ContainerId containerId = ConverterUtils.toContainerId(request
.getContainerIdentifier());
long requestId = request.getRequestId();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received heartbeat from container"
+ + ", request=" + request);
+ }
+
ContainerInfo containerInfo = registeredContainers.get(containerId);
if(containerInfo == null) {
TezHeartbeatResponse response = new TezHeartbeatResponse();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bda095c7/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/InputSpec.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/InputSpec.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/InputSpec.java
index c9870f1..a2b8cc8 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/InputSpec.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/InputSpec.java
@@ -78,4 +78,11 @@ public class InputSpec implements Writable {
TezEntityDescriptorProto.parseFrom(inputDescBytes));
}
+ /**
+ * Returns a short description of this spec for log messages: the source
+ * vertex name, the physical edge count and the input class name.
+ */
+ @Override
+ public String toString() {
+ // The descriptor is only dereferenced when it is set, so logging a spec
+ // that has not been populated yet cannot throw.
+ return "{ sourceVertexName=" + sourceVertexName
+ + ", physicalEdgeCount=" + physicalEdgeCount
+ + ", inputClassName="
+ + (inputDescriptor != null ? inputDescriptor.getClassName() : null)
+ + " }";
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bda095c7/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/OutputSpec.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/OutputSpec.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/OutputSpec.java
index 4e6fa64..1b34ef0 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/OutputSpec.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/OutputSpec.java
@@ -30,7 +30,7 @@ import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
public class OutputSpec implements Writable {
private String destinationVertexName;
- private OutputDescriptor inputDescriptor;
+ private OutputDescriptor outputDescriptor;
private int physicalEdgeCount;
public OutputSpec() {
@@ -39,7 +39,7 @@ public class OutputSpec implements Writable {
public OutputSpec(String destinationVertexName,
OutputDescriptor inputDescriptor, int physicalEdgeCount) {
this.destinationVertexName = destinationVertexName;
- this.inputDescriptor = inputDescriptor;
+ this.outputDescriptor = inputDescriptor;
this.physicalEdgeCount = physicalEdgeCount;
}
@@ -48,7 +48,7 @@ public class OutputSpec implements Writable {
}
public OutputDescriptor getOutputDescriptor() {
- return inputDescriptor;
+ return outputDescriptor;
}
public int getPhysicalEdgeCount() {
@@ -61,7 +61,7 @@ public class OutputSpec implements Writable {
out.writeUTF(destinationVertexName);
out.writeInt(physicalEdgeCount);
byte[] inputDescBytes =
- DagTypeConverters.convertToDAGPlan(inputDescriptor).toByteArray();
+ DagTypeConverters.convertToDAGPlan(outputDescriptor).toByteArray();
out.writeInt(inputDescBytes.length);
out.write(inputDescBytes);
}
@@ -73,9 +73,15 @@ public class OutputSpec implements Writable {
int inputDescLen = in.readInt();
byte[] inputDescBytes = new byte[inputDescLen];
in.readFully(inputDescBytes);
- inputDescriptor =
+ outputDescriptor =
DagTypeConverters.convertOutputDescriptorFromDAGPlan(
TezEntityDescriptorProto.parseFrom(inputDescBytes));
}
+ /**
+ * Returns a short description of this spec for log messages: the
+ * destination vertex name, the physical edge count and the output class name.
+ */
+ @Override
+ public String toString() {
+ // The descriptor is only dereferenced when it is set, so logging a spec
+ // created through the no-arg constructor cannot throw.
+ return "{ destinationVertexName=" + destinationVertexName
+ + ", physicalEdgeCount=" + physicalEdgeCount
+ + ", outputClassName="
+ + (outputDescriptor != null ? outputDescriptor.getClassName() : null)
+ + " }";
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bda095c7/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatRequest.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatRequest.java
index 462423b..79a0968 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatRequest.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatRequest.java
@@ -56,7 +56,7 @@ public class TezHeartbeatRequest implements Writable {
public String getContainerIdentifier() {
return containerIdentifier;
}
-
+
public List<TezEvent> getEvents() {
return events;
}
@@ -123,4 +123,15 @@ public class TezHeartbeatRequest implements Writable {
containerIdentifier = Text.readString(in);
}
+ /**
+ * Summarizes this request for log messages. Only the number of events is
+ * printed (0 when there is no event list), not the events themselves.
+ */
+ @Override
+ public String toString() {
+ return "{ "
+ + " containerId=" + containerIdentifier
+ + ", requestId=" + requestId
+ + ", startIndex=" + startIndex
+ + ", maxEventsToGet=" + maxEvents
+ + ", taskAttemptId=" + currentTaskAttemptID
+ + ", eventCount=" + (events != null ? events.size() : 0)
+ + " }";
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bda095c7/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatResponse.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatResponse.java
index 0f17311..addd17f 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatResponse.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatResponse.java
@@ -43,7 +43,7 @@ public class TezHeartbeatResponse implements Writable {
public List<TezEvent> getEvents() {
return events;
}
-
+
public boolean shouldDie() {
return shouldDie;
}
@@ -53,13 +53,13 @@ public class TezHeartbeatResponse implements Writable {
}
+ /**
+ * Stores the events carried by this response as a read-only view.
+ * A null list is kept as null instead of being wrapped, because
+ * Collections.unmodifiableList rejects null and readers of this field
+ * (toString, getEvents callers) already handle a missing list.
+ */
public void setEvents(List<TezEvent> events) {
- this.events = events;
+ this.events = (events == null)
+ ? null : Collections.unmodifiableList(events);
}
public void setLastRequestId(long lastRequestId ) {
this.lastRequestId = lastRequestId;
}
-
+
public void setShouldDie() {
this.shouldDie = true;
}
@@ -94,4 +94,12 @@ public class TezHeartbeatResponse implements Writable {
}
}
+ /**
+ * Summarizes this response for log messages: the last request id, the
+ * should-die flag and the number of events carried (0 when there is no list).
+ */
+ @Override
+ public String toString() {
+ final int eventCount = (events == null) ? 0 : events.size();
+ StringBuilder sb = new StringBuilder("{ ");
+ sb.append(" lastRequestId=").append(lastRequestId);
+ sb.append(", shouldDie=").append(shouldDie);
+ sb.append(", eventCount=").append(eventCount);
+ sb.append(" }");
+ return sb.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bda095c7/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
index ef092ad..eb055b6 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
@@ -110,6 +110,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
}
public void initialize() throws Exception {
+ LOG.info("Initializing LogicalIOProcessorRuntimeTask");
Preconditions.checkState(this.state == State.NEW, "Already initialized");
this.state = State.INITED;
inputMap = new LinkedHashMap<String, LogicalInput>(inputs.size());
@@ -188,6 +189,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
((LogicalInput) input).setNumPhysicalInputs(inputSpec
.getPhysicalEdgeCount());
}
+ LOG.info("Initializing Input using InputSpec: " + inputSpec);
List<Event> events = input.initialize(tezInputContext);
sendTaskGeneratedEvents(events, EventProducerConsumerType.INPUT,
tezInputContext.getTaskVertexName(),
@@ -201,6 +203,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
((LogicalOutput) output).setNumPhysicalOutputs(outputSpec
.getPhysicalEdgeCount());
}
+ LOG.info("Initializing Output using OutputSpec: " + outputSpec);
List<Event> events = output.initialize(tezOutputContext);
sendTaskGeneratedEvents(events, EventProducerConsumerType.OUTPUT,
tezOutputContext.getTaskVertexName(),
@@ -209,6 +212,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
}
private void initializeLogicalIOProcessor() throws Exception {
+ LOG.info("Initializing processor"
+ + ", processorClassName=" + processorDescriptor.getClassName());
TezProcessorContext processorContext = createProcessorContext();
processor.initialize(processorContext);
}
@@ -248,6 +253,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
private List<LogicalInput> createInputs(List<InputSpec> inputSpecs) {
List<LogicalInput> inputs = new ArrayList<LogicalInput>(inputSpecs.size());
for (InputSpec inputSpec : inputSpecs) {
+ LOG.info("Creating Input from InputSpec: "
+ + inputSpec);
Input input = RuntimeUtils.createClazzInstance(inputSpec
.getInputDescriptor().getClassName());
@@ -266,6 +273,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
List<LogicalOutput> outputs = new ArrayList<LogicalOutput>(
outputSpecs.size());
for (OutputSpec outputSpec : outputSpecs) {
+ LOG.info("Creating Output from OutputSpec: "
+ + outputSpec);
Output output = RuntimeUtils.createClazzInstance(outputSpec
.getOutputDescriptor().getClassName());
if (output instanceof LogicalOutput) {
@@ -294,19 +303,34 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
private void sendTaskGeneratedEvents(List<Event> events,
EventProducerConsumerType generator, String taskVertexName,
String edgeVertexName, TezTaskAttemptID taskAttemptID) {
- if (events != null && events.size() > 0) {
- EventMetaData eventMetaData = new EventMetaData(generator,
- taskVertexName, edgeVertexName, taskAttemptID);
- List<TezEvent> tezEvents = new ArrayList<TezEvent>(events.size());
- for (Event e : events) {
- TezEvent te = new TezEvent(e, eventMetaData);
- tezEvents.add(te);
+ if (events == null || events.isEmpty()) {
+ return;
+ }
+ EventMetaData eventMetaData = new EventMetaData(generator,
+ taskVertexName, edgeVertexName, taskAttemptID);
+ List<TezEvent> tezEvents = new ArrayList<TezEvent>(events.size());
+ for (Event e : events) {
+ TezEvent te = new TezEvent(e, eventMetaData);
+ tezEvents.add(te);
+ }
+ if (LOG.isDebugEnabled()) {
+ for (TezEvent e : tezEvents) {
+ LOG.debug("Generated event info"
+ + ", eventMetaData=" + eventMetaData.toString()
+ + ", eventType=" + e.getEventType());
}
- tezUmbilical.addEvents(tezEvents);
}
+ tezUmbilical.addEvents(tezEvents);
}
private boolean handleEvent(TezEvent e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Handling TezEvent in task"
+ + ", taskAttemptId=" + taskSpec.getTaskAttemptID()
+ + ", eventType=" + e.getEventType()
+ + ", eventSourceInfo=" + e.getSourceInfo()
+ + ", eventDestinationInfo=" + e.getDestinationInfo());
+ }
try {
switch (e.getDestinationInfo().getEventGenerator()) {
case INPUT:
@@ -352,10 +376,17 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
@Override
public synchronized void handleEvents(Collection<TezEvent> events) {
- if (!(events == null || events.size() == 0)) {
- eventsToBeProcessed.addAll(events);
- eventCounter.addAndGet(events.size());
+ if (events == null || events.isEmpty()) {
+ return;
+ }
+ eventCounter.addAndGet(events.size());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received events to be processed by task"
+ + ", taskAttemptId=" + taskSpec.getTaskAttemptID()
+ + ", eventCount=" + events.size()
+ + ", newEventCounter=" + eventCounter.get());
}
+ eventsToBeProcessed.addAll(events);
}
private void startRouterThread() {
@@ -371,7 +402,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
if (!handleEvent(e)) {
LOG.warn("Stopping Event Router thread as failed to handle"
+ " event: " + e);
- break;
+ return;
}
} catch (InterruptedException e) {
if (!isTaskDone()) {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bda095c7/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newoutput/SimpleOutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newoutput/SimpleOutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newoutput/SimpleOutput.java
index 9344b53..d00ffc0 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newoutput/SimpleOutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newoutput/SimpleOutput.java
@@ -84,8 +84,6 @@ public class SimpleOutput implements LogicalOutput {
this.useNewApi = this.jobConf.getUseNewMapper();
this.isMapperOutput = jobConf.getBoolean(MRConfig.IS_MAP_PROCESSOR,
false);
- jobConf.setInt(TezJobConfig.APPLICATION_ATTEMPT_ID,
- outputContext.getDAGAttemptNumber());
jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
outputContext.getDAGAttemptNumber());
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bda095c7/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTask.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTask.java
index 28bd327..2db823d 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTask.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTask.java
@@ -161,8 +161,6 @@ public abstract class MRTask {
}
jobConf.set(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID,
taskAttemptId.toString());
- jobConf.setInt(TezJobConfig.APPLICATION_ATTEMPT_ID,
- context.getDAGAttemptNumber());
jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
context.getDAGAttemptNumber());
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bda095c7/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MRRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MRRuntimeTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MRRuntimeTask.java
index 917ecc0..8a4c6c1 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MRRuntimeTask.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MRRuntimeTask.java
@@ -185,7 +185,7 @@ public class MRRuntimeTask extends RuntimeTask {
// Containers.
// Set it in conf, so as to be able to be used the the OutputCommitter.
job.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
- job.getInt(TezJobConfig.APPLICATION_ATTEMPT_ID, -1));
+ job.getInt(MRJobConfig.APPLICATION_ATTEMPT_ID, -1));
job.setClass(MRConfig.TASK_LOCAL_OUTPUT_CLASS, YarnOutputFiles.class,
MapOutputFile.class); // MR
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bda095c7/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
index a9862c1..f5d0b02 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
@@ -40,6 +40,7 @@ import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles;
import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
import org.apache.tez.engine.lib.oldoutput.OldLocalOnFileSorterOutput;
import org.apache.tez.mapreduce.TestUmbilicalProtocol;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
import org.apache.tez.mapreduce.hadoop.mapreduce.TezNullOutputCommitter;
@@ -100,7 +101,7 @@ public class TestMapProcessor {
mapOutputs.setConf(jobConf);
Configuration conf = MultiStageMRConfToTezTranslator.convertMRToLinearTez(jobConf);
- conf.setInt(TezJobConfig.APPLICATION_ATTEMPT_ID, 0);
+ conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
Configuration stageConf = MultiStageMRConfigUtil.getConfForVertex(conf,
vertexName);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bda095c7/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index 0c6a4c5..7ed18d6 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -46,6 +46,7 @@ import org.apache.tez.engine.runtime.RuntimeUtils;
import org.apache.tez.mapreduce.TestUmbilicalProtocol;
import org.apache.tez.mapreduce.TezTestUtils;
import org.apache.tez.mapreduce.hadoop.IDConverter;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
import org.apache.tez.mapreduce.newinput.SimpleInputLegacy;
@@ -106,7 +107,7 @@ public class TestReduceProcessor {
mapOutputs.setConf(jobConf);
Configuration conf = MultiStageMRConfToTezTranslator.convertMRToLinearTez(jobConf);
- conf.setInt(TezJobConfig.APPLICATION_ATTEMPT_ID, 0);
+ conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
Configuration mapStageConf = MultiStageMRConfigUtil.getConfForVertex(conf,
mapVertexName);