You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2014/08/21 01:15:10 UTC

svn commit: r1619261 - in /hive/branches/tez/ql/src: java/org/apache/hadoop/hive/ql/exec/tez/ java/org/apache/hadoop/hive/ql/exec/tez/tools/ java/org/apache/hadoop/hive/ql/metadata/ test/org/apache/hadoop/hive/ql/exec/tez/

Author: gunther
Date: Wed Aug 20 23:15:09 2014
New Revision: 1619261

URL: http://svn.apache.org/r1619261
Log:
HIVE-7808: Changes to work against Tez-0.5 RC (Siddharth Seth via Gunther Hagleitner)

Modified:
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
    hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java?rev=1619261&r1=1619260&r2=1619261&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java Wed Aug 20 23:15:09 2014
@@ -19,29 +19,28 @@
 package org.apache.hadoop.hive.ql.exec.tez;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.tez.dag.api.EdgeManager;
-import org.apache.tez.dag.api.EdgeManagerContext;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.tez.dag.api.EdgeManagerPlugin;
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 
-import com.google.common.collect.Multimap;
-
-public class CustomPartitionEdge extends EdgeManager {
+public class CustomPartitionEdge extends EdgeManagerPlugin {
 
   private static final Log LOG = LogFactory.getLog(CustomPartitionEdge.class.getName());
 
   CustomEdgeConfiguration conf = null;
-  EdgeManagerContext context = null;
+  final EdgeManagerPluginContext context;
 
   // used by the framework at runtime. initialize is the real initializer at runtime
-  public CustomPartitionEdge(EdgeManagerContext context) {
+  public CustomPartitionEdge(EdgeManagerPluginContext context) {
     super(context);
     this.context = context;
   }
@@ -65,17 +64,17 @@ public class CustomPartitionEdge extends
   // called at runtime to initialize the custom edge.
   @Override
   public void initialize() {
-    byte[] payload = context.getUserPayload();
+    ByteBuffer payload = context.getUserPayload().getPayload();
     LOG.info("Initializing the edge, payload: " + payload);
     if (payload == null) {
       throw new RuntimeException("Invalid payload");
     }
     // De-serialization code
-    DataInputBuffer dib = new DataInputBuffer();
-    dib.reset(payload, payload.length);
+    DataInputByteBuffer dibb = new DataInputByteBuffer();
+    dibb.reset(payload);
     conf = new CustomEdgeConfiguration();
     try {
-      conf.readFields(dib);
+      conf.readFields(dibb);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java?rev=1619261&r1=1619260&r2=1619261&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java Wed Aug 20 23:15:09 2014
@@ -37,24 +37,24 @@ import org.apache.hadoop.io.serializer.S
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat;
-import org.apache.tez.dag.api.EdgeManagerDescriptor;
+import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.VertexManagerPlugin;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
-import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
 import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.RootInputSpecUpdate;
-import org.apache.tez.runtime.api.events.RootInputConfigureVertexTasksEvent;
-import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
-import org.apache.tez.runtime.api.events.RootInputUpdatePayloadEvent;
+import org.apache.tez.runtime.api.InputSpecUpdate;
+import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
+import org.apache.tez.runtime.api.events.InputDataInformationEvent;
+import org.apache.tez.runtime.api.events.InputUpdatePayloadEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 
 import com.google.common.base.Preconditions;
@@ -63,6 +63,7 @@ import com.google.common.collect.HashMul
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
+import com.google.protobuf.ByteString;
 
 /*
  * Only works with old mapred API
@@ -74,8 +75,8 @@ public class CustomPartitionVertex exten
 
   VertexManagerPluginContext context;
 
-  private RootInputConfigureVertexTasksEvent configureVertexTaskEvent;
-  private List<RootInputDataInformationEvent> dataInformationEvents;
+  private InputConfigureVertexTasksEvent configureVertexTaskEvent;
+  private List<InputDataInformationEvent> dataInformationEvents;
   private int numBuckets = -1;
   private Configuration conf = null;
   private boolean rootVertexInitialized = false;
@@ -89,7 +90,7 @@ public class CustomPartitionVertex exten
   @Override
   public void initialize() {
     this.context = getContext();
-    ByteBuffer byteBuf = ByteBuffer.wrap(context.getUserPayload());
+    ByteBuffer byteBuf = context.getUserPayload().getPayload();
     this.numBuckets = byteBuf.getInt();
   }
 
@@ -129,8 +130,8 @@ public class CustomPartitionVertex exten
       // but that
       // means serializing another instance.
       MRInputUserPayloadProto protoPayload =
-          MRHelpers.parseMRInputPayload(inputDescriptor.getUserPayload());
-      this.conf = MRHelpers.createConfFromByteString(protoPayload.getConfigurationBytes());
+          MRInputHelpers.parseMRInputPayload(inputDescriptor.getUserPayload());
+      this.conf = TezUtils.createConfFromByteString(protoPayload.getConfigurationBytes());
 
       /*
        * Currently in tez, the flow of events is thus:
@@ -146,13 +147,10 @@ public class CustomPartitionVertex exten
        */
 
       // This assumes that Grouping will always be used.
-      // Changing the InputFormat - so that the correct one is initialized in
-      // MRInput.
-      this.conf.set("mapred.input.format.class", TezGroupedSplitsInputFormat.class.getName());
+      // Enabling grouping on the payload.
       MRInputUserPayloadProto updatedPayload =
-          MRInputUserPayloadProto.newBuilder(protoPayload)
-              .setConfigurationBytes(MRHelpers.createByteStringFromConf(conf)).build();
-      inputDescriptor.setUserPayload(updatedPayload.toByteArray());
+          MRInputUserPayloadProto.newBuilder(protoPayload).setGroupingEnabled(true).build();
+      inputDescriptor.setUserPayload(UserPayload.create(updatedPayload.toByteString().asReadOnlyByteBuffer()));
     } catch (IOException e) {
       e.printStackTrace();
       throw new RuntimeException(e);
@@ -162,14 +160,14 @@ public class CustomPartitionVertex exten
     Map<String, List<FileSplit>> pathFileSplitsMap = new TreeMap<String, List<FileSplit>>();
 
     for (Event event : events) {
-      if (event instanceof RootInputConfigureVertexTasksEvent) {
+      if (event instanceof InputConfigureVertexTasksEvent) {
         // No tasks should have been started yet. Checked by initial state
         // check.
         Preconditions.checkState(dataInformationEventSeen == false);
         Preconditions
             .checkState(context.getVertexNumTasks(context.getVertexName()) == -1,
                 "Parallelism for the vertex should be set to -1 if the InputInitializer is setting parallelism");
-        RootInputConfigureVertexTasksEvent cEvent = (RootInputConfigureVertexTasksEvent) event;
+        InputConfigureVertexTasksEvent cEvent = (InputConfigureVertexTasksEvent) event;
 
         // The vertex cannot be configured until all DataEvents are seen - to
         // build the routing table.
@@ -177,12 +175,12 @@ public class CustomPartitionVertex exten
         dataInformationEvents =
             Lists.newArrayListWithCapacity(configureVertexTaskEvent.getNumTasks());
       }
-      if (event instanceof RootInputUpdatePayloadEvent) {
+      if (event instanceof InputUpdatePayloadEvent) {
         // this event can never occur. If it does, fail.
         Preconditions.checkState(false);
-      } else if (event instanceof RootInputDataInformationEvent) {
+      } else if (event instanceof InputDataInformationEvent) {
         dataInformationEventSeen = true;
-        RootInputDataInformationEvent diEvent = (RootInputDataInformationEvent) event;
+        InputDataInformationEvent diEvent = (InputDataInformationEvent) event;
         dataInformationEvents.add(diEvent);
         FileSplit fileSplit;
         try {
@@ -206,8 +204,8 @@ public class CustomPartitionVertex exten
       int totalResource = context.getTotalAvailableResource().getMemory();
       int taskResource = context.getVertexTaskResource().getMemory();
       float waves =
-          conf.getFloat(TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES,
-              TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES_DEFAULT);
+          conf.getFloat(TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES,
+              TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES_DEFAULT);
 
       int availableSlots = totalResource / taskResource;
 
@@ -250,12 +248,12 @@ public class CustomPartitionVertex exten
 
     // Construct the EdgeManager descriptor to be used by all edges which need
     // the routing table.
-    EdgeManagerDescriptor hiveEdgeManagerDesc =
-        new EdgeManagerDescriptor(CustomPartitionEdge.class.getName());
-    byte[] payload = getBytePayload(bucketToTaskMap);
+    EdgeManagerPluginDescriptor hiveEdgeManagerDesc =
+        EdgeManagerPluginDescriptor.create(CustomPartitionEdge.class.getName());
+    UserPayload payload = getBytePayload(bucketToTaskMap);
     hiveEdgeManagerDesc.setUserPayload(payload);
 
-    Map<String, EdgeManagerDescriptor> emMap = Maps.newHashMap();
+    Map<String, EdgeManagerPluginDescriptor> emMap = Maps.newHashMap();
 
     // Replace the edge manager for all vertices which have routing type custom.
     for (Entry<String, EdgeProperty> edgeEntry : context.getInputVertexEdgeProperties().entrySet()) {
@@ -268,52 +266,51 @@ public class CustomPartitionVertex exten
 
     LOG.info("Task count is " + taskCount);
 
-    List<RootInputDataInformationEvent> taskEvents =
+    List<InputDataInformationEvent> taskEvents =
         Lists.newArrayListWithCapacity(finalSplits.size());
     // Re-serialize the splits after grouping.
     int count = 0;
     for (InputSplit inputSplit : finalSplits) {
-      MRSplitProto serializedSplit = MRHelpers.createSplitProto(inputSplit);
-      RootInputDataInformationEvent diEvent =
-          new RootInputDataInformationEvent(count, serializedSplit.toByteArray());
+      MRSplitProto serializedSplit = MRInputHelpers.createSplitProto(inputSplit);
+      InputDataInformationEvent diEvent =
+          InputDataInformationEvent.create(count, serializedSplit.toByteString().asReadOnlyByteBuffer());
       diEvent.setTargetIndex(count);
       count++;
       taskEvents.add(diEvent);
     }
 
     // Replace the Edge Managers
-    Map<String, RootInputSpecUpdate> rootInputSpecUpdate =
-      new HashMap<String, RootInputSpecUpdate>();
+    Map<String, InputSpecUpdate> rootInputSpecUpdate =
+      new HashMap<String, InputSpecUpdate>();
     rootInputSpecUpdate.put(
         inputName,
-        RootInputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate());
+        InputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate());
     context.setVertexParallelism(
         taskCount,
-        new VertexLocationHint(grouper.createTaskLocationHints(finalSplits
+        VertexLocationHint.create(grouper.createTaskLocationHints(finalSplits
             .toArray(new InputSplit[finalSplits.size()]))), emMap, rootInputSpecUpdate);
 
     // Set the actual events for the tasks.
     context.addRootInputEvents(inputName, taskEvents);
   }
 
-  private byte[] getBytePayload(Multimap<Integer, Integer> routingTable) throws IOException {
+  UserPayload getBytePayload(Multimap<Integer, Integer> routingTable) throws IOException {
     CustomEdgeConfiguration edgeConf =
         new CustomEdgeConfiguration(routingTable.keySet().size(), routingTable);
     DataOutputBuffer dob = new DataOutputBuffer();
     edgeConf.write(dob);
     byte[] serialized = dob.getData();
-
-    return serialized;
+    return UserPayload.create(ByteBuffer.wrap(serialized));
   }
 
-  private FileSplit getFileSplitFromEvent(RootInputDataInformationEvent event) throws IOException {
+  private FileSplit getFileSplitFromEvent(InputDataInformationEvent event) throws IOException {
     InputSplit inputSplit = null;
     if (event.getDeserializedUserPayload() != null) {
       inputSplit = (InputSplit) event.getDeserializedUserPayload();
     } else {
-      MRSplitProto splitProto = MRSplitProto.parseFrom(event.getUserPayload());
+      MRSplitProto splitProto = MRSplitProto.parseFrom(ByteString.copyFrom(event.getUserPayload()));
       SerializationFactory serializationFactory = new SerializationFactory(new Configuration());
-      inputSplit = MRHelpers.createOldFormatSplitFromUserPayload(splitProto, serializationFactory);
+      inputSplit = MRInputHelpers.createOldFormatSplitFromUserPayload(splitProto, serializationFactory);
     }
 
     if (!(inputSplit instanceof FileSplit)) {

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1619261&r1=1619260&r2=1619261&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Wed Aug 20 23:15:09 2014
@@ -70,7 +70,6 @@ import org.apache.hadoop.io.DataOutputBu
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
@@ -79,43 +78,41 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
-import org.apache.tez.client.PreWarmVertex;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.DataSinkDescriptor;
 import org.apache.tez.dag.api.DataSourceDescriptor;
 import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.EdgeManagerDescriptor;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.GroupInputEdge;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.InputInitializerDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.PreWarmVertex;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.VertexGroup;
-import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
 import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
-import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
-import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
-import org.apache.tez.runtime.api.TezRootInputInitializer;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfigurer;
-import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfigurer;
-import org.apache.tez.runtime.library.conf.UnorderedUnpartitionedKVEdgeConfigurer;
+import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
+import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
+import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig;
 import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
 
 /**
  * DagUtils. DagUtils is a collection of helper methods to convert
@@ -236,9 +233,10 @@ public class DagUtils {
       mergeInputClass = ConcatenatedMergedKeyValueInput.class;
       int numBuckets = edgeProp.getNumBuckets();
       VertexManagerPluginDescriptor desc =
-          new VertexManagerPluginDescriptor(CustomPartitionVertex.class.getName());
-      byte[] userPayload = ByteBuffer.allocate(4).putInt(numBuckets).array();
-      desc.setUserPayload(userPayload);
+          VertexManagerPluginDescriptor.create(CustomPartitionVertex.class.getName());
+      ByteBuffer userPayload = ByteBuffer.allocate(4).putInt(numBuckets);
+      userPayload.flip();
+      desc.setUserPayload(UserPayload.create(userPayload));
       w.setVertexManagerPlugin(desc);
       break;
     }
@@ -256,8 +254,8 @@ public class DagUtils {
       break;
     }
 
-    return new GroupInputEdge(group, w, createEdgeProperty(edgeProp, vConf),
-        new InputDescriptor(mergeInputClass.getName()));
+    return GroupInputEdge.create(group, w, createEdgeProperty(edgeProp, vConf),
+        InputDescriptor.create(mergeInputClass.getName()));
   }
 
   /**
@@ -276,10 +274,11 @@ public class DagUtils {
     switch(edgeProp.getEdgeType()) {
     case CUSTOM_EDGE: {
       int numBuckets = edgeProp.getNumBuckets();
-      byte[] userPayload = ByteBuffer.allocate(4).putInt(numBuckets).array();
-      VertexManagerPluginDescriptor desc = new VertexManagerPluginDescriptor(
+      ByteBuffer userPayload = ByteBuffer.allocate(4).putInt(numBuckets);
+      userPayload.flip();
+      VertexManagerPluginDescriptor desc = VertexManagerPluginDescriptor.create(
           CustomPartitionVertex.class.getName());
-      desc.setUserPayload(userPayload);
+      desc.setUserPayload(UserPayload.create(userPayload));
       w.setVertexManagerPlugin(desc);
       break;
     }
@@ -291,7 +290,7 @@ public class DagUtils {
       // nothing
     }
 
-    return new Edge(v, w, createEdgeProperty(edgeProp, vConf));
+    return Edge.create(v, w, createEdgeProperty(edgeProp, vConf));
   }
 
   /*
@@ -300,7 +299,7 @@ public class DagUtils {
   @SuppressWarnings("rawtypes")
   private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration conf)
       throws IOException {
-    MRHelpers.translateVertexConfToTez(conf);
+    MRHelpers.translateMRConfToTez(conf);
     String keyClass = conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS);
     String valClass = conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS);
     String partitionerClassName = conf.get("mapred.partitioner.class");
@@ -309,28 +308,28 @@ public class DagUtils {
     EdgeType edgeType = edgeProp.getEdgeType();
     switch (edgeType) {
     case BROADCAST_EDGE:
-      UnorderedUnpartitionedKVEdgeConfigurer et1Conf = UnorderedUnpartitionedKVEdgeConfigurer
+      UnorderedKVEdgeConfig et1Conf = UnorderedKVEdgeConfig
           .newBuilder(keyClass, valClass).setFromConfiguration(conf).build();
       return et1Conf.createDefaultBroadcastEdgeProperty();
     case CUSTOM_EDGE:
       assert partitionerClassName != null;
       partitionerConf = createPartitionerConf(partitionerClassName, conf);
-      UnorderedPartitionedKVEdgeConfigurer et2Conf = UnorderedPartitionedKVEdgeConfigurer
+      UnorderedPartitionedKVEdgeConfig et2Conf = UnorderedPartitionedKVEdgeConfig
           .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf)
           .setFromConfiguration(conf).build();
-      EdgeManagerDescriptor edgeDesc =
-          new EdgeManagerDescriptor(CustomPartitionEdge.class.getName());
+      EdgeManagerPluginDescriptor edgeDesc =
+          EdgeManagerPluginDescriptor.create(CustomPartitionEdge.class.getName());
       CustomEdgeConfiguration edgeConf =
           new CustomEdgeConfiguration(edgeProp.getNumBuckets(), null);
       DataOutputBuffer dob = new DataOutputBuffer();
       edgeConf.write(dob);
       byte[] userPayload = dob.getData();
-      edgeDesc.setUserPayload(userPayload);
+      edgeDesc.setUserPayload(UserPayload.create(ByteBuffer.wrap(userPayload)));
       return et2Conf.createDefaultCustomEdgeProperty(edgeDesc);
     case CUSTOM_SIMPLE_EDGE:
       assert partitionerClassName != null;
       partitionerConf = createPartitionerConf(partitionerClassName, conf);
-      UnorderedPartitionedKVEdgeConfigurer et3Conf = UnorderedPartitionedKVEdgeConfigurer
+      UnorderedPartitionedKVEdgeConfig et3Conf = UnorderedPartitionedKVEdgeConfig
           .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf)
           .setFromConfiguration(conf).build();
       return et3Conf.createDefaultEdgeProperty();
@@ -338,7 +337,7 @@ public class DagUtils {
     default:
       assert partitionerClassName != null;
       partitionerConf = createPartitionerConf(partitionerClassName, conf);
-      OrderedPartitionedKVEdgeConfigurer et4Conf = OrderedPartitionedKVEdgeConfigurer
+      OrderedPartitionedKVEdgeConfig et4Conf = OrderedPartitionedKVEdgeConfig
           .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf)
           .setFromConfiguration(conf).build();
       return et4Conf.createDefaultEdgeProperty();
@@ -384,7 +383,7 @@ public class DagUtils {
    */
   private Map<String, String> getContainerEnvironment(Configuration conf, boolean isMap) {
     Map<String, String> environment = new HashMap<String, String>();
-    MRHelpers.updateEnvironmentForMRTasks(conf, environment, isMap);
+    MRHelpers.updateEnvBasedOnMRTaskEnv(conf, environment, isMap);
     return environment;
   }
 
@@ -398,14 +397,14 @@ public class DagUtils {
     if (javaOpts != null && !javaOpts.isEmpty()) {
       String logLevel = HiveConf.getVar(conf, HiveConf.ConfVars.HIVETEZLOGLEVEL);
       List<String> logProps = Lists.newArrayList();
-      MRHelpers.addLog4jSystemProperties(logLevel, logProps);
+      TezUtils.addLog4jSystemProperties(logLevel, logProps);
       StringBuilder sb = new StringBuilder();
       for (String str : logProps) {
         sb.append(str).append(" ");
       }
       return javaOpts + " " + sb.toString();
     }
-    return MRHelpers.getMapJavaOpts(conf);
+    return MRHelpers.getJavaOptsForMRMapper(conf);
   }
 
   /*
@@ -427,11 +426,11 @@ public class DagUtils {
     Vertex map = null;
 
     // use tez to combine splits
-    boolean useTezGroupedSplits = false;
+    boolean groupSplitsInInputInitializer;
+    
+    DataSourceDescriptor dataSource;
 
     int numTasks = -1;
-    Class<? extends TezRootInputInitializer> amSplitGeneratorClass = null;
-    InputSplitInfo inputSplitInfo = null;
     Class inputFormatClass = conf.getClass("mapred.input.format.class",
         InputFormat.class);
 
@@ -445,9 +444,9 @@ public class DagUtils {
     }
 
     if (vertexHasCustomInput) {
-      useTezGroupedSplits = false;
-      // grouping happens in execution phase. Setting the class to TezGroupedSplitsInputFormat
-      // here would cause pre-mature grouping which would be incorrect.
+      groupSplitsInInputInitializer = false;
+      // grouping happens in execution phase. The input payload should not enable grouping here,
+      // it will be enabled in the CustomVertex.
       inputFormatClass = HiveInputFormat.class;
       conf.setClass("mapred.input.format.class", HiveInputFormat.class, InputFormat.class);
       // mapreduce.tez.input.initializer.serialize.event.payload should be set to false when using
@@ -457,50 +456,48 @@ public class DagUtils {
       // we'll set up tez to combine splits for us iff the input format
       // is HiveInputFormat
       if (inputFormatClass == HiveInputFormat.class) {
-        useTezGroupedSplits = true;
-        conf.setClass("mapred.input.format.class", HiveInputFormat.class, InputFormat.class);
+        groupSplitsInInputInitializer = true;
+      } else {
+        groupSplitsInInputInitializer = false;
       }
     }
 
+    // set up the operator plan. Before setting up Inputs since the config is updated.
+    Utilities.setMapWork(conf, mapWork, mrScratchDir, false);
+    
     if (HiveConf.getBoolVar(conf, ConfVars.HIVE_AM_SPLIT_GENERATION)
         && !mapWork.isUseOneNullRowInputFormat()) {
       // if we're generating the splits in the AM, we just need to set
       // the correct plugin.
-      if (useTezGroupedSplits) {
-        amSplitGeneratorClass = HiveSplitGenerator.class;
+      if (groupSplitsInInputInitializer) {
+        // Not setting a payload, since the MRInput payload is the same and can be accessed.
+        InputInitializerDescriptor descriptor = InputInitializerDescriptor.create(
+            HiveSplitGenerator.class.getName());
+        dataSource = MRInputLegacy.createConfigBuilder(conf, inputFormatClass).groupSplits(true)
+            .setCustomInitializerDescriptor(descriptor).build();
       } else {
-        amSplitGeneratorClass = MRInputAMSplitGenerator.class;
+        // Not HiveInputFormat, or a custom VertexManager will take care of grouping splits
+        dataSource = MRInputLegacy.createConfigBuilder(conf, inputFormatClass).groupSplits(false).build();
       }
     } else {
-      // client side split generation means we have to compute them now
-      inputSplitInfo = MRHelpers.generateInputSplits(conf,
-          new Path(tezDir, "split_"+mapWork.getName().replaceAll(" ", "_")));
-      numTasks = inputSplitInfo.getNumTasks();
+      // Setup client side split generation.
+      dataSource = MRInputHelpers.configureMRInputWithLegacySplitGeneration(conf, new Path(tezDir,
+          "split_" + mapWork.getName().replaceAll(" ", "_")), true);
+      numTasks = dataSource.getNumberOfShards();
     }
 
-    // set up the operator plan
-    Utilities.setMapWork(conf, mapWork, mrScratchDir, false);
-
-    byte[] serializedConf = MRHelpers.createUserPayloadFromConf(conf);
-    map = new Vertex(mapWork.getName(),
-        new ProcessorDescriptor(MapTezProcessor.class.getName()).
+    UserPayload serializedConf = TezUtils.createUserPayloadFromConf(conf);
+    map = Vertex.create(mapWork.getName(),
+        ProcessorDescriptor.create(MapTezProcessor.class.getName()).
         setUserPayload(serializedConf), numTasks, getContainerResource(conf));
     map.setTaskEnvironment(getContainerEnvironment(conf, true));
     map.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
 
     assert mapWork.getAliasToWork().keySet().size() == 1;
 
+    // Add the actual source input
     String alias = mapWork.getAliasToWork().keySet().iterator().next();
-
-    byte[] mrInput = null;
-    if (useTezGroupedSplits) {
-      mrInput = MRHelpers.createMRInputPayloadWithGrouping(serializedConf);
-    } else {
-      mrInput = MRHelpers.createMRInputPayload(serializedConf);
-    }
-    map.addDataSource(alias,
-        new DataSourceDescriptor(new InputDescriptor(MRInputLegacy.class.getName()).
-        setUserPayload(mrInput), new InputInitializerDescriptor(amSplitGeneratorClass.getName()).setUserPayload(mrInput),null));
+    map.addDataSource(alias, dataSource);
 
     Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
     localResources.put(getBaseName(appJarLr), appJarLr);
@@ -508,13 +505,6 @@ public class DagUtils {
       localResources.put(getBaseName(lr), lr);
     }
 
-    if (inputSplitInfo != null) {
-      // only relevant for client-side split generation
-      map.setLocationHint(new VertexLocationHint(inputSplitInfo.getTaskLocationHints()));
-      MRHelpers.updateLocalResourcesForInputSplits(FileSystem.get(conf), inputSplitInfo,
-          localResources);
-    }
-
     map.setTaskLocalFiles(localResources);
     return map;
   }
@@ -550,9 +540,9 @@ public class DagUtils {
     Utilities.createTmpDirs(conf, reduceWork);
 
     // create the vertex
-    Vertex reducer = new Vertex(reduceWork.getName(),
-        new ProcessorDescriptor(ReduceTezProcessor.class.getName()).
-        setUserPayload(MRHelpers.createUserPayloadFromConf(conf)),
+    Vertex reducer = Vertex.create(reduceWork.getName(),
+        ProcessorDescriptor.create(ReduceTezProcessor.class.getName()).
+        setUserPayload(TezUtils.createUserPayloadFromConf(conf)),
             reduceWork.isAutoReduceParallelism() ? reduceWork.getMaxReduceTasks() : reduceWork
                 .getNumReduceTasks(), getContainerResource(conf));
 
@@ -606,10 +596,10 @@ public class DagUtils {
       int numContainers, Map<String, LocalResource> localResources) throws
       IOException, TezException {
 
-    ProcessorDescriptor prewarmProcDescriptor = new ProcessorDescriptor(HivePreWarmProcessor.class.getName());
-    prewarmProcDescriptor.setUserPayload(MRHelpers.createUserPayloadFromConf(conf));
+    ProcessorDescriptor prewarmProcDescriptor = ProcessorDescriptor.create(HivePreWarmProcessor.class.getName());
+    prewarmProcDescriptor.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
 
-    PreWarmVertex prewarmVertex = new PreWarmVertex("prewarm", prewarmProcDescriptor, numContainers,getContainerResource(conf));
+    PreWarmVertex prewarmVertex = PreWarmVertex.create("prewarm", prewarmProcDescriptor, numContainers,getContainerResource(conf));
 
     Map<String, LocalResource> combinedResources = new HashMap<String, LocalResource>();
 
@@ -855,7 +845,7 @@ public class DagUtils {
   public JobConf createConfiguration(HiveConf hiveConf) throws IOException {
     hiveConf.setBoolean("mapred.mapper.new-api", false);
 
-    JobConf conf = (JobConf) MRHelpers.getBaseMRConfiguration(hiveConf);
+    JobConf conf = new JobConf(hiveConf);
 
     conf.set("mapred.output.committer.class", NullOutputCommitter.class.getName());
 
@@ -946,8 +936,8 @@ public class DagUtils {
     // final vertices need to have at least one output
     if (!hasChildren) {
       v.addDataSink("out_"+work.getName(), new DataSinkDescriptor(
-          new OutputDescriptor(MROutput.class.getName())
-          .setUserPayload(MRHelpers.createUserPayloadFromConf(conf)), null, null));
+          OutputDescriptor.create(MROutput.class.getName())
+          .setUserPayload(TezUtils.createUserPayloadFromConf(conf)), null, null));
     }
 
     return v;
@@ -1015,16 +1005,16 @@ public class DagUtils {
     if (edgeProp.isAutoReduce()) {
       Configuration pluginConf = new Configuration(false);
       VertexManagerPluginDescriptor desc =
-          new VertexManagerPluginDescriptor(ShuffleVertexManager.class.getName());
+          VertexManagerPluginDescriptor.create(ShuffleVertexManager.class.getName());
       pluginConf.setBoolean(
-          ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true);
-      pluginConf.setInt(ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM,
+          ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true);
+      pluginConf.setInt(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM,
           edgeProp.getMinReducer());
       pluginConf.setLong(
-          ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
+          ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
           edgeProp.getInputSizePerReducer());
-      ByteString payload = MRHelpers.createByteStringFromConf(pluginConf);
-      desc.setUserPayload(payload.toByteArray());
+      UserPayload payload = TezUtils.createUserPayloadFromConf(pluginConf);
+      desc.setUserPayload(payload);
       v.setVertexManagerPlugin(desc);
     }
   }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java?rev=1619261&r1=1619260&r2=1619261&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java Wed Aug 20 23:15:09 2014
@@ -25,16 +25,15 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.ReadaheadPool;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.LogicalIOProcessor;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
 
 import java.net.URL;
 import java.net.JarURLConnection;
-import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.List;
 import java.util.Map;
@@ -57,14 +56,13 @@ public class HivePreWarmProcessor extend
 
   private Configuration conf;
 
-  public HivePreWarmProcessor(TezProcessorContext context) {
+  public HivePreWarmProcessor(ProcessorContext context) {
     super(context);
   }
 
   @Override
   public void initialize() throws Exception {
-    TezProcessorContext processorContext = getContext();
-    byte[] userPayload = processorContext.getUserPayload();
+    UserPayload userPayload = getContext().getUserPayload();
     this.conf = TezUtils.createConfFromUserPayload(userPayload);
   }
 

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java?rev=1619261&r1=1619260&r2=1619261&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java Wed Aug 20 23:15:09 2014
@@ -35,22 +35,23 @@ import org.apache.hadoop.mapred.FileSpli
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
 import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.RootInputSpecUpdate;
-import org.apache.tez.runtime.api.events.RootInputConfigureVertexTasksEvent;
-import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
-import org.apache.tez.runtime.api.events.RootInputInitializerEvent;
-import org.apache.tez.runtime.api.RootInputSpecUpdate;
-import org.apache.tez.runtime.api.TezRootInputInitializer;
-import org.apache.tez.runtime.api.TezRootInputInitializerContext;
+import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
+import org.apache.tez.runtime.api.events.InputDataInformationEvent;
+import org.apache.tez.runtime.api.events.InputInitializerEvent;
+import org.apache.tez.runtime.api.InputInitializer;
+import org.apache.tez.runtime.api.InputInitializerContext;
+import org.apache.tez.runtime.api.InputSpecUpdate;
 
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Lists;
@@ -62,25 +63,25 @@ import com.google.common.collect.Multima
  * making sure that splits from different partitions are only grouped if they
  * are of the same schema, format and serde
  */
-public class HiveSplitGenerator extends TezRootInputInitializer {
+public class HiveSplitGenerator extends InputInitializer {
 
   private static final Log LOG = LogFactory.getLog(HiveSplitGenerator.class);
 
   private static final SplitGrouper grouper = new SplitGrouper();
 
-  public HiveSplitGenerator(TezRootInputInitializerContext initializerContext) {
+  public HiveSplitGenerator(InputInitializerContext initializerContext) {
     super(initializerContext);
   }
 
   @Override
   public List<Event> initialize() throws Exception {
-    TezRootInputInitializerContext rootInputContext = getContext();
+    InputInitializerContext rootInputContext = getContext();
 
     MRInputUserPayloadProto userPayloadProto =
-        MRHelpers.parseMRInputPayload(rootInputContext.getUserPayload());
+        MRInputHelpers.parseMRInputPayload(rootInputContext.getInputUserPayload());
 
     Configuration conf =
-        MRHelpers.createConfFromByteString(userPayloadProto.getConfigurationBytes());
+        TezUtils.createConfFromByteString(userPayloadProto.getConfigurationBytes());
 
     boolean sendSerializedEvents =
         conf.getBoolean("mapreduce.tez.input.initializer.serialize.event.payload", true);
@@ -91,7 +92,8 @@ public class HiveSplitGenerator extends 
 
     InputSplitInfoMem inputSplitInfo = null;
     String realInputFormatName = conf.get("mapred.input.format.class");
-    if (realInputFormatName != null && !realInputFormatName.isEmpty()) {
+    boolean groupingEnabled = userPayloadProto.getGroupingEnabled();
+    if (groupingEnabled) {
       // Need to instantiate the realInputFormat
       InputFormat<?, ?> inputFormat =
           (InputFormat<?, ?>) ReflectionUtils.newInstance(Class.forName(realInputFormatName),
@@ -103,8 +105,8 @@ public class HiveSplitGenerator extends 
 
       // Create the un-grouped splits
       float waves =
-          conf.getFloat(TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES,
-              TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES_DEFAULT);
+          conf.getFloat(TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES,
+              TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES_DEFAULT);
 
       InputSplit[] splits = inputFormat.getSplits(jobConf, (int) (availableSlots * waves));
       LOG.info("Number of input splits: " + splits.length + ". " + availableSlots
@@ -124,7 +126,13 @@ public class HiveSplitGenerator extends 
           new InputSplitInfoMem(flatSplits, locationHints, flatSplits.length, null, jobConf);
     } else {
       // no need for grouping and the target #of tasks.
-      inputSplitInfo = MRHelpers.generateInputSplitsToMem(jobConf, false, 0);
+      // This code path should never be triggered at the moment. If grouping is disabled,
+      // DagUtils uses MRInputAMSplitGenerator.
+      // If this path is used in the future, make sure grouping is disabled in the payload (if it isn't already).
+      throw new RuntimeException(
+          "HiveInputFormat does not support non-grouped splits, InputFormatName is: "
+              + realInputFormatName);
+      // inputSplitInfo = MRInputHelpers.generateInputSplitsToMem(jobConf, false, 0);
     }
 
     return createEventList(sendSerializedEvents, inputSplitInfo);
@@ -181,31 +189,32 @@ public class HiveSplitGenerator extends 
     return groupedSplits;
   }
 
-  public void handleInputInitializerEvent(List<RootInputInitializerEvent> events) throws Exception {
+  @Override
+  public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws Exception {
   }
 
   private List<Event> createEventList(boolean sendSerializedEvents, InputSplitInfoMem inputSplitInfo) {
 
     List<Event> events = Lists.newArrayListWithCapacity(inputSplitInfo.getNumTasks() + 1);
 
-    RootInputConfigureVertexTasksEvent configureVertexEvent = new
-        RootInputConfigureVertexTasksEvent(inputSplitInfo.getNumTasks(),
-        inputSplitInfo.getTaskLocationHints(),
-        RootInputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate());
+    InputConfigureVertexTasksEvent configureVertexEvent =
+        InputConfigureVertexTasksEvent.create(inputSplitInfo.getNumTasks(),
+        VertexLocationHint.create(inputSplitInfo.getTaskLocationHints()),
+        InputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate());
     events.add(configureVertexEvent);
 
     if (sendSerializedEvents) {
       MRSplitsProto splitsProto = inputSplitInfo.getSplitsProto();
       int count = 0;
       for (MRSplitProto mrSplit : splitsProto.getSplitsList()) {
-        RootInputDataInformationEvent diEvent =
-            new RootInputDataInformationEvent(count++, mrSplit.toByteArray());
+        InputDataInformationEvent diEvent =
+            InputDataInformationEvent.create(count++, mrSplit.toByteString().asReadOnlyByteBuffer());
         events.add(diEvent);
       }
     } else {
       int count = 0;
       for (org.apache.hadoop.mapred.InputSplit split : inputSplitInfo.getOldFormatSplits()) {
-        RootInputDataInformationEvent diEvent = new RootInputDataInformationEvent(count++, split);
+        InputDataInformationEvent diEvent = InputDataInformationEvent.create(count++, split);
         events.add(diEvent);
       }
     }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java?rev=1619261&r1=1619260&r2=1619261&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java Wed Aug 20 23:15:09 2014
@@ -47,7 +47,7 @@ import org.apache.tez.mapreduce.input.MR
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 
 /**
@@ -65,7 +65,7 @@ public class MapRecordProcessor extends 
   private MapWork mapWork;
 
   @Override
-  void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mrReporter,
+  void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrReporter,
       Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
     super.init(jconf, processorContext, mrReporter, inputs, outputs);

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java?rev=1619261&r1=1619260&r2=1619261&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java Wed Aug 20 23:15:09 2014
@@ -17,14 +17,14 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
 
 /**
  * Subclass that is used to indicate if this is a map or reduce process
  */
 public class MapTezProcessor extends TezProcessor {
 
-  public MapTezProcessor(TezProcessorContext context) {
+  public MapTezProcessor(ProcessorContext context) {
     super(context);
     this.isMap = true;
   }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java?rev=1619261&r1=1619260&r2=1619261&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java Wed Aug 20 23:15:09 2014
@@ -20,7 +20,7 @@ package org.apache.hadoop.hive.ql.exec.t
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.tez.runtime.common.objectregistry.ObjectRegistry;
+import org.apache.tez.runtime.api.ObjectRegistry;
 import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
 
 /**
@@ -30,6 +30,7 @@ import org.apache.tez.runtime.common.obj
 public class ObjectCache implements org.apache.hadoop.hive.ql.exec.ObjectCache {
 
   private static final Log LOG = LogFactory.getLog(ObjectCache.class.getName());
+  // TODO HIVE-7809: This is broken. A new instance of ObjectRegistry should not be created here.
   private final ObjectRegistry registry = new ObjectRegistryImpl();
 
   @Override

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java?rev=1619261&r1=1619260&r2=1619261&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java Wed Aug 20 23:15:09 2014
@@ -32,7 +32,7 @@ import org.apache.hadoop.mapred.OutputCo
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
@@ -47,7 +47,7 @@ public abstract class RecordProcessor  {
   protected Map<String, LogicalInput> inputs;
   protected Map<String, LogicalOutput> outputs;
   protected Map<String, OutputCollector> outMap;
-  protected TezProcessorContext processorContext;
+  protected ProcessorContext processorContext;
 
   public static final Log l4j = LogFactory.getLog(RecordProcessor.class);
 
@@ -72,7 +72,7 @@ public abstract class RecordProcessor  {
    * @param outputs map of Output names to {@link LogicalOutput}s
    * @throws Exception
    */
-  void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mrReporter,
+  void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrReporter,
       Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
     this.jconf = jconf;
     this.reporter = mrReporter;

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java?rev=1619261&r1=1619260&r2=1619261&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java Wed Aug 20 23:15:09 2014
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.ql.exec.t
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -59,14 +58,13 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
 import org.apache.tez.runtime.library.api.KeyValuesReader;
 
 /**
@@ -113,7 +111,7 @@ public class ReduceRecordProcessor  exte
   private List<VectorExpressionWriter>[] valueStringWriters;
 
   @Override
-  void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mrReporter,
+  void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrReporter,
       Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
     super.init(jconf, processorContext, mrReporter, inputs, outputs);

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java?rev=1619261&r1=1619260&r2=1619261&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java Wed Aug 20 23:15:09 2014
@@ -17,14 +17,14 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
 
 /**
  * Subclass that is used to indicate if this is a map or reduce process
  */
 public class ReduceTezProcessor extends TezProcessor {
 
-  public ReduceTezProcessor(TezProcessorContext context) {
+  public ReduceTezProcessor(ProcessorContext context) {
     super(context);
     this.isMap = false;
   }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java?rev=1619261&r1=1619260&r2=1619261&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java Wed Aug 20 23:15:09 2014
@@ -142,7 +142,7 @@ public class TezJobMonitor {
             if (!running) {
               perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING);
               console.printInfo("Status: Running (application id: "
-                +dagClient.getApplicationId()+")\n");
+                +dagClient.getExecutionContext()+")\n");
               for (String s: progressMap.keySet()) {
                 perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
               }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java?rev=1619261&r1=1619260&r2=1619261&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java Wed Aug 20 23:15:09 2014
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hive.ql.exec.tez;
 import java.io.IOException;
 import java.text.NumberFormat;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -35,10 +34,9 @@ import org.apache.tez.mapreduce.input.MR
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.LogicalIOProcessor;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
 
 /**
@@ -59,8 +57,6 @@ public class TezProcessor extends Abstra
   private static final String CLASS_NAME = TezProcessor.class.getName();
   private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
 
-  private TezProcessorContext processorContext;
-
   protected static final NumberFormat taskIdFormat = NumberFormat.getInstance();
   protected static final NumberFormat jobIdFormat = NumberFormat.getInstance();
   static {
@@ -70,7 +66,7 @@ public class TezProcessor extends Abstra
     jobIdFormat.setMinimumIntegerDigits(4);
   }
 
-  public TezProcessor(TezProcessorContext context) {
+  public TezProcessor(ProcessorContext context) {
     super(context);
   }
 
@@ -88,18 +84,14 @@ public class TezProcessor extends Abstra
 
   @Override
   public void initialize() throws IOException {
-    TezProcessorContext processorContext = getContext();
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INITIALIZE_PROCESSOR);
-    this.processorContext = processorContext;
-    //get the jobconf
-    byte[] userPayload = processorContext.getUserPayload();
-    Configuration conf = TezUtils.createConfFromUserPayload(userPayload);
+    Configuration conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
     this.jobConf = new JobConf(conf);
-    setupMRLegacyConfigs(processorContext);
+    setupMRLegacyConfigs(getContext());
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INITIALIZE_PROCESSOR);
   }
 
-  private void setupMRLegacyConfigs(TezProcessorContext processorContext) {
+  private void setupMRLegacyConfigs(ProcessorContext processorContext) {
     // Hive "insert overwrite local directory" uses task id as dir name
     // Setting the id in jobconf helps to have the similar dir name as MR
     StringBuilder taskAttemptIdBuilder = new StringBuilder("task");
@@ -134,7 +126,7 @@ public class TezProcessor extends Abstra
       // in case of broadcast-join read the broadcast edge inputs
       // (possibly asynchronously)
 
-      LOG.info("Running task: " + processorContext.getUniqueIdentifier());
+      LOG.info("Running task: " + getContext().getUniqueIdentifier());
 
       if (isMap) {
         rproc = new MapRecordProcessor();
@@ -161,8 +153,8 @@ public class TezProcessor extends Abstra
 
       // Outputs will be started later by the individual Processors.
 
-      MRTaskReporter mrReporter = new MRTaskReporter(processorContext);
-      rproc.init(jobConf, processorContext, mrReporter, inputs, outputs);
+      MRTaskReporter mrReporter = new MRTaskReporter(getContext());
+      rproc.init(jobConf, getContext(), mrReporter, inputs, outputs);
       rproc.run();
 
       //done - output does not need to be committed as hive does not use outputcommitter

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java?rev=1619261&r1=1619260&r2=1619261&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java Wed Aug 20 23:15:09 2014
@@ -41,21 +41,17 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.tez.client.TezClient;
-import org.apache.tez.client.PreWarmVertex;
+import org.apache.tez.dag.api.PreWarmVertex;
 import org.apache.tez.dag.api.SessionNotRunning;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 
 /**
@@ -165,7 +161,7 @@ public class TezSessionState {
 
     // Create environment for AM.
     Map<String, String> amEnv = new HashMap<String, String>();
-    MRHelpers.updateEnvironmentForMRAM(conf, amEnv);
+    MRHelpers.updateEnvBasedOnMRAMEnv(conf, amEnv);
 
     // and finally we're ready to create and start the session
     // generate basic tez config
@@ -180,7 +176,7 @@ public class TezSessionState {
       tezConfig.setInt(TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS, n);
     }
 
-    session = new TezClient("HIVE-" + sessionId, tezConfig, true,
+    session = TezClient.create("HIVE-" + sessionId, tezConfig, true,
         commonLocalResources, null);
 
     LOG.info("Opening new Tez Session (id: " + sessionId
@@ -196,9 +192,13 @@ public class TezSessionState {
           commonLocalResources);
       try {
         session.preWarm(prewarmVertex);
-      } catch (InterruptedException ie) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Hive Prewarm threw an exception ", ie);
+      } catch (IOException ie) {
+        if (ie.getMessage().contains("Interrupted while waiting")) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Hive Prewarm threw an exception ", ie);
+          }
+        } else {
+          throw ie;
         }
       }
     }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java?rev=1619261&r1=1619260&r2=1619261&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java Wed Aug 20 23:15:09 2014
@@ -22,9 +22,9 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.tez.runtime.api.Input;
+import org.apache.tez.runtime.api.MergedInputContext;
 import org.apache.tez.runtime.api.MergedLogicalInput;
 import org.apache.tez.runtime.api.Reader;
-import org.apache.tez.runtime.api.TezMergedInputContext;
 
 /**
  * TezMergedLogicalInput is an adapter to make union input look like
@@ -34,7 +34,7 @@ public class TezMergedLogicalInput exten
 
   private Map<Input, Boolean> readyInputs = new IdentityHashMap<Input, Boolean>();
 
-  public TezMergedLogicalInput(TezMergedInputContext context, List<Input> inputs) {
+  public TezMergedLogicalInput(MergedInputContext context, List<Input> inputs) {
     super(context, inputs);
   }
  

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java?rev=1619261&r1=1619260&r2=1619261&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java Wed Aug 20 23:15:09 2014
@@ -14,7 +14,7 @@ import java.util.regex.Pattern;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.conf.HiveConf;;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
 import org.apache.hadoop.hive.metastore.HiveMetaHookLoader;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;

Modified: hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java?rev=1619261&r1=1619260&r2=1619261&view=diff
==============================================================================
--- hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java (original)
+++ hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java Wed Aug 20 23:15:09 2014
@@ -33,7 +33,6 @@ import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -97,7 +96,7 @@ public class TestTezTask {
           @Override
           public Vertex answer(InvocationOnMock invocation) throws Throwable {
             Object[] args = invocation.getArguments();
-            return new Vertex(((BaseWork)args[1]).getName(),
+            return Vertex.create(((BaseWork)args[1]).getName(),
                 mock(ProcessorDescriptor.class), 0, mock(Resource.class));
           }
         });
@@ -108,7 +107,7 @@ public class TestTezTask {
           @Override
           public Edge answer(InvocationOnMock invocation) throws Throwable {
             Object[] args = invocation.getArguments();
-            return new Edge((Vertex)args[1], (Vertex)args[2], mock(EdgeProperty.class));
+            return Edge.create((Vertex)args[1], (Vertex)args[2], mock(EdgeProperty.class));
           }
         });