You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this text copy.)
Posted to commits@tez.apache.org by hi...@apache.org on 2013/07/24 01:14:12 UTC

git commit: TEZ-298. Fix initialization of VertexOutputCommitter using correct vertex user payload. (hitesh)

Updated Branches:
  refs/heads/master e1f64bf59 -> 4e7629c23


TEZ-298. Fix initialization of VertexOutputCommitter using correct vertex user payload. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/4e7629c2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/4e7629c2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/4e7629c2

Branch: refs/heads/master
Commit: 4e7629c23be81d099fc6a859418bc5d5ca0bad73
Parents: e1f64bf
Author: Hitesh Shah <hi...@apache.org>
Authored: Tue Jul 23 16:13:44 2013 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Tue Jul 23 16:13:44 2013 -0700

----------------------------------------------------------------------
 .../dag/api/client/rpc/DAGClientRPCImpl.java    | 62 ++++++++---------
 .../hadoop/mapred/MRVertexOutputCommitter.java  | 55 +++++++--------
 .../tez/dag/api/committer/VertexContext.java    | 40 ++++++++---
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 70 +++++++++-----------
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   | 35 +++++-----
 5 files changed, 138 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4e7629c2/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
index e7f9396..5cf296b 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
@@ -55,7 +55,7 @@ public class DAGClientRPCImpl implements DAGClient {
   private ApplicationReport appReport;
   private YarnClient yarnClient;
   private DAGClientAMProtocolBlockingPB proxy = null;
-  
+
   public DAGClientRPCImpl(ApplicationId appId, String dagId,
       TezConfiguration conf) {
     this.appId = appId;
@@ -66,12 +66,12 @@ public class DAGClientRPCImpl implements DAGClient {
     yarnClient.start();
     appReport = null;
   }
-  
+
   @Override
   public ApplicationId getApplicationId() {
     return appId;
   }
-  
+
   @Override
   public DAGStatus getDAGStatus() throws IOException, TezException {
     if(createAMProxyIfNeeded()) {
@@ -81,7 +81,7 @@ public class DAGClientRPCImpl implements DAGClient {
         resetProxy(e); // create proxy again
       }
     }
-    
+
     // Later maybe from History
     return getDAGStatusViaRM();
   }
@@ -96,18 +96,18 @@ public class DAGClientRPCImpl implements DAGClient {
         resetProxy(e); // create proxy again
       }
     }
-    
+
     // need AM for this. Later maybe from History
     return null;
   }
-  
+
   public void tryKillDAG() throws TezException, IOException {
     if(LOG.isDebugEnabled()) {
       LOG.debug("TryKill for app: " + appId + " dag:" + dagId);
     }
     if(createAMProxyIfNeeded()) {
-      TryKillDAGRequestProto requestProto = 
-          TryKillDAGRequestProto.newBuilder().setDagId(dagId).build();    
+      TryKillDAGRequestProto requestProto =
+          TryKillDAGRequestProto.newBuilder().setDagId(dagId).build();
       try {
         proxy.tryKillDAG(null, requestProto);
       } catch (ServiceException e) {
@@ -125,26 +125,26 @@ public class DAGClientRPCImpl implements DAGClient {
       yarnClient.stop();
     }
   }
-  
+
   @Override
   public ApplicationReport getApplicationReport() {
     return appReport;
   }
-  
+
   void resetProxy(Exception e) {
     if(LOG.isDebugEnabled()) {
-      LOG.debug("Resetting AM proxy for app: " + appId + " dag:" + dagId + 
+      LOG.debug("Resetting AM proxy for app: " + appId + " dag:" + dagId +
           " due to exception :", e);
     }
     proxy = null;
   }
-  
+
   DAGStatus getDAGStatusViaAM() throws IOException, TezException {
     if(LOG.isDebugEnabled()) {
       LOG.debug("GetDAGStatus via AM for app: " + appId + " dag:" + dagId);
     }
-    GetDAGStatusRequestProto requestProto = 
-        GetDAGStatusRequestProto.newBuilder().setDagId(dagId).build();    
+    GetDAGStatusRequestProto requestProto =
+        GetDAGStatusRequestProto.newBuilder().setDagId(dagId).build();
     try {
       return new DAGStatus(
                  proxy.getDAGStatus(null, requestProto).getDagStatus());
@@ -153,9 +153,9 @@ public class DAGClientRPCImpl implements DAGClient {
       throw new TezException(e);
     }
   }
-  
-  
-  
+
+
+
   DAGStatus getDAGStatusViaRM() throws TezException, IOException {
     if(LOG.isDebugEnabled()) {
       LOG.debug("GetDAGStatus via AM for app: " + appId + " dag:" + dagId);
@@ -166,11 +166,11 @@ public class DAGClientRPCImpl implements DAGClient {
     } catch (YarnException e) {
       throw new TezException(e);
     }
-    
+
     if(appReport == null) {
       throw new TezException("Unknown/Invalid appId: " + appId);
     }
-    
+
     DAGStatusProto.Builder builder = DAGStatusProto.newBuilder();
     DAGStatus dagStatus = new DAGStatus(builder);
     DAGStatusStateProto dagState = null;
@@ -189,7 +189,7 @@ public class DAGClientRPCImpl implements DAGClient {
       break;
     case KILLED:
       dagState = DAGStatusStateProto.DAG_KILLED;
-      break;      
+      break;
     case FINISHED:
       switch(appReport.getFinalApplicationStatus()) {
       case UNDEFINED:
@@ -198,37 +198,39 @@ public class DAGClientRPCImpl implements DAGClient {
         break;
       case KILLED:
         dagState = DAGStatusStateProto.DAG_KILLED;
-        break;        
+        break;
       case SUCCEEDED:
         dagState = DAGStatusStateProto.DAG_SUCCEEDED;
         break;
-      }
-      throw new TezUncheckedException("Encountered unknown final application"
+      default:
+        throw new TezUncheckedException("Encountered unknown final application"
           + " status from YARN"
           + ", appState=" + appReport.getYarnApplicationState()
           + ", finalStatus=" + appReport.getFinalApplicationStatus());
+      }
+      break;
     default:
       throw new TezUncheckedException("Encountered unknown application state"
           + " from YARN, appState=" + appReport.getYarnApplicationState());
     }
-    
+
     builder.setState(dagState);
     if(appReport.getDiagnostics() != null) {
       builder.addAllDiagnostics(Collections.singleton(appReport.getDiagnostics()));
     }
-    
+
     return dagStatus;
   }
-  
+
   VertexStatus getVertexStatusViaAM(String vertexName) throws TezException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("GetVertexStatus via AM for app: " + appId + " dag: " + dagId
           + " vertex: " + vertexName);
     }
-    GetVertexStatusRequestProto requestProto = 
+    GetVertexStatusRequestProto requestProto =
         GetVertexStatusRequestProto.newBuilder().
                         setDagId(dagId).setVertexName(vertexName).build();
-    
+
     try {
       return new VertexStatus(
                  proxy.getVertexStatus(null, requestProto).getVertexStatus());
@@ -257,7 +259,7 @@ public class DAGClientRPCImpl implements DAGClient {
       return true;
     }
     appReport = getAppReport();
-    
+
     if(appReport == null) {
       return false;
     }
@@ -265,7 +267,7 @@ public class DAGClientRPCImpl implements DAGClient {
     if(appState != YarnApplicationState.RUNNING) {
       return false;
     }
-    
+
     // YARN-808. Cannot ascertain if AM is ready until we connect to it.
     // workaround check the default string set by YARN
     if(appReport.getHost() == null || appReport.getHost().equals("N/A") ||

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4e7629c2/tez-dag/src/main/java/org/apache/hadoop/mapred/MRVertexOutputCommitter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/MRVertexOutputCommitter.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/MRVertexOutputCommitter.java
index 1a4da3d..14a0c52 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/MRVertexOutputCommitter.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/MRVertexOutputCommitter.java
@@ -22,8 +22,6 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.JobStatus.State;
 import org.apache.hadoop.mapreduce.OutputCommitter;
@@ -39,6 +37,7 @@ import org.apache.tez.dag.api.committer.VertexOutputCommitter;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.utils.TezBuilderUtils;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 
 public class MRVertexOutputCommitter extends VertexOutputCommitter {
@@ -46,47 +45,47 @@ public class MRVertexOutputCommitter extends VertexOutputCommitter {
   private static final Log LOG = LogFactory.getLog(
       MRVertexOutputCommitter.class);
 
-  private OutputCommitter committer;
-  private JobContext jobContext;
+  private OutputCommitter committer = null;
+  private JobContext jobContext = null;
   private volatile boolean initialized = false;
+  private JobConf jobConf = null;
 
   public MRVertexOutputCommitter() {
   }
 
   @SuppressWarnings("rawtypes")
   private OutputCommitter getOutputCommitter(VertexContext context) {
-    Configuration conf = context.getConf();
 
     OutputCommitter committer = null;
     boolean newApiCommitter = false;
-    if (conf.getBoolean("mapred.reducer.new-api", false)
-        || conf.getBoolean("mapred.mapper.new-api", false))  {
+    if (jobConf.getBoolean("mapred.reducer.new-api", false)
+        || jobConf.getBoolean("mapred.mapper.new-api", false))  {
       newApiCommitter = true;
       LOG.info("Using mapred newApiCommitter.");
     }
-    
+
     LOG.info("OutputCommitter set in config for vertex: "
-        + context.getVertexId() + " : "
-        + conf.get("mapred.output.committer.class"));
+        + context.getVertexID() + " : "
+        + jobConf.get("mapred.output.committer.class"));
 
     if (newApiCommitter) {
-      TezTaskID taskId = TezBuilderUtils.newTaskId(context.getDAGId(),
-          context.getVertexId().getId(), 0);
+      TezTaskID taskId = TezBuilderUtils.newTaskId(context.getDAGID(),
+          context.getVertexID().getId(), 0);
       TezTaskAttemptID attemptID =
           TezBuilderUtils.newTaskAttemptId(taskId, 0);
-      TaskAttemptContext taskContext = new TaskAttemptContextImpl(conf,
+      TaskAttemptContext taskContext = new TaskAttemptContextImpl(jobConf,
           TezMRTypeConverter.fromTez(attemptID));
       try {
         OutputFormat outputFormat = ReflectionUtils.newInstance(taskContext
-            .getOutputFormatClass(), conf);
+            .getOutputFormatClass(), jobConf);
         committer = outputFormat.getOutputCommitter(taskContext);
       } catch (Exception e) {
         throw new TezUncheckedException(e);
       }
     } else {
-      committer = ReflectionUtils.newInstance(conf.getClass(
+      committer = ReflectionUtils.newInstance(jobConf.getClass(
           "mapred.output.committer.class", FileOutputCommitter.class,
-          org.apache.hadoop.mapred.OutputCommitter.class), conf);
+          org.apache.hadoop.mapred.OutputCommitter.class), jobConf);
     }
     LOG.info("OutputCommitter is " + committer.getClass().getName());
     return committer;
@@ -95,15 +94,8 @@ public class MRVertexOutputCommitter extends VertexOutputCommitter {
   // FIXME we are using ApplicationId as DAG id
   private JobContext getJobContextFromVertexContext(VertexContext context)
       throws IOException {
-    // FIXME when we have the vertex level user-land configuration
-    // jobConf should be initialized using the user-land level configuration
-    // for the vertex in question
-
-    Configuration conf = context.getConf();
-
-    JobConf jobConf = new JobConf(conf);
-    JobID jobId = TypeConverter.fromYarn(context.getDAGId().getApplicationId());
-    jobConf.addResource(new Path(MRJobConfig.JOB_CONF_FILE));
+    JobID jobId = TypeConverter.fromYarn(
+        context.getDAGID().getApplicationId());
     return new MRJobContextImpl(jobConf, jobId);
   }
 
@@ -128,8 +120,17 @@ public class MRVertexOutputCommitter extends VertexOutputCommitter {
   @Override
   public void init(VertexContext context) throws IOException {
     // TODO VertexContext not the best way to get ApplicationAttemptId. No
-    // alternates rightnow.
-    context.getConf().setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
+    // alternates right now.
+
+    byte[] userPayload = context.getUserPayload();
+    if (userPayload == null) {
+      jobConf = new JobConf();
+    } else {
+      jobConf = new JobConf(
+          MRHelpers.createConfFromUserPayload(context.getUserPayload()));
+    }
+
+    jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
         context.getApplicationAttemptId().getAttemptId());
     committer = getOutputCommitter(context);
     jobContext = getJobContextFromVertexContext(context);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4e7629c2/tez-dag/src/main/java/org/apache/tez/dag/api/committer/VertexContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/committer/VertexContext.java b/tez-dag/src/main/java/org/apache/tez/dag/api/committer/VertexContext.java
index e837390..5c4430e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/committer/VertexContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/committer/VertexContext.java
@@ -19,22 +19,42 @@
 package org.apache.tez.dag.api.committer;
 
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezVertexID;
 
-public interface VertexContext {
+public class VertexContext {
 
-  public Configuration getConf();
+  private final TezDAGID tezDAGID;
+  private final byte[] userPayload;
+  private final ApplicationAttemptId applicationAttemptId;
+  private final TezVertexID tezVertexID;
 
-  public TezDAGID getDAGId();
-  
-  public byte[] getUserPayload();
-  
-  // TODO Get rid of this as part of VertexContext cleanup.
-  public ApplicationAttemptId getApplicationAttemptId();
+  public VertexContext(TezDAGID tezDAGID, byte[] userPayload,
+      TezVertexID tezVertexID,
+      ApplicationAttemptId applicationAttemptId) {
+    this.tezDAGID = tezDAGID;
+    this.userPayload = userPayload;
+    this.tezVertexID = tezVertexID;
+    this.applicationAttemptId = applicationAttemptId;
+  }
+
+  public TezDAGID getDAGID() {
+    return tezDAGID;
+  }
+
+  public byte[] getUserPayload() {
+    return userPayload;
+  }
+
+  // TODO get rid of this as part of VertexContext cleanup
+  public ApplicationAttemptId getApplicationAttemptId() {
+    return applicationAttemptId;
+  }
+
+  public TezVertexID getVertexID() {
+    return tezVertexID;
+  }
 
-  public TezVertexID getVertexId();
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4e7629c2/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 9785de7..eacf0e2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -116,7 +116,7 @@ import com.google.protobuf.ByteString;
  */
 @SuppressWarnings({ "rawtypes", "unchecked" })
 public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
-  EventHandler<VertexEvent>, VertexContext {
+  EventHandler<VertexEvent> {
 
   private static final String LINE_SEPARATOR = System
       .getProperty("line.separator");
@@ -154,7 +154,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   private Resource taskResource;
 
   private TezConfiguration conf;
-  private final Configuration userConf;
 
   //fields initialized in init
 
@@ -340,6 +339,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   private final String vertexName;
   private final ProcessorDescriptor processorDescriptor;
+  private final byte[] userPayload;
+
+  // For committer
+  private final VertexContext vertexContext;
 
   private Map<Vertex, EdgeProperty> sourceVertices;
   private Map<Vertex, EdgeProperty> targetVertices;
@@ -352,7 +355,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   private Map<String, LocalResource> localResources;
   private Map<String, String> environment;
   private final String javaOpts;
-  private VertexTerminationCause terminationCause; 
+  private VertexTerminationCause terminationCause;
 
   public VertexImpl(TezVertexID vertexId, VertexPlan vertexPlan,
       String vertexName, TezConfiguration conf, EventHandler eventHandler,
@@ -401,25 +404,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     this.javaOpts = vertexPlan.getTaskConfig().hasJavaOpts() ? vertexPlan
         .getTaskConfig().getJavaOpts() : null;
 
-    byte[] bb = getUserPayload();
-    if (bb == null) {
-      LOG.info("No user payload - falling back to default AM tez conf");
-      userConf = conf;
-    } else {
-      try {
-        userConf = MRHelpers.createConfFromUserPayload(bb);
-      } catch (IOException e) {
-        LOG.info("Failed to create user conf from ByteBuffer");
-        throw new TezUncheckedException(
-            "Failed to create user conf from ByteBuffer", e);
-      }
-    }
-    
+    this.userPayload = initializeUserPayload();
+    this.vertexContext = new VertexContext(getDAGId(),
+        userPayload, this.vertexId,
+        getApplicationAttemptId());
+
     // This "this leak" is okay because the retained pointer is in an
     //  instance variable.
     stateMachine = stateMachineFactory.make(this);
   }
-  
+
   protected StateMachine<VertexState, VertexEventType, VertexEvent> getStateMachine() {
     return stateMachine;
   }
@@ -440,11 +434,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   }
 
   @Override
-  public Configuration getConf() {
-    return userConf;
-  }
-
-  @Override
   public String getName() {
     return vertexName;
   }
@@ -477,7 +466,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       readLock.unlock();
     }
   }
-  
+
   @Override
   public int getSucceededTasks() {
     readLock.lock();
@@ -619,10 +608,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       readLock.unlock();
     }
   }
-  
+
   /**
    * Set the terminationCause if it had not yet been set.
-   * 
+   *
    * @param trigger The trigger
    * @return true if setting the value succeeded.
    */
@@ -633,7 +622,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     }
     return false;
   }
-  
+
   public VertexTerminationCause getTerminationCause(){
     readLock.lock();
     try {
@@ -759,7 +748,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           + ", completedTaskCount=" + vertex.completedTaskCount
           + ", terminationCause=" + vertex.terminationCause);
     }
-    
+
     if (vertex.completedTaskCount == vertex.tasks.size()) {
       //Only succeed if tasks complete successfully and no terminationCause is registered.
       if(vertex.succeededTaskCount == vertex.tasks.size() && vertex.terminationCause == null) {
@@ -825,7 +814,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   /**
    * Set the terminationCause and send a kill-message to all tasks.
-   * The task-kill messages are only sent once. 
+   * The task-kill messages are only sent once.
    * @param the trigger that is causing the Vertex to transition to KILLED/FAILED
    * @param event The type of kill event to send to the vertices.
    */
@@ -837,7 +826,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       }
     }
   }
-  
+
   VertexState finished(VertexState finalState) {
     if (finishTime == 0) setFinishTime();
 
@@ -937,7 +926,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         if (vertex.targetVertices.isEmpty()) {
           vertex.committer = new MRVertexOutputCommitter();
         }
-        vertex.committer.init(vertex);
+        vertex.committer.init(vertex.vertexContext);
         vertex.committer.setupVertex();
 
         // TODO: Metrics
@@ -1262,7 +1251,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       if(state == VertexState.RUNNING && forceTransitionToKillWait){
         return VertexState.TERMINATING;
       }
-      
+
       if(state == VertexState.SUCCEEDED) {
         vertex.vertexScheduler.onVertexCompleted();
       }
@@ -1345,14 +1334,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     }
     Vertex other = (Vertex) obj;
     return this.vertexId.equals(other.getVertexId());
-  }  
+  }
 
   @Override
   public int hashCode() {
     final int prime = 11239;
     return prime + prime * this.vertexId.hashCode();
   }
-    
+
   @Override
   public Map<Vertex, EdgeProperty> getInputVertices() {
     return Collections.unmodifiableMap(this.sourceVertices);
@@ -1373,13 +1362,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     return this.targetVertices.size();
   }
 
-  @Override
-  public TezDAGID getDAGId() {
+  private TezDAGID getDAGId() {
     return getDAG().getID();
   }
 
-  @Override
-  public ApplicationAttemptId getApplicationAttemptId() {
+  private ApplicationAttemptId getApplicationAttemptId() {
     return appContext.getApplicationAttemptId();
   }
 
@@ -1457,8 +1444,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     return this.vertexScheduler;
   }
 
-  @Override
-  public byte[] getUserPayload() {
+  private byte[] initializeUserPayload() {
     for (VertexPlan vertexPlan : getDAG().getJobPlan().getVertexList()) {
       if (vertexPlan.getName().equals(vertexName)) {
         if (!vertexPlan.getProcessorDescriptor().hasUserPayload()) {
@@ -1475,4 +1461,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     }
     return null;
   }
+
+  public byte[] getUserPayload() {
+    return userPayload;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4e7629c2/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 72d2cc8..bc8b523 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -36,6 +36,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeSet;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -90,7 +91,7 @@ import org.mockito.ArgumentCaptor;
 @SuppressWarnings({ "unchecked", "rawtypes" })
 public class TestTaskAttempt {
 
-  private static final ProcessorDescriptor MAP_PROCESSOR_DESC = 
+  private static final ProcessorDescriptor MAP_PROCESSOR_DESC =
       new ProcessorDescriptor(
       "org.apache.tez.mapreduce.processor.map.MapProcessor", null);
 
@@ -124,7 +125,7 @@ public class TestTaskAttempt {
         new TaskAttemptImpl.ScheduleTaskattemptTransition();
 
     EventHandler eventHandler = mock(EventHandler.class);
-    Set<String> hosts = new HashSet<String>(3);
+    Set<String> hosts = new TreeSet<String>();
     hosts.add("host1");
     hosts.add("host2");
     hosts.add("host3");
@@ -171,10 +172,10 @@ public class TestTaskAttempt {
 
     EventHandler eventHandler = mock(EventHandler.class);
     String hosts[] = new String[] { "192.168.1.1", "host2", "host3" };
-    Set<String> resolved = new HashSet<String>(
+    Set<String> resolved = new TreeSet<String>(
         Arrays.asList(new String[]{ "host1", "host2", "host3" }));
     TaskLocationHint locationHint = new TaskLocationHint(
-        new HashSet<String>(Arrays.asList(hosts)), null);
+        new TreeSet<String>(Arrays.asList(hosts)), null);
 
     TezTaskID taskID = new TezTaskID(
         new TezVertexID(new TezDAGID("1", 1, 1), 1), 1);
@@ -351,7 +352,7 @@ public class TestTaskAttempt {
     // null));
     assertFalse(eventHandler.internalError);
   }
-  
+
   @Test
   // Ensure ContainerTerminating and ContainerTerminated is handled correctly by
   // the TaskAttempt
@@ -419,7 +420,7 @@ public class TestTaskAttempt {
     assertFalse(
         "InternalError occurred trying to handle TA_CONTAINER_TERMINATING",
         eventHandler.internalError);
-    
+
     assertEquals("Task attempt is not in the  FAILED state", taImpl.getState(),
         TaskAttemptState.FAILED);
 
@@ -429,14 +430,14 @@ public class TestTaskAttempt {
     int expectedEvenstAfterTerminating = expectedEventsAtRunning + 3;
     arg = ArgumentCaptor.forClass(Event.class);
     verify(eventHandler, times(expectedEvenstAfterTerminating)).handle(arg.capture());
-    
+
     verifyEventType(
         arg.getAllValues().subList(expectedEventsAtRunning,
             expectedEvenstAfterTerminating), TaskEventTAUpdate.class, 1);
     verifyEventType(
         arg.getAllValues().subList(expectedEventsAtRunning,
             expectedEvenstAfterTerminating), AMSchedulerEventTAEnded.class, 1);
-    
+
     taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID,
         "Terminated"));
     int expectedEventAfterTerminated = expectedEvenstAfterTerminating + 0;
@@ -447,7 +448,7 @@ public class TestTaskAttempt {
     assertEquals("Terminated", taImpl.getDiagnostics().get(1));
   }
 
-  
+
   @Test
   // Ensure ContainerTerminated is handled correctly by the TaskAttempt
   public void testContainerTerminatedWhileRunning() throws Exception {
@@ -507,9 +508,9 @@ public class TestTaskAttempt {
     assertFalse(
         "InternalError occurred trying to handle TA_CONTAINER_TERMINATED",
         eventHandler.internalError);
-    
+
     assertEquals("Terminated", taImpl.getDiagnostics().get(0));
-    
+
     // TODO Ensure TA_TERMINATING after this is ingored.
   }
 
@@ -651,14 +652,14 @@ public class TestTaskAttempt {
     int expectedEvenstAfterTerminating = expectedEventsAtRunning + 3;
     arg = ArgumentCaptor.forClass(Event.class);
     verify(eventHandler, times(expectedEvenstAfterTerminating)).handle(arg.capture());
-    
+
     verifyEventType(
         arg.getAllValues().subList(expectedEventsAtRunning,
             expectedEvenstAfterTerminating), TaskEventTAUpdate.class, 1);
     verifyEventType(
         arg.getAllValues().subList(expectedEventsAtRunning,
             expectedEvenstAfterTerminating), AMSchedulerEventTAEnded.class, 1);
-    
+
     taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID,
         "Terminated"));
     int expectedEventAfterTerminated = expectedEvenstAfterTerminating + 0;
@@ -670,7 +671,7 @@ public class TestTaskAttempt {
     assertEquals(0, taImpl.getDiagnostics().size());
   }
 
-  
+
   @Test
   // Verifies that multiple TooManyFetchFailures are handled correctly by the
   // TaskAttempt.
@@ -731,12 +732,12 @@ public class TestTaskAttempt {
         TaskAttemptEventType.TA_DONE));
     assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
         TaskAttemptState.SUCCEEDED);
-    
+
     int expectedEventsTillSucceeded = 6;
     ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
     verify(eventHandler, times(expectedEventsTillSucceeded)).handle(arg.capture());
     verifyEventType(arg.getAllValues(), TaskEventTAUpdate.class, 2);
-    
+
     taImpl.handle(new TaskAttemptEvent(taskAttemptID,
         TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES));
     int expectedEventsAfterFetchFailure = expectedEventsTillSucceeded + 3;
@@ -771,7 +772,7 @@ public class TestTaskAttempt {
         "Mismatch in num occurences of event: " + eventClass.getCanonicalName(),
         expectedOccurences, count);
   }
-  
+
   public static class MockEventHandler implements EventHandler {
     public boolean internalError;