You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by we...@apache.org on 2017/05/17 02:53:06 UTC
[28/50] [abbrv] hive git commit: HIVE-16602: Implement shared scans
with Tez (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)
HIVE-16602: Implement shared scans with Tez (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/59f65772
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/59f65772
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/59f65772
Branch: refs/heads/hive-14535
Commit: 59f6577296bfcdb5d8e74657f7eb6d6294630b23
Parents: f2fa83c
Author: Jesus Camacho Rodriguez <jc...@apache.org>
Authored: Thu May 4 20:27:40 2017 +0100
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Sat May 13 08:09:14 2017 +0100
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 4 +
.../hive/ql/optimizer/ConvertJoinMapJoin.java | 9 +-
.../hive/ql/optimizer/MapJoinProcessor.java | 11 +-
.../ql/optimizer/ReduceSinkMapJoinProc.java | 6 +-
.../hive/ql/optimizer/SharedScanOptimizer.java | 625 ++++++
.../physical/GenMRSkewJoinProcessor.java | 2 +-
.../physical/GenSparkSkewJoinProcessor.java | 2 +-
.../apache/hadoop/hive/ql/parse/GenTezWork.java | 10 +-
.../hadoop/hive/ql/parse/TezCompiler.java | 75 +-
.../apache/hadoop/hive/ql/plan/JoinDesc.java | 12 +
.../apache/hadoop/hive/ql/plan/MapJoinDesc.java | 3 +-
.../test/queries/clientpositive/perf/query88.q | 2 +
.../clientpositive/llap/auto_join0.q.out | 15 +-
.../clientpositive/llap/auto_join30.q.out | 67 +-
.../llap/auto_sortmerge_join_9.q.out | 70 +-
.../llap/bucket_map_join_tez1.q.out | 11 +-
.../llap/correlationoptimizer2.q.out | 26 +-
.../llap/correlationoptimizer3.q.out | 188 +-
.../llap/correlationoptimizer6.q.out | 104 +-
.../llap/dynamic_partition_pruning.q.out | 92 +-
.../clientpositive/llap/except_distinct.q.out | 92 +-
.../clientpositive/llap/explainuser_1.q.out | 137 +-
.../clientpositive/llap/explainuser_2.q.out | 772 ++++---
.../clientpositive/llap/intersect_merge.q.out | 83 +-
.../results/clientpositive/llap/join46.q.out | 38 +-
.../llap/limit_join_transpose.q.out | 242 +--
.../clientpositive/llap/limit_pushdown.q.out | 17 +-
.../clientpositive/llap/llap_nullscan.q.out | 15 +-
.../results/clientpositive/llap/mapjoin46.q.out | 43 +-
.../test/results/clientpositive/llap/mrr.q.out | 82 +-
.../clientpositive/llap/multiMapJoin2.q.out | 209 +-
.../llap/offset_limit_ppd_optimizer.q.out | 17 +-
.../clientpositive/llap/subquery_in.q.out | 173 +-
.../clientpositive/llap/subquery_multi.q.out | 617 +++---
.../clientpositive/llap/subquery_notin.q.out | 1539 ++++++-------
.../clientpositive/llap/subquery_null_agg.q.out | 26 +-
.../clientpositive/llap/subquery_scalar.q.out | 359 ++--
.../clientpositive/llap/subquery_select.q.out | 957 ++++-----
.../clientpositive/llap/subquery_views.q.out | 274 +--
.../clientpositive/llap/tez_join_tests.q.out | 29 +-
.../clientpositive/llap/tez_joins_explain.q.out | 29 +-
.../clientpositive/llap/unionDistinct_1.q.out | 261 +--
.../clientpositive/llap/union_top_level.q.out | 50 +-
.../llap/vector_groupby_grouping_sets4.q.out | 43 +-
.../llap/vector_groupby_mapjoin.q.out | 26 +-
.../clientpositive/llap/vector_join30.q.out | 122 +-
.../vectorized_dynamic_partition_pruning.q.out | 108 +-
.../results/clientpositive/perf/query1.q.out | 88 +-
.../results/clientpositive/perf/query14.q.out | 2032 +++++++++---------
.../results/clientpositive/perf/query16.q.out | 27 +-
.../results/clientpositive/perf/query17.q.out | 68 +-
.../results/clientpositive/perf/query23.q.out | 477 ++--
.../results/clientpositive/perf/query25.q.out | 68 +-
.../results/clientpositive/perf/query28.q.out | 87 +-
.../results/clientpositive/perf/query29.q.out | 19 +-
.../results/clientpositive/perf/query30.q.out | 114 +-
.../results/clientpositive/perf/query31.q.out | 270 ++-
.../results/clientpositive/perf/query32.q.out | 34 +-
.../results/clientpositive/perf/query33.q.out | 252 ++-
.../results/clientpositive/perf/query38.q.out | 94 +-
.../results/clientpositive/perf/query39.q.out | 58 +-
.../results/clientpositive/perf/query46.q.out | 75 +-
.../results/clientpositive/perf/query5.q.out | 70 +-
.../results/clientpositive/perf/query51.q.out | 83 +-
.../results/clientpositive/perf/query56.q.out | 252 ++-
.../results/clientpositive/perf/query58.q.out | 120 +-
.../results/clientpositive/perf/query6.q.out | 19 +-
.../results/clientpositive/perf/query60.q.out | 252 ++-
.../results/clientpositive/perf/query64.q.out | 443 ++--
.../results/clientpositive/perf/query65.q.out | 86 +-
.../results/clientpositive/perf/query66.q.out | 90 +-
.../results/clientpositive/perf/query68.q.out | 75 +-
.../results/clientpositive/perf/query69.q.out | 76 +-
.../results/clientpositive/perf/query70.q.out | 44 +-
.../results/clientpositive/perf/query75.q.out | 258 ++-
.../results/clientpositive/perf/query76.q.out | 126 +-
.../results/clientpositive/perf/query80.q.out | 106 +-
.../results/clientpositive/perf/query81.q.out | 57 +-
.../results/clientpositive/perf/query83.q.out | 225 +-
.../results/clientpositive/perf/query85.q.out | 11 +-
.../results/clientpositive/perf/query87.q.out | 94 +-
.../results/clientpositive/perf/query88.q.out | 358 ++-
.../results/clientpositive/perf/query9.q.out | 170 +-
.../results/clientpositive/perf/query90.q.out | 58 +-
.../results/clientpositive/perf/query92.q.out | 35 +-
.../results/clientpositive/perf/query95.q.out | 70 +-
.../results/clientpositive/perf/query97.q.out | 35 +-
.../clientpositive/tez/explainanalyze_2.q.out | 412 ++--
.../clientpositive/tez/explainanalyze_3.q.out | 8 +-
.../clientpositive/tez/explainuser_3.q.out | 8 +-
90 files changed, 7086 insertions(+), 8012 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/59f65772/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 73e0290..d6a80ae 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1645,6 +1645,10 @@ public class HiveConf extends Configuration {
"If the skew information is correctly stored in the metadata, hive.optimize.skewjoin.compiletime\n" +
"would change the query plan to take care of it, and hive.optimize.skewjoin will be a no-op."),
+ HIVE_SHARED_SCAN_OPTIMIZATION("hive.optimize.shared.scan", true,
+ "Whether to enable shared scan optimizer. The optimizer finds scan operator over the same table\n" +
+ "in the query plan and merges them if they meet some preconditions."),
+
// CTE
HIVE_CTE_MATERIALIZE_THRESHOLD("hive.optimize.cte.materialize.threshold", -1,
"If the number of references to a CTE clause exceeds this threshold, Hive will materialize it\n" +
http://git-wip-us.apache.org/repos/asf/hive/blob/59f65772/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index d0fdb52..0eec78e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -281,7 +281,8 @@ public class ConvertJoinMapJoin implements NodeProcessor {
joinOp.getConf().getBaseSrc(), joinOp).getSecond(),
null, joinDesc.getExprs(), null, null,
joinDesc.getOutputColumnNames(), mapJoinConversionPos, joinDesc.getConds(),
- joinDesc.getFilters(), joinDesc.getNoOuterJoin(), null, joinDesc.getNoConditionalTaskSize());
+ joinDesc.getFilters(), joinDesc.getNoOuterJoin(), null,
+ joinDesc.getNoConditionalTaskSize(), joinDesc.getInMemoryDataSize());
mapJoinDesc.setNullSafes(joinDesc.getNullSafes());
mapJoinDesc.setFilterMap(joinDesc.getFilterMap());
mapJoinDesc.setResidualFilterExprs(joinDesc.getResidualFilterExprs());
@@ -419,7 +420,6 @@ public class ConvertJoinMapJoin implements NodeProcessor {
// each side better have 0 or more RS. if either side is unbalanced, cannot convert.
// This is a workaround for now. Right fix would be to refactor code in the
// MapRecordProcessor and ReduceRecordProcessor with respect to the sources.
- @SuppressWarnings({"rawtypes","unchecked"})
Set<ReduceSinkOperator> set =
OperatorUtils.findOperatorsUpstream(parentOp.getParentOperators(),
ReduceSinkOperator.class);
@@ -719,6 +719,11 @@ public class ConvertJoinMapJoin implements NodeProcessor {
}
+ // We store the total memory that this MapJoin is going to use,
+ // which is calculated as totalSize/buckets, with totalSize
+ // equal to sum of small tables size.
+ joinOp.getConf().setInMemoryDataSize(totalSize/buckets);
+
return bigTablePosition;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/59f65772/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
index 85d46f3..f01fb9c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
@@ -29,8 +29,6 @@ import java.util.Map;
import java.util.Set;
import java.util.Stack;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ObjectPair;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -78,6 +76,8 @@ import org.apache.hadoop.hive.ql.plan.SelectDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Implementation of one of the rule-based map join optimization. User passes hints to specify
@@ -432,7 +432,7 @@ public class MapJoinProcessor extends Transform {
smbJoinDesc.getOutputColumnNames(),
bigTablePos, smbJoinDesc.getConds(),
smbJoinDesc.getFilters(), smbJoinDesc.isNoOuterJoin(), smbJoinDesc.getDumpFilePrefix(),
- smbJoinDesc.getNoConditionalTaskSize());
+ smbJoinDesc.getNoConditionalTaskSize(), smbJoinDesc.getInMemoryDataSize());
mapJoinDesc.setStatistics(smbJoinDesc.getStatistics());
@@ -1184,8 +1184,9 @@ public class MapJoinProcessor extends Transform {
JoinCondDesc[] joinCondns = op.getConf().getConds();
MapJoinDesc mapJoinDescriptor =
new MapJoinDesc(keyExprMap, keyTableDesc, newValueExprs, valueTableDescs,
- valueFilteredTableDescs, outputColumnNames, mapJoinPos, joinCondns, filters, op
- .getConf().getNoOuterJoin(), dumpFilePrefix, op.getConf().getNoConditionalTaskSize());
+ valueFilteredTableDescs, outputColumnNames, mapJoinPos, joinCondns, filters,
+ op.getConf().getNoOuterJoin(), dumpFilePrefix,
+ op.getConf().getNoConditionalTaskSize(), op.getConf().getInMemoryDataSize());
mapJoinDescriptor.setStatistics(op.getConf().getStatistics());
mapJoinDescriptor.setTagOrder(tagOrder);
mapJoinDescriptor.setNullSafes(desc.getNullSafes());
http://git-wip-us.apache.org/repos/asf/hive/blob/59f65772/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
index 3a6baca..ac234d0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
@@ -219,8 +219,8 @@ public class ReduceSinkMapJoinProc implements NodeProcessor {
if (tableSize == 0) {
tableSize = 1;
}
- LOG.info("Mapjoin " + mapJoinOp + "(bucket map join = )" + joinConf.isBucketMapJoin()
- + ", pos: " + pos + " --> " + parentWork.getName() + " (" + keyCount
+ LOG.info("Mapjoin " + mapJoinOp + "(bucket map join = " + joinConf.isBucketMapJoin()
+ + "), pos: " + pos + " --> " + parentWork.getName() + " (" + keyCount
+ " keys estimated from " + rowCount + " rows, " + bucketCount + " buckets)");
joinConf.getParentToInput().put(pos, parentWork.getName());
if (keyCount != Long.MAX_VALUE) {
@@ -290,7 +290,7 @@ public class ReduceSinkMapJoinProc implements NodeProcessor {
ReduceSinkOperator r = null;
if (context.connectedReduceSinks.contains(parentRS)) {
- LOG.debug("Cloning reduce sink for multi-child broadcast edge");
+ LOG.debug("Cloning reduce sink " + parentRS + " for multi-child broadcast edge");
// we've already set this one up. Need to clone for the next work.
r = (ReduceSinkOperator) OperatorFactory.getAndMakeChild(
parentRS.getCompilationOpContext(),
http://git-wip-us.apache.org/repos/asf/hive/blob/59f65772/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedScanOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedScanOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedScanOptimizer.java
new file mode 100644
index 0000000..d04fc64
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedScanOptimizer.java
@@ -0,0 +1,625 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
+import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.SemiJoinBranchInfo;
+import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.stats.StatsUtils;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+
+/**
+ * Shared scan optimizer. This rule finds scan operators over the same table
+ * in the query plan and merges them if they meet some preconditions.
+ *
+ * TS TS TS
+ * | | -> / \
+ * Op Op Op Op
+ *
+ * <p>Currently it only works with the Tez execution engine.
+ */
+public class SharedScanOptimizer extends Transform {
+
+ private final static Logger LOG = LoggerFactory.getLogger(SharedScanOptimizer.class);
+
+ @Override
+ public ParseContext transform(ParseContext pctx) throws SemanticException {
+
+ final Map<String, TableScanOperator> topOps = pctx.getTopOps();
+ if (topOps.size() < 2) {
+ // Nothing to do, bail out
+ return pctx;
+ }
+
+ // Cache to use during optimization
+ SharedScanOptimizerCache optimizerCache = new SharedScanOptimizerCache();
+
+ // We will not apply this optimization on some table scan operators.
+ Set<TableScanOperator> excludeTableScanOps = gatherNotValidTableScanOps(pctx, optimizerCache);
+ LOG.debug("Exclude TableScan ops: {}", excludeTableScanOps);
+
+ // Map of dbName.TblName -> Pair(tableAlias, TSOperator)
+ Multimap<String, Entry<String, TableScanOperator>> tableNameToOps = splitTableScanOpsByTable(pctx);
+
+ // We enforce a certain order when we do the reutilization.
+ // In particular, we use size of table x number of reads to
+ // rank the tables.
+ List<Entry<String, Long>> sortedTables = rankTablesByAccumulatedSize(pctx, excludeTableScanOps);
+ LOG.debug("Sorted tables by size: {}", sortedTables);
+
+ // Execute optimization
+ Multimap<String, TableScanOperator> existingOps = ArrayListMultimap.create();
+ Set<String> entriesToRemove = new HashSet<>();
+ for (Entry<String, Long> tablePair : sortedTables) {
+ for (Entry<String, TableScanOperator> tableScanOpPair : tableNameToOps.get(tablePair.getKey())) {
+ TableScanOperator tsOp = tableScanOpPair.getValue();
+ if (excludeTableScanOps.contains(tsOp)) {
+ // Skip operator, currently we do not merge
+ continue;
+ }
+ String tableName = tablePair.getKey();
+ Collection<TableScanOperator> prevTsOps = existingOps.get(tableName);
+ if (!prevTsOps.isEmpty()) {
+ for (TableScanOperator prevTsOp : prevTsOps) {
+
+ // First we check if the two table scan operators can actually be merged
+ // If schemas do not match, we currently do not merge
+ List<String> prevTsOpNeededColumns = prevTsOp.getNeededColumns();
+ List<String> tsOpNeededColumns = tsOp.getNeededColumns();
+ if (prevTsOpNeededColumns.size() != tsOpNeededColumns.size()) {
+ // Skip
+ continue;
+ }
+ boolean notEqual = false;
+ for (int i = 0; i < prevTsOpNeededColumns.size(); i++) {
+ if (!prevTsOpNeededColumns.get(i).equals(tsOpNeededColumns.get(i))) {
+ notEqual = true;
+ break;
+ }
+ }
+ if (notEqual) {
+ // Skip
+ continue;
+ }
+ // If row limit does not match, we currently do not merge
+ if (prevTsOp.getConf().getRowLimit() != tsOp.getConf().getRowLimit()) {
+ // Skip
+ continue;
+ }
+
+ // It seems these two operators can be merged.
+ // Check that plan meets some preconditions before doing it.
+ // In particular, in the presence of map joins in the upstream plan:
+ // - we cannot exceed the noconditional task size, and
+ // - if we already merged the big table, we cannot merge the broadcast
+ // tables.
+ if (!validPreConditions(pctx, optimizerCache, prevTsOp, tsOp)) {
+ // Skip
+ LOG.debug("{} does not meet preconditions", tsOp);
+ continue;
+ }
+
+ // We can merge
+ ExprNodeGenericFuncDesc exprNode = null;
+ if (prevTsOp.getConf().getFilterExpr() != null) {
+ // Push filter on top of children
+ pushFilterToTopOfTableScan(optimizerCache, prevTsOp);
+ // Clone to push to table scan
+ exprNode = (ExprNodeGenericFuncDesc) prevTsOp.getConf().getFilterExpr();
+ }
+ if (tsOp.getConf().getFilterExpr() != null) {
+ // Push filter on top
+ pushFilterToTopOfTableScan(optimizerCache, tsOp);
+ ExprNodeGenericFuncDesc tsExprNode = tsOp.getConf().getFilterExpr();
+ if (exprNode != null && !exprNode.isSame(tsExprNode)) {
+ // We merge filters from previous scan by ORing with filters from current scan
+ if (exprNode.getGenericUDF() instanceof GenericUDFOPOr) {
+ List<ExprNodeDesc> newChildren = new ArrayList<>(exprNode.getChildren().size() + 1);
+ for (ExprNodeDesc childExprNode : exprNode.getChildren()) {
+ if (childExprNode.isSame(tsExprNode)) {
+ // We do not need to do anything, it is in the OR expression
+ break;
+ }
+ newChildren.add(childExprNode);
+ }
+ if (exprNode.getChildren().size() == newChildren.size()) {
+ newChildren.add(tsExprNode);
+ exprNode = ExprNodeGenericFuncDesc.newInstance(
+ new GenericUDFOPOr(),
+ newChildren);
+ }
+ } else {
+ exprNode = ExprNodeGenericFuncDesc.newInstance(
+ new GenericUDFOPOr(),
+ Arrays.<ExprNodeDesc>asList(exprNode, tsExprNode));
+ }
+ }
+ }
+ // Replace filter
+ prevTsOp.getConf().setFilterExpr(exprNode);
+ // Replace table scan operator
+ List<Operator<? extends OperatorDesc>> allChildren =
+ Lists.newArrayList(tsOp.getChildOperators());
+ for (Operator<? extends OperatorDesc> op : allChildren) {
+ tsOp.getChildOperators().remove(op);
+ op.replaceParent(tsOp, prevTsOp);
+ prevTsOp.getChildOperators().add(op);
+ }
+ entriesToRemove.add(tableScanOpPair.getKey());
+ // Remove and combine
+ optimizerCache.removeOpAndCombineWork(tsOp, prevTsOp);
+
+ LOG.debug("Merged {} into {}", tsOp, prevTsOp);
+
+ break;
+ }
+ if (!entriesToRemove.contains(tableScanOpPair.getKey())) {
+ existingOps.put(tableName, tsOp);
+ }
+ } else {
+ // Add to existing ops
+ existingOps.put(tableName, tsOp);
+ }
+ }
+ }
+ // Remove unused operators
+ for (String key : entriesToRemove) {
+ topOps.remove(key);
+ }
+
+ return pctx;
+ }
+
+ private static Set<TableScanOperator> gatherNotValidTableScanOps(
+ ParseContext pctx, SharedScanOptimizerCache optimizerCache) {
+ // Find TS operators with partition pruning enabled in the plan,
+ // because these TS operators may read different data for
+ // different pipelines.
+ // These can be:
+ // 1) TS with static partitioning.
+ // TODO: Check partition list of different TS and do not add if they are identical
+ // 2) TS with DPP.
+ // TODO: Check if dynamic filters are identical and do not add.
+ // 3) TS with semijoin DPP.
+ // TODO: Check for dynamic filters.
+ Set<TableScanOperator> notValidTableScanOps = new HashSet<>();
+ // 1) TS with static partitioning.
+ Map<String, TableScanOperator> topOps = pctx.getTopOps();
+ for (TableScanOperator tsOp : topOps.values()) {
+ if (tsOp.getConf().getPartColumns() != null &&
+ !tsOp.getConf().getPartColumns().isEmpty()) {
+ notValidTableScanOps.add(tsOp);
+ }
+ }
+ // 2) TS with DPP.
+ Collection<Operator<? extends OperatorDesc>> tableScanOps =
+ Lists.<Operator<?>>newArrayList(topOps.values());
+ Set<AppMasterEventOperator> s =
+ OperatorUtils.findOperators(tableScanOps, AppMasterEventOperator.class);
+ for (AppMasterEventOperator a : s) {
+ if (a.getConf() instanceof DynamicPruningEventDesc) {
+ DynamicPruningEventDesc dped = (DynamicPruningEventDesc) a.getConf();
+ notValidTableScanOps.add(dped.getTableScan());
+ optimizerCache.tableScanToDPPSource.put(dped.getTableScan(), a);
+ }
+ }
+ // 3) TS with semijoin DPP.
+ for (Entry<ReduceSinkOperator, SemiJoinBranchInfo> e
+ : pctx.getRsToSemiJoinBranchInfo().entrySet()) {
+ notValidTableScanOps.add(e.getValue().getTsOp());
+ optimizerCache.tableScanToDPPSource.put(e.getValue().getTsOp(), e.getKey());
+ }
+ return notValidTableScanOps;
+ }
+
+ private static Multimap<String, Entry<String, TableScanOperator>> splitTableScanOpsByTable(
+ ParseContext pctx) {
+ Multimap<String, Entry<String, TableScanOperator>> tableNameToOps = ArrayListMultimap.create();
+ for (Entry<String, TableScanOperator> e : pctx.getTopOps().entrySet()) {
+ TableScanOperator tsOp = e.getValue();
+ tableNameToOps.put(
+ tsOp.getConf().getTableMetadata().getDbName() + "."
+ + tsOp.getConf().getTableMetadata().getTableName(), e);
+ }
+ return tableNameToOps;
+ }
+
+ private static List<Entry<String, Long>> rankTablesByAccumulatedSize(ParseContext pctx,
+ Set<TableScanOperator> excludeTables) {
+ Map<String, Long> tableToTotalSize = new HashMap<>();
+ for (Entry<String, TableScanOperator> e : pctx.getTopOps().entrySet()) {
+ TableScanOperator tsOp = e.getValue();
+ if (excludeTables.contains(tsOp)) {
+ // Skip operator, currently we do not merge
+ continue;
+ }
+ String tableName = tsOp.getConf().getTableMetadata().getDbName() + "."
+ + tsOp.getConf().getTableMetadata().getTableName();
+ long tableSize = tsOp.getStatistics() != null ?
+ tsOp.getStatistics().getDataSize() : 0L;
+ Long totalSize = tableToTotalSize.get(tableName);
+ if (totalSize != null) {
+ tableToTotalSize.put(tableName,
+ StatsUtils.safeAdd(totalSize, tableSize));
+ } else {
+ tableToTotalSize.put(tableName, tableSize);
+ }
+ }
+ List<Entry<String, Long>> sortedTables =
+ new LinkedList<>(tableToTotalSize.entrySet());
+ Collections.sort(sortedTables, Collections.reverseOrder(
+ new Comparator<Map.Entry<String, Long>>() {
+ public int compare(Map.Entry<String, Long> o1, Map.Entry<String, Long> o2) {
+ return (o1.getValue()).compareTo(o2.getValue());
+ }
+ }));
+ return sortedTables;
+ }
+
+ private static boolean validPreConditions(ParseContext pctx, SharedScanOptimizerCache optimizerCache,
+ TableScanOperator prevTsOp, TableScanOperator tsOp) {
+ // 1) The set of operators in the works of the TS operators need to meet
+ // some requirements. In particular:
+ // 1.1. None of the works that contain the TS operators can contain a Union
+ // operator. This is not supported yet as we might end up with cycles in
+ // the Tez DAG.
+ // 1.2. There cannot be more than one DummyStore operator in the new resulting
+ // work when the TS operators are merged. This is due to an assumption in
+ // MergeJoinProc that needs to be further explored.
+ // If any of these conditions are not met, we cannot merge.
+ // TODO: Extend rule so it can be applied to these cases.
+ final Set<Operator<?>> workOps1 = findWorkOperators(optimizerCache, prevTsOp);
+ final Set<Operator<?>> workOps2 = findWorkOperators(optimizerCache, tsOp);
+ boolean foundDummyStoreOp = false;
+ for (Operator<?> op : workOps1) {
+ if (op instanceof UnionOperator) {
+ // We cannot merge (1.1)
+ return false;
+ }
+ if (op instanceof DummyStoreOperator) {
+ foundDummyStoreOp = true;
+ }
+ }
+ for (Operator<?> op : workOps2) {
+ if (op instanceof UnionOperator) {
+ // We cannot merge (1.1)
+ return false;
+ }
+ if (foundDummyStoreOp && op instanceof DummyStoreOperator) {
+ // We cannot merge (1.2)
+ return false;
+ }
+ }
+ // 2) We check whether the output works will collide when we merge the operators.
+ //
+ // Work1 Work2 (merge TS in W1 & W2) Work1
+ // \ / -> | | X
+ // Work3 Work3
+ //
+ // If we do, we cannot merge. The reason is that Tez currently does
+ // not support parallel edges, i.e., multiple edges from same work x
+ // into same work y.
+ final Set<Operator<?>> outputWorksOps1 = findChildWorkOperators(pctx, optimizerCache, prevTsOp);
+ final Set<Operator<?>> outputWorksOps2 = findChildWorkOperators(pctx, optimizerCache, tsOp);
+ if (!Collections.disjoint(outputWorksOps1, outputWorksOps2)) {
+ // We cannot merge
+ return false;
+ }
+ // 3) We check whether we will end up with the same operators feeding into the same work.
+ //
+ // Work1 (merge TS in W2 & W3) Work1
+ // / \ -> | | X
+ // Work2 Work3 Work2
+ //
+ // If we do, we cannot merge. The reason is the same as above: Tez
+ // currently does not support parallel edges.
+ final Set<Operator<?>> inputWorksOps1 = findParentWorkOperators(pctx, optimizerCache, prevTsOp);
+ final Set<Operator<?>> inputWorksOps2 = findParentWorkOperators(pctx, optimizerCache, tsOp);
+ if (!Collections.disjoint(inputWorksOps1, inputWorksOps2)) {
+ // We cannot merge
+ return false;
+ }
+ // 4) We check whether one of the operators is part of a work that is an input for
+ // the work of the other operator.
+ //
+ // Work1 (merge TS in W1 & W3) Work1
+ // | -> | X
+ // Work2 Work2
+ // | |
+ // Work3 Work1
+ //
+ // If we do, we cannot merge, as we would end up with a cycle in the DAG.
+ final Set<Operator<?>> descendantWorksOps1 =
+ findDescendantWorkOperators(pctx, optimizerCache, prevTsOp);
+ final Set<Operator<?>> descendantWorksOps2 =
+ findDescendantWorkOperators(pctx, optimizerCache, tsOp);
+ if (!Collections.disjoint(descendantWorksOps1, workOps2)
+ || !Collections.disjoint(workOps1, descendantWorksOps2)) {
+ return false;
+ }
+ // 5) We check whether merging the works would cause the size of
+ // the data in memory to grow too large.
+ // TODO: Currently ignores GBY and PTF which may also buffer data in memory.
+ final Set<Operator<?>> newWorkOps = workOps1;
+ newWorkOps.addAll(workOps2);
+ long dataSize = 0L;
+ for (Operator<?> op : newWorkOps) {
+ if (op instanceof MapJoinOperator) {
+ MapJoinOperator mop = (MapJoinOperator) op;
+ dataSize = StatsUtils.safeAdd(dataSize, mop.getConf().getInMemoryDataSize());
+ if (dataSize > mop.getConf().getNoConditionalTaskSize()) {
+ // Size surpasses limit, we cannot convert
+ LOG.debug("accumulated data size: {} / max size: {}",
+ dataSize, mop.getConf().getNoConditionalTaskSize());
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ private static Set<Operator<?>> findParentWorkOperators(ParseContext pctx,
+ SharedScanOptimizerCache optimizerCache, Operator<?> start) {
+ // Find operators in work
+ Set<Operator<?>> workOps = findWorkOperators(optimizerCache, start);
+ // Gather input works operators
+ Set<Operator<?>> set = new HashSet<Operator<?>>();
+ for (Operator<?> op : workOps) {
+ if (op.getParentOperators() != null) {
+ for (Operator<?> parent : op.getParentOperators()) {
+ if (parent instanceof ReduceSinkOperator) {
+ set.addAll(findWorkOperators(optimizerCache, parent));
+ }
+ }
+ } else if (op instanceof TableScanOperator) {
+ // Check for DPP and semijoin DPP
+ for (Operator<?> parent : optimizerCache.tableScanToDPPSource.get((TableScanOperator) op)) {
+ set.addAll(findWorkOperators(optimizerCache, parent));
+ }
+ }
+ }
+ return set;
+ }
+
+ private static Set<Operator<?>> findChildWorkOperators(ParseContext pctx,
+ SharedScanOptimizerCache optimizerCache, Operator<?> start) {
+ // Find operators in work
+ Set<Operator<?>> workOps = findWorkOperators(optimizerCache, start);
+ // Gather output works operators
+ Set<Operator<?>> set = new HashSet<Operator<?>>();
+ for (Operator<?> op : workOps) {
+ if (op instanceof ReduceSinkOperator) {
+ if (op.getChildOperators() != null) {
+ // All children of RS are descendants
+ for (Operator<?> child : op.getChildOperators()) {
+ set.addAll(findWorkOperators(optimizerCache, child));
+ }
+ }
+ // Semijoin DPP work is considered a child because work needs
+ // to finish for it to execute
+ SemiJoinBranchInfo sjbi = pctx.getRsToSemiJoinBranchInfo().get(op);
+ if (sjbi != null) {
+ set.addAll(findWorkOperators(optimizerCache, sjbi.getTsOp()));
+ }
+ } else if(op.getConf() instanceof DynamicPruningEventDesc) {
+ // DPP work is considered a child because work needs
+ // to finish for it to execute
+ set.addAll(findWorkOperators(
+ optimizerCache, ((DynamicPruningEventDesc) op.getConf()).getTableScan()));
+ }
+ }
+ return set;
+ }
+
+ /**
+ * Collects the operators of every work that transitively depends on the work
+ * containing {@code start}.
+ * <p>
+ * Starting from the operators of the work that contains {@code start}, output
+ * edges are followed round by round: children of ReduceSink operators, table
+ * scans targeted by semijoin DPP branches, and table scans targeted by dynamic
+ * pruning event operators. Operators that were already reached in a previous
+ * round are not expanded again, so a work reachable through several paths is
+ * visited once and the traversal terminates even if the edges form a cycle.
+ *
+ * @param pctx parse context, used to look up semijoin branch information
+ * @param optimizerCache cache of operator-to-work groupings
+ * @param start any operator of the work whose descendants are requested
+ * @return a new set with the operators of all works reachable from the work of
+ * {@code start}; operators of the starting work itself are only included
+ * if they are reachable again through an output edge
+ */
+ private static Set<Operator<?>> findDescendantWorkOperators(ParseContext pctx,
+ SharedScanOptimizerCache optimizerCache, Operator<?> start) {
+ // Find operators in work
+ Set<Operator<?>> workOps = findWorkOperators(optimizerCache, start);
+ // Gather output works operators
+ Set<Operator<?>> result = new HashSet<Operator<?>>();
+ while (!workOps.isEmpty()) {
+ Set<Operator<?>> set = new HashSet<Operator<?>>();
+ for (Operator<?> op : workOps) {
+ if (op instanceof ReduceSinkOperator) {
+ if (op.getChildOperators() != null) {
+ // All children of RS are descendants
+ for (Operator<?> child : op.getChildOperators()) {
+ set.addAll(findWorkOperators(optimizerCache, child));
+ }
+ }
+ // Semijoin DPP work is considered a descendant because work needs
+ // to finish for it to execute
+ SemiJoinBranchInfo sjbi = pctx.getRsToSemiJoinBranchInfo().get(op);
+ if (sjbi != null) {
+ set.addAll(findWorkOperators(optimizerCache, sjbi.getTsOp()));
+ }
+ } else if(op.getConf() instanceof DynamicPruningEventDesc) {
+ // DPP work is considered a descendant because work needs
+ // to finish for it to execute
+ set.addAll(findWorkOperators(
+ optimizerCache, ((DynamicPruningEventDesc) op.getConf()).getTableScan()));
+ }
+ }
+ // Keep only operators discovered for the first time in this round: anything
+ // already in the result has been expanded before, and expanding it again
+ // would repeat work (or loop forever on a cyclic edge)
+ set.removeAll(result);
+ result.addAll(set);
+ workOps = set;
+ }
+ return result;
+ }
+
+ /**
+ * Returns the operators that belong to the same work as {@code start},
+ * consulting the cache first.
+ * <p>
+ * An empty collection for {@code start} in
+ * {@code optimizerCache.operatorToWorkOperators} is treated as a cache miss.
+ * On a miss the work is computed by traversing the operator graph and the
+ * resulting group is registered in the cache under every operator it contains.
+ *
+ * @param optimizerCache cache of operator-to-work groupings; updated on a miss
+ * @param start any operator of the work of interest
+ * @return the cached collection on a hit, otherwise the newly computed set
+ */
+ // Stores result in cache
+ private static Set<Operator<?>> findWorkOperators(
+ SharedScanOptimizerCache optimizerCache, Operator<?> start) {
+ Set<Operator<?>> c = optimizerCache.operatorToWorkOperators.get(start);
+ if (!c.isEmpty()) {
+ return c;
+ }
+ c = findWorkOperators(start, new HashSet<Operator<?>>());
+ // Register the whole group under each of its members
+ for (Operator<?> op : c) {
+ optimizerCache.operatorToWorkOperators.putAll(op, c);
+ }
+ return c;
+ }
+
+ /**
+ * Recursively gathers into {@code found} the operators connected to
+ * {@code start} without crossing a ReduceSink boundary.
+ * <p>
+ * Parents are followed unless they are ReduceSink operators; children are
+ * followed unless {@code start} itself is a ReduceSink operator. A ReduceSink
+ * reached as a child is therefore added to the set, but its own children are not.
+ *
+ * @param start operator to visit
+ * @param found accumulator of visited operators; mutated and also used to avoid
+ * revisiting operators
+ * @return the same {@code found} instance
+ */
+ private static Set<Operator<?>> findWorkOperators(Operator<?> start, Set<Operator<?>> found) {
+ found.add(start);
+ if (start.getParentOperators() != null) {
+ for (Operator<?> parent : start.getParentOperators()) {
+ // A ReduceSink parent is skipped, so the traversal does not go above it
+ if (parent instanceof ReduceSinkOperator) {
+ continue;
+ }
+ if (!found.contains(parent)) {
+ findWorkOperators(parent, found);
+ }
+ }
+ }
+ // Do not descend below a ReduceSink
+ if (start instanceof ReduceSinkOperator) {
+ return found;
+ }
+ if (start.getChildOperators() != null) {
+ for (Operator<?> child : start.getChildOperators()) {
+ if (!found.contains(child)) {
+ findWorkOperators(child, found);
+ }
+ }
+ }
+ return found;
+ }
+
+ /**
+ * Makes the filter expression stored in the table scan descriptor explicit in
+ * the operators right below the table scan.
+ * <p>
+ * For each child of {@code tsOp}: if the child is a filter operator, its
+ * predicate is replaced by {@code AND(<table scan filter>, <old predicate>)};
+ * otherwise a new filter operator carrying a clone of the table scan filter is
+ * inserted between {@code tsOp} and the child and added to the cached work
+ * group of {@code tsOp}.
+ * <p>
+ * NOTE(review): the table scan filter expression is dereferenced without a
+ * null check; presumably callers only invoke this when it is set - confirm.
+ *
+ * @param optimizerCache cache of operator-to-work groupings
+ * @param tsOp table scan whose filter expression is pushed to its children
+ * @throws UDFArgumentException presumably raised while building the AND
+ * expression - TODO confirm
+ */
+ private static void pushFilterToTopOfTableScan(
+ SharedScanOptimizerCache optimizerCache, TableScanOperator tsOp)
+ throws UDFArgumentException {
+ ExprNodeGenericFuncDesc tableScanExprNode = tsOp.getConf().getFilterExpr();
+ // Iterate over a copy: the children list of tsOp is modified inside the loop
+ List<Operator<? extends OperatorDesc>> allChildren =
+ Lists.newArrayList(tsOp.getChildOperators());
+ for (Operator<? extends OperatorDesc> op : allChildren) {
+ if (op instanceof FilterOperator) {
+ FilterOperator filterOp = (FilterOperator) op;
+ ExprNodeDesc filterExprNode = filterOp.getConf().getPredicate();
+ // NOTE(review): the two returns below leave the whole method, so any
+ // remaining children of tsOp are not processed - confirm this is intended
+ if (tableScanExprNode.isSame(filterExprNode)) {
+ // We do not need to do anything
+ return;
+ }
+ if (tableScanExprNode.getGenericUDF() instanceof GenericUDFOPOr) {
+ for (ExprNodeDesc childExprNode : tableScanExprNode.getChildren()) {
+ if (childExprNode.isSame(filterExprNode)) {
+ // We do not need to do anything, it is in the OR expression
+ // so probably we pushed previously
+ return;
+ }
+ }
+ }
+ ExprNodeGenericFuncDesc newPred = ExprNodeGenericFuncDesc.newInstance(
+ new GenericUDFOPAnd(),
+ Arrays.<ExprNodeDesc>asList(tableScanExprNode.clone(), filterExprNode));
+ filterOp.getConf().setPredicate(newPred);
+ } else {
+ // Child is not a filter: insert a new filter between tsOp and the child
+ Operator<FilterDesc> newOp = OperatorFactory.get(tsOp.getCompilationOpContext(),
+ new FilterDesc(tableScanExprNode.clone(), false),
+ new RowSchema(tsOp.getSchema().getSignature()));
+ tsOp.replaceChild(op, newOp);
+ newOp.getParentOperators().add(tsOp);
+ op.replaceParent(tsOp, newOp);
+ newOp.getChildOperators().add(op);
+ // Add to cache (same group as tsOp)
+ optimizerCache.putIfWorkExists(newOp, tsOp);
+ }
+ }
+ }
+
+ /**
+ * Cache to accelerate optimization.
+ * <p>
+ * Holds, for each operator, the operators grouped with it in the same work,
+ * and for each table scan, the operators registered as its DPP sources.
+ */
+ private static class SharedScanOptimizerCache {
+ // Operators that belong to each work
+ final HashMultimap<Operator<?>, Operator<?>> operatorToWorkOperators =
+ HashMultimap.<Operator<?>, Operator<?>>create();
+ // Table scan operators to DPP sources
+ final Multimap<TableScanOperator, Operator<?>> tableScanToDPPSource =
+ HashMultimap.<TableScanOperator, Operator<?>>create();
+
+ // Add new operator to cache work group of existing operator (if group exists)
+ void putIfWorkExists(Operator<?> opToAdd, Operator<?> existingOp) {
+ // Snapshot the group, since the multimap is modified while iterating
+ List<Operator<?>> c = ImmutableList.copyOf(operatorToWorkOperators.get(existingOp));
+ if (!c.isEmpty()) {
+ // Every existing member of the group now also lists the new operator
+ for (Operator<?> op : c) {
+ operatorToWorkOperators.get(op).add(opToAdd);
+ }
+ // The new operator lists every member of the group and itself
+ operatorToWorkOperators.putAll(opToAdd, c);
+ operatorToWorkOperators.put(opToAdd, opToAdd);
+ }
+ }
+
+ // Remove operator and combine
+ void removeOpAndCombineWork(Operator<?> opToRemove, Operator<?> replacementOp) {
+ // s is the collection returned by the multimap for opToRemove; the removal
+ // below runs unconditionally, before the emptiness check on the snapshots
+ Set<Operator<?>> s = operatorToWorkOperators.get(opToRemove);
+ s.remove(opToRemove);
+ // c1: remaining operators grouped with opToRemove; c2: group of replacementOp
+ List<Operator<?>> c1 = ImmutableList.copyOf(s);
+ List<Operator<?>> c2 = ImmutableList.copyOf(operatorToWorkOperators.get(replacementOp));
+ if (!c1.isEmpty() && !c2.isEmpty()) {
+ for (Operator<?> op1 : c1) {
+ operatorToWorkOperators.remove(op1, opToRemove); // Remove operator
+ operatorToWorkOperators.putAll(op1, c2); // Add ops of new collection
+ }
+ operatorToWorkOperators.removeAll(opToRemove); // Remove entry for operator
+ for (Operator<?> op2 : c2) {
+ operatorToWorkOperators.putAll(op2, c1); // Add ops to existing collection
+ }
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/59f65772/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
index f78bd7c..53abb21 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
@@ -282,7 +282,7 @@ public final class GenMRSkewJoinProcessor {
newJoinValues, newJoinValueTblDesc, newJoinValueTblDesc,joinDescriptor
.getOutputColumnNames(), i, joinDescriptor.getConds(),
joinDescriptor.getFilters(), joinDescriptor.getNoOuterJoin(), dumpFilePrefix,
- joinDescriptor.getNoConditionalTaskSize());
+ joinDescriptor.getNoConditionalTaskSize(), joinDescriptor.getInMemoryDataSize());
mapJoinDescriptor.setTagOrder(tags);
mapJoinDescriptor.setHandleSkewJoin(false);
mapJoinDescriptor.setNullSafes(joinDescriptor.getNullSafes());
http://git-wip-us.apache.org/repos/asf/hive/blob/59f65772/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
index c970611..a5f0b2a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
@@ -241,7 +241,7 @@ public class GenSparkSkewJoinProcessor {
newJoinValues, newJoinValueTblDesc, newJoinValueTblDesc, joinDescriptor
.getOutputColumnNames(), i, joinDescriptor.getConds(),
joinDescriptor.getFilters(), joinDescriptor.getNoOuterJoin(), dumpFilePrefix,
- joinDescriptor.getNoConditionalTaskSize());
+ joinDescriptor.getNoConditionalTaskSize(), joinDescriptor.getInMemoryDataSize());
mapJoinDescriptor.setTagOrder(tags);
mapJoinDescriptor.setHandleSkewJoin(false);
mapJoinDescriptor.setNullSafes(joinDescriptor.getNullSafes());
http://git-wip-us.apache.org/repos/asf/hive/blob/59f65772/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
index c87de16..188590f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
@@ -291,8 +291,14 @@ public class GenTezWork implements NodeProcessor {
// of the downstream work
for (ReduceSinkOperator r:
context.linkWorkWithReduceSinkMap.get(parentWork)) {
+ if (!context.mapJoinParentMap.get(mj).contains(r)) {
+ // We might be visiting twice because of reutilization of intermediary results.
+ // If that is the case, we do not need to do anything because either we have
+ // already connected this RS operator or we will connect it at subsequent pass.
+ continue;
+ }
if (r.getConf().getOutputName() != null) {
- LOG.debug("Cloning reduce sink for multi-child broadcast edge");
+ LOG.debug("Cloning reduce sink " + r + " for multi-child broadcast edge");
// we've already set this one up. Need to clone for the next work.
r = (ReduceSinkOperator) OperatorFactory.getAndMakeChild(
r.getCompilationOpContext(), (ReduceSinkDesc)r.getConf().clone(),
@@ -370,7 +376,7 @@ public class GenTezWork implements NodeProcessor {
long bytesPerReducer = context.conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);
LOG.debug("Second pass. Leaf operator: "+operator
- +" has common downstream work:"+followingWork);
+ +" has common downstream work: "+followingWork);
if (operator instanceof DummyStoreOperator) {
// this is the small table side.
http://git-wip-us.apache.org/repos/asf/hive/blob/59f65772/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index f469cd2..7e156f6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -17,26 +17,55 @@
*/
package org.apache.hadoop.hive.ql.parse;
-import org.apache.hadoop.hive.ql.optimizer.physical.LlapClusterStateForCompile;
-
-import com.google.common.base.Preconditions;
import java.io.Serializable;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.hadoop.hive.ql.exec.*;
-import org.apache.hadoop.hive.ql.lib.*;
-import org.apache.hadoop.hive.ql.plan.*;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBloomFilter.GenericUDAFBloomFilterEvaluator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
+import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
+import org.apache.hadoop.hive.ql.exec.ConditionalTask;
+import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TezDummyStoreOperator;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
import org.apache.hadoop.hive.ql.exec.tez.TezTask;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.lib.CompositeProcessor;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.ForwardWalker;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.PreOrderOnceWalker;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.optimizer.ConstantPropagate;
@@ -47,9 +76,11 @@ import org.apache.hadoop.hive.ql.optimizer.MergeJoinProc;
import org.apache.hadoop.hive.ql.optimizer.ReduceSinkMapJoinProc;
import org.apache.hadoop.hive.ql.optimizer.RemoveDynamicPruningBySize;
import org.apache.hadoop.hive.ql.optimizer.SetReducerParallelism;
+import org.apache.hadoop.hive.ql.optimizer.SharedScanOptimizer;
import org.apache.hadoop.hive.ql.optimizer.metainfo.annotation.AnnotateWithOpTraits;
import org.apache.hadoop.hive.ql.optimizer.physical.AnnotateRunTimeStatsOptimizer;
import org.apache.hadoop.hive.ql.optimizer.physical.CrossProductCheck;
+import org.apache.hadoop.hive.ql.optimizer.physical.LlapClusterStateForCompile;
import org.apache.hadoop.hive.ql.optimizer.physical.LlapDecider;
import org.apache.hadoop.hive.ql.optimizer.physical.LlapPreVectorizationPass;
import org.apache.hadoop.hive.ql.optimizer.physical.MemoryDecider;
@@ -60,10 +91,25 @@ import org.apache.hadoop.hive.ql.optimizer.physical.SerializeFilter;
import org.apache.hadoop.hive.ql.optimizer.physical.StageIDsRearranger;
import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer;
import org.apache.hadoop.hive.ql.optimizer.stats.annotation.AnnotateWithStatistics;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.ColStatistics;
+import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.MoveWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.Statistics;
+import org.apache.hadoop.hive.ql.plan.TezWork;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
-import org.apache.hadoop.hive.ql.exec.SelectOperator;
import org.apache.hadoop.hive.ql.stats.StatsUtils;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBloomFilter.GenericUDAFBloomFilterEvaluator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* TezCompiler translates the operator plan into TezTasks.
@@ -92,6 +138,7 @@ public class TezCompiler extends TaskCompiler {
PerfLogger perfLogger = SessionState.getPerfLogger();
// Create the context for the walker
OptimizeTezProcContext procCtx = new OptimizeTezProcContext(conf, pCtx, inputs, outputs);
+
perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER);
// setup dynamic partition pruning where possible
runDynamicPartitionPruning(procCtx, inputs, outputs);
@@ -136,6 +183,12 @@ public class TezCompiler extends TaskCompiler {
runCycleAnalysisForPartitionPruning(procCtx, inputs, outputs);
perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Run cycle analysis for partition pruning");
+ perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER);
+ if(procCtx.conf.getBoolVar(ConfVars.HIVE_SHARED_SCAN_OPTIMIZATION)) {
+ new SharedScanOptimizer().transform(procCtx.parseContext);
+ }
+ perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Shared scans optimization");
+
// need a new run of the constant folding because we might have created lots
// of "and true and true" conditions.
// Rather than run the full constant folding just need to shortcut AND/OR expressions
http://git-wip-us.apache.org/repos/asf/hive/blob/59f65772/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
index c4fb3f3..12e1ff5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
@@ -107,6 +107,9 @@ public class JoinDesc extends AbstractOperatorDesc {
private transient boolean leftInputJoin;
private transient List<String> streamAliases;
+ // represents the total memory that this Join operator will use if it is a MapJoin operator
+ protected transient long inMemoryDataSize;
+
// non-transient field, used at runtime to kill a task if it exceeded memory limits when running in LLAP
protected long noConditionalTaskSize;
@@ -202,6 +205,7 @@ public class JoinDesc extends AbstractOperatorDesc {
this.residualFilterExprs = clone.residualFilterExprs;
this.statistics = clone.statistics;
this.noConditionalTaskSize = clone.noConditionalTaskSize;
+ this.inMemoryDataSize = clone.inMemoryDataSize;
}
public Map<Byte, List<ExprNodeDesc>> getExprs() {
@@ -696,4 +700,12 @@ public class JoinDesc extends AbstractOperatorDesc {
public void setNoConditionalTaskSize(final long noConditionalTaskSize) {
this.noConditionalTaskSize = noConditionalTaskSize;
}
+
+ /**
+ * @return the value of the {@code inMemoryDataSize} field
+ */
+ public long getInMemoryDataSize() {
+ return inMemoryDataSize;
+ }
+
+ /**
+ * @param inMemoryDataSize new value for the {@code inMemoryDataSize} field
+ */
+ public void setInMemoryDataSize(final long inMemoryDataSize) {
+ this.inMemoryDataSize = inMemoryDataSize;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/59f65772/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
index 8da85d2..f387e6a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
@@ -113,7 +113,7 @@ public class MapJoinDesc extends JoinDesc implements Serializable {
final List<TableDesc> valueTblDescs, final List<TableDesc> valueFilteredTblDescs, List<String> outputColumnNames,
final int posBigTable, final JoinCondDesc[] conds,
final Map<Byte, List<ExprNodeDesc>> filters, boolean noOuterJoin, String dumpFilePrefix,
- final long noConditionalTaskSize) {
+ final long noConditionalTaskSize, final long inMemoryDataSize) {
super(values, outputColumnNames, noOuterJoin, conds, filters, null, noConditionalTaskSize);
vectorDesc = null;
this.keys = keys;
@@ -123,6 +123,7 @@ public class MapJoinDesc extends JoinDesc implements Serializable {
this.posBigTable = posBigTable;
this.bigTableBucketNumMapping = new LinkedHashMap<String, Integer>();
this.dumpFilePrefix = dumpFilePrefix;
+ this.inMemoryDataSize = inMemoryDataSize;
initRetainExprList();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/59f65772/ql/src/test/queries/clientpositive/perf/query88.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/perf/query88.q b/ql/src/test/queries/clientpositive/perf/query88.q
index 2be814e..bb6ef6d 100644
--- a/ql/src/test/queries/clientpositive/perf/query88.q
+++ b/ql/src/test/queries/clientpositive/perf/query88.q
@@ -1,3 +1,5 @@
+set hive.strict.checks.cartesian.product=false;
+
explain
select *
from
http://git-wip-us.apache.org/repos/asf/hive/blob/59f65772/ql/src/test/results/clientpositive/llap/auto_join0.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/auto_join0.q.out b/ql/src/test/results/clientpositive/llap/auto_join0.q.out
index cba6001..6d051ea 100644
--- a/ql/src/test/results/clientpositive/llap/auto_join0.q.out
+++ b/ql/src/test/results/clientpositive/llap/auto_join0.q.out
@@ -30,10 +30,10 @@ STAGE PLANS:
Tez
#### A masked pattern was here ####
Edges:
- Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 6 (BROADCAST_EDGE)
+ Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (BROADCAST_EDGE)
Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
Reducer 4 <- Reducer 3 (CUSTOM_SIMPLE_EDGE)
- Reducer 6 <- Map 5 (SIMPLE_EDGE)
+ Reducer 5 <- Map 1 (SIMPLE_EDGE)
#### A masked pattern was here ####
Vertices:
Map 1
@@ -52,13 +52,6 @@ STAGE PLANS:
key expressions: _col0 (type: string), _col1 (type: string)
sort order: ++
Statistics: Num rows: 166 Data size: 29548 Basic stats: COMPLETE Column stats: COMPLETE
- Execution mode: llap
- LLAP IO: no inputs
- Map 5
- Map Operator Tree:
- TableScan
- alias: src
- Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
Filter Operator
predicate: (key < 10) (type: boolean)
Statistics: Num rows: 166 Data size: 29548 Basic stats: COMPLETE Column stats: COMPLETE
@@ -87,7 +80,7 @@ STAGE PLANS:
1
outputColumnNames: _col0, _col1, _col2, _col3
input vertices:
- 1 Reducer 6
+ 1 Reducer 5
Statistics: Num rows: 27556 Data size: 9809936 Basic stats: COMPLETE Column stats: COMPLETE
Reduce Output Operator
key expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string)
@@ -124,7 +117,7 @@ STAGE PLANS:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- Reducer 6
+ Reducer 5
Execution mode: llap
Reduce Operator Tree:
Select Operator
http://git-wip-us.apache.org/repos/asf/hive/blob/59f65772/ql/src/test/results/clientpositive/llap/auto_join30.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/auto_join30.q.out b/ql/src/test/results/clientpositive/llap/auto_join30.q.out
index a26db55..cc59c5c 100644
--- a/ql/src/test/results/clientpositive/llap/auto_join30.q.out
+++ b/ql/src/test/results/clientpositive/llap/auto_join30.q.out
@@ -457,9 +457,9 @@ STAGE PLANS:
#### A masked pattern was here ####
Edges:
Reducer 2 <- Map 1 (SIMPLE_EDGE)
- Reducer 4 <- Map 3 (SIMPLE_EDGE), Reducer 2 (BROADCAST_EDGE), Reducer 7 (BROADCAST_EDGE)
+ Reducer 4 <- Map 3 (SIMPLE_EDGE), Reducer 2 (BROADCAST_EDGE), Reducer 6 (BROADCAST_EDGE)
Reducer 5 <- Reducer 4 (CUSTOM_SIMPLE_EDGE)
- Reducer 7 <- Map 6 (SIMPLE_EDGE)
+ Reducer 6 <- Map 3 (SIMPLE_EDGE)
#### A masked pattern was here ####
Vertices:
Map 1
@@ -497,13 +497,6 @@ STAGE PLANS:
sort order: +
Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
value expressions: _col0 (type: string)
- Execution mode: llap
- LLAP IO: no inputs
- Map 6
- Map Operator Tree:
- TableScan
- alias: src
- Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
Filter Operator
predicate: key is not null (type: boolean)
Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
@@ -548,7 +541,7 @@ STAGE PLANS:
outputColumnNames: _col2, _col3
input vertices:
0 Reducer 2
- 2 Reducer 7
+ 2 Reducer 6
Statistics: Num rows: 2974 Data size: 529372 Basic stats: COMPLETE Column stats: COMPLETE
Group By Operator
aggregations: sum(hash(_col2,_col3))
@@ -574,7 +567,7 @@ STAGE PLANS:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- Reducer 7
+ Reducer 6
Execution mode: llap
Reduce Operator Tree:
Select Operator
@@ -650,10 +643,10 @@ STAGE PLANS:
#### A masked pattern was here ####
Edges:
Reducer 2 <- Map 1 (SIMPLE_EDGE)
- Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE), Reducer 8 (SIMPLE_EDGE)
+ Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE), Reducer 7 (SIMPLE_EDGE)
Reducer 4 <- Reducer 3 (CUSTOM_SIMPLE_EDGE)
Reducer 6 <- Map 5 (SIMPLE_EDGE)
- Reducer 8 <- Map 7 (SIMPLE_EDGE)
+ Reducer 7 <- Map 5 (SIMPLE_EDGE)
#### A masked pattern was here ####
Vertices:
Map 1
@@ -685,13 +678,6 @@ STAGE PLANS:
sort order: +
Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
value expressions: _col0 (type: string)
- Execution mode: llap
- LLAP IO: no inputs
- Map 7
- Map Operator Tree:
- TableScan
- alias: src
- Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
Select Operator
expressions: key (type: string), value (type: string)
outputColumnNames: _col0, _col1
@@ -765,7 +751,7 @@ STAGE PLANS:
Map-reduce partition columns: _col0 (type: string)
Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
value expressions: _col1 (type: string)
- Reducer 8
+ Reducer 7
Execution mode: llap
Reduce Operator Tree:
Select Operator
@@ -841,10 +827,10 @@ STAGE PLANS:
#### A masked pattern was here ####
Edges:
Reducer 2 <- Map 1 (SIMPLE_EDGE)
- Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE), Reducer 8 (SIMPLE_EDGE)
+ Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE), Reducer 7 (SIMPLE_EDGE)
Reducer 4 <- Reducer 3 (CUSTOM_SIMPLE_EDGE)
Reducer 6 <- Map 5 (SIMPLE_EDGE)
- Reducer 8 <- Map 7 (SIMPLE_EDGE)
+ Reducer 7 <- Map 5 (SIMPLE_EDGE)
#### A masked pattern was here ####
Vertices:
Map 1
@@ -876,13 +862,6 @@ STAGE PLANS:
sort order: +
Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
value expressions: _col0 (type: string)
- Execution mode: llap
- LLAP IO: no inputs
- Map 7
- Map Operator Tree:
- TableScan
- alias: src
- Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
Select Operator
expressions: key (type: string), value (type: string)
outputColumnNames: _col0, _col1
@@ -956,7 +935,7 @@ STAGE PLANS:
Map-reduce partition columns: _col0 (type: string)
Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
value expressions: _col1 (type: string)
- Reducer 8
+ Reducer 7
Execution mode: llap
Reduce Operator Tree:
Select Operator
@@ -1032,10 +1011,10 @@ STAGE PLANS:
#### A masked pattern was here ####
Edges:
Reducer 2 <- Map 1 (SIMPLE_EDGE)
- Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE), Reducer 8 (SIMPLE_EDGE)
+ Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE), Reducer 7 (SIMPLE_EDGE)
Reducer 4 <- Reducer 3 (CUSTOM_SIMPLE_EDGE)
Reducer 6 <- Map 5 (SIMPLE_EDGE)
- Reducer 8 <- Map 7 (SIMPLE_EDGE)
+ Reducer 7 <- Map 5 (SIMPLE_EDGE)
#### A masked pattern was here ####
Vertices:
Map 1
@@ -1067,13 +1046,6 @@ STAGE PLANS:
sort order: +
Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
value expressions: _col0 (type: string)
- Execution mode: llap
- LLAP IO: no inputs
- Map 7
- Map Operator Tree:
- TableScan
- alias: src
- Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
Select Operator
expressions: key (type: string), value (type: string)
outputColumnNames: _col0, _col1
@@ -1147,7 +1119,7 @@ STAGE PLANS:
Map-reduce partition columns: _col0 (type: string)
Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
value expressions: _col1 (type: string)
- Reducer 8
+ Reducer 7
Execution mode: llap
Reduce Operator Tree:
Select Operator
@@ -1223,10 +1195,10 @@ STAGE PLANS:
#### A masked pattern was here ####
Edges:
Reducer 2 <- Map 1 (SIMPLE_EDGE)
- Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE), Reducer 8 (SIMPLE_EDGE)
+ Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE), Reducer 7 (SIMPLE_EDGE)
Reducer 4 <- Reducer 3 (CUSTOM_SIMPLE_EDGE)
Reducer 6 <- Map 5 (SIMPLE_EDGE)
- Reducer 8 <- Map 7 (SIMPLE_EDGE)
+ Reducer 7 <- Map 5 (SIMPLE_EDGE)
#### A masked pattern was here ####
Vertices:
Map 1
@@ -1258,13 +1230,6 @@ STAGE PLANS:
sort order: +
Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
value expressions: _col0 (type: string)
- Execution mode: llap
- LLAP IO: no inputs
- Map 7
- Map Operator Tree:
- TableScan
- alias: src
- Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
Select Operator
expressions: key (type: string), value (type: string)
outputColumnNames: _col0, _col1
@@ -1338,7 +1303,7 @@ STAGE PLANS:
Map-reduce partition columns: _col0 (type: string)
Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
value expressions: _col1 (type: string)
- Reducer 8
+ Reducer 7
Execution mode: llap
Reduce Operator Tree:
Select Operator
http://git-wip-us.apache.org/repos/asf/hive/blob/59f65772/ql/src/test/results/clientpositive/llap/auto_sortmerge_join_9.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/auto_sortmerge_join_9.q.out b/ql/src/test/results/clientpositive/llap/auto_sortmerge_join_9.q.out
index b69d0bd..bdb30d7 100644
--- a/ql/src/test/results/clientpositive/llap/auto_sortmerge_join_9.q.out
+++ b/ql/src/test/results/clientpositive/llap/auto_sortmerge_join_9.q.out
@@ -475,7 +475,7 @@ STAGE PLANS:
#### A masked pattern was here ####
Edges:
Map 1 <- Map 3 (BROADCAST_EDGE)
- Map 4 <- Map 6 (BROADCAST_EDGE)
+ Map 4 <- Map 3 (BROADCAST_EDGE)
Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (BROADCAST_EDGE)
Reducer 5 <- Map 4 (SIMPLE_EDGE)
#### A masked pattern was here ####
@@ -533,6 +533,18 @@ STAGE PLANS:
sort order: +
Map-reduce partition columns: _col0 (type: int)
Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: key is not null (type: boolean)
+ Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key (type: int)
+ outputColumnNames: _col0
+ Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: int)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: int)
+ Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
Execution mode: llap
LLAP IO: no inputs
Map 4
@@ -555,7 +567,7 @@ STAGE PLANS:
1 _col0 (type: int)
outputColumnNames: _col0
input vertices:
- 1 Map 6
+ 1 Map 3
Statistics: Num rows: 11 Data size: 77 Basic stats: COMPLETE Column stats: NONE
Group By Operator
aggregations: count()
@@ -571,25 +583,6 @@ STAGE PLANS:
value expressions: _col1 (type: bigint)
Execution mode: llap
LLAP IO: no inputs
- Map 6
- Map Operator Tree:
- TableScan
- alias: b
- Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
- Filter Operator
- predicate: key is not null (type: boolean)
- Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
- Select Operator
- expressions: key (type: int)
- outputColumnNames: _col0
- Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
- Reduce Output Operator
- key expressions: _col0 (type: int)
- sort order: +
- Map-reduce partition columns: _col0 (type: int)
- Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
- Execution mode: llap
- LLAP IO: no inputs
Reducer 2
Execution mode: llap
Reduce Operator Tree:
@@ -2306,7 +2299,7 @@ STAGE PLANS:
#### A masked pattern was here ####
Edges:
Map 1 <- Map 3 (BROADCAST_EDGE)
- Map 4 <- Map 6 (BROADCAST_EDGE)
+ Map 4 <- Map 3 (BROADCAST_EDGE)
Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (BROADCAST_EDGE)
Reducer 5 <- Map 4 (SIMPLE_EDGE)
#### A masked pattern was here ####
@@ -2364,6 +2357,18 @@ STAGE PLANS:
sort order: +
Map-reduce partition columns: _col0 (type: int)
Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: key is not null (type: boolean)
+ Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key (type: int)
+ outputColumnNames: _col0
+ Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: int)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: int)
+ Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
Execution mode: llap
LLAP IO: no inputs
Map 4
@@ -2386,7 +2391,7 @@ STAGE PLANS:
1 _col0 (type: int)
outputColumnNames: _col0
input vertices:
- 1 Map 6
+ 1 Map 3
Statistics: Num rows: 11 Data size: 77 Basic stats: COMPLETE Column stats: NONE
Group By Operator
aggregations: count()
@@ -2402,25 +2407,6 @@ STAGE PLANS:
value expressions: _col1 (type: bigint)
Execution mode: llap
LLAP IO: no inputs
- Map 6
- Map Operator Tree:
- TableScan
- alias: b
- Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
- Filter Operator
- predicate: key is not null (type: boolean)
- Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
- Select Operator
- expressions: key (type: int)
- outputColumnNames: _col0
- Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
- Reduce Output Operator
- key expressions: _col0 (type: int)
- sort order: +
- Map-reduce partition columns: _col0 (type: int)
- Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
- Execution mode: llap
- LLAP IO: no inputs
Reducer 2
Execution mode: llap
Reduce Operator Tree:
http://git-wip-us.apache.org/repos/asf/hive/blob/59f65772/ql/src/test/results/clientpositive/llap/bucket_map_join_tez1.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/bucket_map_join_tez1.q.out b/ql/src/test/results/clientpositive/llap/bucket_map_join_tez1.q.out
index 964d058..042c60b 100644
--- a/ql/src/test/results/clientpositive/llap/bucket_map_join_tez1.q.out
+++ b/ql/src/test/results/clientpositive/llap/bucket_map_join_tez1.q.out
@@ -757,7 +757,7 @@ STAGE PLANS:
#### A masked pattern was here ####
Edges:
Map 1 <- Map 3 (CUSTOM_EDGE)
- Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (CUSTOM_SIMPLE_EDGE)
+ Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 3 (CUSTOM_SIMPLE_EDGE)
#### A masked pattern was here ####
Vertices:
Map 1
@@ -818,13 +818,6 @@ STAGE PLANS:
Map-reduce partition columns: _col0 (type: int)
Statistics: Num rows: 242 Data size: 4502 Basic stats: COMPLETE Column stats: NONE
value expressions: _col1 (type: string)
- Execution mode: llap
- LLAP IO: no inputs
- Map 4
- Map Operator Tree:
- TableScan
- alias: b
- Statistics: Num rows: 242 Data size: 4502 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: key is not null (type: boolean)
Statistics: Num rows: 242 Data size: 4502 Basic stats: COMPLETE Column stats: NONE
@@ -861,7 +854,7 @@ STAGE PLANS:
1 _col0 (type: int)
outputColumnNames: _col0, _col1, _col3
input vertices:
- 1 Map 4
+ 1 Map 3
Statistics: Num rows: 302 Data size: 5633 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: _col1 (type: int), _col0 (type: double), _col3 (type: string)
http://git-wip-us.apache.org/repos/asf/hive/blob/59f65772/ql/src/test/results/clientpositive/llap/correlationoptimizer2.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/correlationoptimizer2.q.out b/ql/src/test/results/clientpositive/llap/correlationoptimizer2.q.out
index b628cb1..291a1f2 100644
--- a/ql/src/test/results/clientpositive/llap/correlationoptimizer2.q.out
+++ b/ql/src/test/results/clientpositive/llap/correlationoptimizer2.q.out
@@ -1738,9 +1738,9 @@ STAGE PLANS:
#### A masked pattern was here ####
Edges:
Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 5 (SIMPLE_EDGE)
- Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 7 (SIMPLE_EDGE)
+ Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
Reducer 4 <- Reducer 3 (CUSTOM_SIMPLE_EDGE)
- Reducer 7 <- Map 6 (SIMPLE_EDGE)
+ Reducer 6 <- Map 5 (SIMPLE_EDGE)
#### A masked pattern was here ####
Vertices:
Map 1
@@ -1780,13 +1780,6 @@ STAGE PLANS:
Map-reduce partition columns: _col0 (type: string)
Statistics: Num rows: 25 Data size: 4375 Basic stats: COMPLETE Column stats: COMPLETE
value expressions: _col1 (type: string)
- Execution mode: llap
- LLAP IO: no inputs
- Map 6
- Map Operator Tree:
- TableScan
- alias: z
- Statistics: Num rows: 25 Data size: 4375 Basic stats: COMPLETE Column stats: COMPLETE
Filter Operator
predicate: key is not null (type: boolean)
Statistics: Num rows: 25 Data size: 4375 Basic stats: COMPLETE Column stats: COMPLETE
@@ -1864,7 +1857,7 @@ STAGE PLANS:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- Reducer 7
+ Reducer 6
Execution mode: llap
Reduce Operator Tree:
Group By Operator
@@ -1929,9 +1922,9 @@ STAGE PLANS:
#### A masked pattern was here ####
Edges:
Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 5 (SIMPLE_EDGE)
- Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 7 (SIMPLE_EDGE)
+ Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
Reducer 4 <- Reducer 3 (CUSTOM_SIMPLE_EDGE)
- Reducer 7 <- Map 6 (SIMPLE_EDGE)
+ Reducer 6 <- Map 5 (SIMPLE_EDGE)
#### A masked pattern was here ####
Vertices:
Map 1
@@ -1971,13 +1964,6 @@ STAGE PLANS:
Map-reduce partition columns: _col0 (type: string)
Statistics: Num rows: 25 Data size: 4375 Basic stats: COMPLETE Column stats: COMPLETE
value expressions: _col1 (type: string)
- Execution mode: llap
- LLAP IO: no inputs
- Map 6
- Map Operator Tree:
- TableScan
- alias: z
- Statistics: Num rows: 25 Data size: 4375 Basic stats: COMPLETE Column stats: COMPLETE
Filter Operator
predicate: key is not null (type: boolean)
Statistics: Num rows: 25 Data size: 4375 Basic stats: COMPLETE Column stats: COMPLETE
@@ -2055,7 +2041,7 @@ STAGE PLANS:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- Reducer 7
+ Reducer 6
Execution mode: llap
Reduce Operator Tree:
Group By Operator