Posted to commits@hive.apache.org by gu...@apache.org on 2014/09/05 22:16:10 UTC

svn commit: r1622783 [2/3] - in /hive/trunk: ./ common/src/java/org/apache/hadoop/hive/conf/ data/files/ hbase-handler/ itests/src/test/resources/ itests/util/src/main/java/org/apache/hadoop/hive/ql/ ql/ ql/if/ ql/src/gen/thrift/gen-cpp/ ql/src/gen/thr...

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java Fri Sep  5 20:16:08 2014
@@ -18,45 +18,29 @@
 package org.apache.hadoop.hive.ql.exec.tez;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.debug.Utils;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKeyObject;
-import org.apache.hadoop.hive.ql.exec.persistence.LazyFlatRowContainer;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
-import org.apache.hadoop.hive.serde2.ByteStream.Output;
 import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryFactory;
-import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
-import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryStruct;
-import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.library.api.KeyValueReader;
@@ -73,6 +57,7 @@ public class HashTableLoader implements 
   private Configuration hconf;
   private MapJoinDesc desc;
   private MapJoinKey lastKey = null;
+  private int rowCount = 0;
 
   @Override
   public void init(ExecMapperContext context, Configuration hconf, MapJoinOperator joinOp) {
@@ -125,6 +110,7 @@ public class HashTableLoader implements 
             : new HashMapWrapper(hconf, keyCount);
 
         while (kvReader.next()) {
+          rowCount++;
           lastKey = tableContainer.putRow(keyCtx, (Writable)kvReader.getCurrentKey(),
               valCtx, (Writable)kvReader.getCurrentValue());
         }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java Fri Sep  5 20:16:08 2014
@@ -25,15 +25,15 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.ReadaheadPool;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.LogicalIOProcessor;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
 
 import java.net.URL;
 import java.net.JarURLConnection;
-import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.List;
 import java.util.Map;
@@ -48,7 +48,7 @@ import javax.crypto.Mac;
  *
  * @see Config for configuring the HivePreWarmProcessor
  */
-public class HivePreWarmProcessor implements LogicalIOProcessor {
+public class HivePreWarmProcessor extends AbstractLogicalIOProcessor {
 
   private static boolean prewarmed = false;
 
@@ -56,10 +56,13 @@ public class HivePreWarmProcessor implem
 
   private Configuration conf;
 
+  public HivePreWarmProcessor(ProcessorContext context) {
+    super(context);
+  }
+
   @Override
-  public void initialize(TezProcessorContext processorContext)
-    throws Exception {
-    byte[] userPayload = processorContext.getUserPayload();
+  public void initialize() throws Exception {
+    UserPayload userPayload = getContext().getUserPayload();
     this.conf = TezUtils.createConfFromUserPayload(userPayload);
   }
 

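This hunk shows the migration pattern repeated throughout the commit: in Tez 0.5 the ProcessorContext is injected through the constructor, initialize() takes no arguments, and the payload is a UserPayload object rather than a raw byte[]. Below is a minimal sketch of the new processor lifecycle, assuming only the Tez 0.5 APIs already visible in the hunk; the class name is illustrative and the skeleton is not part of the commit.

    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.tez.common.TezUtils;
    import org.apache.tez.dag.api.UserPayload;
    import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
    import org.apache.tez.runtime.api.Event;
    import org.apache.tez.runtime.api.LogicalInput;
    import org.apache.tez.runtime.api.LogicalOutput;
    import org.apache.tez.runtime.api.ProcessorContext;

    // Illustrative skeleton of a Tez 0.5-style processor.
    public class SkeletonProcessor extends AbstractLogicalIOProcessor {
      private Configuration conf;

      public SkeletonProcessor(ProcessorContext context) {
        super(context);  // context is injected at construction time
      }

      @Override
      public void initialize() throws Exception {
        // UserPayload replaces the raw byte[] of the 0.4 API
        UserPayload payload = getContext().getUserPayload();
        conf = TezUtils.createConfFromUserPayload(payload);
      }

      @Override
      public void run(Map<String, LogicalInput> inputs,
          Map<String, LogicalOutput> outputs) throws Exception {
        // per-processor work goes here
      }

      @Override
      public void handleEvents(List<Event> processorEvents) {
      }

      @Override
      public void close() throws Exception {
      }
    }
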
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java Fri Sep  5 20:16:08 2014
@@ -35,19 +35,23 @@ import org.apache.hadoop.mapred.FileSpli
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.VertexLocationHint;
+import org.apache.tez.dag.api.TaskLocationHint;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
 import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.TezRootInputInitializer;
-import org.apache.tez.runtime.api.TezRootInputInitializerContext;
-import org.apache.tez.runtime.api.events.RootInputConfigureVertexTasksEvent;
-import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
+import org.apache.tez.runtime.api.InputInitializer;
+import org.apache.tez.runtime.api.InputInitializerContext;
+import org.apache.tez.runtime.api.InputSpecUpdate;
+import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
+import org.apache.tez.runtime.api.events.InputDataInformationEvent;
+import org.apache.tez.runtime.api.events.InputInitializerEvent;
 
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Lists;
@@ -59,20 +63,30 @@ import com.google.common.collect.Multima
  * making sure that splits from different partitions are only grouped if they
  * are of the same schema, format and serde
  */
-public class HiveSplitGenerator implements TezRootInputInitializer {
+@SuppressWarnings("deprecation")
+public class HiveSplitGenerator extends InputInitializer {
 
   private static final Log LOG = LogFactory.getLog(HiveSplitGenerator.class);
 
-  private final SplitGrouper grouper = new SplitGrouper();
+  private static final SplitGrouper grouper = new SplitGrouper();
+  private final DynamicPartitionPruner pruner = new DynamicPartitionPruner();
+  private InputInitializerContext context;
+
+  public HiveSplitGenerator(InputInitializerContext initializerContext) {
+    super(initializerContext);
+  }
 
   @Override
-  public List<Event> initialize(TezRootInputInitializerContext rootInputContext) throws Exception {
+  public List<Event> initialize() throws Exception {
+    InputInitializerContext rootInputContext = getContext();
+
+    context = rootInputContext;
 
     MRInputUserPayloadProto userPayloadProto =
-        MRHelpers.parseMRInputPayload(rootInputContext.getUserPayload());
+        MRInputHelpers.parseMRInputPayload(rootInputContext.getInputUserPayload());
 
     Configuration conf =
-        MRHelpers.createConfFromByteString(userPayloadProto.getConfigurationBytes());
+        TezUtils.createConfFromByteString(userPayloadProto.getConfigurationBytes());
 
     boolean sendSerializedEvents =
         conf.getBoolean("mapreduce.tez.input.initializer.serialize.event.payload", true);
@@ -81,42 +95,66 @@ public class HiveSplitGenerator implemen
     JobConf jobConf = new JobConf(conf);
     ShimLoader.getHadoopShims().getMergedCredentials(jobConf);
 
+    MapWork work = Utilities.getMapWork(jobConf);
+
+    // perform dynamic partition pruning
+    pruner.prune(work, jobConf, context);
+
     InputSplitInfoMem inputSplitInfo = null;
-    String realInputFormatName = userPayloadProto.getInputFormatName();
-    if (realInputFormatName != null && !realInputFormatName.isEmpty()) {
-      inputSplitInfo = generateGroupedSplits(rootInputContext, jobConf, conf, realInputFormatName);
+    String realInputFormatName = conf.get("mapred.input.format.class");
+    boolean groupingEnabled = userPayloadProto.getGroupingEnabled();
+    if (groupingEnabled) {
+      // Need to instantiate the realInputFormat
+      InputFormat<?, ?> inputFormat =
+          (InputFormat<?, ?>) ReflectionUtils.newInstance(Class.forName(realInputFormatName),
+              jobConf);
+
+      int totalResource = rootInputContext.getTotalAvailableResource().getMemory();
+      int taskResource = rootInputContext.getVertexTaskResource().getMemory();
+      int availableSlots = totalResource / taskResource;
+
+      // Create the un-grouped splits
+      float waves =
+          conf.getFloat(TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES,
+              TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES_DEFAULT);
+
+      InputSplit[] splits = inputFormat.getSplits(jobConf, (int) (availableSlots * waves));
+      LOG.info("Number of input splits: " + splits.length + ". " + availableSlots
+          + " available slots, " + waves + " waves. Input format is: " + realInputFormatName);
+
+      Multimap<Integer, InputSplit> groupedSplits =
+          generateGroupedSplits(jobConf, conf, splits, waves, availableSlots);
+      // And finally return them in a flat array
+      InputSplit[] flatSplits = groupedSplits.values().toArray(new InputSplit[0]);
+      LOG.info("Number of grouped splits: " + flatSplits.length);
+
+      List<TaskLocationHint> locationHints = grouper.createTaskLocationHints(flatSplits);
+
+      Utilities.clearWork(jobConf);
+
+      inputSplitInfo =
+          new InputSplitInfoMem(flatSplits, locationHints, flatSplits.length, null, jobConf);
     } else {
-      inputSplitInfo = MRHelpers.generateInputSplitsToMem(jobConf);
+      // No grouping needed, and no target number of tasks to compute.
+      // This code path should never be triggered at the moment. If grouping is disabled,
+      // DAGUtils uses MRInputAMSplitGenerator.
+      // If this is ever used in the future, make sure grouping is disabled in the payload, if it isn't already.
+      throw new RuntimeException(
+          "HiveInputFormat does not support non-grouped splits, InputFormatName is: "
+              + realInputFormatName);
+      // inputSplitInfo = MRInputHelpers.generateInputSplitsToMem(jobConf, false, 0);
     }
 
     return createEventList(sendSerializedEvents, inputSplitInfo);
   }
 
-  private InputSplitInfoMem generateGroupedSplits(TezRootInputInitializerContext context,
-      JobConf jobConf, Configuration conf, String realInputFormatName) throws Exception {
 
-    int totalResource = context.getTotalAvailableResource().getMemory();
-    int taskResource = context.getVertexTaskResource().getMemory();
-    int availableSlots = totalResource / taskResource;
-
-    float waves =
-        conf.getFloat(TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES,
-            TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES_DEFAULT);
+  public static Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf,
+      Configuration conf, InputSplit[] splits, float waves, int availableSlots)
+      throws Exception {
 
     MapWork work = Utilities.getMapWork(jobConf);
 
-    LOG.info("Grouping splits for " + work.getName() + ". " + availableSlots + " available slots, "
-        + waves + " waves. Input format is: " + realInputFormatName);
-
-    // Need to instantiate the realInputFormat
-    InputFormat<?, ?> inputFormat =
-        (InputFormat<?, ?>) ReflectionUtils
-            .newInstance(Class.forName(realInputFormatName), jobConf);
-
-    // Create the un-grouped splits
-    InputSplit[] splits = inputFormat.getSplits(jobConf, (int) (availableSlots * waves));
-    LOG.info("Number of input splits: " + splits.length);
-
     Multimap<Integer, InputSplit> bucketSplitMultiMap =
         ArrayListMultimap.<Integer, InputSplit> create();
 
@@ -159,41 +197,42 @@ public class HiveSplitGenerator implemen
     Multimap<Integer, InputSplit> groupedSplits =
         grouper.group(jobConf, bucketSplitMultiMap, availableSlots, waves);
 
-    // And finally return them in a flat array
-    InputSplit[] flatSplits = groupedSplits.values().toArray(new InputSplit[0]);
-    LOG.info("Number of grouped splits: " + flatSplits.length);
-
-    List<TaskLocationHint> locationHints = grouper.createTaskLocationHints(flatSplits);
-
-    Utilities.clearWork(jobConf);
-
-    return new InputSplitInfoMem(flatSplits, locationHints, flatSplits.length, null, jobConf);
+    return groupedSplits;
   }
 
   private List<Event> createEventList(boolean sendSerializedEvents, InputSplitInfoMem inputSplitInfo) {
 
     List<Event> events = Lists.newArrayListWithCapacity(inputSplitInfo.getNumTasks() + 1);
 
-    RootInputConfigureVertexTasksEvent configureVertexEvent =
-        new RootInputConfigureVertexTasksEvent(inputSplitInfo.getNumTasks(),
-            inputSplitInfo.getTaskLocationHints());
+    InputConfigureVertexTasksEvent configureVertexEvent =
+        InputConfigureVertexTasksEvent.create(inputSplitInfo.getNumTasks(),
+        VertexLocationHint.create(inputSplitInfo.getTaskLocationHints()),
+        InputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate());
     events.add(configureVertexEvent);
 
     if (sendSerializedEvents) {
       MRSplitsProto splitsProto = inputSplitInfo.getSplitsProto();
       int count = 0;
       for (MRSplitProto mrSplit : splitsProto.getSplitsList()) {
-        RootInputDataInformationEvent diEvent =
-            new RootInputDataInformationEvent(count++, mrSplit.toByteArray());
+        InputDataInformationEvent diEvent = InputDataInformationEvent.createWithSerializedPayload(
+            count++, mrSplit.toByteString().asReadOnlyByteBuffer());
         events.add(diEvent);
       }
     } else {
       int count = 0;
       for (org.apache.hadoop.mapred.InputSplit split : inputSplitInfo.getOldFormatSplits()) {
-        RootInputDataInformationEvent diEvent = new RootInputDataInformationEvent(count++, split);
+        InputDataInformationEvent diEvent = InputDataInformationEvent.createWithObjectPayload(
+            count++, split);
         events.add(diEvent);
       }
     }
     return events;
   }
+
+  @Override
+  public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws Exception {
+    for (InputInitializerEvent e : events) {
+      pruner.getQueue().put(e);
+    }
+  }
 }

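The grouping arithmetic inlined above is straightforward capacity math: available slots are the vertex's total resource divided by the per-task resource, and the desired number of ungrouped splits is that slot count scaled by the configured wave factor. A small worked example with illustrative numbers (none of these values come from the commit):

    // Suppose the vertex has 100 GB available and each task asks for 1 GB.
    int totalResourceMB = 102400;  // getTotalAvailableResource().getMemory()
    int taskResourceMB = 1024;     // getVertexTaskResource().getMemory()
    int availableSlots = totalResourceMB / taskResourceMB;  // 100

    float waves = 1.7f;            // TEZ_GROUPING_SPLIT_WAVES, value assumed for illustration
    int desiredSplits = (int) (availableSlots * waves);     // 170 splits requested

Oversubscribing with a wave factor greater than 1 lets faster tasks pick up extra splits instead of leaving slots idle at the tail of the vertex.
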
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java Fri Sep  5 20:16:08 2014
@@ -47,7 +47,7 @@ import org.apache.tez.mapreduce.input.MR
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 
 /**
@@ -64,8 +64,25 @@ public class MapRecordProcessor extends 
   protected static final String MAP_PLAN_KEY = "__MAP_PLAN__";
   private MapWork mapWork;
 
+  public MapRecordProcessor(JobConf jconf) {
+    ObjectCache cache = ObjectCacheFactory.getCache(jconf);
+    execContext.setJc(jconf);
+    // create map and fetch operators
+    mapWork = (MapWork) cache.retrieve(MAP_PLAN_KEY);
+    if (mapWork == null) {
+      mapWork = Utilities.getMapWork(jconf);
+      cache.cache(MAP_PLAN_KEY, mapWork);
+      l4j.info("Plan: "+mapWork);
+      for (String s: mapWork.getAliases()) {
+        l4j.info("Alias: "+s);
+      }
+    } else {
+      Utilities.setMapWork(jconf, mapWork);
+    }
+  }
+
   @Override
-  void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mrReporter,
+  void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrReporter,
       Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
     super.init(jconf, processorContext, mrReporter, inputs, outputs);
@@ -87,22 +104,7 @@ public class MapRecordProcessor extends 
       ((TezKVOutputCollector) outMap.get(outputEntry.getKey())).initialize();
     }
 
-    ObjectCache cache = ObjectCacheFactory.getCache(jconf);
     try {
-
-      execContext.setJc(jconf);
-      // create map and fetch operators
-      mapWork = (MapWork) cache.retrieve(MAP_PLAN_KEY);
-      if (mapWork == null) {
-        mapWork = Utilities.getMapWork(jconf);
-        cache.cache(MAP_PLAN_KEY, mapWork);
-        l4j.info("Plan: "+mapWork);
-        for (String s: mapWork.getAliases()) {
-          l4j.info("Alias: "+s);
-        }
-      } else {
-        Utilities.setMapWork(jconf, mapWork);
-      }
       if (mapWork.getVectorMode()) {
         mapOp = new VectorMapOperator();
       } else {
@@ -115,7 +117,8 @@ public class MapRecordProcessor extends 
       l4j.info(mapOp.dump(0));
 
       MapredContext.init(true, new JobConf(jconf));
-      ((TezContext)MapredContext.get()).setInputs(inputs);
+      ((TezContext) MapredContext.get()).setInputs(inputs);
+      ((TezContext) MapredContext.get()).setTezProcessorContext(processorContext);
       mapOp.setExecContext(execContext);
       mapOp.initializeLocalWork(jconf);
       mapOp.initialize(jconf, null);

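The block moved from init() into the constructor is the usual retrieve-or-compute idiom against the vertex-scoped object cache. The same code restated with explanatory comments (the comments are editorial, not from the commit):

    MapWork mapWork = (MapWork) cache.retrieve(MAP_PLAN_KEY);
    if (mapWork == null) {
      // first task on this JVM/vertex: deserialize the plan and publish it
      mapWork = Utilities.getMapWork(jconf);
      cache.cache(MAP_PLAN_KEY, mapWork);
    } else {
      // cache hit: re-attach the shared plan to this task's JobConf
      Utilities.setMapWork(jconf, mapWork);
    }
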
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapTezProcessor.java Fri Sep  5 20:16:08 2014
@@ -17,11 +17,15 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
+import org.apache.tez.runtime.api.ProcessorContext;
+
 /**
  * Subclass that is used to indicate if this is a map or reduce process
  */
 public class MapTezProcessor extends TezProcessor {
-  public MapTezProcessor(){
-    super(true);
+
+  public MapTezProcessor(ProcessorContext context) {
+    super(context);
+    this.isMap = true;
   }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java Fri Sep  5 20:16:08 2014
@@ -20,24 +20,40 @@ package org.apache.hadoop.hive.ql.exec.t
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.tez.runtime.common.objectregistry.ObjectLifeCycle;
-import org.apache.tez.runtime.common.objectregistry.ObjectRegistry;
-import org.apache.tez.runtime.common.objectregistry.ObjectRegistryFactory;
+import org.apache.tez.runtime.api.ObjectRegistry;
 
+import com.google.common.base.Preconditions;
 
 /**
  * ObjectCache. Tez implementation based on the tez object registry.
  *
  */
 public class ObjectCache implements org.apache.hadoop.hive.ql.exec.ObjectCache {
-
+  
   private static final Log LOG = LogFactory.getLog(ObjectCache.class.getName());
-  private final ObjectRegistry registry = ObjectRegistryFactory.getObjectRegistry();
+  
+  // ObjectRegistry is available via the Input/Output/ProcessorContext.
+  // This is set up as part of the Tez Processor construction, so that it is available whenever an
+  // instance of the ObjectCache is created. The assumption is that Tez will initialize the Processor
+  // before anything else.
+  private volatile static ObjectRegistry staticRegistry;
+ 
+  private final ObjectRegistry registry;
+  
+  public ObjectCache() {
+    Preconditions.checkNotNull(staticRegistry,
+        "Object registry not setup yet. This should have been setup by the TezProcessor");
+    registry = staticRegistry;
+  }
 
+  public static void setupObjectRegistry(ObjectRegistry objectRegistry) {
+    staticRegistry = objectRegistry;
+  }
+  
   @Override
   public void cache(String key, Object value) {
     LOG.info("Adding " + key + " to cache with value " + value);
-    registry.add(ObjectLifeCycle.VERTEX, key, value);
+    registry.cacheForVertex(key, value);
   }
 
   @Override

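The comment block above leans on an ordering guarantee: the registry must be published before any ObjectCache is constructed, and the TezProcessor constructor (later in this mail) is what publishes it. A minimal sketch of the intended wiring order, using only methods added in this commit; the key and value are invented for illustration:

    // In the processor constructor, as soon as Tez hands over the context:
    ObjectCache.setupObjectRegistry(processorContext.getObjectRegistry());

    // Any later ObjectCache construction then succeeds; the Preconditions
    // check fails fast if the processor was never constructed first.
    ObjectCache cache = new ObjectCache();
    cache.cache("example-key", "example-value");  // delegates to registry.cacheForVertex(...)
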
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java Fri Sep  5 20:16:08 2014
@@ -32,7 +32,7 @@ import org.apache.hadoop.mapred.OutputCo
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
@@ -47,7 +47,7 @@ public abstract class RecordProcessor  {
   protected Map<String, LogicalInput> inputs;
   protected Map<String, LogicalOutput> outputs;
   protected Map<String, OutputCollector> outMap;
-  protected TezProcessorContext processorContext;
+  protected ProcessorContext processorContext;
 
   public static final Log l4j = LogFactory.getLog(RecordProcessor.class);
 
@@ -72,7 +72,7 @@ public abstract class RecordProcessor  {
    * @param outputs map of Output names to {@link LogicalOutput}s
    * @throws Exception
    */
-  void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mrReporter,
+  void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrReporter,
       Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
     this.jconf = jconf;
     this.reporter = mrReporter;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java Fri Sep  5 20:16:08 2014
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.ql.exec.t
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -59,14 +58,13 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
 import org.apache.tez.runtime.library.api.KeyValuesReader;
 
 /**
@@ -113,7 +111,7 @@ public class ReduceRecordProcessor  exte
   private List<VectorExpressionWriter>[] valueStringWriters;
 
   @Override
-  void init(JobConf jconf, TezProcessorContext processorContext, MRTaskReporter mrReporter,
+  void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrReporter,
       Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
     super.init(jconf, processorContext, mrReporter, inputs, outputs);
@@ -140,7 +138,7 @@ public class ReduceRecordProcessor  exte
 
     try {
       keyTableDesc = redWork.getKeyDesc();
-      inputKeyDeserializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc
+      inputKeyDeserializer = ReflectionUtils.newInstance(keyTableDesc
           .getDeserializerClass(), null);
       SerDeUtils.initializeSerDe(inputKeyDeserializer, null, keyTableDesc.getProperties(), null);
       keyObjectInspector = inputKeyDeserializer.getObjectInspector();
@@ -152,7 +150,7 @@ public class ReduceRecordProcessor  exte
         keyStructInspector = (StructObjectInspector)keyObjectInspector;
         batches = new VectorizedRowBatch[maxTags];
         valueStructInspectors = new StructObjectInspector[maxTags];
-        valueStringWriters = (List<VectorExpressionWriter>[])new List[maxTags];
+        valueStringWriters = new List[maxTags];
         keysColumnOffset = keyStructInspector.getAllStructFieldRefs().size();
         buffer = new DataOutputBuffer();
       }
@@ -215,7 +213,8 @@ public class ReduceRecordProcessor  exte
     }
 
     MapredContext.init(false, new JobConf(jconf));
-    ((TezContext)MapredContext.get()).setInputs(inputs);
+    ((TezContext) MapredContext.get()).setInputs(inputs);
+    ((TezContext) MapredContext.get()).setTezProcessorContext(processorContext);
 
     // initialize reduce operator tree
     try {
@@ -306,7 +305,7 @@ public class ReduceRecordProcessor  exte
     Map<Integer, String> tag2input = redWork.getTagToInput();
     ArrayList<LogicalInput> shuffleInputs = new ArrayList<LogicalInput>();
     for(String inpStr : tag2input.values()){
-      shuffleInputs.add((LogicalInput)inputs.get(inpStr));
+      shuffleInputs.add(inputs.get(inpStr));
     }
     return shuffleInputs;
   }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceTezProcessor.java Fri Sep  5 20:16:08 2014
@@ -17,11 +17,15 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
+import org.apache.tez.runtime.api.ProcessorContext;
+
 /**
  * Subclass that is used to indicate if this is a map or reduce process
  */
 public class ReduceTezProcessor extends TezProcessor {
-  public ReduceTezProcessor(){
-    super(false);
+
+  public ReduceTezProcessor(ProcessorContext context) {
+    super(context);
+    this.isMap = false;
   }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java Fri Sep  5 20:16:08 2014
@@ -35,7 +35,7 @@ import org.apache.hadoop.mapred.FileSpli
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.split.TezGroupedSplit;
 import org.apache.hadoop.mapred.split.TezMapredSplitsGrouper;
-import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.dag.api.TaskLocationHint;
 
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Lists;
@@ -141,13 +141,13 @@ public class SplitGrouper {
       String rack = (split instanceof TezGroupedSplit) ? ((TezGroupedSplit) split).getRack() : null;
       if (rack == null) {
         if (split.getLocations() != null) {
-          locationHints.add(new TaskLocationHint(new HashSet<String>(Arrays.asList(split
+          locationHints.add(TaskLocationHint.createTaskLocationHint(new HashSet<String>(Arrays.asList(split
               .getLocations())), null));
         } else {
-          locationHints.add(new TaskLocationHint(null, null));
+          locationHints.add(TaskLocationHint.createTaskLocationHint(null, null));
         }
       } else {
-        locationHints.add(new TaskLocationHint(null, Collections.singleton(rack)));
+        locationHints.add(TaskLocationHint.createTaskLocationHint(null, Collections.singleton(rack)));
       }
     }
 

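The constructor-to-static-factory change here is one instance of a pattern that recurs across this commit (DAG.create, VertexLocationHint.create, InputConfigureVertexTasksEvent.create, InputDataInformationEvent.createWithSerializedPayload). A compact before/after sketch for the location-hint case, with host and rack names invented for illustration:

    // Tez 0.4: new TaskLocationHint(hosts, racks)
    // Tez 0.5: TaskLocationHint.createTaskLocationHint(hosts, racks)
    TaskLocationHint hostLocal = TaskLocationHint.createTaskLocationHint(
        new HashSet<String>(Arrays.asList("node-a", "node-b")), null);
    TaskLocationHint rackLocal = TaskLocationHint.createTaskLocationHint(
        null, Collections.singleton("/default-rack"));
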
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezContext.java Fri Sep  5 20:16:08 2014
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.ql.exec.Ma
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.ProcessorContext;
 
 /**
  * TezContext contains additional context only available with Tez
@@ -31,9 +32,11 @@ public class TezContext extends MapredCo
 
   // all the inputs for the tez processor
   private Map<String, LogicalInput> inputs;
-  
+
   private Map<String, LogicalOutput> outputs;
 
+  private ProcessorContext processorContext;
+
   public TezContext(boolean isMap, JobConf jobConf) {
     super(isMap, jobConf);
   }
@@ -41,7 +44,7 @@ public class TezContext extends MapredCo
   public void setInputs(Map<String, LogicalInput> inputs) {
     this.inputs = inputs;
   }
-  
+
   public void setOutputs(Map<String, LogicalOutput> outputs) {
     this.outputs = outputs;
   }
@@ -52,11 +55,19 @@ public class TezContext extends MapredCo
     }
     return inputs.get(name);
   }
-  
+
   public LogicalOutput getOutput(String name) {
     if (outputs == null) {
       return null;
     }
     return outputs.get(name);
   }
+
+  public void setTezProcessorContext(ProcessorContext processorContext) {
+    this.processorContext = processorContext;
+  }
+
+  public ProcessorContext getTezProcessorContext() {
+    return processorContext;
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java Fri Sep  5 20:16:08 2014
@@ -142,7 +142,7 @@ public class TezJobMonitor {
             if (!running) {
               perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING);
               console.printInfo("Status: Running (application id: "
-                +dagClient.getApplicationId()+")\n");
+                +dagClient.getExecutionContext()+")\n");
               for (String s: progressMap.keySet()) {
                 perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
               }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java Fri Sep  5 20:16:08 2014
@@ -33,23 +33,23 @@ import org.apache.hadoop.util.StringUtil
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
+import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.LogicalIOProcessor;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
-import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.ProcessorContext;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
 
 /**
  * Hive processor for Tez that forms the vertices in Tez and processes the data.
  * Does what ExecMapper and ExecReducer does for hive in MR framework.
  */
-public class TezProcessor implements LogicalIOProcessor {
+public class TezProcessor extends AbstractLogicalIOProcessor {
 
 
 
   private static final Log LOG = LogFactory.getLog(TezProcessor.class);
-  private boolean isMap = false;
+  protected boolean isMap = false;
 
   RecordProcessor rproc = null;
 
@@ -58,8 +58,6 @@ public class TezProcessor implements Log
   private static final String CLASS_NAME = TezProcessor.class.getName();
   private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
 
-  private TezProcessorContext processorContext;
-
   protected static final NumberFormat taskIdFormat = NumberFormat.getInstance();
   protected static final NumberFormat jobIdFormat = NumberFormat.getInstance();
   static {
@@ -69,8 +67,9 @@ public class TezProcessor implements Log
     jobIdFormat.setMinimumIntegerDigits(4);
   }
 
-  public TezProcessor(boolean isMap) {
-    this.isMap = isMap;
+  public TezProcessor(ProcessorContext context) {
+    super(context);
+    ObjectCache.setupObjectRegistry(context.getObjectRegistry());
   }
 
   @Override
@@ -86,19 +85,15 @@ public class TezProcessor implements Log
   }
 
   @Override
-  public void initialize(TezProcessorContext processorContext)
-      throws IOException {
+  public void initialize() throws IOException {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INITIALIZE_PROCESSOR);
-    this.processorContext = processorContext;
-    //get the jobconf
-    byte[] userPayload = processorContext.getUserPayload();
-    Configuration conf = TezUtils.createConfFromUserPayload(userPayload);
+    Configuration conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
     this.jobConf = new JobConf(conf);
-    setupMRLegacyConfigs(processorContext);
+    setupMRLegacyConfigs(getContext());
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INITIALIZE_PROCESSOR);
   }
 
-  private void setupMRLegacyConfigs(TezProcessorContext processorContext) {
+  private void setupMRLegacyConfigs(ProcessorContext processorContext) {
     // Hive "insert overwrite local directory" uses task id as dir name
     // Setting the id in jobconf helps to have the similar dir name as MR
     StringBuilder taskAttemptIdBuilder = new StringBuilder("attempt_");
@@ -133,10 +128,10 @@ public class TezProcessor implements Log
       // in case of broadcast-join read the broadcast edge inputs
       // (possibly asynchronously)
 
-      LOG.info("Running task: " + processorContext.getUniqueIdentifier());
+      LOG.info("Running task: " + getContext().getUniqueIdentifier());
 
       if (isMap) {
-        rproc = new MapRecordProcessor();
+        rproc = new MapRecordProcessor(jobConf);
         MRInputLegacy mrInput = getMRInput(inputs);
         try {
           mrInput.init();
@@ -160,8 +155,8 @@ public class TezProcessor implements Log
 
       // Outputs will be started later by the individual Processors.
 
-      MRTaskReporter mrReporter = new MRTaskReporter(processorContext);
-      rproc.init(jobConf, processorContext, mrReporter, inputs, outputs);
+      MRTaskReporter mrReporter = new MRTaskReporter(getContext());
+      rproc.init(jobConf, getContext(), mrReporter, inputs, outputs);
       rproc.run();
 
       //done - output does not need to be committed as hive does not use outputcommitter
@@ -207,6 +202,7 @@ public class TezProcessor implements Log
       this.writer = (KeyValueWriter) output.getWriter();
     }
 
+    @Override
     public void collect(Object key, Object value) throws IOException {
       writer.write(key, value);
     }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java Fri Sep  5 20:16:08 2014
@@ -47,10 +47,8 @@ import org.apache.hadoop.hive.ql.session
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.tez.client.AMConfiguration;
-import org.apache.tez.client.PreWarmContext;
-import org.apache.tez.client.TezSession;
-import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.PreWarmVertex;
 import org.apache.tez.dag.api.SessionNotRunning;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
@@ -67,7 +65,7 @@ public class TezSessionState {
   private HiveConf conf;
   private Path tezScratchDir;
   private LocalResource appJarLr;
-  private TezSession session;
+  private TezClient session;
   private String sessionId;
   private DagUtils utils;
   private String queueName;
@@ -150,11 +148,6 @@ public class TezSessionState {
 
     refreshLocalResourcesFromConf(conf);
 
-    // generate basic tez config
-    TezConfiguration tezConfig = new TezConfiguration(conf);
-
-    tezConfig.set(TezConfiguration.TEZ_AM_STAGING_DIR, tezScratchDir.toUri().toString());
-
     // unless already installed on all the cluster nodes, we'll have to
     // localize hive-exec.jar as well.
     appJarLr = createJarLocalResource(utils.getExecJarPathLocal());
@@ -168,15 +161,23 @@ public class TezSessionState {
 
     // Create environment for AM.
     Map<String, String> amEnv = new HashMap<String, String>();
-    MRHelpers.updateEnvironmentForMRAM(conf, amEnv);
+    MRHelpers.updateEnvBasedOnMRAMEnv(conf, amEnv);
 
-    AMConfiguration amConfig = new AMConfiguration(amEnv, commonLocalResources, tezConfig, null);
+    // and finally we're ready to create and start the session
+    // generate basic tez config
+    TezConfiguration tezConfig = new TezConfiguration(conf);
+    tezConfig.set(TezConfiguration.TEZ_AM_STAGING_DIR, tezScratchDir.toUri().toString());
 
-    // configuration for the session
-    TezSessionConfiguration sessionConfig = new TezSessionConfiguration(amConfig, tezConfig);
+    if (HiveConf.getBoolVar(conf, ConfVars.HIVE_PREWARM_ENABLED)) {
+      int n = HiveConf.getIntVar(conf, ConfVars.HIVE_PREWARM_NUM_CONTAINERS);
+      n = Math.max(tezConfig.getInt(
+          TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS,
+          TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS_DEFAULT), n);
+      tezConfig.setInt(TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS, n);
+    }
 
-    // and finally we're ready to create and start the session
-    session = new TezSession("HIVE-" + sessionId, sessionConfig);
+    session = TezClient.create("HIVE-" + sessionId, tezConfig, true,
+        commonLocalResources, null);
 
     LOG.info("Opening new Tez Session (id: " + sessionId
         + ", scratch dir: " + tezScratchDir + ")");
@@ -187,20 +188,30 @@ public class TezSessionState {
       int n = HiveConf.getIntVar(conf, ConfVars.HIVE_PREWARM_NUM_CONTAINERS);
       LOG.info("Prewarming " + n + " containers  (id: " + sessionId
           + ", scratch dir: " + tezScratchDir + ")");
-      PreWarmContext context = utils.createPreWarmContext(sessionConfig, n, commonLocalResources);
+      PreWarmVertex prewarmVertex = utils.createPreWarmVertex(tezConfig, n,
+          commonLocalResources);
       try {
-        session.preWarm(context);
-      } catch (InterruptedException ie) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Hive Prewarm threw an exception ", ie);
+        session.preWarm(prewarmVertex);
+      } catch (IOException ie) {
+        if (ie.getMessage().contains("Interrupted while waiting")) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Hive Prewarm threw an exception ", ie);
+          }
+        } else {
+          throw ie;
         }
       }
     }
-
+    try {
+      session.waitTillReady();
+    } catch(InterruptedException ie) {
+      //ignore
+    }
     // In case we need to run some MR jobs, we'll run them under tez MR emulation. The session
     // id is used for tez to reuse the current session rather than start a new one.
     conf.set("mapreduce.framework.name", "yarn-tez");
-    conf.set("mapreduce.tez.session.tokill-application-id", session.getApplicationId().toString());
+    conf.set("mapreduce.tez.session.tokill-application-id",
+        session.getAppMasterApplicationId().toString());
 
     openSessions.add(this);
   }
@@ -277,7 +288,7 @@ public class TezSessionState {
     return sessionId;
   }
 
-  public TezSession getSession() {
+  public TezClient getSession() {
     return session;
   }
 

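For orientation, the TezClient session flow this rewrite targets is: create the client in session mode, start it, optionally pre-warm containers, then block until the AM is ready to accept DAGs. A condensed sketch of that sequence using the names from the hunk; TezClient.start() is not visible in this hunk and is assumed from the Tez 0.5 client API:

    TezConfiguration tezConf = new TezConfiguration(conf);
    TezClient session = TezClient.create("HIVE-" + sessionId, tezConf,
        true /* session mode */, commonLocalResources, null /* credentials */);
    session.start();                 // assumed call, not shown in the hunk
    session.preWarm(prewarmVertex);  // optional: spin up held containers
    session.waitTillReady();         // block until the AM accepts DAGs
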
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Fri Sep  5 20:16:08 2014
@@ -29,7 +29,6 @@ import java.util.Set;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -67,7 +66,7 @@ import org.apache.tez.dag.api.client.Sta
  * using the Tez APIs directly.
  *
  */
-@SuppressWarnings({"serial", "deprecation"})
+@SuppressWarnings({"serial"})
 public class TezTask extends Task<TezWork> {
 
   private static final String CLASS_NAME = TezTask.class.getName();
@@ -135,7 +134,7 @@ public class TezTask extends Task<TezWor
       }
 
       List<LocalResource> additionalLr = session.getLocalizedResources();
-      
+
       // log which resources we're adding (apart from the hive exec)
       if (LOG.isDebugEnabled()) {
         if (additionalLr == null || additionalLr.size() == 0) {
@@ -166,7 +165,7 @@ public class TezTask extends Task<TezWor
       counters = client.getDAGStatus(statusGetOpts).getDAGCounters();
       TezSessionPoolManager.getInstance().returnSession(session);
 
-      if (LOG.isInfoEnabled()) {
+      if (LOG.isInfoEnabled() && counters != null) {
         for (CounterGroup group: counters) {
           LOG.info(group.getDisplayName() +":");
           for (TezCounter counter: group) {
@@ -212,7 +211,7 @@ public class TezTask extends Task<TezWor
     FileSystem fs = scratchDir.getFileSystem(conf);
 
     // the name of the dag is what is displayed in the AM/Job UI
-    DAG dag = new DAG(work.getName());
+    DAG dag = DAG.create(work.getName());
 
     for (BaseWork w: ws) {
 
@@ -247,16 +246,14 @@ public class TezTask extends Task<TezWor
         }
         VertexGroup group = dag.createVertexGroup(w.getName(), vertexArray);
 
+        // For a vertex group, all Outputs use the same Key-class, Val-class and partitioner.
+        // Pick any one source vertex to figure out the Edge configuration.
+        JobConf parentConf = workToConf.get(unionWorkItems.get(0));
+
         // now hook up the children
         for (BaseWork v: children) {
-          // need to pairwise patch up the configuration of the vertices
-          for (BaseWork part: unionWorkItems) {
-            utils.updateConfigurationForEdge(workToConf.get(part), workToVertex.get(part),
-                 workToConf.get(v), workToVertex.get(v));
-          }
-
           // finally we can create the grouped edge
-          GroupInputEdge e = utils.createEdge(group, workToConf.get(v),
+          GroupInputEdge e = utils.createEdge(group, parentConf,
                workToVertex.get(v), work.getEdgeProperty(w, v));
 
           dag.addEdge(e);
@@ -279,7 +276,7 @@ public class TezTask extends Task<TezWor
 
           TezEdgeProperty edgeProp = work.getEdgeProperty(w, v);
 
-          e = utils.createEdge(wxConf, wx, workToConf.get(v), workToVertex.get(v), edgeProp);
+          e = utils.createEdge(wxConf, wx, workToVertex.get(v), edgeProp);
           dag.addEdge(e);
         }
       }
@@ -305,7 +302,8 @@ public class TezTask extends Task<TezWor
 
     try {
       // ready to start execution on the cluster
-      dagClient = sessionState.getSession().submitDAG(dag, resourceMap);
+      sessionState.getSession().addAppMasterLocalFiles(resourceMap);
+      dagClient = sessionState.getSession().submitDAG(dag);
     } catch (SessionNotRunning nr) {
       console.printInfo("Tez session was closed. Reopening...");
 
@@ -313,7 +311,7 @@ public class TezTask extends Task<TezWor
       TezSessionPoolManager.getInstance().closeAndOpen(sessionState, this.conf);
       console.printInfo("Session re-established.");
 
-      dagClient = sessionState.getSession().submitDAG(dag, resourceMap);
+      dagClient = sessionState.getSession().submitDAG(dag);
     }
 
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_DAG);

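Note the submission change at the end of the hunk: DAG-level resources are no longer an argument to submitDAG but are registered on the session up front. A two-line sketch of the new sequence (same calls as above):

    // Tez 0.4: dagClient = session.submitDAG(dag, resourceMap);
    session.addAppMasterLocalFiles(resourceMap);   // Map<String, LocalResource>
    DAGClient dagClient = session.submitDAG(dag);  // Tez 0.5
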
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/InputMerger.java Fri Sep  5 20:16:08 2014
@@ -37,7 +37,7 @@ import org.apache.tez.runtime.library.ap
  * Uses a priority queue to pick the KeyValuesReader of the input that is next in
  * sort order.
  */
-public class InputMerger implements KeyValuesReader {
+public class InputMerger extends KeyValuesReader {
 
   public static final Log l4j = LogFactory.getLog(ReduceRecordProcessor.class);
   private PriorityQueue<KeyValuesReader> pQueue = null;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java Fri Sep  5 20:16:08 2014
@@ -18,9 +18,11 @@
 package org.apache.hadoop.hive.ql.exec.tez.tools;
 
 import java.util.IdentityHashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.tez.runtime.api.Input;
+import org.apache.tez.runtime.api.MergedInputContext;
 import org.apache.tez.runtime.api.MergedLogicalInput;
 import org.apache.tez.runtime.api.Reader;
 
@@ -31,7 +33,11 @@ import org.apache.tez.runtime.api.Reader
 public class TezMergedLogicalInput extends MergedLogicalInput {
 
   private Map<Input, Boolean> readyInputs = new IdentityHashMap<Input, Boolean>();
-  
+
+  public TezMergedLogicalInput(MergedInputContext context, List<Input> inputs) {
+    super(context, inputs);
+  }
+ 
   @Override
   public Reader getReader() throws Exception {
     return new InputMerger(getInputs());

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java Fri Sep  5 20:16:08 2014
@@ -18,14 +18,13 @@
 
 package org.apache.hadoop.hive.ql.io;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapred.lib.HashPartitioner;
 
 /** Partition keys by their {@link Object#hashCode()}. */
 public class DefaultHivePartitioner<K2, V2> extends HashPartitioner<K2, V2> implements HivePartitioner<K2, V2> {
 
   /** Use {@link Object#hashCode()} to partition. */
+  @Override
   public int getBucket(K2 key, V2 value, int numBuckets) {
     return (key.hashCode() & Integer.MAX_VALUE) % numBuckets;
   }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java Fri Sep  5 20:16:08 2014
@@ -13,6 +13,8 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.common.FileUtils;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java Fri Sep  5 20:16:08 2014
@@ -941,12 +941,12 @@ public final class ConstantPropagateProc
         return null;
       }
 
-      List<ExprNodeDesc> newChildren = new ArrayList<ExprNodeDesc>();
-      for (ExprNodeDesc expr : pred.getChildren()) {
-        ExprNodeDesc constant = foldExpr(expr, constants, cppCtx, op, 0, false);
-        newChildren.add(constant);
+      ExprNodeDesc constant = foldExpr(pred, constants, cppCtx, op, 0, false);
+      if (constant instanceof ExprNodeGenericFuncDesc) {
+        conf.setFilterExpr((ExprNodeGenericFuncDesc) constant);
+      } else {
+        conf.setFilterExpr(null);
       }
-      pred.setChildren(newChildren);
       return null;
     }
   }

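The rewritten filter handling folds the predicate as a whole instead of folding each child in place. The instanceof check matters because folding can collapse the predicate to a bare constant (for example a tautology like 1 = 1), which is not an ExprNodeGenericFuncDesc; in that case the filter expression is cleared rather than set. Restated with editorial comments:

    ExprNodeDesc folded = foldExpr(pred, constants, cppCtx, op, 0, false);
    if (folded instanceof ExprNodeGenericFuncDesc) {
      conf.setFilterExpr((ExprNodeGenericFuncDesc) folded);  // still a real predicate
    } else {
      conf.setFilterExpr(null);  // folded away entirely; nothing left to evaluate
    }
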
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java Fri Sep  5 20:16:08 2014
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.optimizer;
 
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -27,6 +28,7 @@ import java.util.Stack;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
@@ -39,6 +41,7 @@ import org.apache.hadoop.hive.ql.lib.Nod
 import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
@@ -363,6 +366,19 @@ public class ConvertJoinMapJoin implemen
     Operator<? extends OperatorDesc> parentBigTableOp
       = mapJoinOp.getParentOperators().get(bigTablePosition);
     if (parentBigTableOp instanceof ReduceSinkOperator) {
+      for (Operator<?> p : parentBigTableOp.getParentOperators()) {
+        // we might have generated a dynamic partition operator chain. Since
+        // we're removing the reduce sink, we need to remove that too.
+        Set<Operator<?>> dynamicPartitionOperators = new HashSet<Operator<?>>();
+        for (Operator<?> c : p.getChildOperators()) {
+          if (hasDynamicPartitionBroadcast(c)) {
+            dynamicPartitionOperators.add(c);
+          }
+        }
+        for (Operator<?> c : dynamicPartitionOperators) {
+          p.removeChild(c);
+        }
+      }
       mapJoinOp.getParentOperators().remove(bigTablePosition);
       if (!(mapJoinOp.getParentOperators().contains(
               parentBigTableOp.getParentOperators().get(0)))) {
@@ -380,4 +396,16 @@ public class ConvertJoinMapJoin implemen
 
     return mapJoinOp;
   }
+
+  private boolean hasDynamicPartitionBroadcast(Operator<?> op) {
+    if (op instanceof AppMasterEventOperator && op.getConf() instanceof DynamicPruningEventDesc) {
+      return true;
+    }
+    for (Operator<?> c : op.getChildOperators()) {
+      if (hasDynamicPartitionBroadcast(c)) {
+        return true;
+      }
+    }
+    return false;
+  }
 }
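
Two details in the new code are worth calling out: hasDynamicPartitionBroadcast
walks the entire child subtree looking for an AppMasterEventOperator carrying a
DynamicPruningEventDesc, and the removal collects matches into a set before
detaching them, so the child list is never mutated mid-iteration. A
self-contained sketch of the same pattern; Node and the name-based marker test
are hypothetical stand-ins for Hive's operator tree:

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    class Node {
      final String name;
      final List<Node> children = new ArrayList<Node>();
      Node(String name) { this.name = name; }
    }

    public class DetachSketch {
      // True if this node or any descendant carries the marker -- the role
      // hasDynamicPartitionBroadcast plays above.
      static boolean hasMarker(Node n) {
        if (n.name.startsWith("event")) {
          return true;
        }
        for (Node c : n.children) {
          if (hasMarker(c)) {
            return true;
          }
        }
        return false;
      }

      static void detachMarkedChildren(Node parent) {
        Set<Node> toRemove = new HashSet<Node>();
        for (Node c : parent.children) {
          if (hasMarker(c)) {
            toRemove.add(c);
          }
        }
        parent.children.removeAll(toRemove); // safe: iteration already finished
      }

      public static void main(String[] args) {
        Node select = new Node("select");
        select.children.add(new Node("event-dpp"));
        select.children.add(new Node("filesink"));
        detachMarkedChildren(select);
        System.out.println(select.children.size()); // 1 -- only the file sink remains
      }
    }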

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Fri Sep  5 20:16:08 2014
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.ql.parse.P
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.ppd.PredicatePushDown;
 import org.apache.hadoop.hive.ql.ppd.PredicateTransitivePropagate;
+import org.apache.hadoop.hive.ql.ppd.SyntheticJoinPredicate;
 
 /**
  * Implementation of the optimizer.
@@ -55,6 +56,7 @@ public class Optimizer {
     transformations.add(new Generator());
     if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTPPD)) {
       transformations.add(new PredicateTransitivePropagate());
+      transformations.add(new SyntheticJoinPredicate());
       transformations.add(new PredicatePushDown());
       transformations.add(new PartitionPruner());
       transformations.add(new PartitionConditionRemover());
@@ -125,8 +127,8 @@ public class Optimizer {
     if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTIMIZEMETADATAQUERIES)) {
       transformations.add(new StatsOptimizer());
     }
-    if (pctx.getContext().getExplain() ||
-        HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+    if (pctx.getContext().getExplain()
+        && !HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
       transformations.add(new AnnotateWithStatistics());
       transformations.add(new AnnotateWithOpTraits());
     }
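
The condition flip above is easy to misread: before, the two annotation passes
ran for any EXPLAIN or whenever the engine was tez; now they run only for
EXPLAIN on a non-tez engine. A tiny sketch of the resulting gate, with plain
strings standing in for the real transformation objects and conf lookups:

    import java.util.ArrayList;
    import java.util.List;

    public class AnnotationGateSketch {
      static List<String> annotationPasses(boolean explain, String engine) {
        List<String> passes = new ArrayList<String>();
        // mirrors: if (explain && !engine.equals("tez"))
        if (explain && !"tez".equals(engine)) {
          passes.add("AnnotateWithStatistics");
          passes.add("AnnotateWithOpTraits");
        }
        return passes;
      }

      public static void main(String[] args) {
        System.out.println(annotationPasses(true, "mr"));  // both passes
        System.out.println(annotationPasses(true, "tez")); // [] -- tez now skips these here
        System.out.println(annotationPasses(false, "mr")); // []
      }
    }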

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/PrunerExpressionOperatorFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/PrunerExpressionOperatorFactory.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/PrunerExpressionOperatorFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/PrunerExpressionOperatorFactory.java Fri Sep  5 20:16:08 2014
@@ -186,8 +186,7 @@ public abstract class PrunerExpressionOp
         return ((ExprNodeNullDesc) nd).clone();
       }
 
-      assert (false);
-      return null;
+      return new ExprNodeConstantDesc(((ExprNodeDesc)nd).getTypeInfo(), null);
     }
   }
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java Fri Sep  5 20:16:08 2014
@@ -26,8 +26,6 @@ import java.util.Stack;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -42,7 +40,6 @@ import org.apache.hadoop.hive.ql.parse.G
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.ColStatistics;
-import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.HashTableDummyDesc;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
@@ -134,7 +131,8 @@ public class ReduceSinkMapJoinProc imple
         String prefix = Utilities.ReduceField.KEY.toString();
         for (String keyCol : keyCols) {
           ExprNodeDesc realCol = parentRS.getColumnExprMap().get(prefix + "." + keyCol);
-          ColStatistics cs = StatsUtils.getColStatisticsFromExpression(null, stats, realCol);
+          ColStatistics cs =
+              StatsUtils.getColStatisticsFromExpression(context.conf, stats, realCol);
           if (cs == null || cs.getCountDistint() <= 0) {
             maxKeyCount = Long.MAX_VALUE;
             break;
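
Passing the real conf instead of null lets getColStatisticsFromExpression
resolve statistics it previously could not. For context, the surrounding loop
builds a worst-case key-count estimate: presumably each key column's distinct
count multiplies into the total, and any column with missing stats (null, or
NDV <= 0 as in the lines above) forces the estimate to "unbounded". A sketch
of that estimate with invented numbers:

    public class KeyCountSketch {
      static long maxKeyCount(long[] columnNdvs) {
        long count = 1;
        for (long ndv : columnNdvs) {
          if (ndv <= 0) {
            return Long.MAX_VALUE; // no stats for this column: assume the worst
          }
          count *= ndv;
          if (count < 0) {         // crude overflow guard for the sketch
            return Long.MAX_VALUE;
          }
        }
        return count;
      }

      public static void main(String[] args) {
        System.out.println(maxKeyCount(new long[] {100, 50})); // 5000
        System.out.println(maxKeyCount(new long[] {100, 0}));  // Long.MAX_VALUE
      }
    }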

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcFactory.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcFactory.java Fri Sep  5 20:16:08 2014
@@ -396,8 +396,7 @@ public final class PcrExprProcFactory {
         return new NodeInfoWrapper(WalkState.CONSTANT, null,
             (ExprNodeDesc) nd);
       }
-      assert (false);
-      return null;
+      return new NodeInfoWrapper(WalkState.UNKNOWN, null, (ExprNodeDesc)nd);
     }
   }
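
This is the same fix as in PrunerExpressionOperatorFactory above: an
assert (false) / return null fall-through becomes a conservative default -- a
typed null constant there, an UNKNOWN-state wrapper here. With assertions
disabled, the old code silently returned null and invited NPEs downstream.
A sketch of the pattern; classify and its cases are hypothetical:

    public class FallbackSketch {
      enum WalkState { TRUE, FALSE, CONSTANT, UNKNOWN }

      static WalkState classify(Object node) {
        if (node instanceof Boolean) {
          return ((Boolean) node) ? WalkState.TRUE : WalkState.FALSE;
        }
        if (node instanceof Number) {
          return WalkState.CONSTANT;
        }
        // Previously: assert (false); return null; -- a latent NPE when
        // assertions are off. UNKNOWN keeps the walk correct, just less precise.
        return WalkState.UNKNOWN;
      }

      public static void main(String[] args) {
        System.out.println(classify("something unanticipated")); // UNKNOWN
      }
    }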
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java Fri Sep  5 20:16:08 2014
@@ -18,13 +18,18 @@
 
 package org.apache.hadoop.hive.ql.optimizer.stats.annotation;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
 import org.apache.hadoop.hive.ql.exec.FilterOperator;
@@ -67,12 +72,8 @@ import org.apache.hadoop.hive.ql.udf.gen
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
 import org.apache.hadoop.hive.serde.serdeConstants;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Stack;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 public class StatsRulesProcFactory {
 
@@ -657,7 +658,8 @@ public class StatsRulesProcFactory {
           if (parentStats != null) {
 
             // worst case, in the absence of column statistics assume half the rows are emitted
-            if (gop.getChildOperators().get(0) instanceof ReduceSinkOperator) {
+            if (gop.getChildOperators().get(0) instanceof ReduceSinkOperator
+                || gop.getChildOperators().get(0) instanceof AppMasterEventOperator) {
 
               // map side
               stats = parentStats.clone();
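
The only change here is treating an AppMasterEventOperator child like a
ReduceSinkOperator child when sizing a map-side group-by. The surrounding
comment gives the rule: with no column statistics, the map side is assumed to
emit half the parent's rows. As arithmetic (the floor-at-one guard is an
assumption of this sketch, not taken from the code):

    public class GroupByEstimateSketch {
      static long mapSideGroupByRows(long parentRows) {
        return Math.max(1, parentRows / 2); // "assume half the rows are emitted"
      }

      public static void main(String[] args) {
        System.out.println(mapSideGroupByRows(1000)); // 500
      }
    }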

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java Fri Sep  5 20:16:08 2014
@@ -22,16 +22,14 @@ import java.util.Stack;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
-import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
 
 /**
- * FileSinkProcessor handles addition of merge, move and stats tasks for filesinks
+ * FileSinkProcessor is a simple rule to remember seen file sinks for later
+ * processing.
  *
  */
 public class FileSinkProcessor implements NodeProcessor {
@@ -39,12 +37,6 @@ public class FileSinkProcessor implement
   static final private Log LOG = LogFactory.getLog(FileSinkProcessor.class.getName());
 
   @Override
-  /*
-   * (non-Javadoc)
-   * we should ideally not modify the tree we traverse.
-   * However, since we need to walk the tree at any time when we modify the
-   * operator, we might as well do it here.
-   */
   public Object process(Node nd, Stack<Node> stack,
       NodeProcessorCtx procCtx, Object... nodeOutputs)
       throws SemanticException {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java Fri Sep  5 20:16:08 2014
@@ -26,29 +26,28 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
 import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
-import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
 import org.apache.hadoop.hive.ql.plan.TezWork;
 
 /**
@@ -134,6 +133,15 @@ public class GenTezProcContext implement
   // remember which reducesinks we've already connected
   public final Set<ReduceSinkOperator> connectedReduceSinks;
 
+  // remember the event operators we've seen
+  public final Set<AppMasterEventOperator> eventOperatorSet;
+
+  // remember the event operators we've abandoned.
+  public final Set<AppMasterEventOperator> abandonedEventOperatorSet;
+
+  // remember the connections between ts and event
+  public final Map<TableScanOperator, List<AppMasterEventOperator>> tsToEventMap;
+
   @SuppressWarnings("unchecked")
   public GenTezProcContext(HiveConf conf, ParseContext parseContext,
       List<Task<MoveWork>> moveTask, List<Task<? extends Serializable>> rootTasks,
@@ -165,6 +173,9 @@ public class GenTezProcContext implement
     this.linkedFileSinks = new LinkedHashMap<Path, List<FileSinkDesc>>();
     this.fileSinkSet = new LinkedHashSet<FileSinkOperator>();
     this.connectedReduceSinks = new LinkedHashSet<ReduceSinkOperator>();
+    this.eventOperatorSet = new LinkedHashSet<AppMasterEventOperator>();
+    this.abandonedEventOperatorSet = new LinkedHashSet<AppMasterEventOperator>();
+    this.tsToEventMap = new LinkedHashMap<TableScanOperator, List<AppMasterEventOperator>>();
 
     rootTasks.add(currentTask);
   }
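
All three new collections are LinkedHash variants, matching the existing
fields around them. A quick sketch of why that choice plausibly matters in a
planner: iteration follows insertion order, so anything derived from walking
these sets (vertex names, event wiring) stays deterministic across runs:

    import java.util.HashSet;
    import java.util.LinkedHashSet;
    import java.util.Set;

    public class IterationOrderSketch {
      public static void main(String[] args) {
        Set<String> linked = new LinkedHashSet<String>();
        Set<String> plain = new HashSet<String>();
        for (String s : new String[] {"EVENT[7]", "EVENT[2]", "EVENT[5]"}) {
          linked.add(s);
          plain.add(s);
        }
        System.out.println(linked); // [EVENT[7], EVENT[2], EVENT[5]] -- insertion order
        System.out.println(plain);  // hash order; may vary between JVMs
      }
    }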

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java Fri Sep  5 20:16:08 2014
@@ -20,38 +20,43 @@ package org.apache.hadoop.hive.ql.parse;
 
 import java.util.ArrayList;
 import java.util.Deque;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.List;
 import java.util.LinkedList;
-import java.util.Map;
+import java.util.List;
 import java.util.Set;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
 import org.apache.hadoop.hive.ql.exec.FetchTask;
-import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
 import org.apache.hadoop.hive.ql.plan.TezWork;
 import org.apache.hadoop.hive.ql.plan.UnionWork;
 
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+
 /**
  * GenTezUtils is a collection of shared helper methods to produce
  * TezWork
@@ -119,12 +124,12 @@ public class GenTezUtils {
       int maxReducers = context.conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
 
       // min we allow tez to pick
-      int minPartition = Math.max(1, (int) (reduceSink.getConf().getNumReducers() 
+      int minPartition = Math.max(1, (int) (reduceSink.getConf().getNumReducers()
         * minPartitionFactor));
       minPartition = (minPartition > maxReducers) ? maxReducers : minPartition;
 
       // max we allow tez to pick
-      int maxPartition = (int) (reduceSink.getConf().getNumReducers() * maxPartitionFactor); 
+      int maxPartition = (int) (reduceSink.getConf().getNumReducers() * maxPartitionFactor);
       maxPartition = (maxPartition > maxReducers) ? maxReducers : maxPartition;
 
       reduceWork.setMinReduceTasks(minPartition);
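
Aside from trailing-whitespace cleanup, this hunk computes the window tez may
auto-pick reducer counts within: numReducers scaled by the min/max factors,
floored at one, and capped by MAXREDUCERS. Worked with invented numbers --
numReducers = 10, minPartitionFactor = 0.25, maxPartitionFactor = 2.0,
maxReducers = 15 (the sketch floors both ends at one for brevity; the code
above floors only the minimum):

    public class ReducerWindowSketch {
      static int clamp(int numReducers, float factor, int maxReducers) {
        int n = Math.max(1, (int) (numReducers * factor));
        return Math.min(n, maxReducers);
      }

      public static void main(String[] args) {
        System.out.println(clamp(10, 0.25f, 15)); // min = max(1, 2) = 2
        System.out.println(clamp(10, 2.0f, 15));  // max = min(20, 15) = 15
      }
    }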
@@ -210,18 +215,20 @@ public class GenTezUtils {
       BaseWork work)
     throws SemanticException {
 
-    Set<Operator<?>> roots = work.getAllRootOperators();
+    List<Operator<?>> roots = new ArrayList<Operator<?>>();
+    roots.addAll(work.getAllRootOperators());
     if (work.getDummyOps() != null) {
       roots.addAll(work.getDummyOps());
     }
+    roots.addAll(context.eventOperatorSet);
 
     // need to clone the plan.
-    Set<Operator<?>> newRoots = Utilities.cloneOperatorTree(conf, roots);
+    List<Operator<?>> newRoots = Utilities.cloneOperatorTree(conf, roots);
 
     // we're cloning the operator plan but we're retaining the original work. That means
     // that root operators have to be replaced with the cloned ops. The replacement map
     // tells you what that mapping is.
-    Map<Operator<?>, Operator<?>> replacementMap = new HashMap<Operator<?>, Operator<?>>();
+    BiMap<Operator<?>, Operator<?>> replacementMap = HashBiMap.create();
 
     // there's some special handling for dummyOps required. Mapjoins won't be properly
     // initialized if their dummy parents aren't initialized. Since we cloned the plan
@@ -231,11 +238,35 @@ public class GenTezUtils {
     Iterator<Operator<?>> it = newRoots.iterator();
     for (Operator<?> orig: roots) {
       Operator<?> newRoot = it.next();
+
+      replacementMap.put(orig, newRoot);
+
       if (newRoot instanceof HashTableDummyOperator) {
-        dummyOps.add((HashTableDummyOperator)newRoot);
+        // dummy ops need to be updated to the cloned ones.
+        dummyOps.add((HashTableDummyOperator) newRoot);
+        it.remove();
+      } else if (newRoot instanceof AppMasterEventOperator) {
+        // event operators point to table scan operators. When cloning these we
+        // need to restore the original scan.
+        if (newRoot.getConf() instanceof DynamicPruningEventDesc) {
+          TableScanOperator ts = ((DynamicPruningEventDesc) orig.getConf()).getTableScan();
+          if (ts == null) {
+            throw new AssertionError("No table scan associated with dynamic event pruning. " + orig);
+          }
+          ((DynamicPruningEventDesc) newRoot.getConf()).setTableScan(ts);
+        }
         it.remove();
       } else {
-        replacementMap.put(orig,newRoot);
+        if (newRoot instanceof TableScanOperator) {
+          if (context.tsToEventMap.containsKey(orig)) {
+            // we need to update event operators with the cloned table scan
+            for (AppMasterEventOperator event : context.tsToEventMap.get(orig)) {
+              ((DynamicPruningEventDesc) event.getConf()).setTableScan((TableScanOperator) newRoot);
+            }
+          }
+        }
+        context.rootToWorkMap.remove(orig);
+        context.rootToWorkMap.put(newRoot, work);
       }
     }
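
Replacing the plain HashMap with a Guava BiMap is what makes the later
abandoned-event bookkeeping cheap: the forward map answers "what did this
original become", and inverse() answers "which original produced this clone"
without maintaining a second map by hand. A short sketch with string
placeholders for the operators:

    import com.google.common.collect.BiMap;
    import com.google.common.collect.HashBiMap;

    public class BiMapSketch {
      public static void main(String[] args) {
        BiMap<String, String> replacementMap = HashBiMap.create();
        replacementMap.put("TS[0]", "TS[0]'");
        replacementMap.put("EVENT[3]", "EVENT[3]'");

        System.out.println(replacementMap.get("TS[0]"));               // TS[0]'
        System.out.println(replacementMap.inverse().get("EVENT[3]'")); // EVENT[3]
      }
    }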
 
@@ -272,6 +303,15 @@ public class GenTezUtils {
         desc.setLinkedFileSinkDesc(linked);
       }
 
+      if (current instanceof AppMasterEventOperator) {
+        // remember for additional processing later
+        context.eventOperatorSet.add((AppMasterEventOperator) current);
+
+        // mark the original as abandoned. Don't need it anymore.
+        context.abandonedEventOperatorSet.add((AppMasterEventOperator) replacementMap.inverse()
+            .get(current));
+      }
+
       if (current instanceof UnionOperator) {
         Operator<?> parent = null;
         int count = 0;
@@ -337,4 +377,87 @@ public class GenTezUtils {
       }
     }
   }
+
+  /**
+   * processAppMasterEvent sets up the event descriptor and the MapWork.
+   *
+   * @param procCtx context tracking, among other things, abandoned event operators
+   * @param event the event operator to wire into the target MapWork
+   */
+  public void processAppMasterEvent(GenTezProcContext procCtx, AppMasterEventOperator event) {
+
+    if (procCtx.abandonedEventOperatorSet.contains(event)) {
+      // don't need this anymore
+      return;
+    }
+
+    DynamicPruningEventDesc eventDesc = (DynamicPruningEventDesc)event.getConf();
+    TableScanOperator ts = eventDesc.getTableScan();
+
+    MapWork work = (MapWork) procCtx.rootToWorkMap.get(ts);
+    if (work == null) {
+      throw new AssertionError("No work found for tablescan " + ts);
+    }
+
+    BaseWork enclosingWork = getEnclosingWork(event, procCtx);
+    if (enclosingWork == null) {
+      throw new AssertionError("Cannot find work for operator" + event);
+    }
+    String sourceName = enclosingWork.getName();
+
+    // store the vertex name in the operator pipeline
+    eventDesc.setVertexName(work.getName());
+    eventDesc.setInputName(work.getAliases().get(0));
+
+    // store table descriptor in map-work
+    if (!work.getEventSourceTableDescMap().containsKey(sourceName)) {
+      work.getEventSourceTableDescMap().put(sourceName, new LinkedList<TableDesc>());
+    }
+    List<TableDesc> tables = work.getEventSourceTableDescMap().get(sourceName);
+    tables.add(event.getConf().getTable());
+
+    // store column name in map-work
+    if (!work.getEventSourceColumnNameMap().containsKey(sourceName)) {
+      work.getEventSourceColumnNameMap().put(sourceName, new LinkedList<String>());
+    }
+    List<String> columns = work.getEventSourceColumnNameMap().get(sourceName);
+    columns.add(eventDesc.getTargetColumnName());
+
+    // store partition key expr in map-work
+    if (!work.getEventSourcePartKeyExprMap().containsKey(sourceName)) {
+      work.getEventSourcePartKeyExprMap().put(sourceName, new LinkedList<ExprNodeDesc>());
+    }
+    List<ExprNodeDesc> keys = work.getEventSourcePartKeyExprMap().get(sourceName);
+    keys.add(eventDesc.getPartKey());
+
+  }
+
+  /**
+   * getEnclosingWork finds the BaseWork any given operator belongs to.
+   */
+  public BaseWork getEnclosingWork(Operator<?> op, GenTezProcContext procCtx) {
+    List<Operator<?>> ops = new ArrayList<Operator<?>>();
+    findRoots(op, ops);
+    for (Operator<?> r : ops) {
+      BaseWork work = procCtx.rootToWorkMap.get(r);
+      if (work != null) {
+        return work;
+      }
+    }
+    return null;
+  }
+
+  /*
+   * findRoots collects into ops all root operators from which op is reachable.
+   */
+  private void findRoots(Operator<?> op, List<Operator<?>> ops) {
+    List<Operator<?>> parents = op.getParentOperators();
+    if (parents == null || parents.isEmpty()) {
+      ops.add(op);
+      return;
+    }
+    for (Operator<?> p : parents) {
+      findRoots(p, ops);
+    }
+  }
 }
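
One small observation on processAppMasterEvent: the three map updates all
follow the same "create the list on first use, then append" shape. A sketch
of that idiom factored into a helper (listFor and the sample keys are
illustrative, not Hive code):

    import java.util.HashMap;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Map;

    public class GetOrCreateSketch {
      static <K, V> List<V> listFor(Map<K, List<V>> map, K key) {
        List<V> list = map.get(key);
        if (list == null) {
          list = new LinkedList<V>();
          map.put(key, list);
        }
        return list;
      }

      public static void main(String[] args) {
        Map<String, List<String>> eventSourceColumns = new HashMap<String, List<String>>();
        listFor(eventSourceColumns, "Map 1").add("part_col");
        listFor(eventSourceColumns, "Map 1").add("other_part_col");
        System.out.println(eventSourceColumns); // {Map 1=[part_col, other_part_col]}
      }
    }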

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java?rev=1622783&r1=1622782&r2=1622783&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java Fri Sep  5 20:16:08 2014
@@ -23,13 +23,18 @@ import java.util.HashSet;
 import java.util.Set;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+
 /**
  * OptimizeTezProcContext. OptimizeTezProcContext maintains information
  * about the current operator plan as we walk the operator tree
@@ -47,19 +52,23 @@ public class OptimizeTezProcContext impl
   public final Set<ReduceSinkOperator> visitedReduceSinks
     = new HashSet<ReduceSinkOperator>();
 
+  public final Multimap<AppMasterEventOperator, TableScanOperator> eventOpToTableScanMap =
+      HashMultimap.create();
+
   // rootOperators are all the table scan operators in sequence
   // of traversal
-  public final Deque<Operator<? extends OperatorDesc>> rootOperators;
+  public Deque<Operator<? extends OperatorDesc>> rootOperators;
 
-  @SuppressWarnings("unchecked")
-  public OptimizeTezProcContext(HiveConf conf, ParseContext parseContext,
-      Set<ReadEntity> inputs, Set<WriteEntity> outputs,
-      Deque<Operator<?>> rootOperators) {
+  public OptimizeTezProcContext(HiveConf conf, ParseContext parseContext, Set<ReadEntity> inputs,
+      Set<WriteEntity> outputs) {
 
     this.conf = conf;
     this.parseContext = parseContext;
     this.inputs = inputs;
     this.outputs = outputs;
-    this.rootOperators = rootOperators;
+  }
+
+  public void setRootOperators(Deque<Operator<? extends OperatorDesc>> roots) {
+    this.rootOperators = roots;
   }
 }
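
The new eventOpToTableScanMap is a Guava HashMultimap, which handles the
several-scans-per-event case without the manual list management seen in
GenTezUtils above. A short sketch with string placeholders for the operators:

    import com.google.common.collect.HashMultimap;
    import com.google.common.collect.Multimap;

    public class MultimapSketch {
      public static void main(String[] args) {
        Multimap<String, String> eventToScans = HashMultimap.create();
        eventToScans.put("EVENT[5]", "TS[0]");
        eventToScans.put("EVENT[5]", "TS[2]");
        eventToScans.put("EVENT[5]", "TS[0]"); // duplicate: ignored, set semantics
        System.out.println(eventToScans.get("EVENT[5]").size()); // 2
      }
    }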