You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by br...@apache.org on 2014/09/08 06:38:26 UTC

svn commit: r1623263 [17/28] - in /hive/branches/spark: ./ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/ ant/src/org/apache/hadoop/hive/ant/ beeline/src/java/org/apache/hive/beeline/ beeline/src/test/org/apache/hive/beeline/ bin/...

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java Mon Sep  8 04:38:17 2014
@@ -19,61 +19,62 @@
 package org.apache.hadoop.hive.ql.exec.tez;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.List;
-import java.util.ArrayList;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.tez.dag.api.EdgeManager;
-import org.apache.tez.dag.api.EdgeManagerContext;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.tez.dag.api.EdgeManagerPlugin;
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 
-import com.google.common.collect.Multimap;
-
-public class CustomPartitionEdge implements EdgeManager {
+public class CustomPartitionEdge extends EdgeManagerPlugin {
 
   private static final Log LOG = LogFactory.getLog(CustomPartitionEdge.class.getName());
 
   CustomEdgeConfiguration conf = null;
+  final EdgeManagerPluginContext context;
 
   // used by the framework at runtime. initialize is the real initializer at runtime
-  public CustomPartitionEdge() {  
+  public CustomPartitionEdge(EdgeManagerPluginContext context) {
+    super(context);
+    this.context = context;
   }
 
+
   @Override
-  public int getNumDestinationTaskPhysicalInputs(int numSourceTasks, 
-      int destinationTaskIndex) {
-    return numSourceTasks;
+  public int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex) {
+    return context.getSourceVertexNumTasks();
   }
 
   @Override
-  public int getNumSourceTaskPhysicalOutputs(int numDestinationTasks, 
-      int sourceTaskIndex) {
+  public int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex) {
     return conf.getNumBuckets();
   }
 
   @Override
-  public int getNumDestinationConsumerTasks(int sourceTaskIndex, int numDestinationTasks) {
-    return numDestinationTasks;
+  public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
+    return context.getDestinationVertexNumTasks();
   }
 
   // called at runtime to initialize the custom edge.
   @Override
-  public void initialize(EdgeManagerContext context) {
-    byte[] payload = context.getUserPayload();
+  public void initialize() {
+    ByteBuffer payload = context.getUserPayload().getPayload();
     LOG.info("Initializing the edge, payload: " + payload);
     if (payload == null) {
       throw new RuntimeException("Invalid payload");
     }
     // De-serialization code
-    DataInputBuffer dib = new DataInputBuffer();
-    dib.reset(payload, payload.length);
+    DataInputByteBuffer dibb = new DataInputByteBuffer();
+    dibb.reset(payload);
     conf = new CustomEdgeConfiguration();
     try {
-      conf.readFields(dib);
+      conf.readFields(dibb);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
@@ -83,30 +84,25 @@ public class CustomPartitionEdge impleme
 
   @Override
   public void routeDataMovementEventToDestination(DataMovementEvent event,
-      int sourceTaskIndex, int numDestinationTasks, Map<Integer, List<Integer>> mapDestTaskIndices) {
-    int srcIndex = event.getSourceIndex();
-    List<Integer> destTaskIndices = new ArrayList<Integer>();
-    destTaskIndices.addAll(conf.getRoutingTable().get(srcIndex));
-    mapDestTaskIndices.put(new Integer(sourceTaskIndex), destTaskIndices);
+      int sourceTaskIndex, int sourceOutputIndex, Map<Integer, List<Integer>> mapDestTaskIndices) {
+    List<Integer> outputIndices = Collections.singletonList(sourceTaskIndex);
+    for (Integer destIndex : conf.getRoutingTable().get(sourceOutputIndex)) {
+      mapDestTaskIndices.put(destIndex, outputIndices);
+    }
   }
 
   @Override
-  public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex, 
-      int numDestinationTasks, Map<Integer, List<Integer>> mapDestTaskIndices) {
-    List<Integer> destTaskIndices = new ArrayList<Integer>();
-    addAllDestinationTaskIndices(numDestinationTasks, destTaskIndices);
-    mapDestTaskIndices.put(new Integer(sourceTaskIndex), destTaskIndices);
+  public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
+      Map<Integer, List<Integer>> mapDestTaskIndices) {
+    List<Integer> outputIndices = Collections.singletonList(sourceTaskIndex);
+    for (int i = 0; i < context.getDestinationVertexNumTasks(); i++) {
+      mapDestTaskIndices.put(i, outputIndices);
+    }
   }
 
   @Override
-  public int routeInputErrorEventToSource(InputReadErrorEvent event, 
-      int destinationTaskIndex) {
+  public int routeInputErrorEventToSource(InputReadErrorEvent event,
+      int destinationTaskIndex, int destinationFailedInputIndex) {
     return event.getIndex();
   }
-
-  void addAllDestinationTaskIndices(int numDestinationTasks, List<Integer> taskIndices) {
-    for(int i=0; i<numDestinationTasks; ++i) {
-      taskIndices.add(new Integer(i));
-    }
-  }
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java Mon Sep  8 04:38:17 2014
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -30,27 +31,30 @@ import java.util.TreeMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat;
-import org.apache.tez.dag.api.EdgeManagerDescriptor;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.VertexManagerPlugin;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
 import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.events.RootInputConfigureVertexTasksEvent;
-import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
-import org.apache.tez.runtime.api.events.RootInputUpdatePayloadEvent;
+import org.apache.tez.runtime.api.InputSpecUpdate;
+import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
+import org.apache.tez.runtime.api.events.InputDataInformationEvent;
+import org.apache.tez.runtime.api.events.InputUpdatePayloadEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 
 import com.google.common.base.Preconditions;
@@ -59,40 +63,44 @@ import com.google.common.collect.HashMul
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
+import com.google.protobuf.ByteString;
 
 /*
  * Only works with old mapred API
  * Will only work with a single MRInput for now.
  */
-public class CustomPartitionVertex implements VertexManagerPlugin {
+public class CustomPartitionVertex extends VertexManagerPlugin {
 
   private static final Log LOG = LogFactory.getLog(CustomPartitionVertex.class.getName());
 
   VertexManagerPluginContext context;
 
-  private RootInputConfigureVertexTasksEvent configureVertexTaskEvent;
-  private List<RootInputDataInformationEvent> dataInformationEvents;
+  private InputConfigureVertexTasksEvent configureVertexTaskEvent;
+  private List<InputDataInformationEvent> dataInformationEvents;
   private int numBuckets = -1;
   private Configuration conf = null;
   private boolean rootVertexInitialized = false;
   private final SplitGrouper grouper = new SplitGrouper();
+  private int taskCount = 0;
 
-  public CustomPartitionVertex() {
+  public CustomPartitionVertex(VertexManagerPluginContext context) {
+    super(context);
   }
 
   @Override
-  public void initialize(VertexManagerPluginContext context) {
-    this.context = context;
-    ByteBuffer byteBuf = ByteBuffer.wrap(context.getUserPayload());
+  public void initialize() {
+    this.context = getContext();
+    ByteBuffer byteBuf = context.getUserPayload().getPayload();
     this.numBuckets = byteBuf.getInt();
   }
 
   @Override
   public void onVertexStarted(Map<String, List<Integer>> completions) {
     int numTasks = context.getVertexNumTasks(context.getVertexName());
-    List<Integer> scheduledTasks = new ArrayList<Integer>(numTasks);
+    List<VertexManagerPluginContext.TaskWithLocationHint> scheduledTasks = 
+      new ArrayList<VertexManagerPluginContext.TaskWithLocationHint>(numTasks);
     for (int i = 0; i < numTasks; ++i) {
-      scheduledTasks.add(new Integer(i));
+      scheduledTasks.add(new VertexManagerPluginContext.TaskWithLocationHint(new Integer(i), null));
     }
     context.scheduleVertexTasks(scheduledTasks);
   }
@@ -114,6 +122,7 @@ public class CustomPartitionVertex imple
     // ensure this method is called only once. Tez will call it once per Root
     // Input.
     Preconditions.checkState(rootVertexInitialized == false);
+    LOG.info("Root vertex not initialized");
     rootVertexInitialized = true;
     try {
       // This is using the payload from the RootVertexInitializer corresponding
@@ -121,8 +130,8 @@ public class CustomPartitionVertex imple
       // but that
       // means serializing another instance.
       MRInputUserPayloadProto protoPayload =
-          MRHelpers.parseMRInputPayload(inputDescriptor.getUserPayload());
-      this.conf = MRHelpers.createConfFromByteString(protoPayload.getConfigurationBytes());
+          MRInputHelpers.parseMRInputPayload(inputDescriptor.getUserPayload());
+      this.conf = TezUtils.createConfFromByteString(protoPayload.getConfigurationBytes());
 
       /*
        * Currently in tez, the flow of events is thus:
@@ -138,30 +147,27 @@ public class CustomPartitionVertex imple
        */
 
       // This assumes that Grouping will always be used.
-      // Changing the InputFormat - so that the correct one is initialized in
-      // MRInput.
-      this.conf.set("mapred.input.format.class", TezGroupedSplitsInputFormat.class.getName());
+      // Enabling grouping on the payload.
       MRInputUserPayloadProto updatedPayload =
-          MRInputUserPayloadProto.newBuilder(protoPayload)
-              .setConfigurationBytes(MRHelpers.createByteStringFromConf(conf)).build();
-      inputDescriptor.setUserPayload(updatedPayload.toByteArray());
+          MRInputUserPayloadProto.newBuilder(protoPayload).setGroupingEnabled(true).build();
+      inputDescriptor.setUserPayload(UserPayload.create(updatedPayload.toByteString().asReadOnlyByteBuffer()));
     } catch (IOException e) {
       e.printStackTrace();
       throw new RuntimeException(e);
     }
 
     boolean dataInformationEventSeen = false;
-    Map<Path, List<FileSplit>> pathFileSplitsMap = new TreeMap<Path, List<FileSplit>>();
+    Map<String, List<FileSplit>> pathFileSplitsMap = new TreeMap<String, List<FileSplit>>();
 
     for (Event event : events) {
-      if (event instanceof RootInputConfigureVertexTasksEvent) {
+      if (event instanceof InputConfigureVertexTasksEvent) {
         // No tasks should have been started yet. Checked by initial state
         // check.
         Preconditions.checkState(dataInformationEventSeen == false);
         Preconditions
             .checkState(context.getVertexNumTasks(context.getVertexName()) == -1,
                 "Parallelism for the vertex should be set to -1 if the InputInitializer is setting parallelism");
-        RootInputConfigureVertexTasksEvent cEvent = (RootInputConfigureVertexTasksEvent) event;
+        InputConfigureVertexTasksEvent cEvent = (InputConfigureVertexTasksEvent) event;
 
         // The vertex cannot be configured until all DataEvents are seen - to
         // build the routing table.
@@ -169,12 +175,12 @@ public class CustomPartitionVertex imple
         dataInformationEvents =
             Lists.newArrayListWithCapacity(configureVertexTaskEvent.getNumTasks());
       }
-      if (event instanceof RootInputUpdatePayloadEvent) {
+      if (event instanceof InputUpdatePayloadEvent) {
         // this event can never occur. If it does, fail.
         Preconditions.checkState(false);
-      } else if (event instanceof RootInputDataInformationEvent) {
+      } else if (event instanceof InputDataInformationEvent) {
         dataInformationEventSeen = true;
-        RootInputDataInformationEvent diEvent = (RootInputDataInformationEvent) event;
+        InputDataInformationEvent diEvent = (InputDataInformationEvent) event;
         dataInformationEvents.add(diEvent);
         FileSplit fileSplit;
         try {
@@ -182,10 +188,10 @@ public class CustomPartitionVertex imple
         } catch (IOException e) {
           throw new RuntimeException("Failed to get file split for event: " + diEvent);
         }
-        List<FileSplit> fsList = pathFileSplitsMap.get(fileSplit.getPath());
+        List<FileSplit> fsList = pathFileSplitsMap.get(fileSplit.getPath().getName());
         if (fsList == null) {
           fsList = new ArrayList<FileSplit>();
-          pathFileSplitsMap.put(fileSplit.getPath(), fsList);
+          pathFileSplitsMap.put(fileSplit.getPath().getName(), fsList);
         }
         fsList.add(fileSplit);
       }
@@ -195,21 +201,32 @@ public class CustomPartitionVertex imple
         getBucketSplitMapForPath(pathFileSplitsMap);
 
     try {
-      int totalResource = context.getTotalAVailableResource().getMemory();
+      int totalResource = context.getTotalAvailableResource().getMemory();
       int taskResource = context.getVertexTaskResource().getMemory();
       float waves =
-          conf.getFloat(TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES,
-              TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES_DEFAULT);
+          conf.getFloat(TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES,
+              TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES_DEFAULT);
 
       int availableSlots = totalResource / taskResource;
 
       LOG.info("Grouping splits. " + availableSlots + " available slots, " + waves + " waves.");
+      JobConf jobConf = new JobConf(conf);
+      ShimLoader.getHadoopShims().getMergedCredentials(jobConf);
 
       Multimap<Integer, InputSplit> bucketToGroupedSplitMap =
-          grouper.group(conf, bucketToInitialSplitMap, availableSlots, waves);
+          HashMultimap.<Integer, InputSplit> create();
+      for (Integer key : bucketToInitialSplitMap.keySet()) {
+        InputSplit[] inputSplitArray =
+            (bucketToInitialSplitMap.get(key).toArray(new InputSplit[0]));
+        Multimap<Integer, InputSplit> groupedSplit =
+            HiveSplitGenerator.generateGroupedSplits(jobConf, conf, inputSplitArray, waves,
+            availableSlots);
+        bucketToGroupedSplitMap.putAll(key, groupedSplit.values());
+      }
 
+      LOG.info("We have grouped the splits into " + bucketToGroupedSplitMap.size() + " tasks");
       processAllEvents(inputName, bucketToGroupedSplitMap);
-    } catch (IOException e) {
+    } catch (Exception e) {
       throw new RuntimeException(e);
     }
   }
@@ -219,7 +236,6 @@ public class CustomPartitionVertex imple
 
     Multimap<Integer, Integer> bucketToTaskMap = HashMultimap.<Integer, Integer> create();
     List<InputSplit> finalSplits = Lists.newLinkedList();
-    int taskCount = 0;
     for (Entry<Integer, Collection<InputSplit>> entry : bucketToGroupedSplitMap.asMap().entrySet()) {
       int bucketNum = entry.getKey();
       Collection<InputSplit> initialSplits = entry.getValue();
@@ -232,12 +248,12 @@ public class CustomPartitionVertex imple
 
     // Construct the EdgeManager descriptor to be used by all edges which need
     // the routing table.
-    EdgeManagerDescriptor hiveEdgeManagerDesc =
-        new EdgeManagerDescriptor(CustomPartitionEdge.class.getName());
-    byte[] payload = getBytePayload(bucketToTaskMap);
+    EdgeManagerPluginDescriptor hiveEdgeManagerDesc =
+        EdgeManagerPluginDescriptor.create(CustomPartitionEdge.class.getName());
+    UserPayload payload = getBytePayload(bucketToTaskMap);
     hiveEdgeManagerDesc.setUserPayload(payload);
 
-    Map<String, EdgeManagerDescriptor> emMap = Maps.newHashMap();
+    Map<String, EdgeManagerPluginDescriptor> emMap = Maps.newHashMap();
 
     // Replace the edge manager for all vertices which have routing type custom.
     for (Entry<String, EdgeProperty> edgeEntry : context.getInputVertexEdgeProperties().entrySet()) {
@@ -250,47 +266,51 @@ public class CustomPartitionVertex imple
 
     LOG.info("Task count is " + taskCount);
 
-    List<RootInputDataInformationEvent> taskEvents =
+    List<InputDataInformationEvent> taskEvents =
         Lists.newArrayListWithCapacity(finalSplits.size());
     // Re-serialize the splits after grouping.
     int count = 0;
     for (InputSplit inputSplit : finalSplits) {
-      MRSplitProto serializedSplit = MRHelpers.createSplitProto(inputSplit);
-      RootInputDataInformationEvent diEvent =
-          new RootInputDataInformationEvent(count, serializedSplit.toByteArray());
+      MRSplitProto serializedSplit = MRInputHelpers.createSplitProto(inputSplit);
+      InputDataInformationEvent diEvent = InputDataInformationEvent.createWithSerializedPayload(
+          count, serializedSplit.toByteString().asReadOnlyByteBuffer());
       diEvent.setTargetIndex(count);
       count++;
       taskEvents.add(diEvent);
     }
 
     // Replace the Edge Managers
+    Map<String, InputSpecUpdate> rootInputSpecUpdate =
+      new HashMap<String, InputSpecUpdate>();
+    rootInputSpecUpdate.put(
+        inputName,
+        InputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate());
     context.setVertexParallelism(
         taskCount,
-        new VertexLocationHint(grouper.createTaskLocationHints(finalSplits
-            .toArray(new InputSplit[finalSplits.size()]))), emMap);
+        VertexLocationHint.create(grouper.createTaskLocationHints(finalSplits
+            .toArray(new InputSplit[finalSplits.size()]))), emMap, rootInputSpecUpdate);
 
     // Set the actual events for the tasks.
     context.addRootInputEvents(inputName, taskEvents);
   }
 
-  private byte[] getBytePayload(Multimap<Integer, Integer> routingTable) throws IOException {
+  UserPayload getBytePayload(Multimap<Integer, Integer> routingTable) throws IOException {
     CustomEdgeConfiguration edgeConf =
         new CustomEdgeConfiguration(routingTable.keySet().size(), routingTable);
     DataOutputBuffer dob = new DataOutputBuffer();
     edgeConf.write(dob);
     byte[] serialized = dob.getData();
-
-    return serialized;
+    return UserPayload.create(ByteBuffer.wrap(serialized));
   }
 
-  private FileSplit getFileSplitFromEvent(RootInputDataInformationEvent event) throws IOException {
+  private FileSplit getFileSplitFromEvent(InputDataInformationEvent event) throws IOException {
     InputSplit inputSplit = null;
     if (event.getDeserializedUserPayload() != null) {
       inputSplit = (InputSplit) event.getDeserializedUserPayload();
     } else {
-      MRSplitProto splitProto = MRSplitProto.parseFrom(event.getUserPayload());
+      MRSplitProto splitProto = MRSplitProto.parseFrom(ByteString.copyFrom(event.getUserPayload()));
       SerializationFactory serializationFactory = new SerializationFactory(new Configuration());
-      inputSplit = MRHelpers.createOldFormatSplitFromUserPayload(splitProto, serializationFactory);
+      inputSplit = MRInputHelpers.createOldFormatSplitFromUserPayload(splitProto, serializationFactory);
     }
 
     if (!(inputSplit instanceof FileSplit)) {
@@ -304,7 +324,7 @@ public class CustomPartitionVertex imple
    * This method generates the map of bucket to file splits.
    */
   private Multimap<Integer, InputSplit> getBucketSplitMapForPath(
-      Map<Path, List<FileSplit>> pathFileSplitsMap) {
+      Map<String, List<FileSplit>> pathFileSplitsMap) {
 
     int bucketNum = 0;
     int fsCount = 0;
@@ -312,7 +332,7 @@ public class CustomPartitionVertex imple
     Multimap<Integer, InputSplit> bucketToInitialSplitMap =
         ArrayListMultimap.<Integer, InputSplit> create();
 
-    for (Map.Entry<Path, List<FileSplit>> entry : pathFileSplitsMap.entrySet()) {
+    for (Map.Entry<String, List<FileSplit>> entry : pathFileSplitsMap.entrySet()) {
       int bucketId = bucketNum % numBuckets;
       for (FileSplit fsplit : entry.getValue()) {
         fsCount++;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Mon Sep  8 04:38:17 2014
@@ -29,6 +29,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import javax.security.auth.login.LoginException;
 
@@ -70,7 +71,6 @@ import org.apache.hadoop.io.DataOutputBu
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
@@ -79,43 +79,43 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
-import org.apache.tez.client.PreWarmContext;
-import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.dag.api.DataSourceDescriptor;
 import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.EdgeManagerDescriptor;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
-import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
-import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.GroupInputEdge;
 import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.InputInitializerDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.PreWarmVertex;
 import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.VertexGroup;
-import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
 import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
-import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.comparator.TezBytesComparator;
+import org.apache.tez.runtime.library.common.serializer.TezBytesWritableSerialization;
+import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
+import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
+import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig;
 import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;
-import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
-import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
-import org.apache.tez.runtime.library.output.OnFileSortedOutput;
-import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
-import org.apache.tez.runtime.library.output.OnFileUnorderedPartitionedKVOutput;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
 
 /**
  * DagUtils. DagUtils is a collection of helper methods to convert
@@ -131,7 +131,7 @@ public class DagUtils {
 
   private void addCredentials(MapWork mapWork, DAG dag) {
     Set<String> paths = mapWork.getPathToAliases().keySet();
-    if (paths != null && !paths.isEmpty()) {
+    if (!paths.isEmpty()) {
       Iterator<URI> pathIterator = Iterators.transform(paths.iterator(), new Function<String, URI>() {
         @Override
         public URI apply(String input) {
@@ -163,6 +163,7 @@ public class DagUtils {
     JobConf conf = new JobConf(baseConf);
 
     if (mapWork.getNumMapTasks() != null) {
+      // Is this required?
       conf.setInt(MRJobConfig.NUM_MAPS, mapWork.getNumMapTasks().intValue());
     }
 
@@ -201,6 +202,12 @@ public class DagUtils {
       inpFormat = CombineHiveInputFormat.class.getName();
     }
 
+    if (mapWork.getDummyTableScan()) {
+      // hive input format doesn't handle the special condition of no paths + 1
+      // split correctly.
+      inpFormat = CombineHiveInputFormat.class.getName();
+    }
+
     conf.set(TEZ_TMP_DIR_KEY, context.getMRTmpPath().toUri().toString());
     conf.set("mapred.mapper.class", ExecMapper.class.getName());
     conf.set("mapred.input.format.class", inpFormat);
@@ -213,20 +220,19 @@ public class DagUtils {
    * Edge between them.
    *
    * @param group The parent VertexGroup
-   * @param wConf The job conf of the child vertex
+   * @param vConf The job conf of one of the parent (grouped) vertices
    * @param w The child vertex
    * @param edgeProp the edge property of connection between the two
    * endpoints.
    */
   @SuppressWarnings("rawtypes")
-  public GroupInputEdge createEdge(VertexGroup group, JobConf wConf,
+  public GroupInputEdge createEdge(VertexGroup group, JobConf vConf,
       Vertex w, TezEdgeProperty edgeProp)
     throws IOException {
 
     Class mergeInputClass;
 
-    LOG.info("Creating Edge between " + group.getGroupName() + " and " + w.getVertexName());
-    w.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(wConf));
+    LOG.info("Creating Edge between " + group.getGroupName() + " and " + w.getName());
 
     EdgeType edgeType = edgeProp.getEdgeType();
     switch (edgeType) {
@@ -237,9 +243,10 @@ public class DagUtils {
       mergeInputClass = ConcatenatedMergedKeyValueInput.class;
       int numBuckets = edgeProp.getNumBuckets();
       VertexManagerPluginDescriptor desc =
-          new VertexManagerPluginDescriptor(CustomPartitionVertex.class.getName());
-      byte[] userPayload = ByteBuffer.allocate(4).putInt(numBuckets).array();
-      desc.setUserPayload(userPayload);
+          VertexManagerPluginDescriptor.create(CustomPartitionVertex.class.getName());
+      ByteBuffer userPayload = ByteBuffer.allocate(4).putInt(numBuckets);
+      userPayload.flip();
+      desc.setUserPayload(UserPayload.create(userPayload));
       w.setVertexManagerPlugin(desc);
       break;
     }
@@ -257,47 +264,31 @@ public class DagUtils {
       break;
     }
 
-    return new GroupInputEdge(group, w, createEdgeProperty(edgeProp),
-        new InputDescriptor(mergeInputClass.getName()));
+    return GroupInputEdge.create(group, w, createEdgeProperty(edgeProp, vConf),
+        InputDescriptor.create(mergeInputClass.getName()));
   }
 
   /**
-   * Given two vertices a, b update their configurations to be used in an Edge a-b
-   */
-  public void updateConfigurationForEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w)
-    throws IOException {
-
-    // Tez needs to setup output subsequent input pairs correctly
-    MultiStageMRConfToTezTranslator.translateVertexConfToTez(wConf, vConf);
-
-    // update payloads (configuration for the vertices might have changed)
-    v.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(vConf));
-    w.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(wConf));
-  }
-
-  /**
-   * Given two vertices and their respective configuration objects createEdge
+   * Given two vertices and the configuration for the source vertex, createEdge
    * will create an Edge object that connects the two.
    *
-   * @param vConf JobConf of the first vertex
+   * @param vConf JobConf of the first (source) vertex
    * @param v The first vertex (source)
-   * @param wConf JobConf of the second vertex
    * @param w The second vertex (sink)
    * @return
    */
-  public Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w,
+  public Edge createEdge(JobConf vConf, Vertex v, Vertex w,
       TezEdgeProperty edgeProp)
     throws IOException {
 
-    updateConfigurationForEdge(vConf, v, wConf, w);
-
     switch(edgeProp.getEdgeType()) {
     case CUSTOM_EDGE: {
       int numBuckets = edgeProp.getNumBuckets();
-      byte[] userPayload = ByteBuffer.allocate(4).putInt(numBuckets).array();
-      VertexManagerPluginDescriptor desc = new VertexManagerPluginDescriptor(
+      ByteBuffer userPayload = ByteBuffer.allocate(4).putInt(numBuckets);
+      userPayload.flip();
+      VertexManagerPluginDescriptor desc = VertexManagerPluginDescriptor.create(
           CustomPartitionVertex.class.getName());
-      desc.setUserPayload(userPayload);
+      desc.setUserPayload(UserPayload.create(userPayload));
       w.setVertexManagerPlugin(desc);
       break;
     }
@@ -309,71 +300,92 @@ public class DagUtils {
       // nothing
     }
 
-    return new Edge(v, w, createEdgeProperty(edgeProp));
+    return Edge.create(v, w, createEdgeProperty(edgeProp, vConf));
   }
 
   /*
    * Helper function to create an edge property from an edge type.
    */
   @SuppressWarnings("rawtypes")
-  private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp) throws IOException {
-    DataMovementType dataMovementType;
-    Class logicalInputClass;
-    Class logicalOutputClass;
+  private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration conf)
+      throws IOException {
+    MRHelpers.translateMRConfToTez(conf);
+    String keyClass = conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS);
+    String valClass = conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS);
+    String partitionerClassName = conf.get("mapred.partitioner.class");
+    Map<String, String> partitionerConf;
 
-    EdgeProperty edgeProperty = null;
     EdgeType edgeType = edgeProp.getEdgeType();
     switch (edgeType) {
     case BROADCAST_EDGE:
-      dataMovementType = DataMovementType.BROADCAST;
-      logicalOutputClass = OnFileUnorderedKVOutput.class;
-      logicalInputClass = ShuffledUnorderedKVInput.class;
-      break;
-
+      UnorderedKVEdgeConfig et1Conf = UnorderedKVEdgeConfig
+          .newBuilder(keyClass, valClass)
+          .setFromConfiguration(conf)
+          .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null)
+          .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null)
+          .build();
+      return et1Conf.createDefaultBroadcastEdgeProperty();
     case CUSTOM_EDGE:
-      dataMovementType = DataMovementType.CUSTOM;
-      logicalOutputClass = OnFileUnorderedPartitionedKVOutput.class;
-      logicalInputClass = ShuffledUnorderedKVInput.class;
-      EdgeManagerDescriptor edgeDesc =
-          new EdgeManagerDescriptor(CustomPartitionEdge.class.getName());
+      assert partitionerClassName != null;
+      partitionerConf = createPartitionerConf(partitionerClassName, conf);
+      UnorderedPartitionedKVEdgeConfig et2Conf = UnorderedPartitionedKVEdgeConfig
+          .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf)
+          .setFromConfiguration(conf)
+          .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null)
+          .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null)
+          .build();
+      EdgeManagerPluginDescriptor edgeDesc =
+          EdgeManagerPluginDescriptor.create(CustomPartitionEdge.class.getName());
       CustomEdgeConfiguration edgeConf =
           new CustomEdgeConfiguration(edgeProp.getNumBuckets(), null);
       DataOutputBuffer dob = new DataOutputBuffer();
       edgeConf.write(dob);
       byte[] userPayload = dob.getData();
-      edgeDesc.setUserPayload(userPayload);
-      edgeProperty =
-          new EdgeProperty(edgeDesc,
-              DataSourceType.PERSISTED,
-              SchedulingType.SEQUENTIAL,
-              new OutputDescriptor(logicalOutputClass.getName()),
-              new InputDescriptor(logicalInputClass.getName()));
-      break;
-
+      edgeDesc.setUserPayload(UserPayload.create(ByteBuffer.wrap(userPayload)));
+      return et2Conf.createDefaultCustomEdgeProperty(edgeDesc);
     case CUSTOM_SIMPLE_EDGE:
-      dataMovementType = DataMovementType.SCATTER_GATHER;
-      logicalOutputClass = OnFileUnorderedPartitionedKVOutput.class;
-      logicalInputClass = ShuffledUnorderedKVInput.class;
-      break;
-
+      assert partitionerClassName != null;
+      partitionerConf = createPartitionerConf(partitionerClassName, conf);
+      UnorderedPartitionedKVEdgeConfig et3Conf = UnorderedPartitionedKVEdgeConfig
+          .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf)
+          .setFromConfiguration(conf)
+          .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), null)
+          .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null)
+          .build();
+      return et3Conf.createDefaultEdgeProperty();
     case SIMPLE_EDGE:
     default:
-      dataMovementType = DataMovementType.SCATTER_GATHER;
-      logicalOutputClass = OnFileSortedOutput.class;
-      logicalInputClass = ShuffledMergedInputLegacy.class;
-      break;
+      assert partitionerClassName != null;
+      partitionerConf = createPartitionerConf(partitionerClassName, conf);
+      OrderedPartitionedKVEdgeConfig et4Conf = OrderedPartitionedKVEdgeConfig
+          .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf)
+          .setFromConfiguration(conf)
+          .setKeySerializationClass(TezBytesWritableSerialization.class.getName(),
+              TezBytesComparator.class.getName(), null)
+          .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null)
+          .build();
+      return et4Conf.createDefaultEdgeProperty();
     }
+  }
 
-    if (edgeProperty == null) {
-      edgeProperty =
-        new EdgeProperty(dataMovementType,
-            DataSourceType.PERSISTED,
-            SchedulingType.SEQUENTIAL,
-            new OutputDescriptor(logicalOutputClass.getName()),
-            new InputDescriptor(logicalInputClass.getName()));
+  /**
+   * Utility method to create a stripped down configuration for the MR partitioner.
+   *
+   * @param partitionerClassName
+   *          the real MR partitioner class name
+   * @param baseConf
+   *          a base configuration to extract relevant properties
+   * @return
+   */
+  private Map<String, String> createPartitionerConf(String partitionerClassName,
+      Configuration baseConf) {
+    Map<String, String> partitionerConf = new HashMap<String, String>();
+    partitionerConf.put("mapred.partitioner.class", partitionerClassName);
+    if (baseConf.get("mapreduce.totalorderpartitioner.path") != null) {
+      partitionerConf.put("mapreduce.totalorderpartitioner.path",
+      baseConf.get("mapreduce.totalorderpartitioner.path"));
     }
-
-    return edgeProperty;
+    return partitionerConf;
   }
 
   /*
@@ -391,6 +403,15 @@ public class DagUtils {
   }
 
   /*
+   * Helper to setup default environment for a task in YARN.
+   */
+  private Map<String, String> getContainerEnvironment(Configuration conf, boolean isMap) {
+    Map<String, String> environment = new HashMap<String, String>();
+    MRHelpers.updateEnvBasedOnMRTaskEnv(conf, environment, isMap);
+    return environment;
+  }
+
+  /*
    * Helper to determine what java options to use for the containers
    * Falls back to Map-reduces map java opts if no tez specific options
    * are set
@@ -400,14 +421,14 @@ public class DagUtils {
     if (javaOpts != null && !javaOpts.isEmpty()) {
       String logLevel = HiveConf.getVar(conf, HiveConf.ConfVars.HIVETEZLOGLEVEL);
       List<String> logProps = Lists.newArrayList();
-      MRHelpers.addLog4jSystemProperties(logLevel, logProps);
+      TezUtils.addLog4jSystemProperties(logLevel, logProps);
       StringBuilder sb = new StringBuilder();
       for (String str : logProps) {
         sb.append(str).append(" ");
       }
       return javaOpts + " " + sb.toString();
     }
-    return MRHelpers.getMapJavaOpts(conf);
+    return MRHelpers.getJavaOptsForMRMapper(conf);
   }
 
   /*
@@ -425,18 +446,15 @@ public class DagUtils {
     // create the directories FileSinkOperators need
     Utilities.createTmpDirs(conf, mapWork);
 
-    // Tez ask us to call this even if there's no preceding vertex
-    MultiStageMRConfToTezTranslator.translateVertexConfToTez(conf, null);
-
     // finally create the vertex
     Vertex map = null;
 
     // use tez to combine splits
-    boolean useTezGroupedSplits = false;
+    boolean groupSplitsInInputInitializer;
+
+    DataSourceDescriptor dataSource;
 
     int numTasks = -1;
-    Class<HiveSplitGenerator> amSplitGeneratorClass = null;
-    InputSplitInfo inputSplitInfo = null;
     Class inputFormatClass = conf.getClass("mapred.input.format.class",
         InputFormat.class);
 
@@ -450,9 +468,9 @@ public class DagUtils {
     }
 
     if (vertexHasCustomInput) {
-      useTezGroupedSplits = false;
-      // grouping happens in execution phase. Setting the class to TezGroupedSplitsInputFormat
-      // here would cause pre-mature grouping which would be incorrect.
+      groupSplitsInInputInitializer = false;
+      // grouping happens in execution phase. The input payload should not enable grouping here,
+      // it will be enabled in the CustomVertex.
       inputFormatClass = HiveInputFormat.class;
       conf.setClass("mapred.input.format.class", HiveInputFormat.class, InputFormat.class);
       // mapreduce.tez.input.initializer.serialize.event.payload should be set to false when using
@@ -462,49 +480,52 @@ public class DagUtils {
       // we'll set up tez to combine spits for us iff the input format
       // is HiveInputFormat
       if (inputFormatClass == HiveInputFormat.class) {
-        useTezGroupedSplits = true;
-        conf.setClass("mapred.input.format.class", TezGroupedSplitsInputFormat.class, InputFormat.class);
+        groupSplitsInInputInitializer = true;
+      } else {
+        groupSplitsInInputInitializer = false;
       }
     }
 
     if (HiveConf.getBoolVar(conf, ConfVars.HIVE_AM_SPLIT_GENERATION)
         && !mapWork.isUseOneNullRowInputFormat()) {
+
+      // set up the operator plan. (before setting up splits on the AM)
+      Utilities.setMapWork(conf, mapWork, mrScratchDir, false);
+
       // if we're generating the splits in the AM, we just need to set
       // the correct plugin.
-      amSplitGeneratorClass = HiveSplitGenerator.class;
+      if (groupSplitsInInputInitializer) {
+        // Not setting a payload, since the MRInput payload is the same and can be accessed.
+        InputInitializerDescriptor descriptor = InputInitializerDescriptor.create(
+            HiveSplitGenerator.class.getName());
+        dataSource = MRInputLegacy.createConfigBuilder(conf, inputFormatClass).groupSplits(true)
+            .setCustomInitializerDescriptor(descriptor).build();
+      } else {
+        // Not HiveInputFormat, or a custom VertexManager will take care of grouping splits
+        dataSource = MRInputLegacy.createConfigBuilder(conf, inputFormatClass).groupSplits(false).build();
+      }
     } else {
-      // client side split generation means we have to compute them now
-      inputSplitInfo = MRHelpers.generateInputSplits(conf,
-          new Path(tezDir, "split_"+mapWork.getName().replaceAll(" ", "_")));
-      numTasks = inputSplitInfo.getNumTasks();
+      // Setup client side split generation.
+      dataSource = MRInputHelpers.configureMRInputWithLegacySplitGeneration(conf, new Path(tezDir,
+          "split_" + mapWork.getName().replaceAll(" ", "_")), true);
+      numTasks = dataSource.getNumberOfShards();
+
+      // set up the operator plan. (after generating splits - that changes configs)
+      Utilities.setMapWork(conf, mapWork, mrScratchDir, false);
     }
 
-    // set up the operator plan
-    Utilities.setMapWork(conf, mapWork, mrScratchDir, false);
-
-    byte[] serializedConf = MRHelpers.createUserPayloadFromConf(conf);
-    map = new Vertex(mapWork.getName(),
-        new ProcessorDescriptor(MapTezProcessor.class.getName()).
+    UserPayload serializedConf = TezUtils.createUserPayloadFromConf(conf);
+    map = Vertex.create(mapWork.getName(),
+        ProcessorDescriptor.create(MapTezProcessor.class.getName()).
         setUserPayload(serializedConf), numTasks, getContainerResource(conf));
-    Map<String, String> environment = new HashMap<String, String>();
-    MRHelpers.updateEnvironmentForMRTasks(conf, environment, true);
-    map.setTaskEnvironment(environment);
-    map.setJavaOpts(getContainerJavaOpts(conf));
+    map.setTaskEnvironment(getContainerEnvironment(conf, true));
+    map.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
 
     assert mapWork.getAliasToWork().keySet().size() == 1;
 
+    // Add the actual source input
     String alias = mapWork.getAliasToWork().keySet().iterator().next();
-
-    byte[] mrInput = null;
-    if (useTezGroupedSplits) {
-      mrInput = MRHelpers.createMRInputPayloadWithGrouping(serializedConf,
-          HiveInputFormat.class.getName());
-    } else {
-      mrInput = MRHelpers.createMRInputPayload(serializedConf, null);
-    }
-    map.addInput(alias,
-        new InputDescriptor(MRInputLegacy.class.getName()).
-        setUserPayload(mrInput), amSplitGeneratorClass);
+    map.addDataSource(alias, dataSource);
 
     Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
     localResources.put(getBaseName(appJarLr), appJarLr);
@@ -512,14 +533,7 @@ public class DagUtils {
       localResources.put(getBaseName(lr), lr);
     }
 
-    if (inputSplitInfo != null) {
-      // only relevant for client-side split generation
-      map.setTaskLocationsHint(inputSplitInfo.getTaskLocationHints());
-      MRHelpers.updateLocalResourcesForInputSplits(FileSystem.get(conf), inputSplitInfo,
-          localResources);
-    }
-
-    map.setTaskLocalResources(localResources);
+    map.addTaskLocalFiles(localResources);
     return map;
   }
 
@@ -529,6 +543,7 @@ public class DagUtils {
   private JobConf initializeVertexConf(JobConf baseConf, Context context, ReduceWork reduceWork) {
     JobConf conf = new JobConf(baseConf);
 
+    // Is this required ?
     conf.set("mapred.reducer.class", ExecReducer.class.getName());
 
     boolean useSpeculativeExecReducers = HiveConf.getBoolVar(conf,
@@ -552,29 +567,22 @@ public class DagUtils {
     // create the directories FileSinkOperators need
     Utilities.createTmpDirs(conf, reduceWork);
 
-    // Call once here, will be updated when we find edges
-    MultiStageMRConfToTezTranslator.translateVertexConfToTez(conf, null);
-
     // create the vertex
-    Vertex reducer = new Vertex(reduceWork.getName(),
-        new ProcessorDescriptor(ReduceTezProcessor.class.getName()).
-        setUserPayload(MRHelpers.createUserPayloadFromConf(conf)),
+    Vertex reducer = Vertex.create(reduceWork.getName(),
+        ProcessorDescriptor.create(ReduceTezProcessor.class.getName()).
+        setUserPayload(TezUtils.createUserPayloadFromConf(conf)),
             reduceWork.isAutoReduceParallelism() ? reduceWork.getMaxReduceTasks() : reduceWork
                 .getNumReduceTasks(), getContainerResource(conf));
 
-    Map<String, String> environment = new HashMap<String, String>();
-
-    MRHelpers.updateEnvironmentForMRTasks(conf, environment, false);
-    reducer.setTaskEnvironment(environment);
-
-    reducer.setJavaOpts(getContainerJavaOpts(conf));
+    reducer.setTaskEnvironment(getContainerEnvironment(conf, false));
+    reducer.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
 
     Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
     localResources.put(getBaseName(appJarLr), appJarLr);
     for (LocalResource lr: additionalLr) {
       localResources.put(getBaseName(lr), lr);
     }
-    reducer.setTaskLocalResources(localResources);
+    reducer.addTaskLocalFiles(localResources);
 
     return reducer;
   }
@@ -608,37 +616,29 @@ public class DagUtils {
   }
 
   /**
-   * @param sessionConfig session configuration
    * @param numContainers number of containers to pre-warm
    * @param localResources additional resources to pre-warm with
-   * @return prewarm context object
+   * @return prewarm vertex to run
    */
-  public PreWarmContext createPreWarmContext(TezSessionConfiguration sessionConfig, int numContainers,
-      Map<String, LocalResource> localResources) throws IOException, TezException {
+  public PreWarmVertex createPreWarmVertex(TezConfiguration conf,
+      int numContainers, Map<String, LocalResource> localResources) throws
+      IOException, TezException {
 
-    Configuration conf = sessionConfig.getTezConfiguration();
+    ProcessorDescriptor prewarmProcDescriptor = ProcessorDescriptor.create(HivePreWarmProcessor.class.getName());
+    prewarmProcDescriptor.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
 
-    ProcessorDescriptor prewarmProcDescriptor = new ProcessorDescriptor(HivePreWarmProcessor.class.getName());
-    prewarmProcDescriptor.setUserPayload(MRHelpers.createUserPayloadFromConf(conf));
-
-    PreWarmContext context = new PreWarmContext(prewarmProcDescriptor, getContainerResource(conf),
-        numContainers, null);
+    PreWarmVertex prewarmVertex = PreWarmVertex.create("prewarm", prewarmProcDescriptor, numContainers,getContainerResource(conf));
 
     Map<String, LocalResource> combinedResources = new HashMap<String, LocalResource>();
 
-    combinedResources.putAll(sessionConfig.getSessionResources());
     if (localResources != null) {
       combinedResources.putAll(localResources);
     }
 
-    context.setLocalResources(combinedResources);
-
-    /* boiler plate task env */
-    Map<String, String> environment = new HashMap<String, String>();
-    MRHelpers.updateEnvironmentForMRTasks(conf, environment, true);
-    context.setEnvironment(environment);
-    context.setJavaOpts(getContainerJavaOpts(conf));
-    return context;
+    prewarmVertex.addTaskLocalFiles(localResources);
+    prewarmVertex.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
+    prewarmVertex.setTaskEnvironment(getContainerEnvironment(conf, false));
+    return prewarmVertex;
   }
 
   /**
@@ -710,7 +710,7 @@ public class DagUtils {
 
   /**
    * Localizes files, archives and jars from a provided array of names.
-   * @param hdfsDirPathStr Destination directoty in HDFS.
+   * @param hdfsDirPathStr Destination directory in HDFS.
    * @param conf Configuration.
    * @param inputOutputJars The file names to localize.
    * @return List<LocalResource> local resources to add to execution
@@ -784,7 +784,7 @@ public class DagUtils {
   }
 
   /**
-   * @param pathStr - the string from which we try to determine the resource base name
+   * @param path - the path from which we try to determine the resource base name
    * @return the name of the resource from a given path string.
    */
   public String getResourceBaseName(Path path) {
@@ -830,9 +830,8 @@ public class DagUtils {
         int waitAttempts =
             conf.getInt(HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_NUM_WAIT_ATTEMPTS.varname,
                 HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_NUM_WAIT_ATTEMPTS.defaultIntVal);
-        long sleepInterval =
-            conf.getLong(HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL.varname,
-                HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL.defaultLongVal);
+        long sleepInterval = HiveConf.getTimeVar(
+            conf, HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL, TimeUnit.MILLISECONDS);
         LOG.info("Number of wait attempts: " + waitAttempts + ". Wait interval: "
             + sleepInterval);
         boolean found = false;
@@ -873,7 +872,7 @@ public class DagUtils {
   public JobConf createConfiguration(HiveConf hiveConf) throws IOException {
     hiveConf.setBoolean("mapred.mapper.new-api", false);
 
-    JobConf conf = (JobConf) MRHelpers.getBaseMRConfiguration(hiveConf);
+    JobConf conf = new JobConf(hiveConf);
 
     conf.set("mapred.output.committer.class", NullOutputCommitter.class.getName());
 
@@ -919,7 +918,6 @@ public class DagUtils {
    * @param work The instance of BaseWork representing the actual work to be performed
    * by this vertex.
    * @param scratchDir HDFS scratch dir for this execution unit.
-   * @param list
    * @param appJarLr Local resource for hive-exec.
    * @param additionalLr
    * @param fileSystem FS corresponding to scratchDir and LocalResources
@@ -963,9 +961,9 @@ public class DagUtils {
 
     // final vertices need to have at least one output
     if (!hasChildren) {
-      v.addOutput("out_"+work.getName(),
-          new OutputDescriptor(MROutput.class.getName())
-          .setUserPayload(MRHelpers.createUserPayloadFromConf(conf)));
+      v.addDataSink("out_"+work.getName(), new DataSinkDescriptor(
+          OutputDescriptor.create(MROutput.class.getName())
+          .setUserPayload(TezUtils.createUserPayloadFromConf(conf)), null, null));
     }
 
     return v;
@@ -1033,16 +1031,16 @@ public class DagUtils {
     if (edgeProp.isAutoReduce()) {
       Configuration pluginConf = new Configuration(false);
       VertexManagerPluginDescriptor desc =
-          new VertexManagerPluginDescriptor(ShuffleVertexManager.class.getName());
+          VertexManagerPluginDescriptor.create(ShuffleVertexManager.class.getName());
       pluginConf.setBoolean(
-          ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true);
-      pluginConf.setInt(ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM,
+          ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true);
+      pluginConf.setInt(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM,
           edgeProp.getMinReducer());
       pluginConf.setLong(
-          ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
+          ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
           edgeProp.getInputSizePerReducer());
-      ByteString payload = MRHelpers.createByteStringFromConf(pluginConf);
-      desc.setUserPayload(payload.toByteArray());
+      UserPayload payload = TezUtils.createUserPayloadFromConf(pluginConf);
+      desc.setUserPayload(payload);
       v.setVertexManagerPlugin(desc);
     }
   }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java Mon Sep  8 04:38:17 2014
@@ -18,45 +18,29 @@
 package org.apache.hadoop.hive.ql.exec.tez;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.debug.Utils;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKeyObject;
-import org.apache.hadoop.hive.ql.exec.persistence.LazyFlatRowContainer;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
-import org.apache.hadoop.hive.serde2.ByteStream.Output;
 import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryFactory;
-import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
-import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryStruct;
-import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.library.api.KeyValueReader;
@@ -73,6 +57,7 @@ public class HashTableLoader implements 
   private Configuration hconf;
   private MapJoinDesc desc;
   private MapJoinKey lastKey = null;
+  private int rowCount = 0;
 
   @Override
   public void init(ExecMapperContext context, Configuration hconf, MapJoinOperator joinOp) {
@@ -125,6 +110,7 @@ public class HashTableLoader implements 
             : new HashMapWrapper(hconf, keyCount);
 
         while (kvReader.next()) {
+          rowCount++;
           lastKey = tableContainer.putRow(keyCtx, (Writable)kvReader.getCurrentKey(),
               valCtx, (Writable)kvReader.getCurrentValue());
         }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java Mon Sep  8 04:38:17 2014
@@ -25,15 +25,15 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.ReadaheadPool;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.LogicalIOProcessor;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
 
 import java.net.URL;
 import java.net.JarURLConnection;
-import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.List;
 import java.util.Map;
@@ -48,7 +48,7 @@ import javax.crypto.Mac;
  *
  * @see Config for configuring the HivePreWarmProcessor
  */
-public class HivePreWarmProcessor implements LogicalIOProcessor {
+public class HivePreWarmProcessor extends AbstractLogicalIOProcessor {
 
   private static boolean prewarmed = false;
 
@@ -56,10 +56,13 @@ public class HivePreWarmProcessor implem
 
   private Configuration conf;
 
+  public HivePreWarmProcessor(ProcessorContext context) {
+    super(context);
+  }
+
   @Override
-  public void initialize(TezProcessorContext processorContext)
-    throws Exception {
-    byte[] userPayload = processorContext.getUserPayload();
+  public void initialize() throws Exception {
+    UserPayload userPayload = getContext().getUserPayload();
     this.conf = TezUtils.createConfFromUserPayload(userPayload);
   }
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java Mon Sep  8 04:38:17 2014
@@ -35,19 +35,23 @@ import org.apache.hadoop.mapred.FileSpli
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.VertexLocationHint;
+import org.apache.tez.dag.api.TaskLocationHint;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
 import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.TezRootInputInitializer;
-import org.apache.tez.runtime.api.TezRootInputInitializerContext;
-import org.apache.tez.runtime.api.events.RootInputConfigureVertexTasksEvent;
-import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
+import org.apache.tez.runtime.api.InputInitializer;
+import org.apache.tez.runtime.api.InputInitializerContext;
+import org.apache.tez.runtime.api.InputSpecUpdate;
+import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
+import org.apache.tez.runtime.api.events.InputDataInformationEvent;
+import org.apache.tez.runtime.api.events.InputInitializerEvent;
 
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Lists;
@@ -59,20 +63,30 @@ import com.google.common.collect.Multima
  * making sure that splits from different partitions are only grouped if they
  * are of the same schema, format and serde
  */
-public class HiveSplitGenerator implements TezRootInputInitializer {
+@SuppressWarnings("deprecation")
+public class HiveSplitGenerator extends InputInitializer {
 
   private static final Log LOG = LogFactory.getLog(HiveSplitGenerator.class);
 
-  private final SplitGrouper grouper = new SplitGrouper();
+  private static final SplitGrouper grouper = new SplitGrouper();
+  private final DynamicPartitionPruner pruner = new DynamicPartitionPruner();
+  private InputInitializerContext context;
+
+  public HiveSplitGenerator(InputInitializerContext initializerContext) {
+    super(initializerContext);
+  }
 
   @Override
-  public List<Event> initialize(TezRootInputInitializerContext rootInputContext) throws Exception {
+  public List<Event> initialize() throws Exception {
+    InputInitializerContext rootInputContext = getContext();
+
+    context = rootInputContext;
 
     MRInputUserPayloadProto userPayloadProto =
-        MRHelpers.parseMRInputPayload(rootInputContext.getUserPayload());
+        MRInputHelpers.parseMRInputPayload(rootInputContext.getInputUserPayload());
 
     Configuration conf =
-        MRHelpers.createConfFromByteString(userPayloadProto.getConfigurationBytes());
+        TezUtils.createConfFromByteString(userPayloadProto.getConfigurationBytes());
 
     boolean sendSerializedEvents =
         conf.getBoolean("mapreduce.tez.input.initializer.serialize.event.payload", true);
@@ -81,42 +95,66 @@ public class HiveSplitGenerator implemen
     JobConf jobConf = new JobConf(conf);
     ShimLoader.getHadoopShims().getMergedCredentials(jobConf);
 
+    MapWork work = Utilities.getMapWork(jobConf);
+
+    // perform dynamic partition pruning
+    pruner.prune(work, jobConf, context);
+
     InputSplitInfoMem inputSplitInfo = null;
-    String realInputFormatName = userPayloadProto.getInputFormatName();
-    if (realInputFormatName != null && !realInputFormatName.isEmpty()) {
-      inputSplitInfo = generateGroupedSplits(rootInputContext, jobConf, conf, realInputFormatName);
+    String realInputFormatName = conf.get("mapred.input.format.class");
+    boolean groupingEnabled = userPayloadProto.getGroupingEnabled();
+    if (groupingEnabled) {
+      // Need to instantiate the realInputFormat
+      InputFormat<?, ?> inputFormat =
+          (InputFormat<?, ?>) ReflectionUtils.newInstance(Class.forName(realInputFormatName),
+              jobConf);
+
+      int totalResource = rootInputContext.getTotalAvailableResource().getMemory();
+      int taskResource = rootInputContext.getVertexTaskResource().getMemory();
+      int availableSlots = totalResource / taskResource;
+
+      // Create the un-grouped splits
+      float waves =
+          conf.getFloat(TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES,
+              TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES_DEFAULT);
+
+      InputSplit[] splits = inputFormat.getSplits(jobConf, (int) (availableSlots * waves));
+      LOG.info("Number of input splits: " + splits.length + ". " + availableSlots
+          + " available slots, " + waves + " waves. Input format is: " + realInputFormatName);
+
+      Multimap<Integer, InputSplit> groupedSplits =
+          generateGroupedSplits(jobConf, conf, splits, waves, availableSlots);
+      // Flatten the grouped splits into a single array
+      InputSplit[] flatSplits = groupedSplits.values().toArray(new InputSplit[0]);
+      LOG.info("Number of grouped splits: " + flatSplits.length);
+
+      List<TaskLocationHint> locationHints = grouper.createTaskLocationHints(flatSplits);
+
+      Utilities.clearWork(jobConf);
+
+      inputSplitInfo =
+          new InputSplitInfoMem(flatSplits, locationHints, flatSplits.length, null, jobConf);
     } else {
-      inputSplitInfo = MRHelpers.generateInputSplitsToMem(jobConf);
+      // No grouping requested, so there is no target number of tasks to compute.
+      // This code path should never be triggered at the moment: if grouping is disabled,
+      // DAGUtils uses MRInputAMSplitGenerator.
+      // If this path is used in the future, make sure grouping is disabled in the payload.
+      throw new RuntimeException(
+          "HiveInputFormat does not support non-grouped splits, InputFormatName is: "
+              + realInputFormatName);
+      // inputSplitInfo = MRInputHelpers.generateInputSplitsToMem(jobConf, false, 0);
     }
 
     return createEventList(sendSerializedEvents, inputSplitInfo);
   }
 
-  private InputSplitInfoMem generateGroupedSplits(TezRootInputInitializerContext context,
-      JobConf jobConf, Configuration conf, String realInputFormatName) throws Exception {
 
-    int totalResource = context.getTotalAvailableResource().getMemory();
-    int taskResource = context.getVertexTaskResource().getMemory();
-    int availableSlots = totalResource / taskResource;
-
-    float waves =
-        conf.getFloat(TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES,
-            TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES_DEFAULT);
+  public static Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf,
+      Configuration conf, InputSplit[] splits, float waves, int availableSlots)
+      throws Exception {
 
     MapWork work = Utilities.getMapWork(jobConf);
 
-    LOG.info("Grouping splits for " + work.getName() + ". " + availableSlots + " available slots, "
-        + waves + " waves. Input format is: " + realInputFormatName);
-
-    // Need to instantiate the realInputFormat
-    InputFormat<?, ?> inputFormat =
-        (InputFormat<?, ?>) ReflectionUtils
-            .newInstance(Class.forName(realInputFormatName), jobConf);
-
-    // Create the un-grouped splits
-    InputSplit[] splits = inputFormat.getSplits(jobConf, (int) (availableSlots * waves));
-    LOG.info("Number of input splits: " + splits.length);
-
     Multimap<Integer, InputSplit> bucketSplitMultiMap =
         ArrayListMultimap.<Integer, InputSplit> create();
 
@@ -159,41 +197,42 @@ public class HiveSplitGenerator implemen
     Multimap<Integer, InputSplit> groupedSplits =
         grouper.group(jobConf, bucketSplitMultiMap, availableSlots, waves);
 
-    // And finally return them in a flat array
-    InputSplit[] flatSplits = groupedSplits.values().toArray(new InputSplit[0]);
-    LOG.info("Number of grouped splits: " + flatSplits.length);
-
-    List<TaskLocationHint> locationHints = grouper.createTaskLocationHints(flatSplits);
-
-    Utilities.clearWork(jobConf);
-
-    return new InputSplitInfoMem(flatSplits, locationHints, flatSplits.length, null, jobConf);
+    return groupedSplits;
   }
 
   private List<Event> createEventList(boolean sendSerializedEvents, InputSplitInfoMem inputSplitInfo) {
 
     List<Event> events = Lists.newArrayListWithCapacity(inputSplitInfo.getNumTasks() + 1);
 
-    RootInputConfigureVertexTasksEvent configureVertexEvent =
-        new RootInputConfigureVertexTasksEvent(inputSplitInfo.getNumTasks(),
-            inputSplitInfo.getTaskLocationHints());
+    InputConfigureVertexTasksEvent configureVertexEvent =
+        InputConfigureVertexTasksEvent.create(inputSplitInfo.getNumTasks(),
+        VertexLocationHint.create(inputSplitInfo.getTaskLocationHints()),
+        InputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate());
     events.add(configureVertexEvent);
 
     if (sendSerializedEvents) {
       MRSplitsProto splitsProto = inputSplitInfo.getSplitsProto();
       int count = 0;
       for (MRSplitProto mrSplit : splitsProto.getSplitsList()) {
-        RootInputDataInformationEvent diEvent =
-            new RootInputDataInformationEvent(count++, mrSplit.toByteArray());
+        InputDataInformationEvent diEvent = InputDataInformationEvent.createWithSerializedPayload(
+            count++, mrSplit.toByteString().asReadOnlyByteBuffer());
         events.add(diEvent);
       }
     } else {
       int count = 0;
       for (org.apache.hadoop.mapred.InputSplit split : inputSplitInfo.getOldFormatSplits()) {
-        RootInputDataInformationEvent diEvent = new RootInputDataInformationEvent(count++, split);
+        InputDataInformationEvent diEvent = InputDataInformationEvent.createWithObjectPayload(
+            count++, split);
         events.add(diEvent);
       }
     }
     return events;
   }
+
+  @Override
+  public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws Exception {
+    for (InputInitializerEvent e : events) {
+      pruner.getQueue().put(e);
+    }
+  }
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java Mon Sep  8 04:38:17 2014
@@ -47,7 +47,7 @@ import org.apache.tez.mapreduce.input.MR
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 
 /**
@@ -64,8 +64,25 @@ public class MapRecordProcessor extends 
   protected static final String MAP_PLAN_KEY = "__MAP_PLAN__";
   private MapWork mapWork;
 
+  public MapRecordProcessor(JobConf jconf) {
+    ObjectCache cache = ObjectCacheFactory.getCache(jconf);
+    execContext.setJc(jconf);
+    // create map and fetch operators
+    mapWork = (MapWork) cache.retrieve(MAP_PLAN_KEY);
+    if (mapWork == null) {
+      mapWork = Utilities.getMapWork(jconf);
+      cache.cache(MAP_PLAN_KEY, mapWork);
+      l4j.info("Plan: "+mapWork);
+      for (String s: mapWork.getAliases()) {
+        l4j.info("Alias: "+s);
+      }
+    } else {
+      Utilities.setMapWork(jconf, mapWork);
+    }
+  }
+
   @Override
-  void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mrReporter,
+  void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrReporter,
       Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
     super.init(jconf, processorContext, mrReporter, inputs, outputs);
@@ -87,22 +104,7 @@ public class MapRecordProcessor extends 
       ((TezKVOutputCollector) outMap.get(outputEntry.getKey())).initialize();
     }
 
-    ObjectCache cache = ObjectCacheFactory.getCache(jconf);
     try {
-
-      execContext.setJc(jconf);
-      // create map and fetch operators
-      mapWork = (MapWork) cache.retrieve(MAP_PLAN_KEY);
-      if (mapWork == null) {
-        mapWork = Utilities.getMapWork(jconf);
-        cache.cache(MAP_PLAN_KEY, mapWork);
-        l4j.info("Plan: "+mapWork);
-        for (String s: mapWork.getAliases()) {
-          l4j.info("Alias: "+s);
-        }
-      } else {
-        Utilities.setMapWork(jconf, mapWork);
-      }
       if (mapWork.getVectorMode()) {
         mapOp = new VectorMapOperator();
       } else {
@@ -115,7 +117,8 @@ public class MapRecordProcessor extends 
       l4j.info(mapOp.dump(0));
 
       MapredContext.init(true, new JobConf(jconf));
-      ((TezContext)MapredContext.get()).setInputs(inputs);
+      ((TezContext) MapredContext.get()).setInputs(inputs);
+      ((TezContext) MapredContext.get()).setTezProcessorContext(processorContext);
       mapOp.setExecContext(execContext);
       mapOp.initializeLocalWork(jconf);
       mapOp.initialize(jconf, null);

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java Mon Sep  8 04:38:17 2014
@@ -17,11 +17,15 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
+import org.apache.tez.runtime.api.ProcessorContext;
+
 /**
  * Subclass that is used to indicate if this is a map or reduce process
  */
 public class MapTezProcessor extends TezProcessor {
-  public MapTezProcessor(){
-    super(true);
+
+  public MapTezProcessor(ProcessorContext context) {
+    super(context);
+    this.isMap = true;
   }
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java Mon Sep  8 04:38:17 2014
@@ -20,24 +20,40 @@ package org.apache.hadoop.hive.ql.exec.t
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.tez.runtime.common.objectregistry.ObjectLifeCycle;
-import org.apache.tez.runtime.common.objectregistry.ObjectRegistry;
-import org.apache.tez.runtime.common.objectregistry.ObjectRegistryFactory;
+import org.apache.tez.runtime.api.ObjectRegistry;
 
+import com.google.common.base.Preconditions;
 
 /**
  * ObjectCache. Tez implementation based on the tez object registry.
  *
  */
 public class ObjectCache implements org.apache.hadoop.hive.ql.exec.ObjectCache {
-
+  
   private static final Log LOG = LogFactory.getLog(ObjectCache.class.getName());
-  private final ObjectRegistry registry = ObjectRegistryFactory.getObjectRegistry();
+  
+  // ObjectRegistry is available via the Input/Output/ProcessorContext.
+  // It is set up as part of the Tez Processor construction, so that it is available whenever an
+  // instance of the ObjectCache is created. The assumption is that Tez will initialize the Processor
+  // before anything else.
+  private volatile static ObjectRegistry staticRegistry;
+ 
+  private final ObjectRegistry registry;
+  
+  public ObjectCache() {
+    Preconditions.checkNotNull(staticRegistry,
+        "Object registry not setup yet. This should have been setup by the TezProcessor");
+    registry = staticRegistry;
+  }
 
+  public static void setupObjectRegistry(ObjectRegistry objectRegistry) {
+    staticRegistry = objectRegistry;
+  }
+  
   @Override
   public void cache(String key, Object value) {
     LOG.info("Adding " + key + " to cache with value " + value);
-    registry.add(ObjectLifeCycle.VERTEX, key, value);
+    registry.cacheForVertex(key, value);
   }
 
   @Override

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java Mon Sep  8 04:38:17 2014
@@ -32,7 +32,7 @@ import org.apache.hadoop.mapred.OutputCo
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
@@ -47,7 +47,7 @@ public abstract class RecordProcessor  {
   protected Map<String, LogicalInput> inputs;
   protected Map<String, LogicalOutput> outputs;
   protected Map<String, OutputCollector> outMap;
-  protected TezProcessorContext processorContext;
+  protected ProcessorContext processorContext;
 
   public static final Log l4j = LogFactory.getLog(RecordProcessor.class);
 
@@ -72,7 +72,7 @@ public abstract class RecordProcessor  {
    * @param outputs map of Output names to {@link LogicalOutput}s
    * @throws Exception
    */
-  void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mrReporter,
+  void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrReporter,
       Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
     this.jconf = jconf;
     this.reporter = mrReporter;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java Mon Sep  8 04:38:17 2014
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.ql.exec.t
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -59,14 +58,13 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
 import org.apache.tez.runtime.library.api.KeyValuesReader;
 
 /**
@@ -113,7 +111,7 @@ public class ReduceRecordProcessor  exte
   private List<VectorExpressionWriter>[] valueStringWriters;
 
   @Override
-  void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mrReporter,
+  void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrReporter,
       Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
     super.init(jconf, processorContext, mrReporter, inputs, outputs);
@@ -140,7 +138,7 @@ public class ReduceRecordProcessor  exte
 
     try {
       keyTableDesc = redWork.getKeyDesc();
-      inputKeyDeserializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc
+      inputKeyDeserializer = ReflectionUtils.newInstance(keyTableDesc
           .getDeserializerClass(), null);
       SerDeUtils.initializeSerDe(inputKeyDeserializer, null, keyTableDesc.getProperties(), null);
       keyObjectInspector = inputKeyDeserializer.getObjectInspector();
@@ -152,7 +150,7 @@ public class ReduceRecordProcessor  exte
         keyStructInspector = (StructObjectInspector)keyObjectInspector;
         batches = new VectorizedRowBatch[maxTags];
         valueStructInspectors = new StructObjectInspector[maxTags];
-        valueStringWriters = (List<VectorExpressionWriter>[])new List[maxTags];
+        valueStringWriters = new List[maxTags];
         keysColumnOffset = keyStructInspector.getAllStructFieldRefs().size();
         buffer = new DataOutputBuffer();
       }
@@ -215,7 +213,8 @@ public class ReduceRecordProcessor  exte
     }
 
     MapredContext.init(false, new JobConf(jconf));
-    ((TezContext)MapredContext.get()).setInputs(inputs);
+    ((TezContext) MapredContext.get()).setInputs(inputs);
+    ((TezContext) MapredContext.get()).setTezProcessorContext(processorContext);
 
     // initialize reduce operator tree
     try {
@@ -306,7 +305,7 @@ public class ReduceRecordProcessor  exte
     Map<Integer, String> tag2input = redWork.getTagToInput();
     ArrayList<LogicalInput> shuffleInputs = new ArrayList<LogicalInput>();
     for(String inpStr : tag2input.values()){
-      shuffleInputs.add((LogicalInput)inputs.get(inpStr));
+      shuffleInputs.add(inputs.get(inpStr));
     }
     return shuffleInputs;
   }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java Mon Sep  8 04:38:17 2014
@@ -17,11 +17,15 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
+import org.apache.tez.runtime.api.ProcessorContext;
+
 /**
  * Subclass that is used to indicate if this is a map or reduce process
  */
 public class ReduceTezProcessor extends TezProcessor {
-  public ReduceTezProcessor(){
-    super(false);
+
+  public ReduceTezProcessor(ProcessorContext context) {
+    super(context);
+    this.isMap = false;
   }
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java Mon Sep  8 04:38:17 2014
@@ -35,7 +35,7 @@ import org.apache.hadoop.mapred.FileSpli
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.split.TezGroupedSplit;
 import org.apache.hadoop.mapred.split.TezMapredSplitsGrouper;
-import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.dag.api.TaskLocationHint;
 
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Lists;
@@ -141,13 +141,13 @@ public class SplitGrouper {
       String rack = (split instanceof TezGroupedSplit) ? ((TezGroupedSplit) split).getRack() : null;
       if (rack == null) {
         if (split.getLocations() != null) {
-          locationHints.add(new TaskLocationHint(new HashSet<String>(Arrays.asList(split
+          locationHints.add(TaskLocationHint.createTaskLocationHint(new HashSet<String>(Arrays.asList(split
               .getLocations())), null));
         } else {
-          locationHints.add(new TaskLocationHint(null, null));
+          locationHints.add(TaskLocationHint.createTaskLocationHint(null, null));
         }
       } else {
-        locationHints.add(new TaskLocationHint(null, Collections.singleton(rack)));
+        locationHints.add(TaskLocationHint.createTaskLocationHint(null, Collections.singleton(rack)));
       }
     }
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java Mon Sep  8 04:38:17 2014
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.ql.exec.Ma
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.ProcessorContext;
 
 /**
  * TezContext contains additional context only available with Tez
@@ -31,9 +32,11 @@ public class TezContext extends MapredCo
 
   // all the inputs for the tez processor
   private Map<String, LogicalInput> inputs;
-  
+
   private Map<String, LogicalOutput> outputs;
 
+  private ProcessorContext processorContext;
+
   public TezContext(boolean isMap, JobConf jobConf) {
     super(isMap, jobConf);
   }
@@ -41,7 +44,7 @@ public class TezContext extends MapredCo
   public void setInputs(Map<String, LogicalInput> inputs) {
     this.inputs = inputs;
   }
-  
+
   public void setOutputs(Map<String, LogicalOutput> outputs) {
     this.outputs = outputs;
   }
@@ -52,11 +55,19 @@ public class TezContext extends MapredCo
     }
     return inputs.get(name);
   }
-  
+
   public LogicalOutput getOutput(String name) {
     if (outputs == null) {
       return null;
     }
     return outputs.get(name);
   }
+
+  public void setTezProcessorContext(ProcessorContext processorContext) {
+    this.processorContext = processorContext;
+  }
+
+  public ProcessorContext getTezProcessorContext() {
+    return processorContext;
+  }
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java Mon Sep  8 04:38:17 2014
@@ -142,7 +142,7 @@ public class TezJobMonitor {
             if (!running) {
               perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING);
               console.printInfo("Status: Running (application id: "
-                +dagClient.getApplicationId()+")\n");
+                +dagClient.getExecutionContext()+")\n");
               for (String s: progressMap.keySet()) {
                 perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
               }