You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2014/09/03 12:46:06 UTC

svn commit: r1622216 [2/4] - in /hive/branches/tez: common/src/java/org/apache/hadoop/hive/conf/ data/files/ itests/qtest/ ql/if/ ql/src/gen/thrift/gen-cpp/ ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/ ql/src/gen/thrift/gen-php/ q...

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java?rev=1622216&r1=1622215&r2=1622216&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java Wed Sep  3 10:46:04 2014
@@ -22,16 +22,14 @@ import java.util.Stack;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
-import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
 
 /**
- * FileSinkProcessor handles addition of merge, move and stats tasks for filesinks
+ * FileSinkProcessor is a simple rule to remember seen file sinks for later
+ * processing.
  *
  */
 public class FileSinkProcessor implements NodeProcessor {
@@ -39,12 +37,6 @@ public class FileSinkProcessor implement
   static final private Log LOG = LogFactory.getLog(FileSinkProcessor.class.getName());
 
   @Override
-  /*
-   * (non-Javadoc)
-   * we should ideally not modify the tree we traverse.
-   * However, since we need to walk the tree at any time when we modify the
-   * operator, we might as well do it here.
-   */
   public Object process(Node nd, Stack<Node> stack,
       NodeProcessorCtx procCtx, Object... nodeOutputs)
       throws SemanticException {

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java?rev=1622216&r1=1622215&r2=1622216&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java Wed Sep  3 10:46:04 2014
@@ -26,29 +26,28 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
 import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
-import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
 import org.apache.hadoop.hive.ql.plan.TezWork;
 
 /**
@@ -134,6 +133,15 @@ public class GenTezProcContext implement
   // remember which reducesinks we've already connected
   public final Set<ReduceSinkOperator> connectedReduceSinks;
 
+  // remember the event operators we've seen
+  public final Set<AppMasterEventOperator> eventOperatorSet;
+
+  // remember the event operators we've abandoned.
+  public final Set<AppMasterEventOperator> abandonedEventOperatorSet;
+
+  // remember which event operators are connected to each table scan
+  public final Map<TableScanOperator, List<AppMasterEventOperator>> tsToEventMap;
+
   @SuppressWarnings("unchecked")
   public GenTezProcContext(HiveConf conf, ParseContext parseContext,
       List<Task<MoveWork>> moveTask, List<Task<? extends Serializable>> rootTasks,
@@ -165,6 +173,9 @@ public class GenTezProcContext implement
     this.linkedFileSinks = new LinkedHashMap<Path, List<FileSinkDesc>>();
     this.fileSinkSet = new LinkedHashSet<FileSinkOperator>();
     this.connectedReduceSinks = new LinkedHashSet<ReduceSinkOperator>();
+    this.eventOperatorSet = new LinkedHashSet<AppMasterEventOperator>();
+    this.abandonedEventOperatorSet = new LinkedHashSet<AppMasterEventOperator>();
+    this.tsToEventMap = new LinkedHashMap<TableScanOperator, List<AppMasterEventOperator>>();
 
     rootTasks.add(currentTask);
   }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java?rev=1622216&r1=1622215&r2=1622216&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java Wed Sep  3 10:46:04 2014
@@ -20,38 +20,43 @@ package org.apache.hadoop.hive.ql.parse;
 
 import java.util.ArrayList;
 import java.util.Deque;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.List;
 import java.util.LinkedList;
-import java.util.Map;
+import java.util.List;
 import java.util.Set;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
 import org.apache.hadoop.hive.ql.exec.FetchTask;
-import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
 import org.apache.hadoop.hive.ql.plan.TezWork;
 import org.apache.hadoop.hive.ql.plan.UnionWork;
 
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+
 /**
  * GenTezUtils is a collection of shared helper methods to produce
  * TezWork
@@ -119,12 +124,12 @@ public class GenTezUtils {
       int maxReducers = context.conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
 
       // min we allow tez to pick
-      int minPartition = Math.max(1, (int) (reduceSink.getConf().getNumReducers() 
+      int minPartition = Math.max(1, (int) (reduceSink.getConf().getNumReducers()
         * minPartitionFactor));
       minPartition = (minPartition > maxReducers) ? maxReducers : minPartition;
 
       // max we allow tez to pick
-      int maxPartition = (int) (reduceSink.getConf().getNumReducers() * maxPartitionFactor); 
+      int maxPartition = (int) (reduceSink.getConf().getNumReducers() * maxPartitionFactor);
       maxPartition = (maxPartition > maxReducers) ? maxReducers : maxPartition;
 
       reduceWork.setMinReduceTasks(minPartition);
@@ -210,18 +215,20 @@ public class GenTezUtils {
       BaseWork work)
     throws SemanticException {
 
-    Set<Operator<?>> roots = work.getAllRootOperators();
+    List<Operator<?>> roots = new ArrayList<Operator<?>>();
+    roots.addAll(work.getAllRootOperators());
     if (work.getDummyOps() != null) {
       roots.addAll(work.getDummyOps());
     }
+    roots.addAll(context.eventOperatorSet);
 
     // need to clone the plan.
-    Set<Operator<?>> newRoots = Utilities.cloneOperatorTree(conf, roots);
+    List<Operator<?>> newRoots = Utilities.cloneOperatorTree(conf, roots);
 
     // we're cloning the operator plan but we're retaining the original work. That means
     // that root operators have to be replaced with the cloned ops. The replacement map
     // tells you what that mapping is.
-    Map<Operator<?>, Operator<?>> replacementMap = new HashMap<Operator<?>, Operator<?>>();
+    BiMap<Operator<?>, Operator<?>> replacementMap = HashBiMap.create();
 
     // there's some special handling for dummyOps required. Mapjoins won't be properly
     // initialized if their dummy parents aren't initialized. Since we cloned the plan
@@ -231,11 +238,35 @@ public class GenTezUtils {
     Iterator<Operator<?>> it = newRoots.iterator();
     for (Operator<?> orig: roots) {
       Operator<?> newRoot = it.next();
+
+      replacementMap.put(orig, newRoot);
+
       if (newRoot instanceof HashTableDummyOperator) {
-        dummyOps.add((HashTableDummyOperator)newRoot);
+        // dummy ops need to be updated to the cloned ones.
+        dummyOps.add((HashTableDummyOperator) newRoot);
+        it.remove();
+      } else if (newRoot instanceof AppMasterEventOperator) {
+        // event operators point to table scan operators. When cloning these we
+        // need to restore the original scan.
+        if (newRoot.getConf() instanceof DynamicPruningEventDesc) {
+          TableScanOperator ts = ((DynamicPruningEventDesc) orig.getConf()).getTableScan();
+          if (ts == null) {
+            throw new AssertionError("No table scan associated with dynamic event pruning. " + orig);
+          }
+          ((DynamicPruningEventDesc) newRoot.getConf()).setTableScan(ts);
+        }
         it.remove();
       } else {
-        replacementMap.put(orig,newRoot);
+        if (newRoot instanceof TableScanOperator) {
+          if (context.tsToEventMap.containsKey(orig)) {
+            // we need to update event operators with the cloned table scan
+            for (AppMasterEventOperator event : context.tsToEventMap.get(orig)) {
+              ((DynamicPruningEventDesc) event.getConf()).setTableScan((TableScanOperator) newRoot);
+            }
+          }
+        }
+        context.rootToWorkMap.remove(orig);
+        context.rootToWorkMap.put(newRoot, work);
       }
     }
 
@@ -272,6 +303,15 @@ public class GenTezUtils {
         desc.setLinkedFileSinkDesc(linked);
       }
 
+      if (current instanceof AppMasterEventOperator) {
+        // remember for additional processing later
+        context.eventOperatorSet.add((AppMasterEventOperator) current);
+
+        // mark the original as abandoned. Don't need it anymore.
+        context.abandonedEventOperatorSet.add((AppMasterEventOperator) replacementMap.inverse()
+            .get(current));
+      }
+
       if (current instanceof UnionOperator) {
         Operator<?> parent = null;
         int count = 0;
@@ -337,4 +377,87 @@ public class GenTezUtils {
       }
     }
   }
+
+  /**
+   * processAppMasterEvent sets up the event descriptor and the MapWork.
+   *
+   * @param procCtx
+   * @param event
+   */
+  public void processAppMasterEvent(GenTezProcContext procCtx, AppMasterEventOperator event) {
+
+    if (procCtx.abandonedEventOperatorSet.contains(event)) {
+      // don't need this anymore
+      return;
+    }
+
+    DynamicPruningEventDesc eventDesc = (DynamicPruningEventDesc)event.getConf();
+    TableScanOperator ts = eventDesc.getTableScan();
+
+    MapWork work = (MapWork) procCtx.rootToWorkMap.get(ts);
+    if (work == null) {
+      throw new AssertionError("No work found for tablescan " + ts);
+    }
+
+    BaseWork enclosingWork = getEnclosingWork(event, procCtx);
+    if (enclosingWork == null) {
+      throw new AssertionError("Cannot find work for operator" + event);
+    }
+    String sourceName = enclosingWork.getName();
+
+    // store the vertex name in the operator pipeline
+    eventDesc.setVertexName(work.getName());
+    eventDesc.setInputName(work.getAliases().get(0));
+
+    // store table descriptor in map-work
+    if (!work.getEventSourceTableDescMap().containsKey(sourceName)) {
+      work.getEventSourceTableDescMap().put(sourceName, new LinkedList<TableDesc>());
+    }
+    List<TableDesc> tables = work.getEventSourceTableDescMap().get(sourceName);
+    tables.add(event.getConf().getTable());
+
+    // store column name in map-work
+    if (!work.getEventSourceColumnNameMap().containsKey(sourceName)) {
+      work.getEventSourceColumnNameMap().put(sourceName, new LinkedList<String>());
+    }
+    List<String> columns = work.getEventSourceColumnNameMap().get(sourceName);
+    columns.add(eventDesc.getTargetColumnName());
+
+    // store partition key expr in map-work
+    if (!work.getEventSourcePartKeyExprMap().containsKey(sourceName)) {
+      work.getEventSourcePartKeyExprMap().put(sourceName, new LinkedList<ExprNodeDesc>());
+    }
+    List<ExprNodeDesc> keys = work.getEventSourcePartKeyExprMap().get(sourceName);
+    keys.add(eventDesc.getPartKey());
+
+  }
+
+  /**
+   * getEnclosingWork finds the BaseWork any given operator belongs to.
+   */
+  public BaseWork getEnclosingWork(Operator<?> op, GenTezProcContext procCtx) {
+    List<Operator<?>> ops = new ArrayList<Operator<?>>();
+    findRoots(op, ops);
+    for (Operator<?> r : ops) {
+      BaseWork work = procCtx.rootToWorkMap.get(r);
+      if (work != null) {
+        return work;
+      }
+    }
+    return null;
+  }
+
+  /*
+   * findRoots adds to ops every parentless operator reached from op via parent links (op itself if it has no parents)
+   */
+  private void findRoots(Operator<?> op, List<Operator<?>> ops) {
+    List<Operator<?>> parents = op.getParentOperators();
+    if (parents == null || parents.isEmpty()) {
+      ops.add(op);
+      return;
+    }
+    for (Operator<?> p : parents) {
+      findRoots(p, ops);
+    }
+  }
 }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java?rev=1622216&r1=1622215&r2=1622216&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java Wed Sep  3 10:46:04 2014
@@ -23,13 +23,18 @@ import java.util.HashSet;
 import java.util.Set;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+
 /**
  * OptimizeTezProcContext. OptimizeTezProcContext maintains information
  * about the current operator plan as we walk the operator tree
@@ -47,19 +52,23 @@ public class OptimizeTezProcContext impl
   public final Set<ReduceSinkOperator> visitedReduceSinks
     = new HashSet<ReduceSinkOperator>();
 
+  public final Multimap<AppMasterEventOperator, TableScanOperator> eventOpToTableScanMap =
+      HashMultimap.create();
+
   // rootOperators are all the table scan operators in sequence
   // of traversal
-  public final Deque<Operator<? extends OperatorDesc>> rootOperators;
+  public Deque<Operator<? extends OperatorDesc>> rootOperators;
 
-  @SuppressWarnings("unchecked")
-  public OptimizeTezProcContext(HiveConf conf, ParseContext parseContext,
-      Set<ReadEntity> inputs, Set<WriteEntity> outputs,
-      Deque<Operator<?>> rootOperators) {
+  public OptimizeTezProcContext(HiveConf conf, ParseContext parseContext, Set<ReadEntity> inputs,
+      Set<WriteEntity> outputs) {
 
     this.conf = conf;
     this.parseContext = parseContext;
     this.inputs = inputs;
     this.outputs = outputs;
-    this.rootOperators = rootOperators;
+  }
+
+  public void setRootOperators(Deque<Operator<? extends OperatorDesc>> roots) {
+    this.rootOperators = roots;
   }
 }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java?rev=1622216&r1=1622215&r2=1622216&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java Wed Sep  3 10:46:04 2014
@@ -21,20 +21,24 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Deque;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.Stack;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -52,20 +56,25 @@ import org.apache.hadoop.hive.ql.lib.For
 import org.apache.hadoop.hive.ql.lib.GraphWalker;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
-import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.lib.Rule;
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
 import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.optimizer.ConstantPropagate;
 import org.apache.hadoop.hive.ql.optimizer.ConvertJoinMapJoin;
+import org.apache.hadoop.hive.ql.optimizer.DynamicPartitionPruningOptimization;
 import org.apache.hadoop.hive.ql.optimizer.ReduceSinkMapJoinProc;
+import org.apache.hadoop.hive.ql.optimizer.RemoveDynamicPruningBySize;
 import org.apache.hadoop.hive.ql.optimizer.SetReducerParallelism;
+import org.apache.hadoop.hive.ql.optimizer.metainfo.annotation.AnnotateWithOpTraits;
 import org.apache.hadoop.hive.ql.optimizer.physical.CrossProductCheck;
 import org.apache.hadoop.hive.ql.optimizer.physical.MetadataOnlyOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.physical.NullScanOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
-import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer;
 import org.apache.hadoop.hive.ql.optimizer.physical.StageIDsRearranger;
+import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer;
+import org.apache.hadoop.hive.ql.optimizer.stats.annotation.AnnotateWithStatistics;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -85,7 +94,7 @@ public class TezCompiler extends TaskCom
   @Override
   public void init(HiveConf conf, LogHelper console, Hive db) {
     super.init(conf, console, db);
-    
+
     // Tez requires us to use RPC for the query plan
     HiveConf.setBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN, true);
 
@@ -98,31 +107,203 @@ public class TezCompiler extends TaskCom
   protected void optimizeOperatorPlan(ParseContext pCtx, Set<ReadEntity> inputs,
       Set<WriteEntity> outputs) throws SemanticException {
 
-    // Sequence of TableScan operators to be walked
+    // Create the context for the walker
+    OptimizeTezProcContext procCtx = new OptimizeTezProcContext(conf, pCtx, inputs, outputs);
+
+    // setup dynamic partition pruning where possible
+    runDynamicPartitionPruning(procCtx, inputs, outputs);
+
+    // setup stats in the operator plan
+    runStatsAnnotation(procCtx);
+
+    // run the optimizations that use stats for optimization
+    runStatsDependentOptimizations(procCtx, inputs, outputs);
+
+    // after the stats phase we might have some cyclic dependencies that we need
+    // to take care of.
+    runCycleAnalysisForPartitionPruning(procCtx, inputs, outputs);
+
+  }
+
+  private void runCycleAnalysisForPartitionPruning(OptimizeTezProcContext procCtx,
+      Set<ReadEntity> inputs, Set<WriteEntity> outputs) throws SemanticException {
+
+    if (!procCtx.conf.getBoolVar(ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING)) {
+      return;
+    }
+
+    boolean cycleFree = false;
+    while (!cycleFree) {
+      cycleFree = true;
+      Set<Set<Operator<?>>> components = getComponents(procCtx);
+      for (Set<Operator<?>> component : components) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Component: ");
+          for (Operator<?> co : component) {
+            LOG.debug("Operator: " + co.getName() + ", " + co.getIdentifier());
+          }
+        }
+        if (component.size() != 1) {
+          LOG.info("Found cycle in operator plan...");
+          cycleFree = false;
+          removeEventOperator(component);
+        }
+      }
+      LOG.info("Cycle free: " + cycleFree);
+    }
+  }
+
+  private void removeEventOperator(Set<Operator<?>> component) {
+    AppMasterEventOperator victim = null;
+    for (Operator<?> o : component) {
+      if (o instanceof AppMasterEventOperator) {
+        if (victim == null
+            || o.getConf().getStatistics().getDataSize() < victim.getConf().getStatistics()
+                .getDataSize()) {
+          victim = (AppMasterEventOperator) o;
+        }
+      }
+    }
+
+    Operator<?> child = victim;
+    Operator<?> curr = victim;
+
+    while (curr.getChildOperators().size() <= 1) {
+      child = curr;
+      curr = curr.getParentOperators().get(0);
+    }
+
+    // at this point we've found the fork in the op pipeline that has the
+    // pruning as a child plan.
+    LOG.info("Disabling dynamic pruning for: "
+        + ((DynamicPruningEventDesc) victim.getConf()).getTableScan().toString()
+        + ". Needed to break cyclic dependency");
+    curr.removeChild(child);
+  }
+
+  // Tarjan's algorithm for finding strongly connected components
+  private Set<Set<Operator<?>>> getComponents(OptimizeTezProcContext procCtx) {
     Deque<Operator<?>> deque = new LinkedList<Operator<?>>();
-    deque.addAll(pCtx.getTopOps().values());
+    deque.addAll(procCtx.parseContext.getTopOps().values());
 
-    // Create the context for the walker
-    OptimizeTezProcContext procCtx
-      = new OptimizeTezProcContext(conf, pCtx, inputs, outputs, deque);
+    AtomicInteger index = new AtomicInteger();
+    Map<Operator<?>, Integer> indexes = new HashMap<Operator<?>, Integer>();
+    Map<Operator<?>, Integer> lowLinks = new HashMap<Operator<?>, Integer>();
+    Stack<Operator<?>> nodes = new Stack<Operator<?>>();
+    Set<Set<Operator<?>>> components = new HashSet<Set<Operator<?>>>();
+
+    for (Operator<?> o : deque) {
+      if (!indexes.containsKey(o)) {
+        connect(o, index, nodes, indexes, lowLinks, components);
+      }
+    }
+
+    return components;
+  }
+
+  private void connect(Operator<?> o, AtomicInteger index, Stack<Operator<?>> nodes,
+      Map<Operator<?>, Integer> indexes, Map<Operator<?>, Integer> lowLinks,
+      Set<Set<Operator<?>>> components) {
+
+    indexes.put(o, index.get());
+    lowLinks.put(o, index.get());
+    index.incrementAndGet();
+    nodes.push(o);
+
+    List<Operator<?>> children;
+    if (o instanceof AppMasterEventOperator) {
+      children = new ArrayList<Operator<?>>();
+      children.addAll(o.getChildOperators());
+      TableScanOperator ts = ((DynamicPruningEventDesc) o.getConf()).getTableScan();
+      LOG.debug("Adding special edge: " + o.getName() + " --> " + ts.toString());
+      children.add(ts);
+    } else {
+      children = o.getChildOperators();
+    }
+
+    for (Operator<?> child : children) {
+      if (!indexes.containsKey(child)) {
+        connect(child, index, nodes, indexes, lowLinks, components);
+        lowLinks.put(child, Math.min(lowLinks.get(o), lowLinks.get(child)));
+      } else if (nodes.contains(child)) {
+        lowLinks.put(o, Math.min(lowLinks.get(o), indexes.get(child)));
+      }
+    }
+
+    if (lowLinks.get(o).equals(indexes.get(o))) {
+      Set<Operator<?>> component = new HashSet<Operator<?>>();
+      components.add(component);
+      Operator<?> current;
+      do {
+        current = nodes.pop();
+        component.add(current);
+      } while (current != o);
+    }
+  }
+
+  private void runStatsAnnotation(OptimizeTezProcContext procCtx) throws SemanticException {
+    new AnnotateWithStatistics().transform(procCtx.parseContext);
+    new AnnotateWithOpTraits().transform(procCtx.parseContext);
+  }
+
+  private void runStatsDependentOptimizations(OptimizeTezProcContext procCtx,
+      Set<ReadEntity> inputs, Set<WriteEntity> outputs) throws SemanticException {
+
+    // Sequence of TableScan operators to be walked
+    Deque<Operator<?>> deque = new LinkedList<Operator<?>>();
+    deque.addAll(procCtx.parseContext.getTopOps().values());
 
     // create a walker which walks the tree in a DFS manner while maintaining
     // the operator stack.
     Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
-    opRules.put(new RuleRegExp(new String("Set parallelism - ReduceSink"),
+    opRules.put(new RuleRegExp("Set parallelism - ReduceSink",
         ReduceSinkOperator.getOperatorName() + "%"),
         new SetReducerParallelism());
 
-    opRules.put(new RuleRegExp(new String("Convert Join to Map-join"),
+    opRules.put(new RuleRegExp("Convert Join to Map-join",
         JoinOperator.getOperatorName() + "%"), new ConvertJoinMapJoin());
 
+    opRules.put(
+        new RuleRegExp("Remove dynamic pruning by size",
+        AppMasterEventOperator.getOperatorName() + "%"),
+        new RemoveDynamicPruningBySize());
+
     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along
     Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
     List<Node> topNodes = new ArrayList<Node>();
-    topNodes.addAll(pCtx.getTopOps().values());
+    topNodes.addAll(procCtx.parseContext.getTopOps().values());
+    GraphWalker ogw = new ForwardWalker(disp);
+    ogw.startWalking(topNodes, null);
+  }
+
+  private void runDynamicPartitionPruning(OptimizeTezProcContext procCtx, Set<ReadEntity> inputs,
+      Set<WriteEntity> outputs) throws SemanticException {
+
+    if (!procCtx.conf.getBoolVar(ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING)) {
+      return;
+    }
+
+    // Sequence of TableScan operators to be walked
+    Deque<Operator<?>> deque = new LinkedList<Operator<?>>();
+    deque.addAll(procCtx.parseContext.getTopOps().values());
+
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+    opRules.put(
+        new RuleRegExp(new String("Dynamic Partition Pruning"), FilterOperator.getOperatorName()
+            + "%"), new DynamicPartitionPruningOptimization());
+
+    // The dispatcher fires the processor corresponding to the closest matching
+    // rule and passes the context along
+    Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
+    List<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(procCtx.parseContext.getTopOps().values());
     GraphWalker ogw = new ForwardWalker(disp);
     ogw.startWalking(topNodes, null);
+
+    // need a new run of the constant folding because we might have created lots
+    // of "and true and true" conditions.
+    new ConstantPropagate().transform(procCtx.parseContext);
   }
 
   @Override
@@ -158,19 +339,12 @@ public class TezCompiler extends TaskCom
         new ProcessAnalyzeTable(GenTezUtils.getUtils()));
 
     opRules.put(new RuleRegExp("Remember union",
-        UnionOperator.getOperatorName() + "%"), new NodeProcessor()
-    {
-      @Override
-      public Object process(Node n, Stack<Node> s,
-          NodeProcessorCtx procCtx, Object... os) throws SemanticException {
-        GenTezProcContext context = (GenTezProcContext) procCtx;
-        UnionOperator union = (UnionOperator) n;
-
-        // simply need to remember that we've seen a union.
-        context.currentUnionOperators.add(union);
-        return null;
-      }
-    });
+        UnionOperator.getOperatorName() + "%"),
+        new UnionProcessor());
+
+    opRules.put(new RuleRegExp("AppMasterEventOperator",
+        AppMasterEventOperator.getOperatorName() + "%"),
+        new AppMasterEventProcessor());
 
     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along
@@ -185,10 +359,17 @@ public class TezCompiler extends TaskCom
       GenTezUtils.getUtils().removeUnionOperators(conf, procCtx, w);
     }
 
-    // finally make sure the file sink operators are set up right
+    // then we make sure the file sink operators are set up right
     for (FileSinkOperator fileSink: procCtx.fileSinkSet) {
       GenTezUtils.getUtils().processFileSink(procCtx, fileSink);
     }
+
+    // and finally we hook up any events that need to be sent to the tez AM
+    LOG.debug("There are " + procCtx.eventOperatorSet.size() + " app master events.");
+    for (AppMasterEventOperator event : procCtx.eventOperatorSet) {
+      LOG.debug("Handling AppMasterEventOperator: " + event);
+      GenTezUtils.getUtils().processAppMasterEvent(procCtx, event);
+    }
   }
 
   @Override

Added: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/UnionProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/UnionProcessor.java?rev=1622216&view=auto
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/UnionProcessor.java (added)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/UnionProcessor.java Wed Sep  3 10:46:04 2014
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+
+/**
+ * UnionProcessor is a simple rule to remember seen unions for later
+ * processing.
+ *
+ */
+public class UnionProcessor implements NodeProcessor {
+
+  static final private Log LOG = LogFactory.getLog(UnionProcessor.class.getName());
+
+  @Override
+  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object... nodeOutputs)
+      throws SemanticException {
+    GenTezProcContext context = (GenTezProcContext) procCtx;
+    UnionOperator union = (UnionOperator) nd;
+
+    // simply need to remember that we've seen a union.
+    context.currentUnionOperators.add(union);
+    return null;
+  }
+}

Added: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/AppMasterEventDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/AppMasterEventDesc.java?rev=1622216&view=auto
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/AppMasterEventDesc.java (added)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/AppMasterEventDesc.java Wed Sep  3 10:46:04 2014
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.DataOutputBuffer;
+
+
+@SuppressWarnings("serial")
+@Explain(displayName = "Application Master Event Operator")
+public class AppMasterEventDesc extends AbstractOperatorDesc {
+
+  private TableDesc table;
+  private String vertexName;
+  private String inputName;
+
+  @Explain(displayName = "Target Vertex")
+  public String getVertexName() {
+    return vertexName;
+  }
+
+  @Explain(displayName = "Target Input")
+  public String getInputName() {
+    return inputName;
+  }
+
+  public void setInputName(String inputName) {
+    this.inputName = inputName;
+  }
+
+  public void setVertexName(String vertexName) {
+    this.vertexName = vertexName;
+  }
+
+  public TableDesc getTable() {
+    return table;
+  }
+
+  public void setTable(TableDesc table) {
+    this.table = table;
+  }
+
+  public void writeEventHeader(DataOutputBuffer buffer) throws IOException {
+    // nothing to add
+  }
+}

Added: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPruningEventDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPruningEventDesc.java?rev=1622216&view=auto
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPruningEventDesc.java (added)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPruningEventDesc.java Wed Sep  3 10:46:04 2014
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.io.DataOutputBuffer;
+
+@SuppressWarnings("serial")
+@Explain(displayName = "Dynamic Partitioning Event Operator")
+public class DynamicPruningEventDesc extends AppMasterEventDesc {
+
+  // column in the target table that will be pruned against
+  private String targetColumnName;
+
+  // tableScan is only available during compile
+  private transient TableScanOperator tableScan;
+
+  // the partition column we're interested in
+  private ExprNodeDesc partKey;
+
+  public TableScanOperator getTableScan() {
+    return tableScan;
+  }
+
+  public void setTableScan(TableScanOperator tableScan) {
+    this.tableScan = tableScan;
+  }
+
+  @Explain(displayName = "Target column")
+  public String getTargetColumnName() {
+    return targetColumnName;
+  }
+
+  public void setTargetColumnName(String columnName) {
+    this.targetColumnName = columnName;
+  }
+
+  @Override
+  public void writeEventHeader(DataOutputBuffer buffer) throws IOException {
+    super.writeEventHeader(buffer);
+    buffer.writeUTF(targetColumnName);
+  }
+
+  public void setPartKey(ExprNodeDesc partKey) {
+    this.partKey = partKey;
+  }
+
+  @Explain(displayName = "Partition key expr")
+  public String getPartKeyString() {
+    return this.partKey.getExprString();
+  }
+
+  public ExprNodeDesc getPartKey() {
+    return this.partKey;
+  }
+}

Added: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDynamicListDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDynamicListDesc.java?rev=1622216&view=auto
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDynamicListDesc.java (added)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDynamicListDesc.java Wed Sep  3 10:46:04 2014
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+/**
+ * This expression represents a list that will be available at runtime.
+ */
+@SuppressWarnings("serial")
+public class ExprNodeDynamicListDesc extends ExprNodeDesc {
+
+  Operator<? extends OperatorDesc> source;
+  int keyIndex;
+
+  public ExprNodeDynamicListDesc() {
+  }
+
+  public ExprNodeDynamicListDesc(TypeInfo typeInfo, Operator<? extends OperatorDesc> source, int keyIndex) {
+    super(typeInfo);
+    this.source = source;
+    this.keyIndex = keyIndex;
+  }
+
+  public void setSource(Operator<? extends OperatorDesc> source) {
+    this.source = source;
+  }
+
+  public Operator<? extends OperatorDesc> getSource() {
+    return source;
+  }
+
+  public void setKeyIndex(int keyIndex) {
+    this.keyIndex = keyIndex;
+  }
+
+  public int getKeyIndex() {
+    return this.keyIndex;
+  }
+
+  @Override
+  public ExprNodeDesc clone() {
+    ExprNodeDynamicListDesc clone = new ExprNodeDynamicListDesc(typeInfo, source, keyIndex);
+    return clone;
+  }
+
+  @Override
+  public boolean isSame(Object o) {
+    if (o instanceof ExprNodeDynamicListDesc) {
+      return source.equals(((ExprNodeDynamicListDesc)o).getSource());
+    }
+    return false;
+  }
+
+  @Override
+  public String getExprString() {
+    return source.toString();
+  }
+
+  @Override
+  public String toString() {
+    return source.toString();
+  }
+}

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java?rev=1622216&r1=1622215&r2=1622216&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java Wed Sep  3 10:46:04 2014
@@ -26,9 +26,9 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
-import java.util.Set;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -118,6 +118,14 @@ public class MapWork extends BaseWork {
 
   private boolean dummyTableScan = false;
 
+  // used for dynamic partition pruning
+  private Map<String, List<TableDesc>> eventSourceTableDescMap =
+      new LinkedHashMap<String, List<TableDesc>>();
+  private Map<String, List<String>> eventSourceColumnNameMap =
+      new LinkedHashMap<String, List<String>>();
+  private Map<String, List<ExprNodeDesc>> eventSourcePartKeyExprMap =
+      new LinkedHashMap<String, List<ExprNodeDesc>>();
+
   public MapWork() {}
 
   public MapWork(String name) {
@@ -535,4 +543,28 @@ public class MapWork extends BaseWork {
   public boolean getDummyTableScan() {
     return dummyTableScan;
   }
+
+  public void setEventSourceTableDescMap(Map<String, List<TableDesc>> map) {
+    this.eventSourceTableDescMap = map;
+  }
+
+  public Map<String, List<TableDesc>> getEventSourceTableDescMap() {
+    return eventSourceTableDescMap;
+  }
+
+  public void setEventSourceColumnNameMap(Map<String, List<String>> map) {
+    this.eventSourceColumnNameMap = map;
+  }
+
+  public Map<String, List<String>> getEventSourceColumnNameMap() {
+    return eventSourceColumnNameMap;
+  }
+
+  public Map<String, List<ExprNodeDesc>> getEventSourcePartKeyExprMap() {
+    return eventSourcePartKeyExprMap;
+  }
+
+  public void setEventSourcePartKeyExprMap(Map<String, List<ExprNodeDesc>> map) {
+    this.eventSourcePartKeyExprMap = map;
+  }
 }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java?rev=1622216&r1=1622215&r2=1622216&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java Wed Sep  3 10:46:04 2014
@@ -20,8 +20,8 @@ package org.apache.hadoop.hive.ql.plan;
 
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.LinkedHashSet;
 import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -32,13 +32,9 @@ import org.apache.hadoop.hive.ql.exec.Fi
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.lib.NodeProcessor;
-import org.apache.hadoop.hive.ql.lib.Rule;
 import org.apache.hadoop.hive.serde2.Deserializer;
-import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.mapred.JobConf;
@@ -99,7 +95,7 @@ public class ReduceWork extends BaseWork
   private ObjectInspector keyObjectInspector = null;
   private ObjectInspector valueObjectInspector = null;
 
-  private Map<String, Integer> reduceColumnNameMap = new LinkedHashMap<String, Integer>();
+  private final Map<String, Integer> reduceColumnNameMap = new LinkedHashMap<String, Integer>();
 
   /**
    * If the plan has a reducer and correspondingly a reduce-sink, then store the TableDesc pointing
@@ -118,7 +114,7 @@ public class ReduceWork extends BaseWork
   private ObjectInspector getObjectInspector(TableDesc desc) {
     ObjectInspector objectInspector;
     try {
-      Deserializer deserializer = (SerDe) ReflectionUtils.newInstance(desc
+      Deserializer deserializer = ReflectionUtils.newInstance(desc
                 .getDeserializerClass(), null);
       SerDeUtils.initializeSerDe(deserializer, null, desc.getProperties(), null);
       objectInspector = deserializer.getObjectInspector();
@@ -239,7 +235,6 @@ public class ReduceWork extends BaseWork
 
   @Override
   public void replaceRoots(Map<Operator<?>, Operator<?>> replacementMap) {
-    assert replacementMap.size() == 1;
     setReducer(replacementMap.get(getReducer()));
   }
 

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java?rev=1622216&r1=1622215&r2=1622216&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java Wed Sep  3 10:46:04 2014
@@ -906,11 +906,8 @@ public final class OpProcFactory {
     }
 
     ExprNodeDesc condn = ExprNodeDescUtils.mergePredicates(preds);
-    if(!(condn instanceof ExprNodeGenericFuncDesc)) {
-      return null;
-    }
 
-    if (op instanceof TableScanOperator) {
+    if (op instanceof TableScanOperator && condn instanceof ExprNodeGenericFuncDesc) {
       boolean pushFilterToStorage;
       HiveConf hiveConf = owi.getParseContext().getConf();
       pushFilterToStorage =

Added: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java?rev=1622216&view=auto
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java (added)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java Wed Sep  3 10:46:04 2014
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.ppd;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.PreOrderWalker;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.optimizer.Transform;
+import org.apache.hadoop.hive.ql.parse.OpParseContext;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicListDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
+import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
+import org.apache.hadoop.hive.ql.plan.JoinDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+
+/**
+ * Creates synthetic filter predicates of the form "key IN (key list of the other table)" on join inputs.
+ */
+public class SyntheticJoinPredicate implements Transform {
+
+  private static transient Log LOG = LogFactory.getLog(SyntheticJoinPredicate.class.getName());
+
+  @Override
+  public ParseContext transform(ParseContext pctx) throws SemanticException {
+
+    if (!pctx.getConf().getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")
+        || !pctx.getConf().getBoolVar(ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING)) {
+      return pctx;
+    }
+
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+    opRules.put(new RuleRegExp("R1", "(" +
+        TableScanOperator.getOperatorName() + "%" + ".*" +
+        ReduceSinkOperator.getOperatorName() + "%" +
+        JoinOperator.getOperatorName() + "%)"), new JoinSynthetic());
+
+    // The dispatcher fires the processor corresponding to the closest matching
+    // rule and passes the context along
+    SyntheticContext context = new SyntheticContext(pctx);
+    Dispatcher disp = new DefaultRuleDispatcher(null, opRules, context);
+    GraphWalker ogw = new PreOrderWalker(disp);
+
+    // Create a list of top op nodes
+    List<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pctx.getTopOps().values());
+    ogw.startWalking(topNodes, null);
+
+    return pctx;
+  }
+
+  // insert filter operator between target(child) and input(parent)
+  private static Operator<FilterDesc> createFilter(Operator<?> target, Operator<?> parent,
+      RowResolver parentRR, ExprNodeDesc filterExpr) {
+    Operator<FilterDesc> filter = OperatorFactory.get(new FilterDesc(filterExpr, false),
+        new RowSchema(parentRR.getColumnInfos()));
+    filter.setParentOperators(new ArrayList<Operator<? extends OperatorDesc>>());
+    filter.setChildOperators(new ArrayList<Operator<? extends OperatorDesc>>());
+    filter.getParentOperators().add(parent);
+    filter.getChildOperators().add(target);
+    parent.replaceChild(target, filter);
+    target.replaceParent(parent, filter);
+    return filter;
+  }
+
+  private static class SyntheticContext implements NodeProcessorCtx {
+
+    ParseContext parseContext;
+
+    public SyntheticContext(ParseContext pCtx) {
+      parseContext = pCtx;
+    }
+
+    public ParseContext getParseContext() {
+      return parseContext;
+    }
+  }
+
+  private static class JoinSynthetic implements NodeProcessor {
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+
+      ParseContext pCtx = ((SyntheticContext) procCtx).getParseContext();
+
+      @SuppressWarnings("unchecked")
+      CommonJoinOperator<JoinDesc> join = (CommonJoinOperator<JoinDesc>) nd;
+
+      ReduceSinkOperator source = (ReduceSinkOperator) stack.get(stack.size() - 2);
+      int srcPos = join.getParentOperators().indexOf(source);
+
+      List<Operator<? extends OperatorDesc>> parents = join.getParentOperators();
+
+      int[][] targets = getTargets(join);
+
+      Operator<? extends OperatorDesc> parent = source.getParentOperators().get(0);
+      RowResolver parentRR = pCtx.getOpParseCtx().get(parent).getRowResolver();
+
+      // don't generate synthetic predicates if any null-safe flag is set on the join.
+      if (join.getConf().getNullSafes() != null) {
+        for (boolean b : join.getConf().getNullSafes()) {
+          if (b) {
+            return null;
+          }
+        }
+      }
+
+      for (int targetPos: targets[srcPos]) {
+        if (srcPos == targetPos) {
+          continue;
+        }
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Synthetic predicate: " + srcPos + " --> " + targetPos);
+        }
+        ReduceSinkOperator target = (ReduceSinkOperator) parents.get(targetPos);
+        List<ExprNodeDesc> sourceKeys = source.getConf().getKeyCols();
+        List<ExprNodeDesc> targetKeys = target.getConf().getKeyCols();
+
+        if (sourceKeys.size() < 1) {
+          continue;
+        }
+
+        ExprNodeDesc syntheticExpr = null;
+
+        for (int i = 0; i < sourceKeys.size(); ++i) {
+          List<ExprNodeDesc> inArgs = new ArrayList<ExprNodeDesc>();
+          inArgs.add(sourceKeys.get(i));
+
+          ExprNodeDynamicListDesc dynamicExpr =
+              new ExprNodeDynamicListDesc(targetKeys.get(i).getTypeInfo(), target, i);
+
+          inArgs.add(dynamicExpr);
+
+          ExprNodeDesc syntheticInExpr =
+              ExprNodeGenericFuncDesc.newInstance(FunctionRegistry.getFunctionInfo("in")
+                  .getGenericUDF(), inArgs);
+
+          if (syntheticExpr != null) {
+            List<ExprNodeDesc> andArgs = new ArrayList<ExprNodeDesc>();
+            andArgs.add(syntheticExpr);
+            andArgs.add(syntheticInExpr);
+
+            syntheticExpr =
+                ExprNodeGenericFuncDesc.newInstance(FunctionRegistry.getFunctionInfo("and")
+                    .getGenericUDF(), andArgs);
+          } else {
+            syntheticExpr = syntheticInExpr;
+          }
+        }
+
+        Operator<FilterDesc> newFilter = createFilter(source, parent, parentRR, syntheticExpr);
+        pCtx.getOpParseCtx().put(newFilter, new OpParseContext(parentRR));
+        parent = newFilter;
+      }
+
+      return null;
+    }
+
+    // calculate filter propagation directions for each alias
+    // L<->R for inner/semi join, L<-R for left outer join, R<-L for right outer
+    // join, no propagation for full outer join
+    private int[][] getTargets(CommonJoinOperator<JoinDesc> join) {
+      JoinCondDesc[] conds = join.getConf().getConds();
+
+      int aliases = conds.length + 1;
+      Vectors vector = new Vectors(aliases);
+      for (JoinCondDesc cond : conds) {
+        int left = cond.getLeft();
+        int right = cond.getRight();
+        switch (cond.getType()) {
+        case JoinDesc.INNER_JOIN:
+        case JoinDesc.LEFT_SEMI_JOIN:
+          vector.add(left, right);
+          vector.add(right, left);
+          break;
+        case JoinDesc.LEFT_OUTER_JOIN:
+          vector.add(right, left);
+          break;
+        case JoinDesc.RIGHT_OUTER_JOIN:
+          vector.add(left, right);
+          break;
+        case JoinDesc.FULL_OUTER_JOIN:
+          break;
+        }
+      }
+      int[][] result = new int[aliases][];
+      for (int pos = 0 ; pos < aliases; pos++) {
+        // find all targets recursively
+        result[pos] = vector.traverse(pos);
+      }
+      return result;
+    }
+  }
+
+  private static class Vectors {
+
+    private final Set<Integer>[] vector;
+
+    @SuppressWarnings("unchecked")
+    public Vectors(int length) {
+      vector = new Set[length];
+    }
+
+    public void add(int from, int to) {
+      if (vector[from] == null) {
+        vector[from] = new HashSet<Integer>();
+      }
+      vector[from].add(to);
+    }
+
+    public int[] traverse(int pos) {
+      Set<Integer> targets = new HashSet<Integer>();
+      traverse(targets, pos);
+      return toArray(targets);
+    }
+
+    private int[] toArray(Set<Integer> values) {
+      int index = 0;
+      int[] result = new int[values.size()];
+      for (int value : values) {
+        result[index++] = value;
+      }
+      return result;
+    }
+
+    private void traverse(Set<Integer> targets, int pos) {
+      if (vector[pos] == null) {
+        return;
+      }
+      for (int target : vector[pos]) {
+        if (targets.add(target)) {
+          traverse(targets, target);
+        }
+      }
+    }
+  }
+}

Added: hive/branches/tez/ql/src/test/queries/clientpositive/dynamic_partition_pruning.q
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/test/queries/clientpositive/dynamic_partition_pruning.q?rev=1622216&view=auto
==============================================================================
--- hive/branches/tez/ql/src/test/queries/clientpositive/dynamic_partition_pruning.q (added)
+++ hive/branches/tez/ql/src/test/queries/clientpositive/dynamic_partition_pruning.q Wed Sep  3 10:46:04 2014
@@ -0,0 +1,191 @@
+-- Session settings for the dynamic partition pruning test. The pruning flag
+-- itself is switched off and back on further down so that plans and results
+-- can be compared with and without it.
+set hive.optimize.ppd=true;
+set hive.ppd.remove.duplicatefilters=true;
+set hive.tez.dynamic.partition.pruning=true;
+set hive.optimize.metadataonly=false;
+set hive.optimize.index.filter=true;
+
+
+-- List the distinct ds and hr values present in srcpart.
+select distinct ds from srcpart;
+select distinct hr from srcpart;
+
+-- Helper tables derived from srcpart. Each one carries the join column
+-- (ds and/or hr) plus a copy of it under another name (date/hour); the
+-- queries below join on the former and filter on the latter.
+EXPLAIN create table srcpart_date as select ds as ds, ds as date from srcpart group by ds;
+create table srcpart_date as select ds as ds, ds as date from srcpart group by ds;
+create table srcpart_hour as select hr as hr, hr as hour from srcpart group by hr;
+create table srcpart_date_hour as select ds as ds, ds as date, hr as hr, hr as hour from srcpart group by ds, hr;
+-- In this table hr is stored doubled while hour keeps the original value, so
+-- a join against srcpart.hr has to apply an expression on one of the sides.
+create table srcpart_double_hour as select (hr*2) as hr, hr as hour from srcpart group by hr;
+
+-- single column, single key
+EXPLAIN select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = '2008-04-08';
+select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = '2008-04-08';
+set hive.tez.dynamic.partition.pruning=false;
+EXPLAIN select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = '2008-04-08';
+select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = '2008-04-08';
+set hive.tez.dynamic.partition.pruning=true;
+select count(*) from srcpart where ds = '2008-04-08';
+
+-- multiple sources, single key
+EXPLAIN select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) join srcpart_hour on (srcpart.hr = srcpart_hour.hr) 
+where srcpart_date.date = '2008-04-08' and srcpart_hour.hour = 11;
+select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) join srcpart_hour on (srcpart.hr = srcpart_hour.hr) 
+where srcpart_date.date = '2008-04-08' and srcpart_hour.hour = 11;
+set hive.tez.dynamic.partition.pruning=false;
+EXPLAIN select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) join srcpart_hour on (srcpart.hr = srcpart_hour.hr) 
+where srcpart_date.date = '2008-04-08' and srcpart_hour.hour = 11;
+select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) join srcpart_hour on (srcpart.hr = srcpart_hour.hr) 
+where srcpart_date.date = '2008-04-08' and srcpart_hour.hour = 11;
+set hive.tez.dynamic.partition.pruning=true;
+select count(*) from srcpart where hr = 11 and ds = '2008-04-08';
+
+-- multiple columns, single source
+EXPLAIN select count(*) from srcpart join srcpart_date_hour on (srcpart.ds = srcpart_date_hour.ds and srcpart.hr = srcpart_date_hour.hr) where srcpart_date_hour.date = '2008-04-08' and srcpart_date_hour.hour = 11;
+select count(*) from srcpart join srcpart_date_hour on (srcpart.ds = srcpart_date_hour.ds and srcpart.hr = srcpart_date_hour.hr) where srcpart_date_hour.date = '2008-04-08' and srcpart_date_hour.hour = 11;
+set hive.tez.dynamic.partition.pruning=false;
+EXPLAIN select count(*) from srcpart join srcpart_date_hour on (srcpart.ds = srcpart_date_hour.ds and srcpart.hr = srcpart_date_hour.hr) where srcpart_date_hour.date = '2008-04-08' and srcpart_date_hour.hour = 11;
+select count(*) from srcpart join srcpart_date_hour on (srcpart.ds = srcpart_date_hour.ds and srcpart.hr = srcpart_date_hour.hr) where srcpart_date_hour.date = '2008-04-08' and srcpart_date_hour.hour = 11;
+set hive.tez.dynamic.partition.pruning=true;
+select count(*) from srcpart where ds = '2008-04-08' and hr = 11;
+
+-- empty set
+EXPLAIN select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = 'I DONT EXIST';
+select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = 'I DONT EXIST';
+set hive.tez.dynamic.partition.pruning=false;
+EXPLAIN select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = 'I DONT EXIST';
+select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = 'I DONT EXIST';
+set hive.tez.dynamic.partition.pruning=true;
+select count(*) from srcpart where ds = 'I DONT EXIST';
+
+-- expressions
+-- The join key is wrapped in an expression: first on the srcpart_double_hour
+-- side (hr/2 cast back to int), then on the srcpart side (hr*2).
+EXPLAIN select count(*) from srcpart join srcpart_double_hour on (srcpart.hr = cast(srcpart_double_hour.hr/2 as int)) where srcpart_double_hour.hour = 11;
+select count(*) from srcpart join srcpart_double_hour on (srcpart.hr = cast(srcpart_double_hour.hr/2 as int)) where srcpart_double_hour.hour = 11;
+EXPLAIN select count(*) from srcpart join srcpart_double_hour on (srcpart.hr*2 = srcpart_double_hour.hr) where srcpart_double_hour.hour = 11;
+select count(*) from srcpart join srcpart_double_hour on (srcpart.hr*2 = srcpart_double_hour.hr) where srcpart_double_hour.hour = 11;
+-- Same two queries again with dynamic partition pruning switched off.
+set hive.tez.dynamic.partition.pruning=false;
+EXPLAIN select count(*) from srcpart join srcpart_double_hour on (srcpart.hr = cast(srcpart_double_hour.hr/2 as int)) where srcpart_double_hour.hour = 11;
+select count(*) from srcpart join srcpart_double_hour on (srcpart.hr = cast(srcpart_double_hour.hr/2 as int)) where srcpart_double_hour.hour = 11;
+EXPLAIN select count(*) from srcpart join srcpart_double_hour on (srcpart.hr*2 = srcpart_double_hour.hr) where srcpart_double_hour.hour = 11;
+select count(*) from srcpart join srcpart_double_hour on (srcpart.hr*2 = srcpart_double_hour.hr) where srcpart_double_hour.hour = 11;
+set hive.tez.dynamic.partition.pruning=true;
+-- Reference count: the same hr predicate applied to srcpart directly.
+select count(*) from srcpart where hr = 11;
+-- Variant with both join keys cast to string.
+-- NOTE(review): unlike the cases above there is no pruning=false run for this
+-- variant, and the flag is already true here, so the set below changes nothing.
+EXPLAIN select count(*) from srcpart join srcpart_double_hour on (cast(srcpart.hr*2 as string) = cast(srcpart_double_hour.hr as string)) where srcpart_double_hour.hour = 11;
+select count(*) from srcpart join srcpart_double_hour on (cast(srcpart.hr*2 as string) = cast(srcpart_double_hour.hr as string)) where srcpart_double_hour.hour = 11;
+set hive.tez.dynamic.partition.pruning=true;
+select count(*) from srcpart where cast(hr as string) = 11;
+
+
+-- parent is a reduce task
+EXPLAIN select count(*) from srcpart join (select ds as ds, ds as date from srcpart group by ds) s on (srcpart.ds = s.ds) where s.date = '2008-04-08';
+select count(*) from srcpart join (select ds as ds, ds as date from srcpart group by ds) s on (srcpart.ds = s.ds) where s.date = '2008-04-08';
+select count(*) from srcpart where ds = '2008-04-08';
+
+-- non-equi join
+EXPLAIN select count(*) from srcpart, srcpart_date_hour where (srcpart_date_hour.date = '2008-04-08' and srcpart_date_hour.hour = 11) and (srcpart.ds = srcpart_date_hour.ds or srcpart.hr = srcpart_date_hour.hr);
+select count(*) from srcpart, srcpart_date_hour where (srcpart_date_hour.date = '2008-04-08' and srcpart_date_hour.hour = 11) and (srcpart.ds = srcpart_date_hour.ds or srcpart.hr = srcpart_date_hour.hr);
+
+-- old style join syntax
+EXPLAIN select count(*) from srcpart, srcpart_date_hour where srcpart_date_hour.date = '2008-04-08' and srcpart_date_hour.hour = 11 and srcpart.ds = srcpart_date_hour.ds and srcpart.hr = srcpart_date_hour.hr;
+select count(*) from srcpart, srcpart_date_hour where srcpart_date_hour.date = '2008-04-08' and srcpart_date_hour.hour = 11 and srcpart.ds = srcpart_date_hour.ds and srcpart.hr = srcpart_date_hour.hr;
+
+-- left join
+EXPLAIN select count(*) from srcpart left join srcpart_date on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = '2008-04-08';
+EXPLAIN select count(*) from srcpart_date left join srcpart on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = '2008-04-08';
+
+-- full outer
+EXPLAIN select count(*) from srcpart full outer join srcpart_date on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = '2008-04-08';
+
+-- with static pruning
+EXPLAIN select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) join srcpart_hour on (srcpart.hr = srcpart_hour.hr) 
+where srcpart_date.date = '2008-04-08' and srcpart_hour.hour = 11 and srcpart.hr = 11;
+select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) join srcpart_hour on (srcpart.hr = srcpart_hour.hr) 
+where srcpart_date.date = '2008-04-08' and srcpart_hour.hour = 11 and srcpart.hr = 11;
+EXPLAIN select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) join srcpart_hour on (srcpart.hr = srcpart_hour.hr) 
+where srcpart_date.date = '2008-04-08' and srcpart.hr = 13;
+select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) join srcpart_hour on (srcpart.hr = srcpart_hour.hr) 
+where srcpart_date.date = '2008-04-08' and srcpart.hr = 13;
+
+-- union + subquery
+EXPLAIN select count(*) from srcpart where srcpart.ds in (select max(srcpart.ds) from srcpart union all select min(srcpart.ds) from srcpart);
+select count(*) from srcpart where srcpart.ds in (select max(srcpart.ds) from srcpart union all select min(srcpart.ds) from srcpart);
+EXPLAIN select distinct(ds) from srcpart where srcpart.ds in (select max(srcpart.ds) from srcpart union all select min(srcpart.ds) from srcpart);
+select distinct(ds) from srcpart where srcpart.ds in (select max(srcpart.ds) from srcpart union all select min(srcpart.ds) from srcpart);
+EXPLAIN select ds from (select distinct(ds) as ds from srcpart union all select distinct(ds) as ds from srcpart) s where s.ds in (select max(srcpart.ds) from srcpart union all select min(srcpart.ds) from srcpart);
+select ds from (select distinct(ds) as ds from srcpart union all select distinct(ds) as ds from srcpart) s where s.ds in (select max(srcpart.ds) from srcpart union all select min(srcpart.ds) from srcpart);
+
+-- The sections that follow repeat the earlier scenarios with automatic
+-- map-join conversion switched on (noconditionaltask enabled, size threshold
+-- 10000000).
+set hive.auto.convert.join=true;
+set hive.auto.convert.join.noconditionaltask = true;
+set hive.auto.convert.join.noconditionaltask.size = 10000000;
+
+-- single column, single key
+EXPLAIN select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = '2008-04-08';
+select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = '2008-04-08';
+select count(*) from srcpart where ds = '2008-04-08';
+
+-- multiple sources, single key
+EXPLAIN select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) join srcpart_hour on (srcpart.hr = srcpart_hour.hr) 
+where srcpart_date.date = '2008-04-08' and srcpart_hour.hour = 11;
+select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) join srcpart_hour on (srcpart.hr = srcpart_hour.hr) 
+where srcpart_date.date = '2008-04-08' and srcpart_hour.hour = 11;
+select count(*) from srcpart where hr = 11 and ds = '2008-04-08';
+
+-- multiple columns, single source
+EXPLAIN select count(*) from srcpart join srcpart_date_hour on (srcpart.ds = srcpart_date_hour.ds and srcpart.hr = srcpart_date_hour.hr) where srcpart_date_hour.date = '2008-04-08' and srcpart_date_hour.hour = 11;
+select count(*) from srcpart join srcpart_date_hour on (srcpart.ds = srcpart_date_hour.ds and srcpart.hr = srcpart_date_hour.hr) where srcpart_date_hour.date = '2008-04-08' and srcpart_date_hour.hour = 11;
+select count(*) from srcpart where ds = '2008-04-08' and hr = 11;
+
+-- empty set
+EXPLAIN select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = 'I DONT EXIST';
+-- Disabled until TEZ-1486 is fixed
+-- select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = 'I DONT EXIST';
+
+-- expressions
+EXPLAIN select count(*) from srcpart join srcpart_double_hour on (srcpart.hr = cast(srcpart_double_hour.hr/2 as int)) where srcpart_double_hour.hour = 11;
+select count(*) from srcpart join srcpart_double_hour on (srcpart.hr = cast(srcpart_double_hour.hr/2 as int)) where srcpart_double_hour.hour = 11;
+EXPLAIN select count(*) from srcpart join srcpart_double_hour on (srcpart.hr*2 = srcpart_double_hour.hr) where srcpart_double_hour.hour = 11;
+select count(*) from srcpart join srcpart_double_hour on (srcpart.hr*2 = srcpart_double_hour.hr) where srcpart_double_hour.hour = 11;
+select count(*) from srcpart where hr = 11;
+
+-- parent is a reduce task
+EXPLAIN select count(*) from srcpart join (select ds as ds, ds as date from srcpart group by ds) s on (srcpart.ds = s.ds) where s.date = '2008-04-08';
+select count(*) from srcpart join (select ds as ds, ds as date from srcpart group by ds) s on (srcpart.ds = s.ds) where s.date = '2008-04-08';
+select count(*) from srcpart where ds = '2008-04-08';
+
+-- left join
+EXPLAIN select count(*) from srcpart left join srcpart_date on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = '2008-04-08';
+EXPLAIN select count(*) from srcpart_date left join srcpart on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = '2008-04-08';
+
+-- full outer
+EXPLAIN select count(*) from srcpart full outer join srcpart_date on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = '2008-04-08';
+
+-- with static pruning
+EXPLAIN select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) join srcpart_hour on (srcpart.hr = srcpart_hour.hr) 
+where srcpart_date.date = '2008-04-08' and srcpart_hour.hour = 11 and srcpart.hr = 11;
+select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) join srcpart_hour on (srcpart.hr = srcpart_hour.hr) 
+where srcpart_date.date = '2008-04-08' and srcpart_hour.hour = 11 and srcpart.hr = 11;
+EXPLAIN select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) join srcpart_hour on (srcpart.hr = srcpart_hour.hr) 
+where srcpart_date.date = '2008-04-08' and srcpart.hr = 13;
+-- Disabled until TEZ-1486 is fixed
+-- select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) join srcpart_hour on (srcpart.hr = srcpart_hour.hr) 
+-- where srcpart_date.date = '2008-04-08' and srcpart.hr = 13;
+
+-- union + subquery
+EXPLAIN select distinct(ds) from srcpart where srcpart.ds in (select max(srcpart.ds) from srcpart union all select min(srcpart.ds) from srcpart);
+select distinct(ds) from srcpart where srcpart.ds in (select max(srcpart.ds) from srcpart union all select min(srcpart.ds) from srcpart);
+
+
+-- different file format
+-- ORC-backed table with the same key/value columns and ds/hr partition
+-- columns that the insert below copies over from srcpart.
+create table srcpart_orc (key int, value string) partitioned by (ds string, hr int) stored as orc;
+
+
+-- The insert below names no static partition value (both ds and hr come from
+-- the select), hence nonstrict dynamic-partition mode.
+set hive.exec.dynamic.partition.mode=nonstrict;
+set hive.vectorized.execution.enabled=false;
+set hive.exec.max.dynamic.partitions=1000;
+
+-- Populate srcpart_orc from srcpart, then run the two-column join against it.
+insert into table srcpart_orc partition (ds, hr) select key, value, ds, hr from srcpart;
+EXPLAIN select count(*) from srcpart_orc join srcpart_date_hour on (srcpart_orc.ds = srcpart_date_hour.ds and srcpart_orc.hr = srcpart_date_hour.hr) where srcpart_date_hour.hour = 11 and (srcpart_date_hour.date = '2008-04-08' or srcpart_date_hour.date = '2008-04-09');
+select count(*) from srcpart_orc join srcpart_date_hour on (srcpart_orc.ds = srcpart_date_hour.ds and srcpart_orc.hr = srcpart_date_hour.hr) where srcpart_date_hour.hour = 11 and (srcpart_date_hour.date = '2008-04-08' or srcpart_date_hour.date = '2008-04-09');
+-- Reference count taken directly from srcpart with the equivalent ds/hr predicate.
+select count(*) from srcpart where (ds = '2008-04-08' or ds = '2008-04-09') and hr = 11;
+
+-- Drop the tables created by this script.
+drop table srcpart_orc;
+drop table srcpart_date;
+drop table srcpart_hour;
+drop table srcpart_date_hour;
+drop table srcpart_double_hour;

Added: hive/branches/tez/ql/src/test/queries/clientpositive/dynamic_partition_pruning_2.q
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/test/queries/clientpositive/dynamic_partition_pruning_2.q?rev=1622216&view=auto
==============================================================================
--- hive/branches/tez/ql/src/test/queries/clientpositive/dynamic_partition_pruning_2.q (added)
+++ hive/branches/tez/ql/src/test/queries/clientpositive/dynamic_partition_pruning_2.q Wed Sep  3 10:46:04 2014
@@ -0,0 +1,41 @@
+-- Second dynamic partition pruning test: a partitioned table joined to a
+-- small lookup table on its partition column, with automatic map-join
+-- conversion enabled.
+set hive.optimize.ppd=true;
+set hive.ppd.remove.duplicatefilters=true;
+set hive.tez.dynamic.partition.pruning=true;
+set hive.optimize.metadataonly=false;
+set hive.optimize.index.filter=true;
+set hive.auto.convert.join=true;
+set hive.auto.convert.join.noconditionaltask = true;
+set hive.auto.convert.join.noconditionaltask.size = 10000000;
+
+-- Lookup table (id, label) loaded from a comma-delimited local file.
+create table dim_shops (id int, label string) row format delimited fields terminated by ',' stored as textfile;
+load data local inpath '../../data/files/dim_shops.txt' into table dim_shops;
+
+-- Table partitioned by dim_shops_id, with three explicitly added partitions.
+create table agg_01 (amount decimal) partitioned by (dim_shops_id int) row format delimited fields terminated by ',' stored as textfile;
+alter table agg_01 add partition (dim_shops_id = 1);
+alter table agg_01 add partition (dim_shops_id = 2);
+alter table agg_01 add partition (dim_shops_id = 3);
+
+-- One data file per partition.
+load data local inpath '../../data/files/agg_01-p1.txt' into table agg_01 partition (dim_shops_id=1);
+load data local inpath '../../data/files/agg_01-p2.txt' into table agg_01 partition (dim_shops_id=2);
+load data local inpath '../../data/files/agg_01-p3.txt' into table agg_01 partition (dim_shops_id=3);
+
+-- Dump both tables so the loaded contents show up in the expected output.
+select * from dim_shops;
+select * from agg_01;
+
+-- Join on the partition column of agg_01 while filtering dim_shops by label;
+-- first the plan, then the actual result.
+-- NOTE(review): presumably the plan is expected to prune agg_01 partitions
+-- from the matching dim_shops ids -- confirm against the .q.out file.
+EXPLAIN SELECT d1.label, count(*), sum(agg.amount)
+FROM agg_01 agg,
+dim_shops d1
+WHERE agg.dim_shops_id = d1.id
+and
+d1.label in ('foo', 'bar')
+GROUP BY d1.label
+ORDER BY d1.label;
+
+SELECT d1.label, count(*), sum(agg.amount)
+FROM agg_01 agg,
+dim_shops d1
+WHERE agg.dim_shops_id = d1.id
+and
+d1.label in ('foo', 'bar')
+GROUP BY d1.label
+ORDER BY d1.label;

Modified: hive/branches/tez/ql/src/test/results/clientpositive/tez/bucket2.q.out
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/test/results/clientpositive/tez/bucket2.q.out?rev=1622216&r1=1622215&r2=1622216&view=diff
==============================================================================
Files hive/branches/tez/ql/src/test/results/clientpositive/tez/bucket2.q.out (original) and hive/branches/tez/ql/src/test/results/clientpositive/tez/bucket2.q.out Wed Sep  3 10:46:04 2014 differ

Modified: hive/branches/tez/ql/src/test/results/clientpositive/tez/bucket3.q.out
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/test/results/clientpositive/tez/bucket3.q.out?rev=1622216&r1=1622215&r2=1622216&view=diff
==============================================================================
Files hive/branches/tez/ql/src/test/results/clientpositive/tez/bucket3.q.out (original) and hive/branches/tez/ql/src/test/results/clientpositive/tez/bucket3.q.out Wed Sep  3 10:46:04 2014 differ

Modified: hive/branches/tez/ql/src/test/results/clientpositive/tez/bucket4.q.out
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/test/results/clientpositive/tez/bucket4.q.out?rev=1622216&r1=1622215&r2=1622216&view=diff
==============================================================================
Files hive/branches/tez/ql/src/test/results/clientpositive/tez/bucket4.q.out (original) and hive/branches/tez/ql/src/test/results/clientpositive/tez/bucket4.q.out Wed Sep  3 10:46:04 2014 differ

Modified: hive/branches/tez/ql/src/test/results/clientpositive/tez/cross_product_check_2.q.out
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/test/results/clientpositive/tez/cross_product_check_2.q.out?rev=1622216&r1=1622215&r2=1622216&view=diff
==============================================================================
--- hive/branches/tez/ql/src/test/results/clientpositive/tez/cross_product_check_2.q.out (original)
+++ hive/branches/tez/ql/src/test/results/clientpositive/tez/cross_product_check_2.q.out Wed Sep  3 10:46:04 2014
@@ -83,7 +83,7 @@ STAGE PLANS:
       Processor Tree:
         ListSink
 
-Warning: Map Join MAPJOIN[16][bigTable=a] in task 'Map 3' is a cross product
+Warning: Map Join MAPJOIN[18][bigTable=a] in task 'Map 3' is a cross product
 PREHOOK: query: explain select * from B d1 join B d2 on d1.key = d2.key join A
 PREHOOK: type: QUERY
 POSTHOOK: query: explain select * from B d1 join B d2 on d1.key = d2.key join A
@@ -171,7 +171,7 @@ STAGE PLANS:
       Processor Tree:
         ListSink
 
-Warning: Map Join MAPJOIN[23][bigTable=a] in task 'Map 4' is a cross product
+Warning: Map Join MAPJOIN[25][bigTable=a] in task 'Map 4' is a cross product
 PREHOOK: query: explain select * from A join 
          (select d1.key 
           from B d1 join B d2 on d1.key = d2.key 
@@ -396,7 +396,7 @@ STAGE PLANS:
       Processor Tree:
         ListSink
 
-Warning: Map Join MAPJOIN[28][bigTable=?] in task 'Reducer 5' is a cross product
+Warning: Map Join MAPJOIN[30][bigTable=?] in task 'Reducer 5' is a cross product
 PREHOOK: query: explain select * from 
 (select A.key from A group by key) ss join 
 (select d1.key from B d1 join B d2 on d1.key = d2.key where 1 = 1 group by d1.key) od1