You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2014/09/03 12:46:06 UTC
svn commit: r1622216 [2/4] - in /hive/branches/tez:
common/src/java/org/apache/hadoop/hive/conf/ data/files/ itests/qtest/
ql/if/ ql/src/gen/thrift/gen-cpp/
ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/
ql/src/gen/thrift/gen-php/ q...
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java?rev=1622216&r1=1622215&r2=1622216&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java Wed Sep 3 10:46:04 2014
@@ -22,16 +22,14 @@ import java.util.Stack;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
-import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
/**
- * FileSinkProcessor handles addition of merge, move and stats tasks for filesinks
+ * FileSinkProcessor is a simple rule to remember seen file sinks for later
+ * processing.
*
*/
public class FileSinkProcessor implements NodeProcessor {
@@ -39,12 +37,6 @@ public class FileSinkProcessor implement
static final private Log LOG = LogFactory.getLog(FileSinkProcessor.class.getName());
@Override
- /*
- * (non-Javadoc)
- * we should ideally not modify the tree we traverse.
- * However, since we need to walk the tree at any time when we modify the
- * operator, we might as well do it here.
- */
public Object process(Node nd, Stack<Node> stack,
NodeProcessorCtx procCtx, Object... nodeOutputs)
throws SemanticException {
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java?rev=1622216&r1=1622215&r2=1622216&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java Wed Sep 3 10:46:04 2014
@@ -26,29 +26,28 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.UnionOperator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
import org.apache.hadoop.hive.ql.exec.tez.TezTask;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
-import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
import org.apache.hadoop.hive.ql.plan.TezWork;
/**
@@ -134,6 +133,15 @@ public class GenTezProcContext implement
// remember which reducesinks we've already connected
public final Set<ReduceSinkOperator> connectedReduceSinks;
+ // event operators seen while generating work; after the walk TezCompiler
+ // wires each one up via GenTezUtils.processAppMasterEvent
+ public final Set<AppMasterEventOperator> eventOperatorSet;
+
+ // event operators we've abandoned: the originals of event operators that
+ // were cloned (looked up through the clone replacement map);
+ // processAppMasterEvent skips these
+ public final Set<AppMasterEventOperator> abandonedEventOperatorSet;
+
+ // table scan -> event operators targeting it, so that when a table scan is
+ // cloned the events can be repointed at the cloned scan
+ public final Map<TableScanOperator, List<AppMasterEventOperator>> tsToEventMap;
+
@SuppressWarnings("unchecked")
public GenTezProcContext(HiveConf conf, ParseContext parseContext,
List<Task<MoveWork>> moveTask, List<Task<? extends Serializable>> rootTasks,
@@ -165,6 +173,9 @@ public class GenTezProcContext implement
this.linkedFileSinks = new LinkedHashMap<Path, List<FileSinkDesc>>();
this.fileSinkSet = new LinkedHashSet<FileSinkOperator>();
this.connectedReduceSinks = new LinkedHashSet<ReduceSinkOperator>();
+ this.eventOperatorSet = new LinkedHashSet<AppMasterEventOperator>();
+ this.abandonedEventOperatorSet = new LinkedHashSet<AppMasterEventOperator>();
+ this.tsToEventMap = new LinkedHashMap<TableScanOperator, List<AppMasterEventOperator>>();
rootTasks.add(currentTask);
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java?rev=1622216&r1=1622215&r2=1622216&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java Wed Sep 3 10:46:04 2014
@@ -20,38 +20,43 @@ package org.apache.hadoop.hive.ql.parse;
import java.util.ArrayList;
import java.util.Deque;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.List;
import java.util.LinkedList;
-import java.util.Map;
+import java.util.List;
import java.util.Set;
-import org.apache.hadoop.fs.Path;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
import org.apache.hadoop.hive.ql.exec.FetchTask;
-import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.UnionOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
import org.apache.hadoop.hive.ql.plan.TezWork;
import org.apache.hadoop.hive.ql.plan.UnionWork;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+
/**
* GenTezUtils is a collection of shared helper methods to produce
* TezWork
@@ -119,12 +124,12 @@ public class GenTezUtils {
int maxReducers = context.conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
// min we allow tez to pick
- int minPartition = Math.max(1, (int) (reduceSink.getConf().getNumReducers()
+ int minPartition = Math.max(1, (int) (reduceSink.getConf().getNumReducers()
* minPartitionFactor));
minPartition = (minPartition > maxReducers) ? maxReducers : minPartition;
// max we allow tez to pick
- int maxPartition = (int) (reduceSink.getConf().getNumReducers() * maxPartitionFactor);
+ int maxPartition = (int) (reduceSink.getConf().getNumReducers() * maxPartitionFactor);
maxPartition = (maxPartition > maxReducers) ? maxReducers : maxPartition;
reduceWork.setMinReduceTasks(minPartition);
@@ -210,18 +215,20 @@ public class GenTezUtils {
BaseWork work)
throws SemanticException {
- Set<Operator<?>> roots = work.getAllRootOperators();
+ List<Operator<?>> roots = new ArrayList<Operator<?>>();
+ roots.addAll(work.getAllRootOperators());
if (work.getDummyOps() != null) {
roots.addAll(work.getDummyOps());
}
+ roots.addAll(context.eventOperatorSet);
// need to clone the plan.
- Set<Operator<?>> newRoots = Utilities.cloneOperatorTree(conf, roots);
+ List<Operator<?>> newRoots = Utilities.cloneOperatorTree(conf, roots);
// we're cloning the operator plan but we're retaining the original work. That means
// that root operators have to be replaced with the cloned ops. The replacement map
// tells you what that mapping is.
- Map<Operator<?>, Operator<?>> replacementMap = new HashMap<Operator<?>, Operator<?>>();
+ BiMap<Operator<?>, Operator<?>> replacementMap = HashBiMap.create();
// there's some special handling for dummyOps required. Mapjoins won't be properly
// initialized if their dummy parents aren't initialized. Since we cloned the plan
@@ -231,11 +238,35 @@ public class GenTezUtils {
Iterator<Operator<?>> it = newRoots.iterator();
for (Operator<?> orig: roots) {
Operator<?> newRoot = it.next();
+
+ replacementMap.put(orig, newRoot);
+
if (newRoot instanceof HashTableDummyOperator) {
- dummyOps.add((HashTableDummyOperator)newRoot);
+ // dummy ops need to be updated to the cloned ones.
+ dummyOps.add((HashTableDummyOperator) newRoot);
+ it.remove();
+ } else if (newRoot instanceof AppMasterEventOperator) {
+ // event operators point to table scan operators. When cloning these we
+ // need to restore the original scan.
+ if (newRoot.getConf() instanceof DynamicPruningEventDesc) {
+ TableScanOperator ts = ((DynamicPruningEventDesc) orig.getConf()).getTableScan();
+ if (ts == null) {
+ throw new AssertionError("No table scan associated with dynamic event pruning. " + orig);
+ }
+ ((DynamicPruningEventDesc) newRoot.getConf()).setTableScan(ts);
+ }
it.remove();
} else {
- replacementMap.put(orig,newRoot);
+ if (newRoot instanceof TableScanOperator) {
+ if (context.tsToEventMap.containsKey(orig)) {
+ // we need to update event operators with the cloned table scan
+ for (AppMasterEventOperator event : context.tsToEventMap.get(orig)) {
+ ((DynamicPruningEventDesc) event.getConf()).setTableScan((TableScanOperator) newRoot);
+ }
+ }
+ }
+ context.rootToWorkMap.remove(orig);
+ context.rootToWorkMap.put(newRoot, work);
}
}
@@ -272,6 +303,15 @@ public class GenTezUtils {
desc.setLinkedFileSinkDesc(linked);
}
+ if (current instanceof AppMasterEventOperator) {
+ // remember for additional processing later
+ context.eventOperatorSet.add((AppMasterEventOperator) current);
+
+ // mark the original as abandoned. Don't need it anymore.
+ context.abandonedEventOperatorSet.add((AppMasterEventOperator) replacementMap.inverse()
+ .get(current));
+ }
+
if (current instanceof UnionOperator) {
Operator<?> parent = null;
int count = 0;
@@ -337,4 +377,87 @@ public class GenTezUtils {
}
}
}
+
+ /**
+ * processAppMasterEvent sets up the event descriptor and the MapWork.
+ *
+ * @param procCtx
+ * @param event
+ */
+ public void processAppMasterEvent(GenTezProcContext procCtx, AppMasterEventOperator event) {
+
+ if (procCtx.abandonedEventOperatorSet.contains(event)) {
+ // don't need this anymore
+ return;
+ }
+
+ DynamicPruningEventDesc eventDesc = (DynamicPruningEventDesc)event.getConf();
+ TableScanOperator ts = eventDesc.getTableScan();
+
+ MapWork work = (MapWork) procCtx.rootToWorkMap.get(ts);
+ if (work == null) {
+ throw new AssertionError("No work found for tablescan " + ts);
+ }
+
+ BaseWork enclosingWork = getEnclosingWork(event, procCtx);
+ if (enclosingWork == null) {
+ throw new AssertionError("Cannot find work for operator" + event);
+ }
+ String sourceName = enclosingWork.getName();
+
+ // store the vertex name in the operator pipeline
+ eventDesc.setVertexName(work.getName());
+ eventDesc.setInputName(work.getAliases().get(0));
+
+ // store table descriptor in map-work
+ if (!work.getEventSourceTableDescMap().containsKey(sourceName)) {
+ work.getEventSourceTableDescMap().put(sourceName, new LinkedList<TableDesc>());
+ }
+ List<TableDesc> tables = work.getEventSourceTableDescMap().get(sourceName);
+ tables.add(event.getConf().getTable());
+
+ // store column name in map-work
+ if (!work.getEventSourceColumnNameMap().containsKey(sourceName)) {
+ work.getEventSourceColumnNameMap().put(sourceName, new LinkedList<String>());
+ }
+ List<String> columns = work.getEventSourceColumnNameMap().get(sourceName);
+ columns.add(eventDesc.getTargetColumnName());
+
+ // store partition key expr in map-work
+ if (!work.getEventSourcePartKeyExprMap().containsKey(sourceName)) {
+ work.getEventSourcePartKeyExprMap().put(sourceName, new LinkedList<ExprNodeDesc>());
+ }
+ List<ExprNodeDesc> keys = work.getEventSourcePartKeyExprMap().get(sourceName);
+ keys.add(eventDesc.getPartKey());
+
+ }
+
+  /**
+   * getEnclosingWork finds the BaseWork a given operator belongs to.
+   *
+   * It collects the operator's root operators (via findRoots) and returns the
+   * work registered in {@code procCtx.rootToWorkMap} for the first root that
+   * has one.
+   *
+   * @return the enclosing work, or null if none of the roots is mapped
+   */
+  public BaseWork getEnclosingWork(Operator<?> op, GenTezProcContext procCtx) {
+    List<Operator<?>> roots = new ArrayList<Operator<?>>();
+    findRoots(op, roots);
+
+    BaseWork found = null;
+    Iterator<Operator<?>> candidates = roots.iterator();
+    while (found == null && candidates.hasNext()) {
+      found = procCtx.rootToWorkMap.get(candidates.next());
+    }
+    return found;
+  }
+
+  /*
+   * findRoots appends to ops every parentless operator reachable from op by
+   * following parent links, depth first (op itself when it has no parents).
+   * A root reachable along several paths is appended once per path.
+   */
+  private void findRoots(Operator<?> op, List<Operator<?>> ops) {
+    List<Operator<?>> parents = op.getParentOperators();
+    boolean isRoot = parents == null || parents.isEmpty();
+    if (isRoot) {
+      ops.add(op);
+    } else {
+      for (Operator<?> parent : parents) {
+        findRoots(parent, ops);
+      }
+    }
+  }
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java?rev=1622216&r1=1622215&r2=1622216&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java Wed Sep 3 10:46:04 2014
@@ -23,13 +23,18 @@ import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+
/**
* OptimizeTezProcContext. OptimizeTezProcContext maintains information
* about the current operator plan as we walk the operator tree
@@ -47,19 +52,23 @@ public class OptimizeTezProcContext impl
public final Set<ReduceSinkOperator> visitedReduceSinks
= new HashSet<ReduceSinkOperator>();
+ // event operator -> associated table scan(s). NOTE(review): neither populated
+ // nor read in the code visible here; confirm its users.
+ public final Multimap<AppMasterEventOperator, TableScanOperator> eventOpToTableScanMap =
+ HashMultimap.create();
+
// rootOperators are all the table scan operators in sequence
// of traversal
- public final Deque<Operator<? extends OperatorDesc>> rootOperators;
+ // No longer final: the constructor does not set it; assign it with
+ // setRootOperators (it is null until then).
+ public Deque<Operator<? extends OperatorDesc>> rootOperators;
- @SuppressWarnings("unchecked")
- public OptimizeTezProcContext(HiveConf conf, ParseContext parseContext,
- Set<ReadEntity> inputs, Set<WriteEntity> outputs,
- Deque<Operator<?>> rootOperators) {
+ /**
+  * Creates the context for the Tez operator-plan optimizations.
+  * rootOperators is not set by this constructor; use setRootOperators.
+  */
+ public OptimizeTezProcContext(HiveConf conf, ParseContext parseContext, Set<ReadEntity> inputs,
+ Set<WriteEntity> outputs) {
this.conf = conf;
this.parseContext = parseContext;
this.inputs = inputs;
this.outputs = outputs;
- this.rootOperators = rootOperators;
+ }
+
+ /**
+  * Sets the root operators (the table scans in traversal order) for the walk.
+  */
+ public void setRootOperators(Deque<Operator<? extends OperatorDesc>> roots) {
+ this.rootOperators = roots;
}
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java?rev=1622216&r1=1622215&r2=1622216&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java Wed Sep 3 10:46:04 2014
@@ -21,20 +21,24 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Stack;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
import org.apache.hadoop.hive.ql.exec.ConditionalTask;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
@@ -52,20 +56,25 @@ import org.apache.hadoop.hive.ql.lib.For
import org.apache.hadoop.hive.ql.lib.GraphWalker;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
-import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.lib.Rule;
import org.apache.hadoop.hive.ql.lib.RuleRegExp;
import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.optimizer.ConstantPropagate;
import org.apache.hadoop.hive.ql.optimizer.ConvertJoinMapJoin;
+import org.apache.hadoop.hive.ql.optimizer.DynamicPartitionPruningOptimization;
import org.apache.hadoop.hive.ql.optimizer.ReduceSinkMapJoinProc;
+import org.apache.hadoop.hive.ql.optimizer.RemoveDynamicPruningBySize;
import org.apache.hadoop.hive.ql.optimizer.SetReducerParallelism;
+import org.apache.hadoop.hive.ql.optimizer.metainfo.annotation.AnnotateWithOpTraits;
import org.apache.hadoop.hive.ql.optimizer.physical.CrossProductCheck;
import org.apache.hadoop.hive.ql.optimizer.physical.MetadataOnlyOptimizer;
import org.apache.hadoop.hive.ql.optimizer.physical.NullScanOptimizer;
import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
-import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer;
import org.apache.hadoop.hive.ql.optimizer.physical.StageIDsRearranger;
+import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer;
+import org.apache.hadoop.hive.ql.optimizer.stats.annotation.AnnotateWithStatistics;
import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -85,7 +94,7 @@ public class TezCompiler extends TaskCom
@Override
public void init(HiveConf conf, LogHelper console, Hive db) {
super.init(conf, console, db);
-
+
// Tez requires us to use RPC for the query plan
HiveConf.setBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN, true);
@@ -98,31 +107,203 @@ public class TezCompiler extends TaskCom
protected void optimizeOperatorPlan(ParseContext pCtx, Set<ReadEntity> inputs,
Set<WriteEntity> outputs) throws SemanticException {
- // Sequence of TableScan operators to be walked
+ // The phases below run in sequence on one shared context: dynamic partition
+ // pruning setup, stats annotation, stats-dependent optimizations, and finally
+ // cycle analysis, because the earlier phases may leave cyclic dependencies.
+ // Create the context for the walker
+ OptimizeTezProcContext procCtx = new OptimizeTezProcContext(conf, pCtx, inputs, outputs);
+
+ // setup dynamic partition pruning where possible
+ runDynamicPartitionPruning(procCtx, inputs, outputs);
+
+ // setup stats in the operator plan
+ runStatsAnnotation(procCtx);
+
+ // run the optimizations that use stats for optimization
+ runStatsDependentOptimizations(procCtx, inputs, outputs);
+
+ // after the stats phase we might have some cyclic dependencies that we need
+ // to take care of.
+ runCycleAnalysisForPartitionPruning(procCtx, inputs, outputs);
+
+ }
+
+  /**
+   * Breaks cycles introduced by dynamic partition pruning.
+   *
+   * Repeatedly computes the strongly connected components of the operator
+   * graph (which includes an edge from each event operator to its target
+   * table scan). Every component with more than one operator is broken via
+   * removeEventOperator. This repeats until a pass finds none. It does nothing
+   * unless TEZ_DYNAMIC_PARTITION_PRUNING is enabled.
+   */
+  private void runCycleAnalysisForPartitionPruning(OptimizeTezProcContext procCtx,
+      Set<ReadEntity> inputs, Set<WriteEntity> outputs) throws SemanticException {
+
+    if (!procCtx.conf.getBoolVar(ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING)) {
+      return;
+    }
+
+    boolean cycleFree;
+    do {
+      cycleFree = true;
+      for (Set<Operator<?>> component : getComponents(procCtx)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Component: ");
+          for (Operator<?> member : component) {
+            LOG.debug("Operator: " + member.getName() + ", " + member.getIdentifier());
+          }
+        }
+        if (component.size() == 1) {
+          // a lone operator is not a cycle
+          continue;
+        }
+        LOG.info("Found cycle in operator plan...");
+        cycleFree = false;
+        removeEventOperator(component);
+      }
+      LOG.info("Cycle free: " + cycleFree);
+    } while (!cycleFree);
+  }
+
+  /**
+   * Breaks a cycle by disabling one dynamic pruning branch.
+   *
+   * Picks the AppMasterEventOperator in the component with the smallest
+   * estimated data size (the first one wins on ties). It then walks up from
+   * it through single-child operators to the nearest ancestor with more than
+   * one child, and detaches the branch leading to the event from that ancestor.
+   */
+  private void removeEventOperator(Set<Operator<?>> component) {
+    AppMasterEventOperator victim = null;
+    for (Operator<?> candidate : component) {
+      if (!(candidate instanceof AppMasterEventOperator)) {
+        continue;
+      }
+      boolean smaller = victim == null
+          || candidate.getConf().getStatistics().getDataSize()
+              < victim.getConf().getStatistics().getDataSize();
+      if (smaller) {
+        victim = (AppMasterEventOperator) candidate;
+      }
+    }
+
+    // climb from the event until we reach the operator that forks into the
+    // pruning branch and at least one other child
+    Operator<?> branch = victim;
+    Operator<?> fork = victim;
+    while (fork.getChildOperators().size() <= 1) {
+      branch = fork;
+      fork = fork.getParentOperators().get(0);
+    }
+
+    LOG.info("Disabling dynamic pruning for: "
+        + ((DynamicPruningEventDesc) victim.getConf()).getTableScan().toString()
+        + ". Needed to break cyclic dependency");
+    fork.removeChild(branch);
+  }
+
+ /**
+  * Computes the strongly connected components of the operator graph with
+  * Tarjan's algorithm, starting a DFS (connect) from every not-yet-visited
+  * top operator. Besides ordinary parent->child links, connect adds an edge
+  * from each AppMasterEventOperator to the table scan it targets, so any
+  * returned component with more than one operator is a cycle.
+  */
+ private Set<Set<Operator<?>>> getComponents(OptimizeTezProcContext procCtx) {
Deque<Operator<?>> deque = new LinkedList<Operator<?>>();
- deque.addAll(pCtx.getTopOps().values());
+ deque.addAll(procCtx.parseContext.getTopOps().values());
- // Create the context for the walker
- OptimizeTezProcContext procCtx
- = new OptimizeTezProcContext(conf, pCtx, inputs, outputs, deque);
+ AtomicInteger index = new AtomicInteger();
+ Map<Operator<?>, Integer> indexes = new HashMap<Operator<?>, Integer>();
+ Map<Operator<?>, Integer> lowLinks = new HashMap<Operator<?>, Integer>();
+ Stack<Operator<?>> nodes = new Stack<Operator<?>>();
+ Set<Set<Operator<?>>> components = new HashSet<Set<Operator<?>>>();
+
+ for (Operator<?> o : deque) {
+ if (!indexes.containsKey(o)) {
+ connect(o, index, nodes, indexes, lowLinks, components);
+ }
+ }
+
+ return components;
+ }
+
+  /**
+   * Recursive step of Tarjan's strongly-connected-components algorithm.
+   *
+   * Assigns {@code o} the next DFS index, visits its successors and, if
+   * {@code o} is the root of a component, pops that component off
+   * {@code nodes} and adds it to {@code components}. The successors of an
+   * operator are its child operators. An AppMasterEventOperator also has an
+   * edge to the table scan named in its DynamicPruningEventDesc, which is
+   * what makes pruning-induced cycles visible.
+   *
+   * @param o the operator to visit (must not have been visited yet)
+   * @param index counter supplying the next DFS index
+   * @param nodes stack of visited operators not yet assigned to a component
+   * @param indexes DFS index of every visited operator
+   * @param lowLinks low-link value of every visited operator
+   * @param components output set receiving each completed component
+   */
+  private void connect(Operator<?> o, AtomicInteger index, Stack<Operator<?>> nodes,
+      Map<Operator<?>, Integer> indexes, Map<Operator<?>, Integer> lowLinks,
+      Set<Set<Operator<?>>> components) {
+
+    indexes.put(o, index.get());
+    lowLinks.put(o, index.get());
+    index.incrementAndGet();
+    nodes.push(o);
+
+    List<Operator<?>> children;
+    if (o instanceof AppMasterEventOperator) {
+      children = new ArrayList<Operator<?>>();
+      children.addAll(o.getChildOperators());
+      TableScanOperator ts = ((DynamicPruningEventDesc) o.getConf()).getTableScan();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Adding special edge: " + o.getName() + " --> " + ts.toString());
+      }
+      children.add(ts);
+    } else {
+      children = o.getChildOperators();
+    }
+
+    for (Operator<?> child : children) {
+      if (!indexes.containsKey(child)) {
+        connect(child, index, nodes, indexes, lowLinks, components);
+        // o's low-link is the minimum over its DFS subtree, so fold the
+        // child's value into o (not the other way round)
+        lowLinks.put(o, Math.min(lowLinks.get(o), lowLinks.get(child)));
+      } else if (nodes.contains(child)) {
+        lowLinks.put(o, Math.min(lowLinks.get(o), indexes.get(child)));
+      }
+    }
+
+    if (lowLinks.get(o).equals(indexes.get(o))) {
+      // o roots a component. Build it completely before inserting it:
+      // mutating a set after adding it to a HashSet invalidates its hash.
+      Set<Operator<?>> component = new HashSet<Operator<?>>();
+      Operator<?> current;
+      do {
+        current = nodes.pop();
+        component.add(current);
+      } while (current != o);
+      components.add(component);
+    }
+  }
+
+  /**
+   * Annotates the operator plan with statistics, then with operator traits.
+   */
+  private void runStatsAnnotation(OptimizeTezProcContext procCtx) throws SemanticException {
+    ParseContext pctx = procCtx.parseContext;
+    new AnnotateWithStatistics().transform(pctx);
+    new AnnotateWithOpTraits().transform(pctx);
+  }
+
+ /**
+  * Runs the optimizations that use statistics: reducer parallelism for
+  * ReduceSinks, join to map-join conversion, and size-based removal of
+  * dynamic pruning. optimizeOperatorPlan calls this after runStatsAnnotation.
+  */
+ private void runStatsDependentOptimizations(OptimizeTezProcContext procCtx,
+ Set<ReadEntity> inputs, Set<WriteEntity> outputs) throws SemanticException {
+
+ // Sequence of TableScan operators to be walked. Publish it on the context,
+ // as the old constructor argument did, so rootOperators is not left null.
+ Deque<Operator<?>> deque = new LinkedList<Operator<?>>();
+ deque.addAll(procCtx.parseContext.getTopOps().values());
+ procCtx.setRootOperators(deque);
// create a walker which walks the tree in a DFS manner while maintaining
// the operator stack.
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
- opRules.put(new RuleRegExp(new String("Set parallelism - ReduceSink"),
+ opRules.put(new RuleRegExp("Set parallelism - ReduceSink",
ReduceSinkOperator.getOperatorName() + "%"),
new SetReducerParallelism());
- opRules.put(new RuleRegExp(new String("Convert Join to Map-join"),
+ opRules.put(new RuleRegExp("Convert Join to Map-join",
JoinOperator.getOperatorName() + "%"), new ConvertJoinMapJoin());
+ opRules.put(
+ new RuleRegExp("Remove dynamic pruning by size",
+ AppMasterEventOperator.getOperatorName() + "%"),
+ new RemoveDynamicPruningBySize());
+
// The dispatcher fires the processor corresponding to the closest matching
// rule and passes the context along
Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
List<Node> topNodes = new ArrayList<Node>();
- topNodes.addAll(pCtx.getTopOps().values());
+ topNodes.addAll(procCtx.parseContext.getTopOps().values());
+ GraphWalker ogw = new ForwardWalker(disp);
+ ogw.startWalking(topNodes, null);
+ }
+
+ /**
+  * Sets up dynamic partition pruning. It walks the plan, applying
+  * DynamicPartitionPruningOptimization to matched FilterOperators, then
+  * re-runs constant propagation. It does nothing unless
+  * TEZ_DYNAMIC_PARTITION_PRUNING is enabled.
+  */
+ private void runDynamicPartitionPruning(OptimizeTezProcContext procCtx, Set<ReadEntity> inputs,
+ Set<WriteEntity> outputs) throws SemanticException {
+
+ if (!procCtx.conf.getBoolVar(ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING)) {
+ return;
+ }
+
+ // Sequence of TableScan operators to be walked. Publish it on the context,
+ // as the old constructor argument did, so rootOperators is not left null.
+ Deque<Operator<?>> deque = new LinkedList<Operator<?>>();
+ deque.addAll(procCtx.parseContext.getTopOps().values());
+ procCtx.setRootOperators(deque);
+
+ Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+ opRules.put(
+ new RuleRegExp("Dynamic Partition Pruning", FilterOperator.getOperatorName() + "%"),
+ new DynamicPartitionPruningOptimization());
+
+ // The dispatcher fires the processor corresponding to the closest matching
+ // rule and passes the context along
+ Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
+ List<Node> topNodes = new ArrayList<Node>();
+ topNodes.addAll(procCtx.parseContext.getTopOps().values());
GraphWalker ogw = new ForwardWalker(disp);
ogw.startWalking(topNodes, null);
+
+ // need a new run of the constant folding because we might have created lots
+ // of "and true and true" conditions.
+ new ConstantPropagate().transform(procCtx.parseContext);
}
@Override
@@ -158,19 +339,12 @@ public class TezCompiler extends TaskCom
new ProcessAnalyzeTable(GenTezUtils.getUtils()));
opRules.put(new RuleRegExp("Remember union",
- UnionOperator.getOperatorName() + "%"), new NodeProcessor()
- {
- @Override
- public Object process(Node n, Stack<Node> s,
- NodeProcessorCtx procCtx, Object... os) throws SemanticException {
- GenTezProcContext context = (GenTezProcContext) procCtx;
- UnionOperator union = (UnionOperator) n;
-
- // simply need to remember that we've seen a union.
- context.currentUnionOperators.add(union);
- return null;
- }
- });
+ UnionOperator.getOperatorName() + "%"),
+ new UnionProcessor());
+
+ opRules.put(new RuleRegExp("AppMasterEventOperator",
+ AppMasterEventOperator.getOperatorName() + "%"),
+ new AppMasterEventProcessor());
// The dispatcher fires the processor corresponding to the closest matching
// rule and passes the context along
@@ -185,10 +359,17 @@ public class TezCompiler extends TaskCom
GenTezUtils.getUtils().removeUnionOperators(conf, procCtx, w);
}
- // finally make sure the file sink operators are set up right
+ // then we make sure the file sink operators are set up right
for (FileSinkOperator fileSink: procCtx.fileSinkSet) {
GenTezUtils.getUtils().processFileSink(procCtx, fileSink);
}
+
+ // and finally we hook up any events that need to be sent to the tez AM
+ LOG.debug("There are " + procCtx.eventOperatorSet.size() + " app master events.");
+ for (AppMasterEventOperator event : procCtx.eventOperatorSet) {
+ LOG.debug("Handling AppMasterEventOperator: " + event);
+ GenTezUtils.getUtils().processAppMasterEvent(procCtx, event);
+ }
}
@Override
Added: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/UnionProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/UnionProcessor.java?rev=1622216&view=auto
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/UnionProcessor.java (added)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/UnionProcessor.java Wed Sep 3 10:46:04 2014
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+
+/**
+ * UnionProcessor is a simple rule to remember seen unions for later
+ * processing: each {@link UnionOperator} the walker visits is added to
+ * {@link GenTezProcContext#currentUnionOperators}.
+ */
+public class UnionProcessor implements NodeProcessor {
+
+  static final private Log LOG = LogFactory.getLog(UnionProcessor.class.getName());
+
+  /**
+   * Records the visited union operator in the Tez work-generation context.
+   *
+   * @param nd the matched node; expected to be a UnionOperator
+   * @param procCtx the processing context; expected to be a GenTezProcContext
+   * @return always null
+   */
+  @Override
+  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object... nodeOutputs)
+      throws SemanticException {
+    GenTezProcContext context = (GenTezProcContext) procCtx;
+    UnionOperator union = (UnionOperator) nd;
+
+    // simply need to remember that we've seen a union.
+    context.currentUnionOperators.add(union);
+    return null;
+  }
+}
Added: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/AppMasterEventDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/AppMasterEventDesc.java?rev=1622216&view=auto
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/AppMasterEventDesc.java (added)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/AppMasterEventDesc.java Wed Sep 3 10:46:04 2014
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.DataOutputBuffer;
+
+
+/**
+ * Descriptor shown in EXPLAIN as "Application Master Event Operator".
+ * <p>
+ * It names the target vertex and the target input that events are addressed
+ * to, and carries an associated {@link TableDesc} (presumably describing the
+ * event rows -- confirm against the operator that consumes it).
+ */
+@SuppressWarnings("serial")
+@Explain(displayName = "Application Master Event Operator")
+public class AppMasterEventDesc extends AbstractOperatorDesc {
+
+  private TableDesc table;
+  private String vertexName;
+  private String inputName;
+
+  /** @return the table descriptor associated with this operator */
+  public TableDesc getTable() {
+    return this.table;
+  }
+
+  /** @param table the table descriptor to associate with this operator */
+  public void setTable(TableDesc table) {
+    this.table = table;
+  }
+
+  /** @return the name of the vertex events are addressed to */
+  @Explain(displayName = "Target Vertex")
+  public String getVertexName() {
+    return this.vertexName;
+  }
+
+  /** @param vertexName the name of the vertex events are addressed to */
+  public void setVertexName(String vertexName) {
+    this.vertexName = vertexName;
+  }
+
+  /** @return the name of the input (on the target vertex) events are addressed to */
+  @Explain(displayName = "Target Input")
+  public String getInputName() {
+    return this.inputName;
+  }
+
+  /** @param inputName the name of the input events are addressed to */
+  public void setInputName(String inputName) {
+    this.inputName = inputName;
+  }
+
+  /**
+   * Writes the header fields this descriptor contributes to an event.
+   * The base descriptor contributes none; subclasses override this to append
+   * their own fields after calling {@code super}.
+   *
+   * @param buffer destination of the header
+   * @throws IOException if writing to {@code buffer} fails (never thrown here)
+   */
+  public void writeEventHeader(DataOutputBuffer buffer) throws IOException {
+    // The base event carries no header fields.
+  }
+}
Added: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPruningEventDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPruningEventDesc.java?rev=1622216&view=auto
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPruningEventDesc.java (added)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPruningEventDesc.java Wed Sep 3 10:46:04 2014
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.io.DataOutputBuffer;
+
+/**
+ * Event descriptor used for dynamic partition pruning. In addition to the
+ * target vertex/input inherited from {@link AppMasterEventDesc}, it records the
+ * target column to prune on, the partition-key expression and (at compile
+ * time only) the table scan being pruned.
+ */
+@SuppressWarnings("serial")
+@Explain(displayName = "Dynamic Partitioning Event Operator")
+public class DynamicPruningEventDesc extends AppMasterEventDesc {
+
+  // column in the target table that will be pruned against
+  private String targetColumnName;
+
+  // tableScan is only available during compile; it is declared transient so
+  // serializers that honour the modifier skip it
+  private transient TableScanOperator tableScan;
+
+  // the partition column we're interested in
+  private ExprNodeDesc partKey;
+
+  /** @return the table scan being pruned; only set during compilation */
+  public TableScanOperator getTableScan() {
+    return tableScan;
+  }
+
+  /** @param tableScan the table scan being pruned */
+  public void setTableScan(TableScanOperator tableScan) {
+    this.tableScan = tableScan;
+  }
+
+  /** @return the column of the target table that will be pruned against */
+  @Explain(displayName = "Target column")
+  public String getTargetColumnName() {
+    return targetColumnName;
+  }
+
+  /** @param columnName the column of the target table to prune against */
+  public void setTargetColumnName(String columnName) {
+    this.targetColumnName = columnName;
+  }
+
+  /**
+   * Writes the inherited header followed by the target column name.
+   * The target column name must have been set: {@code writeUTF} rejects null.
+   *
+   * @param buffer destination of the header
+   * @throws IOException if writing to {@code buffer} fails
+   */
+  @Override
+  public void writeEventHeader(DataOutputBuffer buffer) throws IOException {
+    super.writeEventHeader(buffer);
+    buffer.writeUTF(targetColumnName);
+  }
+
+  /** @param partKey expression for the partition key being pruned */
+  public void setPartKey(ExprNodeDesc partKey) {
+    this.partKey = partKey;
+  }
+
+  /**
+   * @return the string form of the partition-key expression, or {@code null}
+   *         when no partition key has been set
+   */
+  @Explain(displayName = "Partition key expr")
+  public String getPartKeyString() {
+    // Return null instead of throwing an NPE when the key was never set.
+    return partKey == null ? null : partKey.getExprString();
+  }
+
+  /** @return the partition-key expression, possibly {@code null} */
+  public ExprNodeDesc getPartKey() {
+    return this.partKey;
+  }
+}
Added: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDynamicListDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDynamicListDesc.java?rev=1622216&view=auto
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDynamicListDesc.java (added)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDynamicListDesc.java Wed Sep 3 10:46:04 2014
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+/**
+ * This expression represents a list that will be available at runtime.
+ * <p>
+ * The list is identified by the operator that produces it ({@code source}) and
+ * by a key position within that operator ({@code keyIndex});
+ * SyntheticJoinPredicate, for example, passes the index of a reduce-sink key
+ * column.
+ */
+@SuppressWarnings("serial")
+public class ExprNodeDynamicListDesc extends ExprNodeDesc {
+
+  Operator<? extends OperatorDesc> source;
+  int keyIndex;
+
+  /**
+   * No-arg constructor; leaves {@code source} null and {@code keyIndex} 0
+   * (presumably required by plan serialization -- confirm).
+   */
+  public ExprNodeDynamicListDesc() {
+  }
+
+  /**
+   * @param typeInfo type of the list elements
+   * @param source operator producing the list
+   * @param keyIndex position of the key within {@code source} whose values form the list
+   */
+  public ExprNodeDynamicListDesc(TypeInfo typeInfo, Operator<? extends OperatorDesc> source, int keyIndex) {
+    super(typeInfo);
+    this.source = source;
+    this.keyIndex = keyIndex;
+  }
+
+  public void setSource(Operator<? extends OperatorDesc> source) {
+    this.source = source;
+  }
+
+  public Operator<? extends OperatorDesc> getSource() {
+    return source;
+  }
+
+  public void setKeyIndex(int keyIndex) {
+    this.keyIndex = keyIndex;
+  }
+
+  public int getKeyIndex() {
+    return this.keyIndex;
+  }
+
+  /** Shallow copy: the clone refers to the same source operator instance. */
+  @Override
+  public ExprNodeDesc clone() {
+    ExprNodeDynamicListDesc clone = new ExprNodeDynamicListDesc(typeInfo, source, keyIndex);
+    return clone;
+  }
+
+  /**
+   * Two dynamic lists are the same when they come from the same source
+   * operator and select the same key position of that source.
+   */
+  @Override
+  public boolean isSame(Object o) {
+    if (!(o instanceof ExprNodeDynamicListDesc)) {
+      return false;
+    }
+    ExprNodeDynamicListDesc other = (ExprNodeDynamicListDesc) o;
+    if (keyIndex != other.getKeyIndex()) {
+      return false;
+    }
+    // source is null for an instance built with the no-arg constructor
+    return source == null ? other.getSource() == null : source.equals(other.getSource());
+  }
+
+  @Override
+  public String getExprString() {
+    // same text as source.toString(), but tolerates an unset source
+    return String.valueOf(source);
+  }
+
+  @Override
+  public String toString() {
+    return String.valueOf(source);
+  }
+}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java?rev=1622216&r1=1622215&r2=1622216&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java Wed Sep 3 10:46:04 2014
@@ -26,9 +26,9 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
-import java.util.Set;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -118,6 +118,14 @@ public class MapWork extends BaseWork {
private boolean dummyTableScan = false;
+ // used for dynamic partitioning
+ private Map<String, List<TableDesc>> eventSourceTableDescMap =
+ new LinkedHashMap<String, List<TableDesc>>();
+ private Map<String, List<String>> eventSourceColumnNameMap =
+ new LinkedHashMap<String, List<String>>();
+ private Map<String, List<ExprNodeDesc>> eventSourcePartKeyExprMap =
+ new LinkedHashMap<String, List<ExprNodeDesc>>();
+
public MapWork() {}
public MapWork(String name) {
@@ -535,4 +543,28 @@ public class MapWork extends BaseWork {
public boolean getDummyTableScan() {
return dummyTableScan;
}
+
+ public void setEventSourceTableDescMap(Map<String, List<TableDesc>> map) {
+ this.eventSourceTableDescMap = map;
+ }
+
+ public Map<String, List<TableDesc>> getEventSourceTableDescMap() {
+ return eventSourceTableDescMap;
+ }
+
+ public void setEventSourceColumnNameMap(Map<String, List<String>> map) {
+ this.eventSourceColumnNameMap = map;
+ }
+
+ public Map<String, List<String>> getEventSourceColumnNameMap() {
+ return eventSourceColumnNameMap;
+ }
+
+ public Map<String, List<ExprNodeDesc>> getEventSourcePartKeyExprMap() {
+ return eventSourcePartKeyExprMap;
+ }
+
+ public void setEventSourcePartKeyExprMap(Map<String, List<ExprNodeDesc>> map) {
+ this.eventSourcePartKeyExprMap = map;
+ }
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java?rev=1622216&r1=1622215&r2=1622216&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java Wed Sep 3 10:46:04 2014
@@ -20,8 +20,8 @@ package org.apache.hadoop.hive.ql.plan;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.LinkedHashSet;
import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -32,13 +32,9 @@ import org.apache.hadoop.hive.ql.exec.Fi
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorUtils;
import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.lib.NodeProcessor;
-import org.apache.hadoop.hive.ql.lib.Rule;
import org.apache.hadoop.hive.serde2.Deserializer;
-import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.mapred.JobConf;
@@ -99,7 +95,7 @@ public class ReduceWork extends BaseWork
private ObjectInspector keyObjectInspector = null;
private ObjectInspector valueObjectInspector = null;
- private Map<String, Integer> reduceColumnNameMap = new LinkedHashMap<String, Integer>();
+ private final Map<String, Integer> reduceColumnNameMap = new LinkedHashMap<String, Integer>();
/**
* If the plan has a reducer and correspondingly a reduce-sink, then store the TableDesc pointing
@@ -118,7 +114,7 @@ public class ReduceWork extends BaseWork
private ObjectInspector getObjectInspector(TableDesc desc) {
ObjectInspector objectInspector;
try {
- Deserializer deserializer = (SerDe) ReflectionUtils.newInstance(desc
+ Deserializer deserializer = ReflectionUtils.newInstance(desc
.getDeserializerClass(), null);
SerDeUtils.initializeSerDe(deserializer, null, desc.getProperties(), null);
objectInspector = deserializer.getObjectInspector();
@@ -239,7 +235,6 @@ public class ReduceWork extends BaseWork
@Override
public void replaceRoots(Map<Operator<?>, Operator<?>> replacementMap) {
- assert replacementMap.size() == 1;
setReducer(replacementMap.get(getReducer()));
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java?rev=1622216&r1=1622215&r2=1622216&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java Wed Sep 3 10:46:04 2014
@@ -906,11 +906,8 @@ public final class OpProcFactory {
}
ExprNodeDesc condn = ExprNodeDescUtils.mergePredicates(preds);
- if(!(condn instanceof ExprNodeGenericFuncDesc)) {
- return null;
- }
- if (op instanceof TableScanOperator) {
+ if (op instanceof TableScanOperator && condn instanceof ExprNodeGenericFuncDesc) {
boolean pushFilterToStorage;
HiveConf hiveConf = owi.getParseContext().getConf();
pushFilterToStorage =
Added: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java?rev=1622216&view=auto
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java (added)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java Wed Sep 3 10:46:04 2014
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.ppd;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.PreOrderWalker;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.optimizer.Transform;
+import org.apache.hadoop.hive.ql.parse.OpParseContext;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicListDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
+import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
+import org.apache.hadoop.hive.ql.plan.JoinDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+
+/**
+ * Creates synthetic predicates of the form {@code key IN (keylist of other table)}.
+ * <p>
+ * For each join input reached from a table scan through a reduce sink, a filter
+ * is inserted between that reduce sink and its parent. The filter requires every
+ * join key of the input to appear in the corresponding key list of another join
+ * input; that list is an {@link ExprNodeDynamicListDesc}, i.e. only available at
+ * runtime. The transform does nothing unless the execution engine is "tez" and
+ * {@code TEZ_DYNAMIC_PARTITION_PRUNING} is enabled.
+ */
+public class SyntheticJoinPredicate implements Transform {
+
+  private static final Log LOG = LogFactory.getLog(SyntheticJoinPredicate.class.getName());
+
+  /**
+   * Walks the operator tree from the top operators and inserts synthetic IN
+   * filters for every path matching {@code TS%.*RS%JOIN%}.
+   *
+   * @param pctx the parse context; its operator tree is modified in place
+   * @return the same {@code pctx}
+   * @throws SemanticException if predicate construction fails
+   */
+  @Override
+  public ParseContext transform(ParseContext pctx) throws SemanticException {
+
+    if (!pctx.getConf().getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")
+        || !pctx.getConf().getBoolVar(ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING)) {
+      return pctx;
+    }
+
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+    opRules.put(new RuleRegExp("R1", "(" +
+        TableScanOperator.getOperatorName() + "%" + ".*" +
+        ReduceSinkOperator.getOperatorName() + "%" +
+        JoinOperator.getOperatorName() + "%)"), new JoinSynthetic());
+
+    // The dispatcher fires the processor corresponding to the closest matching
+    // rule and passes the context along
+    SyntheticContext context = new SyntheticContext(pctx);
+    Dispatcher disp = new DefaultRuleDispatcher(null, opRules, context);
+    GraphWalker ogw = new PreOrderWalker(disp);
+
+    // Create a list of top op nodes
+    List<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pctx.getTopOps().values());
+    ogw.startWalking(topNodes, null);
+
+    return pctx;
+  }
+
+  /**
+   * Inserts a new filter operator between {@code parent} and its child {@code target}.
+   *
+   * @param target the child that will receive rows from the filter
+   * @param parent the current parent of {@code target}
+   * @param parentRR row resolver of {@code parent}; the filter's row schema is
+   *        built from its column infos
+   * @param filterExpr the filter predicate
+   * @return the newly created and wired filter operator
+   */
+  private static Operator<FilterDesc> createFilter(Operator<?> target, Operator<?> parent,
+      RowResolver parentRR, ExprNodeDesc filterExpr) {
+    Operator<FilterDesc> filter = OperatorFactory.get(new FilterDesc(filterExpr, false),
+        new RowSchema(parentRR.getColumnInfos()));
+    filter.setParentOperators(new ArrayList<Operator<? extends OperatorDesc>>());
+    filter.setChildOperators(new ArrayList<Operator<? extends OperatorDesc>>());
+    filter.getParentOperators().add(parent);
+    filter.getChildOperators().add(target);
+    parent.replaceChild(target, filter);
+    target.replaceParent(parent, filter);
+    return filter;
+  }
+
+  /** Walker context giving the processor access to the parse context. */
+  private static class SyntheticContext implements NodeProcessorCtx {
+
+    private final ParseContext parseContext;
+
+    public SyntheticContext(ParseContext pCtx) {
+      parseContext = pCtx;
+    }
+
+    public ParseContext getParseContext() {
+      return parseContext;
+    }
+  }
+
+  /** Adds synthetic IN filters above the reduce sink that feeds a join. */
+  private static class JoinSynthetic implements NodeProcessor {
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+
+      ParseContext pCtx = ((SyntheticContext) procCtx).getParseContext();
+
+      @SuppressWarnings("unchecked")
+      CommonJoinOperator<JoinDesc> join = (CommonJoinOperator<JoinDesc>) nd;
+
+      // don't generate for null-safes; checked first so no other work is wasted.
+      if (hasNullSafeKey(join.getConf())) {
+        return null;
+      }
+
+      // The rule matched TS%.*RS%JOIN%, so the element just below the join on
+      // the walk stack is the reduce sink feeding the join from this branch.
+      ReduceSinkOperator source = (ReduceSinkOperator) stack.get(stack.size() - 2);
+
+      // The source keys do not depend on the target, so check them once.
+      List<ExprNodeDesc> sourceKeys = source.getConf().getKeyCols();
+      if (sourceKeys.size() < 1) {
+        return null;
+      }
+
+      List<Operator<? extends OperatorDesc>> parents = join.getParentOperators();
+      int srcPos = parents.indexOf(source);
+      int[][] targets = getTargets(join);
+
+      Operator<? extends OperatorDesc> parent = source.getParentOperators().get(0);
+      RowResolver parentRR = pCtx.getOpParseCtx().get(parent).getRowResolver();
+
+      for (int targetPos : targets[srcPos]) {
+        if (srcPos == targetPos) {
+          continue;
+        }
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Synthetic predicate: " + srcPos + " --> " + targetPos);
+        }
+        ReduceSinkOperator target = (ReduceSinkOperator) parents.get(targetPos);
+        ExprNodeDesc syntheticExpr = createSyntheticExpr(sourceKeys, target);
+
+        // Each new filter goes directly above the source, below the previously
+        // inserted one, so the filters form a chain under the original parent.
+        Operator<FilterDesc> newFilter = createFilter(source, parent, parentRR, syntheticExpr);
+        pCtx.getOpParseCtx().put(newFilter, new OpParseContext(parentRR));
+        parent = newFilter;
+      }
+
+      return null;
+    }
+
+    /** Returns true if any join key of {@code desc} uses null-safe equality. */
+    private static boolean hasNullSafeKey(JoinDesc desc) {
+      if (desc.getNullSafes() != null) {
+        for (boolean b : desc.getNullSafes()) {
+          if (b) {
+            return true;
+          }
+        }
+      }
+      return false;
+    }
+
+    /**
+     * Builds {@code src0 IN (list of target key 0) AND ... AND srcN IN (list of target key N)}.
+     *
+     * @param sourceKeys the key columns of the input being filtered (non-empty)
+     * @param target the reduce sink whose key lists provide the IN values
+     * @return the conjunction of one IN predicate per source key
+     * @throws SemanticException if a function lookup or expression construction fails
+     */
+    private static ExprNodeDesc createSyntheticExpr(List<ExprNodeDesc> sourceKeys,
+        ReduceSinkOperator target) throws SemanticException {
+      List<ExprNodeDesc> targetKeys = target.getConf().getKeyCols();
+      ExprNodeDesc syntheticExpr = null;
+
+      for (int i = 0; i < sourceKeys.size(); ++i) {
+        List<ExprNodeDesc> inArgs = new ArrayList<ExprNodeDesc>();
+        inArgs.add(sourceKeys.get(i));
+        // stands for the values of target key i, which are only known at runtime
+        inArgs.add(new ExprNodeDynamicListDesc(targetKeys.get(i).getTypeInfo(), target, i));
+
+        ExprNodeDesc syntheticInExpr =
+            ExprNodeGenericFuncDesc.newInstance(FunctionRegistry.getFunctionInfo("in")
+                .getGenericUDF(), inArgs);
+
+        if (syntheticExpr == null) {
+          syntheticExpr = syntheticInExpr;
+        } else {
+          List<ExprNodeDesc> andArgs = new ArrayList<ExprNodeDesc>();
+          andArgs.add(syntheticExpr);
+          andArgs.add(syntheticInExpr);
+
+          syntheticExpr =
+              ExprNodeGenericFuncDesc.newInstance(FunctionRegistry.getFunctionInfo("and")
+                  .getGenericUDF(), andArgs);
+        }
+      }
+      return syntheticExpr;
+    }
+
+    /**
+     * Computes, for every join input position, the inputs whose join keys may be
+     * used to filter it.
+     * <p>
+     * Inner and left semi joins allow both directions. A left outer join lets
+     * the right input be filtered by the left one, and a right outer join the
+     * reverse. Full outer (and any other) joins contribute nothing. The
+     * relation is closed transitively, so a position may list itself; callers
+     * skip that case.
+     */
+    private int[][] getTargets(CommonJoinOperator<JoinDesc> join) {
+      JoinCondDesc[] conds = join.getConf().getConds();
+
+      int aliases = conds.length + 1;
+      Vectors vector = new Vectors(aliases);
+      for (JoinCondDesc cond : conds) {
+        int left = cond.getLeft();
+        int right = cond.getRight();
+        switch (cond.getType()) {
+          case JoinDesc.INNER_JOIN:
+          case JoinDesc.LEFT_SEMI_JOIN:
+            vector.add(left, right);
+            vector.add(right, left);
+            break;
+          case JoinDesc.LEFT_OUTER_JOIN:
+            vector.add(right, left);
+            break;
+          case JoinDesc.RIGHT_OUTER_JOIN:
+            vector.add(left, right);
+            break;
+          case JoinDesc.FULL_OUTER_JOIN:
+          default:
+            // no filter direction is derived for these join types
+            break;
+        }
+      }
+      int[][] result = new int[aliases][];
+      for (int pos = 0; pos < aliases; pos++) {
+        // find all targets recursively
+        result[pos] = vector.traverse(pos);
+      }
+      return result;
+    }
+  }
+
+  /** Adjacency sets over join input positions, with transitive traversal. */
+  private static class Vectors {
+
+    private final Set<Integer>[] vector;
+
+    @SuppressWarnings("unchecked")
+    public Vectors(int length) {
+      vector = new Set[length];
+    }
+
+    /** Records that position {@code from} may be filtered by position {@code to}. */
+    public void add(int from, int to) {
+      if (vector[from] == null) {
+        vector[from] = new HashSet<Integer>();
+      }
+      vector[from].add(to);
+    }
+
+    /** Returns every position reachable from {@code pos}. */
+    public int[] traverse(int pos) {
+      Set<Integer> targets = new HashSet<Integer>();
+      traverse(targets, pos);
+      return toArray(targets);
+    }
+
+    private int[] toArray(Set<Integer> values) {
+      int index = 0;
+      int[] result = new int[values.size()];
+      for (int value : values) {
+        result[index++] = value;
+      }
+      return result;
+    }
+
+    // depth-first walk; the set both collects results and prevents revisits
+    private void traverse(Set<Integer> targets, int pos) {
+      if (vector[pos] == null) {
+        return;
+      }
+      for (int target : vector[pos]) {
+        if (targets.add(target)) {
+          traverse(targets, target);
+        }
+      }
+    }
+  }
+}
Added: hive/branches/tez/ql/src/test/queries/clientpositive/dynamic_partition_pruning.q
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/test/queries/clientpositive/dynamic_partition_pruning.q?rev=1622216&view=auto
==============================================================================
--- hive/branches/tez/ql/src/test/queries/clientpositive/dynamic_partition_pruning.q (added)
+++ hive/branches/tez/ql/src/test/queries/clientpositive/dynamic_partition_pruning.q Wed Sep 3 10:46:04 2014
@@ -0,0 +1,191 @@
+set hive.optimize.ppd=true;
+set hive.ppd.remove.duplicatefilters=true;
+set hive.tez.dynamic.partition.pruning=true;
+set hive.optimize.metadataonly=false;
+set hive.optimize.index.filter=true;
+
+
+select distinct ds from srcpart;
+select distinct hr from srcpart;
+
+EXPLAIN create table srcpart_date as select ds as ds, ds as date from srcpart group by ds;
+create table srcpart_date as select ds as ds, ds as date from srcpart group by ds;
+create table srcpart_hour as select hr as hr, hr as hour from srcpart group by hr;
+create table srcpart_date_hour as select ds as ds, ds as date, hr as hr, hr as hour from srcpart group by ds, hr;
+create table srcpart_double_hour as select (hr*2) as hr, hr as hour from srcpart group by hr;
+
+-- single column, single key
+EXPLAIN select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = '2008-04-08';
+select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = '2008-04-08';
+set hive.tez.dynamic.partition.pruning=false;
+EXPLAIN select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = '2008-04-08';
+select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = '2008-04-08';
+set hive.tez.dynamic.partition.pruning=true;
+select count(*) from srcpart where ds = '2008-04-08';
+
+-- multiple sources, single key
+EXPLAIN select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) join srcpart_hour on (srcpart.hr = srcpart_hour.hr)
+where srcpart_date.date = '2008-04-08' and srcpart_hour.hour = 11;
+select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) join srcpart_hour on (srcpart.hr = srcpart_hour.hr)
+where srcpart_date.date = '2008-04-08' and srcpart_hour.hour = 11;
+set hive.tez.dynamic.partition.pruning=false;
+EXPLAIN select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) join srcpart_hour on (srcpart.hr = srcpart_hour.hr)
+where srcpart_date.date = '2008-04-08' and srcpart_hour.hour = 11;
+select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) join srcpart_hour on (srcpart.hr = srcpart_hour.hr)
+where srcpart_date.date = '2008-04-08' and srcpart_hour.hour = 11;
+set hive.tez.dynamic.partition.pruning=true;
+select count(*) from srcpart where hr = 11 and ds = '2008-04-08';
+
+-- multiple columns single source
+EXPLAIN select count(*) from srcpart join srcpart_date_hour on (srcpart.ds = srcpart_date_hour.ds and srcpart.hr = srcpart_date_hour.hr) where srcpart_date_hour.date = '2008-04-08' and srcpart_date_hour.hour = 11;
+select count(*) from srcpart join srcpart_date_hour on (srcpart.ds = srcpart_date_hour.ds and srcpart.hr = srcpart_date_hour.hr) where srcpart_date_hour.date = '2008-04-08' and srcpart_date_hour.hour = 11;
+set hive.tez.dynamic.partition.pruning=false;
+EXPLAIN select count(*) from srcpart join srcpart_date_hour on (srcpart.ds = srcpart_date_hour.ds and srcpart.hr = srcpart_date_hour.hr) where srcpart_date_hour.date = '2008-04-08' and srcpart_date_hour.hour = 11;
+select count(*) from srcpart join srcpart_date_hour on (srcpart.ds = srcpart_date_hour.ds and srcpart.hr = srcpart_date_hour.hr) where srcpart_date_hour.date = '2008-04-08' and srcpart_date_hour.hour = 11;
+set hive.tez.dynamic.partition.pruning=true;
+select count(*) from srcpart where ds = '2008-04-08' and hr = 11;
+
+-- empty set
+EXPLAIN select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = 'I DONT EXIST';
+select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = 'I DONT EXIST';
+set hive.tez.dynamic.partition.pruning=false;
+EXPLAIN select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = 'I DONT EXIST';
+select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = 'I DONT EXIST';
+set hive.tez.dynamic.partition.pruning=true;
+select count(*) from srcpart where ds = 'I DONT EXIST';
+
+-- expressions
+EXPLAIN select count(*) from srcpart join srcpart_double_hour on (srcpart.hr = cast(srcpart_double_hour.hr/2 as int)) where srcpart_double_hour.hour = 11;
+select count(*) from srcpart join srcpart_double_hour on (srcpart.hr = cast(srcpart_double_hour.hr/2 as int)) where srcpart_double_hour.hour = 11;
+EXPLAIN select count(*) from srcpart join srcpart_double_hour on (srcpart.hr*2 = srcpart_double_hour.hr) where srcpart_double_hour.hour = 11;
+select count(*) from srcpart join srcpart_double_hour on (srcpart.hr*2 = srcpart_double_hour.hr) where srcpart_double_hour.hour = 11;
+set hive.tez.dynamic.partition.pruning=false;
+EXPLAIN select count(*) from srcpart join srcpart_double_hour on (srcpart.hr = cast(srcpart_double_hour.hr/2 as int)) where srcpart_double_hour.hour = 11;
+select count(*) from srcpart join srcpart_double_hour on (srcpart.hr = cast(srcpart_double_hour.hr/2 as int)) where srcpart_double_hour.hour = 11;
+EXPLAIN select count(*) from srcpart join srcpart_double_hour on (srcpart.hr*2 = srcpart_double_hour.hr) where srcpart_double_hour.hour = 11;
+select count(*) from srcpart join srcpart_double_hour on (srcpart.hr*2 = srcpart_double_hour.hr) where srcpart_double_hour.hour = 11;
+set hive.tez.dynamic.partition.pruning=true;
+select count(*) from srcpart where hr = 11;
+EXPLAIN select count(*) from srcpart join srcpart_double_hour on (cast(srcpart.hr*2 as string) = cast(srcpart_double_hour.hr as string)) where srcpart_double_hour.hour = 11;
+select count(*) from srcpart join srcpart_double_hour on (cast(srcpart.hr*2 as string) = cast(srcpart_double_hour.hr as string)) where srcpart_double_hour.hour = 11;
+set hive.tez.dynamic.partition.pruning=true;
+select count(*) from srcpart where cast(hr as string) = 11;
+
+
+-- parent is reduce tasks
+EXPLAIN select count(*) from srcpart join (select ds as ds, ds as date from srcpart group by ds) s on (srcpart.ds = s.ds) where s.date = '2008-04-08';
+select count(*) from srcpart join (select ds as ds, ds as date from srcpart group by ds) s on (srcpart.ds = s.ds) where s.date = '2008-04-08';
+select count(*) from srcpart where ds = '2008-04-08';
+
+-- non-equi join
+EXPLAIN select count(*) from srcpart, srcpart_date_hour where (srcpart_date_hour.date = '2008-04-08' and srcpart_date_hour.hour = 11) and (srcpart.ds = srcpart_date_hour.ds or srcpart.hr = srcpart_date_hour.hr);
+select count(*) from srcpart, srcpart_date_hour where (srcpart_date_hour.date = '2008-04-08' and srcpart_date_hour.hour = 11) and (srcpart.ds = srcpart_date_hour.ds or srcpart.hr = srcpart_date_hour.hr);
+
+-- old style join syntax
+EXPLAIN select count(*) from srcpart, srcpart_date_hour where srcpart_date_hour.date = '2008-04-08' and srcpart_date_hour.hour = 11 and srcpart.ds = srcpart_date_hour.ds and srcpart.hr = srcpart_date_hour.hr;
+select count(*) from srcpart, srcpart_date_hour where srcpart_date_hour.date = '2008-04-08' and srcpart_date_hour.hour = 11 and srcpart.ds = srcpart_date_hour.ds and srcpart.hr = srcpart_date_hour.hr;
+
+-- left join
+EXPLAIN select count(*) from srcpart left join srcpart_date on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = '2008-04-08';
+EXPLAIN select count(*) from srcpart_date left join srcpart on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = '2008-04-08';
+
+-- full outer
+EXPLAIN select count(*) from srcpart full outer join srcpart_date on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = '2008-04-08';
+
+-- with static pruning
+EXPLAIN select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) join srcpart_hour on (srcpart.hr = srcpart_hour.hr)
+where srcpart_date.date = '2008-04-08' and srcpart_hour.hour = 11 and srcpart.hr = 11;
+select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) join srcpart_hour on (srcpart.hr = srcpart_hour.hr)
+where srcpart_date.date = '2008-04-08' and srcpart_hour.hour = 11 and srcpart.hr = 11;
+EXPLAIN select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) join srcpart_hour on (srcpart.hr = srcpart_hour.hr)
+where srcpart_date.date = '2008-04-08' and srcpart.hr = 13;
+select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) join srcpart_hour on (srcpart.hr = srcpart_hour.hr)
+where srcpart_date.date = '2008-04-08' and srcpart.hr = 13;
+
+-- union + subquery
+EXPLAIN select count(*) from srcpart where srcpart.ds in (select max(srcpart.ds) from srcpart union all select min(srcpart.ds) from srcpart);
+select count(*) from srcpart where srcpart.ds in (select max(srcpart.ds) from srcpart union all select min(srcpart.ds) from srcpart);
+EXPLAIN select distinct(ds) from srcpart where srcpart.ds in (select max(srcpart.ds) from srcpart union all select min(srcpart.ds) from srcpart);
+select distinct(ds) from srcpart where srcpart.ds in (select max(srcpart.ds) from srcpart union all select min(srcpart.ds) from srcpart);
+EXPLAIN select ds from (select distinct(ds) as ds from srcpart union all select distinct(ds) as ds from srcpart) s where s.ds in (select max(srcpart.ds) from srcpart union all select min(srcpart.ds) from srcpart);
+select ds from (select distinct(ds) as ds from srcpart union all select distinct(ds) as ds from srcpart) s where s.ds in (select max(srcpart.ds) from srcpart union all select min(srcpart.ds) from srcpart);
+
+set hive.auto.convert.join=true;
+set hive.auto.convert.join.noconditionaltask = true;
+set hive.auto.convert.join.noconditionaltask.size = 10000000;
+
+-- single column, single key
+EXPLAIN select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = '2008-04-08';
+select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = '2008-04-08';
+select count(*) from srcpart where ds = '2008-04-08';
+
+-- multiple sources, single key
+EXPLAIN select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) join srcpart_hour on (srcpart.hr = srcpart_hour.hr)
+where srcpart_date.date = '2008-04-08' and srcpart_hour.hour = 11;
+select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) join srcpart_hour on (srcpart.hr = srcpart_hour.hr)
+where srcpart_date.date = '2008-04-08' and srcpart_hour.hour = 11;
+select count(*) from srcpart where hr = 11 and ds = '2008-04-08';
+
+-- multiple columns single source
+EXPLAIN select count(*) from srcpart join srcpart_date_hour on (srcpart.ds = srcpart_date_hour.ds and srcpart.hr = srcpart_date_hour.hr) where srcpart_date_hour.date = '2008-04-08' and srcpart_date_hour.hour = 11;
+select count(*) from srcpart join srcpart_date_hour on (srcpart.ds = srcpart_date_hour.ds and srcpart.hr = srcpart_date_hour.hr) where srcpart_date_hour.date = '2008-04-08' and srcpart_date_hour.hour = 11;
+select count(*) from srcpart where ds = '2008-04-08' and hr = 11;
+
+-- empty set
+EXPLAIN select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = 'I DONT EXIST';
+-- Disabled until TEZ-1486 is fixed
+-- select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = 'I DONT EXIST';
+
+-- expressions
+EXPLAIN select count(*) from srcpart join srcpart_double_hour on (srcpart.hr = cast(srcpart_double_hour.hr/2 as int)) where srcpart_double_hour.hour = 11;
+select count(*) from srcpart join srcpart_double_hour on (srcpart.hr = cast(srcpart_double_hour.hr/2 as int)) where srcpart_double_hour.hour = 11;
+EXPLAIN select count(*) from srcpart join srcpart_double_hour on (srcpart.hr*2 = srcpart_double_hour.hr) where srcpart_double_hour.hour = 11;
+select count(*) from srcpart join srcpart_double_hour on (srcpart.hr*2 = srcpart_double_hour.hr) where srcpart_double_hour.hour = 11;
+select count(*) from srcpart where hr = 11;
+
+-- parent is reduce tasks
+EXPLAIN select count(*) from srcpart join (select ds as ds, ds as date from srcpart group by ds) s on (srcpart.ds = s.ds) where s.date = '2008-04-08';
+select count(*) from srcpart join (select ds as ds, ds as date from srcpart group by ds) s on (srcpart.ds = s.ds) where s.date = '2008-04-08';
+select count(*) from srcpart where ds = '2008-04-08';
+
+-- left join
+EXPLAIN select count(*) from srcpart left join srcpart_date on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = '2008-04-08';
+EXPLAIN select count(*) from srcpart_date left join srcpart on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = '2008-04-08';
+
+-- full outer
+EXPLAIN select count(*) from srcpart full outer join srcpart_date on (srcpart.ds = srcpart_date.ds) where srcpart_date.date = '2008-04-08';
+
+-- with static pruning
+EXPLAIN select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) join srcpart_hour on (srcpart.hr = srcpart_hour.hr)
+where srcpart_date.date = '2008-04-08' and srcpart_hour.hour = 11 and srcpart.hr = 11;
+select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) join srcpart_hour on (srcpart.hr = srcpart_hour.hr)
+where srcpart_date.date = '2008-04-08' and srcpart_hour.hour = 11 and srcpart.hr = 11;
+EXPLAIN select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) join srcpart_hour on (srcpart.hr = srcpart_hour.hr)
+where srcpart_date.date = '2008-04-08' and srcpart.hr = 13;
+-- Disabled until TEZ-1486 is fixed
+-- select count(*) from srcpart join srcpart_date on (srcpart.ds = srcpart_date.ds) join srcpart_hour on (srcpart.hr = srcpart_hour.hr)
+-- where srcpart_date.date = '2008-04-08' and srcpart.hr = 13;
+
+-- union + subquery
+EXPLAIN select distinct(ds) from srcpart where srcpart.ds in (select max(srcpart.ds) from srcpart union all select min(srcpart.ds) from srcpart);
+select distinct(ds) from srcpart where srcpart.ds in (select max(srcpart.ds) from srcpart union all select min(srcpart.ds) from srcpart);
+
+
+-- different file format
+create table srcpart_orc (key int, value string) partitioned by (ds string, hr int) stored as orc;
+
+
+set hive.exec.dynamic.partition.mode=nonstrict;
+set hive.vectorized.execution.enabled=false;
+set hive.exec.max.dynamic.partitions=1000;
+
+insert into table srcpart_orc partition (ds, hr) select key, value, ds, hr from srcpart;
+EXPLAIN select count(*) from srcpart_orc join srcpart_date_hour on (srcpart_orc.ds = srcpart_date_hour.ds and srcpart_orc.hr = srcpart_date_hour.hr) where srcpart_date_hour.hour = 11 and (srcpart_date_hour.date = '2008-04-08' or srcpart_date_hour.date = '2008-04-09');
+select count(*) from srcpart_orc join srcpart_date_hour on (srcpart_orc.ds = srcpart_date_hour.ds and srcpart_orc.hr = srcpart_date_hour.hr) where srcpart_date_hour.hour = 11 and (srcpart_date_hour.date = '2008-04-08' or srcpart_date_hour.date = '2008-04-09');
+select count(*) from srcpart where (ds = '2008-04-08' or ds = '2008-04-09') and hr = 11;
+
+drop table srcpart_orc;
+drop table srcpart_date;
+drop table srcpart_hour;
+drop table srcpart_date_hour;
+drop table srcpart_double_hour;
Added: hive/branches/tez/ql/src/test/queries/clientpositive/dynamic_partition_pruning_2.q
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/test/queries/clientpositive/dynamic_partition_pruning_2.q?rev=1622216&view=auto
==============================================================================
--- hive/branches/tez/ql/src/test/queries/clientpositive/dynamic_partition_pruning_2.q (added)
+++ hive/branches/tez/ql/src/test/queries/clientpositive/dynamic_partition_pruning_2.q Wed Sep 3 10:46:04 2014
@@ -0,0 +1,41 @@
+set hive.optimize.ppd=true;
+set hive.ppd.remove.duplicatefilters=true;
+set hive.tez.dynamic.partition.pruning=true;
+set hive.optimize.metadataonly=false;
+set hive.optimize.index.filter=true;
+set hive.auto.convert.join=true;
+set hive.auto.convert.join.noconditionaltask = true;
+set hive.auto.convert.join.noconditionaltask.size = 10000000;
+
+create table dim_shops (id int, label string) row format delimited fields terminated by ',' stored as textfile;
+load data local inpath '../../data/files/dim_shops.txt' into table dim_shops;
+
+create table agg_01 (amount decimal) partitioned by (dim_shops_id int) row format delimited fields terminated by ',' stored as textfile;
+alter table agg_01 add partition (dim_shops_id = 1);
+alter table agg_01 add partition (dim_shops_id = 2);
+alter table agg_01 add partition (dim_shops_id = 3);
+
+load data local inpath '../../data/files/agg_01-p1.txt' into table agg_01 partition (dim_shops_id=1);
+load data local inpath '../../data/files/agg_01-p2.txt' into table agg_01 partition (dim_shops_id=2);
+load data local inpath '../../data/files/agg_01-p3.txt' into table agg_01 partition (dim_shops_id=3);
+
+select * from dim_shops;
+select * from agg_01;
+
+EXPLAIN SELECT d1.label, count(*), sum(agg.amount)
+FROM agg_01 agg,
+dim_shops d1
+WHERE agg.dim_shops_id = d1.id
+and
+d1.label in ('foo', 'bar')
+GROUP BY d1.label
+ORDER BY d1.label;
+
+SELECT d1.label, count(*), sum(agg.amount)
+FROM agg_01 agg,
+dim_shops d1
+WHERE agg.dim_shops_id = d1.id
+and
+d1.label in ('foo', 'bar')
+GROUP BY d1.label
+ORDER BY d1.label;
Modified: hive/branches/tez/ql/src/test/results/clientpositive/tez/bucket2.q.out
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/test/results/clientpositive/tez/bucket2.q.out?rev=1622216&r1=1622215&r2=1622216&view=diff
==============================================================================
Files hive/branches/tez/ql/src/test/results/clientpositive/tez/bucket2.q.out (original) and hive/branches/tez/ql/src/test/results/clientpositive/tez/bucket2.q.out Wed Sep 3 10:46:04 2014 differ
Modified: hive/branches/tez/ql/src/test/results/clientpositive/tez/bucket3.q.out
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/test/results/clientpositive/tez/bucket3.q.out?rev=1622216&r1=1622215&r2=1622216&view=diff
==============================================================================
Files hive/branches/tez/ql/src/test/results/clientpositive/tez/bucket3.q.out (original) and hive/branches/tez/ql/src/test/results/clientpositive/tez/bucket3.q.out Wed Sep 3 10:46:04 2014 differ
Modified: hive/branches/tez/ql/src/test/results/clientpositive/tez/bucket4.q.out
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/test/results/clientpositive/tez/bucket4.q.out?rev=1622216&r1=1622215&r2=1622216&view=diff
==============================================================================
Files hive/branches/tez/ql/src/test/results/clientpositive/tez/bucket4.q.out (original) and hive/branches/tez/ql/src/test/results/clientpositive/tez/bucket4.q.out Wed Sep 3 10:46:04 2014 differ
Modified: hive/branches/tez/ql/src/test/results/clientpositive/tez/cross_product_check_2.q.out
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/test/results/clientpositive/tez/cross_product_check_2.q.out?rev=1622216&r1=1622215&r2=1622216&view=diff
==============================================================================
--- hive/branches/tez/ql/src/test/results/clientpositive/tez/cross_product_check_2.q.out (original)
+++ hive/branches/tez/ql/src/test/results/clientpositive/tez/cross_product_check_2.q.out Wed Sep 3 10:46:04 2014
@@ -83,7 +83,7 @@ STAGE PLANS:
Processor Tree:
ListSink
-Warning: Map Join MAPJOIN[16][bigTable=a] in task 'Map 3' is a cross product
+Warning: Map Join MAPJOIN[18][bigTable=a] in task 'Map 3' is a cross product
PREHOOK: query: explain select * from B d1 join B d2 on d1.key = d2.key join A
PREHOOK: type: QUERY
POSTHOOK: query: explain select * from B d1 join B d2 on d1.key = d2.key join A
@@ -171,7 +171,7 @@ STAGE PLANS:
Processor Tree:
ListSink
-Warning: Map Join MAPJOIN[23][bigTable=a] in task 'Map 4' is a cross product
+Warning: Map Join MAPJOIN[25][bigTable=a] in task 'Map 4' is a cross product
PREHOOK: query: explain select * from A join
(select d1.key
from B d1 join B d2 on d1.key = d2.key
@@ -396,7 +396,7 @@ STAGE PLANS:
Processor Tree:
ListSink
-Warning: Map Join MAPJOIN[28][bigTable=?] in task 'Reducer 5' is a cross product
+Warning: Map Join MAPJOIN[30][bigTable=?] in task 'Reducer 5' is a cross product
PREHOOK: query: explain select * from
(select A.key from A group by key) ss join
(select d1.key from B d1 join B d2 on d1.key = d2.key where 1 = 1 group by d1.key) od1