You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/07/24 01:14:12 UTC
git commit: TEZ-298. Fix initialization of VertexOutputCommitter using correct vertex user payload. (hitesh)
Updated Branches:
refs/heads/master e1f64bf59 -> 4e7629c23
TEZ-298. Fix initialization of VertexOutputCommitter using correct vertex user payload. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/4e7629c2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/4e7629c2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/4e7629c2
Branch: refs/heads/master
Commit: 4e7629c23be81d099fc6a859418bc5d5ca0bad73
Parents: e1f64bf
Author: Hitesh Shah <hi...@apache.org>
Authored: Tue Jul 23 16:13:44 2013 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Tue Jul 23 16:13:44 2013 -0700
----------------------------------------------------------------------
.../dag/api/client/rpc/DAGClientRPCImpl.java | 62 ++++++++---------
.../hadoop/mapred/MRVertexOutputCommitter.java | 55 +++++++--------
.../tez/dag/api/committer/VertexContext.java | 40 ++++++++---
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 70 +++++++++-----------
.../tez/dag/app/dag/impl/TestTaskAttempt.java | 35 +++++-----
5 files changed, 138 insertions(+), 124 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4e7629c2/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
index e7f9396..5cf296b 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
@@ -55,7 +55,7 @@ public class DAGClientRPCImpl implements DAGClient {
private ApplicationReport appReport;
private YarnClient yarnClient;
private DAGClientAMProtocolBlockingPB proxy = null;
-
+
public DAGClientRPCImpl(ApplicationId appId, String dagId,
TezConfiguration conf) {
this.appId = appId;
@@ -66,12 +66,12 @@ public class DAGClientRPCImpl implements DAGClient {
yarnClient.start();
appReport = null;
}
-
+
@Override
public ApplicationId getApplicationId() {
return appId;
}
-
+
@Override
public DAGStatus getDAGStatus() throws IOException, TezException {
if(createAMProxyIfNeeded()) {
@@ -81,7 +81,7 @@ public class DAGClientRPCImpl implements DAGClient {
resetProxy(e); // create proxy again
}
}
-
+
// Later maybe from History
return getDAGStatusViaRM();
}
@@ -96,18 +96,18 @@ public class DAGClientRPCImpl implements DAGClient {
resetProxy(e); // create proxy again
}
}
-
+
// need AM for this. Later maybe from History
return null;
}
-
+
public void tryKillDAG() throws TezException, IOException {
if(LOG.isDebugEnabled()) {
LOG.debug("TryKill for app: " + appId + " dag:" + dagId);
}
if(createAMProxyIfNeeded()) {
- TryKillDAGRequestProto requestProto =
- TryKillDAGRequestProto.newBuilder().setDagId(dagId).build();
+ TryKillDAGRequestProto requestProto =
+ TryKillDAGRequestProto.newBuilder().setDagId(dagId).build();
try {
proxy.tryKillDAG(null, requestProto);
} catch (ServiceException e) {
@@ -125,26 +125,26 @@ public class DAGClientRPCImpl implements DAGClient {
yarnClient.stop();
}
}
-
+
@Override
public ApplicationReport getApplicationReport() {
return appReport;
}
-
+
void resetProxy(Exception e) {
if(LOG.isDebugEnabled()) {
- LOG.debug("Resetting AM proxy for app: " + appId + " dag:" + dagId +
+ LOG.debug("Resetting AM proxy for app: " + appId + " dag:" + dagId +
" due to exception :", e);
}
proxy = null;
}
-
+
DAGStatus getDAGStatusViaAM() throws IOException, TezException {
if(LOG.isDebugEnabled()) {
LOG.debug("GetDAGStatus via AM for app: " + appId + " dag:" + dagId);
}
- GetDAGStatusRequestProto requestProto =
- GetDAGStatusRequestProto.newBuilder().setDagId(dagId).build();
+ GetDAGStatusRequestProto requestProto =
+ GetDAGStatusRequestProto.newBuilder().setDagId(dagId).build();
try {
return new DAGStatus(
proxy.getDAGStatus(null, requestProto).getDagStatus());
@@ -153,9 +153,9 @@ public class DAGClientRPCImpl implements DAGClient {
throw new TezException(e);
}
}
-
-
-
+
+
+
DAGStatus getDAGStatusViaRM() throws TezException, IOException {
if(LOG.isDebugEnabled()) {
LOG.debug("GetDAGStatus via AM for app: " + appId + " dag:" + dagId);
@@ -166,11 +166,11 @@ public class DAGClientRPCImpl implements DAGClient {
} catch (YarnException e) {
throw new TezException(e);
}
-
+
if(appReport == null) {
throw new TezException("Unknown/Invalid appId: " + appId);
}
-
+
DAGStatusProto.Builder builder = DAGStatusProto.newBuilder();
DAGStatus dagStatus = new DAGStatus(builder);
DAGStatusStateProto dagState = null;
@@ -189,7 +189,7 @@ public class DAGClientRPCImpl implements DAGClient {
break;
case KILLED:
dagState = DAGStatusStateProto.DAG_KILLED;
- break;
+ break;
case FINISHED:
switch(appReport.getFinalApplicationStatus()) {
case UNDEFINED:
@@ -198,37 +198,39 @@ public class DAGClientRPCImpl implements DAGClient {
break;
case KILLED:
dagState = DAGStatusStateProto.DAG_KILLED;
- break;
+ break;
case SUCCEEDED:
dagState = DAGStatusStateProto.DAG_SUCCEEDED;
break;
- }
- throw new TezUncheckedException("Encountered unknown final application"
+ default:
+ throw new TezUncheckedException("Encountered unknown final application"
+ " status from YARN"
+ ", appState=" + appReport.getYarnApplicationState()
+ ", finalStatus=" + appReport.getFinalApplicationStatus());
+ }
+ break;
default:
throw new TezUncheckedException("Encountered unknown application state"
+ " from YARN, appState=" + appReport.getYarnApplicationState());
}
-
+
builder.setState(dagState);
if(appReport.getDiagnostics() != null) {
builder.addAllDiagnostics(Collections.singleton(appReport.getDiagnostics()));
}
-
+
return dagStatus;
}
-
+
VertexStatus getVertexStatusViaAM(String vertexName) throws TezException {
if (LOG.isDebugEnabled()) {
LOG.debug("GetVertexStatus via AM for app: " + appId + " dag: " + dagId
+ " vertex: " + vertexName);
}
- GetVertexStatusRequestProto requestProto =
+ GetVertexStatusRequestProto requestProto =
GetVertexStatusRequestProto.newBuilder().
setDagId(dagId).setVertexName(vertexName).build();
-
+
try {
return new VertexStatus(
proxy.getVertexStatus(null, requestProto).getVertexStatus());
@@ -257,7 +259,7 @@ public class DAGClientRPCImpl implements DAGClient {
return true;
}
appReport = getAppReport();
-
+
if(appReport == null) {
return false;
}
@@ -265,7 +267,7 @@ public class DAGClientRPCImpl implements DAGClient {
if(appState != YarnApplicationState.RUNNING) {
return false;
}
-
+
// YARN-808. Cannot ascertain if AM is ready until we connect to it.
// workaround check the default string set by YARN
if(appReport.getHost() == null || appReport.getHost().equals("N/A") ||
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4e7629c2/tez-dag/src/main/java/org/apache/hadoop/mapred/MRVertexOutputCommitter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/MRVertexOutputCommitter.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/MRVertexOutputCommitter.java
index 1a4da3d..14a0c52 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/MRVertexOutputCommitter.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/MRVertexOutputCommitter.java
@@ -22,8 +22,6 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobStatus.State;
import org.apache.hadoop.mapreduce.OutputCommitter;
@@ -39,6 +37,7 @@ import org.apache.tez.dag.api.committer.VertexOutputCommitter;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.utils.TezBuilderUtils;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
public class MRVertexOutputCommitter extends VertexOutputCommitter {
@@ -46,47 +45,47 @@ public class MRVertexOutputCommitter extends VertexOutputCommitter {
private static final Log LOG = LogFactory.getLog(
MRVertexOutputCommitter.class);
- private OutputCommitter committer;
- private JobContext jobContext;
+ private OutputCommitter committer = null;
+ private JobContext jobContext = null;
private volatile boolean initialized = false;
+ private JobConf jobConf = null;
public MRVertexOutputCommitter() {
}
@SuppressWarnings("rawtypes")
private OutputCommitter getOutputCommitter(VertexContext context) {
- Configuration conf = context.getConf();
OutputCommitter committer = null;
boolean newApiCommitter = false;
- if (conf.getBoolean("mapred.reducer.new-api", false)
- || conf.getBoolean("mapred.mapper.new-api", false)) {
+ if (jobConf.getBoolean("mapred.reducer.new-api", false)
+ || jobConf.getBoolean("mapred.mapper.new-api", false)) {
newApiCommitter = true;
LOG.info("Using mapred newApiCommitter.");
}
-
+
LOG.info("OutputCommitter set in config for vertex: "
- + context.getVertexId() + " : "
- + conf.get("mapred.output.committer.class"));
+ + context.getVertexID() + " : "
+ + jobConf.get("mapred.output.committer.class"));
if (newApiCommitter) {
- TezTaskID taskId = TezBuilderUtils.newTaskId(context.getDAGId(),
- context.getVertexId().getId(), 0);
+ TezTaskID taskId = TezBuilderUtils.newTaskId(context.getDAGID(),
+ context.getVertexID().getId(), 0);
TezTaskAttemptID attemptID =
TezBuilderUtils.newTaskAttemptId(taskId, 0);
- TaskAttemptContext taskContext = new TaskAttemptContextImpl(conf,
+ TaskAttemptContext taskContext = new TaskAttemptContextImpl(jobConf,
TezMRTypeConverter.fromTez(attemptID));
try {
OutputFormat outputFormat = ReflectionUtils.newInstance(taskContext
- .getOutputFormatClass(), conf);
+ .getOutputFormatClass(), jobConf);
committer = outputFormat.getOutputCommitter(taskContext);
} catch (Exception e) {
throw new TezUncheckedException(e);
}
} else {
- committer = ReflectionUtils.newInstance(conf.getClass(
+ committer = ReflectionUtils.newInstance(jobConf.getClass(
"mapred.output.committer.class", FileOutputCommitter.class,
- org.apache.hadoop.mapred.OutputCommitter.class), conf);
+ org.apache.hadoop.mapred.OutputCommitter.class), jobConf);
}
LOG.info("OutputCommitter is " + committer.getClass().getName());
return committer;
@@ -95,15 +94,8 @@ public class MRVertexOutputCommitter extends VertexOutputCommitter {
// FIXME we are using ApplicationId as DAG id
private JobContext getJobContextFromVertexContext(VertexContext context)
throws IOException {
- // FIXME when we have the vertex level user-land configuration
- // jobConf should be initialized using the user-land level configuration
- // for the vertex in question
-
- Configuration conf = context.getConf();
-
- JobConf jobConf = new JobConf(conf);
- JobID jobId = TypeConverter.fromYarn(context.getDAGId().getApplicationId());
- jobConf.addResource(new Path(MRJobConfig.JOB_CONF_FILE));
+ JobID jobId = TypeConverter.fromYarn(
+ context.getDAGID().getApplicationId());
return new MRJobContextImpl(jobConf, jobId);
}
@@ -128,8 +120,17 @@ public class MRVertexOutputCommitter extends VertexOutputCommitter {
@Override
public void init(VertexContext context) throws IOException {
// TODO VertexContext not the best way to get ApplicationAttemptId. No
- // alternates rightnow.
- context.getConf().setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
+ // alternates right now.
+
+ byte[] userPayload = context.getUserPayload();
+ if (userPayload == null) {
+ jobConf = new JobConf();
+ } else {
+ jobConf = new JobConf(
+ MRHelpers.createConfFromUserPayload(context.getUserPayload()));
+ }
+
+ jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
context.getApplicationAttemptId().getAttemptId());
committer = getOutputCommitter(context);
jobContext = getJobContextFromVertexContext(context);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4e7629c2/tez-dag/src/main/java/org/apache/tez/dag/api/committer/VertexContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/committer/VertexContext.java b/tez-dag/src/main/java/org/apache/tez/dag/api/committer/VertexContext.java
index e837390..5c4430e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/committer/VertexContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/committer/VertexContext.java
@@ -19,22 +19,42 @@
package org.apache.tez.dag.api.committer;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezVertexID;
-public interface VertexContext {
+public class VertexContext {
- public Configuration getConf();
+ private final TezDAGID tezDAGID;
+ private final byte[] userPayload;
+ private final ApplicationAttemptId applicationAttemptId;
+ private final TezVertexID tezVertexID;
- public TezDAGID getDAGId();
-
- public byte[] getUserPayload();
-
- // TODO Get rid of this as part of VertexContext cleanup.
- public ApplicationAttemptId getApplicationAttemptId();
+ public VertexContext(TezDAGID tezDAGID, byte[] userPayload,
+ TezVertexID tezVertexID,
+ ApplicationAttemptId applicationAttemptId) {
+ this.tezDAGID = tezDAGID;
+ this.userPayload = userPayload;
+ this.tezVertexID = tezVertexID;
+ this.applicationAttemptId = applicationAttemptId;
+ }
+
+ public TezDAGID getDAGID() {
+ return tezDAGID;
+ }
+
+ public byte[] getUserPayload() {
+ return userPayload;
+ }
+
+ // TODO get rid of this as part of VertexContext cleanup
+ public ApplicationAttemptId getApplicationAttemptId() {
+ return applicationAttemptId;
+ }
+
+ public TezVertexID getVertexID() {
+ return tezVertexID;
+ }
- public TezVertexID getVertexId();
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4e7629c2/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 9785de7..eacf0e2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -116,7 +116,7 @@ import com.google.protobuf.ByteString;
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
- EventHandler<VertexEvent>, VertexContext {
+ EventHandler<VertexEvent> {
private static final String LINE_SEPARATOR = System
.getProperty("line.separator");
@@ -154,7 +154,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
private Resource taskResource;
private TezConfiguration conf;
- private final Configuration userConf;
//fields initialized in init
@@ -340,6 +339,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
private final String vertexName;
private final ProcessorDescriptor processorDescriptor;
+ private final byte[] userPayload;
+
+ // For committer
+ private final VertexContext vertexContext;
private Map<Vertex, EdgeProperty> sourceVertices;
private Map<Vertex, EdgeProperty> targetVertices;
@@ -352,7 +355,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
private Map<String, LocalResource> localResources;
private Map<String, String> environment;
private final String javaOpts;
- private VertexTerminationCause terminationCause;
+ private VertexTerminationCause terminationCause;
public VertexImpl(TezVertexID vertexId, VertexPlan vertexPlan,
String vertexName, TezConfiguration conf, EventHandler eventHandler,
@@ -401,25 +404,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
this.javaOpts = vertexPlan.getTaskConfig().hasJavaOpts() ? vertexPlan
.getTaskConfig().getJavaOpts() : null;
- byte[] bb = getUserPayload();
- if (bb == null) {
- LOG.info("No user payload - falling back to default AM tez conf");
- userConf = conf;
- } else {
- try {
- userConf = MRHelpers.createConfFromUserPayload(bb);
- } catch (IOException e) {
- LOG.info("Failed to create user conf from ByteBuffer");
- throw new TezUncheckedException(
- "Failed to create user conf from ByteBuffer", e);
- }
- }
-
+ this.userPayload = initializeUserPayload();
+ this.vertexContext = new VertexContext(getDAGId(),
+ userPayload, this.vertexId,
+ getApplicationAttemptId());
+
// This "this leak" is okay because the retained pointer is in an
// instance variable.
stateMachine = stateMachineFactory.make(this);
}
-
+
protected StateMachine<VertexState, VertexEventType, VertexEvent> getStateMachine() {
return stateMachine;
}
@@ -440,11 +434,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
@Override
- public Configuration getConf() {
- return userConf;
- }
-
- @Override
public String getName() {
return vertexName;
}
@@ -477,7 +466,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
readLock.unlock();
}
}
-
+
@Override
public int getSucceededTasks() {
readLock.lock();
@@ -619,10 +608,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
readLock.unlock();
}
}
-
+
/**
* Set the terminationCause if it had not yet been set.
- *
+ *
* @param trigger The trigger
* @return true if setting the value succeeded.
*/
@@ -633,7 +622,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
return false;
}
-
+
public VertexTerminationCause getTerminationCause(){
readLock.lock();
try {
@@ -759,7 +748,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
+ ", completedTaskCount=" + vertex.completedTaskCount
+ ", terminationCause=" + vertex.terminationCause);
}
-
+
if (vertex.completedTaskCount == vertex.tasks.size()) {
//Only succeed if tasks complete successfully and no terminationCause is registered.
if(vertex.succeededTaskCount == vertex.tasks.size() && vertex.terminationCause == null) {
@@ -825,7 +814,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
/**
* Set the terminationCause and send a kill-message to all tasks.
- * The task-kill messages are only sent once.
+ * The task-kill messages are only sent once.
* @param the trigger that is causing the Vertex to transition to KILLED/FAILED
* @param event The type of kill event to send to the vertices.
*/
@@ -837,7 +826,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
}
}
-
+
VertexState finished(VertexState finalState) {
if (finishTime == 0) setFinishTime();
@@ -937,7 +926,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
if (vertex.targetVertices.isEmpty()) {
vertex.committer = new MRVertexOutputCommitter();
}
- vertex.committer.init(vertex);
+ vertex.committer.init(vertex.vertexContext);
vertex.committer.setupVertex();
// TODO: Metrics
@@ -1262,7 +1251,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
if(state == VertexState.RUNNING && forceTransitionToKillWait){
return VertexState.TERMINATING;
}
-
+
if(state == VertexState.SUCCEEDED) {
vertex.vertexScheduler.onVertexCompleted();
}
@@ -1345,14 +1334,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
Vertex other = (Vertex) obj;
return this.vertexId.equals(other.getVertexId());
- }
+ }
@Override
public int hashCode() {
final int prime = 11239;
return prime + prime * this.vertexId.hashCode();
}
-
+
@Override
public Map<Vertex, EdgeProperty> getInputVertices() {
return Collections.unmodifiableMap(this.sourceVertices);
@@ -1373,13 +1362,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
return this.targetVertices.size();
}
- @Override
- public TezDAGID getDAGId() {
+ private TezDAGID getDAGId() {
return getDAG().getID();
}
- @Override
- public ApplicationAttemptId getApplicationAttemptId() {
+ private ApplicationAttemptId getApplicationAttemptId() {
return appContext.getApplicationAttemptId();
}
@@ -1457,8 +1444,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
return this.vertexScheduler;
}
- @Override
- public byte[] getUserPayload() {
+ private byte[] initializeUserPayload() {
for (VertexPlan vertexPlan : getDAG().getJobPlan().getVertexList()) {
if (vertexPlan.getName().equals(vertexName)) {
if (!vertexPlan.getProcessorDescriptor().hasUserPayload()) {
@@ -1475,4 +1461,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
return null;
}
+
+ public byte[] getUserPayload() {
+ return userPayload;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4e7629c2/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 72d2cc8..bc8b523 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -36,6 +36,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeSet;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -90,7 +91,7 @@ import org.mockito.ArgumentCaptor;
@SuppressWarnings({ "unchecked", "rawtypes" })
public class TestTaskAttempt {
- private static final ProcessorDescriptor MAP_PROCESSOR_DESC =
+ private static final ProcessorDescriptor MAP_PROCESSOR_DESC =
new ProcessorDescriptor(
"org.apache.tez.mapreduce.processor.map.MapProcessor", null);
@@ -124,7 +125,7 @@ public class TestTaskAttempt {
new TaskAttemptImpl.ScheduleTaskattemptTransition();
EventHandler eventHandler = mock(EventHandler.class);
- Set<String> hosts = new HashSet<String>(3);
+ Set<String> hosts = new TreeSet<String>();
hosts.add("host1");
hosts.add("host2");
hosts.add("host3");
@@ -171,10 +172,10 @@ public class TestTaskAttempt {
EventHandler eventHandler = mock(EventHandler.class);
String hosts[] = new String[] { "192.168.1.1", "host2", "host3" };
- Set<String> resolved = new HashSet<String>(
+ Set<String> resolved = new TreeSet<String>(
Arrays.asList(new String[]{ "host1", "host2", "host3" }));
TaskLocationHint locationHint = new TaskLocationHint(
- new HashSet<String>(Arrays.asList(hosts)), null);
+ new TreeSet<String>(Arrays.asList(hosts)), null);
TezTaskID taskID = new TezTaskID(
new TezVertexID(new TezDAGID("1", 1, 1), 1), 1);
@@ -351,7 +352,7 @@ public class TestTaskAttempt {
// null));
assertFalse(eventHandler.internalError);
}
-
+
@Test
// Ensure ContainerTerminating and ContainerTerminated is handled correctly by
// the TaskAttempt
@@ -419,7 +420,7 @@ public class TestTaskAttempt {
assertFalse(
"InternalError occurred trying to handle TA_CONTAINER_TERMINATING",
eventHandler.internalError);
-
+
assertEquals("Task attempt is not in the FAILED state", taImpl.getState(),
TaskAttemptState.FAILED);
@@ -429,14 +430,14 @@ public class TestTaskAttempt {
int expectedEvenstAfterTerminating = expectedEventsAtRunning + 3;
arg = ArgumentCaptor.forClass(Event.class);
verify(eventHandler, times(expectedEvenstAfterTerminating)).handle(arg.capture());
-
+
verifyEventType(
arg.getAllValues().subList(expectedEventsAtRunning,
expectedEvenstAfterTerminating), TaskEventTAUpdate.class, 1);
verifyEventType(
arg.getAllValues().subList(expectedEventsAtRunning,
expectedEvenstAfterTerminating), AMSchedulerEventTAEnded.class, 1);
-
+
taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID,
"Terminated"));
int expectedEventAfterTerminated = expectedEvenstAfterTerminating + 0;
@@ -447,7 +448,7 @@ public class TestTaskAttempt {
assertEquals("Terminated", taImpl.getDiagnostics().get(1));
}
-
+
@Test
// Ensure ContainerTerminated is handled correctly by the TaskAttempt
public void testContainerTerminatedWhileRunning() throws Exception {
@@ -507,9 +508,9 @@ public class TestTaskAttempt {
assertFalse(
"InternalError occurred trying to handle TA_CONTAINER_TERMINATED",
eventHandler.internalError);
-
+
assertEquals("Terminated", taImpl.getDiagnostics().get(0));
-
+
// TODO Ensure TA_TERMINATING after this is ingored.
}
@@ -651,14 +652,14 @@ public class TestTaskAttempt {
int expectedEvenstAfterTerminating = expectedEventsAtRunning + 3;
arg = ArgumentCaptor.forClass(Event.class);
verify(eventHandler, times(expectedEvenstAfterTerminating)).handle(arg.capture());
-
+
verifyEventType(
arg.getAllValues().subList(expectedEventsAtRunning,
expectedEvenstAfterTerminating), TaskEventTAUpdate.class, 1);
verifyEventType(
arg.getAllValues().subList(expectedEventsAtRunning,
expectedEvenstAfterTerminating), AMSchedulerEventTAEnded.class, 1);
-
+
taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID,
"Terminated"));
int expectedEventAfterTerminated = expectedEvenstAfterTerminating + 0;
@@ -670,7 +671,7 @@ public class TestTaskAttempt {
assertEquals(0, taImpl.getDiagnostics().size());
}
-
+
@Test
// Verifies that multiple TooManyFetchFailures are handled correctly by the
// TaskAttempt.
@@ -731,12 +732,12 @@ public class TestTaskAttempt {
TaskAttemptEventType.TA_DONE));
assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
TaskAttemptState.SUCCEEDED);
-
+
int expectedEventsTillSucceeded = 6;
ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
verify(eventHandler, times(expectedEventsTillSucceeded)).handle(arg.capture());
verifyEventType(arg.getAllValues(), TaskEventTAUpdate.class, 2);
-
+
taImpl.handle(new TaskAttemptEvent(taskAttemptID,
TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES));
int expectedEventsAfterFetchFailure = expectedEventsTillSucceeded + 3;
@@ -771,7 +772,7 @@ public class TestTaskAttempt {
"Mismatch in num occurences of event: " + eventClass.getCanonicalName(),
expectedOccurences, count);
}
-
+
public static class MockEventHandler implements EventHandler {
public boolean internalError;