You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jc...@apache.org on 2018/01/05 02:44:48 UTC
[8/8] hive git commit: HIVE-18361: Extend shared work optimizer to
reuse computation beyond work boundaries (Jesus Camacho Rodriguez,
reviewed by Ashutosh Chauhan)
HIVE-18361: Extend shared work optimizer to reuse computation beyond work boundaries (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/96a409e1
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/96a409e1
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/96a409e1
Branch: refs/heads/master
Commit: 96a409e1c6ec846eeb6c72b50bed60790ccc1836
Parents: 3f5148d
Author: Jesus Camacho Rodriguez <jc...@apache.org>
Authored: Thu Dec 21 17:08:07 2017 -0800
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Thu Jan 4 18:44:15 2018 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 4 +
.../test/resources/testconfiguration.properties | 1 +
.../hive/ql/optimizer/SharedWorkOptimizer.java | 407 ++++++-
.../test/queries/clientpositive/sharedworkext.q | 53 +
.../clientpositive/llap/explainuser_1.q.out | 47 +-
.../clientpositive/llap/sharedworkext.q.out | 1076 ++++++++++++++++++
.../clientpositive/llap/subquery_multi.q.out | 185 +--
.../clientpositive/llap/subquery_notin.q.out | 752 +++++-------
.../clientpositive/llap/subquery_scalar.q.out | 167 +--
.../clientpositive/llap/subquery_select.q.out | 110 +-
.../clientpositive/llap/subquery_views.q.out | 83 +-
.../llap/vector_groupby_grouping_id2.q.out | 138 +--
.../llap/vector_groupby_grouping_sets4.q.out | 71 +-
.../clientpositive/perf/tez/query14.q.out | 264 ++---
.../clientpositive/perf/tez/query2.q.out | 103 +-
.../clientpositive/perf/tez/query23.q.out | 360 ++----
.../clientpositive/perf/tez/query32.q.out | 30 +-
.../clientpositive/perf/tez/query33.q.out | 232 ++--
.../clientpositive/perf/tez/query44.q.out | 69 +-
.../clientpositive/perf/tez/query47.q.out | 224 ++--
.../clientpositive/perf/tez/query54.q.out | 34 +-
.../clientpositive/perf/tez/query56.q.out | 232 ++--
.../clientpositive/perf/tez/query57.q.out | 224 ++--
.../clientpositive/perf/tez/query58.q.out | 200 +---
.../clientpositive/perf/tez/query59.q.out | 52 +-
.../clientpositive/perf/tez/query60.q.out | 232 ++--
.../clientpositive/perf/tez/query61.q.out | 118 +-
.../clientpositive/perf/tez/query64.q.out | 468 +++-----
.../clientpositive/perf/tez/query70.q.out | 66 +-
.../clientpositive/perf/tez/query83.q.out | 116 +-
.../clientpositive/perf/tez/query90.q.out | 50 +-
.../clientpositive/perf/tez/query92.q.out | 30 +-
32 files changed, 3056 insertions(+), 3142 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/96a409e1/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 841d075..6529da6 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1695,6 +1695,10 @@ public class HiveConf extends Configuration {
HIVE_SHARED_WORK_OPTIMIZATION("hive.optimize.shared.work", true,
"Whether to enable shared work optimizer. The optimizer finds scan operator over the same table\n" +
"and follow-up operators in the query plan and merges them if they meet some preconditions. Tez only."),
+ HIVE_SHARED_WORK_EXTENDED_OPTIMIZATION("hive.optimize.shared.work.extended", true,
+ "Whether to enable shared work extended optimizer. The optimizer tries to merge equal operators\n" +
+ "after a work boundary after shared work optimizer has been executed. Requires hive.optimize.shared.work\n" +
+ "to be set to true. Tez only."),
HIVE_COMBINE_EQUIVALENT_WORK_OPTIMIZATION("hive.combine.equivalent.work.optimization", true, "Whether to " +
"combine equivalent work objects during physical optimization.\n This optimization looks for equivalent " +
"work objects and combines them if they meet certain preconditions. Spark only."),
http://git-wip-us.apache.org/repos/asf/hive/blob/96a409e1/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 98e390c..ac81995 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -628,6 +628,7 @@ minillaplocal.query.files=\
semijoin6.q,\
semijoin7.q,\
semijoin_hint.q,\
+ sharedworkext.q,\
smb_cache.q,\
special_character_in_tabnames_1.q,\
sqlmerge.q,\
http://git-wip-us.apache.org/repos/asf/hive/blob/96a409e1/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java
index d4ddb75..08fec42 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hive.ql.optimizer;
import java.util.ArrayList;
@@ -36,6 +35,7 @@ import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
import org.apache.hadoop.hive.ql.exec.FilterOperator;
@@ -79,6 +79,7 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multiset;
+import com.google.common.collect.Sets;
import com.google.common.collect.TreeMultiset;
/**
@@ -102,8 +103,8 @@ import com.google.common.collect.TreeMultiset;
* | | / \
* Op Op Op Op
*
- * <p>A limitation in the current implementation is that the optimizer does not
- * go beyond a work boundary.
+ * <p>If the extended version of the optimizer is enabled, it can go beyond
+ * a work boundary to find reutilization opportunities.
*
* <p>The optimization only works with the Tez execution engine.
*/
@@ -146,13 +147,13 @@ public class SharedWorkOptimizer extends Transform {
String tableName = tablePair.getKey();
for (TableScanOperator discardableTsOp : tableNameToOps.get(tableName)) {
if (removedOps.contains(discardableTsOp)) {
- LOG.debug("Skip {} as it has been already removed", discardableTsOp);
+ LOG.debug("Skip {} as it has already been removed", discardableTsOp);
continue;
}
Collection<TableScanOperator> prevTsOps = existingOps.get(tableName);
for (TableScanOperator retainableTsOp : prevTsOps) {
if (removedOps.contains(retainableTsOp)) {
- LOG.debug("Skip {} as it has been already removed", retainableTsOp);
+ LOG.debug("Skip {} as it has already been removed", retainableTsOp);
continue;
}
@@ -167,7 +168,7 @@ public class SharedWorkOptimizer extends Transform {
// Secondly, we extract information about the part of the tree that can be merged
// as well as some structural information (memory consumption) that needs to be
// used to determined whether the merge can happen
- SharedResult sr = extractSharedOptimizationInfo(
+ SharedResult sr = extractSharedOptimizationInfoForRoot(
pctx, optimizerCache, retainableTsOp, discardableTsOp);
// It seems these two operators can be merged.
@@ -197,7 +198,8 @@ public class SharedWorkOptimizer extends Transform {
}
}
- LOG.debug("Merging subtree starting at {} into subtree starting at {}", discardableTsOp, retainableTsOp);
+ LOG.debug("Merging subtree starting at {} into subtree starting at {}",
+ discardableTsOp, retainableTsOp);
} else {
// Only TS operator
ExprNodeGenericFuncDesc exprNode = null;
@@ -262,6 +264,7 @@ public class SharedWorkOptimizer extends Transform {
!sr.discardableInputOps.contains(sjbi.getTsOp())) {
GenTezUtils.removeSemiJoinOperator(
pctx, (ReduceSinkOperator) op, sjbi.getTsOp());
+ optimizerCache.tableScanToDPPSource.remove(sjbi.getTsOp(), op);
}
} else if (op instanceof AppMasterEventOperator) {
DynamicPruningEventDesc dped = (DynamicPruningEventDesc) op.getConf();
@@ -269,6 +272,7 @@ public class SharedWorkOptimizer extends Transform {
!sr.discardableInputOps.contains(dped.getTableScan())) {
GenTezUtils.removeSemiJoinOperator(
pctx, (AppMasterEventOperator) op, dped.getTableScan());
+ optimizerCache.tableScanToDPPSource.remove(dped.getTableScan(), op);
}
}
LOG.debug("Input operator removed: {}", op);
@@ -292,10 +296,12 @@ public class SharedWorkOptimizer extends Transform {
GenTezUtils.removeSemiJoinOperator(pctx,
(ReduceSinkOperator) dppSource,
(TableScanOperator) sr.retainableOps.get(0));
+ optimizerCache.tableScanToDPPSource.remove(sr.retainableOps.get(0), op);
} else if (dppSource instanceof AppMasterEventOperator) {
GenTezUtils.removeSemiJoinOperator(pctx,
(AppMasterEventOperator) dppSource,
(TableScanOperator) sr.retainableOps.get(0));
+ optimizerCache.tableScanToDPPSource.remove(sr.retainableOps.get(0), op);
}
}
}
@@ -328,6 +334,203 @@ public class SharedWorkOptimizer extends Transform {
LOG.debug("After SharedWorkOptimizer:\n" + Operator.toString(pctx.getTopOps().values()));
}
+ if(pctx.getConf().getBoolVar(ConfVars.HIVE_SHARED_WORK_EXTENDED_OPTIMIZATION)) {
+ // Gather RS operators that 1) belong to root works, i.e., works containing TS operators,
+ // and 2) share the same input operator.
+ // These will be the first target for extended shared work optimization
+ Multimap<Operator<?>, ReduceSinkOperator> parentToRsOps = ArrayListMultimap.create();
+ Set<Operator<?>> visited = new HashSet<>();
+ for (Entry<String, TableScanOperator> e : topOps.entrySet()) {
+ gatherReduceSinkOpsByInput(parentToRsOps, visited,
+ findWorkOperators(optimizerCache, e.getValue()));
+ }
+
+ while (!parentToRsOps.isEmpty()) {
+ // As above, we enforce a certain order when we do the reuse. In particular, each
+ // group is ranked by the data size of the common RS parent x its number of children.
+ List<Entry<Operator<?>, Long>> sortedRSGroups =
+ rankOpsByAccumulatedSize(parentToRsOps.keySet());
+ LOG.debug("Sorted operators by size: {}", sortedRSGroups);
+
+ // Execute extended optimization
+ // For each RS, check whether another RS with the same parent could be merged into it.
+ // If they are merged, RS operators in the resulting work will be considered
+ // mergeable in next loop iteration.
+ Multimap<Operator<?>, ReduceSinkOperator> existingRsOps = ArrayListMultimap.create();
+ for (Entry<Operator<?>, Long> rsGroupInfo : sortedRSGroups) {
+ Operator<?> rsParent = rsGroupInfo.getKey();
+ for (ReduceSinkOperator discardableRsOp : parentToRsOps.get(rsParent)) {
+ if (removedOps.contains(discardableRsOp)) {
+ LOG.debug("Skip {} as it has already been removed", discardableRsOp);
+ continue;
+ }
+ Collection<ReduceSinkOperator> otherRsOps = existingRsOps.get(rsParent);
+ for (ReduceSinkOperator retainableRsOp : otherRsOps) {
+ if (removedOps.contains(retainableRsOp)) {
+ LOG.debug("Skip {} as it has already been removed", retainableRsOp);
+ continue;
+ }
+
+ // First we quickly check if the two RS operators can actually be merged.
+ // We already know that these two RS operators have the same parent, but
+ // we need to check whether both RS are actually equal. Further, we check
+ // whether their (first) children are also equal. If any of these conditions
+ // are not met, we are not going to try to merge.
+ boolean mergeable = compareOperator(pctx, retainableRsOp, discardableRsOp) &&
+ compareOperator(pctx, retainableRsOp.getChildOperators().get(0),
+ discardableRsOp.getChildOperators().get(0));
+ if (!mergeable) {
+ // Skip
+ LOG.debug("{} and {} cannot be merged", retainableRsOp, discardableRsOp);
+ continue;
+ }
+
+ LOG.debug("Checking additional conditions for merging subtree starting at {}"
+ + " into subtree starting at {}", discardableRsOp, retainableRsOp);
+
+ // Secondly, we extract information about the part of the tree that can be merged
+ // as well as some structural information (memory consumption) that needs to be
+ // used to determine whether the merge can happen
+ Operator<?> retainableRsOpChild = retainableRsOp.getChildOperators().get(0);
+ Operator<?> discardableRsOpChild = discardableRsOp.getChildOperators().get(0);
+ SharedResult sr = extractSharedOptimizationInfo(
+ pctx, optimizerCache, retainableRsOp, discardableRsOp,
+ retainableRsOpChild, discardableRsOpChild);
+
+ // It seems these two operators can be merged.
+ // Check that plan meets some preconditions before doing it.
+ // In particular, in the presence of map joins in the upstream plan:
+ // - we cannot exceed the noconditional task size, and
+ // - if we already merged the big table, we cannot merge the broadcast
+ // tables.
+ if (sr.retainableOps.isEmpty() || !validPreConditions(pctx, optimizerCache, sr)) {
+ // Skip
+ LOG.debug("{} and {} do not meet preconditions", retainableRsOp, discardableRsOp);
+ continue;
+ }
+
+ // We can merge: re-attach the children of the last discardable op to the last retainable op
+ Operator<?> lastRetainableOp = sr.retainableOps.get(sr.retainableOps.size() - 1);
+ Operator<?> lastDiscardableOp = sr.discardableOps.get(sr.discardableOps.size() - 1);
+ if (lastDiscardableOp.getNumChild() != 0) {
+ List<Operator<? extends OperatorDesc>> allChildren =
+ Lists.newArrayList(lastDiscardableOp.getChildOperators());
+ for (Operator<? extends OperatorDesc> op : allChildren) {
+ lastDiscardableOp.getChildOperators().remove(op);
+ op.replaceParent(lastDiscardableOp, lastRetainableOp);
+ lastRetainableOp.getChildOperators().add(op);
+ }
+ }
+
+ LOG.debug("Merging subtree starting at {} into subtree starting at {}",
+ discardableRsOp, retainableRsOp);
+
+ // First we remove the input operators of the expression that
+ // we are going to eliminate
+ for (Operator<?> op : sr.discardableInputOps) {
+ OperatorUtils.removeOperator(op);
+ optimizerCache.removeOp(op);
+ removedOps.add(op);
+ // Remove DPP predicates (and keep the cache's TS -> DPP source map in sync)
+ if (op instanceof ReduceSinkOperator) {
+ SemiJoinBranchInfo sjbi = pctx.getRsToSemiJoinBranchInfo().get(op);
+ if (sjbi != null && !sr.discardableOps.contains(sjbi.getTsOp()) &&
+ !sr.discardableInputOps.contains(sjbi.getTsOp())) {
+ GenTezUtils.removeSemiJoinOperator(
+ pctx, (ReduceSinkOperator) op, sjbi.getTsOp());
+ optimizerCache.tableScanToDPPSource.remove(sjbi.getTsOp(), op);
+ }
+ } else if (op instanceof AppMasterEventOperator) {
+ DynamicPruningEventDesc dped = (DynamicPruningEventDesc) op.getConf();
+ if (!sr.discardableOps.contains(dped.getTableScan()) &&
+ !sr.discardableInputOps.contains(dped.getTableScan())) {
+ GenTezUtils.removeSemiJoinOperator(
+ pctx, (AppMasterEventOperator) op, dped.getTableScan());
+ optimizerCache.tableScanToDPPSource.remove(dped.getTableScan(), op);
+ }
+ }
+ LOG.debug("Input operator removed: {}", op);
+ }
+ // We remove the discardable RS operator
+ OperatorUtils.removeOperator(discardableRsOp);
+ optimizerCache.removeOp(discardableRsOp);
+ removedOps.add(discardableRsOp);
+ LOG.debug("Operator removed: {}", discardableRsOp);
+ // Then we merge the operators of the works we are going to merge
+ optimizerCache.removeOpAndCombineWork(discardableRsOpChild, retainableRsOpChild);
+ // Finally we remove the rest of the expression from the tree
+ for (Operator<?> op : sr.discardableOps) {
+ OperatorUtils.removeOperator(op);
+ optimizerCache.removeOp(op);
+ removedOps.add(op);
+ LOG.debug("Operator removed: {}", op);
+ }
+
+ break;
+ }
+
+ if (removedOps.contains(discardableRsOp)) {
+ // This operator has been removed, remove it from the list of existing operators
+ existingRsOps.remove(rsParent, discardableRsOp);
+ } else {
+ // This operator has not been removed, include it in the list of existing operators
+ existingRsOps.put(rsParent, discardableRsOp);
+ }
+ }
+ }
+
+ // We gather the operators that will be used for the next iteration of extended
+ // optimization (if any), starting from the works below the surviving RS operators
+ parentToRsOps = ArrayListMultimap.create();
+ visited = new HashSet<>();
+ for (Entry<Operator<?>, ReduceSinkOperator> e : existingRsOps.entries()) {
+ if (removedOps.contains(e.getValue()) || e.getValue().getNumChild() < 1) {
+ // If 1) RS has been removed, or 2) it does not have a child (for instance, it is a
+ // semijoin RS), we can quickly skip this one
+ continue;
+ }
+ gatherReduceSinkOpsByInput(parentToRsOps, visited,
+ findWorkOperators(optimizerCache, e.getValue().getChildOperators().get(0)));
+ }
+ }
+
+ // Remove unused table scan operators (those left without children after merging)
+ it = topOps.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, TableScanOperator> e = it.next();
+ if (e.getValue().getNumChild() == 0) {
+ it.remove();
+ }
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("After SharedWorkExtendedOptimizer:\n"
+ + Operator.toString(pctx.getTopOps().values()));
+ }
+ }
+
+    // If we are running tests, we verify that the contents of the cache
+    // correspond with the contents of the plan, and fail otherwise.
+    // This check always runs in test mode, independently of whether
+    // we use the basic or the extended version of the optimizer.
+    if (pctx.getConf().getBoolVar(ConfVars.HIVE_IN_TEST)) {
+      Set<Operator<?>> visited = new HashSet<>();
+      it = topOps.entrySet().iterator();
+      while (it.hasNext()) {
+        Entry<String, TableScanOperator> e = it.next();
+        for (Operator<?> op : OperatorUtils.findOperators(e.getValue(), Operator.class)) {
+          if (!visited.contains(op)) {
+            // Work membership according to the cache must match the one derived from the plan
+            if (!findWorkOperators(optimizerCache, op).equals(
+                findWorkOperators(op, new HashSet<Operator<?>>()))) {
+              // Separate the two literals with a space and report the offending operator
+              throw new SemanticException("Error in shared work optimizer: operator cache contents "
+                  + "and actual plan differ for operator " + op);
+            }
+            visited.add(op);
+          }
+        }
+      }
+    }
+
return pctx;
}
@@ -392,16 +595,59 @@ public class SharedWorkOptimizer extends Transform {
}
}
List<Entry<String, Long>> sortedTables =
- new LinkedList<>(tableToTotalSize.entrySet());
+ new LinkedList<>(tableToTotalSize.entrySet());
Collections.sort(sortedTables, Collections.reverseOrder(
- new Comparator<Map.Entry<String, Long>>() {
- public int compare(Map.Entry<String, Long> o1, Map.Entry<String, Long> o2) {
- return (o1.getValue()).compareTo(o2.getValue());
- }
- }));
+ new Comparator<Map.Entry<String, Long>>() {
+ public int compare(Map.Entry<String, Long> o1, Map.Entry<String, Long> o2) {
+ return (o1.getValue()).compareTo(o2.getValue());
+ }
+ }));
return sortedTables;
}
+  /**
+   * Groups the ReduceSink operators found in {@code ops} by their (first) parent operator.
+   * A parent is recorded in {@code parentToRsOps} only if it feeds more than one RS operator
+   * that has a child, since only such sibling RS operators are candidates for extended
+   * merging. Every RS child of an inspected parent is added to {@code visited}, so each
+   * parent is inspected only once.
+   *
+   * @param parentToRsOps output multimap from parent operator to its mergeable RS children
+   * @param visited RS operators that have already been grouped (updated in place)
+   * @param ops operators of the work(s) to inspect
+   */
+  private static void gatherReduceSinkOpsByInput(Multimap<Operator<?>,
+      ReduceSinkOperator> parentToRsOps, Set<Operator<?>> visited, Set<Operator<?>> ops) {
+    for (Operator<?> op : ops) {
+      // If the RS has other RS siblings, we will add it to be considered in next iteration
+      if (op instanceof ReduceSinkOperator && !visited.contains(op)) {
+        Operator<?> parent = op.getParentOperators().get(0);
+        Set<ReduceSinkOperator> s = new LinkedHashSet<>();
+        for (Operator<?> c : parent.getChildOperators()) {
+          if (c instanceof ReduceSinkOperator) {
+            visited.add(c);
+            // An RS without a child (e.g., a semijoin RS) can never be merged by the
+            // extended optimization, which dereferences the first child of each grouped
+            // RS; keep it out of the group to avoid an IndexOutOfBoundsException.
+            if (c.getNumChild() > 0) {
+              s.add((ReduceSinkOperator) c);
+            }
+          }
+        }
+        if (s.size() > 1) {
+          parentToRsOps.putAll(parent, s);
+        }
+      }
+    }
+  }
+
+  /**
+   * Ranks operators by accumulated size: the data size from their statistics (0 when no
+   * statistics are available) multiplied by their number of children. The returned list
+   * goes from the largest to the smallest accumulated size; ties are ordered by descending
+   * {@code toString()} value so the ranking is deterministic.
+   */
+  private static List<Entry<Operator<?>, Long>> rankOpsByAccumulatedSize(Set<Operator<?>> opsSet) {
+    Map<Operator<?>, Long> sizeByOp = new HashMap<>();
+    for (Operator<?> candidate : opsSet) {
+      long dataSize = 0L;
+      if (candidate.getStatistics() != null) {
+        dataSize = candidate.getStatistics().getDataSize();
+      }
+      int numUses = candidate.getChildOperators().size();
+      sizeByOp.put(candidate, StatsUtils.safeMult(numUses, dataSize));
+    }
+    List<Entry<Operator<?>, Long>> ranked = new LinkedList<>(sizeByOp.entrySet());
+    // Arguments are compared in swapped order to obtain a descending ranking
+    Collections.sort(ranked, new Comparator<Entry<Operator<?>, Long>>() {
+      @Override
+      public int compare(Entry<Operator<?>, Long> first, Entry<Operator<?>, Long> second) {
+        int bySize = second.getValue().compareTo(first.getValue());
+        if (bySize != 0) {
+          return bySize;
+        }
+        return second.getKey().toString().compareTo(first.getKey().toString());
+      }
+    });
+    return ranked;
+  }
+
private static boolean areMergeable(ParseContext pctx, SharedWorkOptimizerCache optimizerCache,
TableScanOperator tsOp1, TableScanOperator tsOp2) throws SemanticException {
// First we check if the two table scan operators can actually be merged
@@ -486,15 +732,15 @@ public class SharedWorkOptimizer extends Transform {
return true;
}
- private static SharedResult extractSharedOptimizationInfo(ParseContext pctx,
+ private static SharedResult extractSharedOptimizationInfoForRoot(ParseContext pctx,
SharedWorkOptimizerCache optimizerCache,
TableScanOperator retainableTsOp,
TableScanOperator discardableTsOp) throws SemanticException {
- Set<Operator<?>> retainableOps = new LinkedHashSet<>();
- Set<Operator<?>> discardableOps = new LinkedHashSet<>();
+ LinkedHashSet<Operator<?>> retainableOps = new LinkedHashSet<>();
+ LinkedHashSet<Operator<?>> discardableOps = new LinkedHashSet<>();
Set<Operator<?>> discardableInputOps = new HashSet<>();
- long dataSize = 0l;
- long maxDataSize = 0l;
+ long dataSize = 0L;
+ long maxDataSize = 0L;
retainableOps.add(retainableTsOp);
discardableOps.add(discardableTsOp);
@@ -503,7 +749,8 @@ public class SharedWorkOptimizer extends Transform {
if (equalOp1.getNumChild() > 1 || equalOp2.getNumChild() > 1) {
// TODO: Support checking multiple child operators to merge further.
discardableInputOps.addAll(gatherDPPBranchOps(pctx, optimizerCache, discardableOps));
- return new SharedResult(retainableOps, discardableOps, discardableInputOps, dataSize, maxDataSize);
+ return new SharedResult(retainableOps, discardableOps, discardableInputOps,
+ dataSize, maxDataSize);
}
Operator<?> currentOp1 = retainableTsOp.getChildOperators().get(0);
Operator<?> currentOp2 = discardableTsOp.getChildOperators().get(0);
@@ -532,19 +779,54 @@ public class SharedWorkOptimizer extends Transform {
currentOp2.getChildOperators().size() > 1) {
// TODO: Support checking multiple child operators to merge further.
discardableInputOps.addAll(gatherDPPBranchOps(pctx, optimizerCache, discardableOps));
- discardableInputOps.addAll(gatherDPPBranchOps(pctx, optimizerCache, retainableOps, discardableInputOps));
- return new SharedResult(retainableOps, discardableOps, discardableInputOps, dataSize, maxDataSize);
+ discardableInputOps.addAll(gatherDPPBranchOps(pctx, optimizerCache, retainableOps,
+ discardableInputOps));
+ return new SharedResult(retainableOps, discardableOps, discardableInputOps,
+ dataSize, maxDataSize);
}
currentOp1 = currentOp1.getChildOperators().get(0);
currentOp2 = currentOp2.getChildOperators().get(0);
} else {
// Bail out
discardableInputOps.addAll(gatherDPPBranchOps(pctx, optimizerCache, discardableOps));
- discardableInputOps.addAll(gatherDPPBranchOps(pctx, optimizerCache, retainableOps, discardableInputOps));
- return new SharedResult(retainableOps, discardableOps, discardableInputOps, dataSize, maxDataSize);
+ discardableInputOps.addAll(gatherDPPBranchOps(pctx, optimizerCache, retainableOps,
+ discardableInputOps));
+ return new SharedResult(retainableOps, discardableOps, discardableInputOps,
+ dataSize, maxDataSize);
}
}
+ return extractSharedOptimizationInfo(pctx, optimizerCache, equalOp1, equalOp2,
+ currentOp1, currentOp2, retainableOps, discardableOps, discardableInputOps, false);
+ }
+
+  /**
+   * Extracts sharing information for a non-root pair of operators (extended optimization).
+   * The analysis starts from empty result sets and with {@code removeInputBranch} enabled,
+   * so that inputs coming from the equal parents are compared as well instead of being
+   * skipped.
+   */
+  private static SharedResult extractSharedOptimizationInfo(ParseContext pctx,
+      SharedWorkOptimizerCache optimizerCache,
+      Operator<?> retainableOpEqualParent,
+      Operator<?> discardableOpEqualParent,
+      Operator<?> retainableOp,
+      Operator<?> discardableOp) throws SemanticException {
+    LinkedHashSet<Operator<?>> retainedSoFar = new LinkedHashSet<>();
+    LinkedHashSet<Operator<?>> discardedSoFar = new LinkedHashSet<>();
+    Set<Operator<?>> discardedInputs = new HashSet<>();
+    boolean removeInputBranch = true;
+    return extractSharedOptimizationInfo(pctx, optimizerCache,
+        retainableOpEqualParent, discardableOpEqualParent, retainableOp, discardableOp,
+        retainedSoFar, discardedSoFar, discardedInputs, removeInputBranch);
+  }
+
+ private static SharedResult extractSharedOptimizationInfo(ParseContext pctx,
+ SharedWorkOptimizerCache optimizerCache,
+ Operator<?> retainableOpEqualParent,
+ Operator<?> discardableOpEqualParent,
+ Operator<?> retainableOp,
+ Operator<?> discardableOp,
+ LinkedHashSet<Operator<?>> retainableOps,
+ LinkedHashSet<Operator<?>> discardableOps,
+ Set<Operator<?>> discardableInputOps,
+ boolean removeInputBranch) throws SemanticException {
+ Operator<?> equalOp1 = retainableOpEqualParent;
+ Operator<?> equalOp2 = discardableOpEqualParent;
+ Operator<?> currentOp1 = retainableOp;
+ Operator<?> currentOp2 = discardableOp;
+ long dataSize = 0L;
+ long maxDataSize = 0L;
// Try to merge rest of operators
while (!(currentOp1 instanceof ReduceSinkOperator)) {
// Check whether current operators are equal
@@ -563,7 +845,7 @@ public class SharedWorkOptimizer extends Transform {
for (; idx < currentOp1.getParentOperators().size(); idx++) {
Operator<?> parentOp1 = currentOp1.getParentOperators().get(idx);
Operator<?> parentOp2 = currentOp2.getParentOperators().get(idx);
- if (parentOp1 == equalOp1 && parentOp2 == equalOp2) {
+ if (parentOp1 == equalOp1 && parentOp2 == equalOp2 && !removeInputBranch) {
continue;
}
if ((parentOp1 == equalOp1 && parentOp2 != equalOp2) ||
@@ -572,7 +854,8 @@ public class SharedWorkOptimizer extends Transform {
break;
}
// Compare input
- List<Operator<?>> removeOpsForCurrentInput = compareAndGatherOps(pctx, parentOp1, parentOp2);
+ List<Operator<?>> removeOpsForCurrentInput =
+ compareAndGatherOps(pctx, parentOp1, parentOp2);
if (removeOpsForCurrentInput == null) {
// Inputs are not the same, bail out
break;
@@ -626,8 +909,10 @@ public class SharedWorkOptimizer extends Transform {
discardableInputOps.addAll(gatherDPPBranchOps(pctx, optimizerCache, discardableInputOps));
discardableInputOps.addAll(gatherDPPBranchOps(pctx, optimizerCache, discardableOps));
- discardableInputOps.addAll(gatherDPPBranchOps(pctx, optimizerCache, retainableOps, discardableInputOps));
- return new SharedResult(retainableOps, discardableOps, discardableInputOps, dataSize, maxDataSize);
+ discardableInputOps.addAll(gatherDPPBranchOps(pctx, optimizerCache, retainableOps,
+ discardableInputOps));
+ return new SharedResult(retainableOps, discardableOps, discardableInputOps,
+ dataSize, maxDataSize);
}
private static Multiset<String> extractConjsIgnoringDPPPreds(ExprNodeDesc predicate) {
@@ -656,7 +941,8 @@ public class SharedWorkOptimizer extends Transform {
Set<Operator<?>> dppBranches = new HashSet<>();
for (Operator<?> op : ops) {
if (op instanceof TableScanOperator) {
- Collection<Operator<?>> c = optimizerCache.tableScanToDPPSource.get((TableScanOperator) op);
+ Collection<Operator<?>> c = optimizerCache.tableScanToDPPSource
+ .get((TableScanOperator) op);
for (Operator<?> dppSource : c) {
// Remove the branches
Operator<?> currentOp = dppSource;
@@ -676,7 +962,8 @@ public class SharedWorkOptimizer extends Transform {
Set<Operator<?>> dppBranches = new HashSet<>();
for (Operator<?> op : ops) {
if (op instanceof TableScanOperator) {
- Collection<Operator<?>> c = optimizerCache.tableScanToDPPSource.get((TableScanOperator) op);
+ Collection<Operator<?>> c = optimizerCache.tableScanToDPPSource
+ .get((TableScanOperator) op);
for (Operator<?> dppSource : c) {
Set<Operator<?>> ascendants =
findAscendantWorkOperators(pctx, optimizerCache, dppSource);
@@ -711,6 +998,11 @@ public class SharedWorkOptimizer extends Transform {
return false;
}
+ if (gather && op2.getChildOperators().size() > 1) {
+ // If the second operator has more than one child, we stop gathering
+ gather = false;
+ }
+
if (gather) {
result.add(op2);
}
@@ -724,12 +1016,8 @@ public class SharedWorkOptimizer extends Transform {
for (int i = 0; i < op1ParentOperators.size(); i++) {
Operator<?> op1ParentOp = op1ParentOperators.get(i);
Operator<?> op2ParentOp = op2ParentOperators.get(i);
- boolean mergeable;
- if (gather && op2ParentOp.getChildOperators().size() < 2) {
- mergeable = compareAndGatherOps(pctx, op1ParentOp, op2ParentOp, result, true);
- } else {
- mergeable = compareAndGatherOps(pctx, op1ParentOp, op2ParentOp, result, false);
- }
+ boolean mergeable =
+ compareAndGatherOps(pctx, op1ParentOp, op2ParentOp, result, gather);
if (!mergeable) {
return false;
}
@@ -741,7 +1029,6 @@ public class SharedWorkOptimizer extends Transform {
return true;
}
- @SuppressWarnings({ "rawtypes", "unchecked" })
private static boolean compareOperator(ParseContext pctx, Operator<?> op1, Operator<?> op2)
throws SemanticException {
if (!op1.getClass().getName().equals(op2.getClass().getName())) {
@@ -809,21 +1096,21 @@ public class SharedWorkOptimizer extends Transform {
return false;
}
- TableScanOperator tsOp1 = (TableScanOperator) sr.retainableOps.get(0);
- TableScanOperator tsOp2 = (TableScanOperator) sr.discardableOps.get(0);
+ Operator<?> op1 = sr.retainableOps.get(0);
+ Operator<?> op2 = sr.discardableOps.get(0);
- // 1) The set of operators in the works of the TS operators need to meet
+ // 1) The set of operators in the works that we are merging need to meet
// some requirements. In particular:
- // 1.1. None of the works that contain the TS operators can contain a Union
+ // 1.1. None of the works that we are merging can contain a Union
// operator. This is not supported yet as we might end up with cycles in
// the Tez DAG.
// 1.2. There cannot be more than one DummyStore operator in the new resulting
- // work when the TS operators are merged. This is due to an assumption in
+ // work when the operators are merged. This is due to an assumption in
// MergeJoinProc that needs to be further explored.
// If any of these conditions are not met, we cannot merge.
// TODO: Extend rule so it can be applied for these cases.
- final Set<Operator<?>> workOps1 = findWorkOperators(optimizerCache, tsOp1);
- final Set<Operator<?>> workOps2 = findWorkOperators(optimizerCache, tsOp2);
+ final Set<Operator<?>> workOps1 = findWorkOperators(optimizerCache, op1);
+ final Set<Operator<?>> workOps2 = findWorkOperators(optimizerCache, op2);
boolean foundDummyStoreOp = false;
for (Operator<?> op : workOps1) {
if (op instanceof UnionOperator) {
@@ -853,8 +1140,8 @@ public class SharedWorkOptimizer extends Transform {
// If we do, we cannot merge. The reason is that Tez currently does
// not support parallel edges, i.e., multiple edges from same work x
// into same work y.
- final Set<Operator<?>> outputWorksOps1 = findChildWorkOperators(pctx, optimizerCache, tsOp1);
- final Set<Operator<?>> outputWorksOps2 = findChildWorkOperators(pctx, optimizerCache, tsOp2);
+ final Set<Operator<?>> outputWorksOps1 = findChildWorkOperators(pctx, optimizerCache, op1);
+ final Set<Operator<?>> outputWorksOps2 = findChildWorkOperators(pctx, optimizerCache, op2);
if (!Collections.disjoint(outputWorksOps1, outputWorksOps2)) {
// We cannot merge
return false;
@@ -866,10 +1153,19 @@ public class SharedWorkOptimizer extends Transform {
// Work2 Work3 Work2
//
// If we do, we cannot merge. The reason is the same as above, currently
- // Tez currently does not support parallel edges.
- final Set<Operator<?>> inputWorksOps1 = findParentWorkOperators(pctx, optimizerCache, tsOp1);
+ // Tez does not support parallel edges.
+ //
+ // In the check, we exclude the inputs to the root operator that we are trying
+ // to merge (only useful for extended merging as TS do not have inputs).
+ final Set<Operator<?>> excludeOps1 = sr.retainableOps.get(0).getNumParent() > 0 ?
+ ImmutableSet.copyOf(sr.retainableOps.get(0).getParentOperators()) : ImmutableSet.of();
+ final Set<Operator<?>> inputWorksOps1 =
+ findParentWorkOperators(pctx, optimizerCache, op1, excludeOps1);
+ final Set<Operator<?>> excludeOps2 = sr.discardableOps.get(0).getNumParent() > 0 ?
+ Sets.union(ImmutableSet.copyOf(sr.discardableOps.get(0).getParentOperators()), sr.discardableInputOps) :
+ sr.discardableInputOps;
final Set<Operator<?>> inputWorksOps2 =
- findParentWorkOperators(pctx, optimizerCache, tsOp2, sr.discardableInputOps);
+ findParentWorkOperators(pctx, optimizerCache, op2, excludeOps2);
if (!Collections.disjoint(inputWorksOps1, inputWorksOps2)) {
// We cannot merge
return false;
@@ -885,9 +1181,9 @@ public class SharedWorkOptimizer extends Transform {
//
// If we do, we cannot merge, as we would end up with a cycle in the DAG.
final Set<Operator<?>> descendantWorksOps1 =
- findDescendantWorkOperators(pctx, optimizerCache, tsOp1, sr.discardableInputOps);
+ findDescendantWorkOperators(pctx, optimizerCache, op1, sr.discardableInputOps);
final Set<Operator<?>> descendantWorksOps2 =
- findDescendantWorkOperators(pctx, optimizerCache, tsOp2, sr.discardableInputOps);
+ findDescendantWorkOperators(pctx, optimizerCache, op2, sr.discardableInputOps);
if (!Collections.disjoint(descendantWorksOps1, workOps2)
|| !Collections.disjoint(workOps1, descendantWorksOps2)) {
return false;
@@ -1120,6 +1416,12 @@ public class SharedWorkOptimizer extends Transform {
this.dataSize = dataSize;
this.maxDataSize = maxDataSize;
}
+
+ @Override
+ public String toString() {
+ return "SharedResult { " + this.retainableOps + "; " + this.discardableOps + "; "
+ + this.discardableInputOps + "};";
+ }
}
/** Cache to accelerate optimization */
@@ -1173,6 +1475,11 @@ public class SharedWorkOptimizer extends Transform {
}
}
}
+
+ @Override
+ public String toString() {
+ return "SharedWorkOptimizerCache { \n" + operatorToWorkOperators.toString() + "\n };";
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/96a409e1/ql/src/test/queries/clientpositive/sharedworkext.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/sharedworkext.q b/ql/src/test/queries/clientpositive/sharedworkext.q
new file mode 100644
index 0000000..b1801ea
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/sharedworkext.q
@@ -0,0 +1,53 @@
+EXPLAIN
+SELECT a.key FROM
+(
+ SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
+ GROUP BY a1.key, a2.value
+) a
+JOIN
+(
+ SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
+ GROUP BY a1.key, a2.value
+) b
+ON a.key = b.key;
+
+SELECT a.key FROM
+(
+ SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
+ GROUP BY a1.key, a2.value
+) a
+JOIN
+(
+ SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
+ GROUP BY a1.key, a2.value
+) b
+ON a.key = b.key;
+
+EXPLAIN
+SELECT a.key FROM
+(
+ SELECT rank() OVER (ORDER BY key) AS key FROM
+ (SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
+ GROUP BY a1.key, a2.value) a
+) a
+JOIN
+(
+ SELECT rank() OVER (ORDER BY key) AS key FROM
+ (SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
+ GROUP BY a1.key, a2.value) b
+) b
+ON a.key = b.key;
+
+SELECT a.key FROM
+(
+ SELECT rank() OVER (ORDER BY key) AS key FROM
+ (SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
+ GROUP BY a1.key, a2.value) a
+) a
+JOIN
+(
+ SELECT rank() OVER (ORDER BY key) AS key FROM
+ (SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
+ GROUP BY a1.key, a2.value) b
+) b
+ON a.key = b.key;
http://git-wip-us.apache.org/repos/asf/hive/blob/96a409e1/ql/src/test/results/clientpositive/llap/explainuser_1.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/explainuser_1.q.out b/ql/src/test/results/clientpositive/llap/explainuser_1.q.out
index 8ab5e3a..2fb1854 100644
--- a/ql/src/test/results/clientpositive/llap/explainuser_1.q.out
+++ b/ql/src/test/results/clientpositive/llap/explainuser_1.q.out
@@ -2363,10 +2363,9 @@ Plan optimized by CBO.
Vertex dependency in root stage
Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE), Reducer 6 (CUSTOM_SIMPLE_EDGE)
-Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 7 (SIMPLE_EDGE)
+Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
Reducer 4 <- Reducer 3 (SIMPLE_EDGE)
Reducer 6 <- Map 5 (CUSTOM_SIMPLE_EDGE)
-Reducer 7 <- Map 5 (CUSTOM_SIMPLE_EDGE)
Stage-0
Fetch Operator
@@ -2384,41 +2383,37 @@ Stage-0
predicate:((_col2 = 0) or (_col5 is null and _col1 is not null and (_col3 >= _col2)))
Merge Join Operator [MERGEJOIN_37] (rows=26 width=141)
Conds:RS_24.UDFToDouble(_col1)=RS_25._col0(Left Outer),Output:["_col0","_col1","_col2","_col3","_col5"]
+ <-Reducer 6 [SIMPLE_EDGE] llap
+ SHUFFLE [RS_25]
+ PartitionCols:_col0
+ Select Operator [SEL_20] (rows=1 width=12)
+ Output:["_col0","_col1"]
+ Group By Operator [GBY_7] (rows=1 width=8)
+ Output:["_col0"],aggregations:["avg(VALUE._col0)"]
+ <-Map 5 [CUSTOM_SIMPLE_EDGE] llap
+ PARTITION_ONLY_SHUFFLE [RS_6]
+ Group By Operator [GBY_5] (rows=1 width=76)
+ Output:["_col0"],aggregations:["avg(p_size)"]
+ Filter Operator [FIL_33] (rows=8 width=4)
+ predicate:(p_size < 10)
+ TableScan [TS_2] (rows=26 width=4)
+ default@part,part,Tbl:COMPLETE,Col:COMPLETE,Output:["p_size"]
<-Reducer 2 [SIMPLE_EDGE] llap
SHUFFLE [RS_24]
PartitionCols:UDFToDouble(_col1)
Merge Join Operator [MERGEJOIN_36] (rows=26 width=141)
Conds:(Inner),Output:["_col0","_col1","_col2","_col3"]
+ <-Reducer 6 [CUSTOM_SIMPLE_EDGE] llap
+ SHUFFLE [RS_22]
+ Group By Operator [GBY_12] (rows=1 width=16)
+ Output:["_col0","_col1"],aggregations:["count()","count(_col0)"]
+ Please refer to the previous Group By Operator [GBY_7]
<-Map 1 [CUSTOM_SIMPLE_EDGE] llap
PARTITION_ONLY_SHUFFLE [RS_21]
Select Operator [SEL_1] (rows=26 width=125)
Output:["_col0","_col1"]
TableScan [TS_0] (rows=26 width=125)
default@part,part,Tbl:COMPLETE,Col:COMPLETE,Output:["p_name","p_size"]
- <-Reducer 6 [CUSTOM_SIMPLE_EDGE] llap
- PARTITION_ONLY_SHUFFLE [RS_22]
- Group By Operator [GBY_12] (rows=1 width=16)
- Output:["_col0","_col1"],aggregations:["count()","count(_col0)"]
- Group By Operator [GBY_7] (rows=1 width=8)
- Output:["_col0"],aggregations:["avg(VALUE._col0)"]
- <-Map 5 [CUSTOM_SIMPLE_EDGE] llap
- PARTITION_ONLY_SHUFFLE [RS_6]
- Group By Operator [GBY_5] (rows=1 width=76)
- Output:["_col0"],aggregations:["avg(p_size)"]
- Filter Operator [FIL_33] (rows=8 width=4)
- predicate:(p_size < 10)
- TableScan [TS_2] (rows=26 width=4)
- default@part,part,Tbl:COMPLETE,Col:COMPLETE,Output:["p_size"]
- <-Reducer 7 [SIMPLE_EDGE] llap
- SHUFFLE [RS_25]
- PartitionCols:_col0
- Select Operator [SEL_20] (rows=1 width=12)
- Output:["_col0","_col1"]
- Group By Operator [GBY_19] (rows=1 width=8)
- Output:["_col0"],aggregations:["avg(VALUE._col0)"]
- <-Map 5 [CUSTOM_SIMPLE_EDGE] llap
- PARTITION_ONLY_SHUFFLE [RS_18]
- Please refer to the previous Group By Operator [GBY_5]
PREHOOK: query: explain select b.p_mfgr, min(p_retailprice)
from part b
http://git-wip-us.apache.org/repos/asf/hive/blob/96a409e1/ql/src/test/results/clientpositive/llap/sharedworkext.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/sharedworkext.q.out b/ql/src/test/results/clientpositive/llap/sharedworkext.q.out
new file mode 100644
index 0000000..e56b1ce
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap/sharedworkext.q.out
@@ -0,0 +1,1076 @@
+PREHOOK: query: EXPLAIN
+SELECT a.key FROM
+(
+ SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
+ GROUP BY a1.key, a2.value
+) a
+JOIN
+(
+ SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
+ GROUP BY a1.key, a2.value
+) b
+ON a.key = b.key
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+SELECT a.key FROM
+(
+ SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
+ GROUP BY a1.key, a2.value
+) a
+JOIN
+(
+ SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
+ GROUP BY a1.key, a2.value
+) b
+ON a.key = b.key
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-1
+ Tez
+#### A masked pattern was here ####
+ Edges:
+ Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 6 (SIMPLE_EDGE)
+ Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
+ Reducer 4 <- Reducer 3 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE)
+ Reducer 5 <- Reducer 2 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: a2
+ Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+ Filter Operator
+ predicate: key is not null (type: boolean)
+ Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+ Select Operator
+ expressions: key (type: string), value (type: string)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+ value expressions: _col1 (type: string)
+ Execution mode: llap
+ LLAP IO: no inputs
+ Map 6
+ Map Operator Tree:
+ TableScan
+ alias: a1
+ Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+ Filter Operator
+ predicate: key is not null (type: boolean)
+ Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+ Select Operator
+ expressions: key (type: string)
+ outputColumnNames: _col0
+ Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+ Execution mode: llap
+ LLAP IO: no inputs
+ Reducer 2
+ Execution mode: llap
+ Reduce Operator Tree:
+ Merge Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 _col0 (type: string)
+ 1 _col0 (type: string)
+ outputColumnNames: _col1, _col2
+ Statistics: Num rows: 809 Data size: 144002 Basic stats: COMPLETE Column stats: COMPLETE
+ Group By Operator
+ keys: _col2 (type: string), _col1 (type: string)
+ mode: hash
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 404 Data size: 71912 Basic stats: COMPLETE Column stats: COMPLETE
+ Reduce Output Operator
+ key expressions: _col0 (type: string), _col1 (type: string)
+ sort order: ++
+ Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
+ Statistics: Num rows: 404 Data size: 71912 Basic stats: COMPLETE Column stats: COMPLETE
+ Reduce Output Operator
+ key expressions: _col0 (type: string), _col1 (type: string)
+ sort order: ++
+ Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
+ Statistics: Num rows: 404 Data size: 71912 Basic stats: COMPLETE Column stats: COMPLETE
+ Reducer 3
+ Execution mode: llap
+ Reduce Operator Tree:
+ Group By Operator
+ keys: KEY._col0 (type: string), KEY._col1 (type: string)
+ mode: mergepartial
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 404 Data size: 71912 Basic stats: COMPLETE Column stats: COMPLETE
+ Select Operator
+ expressions: _col0 (type: string)
+ outputColumnNames: _col0
+ Statistics: Num rows: 404 Data size: 71912 Basic stats: COMPLETE Column stats: COMPLETE
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 404 Data size: 71912 Basic stats: COMPLETE Column stats: COMPLETE
+ Reducer 4
+ Execution mode: llap
+ Reduce Operator Tree:
+ Merge Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 _col0 (type: string)
+ 1 _col0 (type: string)
+ outputColumnNames: _col0
+ Statistics: Num rows: 528 Data size: 45936 Basic stats: COMPLETE Column stats: COMPLETE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 528 Data size: 45936 Basic stats: COMPLETE Column stats: COMPLETE
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ Reducer 5
+ Execution mode: llap
+ Reduce Operator Tree:
+ Group By Operator
+ keys: KEY._col0 (type: string), KEY._col1 (type: string)
+ mode: mergepartial
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 404 Data size: 71912 Basic stats: COMPLETE Column stats: COMPLETE
+ Select Operator
+ expressions: _col0 (type: string)
+ outputColumnNames: _col0
+ Statistics: Num rows: 404 Data size: 71912 Basic stats: COMPLETE Column stats: COMPLETE
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 404 Data size: 71912 Basic stats: COMPLETE Column stats: COMPLETE
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ ListSink
+
+PREHOOK: query: SELECT a.key FROM
+(
+ SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
+ GROUP BY a1.key, a2.value
+) a
+JOIN
+(
+ SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
+ GROUP BY a1.key, a2.value
+) b
+ON a.key = b.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT a.key FROM
+(
+ SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
+ GROUP BY a1.key, a2.value
+) a
+JOIN
+(
+ SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
+ GROUP BY a1.key, a2.value
+) b
+ON a.key = b.key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+0
+10
+100
+103
+104
+105
+11
+111
+113
+114
+116
+118
+119
+12
+120
+125
+126
+128
+129
+131
+133
+134
+136
+137
+138
+143
+145
+146
+149
+15
+150
+152
+153
+155
+156
+157
+158
+160
+162
+163
+164
+165
+166
+167
+168
+169
+17
+170
+172
+174
+175
+176
+177
+178
+179
+18
+180
+181
+183
+186
+187
+189
+19
+190
+191
+192
+193
+194
+195
+196
+197
+199
+2
+20
+200
+201
+202
+203
+205
+207
+208
+209
+213
+214
+216
+217
+218
+219
+221
+222
+223
+224
+226
+228
+229
+230
+233
+235
+237
+238
+239
+24
+241
+242
+244
+247
+248
+249
+252
+255
+256
+257
+258
+26
+260
+262
+263
+265
+266
+27
+272
+273
+274
+275
+277
+278
+28
+280
+281
+282
+283
+284
+285
+286
+287
+288
+289
+291
+292
+296
+298
+30
+302
+305
+306
+307
+308
+309
+310
+311
+315
+316
+317
+318
+321
+322
+323
+325
+327
+33
+331
+332
+333
+335
+336
+338
+339
+34
+341
+342
+344
+345
+348
+35
+351
+353
+356
+360
+362
+364
+365
+366
+367
+368
+369
+37
+373
+374
+375
+377
+378
+379
+382
+384
+386
+389
+392
+393
+394
+395
+396
+397
+399
+4
+400
+401
+402
+403
+404
+406
+407
+409
+41
+411
+413
+414
+417
+418
+419
+42
+421
+424
+427
+429
+43
+430
+431
+432
+435
+436
+437
+438
+439
+44
+443
+444
+446
+448
+449
+452
+453
+454
+455
+457
+458
+459
+460
+462
+463
+466
+467
+468
+469
+47
+470
+472
+475
+477
+478
+479
+480
+481
+482
+483
+484
+485
+487
+489
+490
+491
+492
+493
+494
+495
+496
+497
+498
+5
+51
+53
+54
+57
+58
+64
+65
+66
+67
+69
+70
+72
+74
+76
+77
+78
+8
+80
+82
+83
+84
+85
+86
+87
+9
+90
+92
+95
+96
+97
+98
+PREHOOK: query: EXPLAIN
+SELECT a.key FROM
+(
+ SELECT rank() OVER (ORDER BY key) AS key FROM
+ (SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
+ GROUP BY a1.key, a2.value) a
+) a
+JOIN
+(
+ SELECT rank() OVER (ORDER BY key) AS key FROM
+ (SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
+ GROUP BY a1.key, a2.value) b
+) b
+ON a.key = b.key
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+SELECT a.key FROM
+(
+ SELECT rank() OVER (ORDER BY key) AS key FROM
+ (SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
+ GROUP BY a1.key, a2.value) a
+) a
+JOIN
+(
+ SELECT rank() OVER (ORDER BY key) AS key FROM
+ (SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
+ GROUP BY a1.key, a2.value) b
+) b
+ON a.key = b.key
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-1
+ Tez
+#### A masked pattern was here ####
+ Edges:
+ Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 7 (SIMPLE_EDGE)
+ Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
+ Reducer 4 <- Reducer 3 (SIMPLE_EDGE)
+ Reducer 5 <- Reducer 4 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
+ Reducer 6 <- Reducer 3 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: a2
+ Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+ Filter Operator
+ predicate: key is not null (type: boolean)
+ Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+ Select Operator
+ expressions: key (type: string), value (type: string)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+ value expressions: _col1 (type: string)
+ Execution mode: llap
+ LLAP IO: no inputs
+ Map 7
+ Map Operator Tree:
+ TableScan
+ alias: a1
+ Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+ Filter Operator
+ predicate: key is not null (type: boolean)
+ Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+ Select Operator
+ expressions: key (type: string)
+ outputColumnNames: _col0
+ Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+ Execution mode: llap
+ LLAP IO: no inputs
+ Reducer 2
+ Execution mode: llap
+ Reduce Operator Tree:
+ Merge Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 _col0 (type: string)
+ 1 _col0 (type: string)
+ outputColumnNames: _col1, _col2
+ Statistics: Num rows: 809 Data size: 144002 Basic stats: COMPLETE Column stats: COMPLETE
+ Group By Operator
+ keys: _col1 (type: string), _col2 (type: string)
+ mode: hash
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 404 Data size: 71912 Basic stats: COMPLETE Column stats: COMPLETE
+ Reduce Output Operator
+ key expressions: _col0 (type: string), _col1 (type: string)
+ sort order: ++
+ Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
+ Statistics: Num rows: 404 Data size: 71912 Basic stats: COMPLETE Column stats: COMPLETE
+ Reducer 3
+ Execution mode: llap
+ Reduce Operator Tree:
+ Group By Operator
+ keys: KEY._col0 (type: string), KEY._col1 (type: string)
+ mode: mergepartial
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 404 Data size: 71912 Basic stats: COMPLETE Column stats: COMPLETE
+ Select Operator
+ expressions: _col1 (type: string)
+ outputColumnNames: _col0
+ Statistics: Num rows: 404 Data size: 71912 Basic stats: COMPLETE Column stats: COMPLETE
+ Reduce Output Operator
+ key expressions: 0 (type: int), _col0 (type: string)
+ sort order: ++
+ Map-reduce partition columns: 0 (type: int)
+ Statistics: Num rows: 404 Data size: 71912 Basic stats: COMPLETE Column stats: COMPLETE
+ Reduce Output Operator
+ key expressions: 0 (type: int), _col0 (type: string)
+ sort order: ++
+ Map-reduce partition columns: 0 (type: int)
+ Statistics: Num rows: 404 Data size: 71912 Basic stats: COMPLETE Column stats: COMPLETE
+ Reducer 4
+ Execution mode: llap
+ Reduce Operator Tree:
+ Select Operator
+ expressions: KEY.reducesinkkey1 (type: string)
+ outputColumnNames: _col0
+ Statistics: Num rows: 404 Data size: 35148 Basic stats: COMPLETE Column stats: COMPLETE
+ PTF Operator
+ Function definitions:
+ Input definition
+ input alias: ptf_0
+ output shape: _col0: string
+ type: WINDOWING
+ Windowing table definition
+ input alias: ptf_1
+ name: windowingtablefunction
+ order by: _col0 ASC NULLS FIRST
+ partition by: 0
+ raw input shape:
+ window functions:
+ window function definition
+ alias: rank_window_0
+ arguments: _col0
+ name: rank
+ window function: GenericUDAFRankEvaluator
+ window frame: ROWS PRECEDING(MAX)~FOLLOWING(MAX)
+ isPivotResult: true
+ Statistics: Num rows: 404 Data size: 35148 Basic stats: COMPLETE Column stats: COMPLETE
+ Filter Operator
+ predicate: rank_window_0 is not null (type: boolean)
+ Statistics: Num rows: 404 Data size: 35148 Basic stats: COMPLETE Column stats: COMPLETE
+ Select Operator
+ expressions: rank_window_0 (type: int)
+ outputColumnNames: _col0
+ Statistics: Num rows: 404 Data size: 1616 Basic stats: COMPLETE Column stats: COMPLETE
+ Reduce Output Operator
+ key expressions: _col0 (type: int)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: int)
+ Statistics: Num rows: 404 Data size: 1616 Basic stats: COMPLETE Column stats: COMPLETE
+ Reducer 5
+ Execution mode: llap
+ Reduce Operator Tree:
+ Merge Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 _col0 (type: int)
+ 1 _col0 (type: int)
+ outputColumnNames: _col0
+ Statistics: Num rows: 404 Data size: 1616 Basic stats: COMPLETE Column stats: COMPLETE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 404 Data size: 1616 Basic stats: COMPLETE Column stats: COMPLETE
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ Reducer 6
+ Execution mode: llap
+ Reduce Operator Tree:
+ Select Operator
+ expressions: KEY.reducesinkkey1 (type: string)
+ outputColumnNames: _col0
+ Statistics: Num rows: 404 Data size: 35148 Basic stats: COMPLETE Column stats: COMPLETE
+ PTF Operator
+ Function definitions:
+ Input definition
+ input alias: ptf_0
+ output shape: _col0: string
+ type: WINDOWING
+ Windowing table definition
+ input alias: ptf_1
+ name: windowingtablefunction
+ order by: _col0 ASC NULLS FIRST
+ partition by: 0
+ raw input shape:
+ window functions:
+ window function definition
+ alias: rank_window_0
+ arguments: _col0
+ name: rank
+ window function: GenericUDAFRankEvaluator
+ window frame: ROWS PRECEDING(MAX)~FOLLOWING(MAX)
+ isPivotResult: true
+ Statistics: Num rows: 404 Data size: 35148 Basic stats: COMPLETE Column stats: COMPLETE
+ Filter Operator
+ predicate: rank_window_0 is not null (type: boolean)
+ Statistics: Num rows: 404 Data size: 35148 Basic stats: COMPLETE Column stats: COMPLETE
+ Select Operator
+ expressions: rank_window_0 (type: int)
+ outputColumnNames: _col0
+ Statistics: Num rows: 404 Data size: 1616 Basic stats: COMPLETE Column stats: COMPLETE
+ Reduce Output Operator
+ key expressions: _col0 (type: int)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: int)
+ Statistics: Num rows: 404 Data size: 1616 Basic stats: COMPLETE Column stats: COMPLETE
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ ListSink
+
+PREHOOK: query: SELECT a.key FROM
+(
+ SELECT rank() OVER (ORDER BY key) AS key FROM
+ (SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
+ GROUP BY a1.key, a2.value) a
+) a
+JOIN
+(
+ SELECT rank() OVER (ORDER BY key) AS key FROM
+ (SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
+ GROUP BY a1.key, a2.value) b
+) b
+ON a.key = b.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT a.key FROM
+(
+ SELECT rank() OVER (ORDER BY key) AS key FROM
+ (SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
+ GROUP BY a1.key, a2.value) a
+) a
+JOIN
+(
+ SELECT rank() OVER (ORDER BY key) AS key FROM
+ (SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
+ GROUP BY a1.key, a2.value) b
+) b
+ON a.key = b.key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+1
+2
+3
+4
+5
+6
+7
+8
+9
+10
+11
+12
+13
+14
+15
+16
+17
+18
+19
+20
+21
+22
+23
+24
+25
+26
+27
+28
+29
+30
+31
+32
+33
+34
+35
+36
+37
+38
+39
+40
+41
+42
+43
+44
+45
+46
+47
+48
+49
+50
+51
+52
+53
+54
+55
+56
+57
+58
+59
+60
+61
+62
+63
+64
+65
+66
+67
+68
+69
+70
+71
+72
+73
+74
+75
+76
+77
+78
+79
+80
+81
+82
+83
+84
+85
+86
+87
+88
+89
+90
+91
+92
+93
+94
+95
+96
+97
+98
+99
+100
+101
+102
+103
+104
+105
+106
+107
+108
+109
+110
+111
+112
+113
+114
+115
+116
+117
+118
+119
+120
+121
+122
+123
+124
+125
+126
+127
+128
+129
+130
+131
+132
+133
+134
+135
+136
+137
+138
+139
+140
+141
+142
+143
+144
+145
+146
+147
+148
+149
+150
+151
+152
+153
+154
+155
+156
+157
+158
+159
+160
+161
+162
+163
+164
+165
+166
+167
+168
+169
+170
+171
+172
+173
+174
+175
+176
+177
+178
+179
+180
+181
+182
+183
+184
+185
+186
+187
+188
+189
+190
+191
+192
+193
+194
+195
+196
+197
+198
+199
+200
+201
+202
+203
+204
+205
+206
+207
+208
+209
+210
+211
+212
+213
+214
+215
+216
+217
+218
+219
+220
+221
+222
+223
+224
+225
+226
+227
+228
+229
+230
+231
+232
+233
+234
+235
+236
+237
+238
+239
+240
+241
+242
+243
+244
+245
+246
+247
+248
+249
+250
+251
+252
+253
+254
+255
+256
+257
+258
+259
+260
+261
+262
+263
+264
+265
+266
+267
+268
+269
+270
+271
+272
+273
+274
+275
+276
+277
+278
+279
+280
+281
+282
+283
+284
+285
+286
+287
+288
+289
+290
+291
+292
+293
+294
+295
+296
+297
+298
+299
+300
+301
+302
+303
+304
+305
+306
+307
+308
+309
http://git-wip-us.apache.org/repos/asf/hive/blob/96a409e1/ql/src/test/results/clientpositive/llap/subquery_multi.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/subquery_multi.q.out b/ql/src/test/results/clientpositive/llap/subquery_multi.q.out
index 975fd13..45f698b 100644
--- a/ql/src/test/results/clientpositive/llap/subquery_multi.q.out
+++ b/ql/src/test/results/clientpositive/llap/subquery_multi.q.out
@@ -1663,13 +1663,12 @@ STAGE PLANS:
Tez
#### A masked pattern was here ####
Edges:
- Reducer 10 <- Reducer 9 (SIMPLE_EDGE)
Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 5 (SIMPLE_EDGE)
Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 8 (ONE_TO_ONE_EDGE)
- Reducer 4 <- Reducer 10 (ONE_TO_ONE_EDGE), Reducer 3 (SIMPLE_EDGE)
- Reducer 7 <- Map 11 (SIMPLE_EDGE), Map 6 (SIMPLE_EDGE)
+ Reducer 4 <- Reducer 3 (SIMPLE_EDGE), Reducer 9 (ONE_TO_ONE_EDGE)
+ Reducer 7 <- Map 10 (SIMPLE_EDGE), Map 6 (SIMPLE_EDGE)
Reducer 8 <- Reducer 7 (SIMPLE_EDGE)
- Reducer 9 <- Map 11 (SIMPLE_EDGE), Map 6 (SIMPLE_EDGE)
+ Reducer 9 <- Reducer 7 (SIMPLE_EDGE)
#### A masked pattern was here ####
Vertices:
Map 1
@@ -1692,7 +1691,7 @@ STAGE PLANS:
value expressions: _col0 (type: int), _col2 (type: string), _col3 (type: string), _col5 (type: int), _col6 (type: string), _col7 (type: double), _col8 (type: string)
Execution mode: llap
LLAP IO: no inputs
- Map 11
+ Map 10
Map Operator Tree:
TableScan
alias: pp
@@ -1714,11 +1713,6 @@ STAGE PLANS:
sort order: ++
Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
Statistics: Num rows: 13 Data size: 2548 Basic stats: COMPLETE Column stats: COMPLETE
- Reduce Output Operator
- key expressions: _col0 (type: string), _col1 (type: string)
- sort order: ++
- Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
- Statistics: Num rows: 13 Data size: 2548 Basic stats: COMPLETE Column stats: COMPLETE
Execution mode: llap
LLAP IO: no inputs
Map 5
@@ -1763,35 +1757,8 @@ STAGE PLANS:
Map-reduce partition columns: _col1 (type: string), _col0 (type: string)
Statistics: Num rows: 26 Data size: 7488 Basic stats: COMPLETE Column stats: COMPLETE
value expressions: _col2 (type: string)
- Reduce Output Operator
- key expressions: _col1 (type: string), _col0 (type: string)
- sort order: ++
- Map-reduce partition columns: _col1 (type: string), _col0 (type: string)
- Statistics: Num rows: 26 Data size: 7488 Basic stats: COMPLETE Column stats: COMPLETE
- value expressions: _col2 (type: string)
Execution mode: llap
LLAP IO: no inputs
- Reducer 10
- Execution mode: llap
- Reduce Operator Tree:
- Group By Operator
- keys: KEY._col0 (type: string), KEY._col1 (type: string)
- mode: mergepartial
- outputColumnNames: _col0, _col1
- Statistics: Num rows: 7 Data size: 1372 Basic stats: COMPLETE Column stats: COMPLETE
- Filter Operator
- predicate: _col0 is not null (type: boolean)
- Statistics: Num rows: 7 Data size: 1372 Basic stats: COMPLETE Column stats: COMPLETE
- Select Operator
- expressions: _col0 (type: string), _col1 (type: string), true (type: boolean)
- outputColumnNames: _col0, _col1, _col2
- Statistics: Num rows: 7 Data size: 1400 Basic stats: COMPLETE Column stats: COMPLETE
- Reduce Output Operator
- key expressions: _col0 (type: string), _col1 (type: string)
- sort order: ++
- Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
- Statistics: Num rows: 7 Data size: 1400 Basic stats: COMPLETE Column stats: COMPLETE
- value expressions: _col2 (type: boolean)
Reducer 2
Execution mode: llap
Reduce Operator Tree:
@@ -1874,6 +1841,16 @@ STAGE PLANS:
Map-reduce partition columns: _col0 (type: string)
Statistics: Num rows: 7 Data size: 840 Basic stats: COMPLETE Column stats: COMPLETE
value expressions: _col1 (type: bigint), _col2 (type: bigint)
+ Group By Operator
+ keys: _col2 (type: string), _col1 (type: string)
+ mode: hash
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 7 Data size: 1372 Basic stats: COMPLETE Column stats: COMPLETE
+ Reduce Output Operator
+ key expressions: _col0 (type: string), _col1 (type: string)
+ sort order: ++
+ Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
+ Statistics: Num rows: 7 Data size: 1372 Basic stats: COMPLETE Column stats: COMPLETE
Reducer 8
Execution mode: llap
Reduce Operator Tree:
@@ -1892,24 +1869,24 @@ STAGE PLANS:
Reducer 9
Execution mode: llap
Reduce Operator Tree:
- Merge Join Operator
- condition map:
- Left Semi Join 0 to 1
- keys:
- 0 _col1 (type: string), _col0 (type: string)
- 1 _col0 (type: string), _col1 (type: string)
- outputColumnNames: _col1, _col2
- Statistics: Num rows: 14 Data size: 2744 Basic stats: COMPLETE Column stats: COMPLETE
- Group By Operator
- keys: _col2 (type: string), _col1 (type: string)
- mode: hash
- outputColumnNames: _col0, _col1
+ Group By Operator
+ keys: KEY._col0 (type: string), KEY._col1 (type: string)
+ mode: mergepartial
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 7 Data size: 1372 Basic stats: COMPLETE Column stats: COMPLETE
+ Filter Operator
+ predicate: _col0 is not null (type: boolean)
Statistics: Num rows: 7 Data size: 1372 Basic stats: COMPLETE Column stats: COMPLETE
- Reduce Output Operator
- key expressions: _col0 (type: string), _col1 (type: string)
- sort order: ++
- Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
- Statistics: Num rows: 7 Data size: 1372 Basic stats: COMPLETE Column stats: COMPLETE
+ Select Operator
+ expressions: _col0 (type: string), _col1 (type: string), true (type: boolean)
+ outputColumnNames: _col0, _col1, _col2
+ Statistics: Num rows: 7 Data size: 1400 Basic stats: COMPLETE Column stats: COMPLETE
+ Reduce Output Operator
+ key expressions: _col0 (type: string), _col1 (type: string)
+ sort order: ++
+ Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
+ Statistics: Num rows: 7 Data size: 1400 Basic stats: COMPLETE Column stats: COMPLETE
+ value expressions: _col2 (type: boolean)
Stage: Stage-0
Fetch Operator
@@ -2143,13 +2120,12 @@ STAGE PLANS:
Tez
#### A masked pattern was here ####
Edges:
- Reducer 10 <- Reducer 9 (SIMPLE_EDGE)
Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 5 (SIMPLE_EDGE)
Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 8 (ONE_TO_ONE_EDGE)
- Reducer 4 <- Reducer 10 (ONE_TO_ONE_EDGE), Reducer 3 (SIMPLE_EDGE)
- Reducer 7 <- Map 11 (SIMPLE_EDGE), Map 6 (SIMPLE_EDGE)
+ Reducer 4 <- Reducer 3 (SIMPLE_EDGE), Reducer 9 (ONE_TO_ONE_EDGE)
+ Reducer 7 <- Map 10 (SIMPLE_EDGE), Map 6 (SIMPLE_EDGE)
Reducer 8 <- Reducer 7 (SIMPLE_EDGE)
- Reducer 9 <- Map 11 (SIMPLE_EDGE), Map 6 (SIMPLE_EDGE)
+ Reducer 9 <- Reducer 7 (SIMPLE_EDGE)
#### A masked pattern was here ####
Vertices:
Map 1
@@ -2172,7 +2148,7 @@ STAGE PLANS:
value expressions: _col0 (type: int), _col2 (type: string), _col3 (type: string), _col5 (type: int), _col7 (type: double), _col8 (type: string)
Execution mode: llap
LLAP IO: no inputs
- Map 11
+ Map 10
Map Operator Tree:
TableScan
alias: pp
@@ -2194,11 +2170,6 @@ STAGE PLANS:
sort order: ++
Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
Statistics: Num rows: 13 Data size: 2548 Basic stats: COMPLETE Column stats: COMPLETE
- Reduce Output Operator
- key expressions: _col0 (type: string), _col1 (type: string)
- sort order: ++
- Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
- Statistics: Num rows: 13 Data size: 2548 Basic stats: COMPLETE Column stats: COMPLETE
Execution mode: llap
LLAP IO: no inputs
Map 5
@@ -2243,35 +2214,8 @@ STAGE PLANS:
Map-reduce partition columns: _col1 (type: string), _col0 (type: string)
Statistics: Num rows: 26 Data size: 7488 Basic stats: COMPLETE Column stats: COMPLETE
value expressions: _col2 (type: string)
- Reduce Output Operator
- key expressions: _col1 (type: string), _col0 (type: string)
- sort order: ++
- Map-reduce partition columns: _col1 (type: string), _col0 (type: string)
- Statistics: Num rows: 26 Data size: 7488 Basic stats: COMPLETE Column stats: COMPLETE
- value expressions: _col2 (type: string)
Execution mode: llap
LLAP IO: no inputs
- Reducer 10
- Execution mode: llap
- Reduce Operator Tree:
- Group By Operator
- keys: KEY._col0 (type: string), KEY._col1 (type: string)
- mode: mergepartial
- outputColumnNames: _col0, _col1
- Statistics: Num rows: 7 Data size: 1372 Basic stats: COMPLETE Column stats: COMPLETE
- Filter Operator
- predicate: _col0 is not null (type: boolean)
- Statistics: Num rows: 7 Data size: 1372 Basic stats: COMPLETE Column stats: COMPLETE
- Select Operator
- expressions: _col0 (type: string), _col1 (type: string), true (type: boolean)
- outputColumnNames: _col0, _col1, _col2
- Statistics: Num rows: 7 Data size: 1400 Basic stats: COMPLETE Column stats: COMPLETE
- Reduce Output Operator
- key expressions: _col0 (type: string), _col1 (type: string)
- sort order: ++
- Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
- Statistics: Num rows: 7 Data size: 1400 Basic stats: COMPLETE Column stats: COMPLETE
- value expressions: _col2 (type: boolean)
Reducer 2
Execution mode: llap
Reduce Operator Tree:
@@ -2354,6 +2298,16 @@ STAGE PLANS:
Map-reduce partition columns: _col0 (type: string)
Statistics: Num rows: 7 Data size: 840 Basic stats: COMPLETE Column stats: COMPLETE
value expressions: _col1 (type: bigint), _col2 (type: bigint)
+ Group By Operator
+ keys: _col2 (type: string), _col1 (type: string)
+ mode: hash
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 7 Data size: 1372 Basic stats: COMPLETE Column stats: COMPLETE
+ Reduce Output Operator
+ key expressions: _col0 (type: string), _col1 (type: string)
+ sort order: ++
+ Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
+ Statistics: Num rows: 7 Data size: 1372 Basic stats: COMPLETE Column stats: COMPLETE
Reducer 8
Execution mode: llap
Reduce Operator Tree:
@@ -2372,24 +2326,24 @@ STAGE PLANS:
Reducer 9
Execution mode: llap
Reduce Operator Tree:
- Merge Join Operator
- condition map:
- Left Semi Join 0 to 1
- keys:
- 0 _col1 (type: string), _col0 (type: string)
- 1 _col0 (type: string), _col1 (type: string)
- outputColumnNames: _col1, _col2
- Statistics: Num rows: 14 Data size: 2744 Basic stats: COMPLETE Column stats: COMPLETE
- Group By Operator
- keys: _col2 (type: string), _col1 (type: string)
- mode: hash
- outputColumnNames: _col0, _col1
+ Group By Operator
+ keys: KEY._col0 (type: string), KEY._col1 (type: string)
+ mode: mergepartial
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 7 Data size: 1372 Basic stats: COMPLETE Column stats: COMPLETE
+ Filter Operator
+ predicate: _col0 is not null (type: boolean)
Statistics: Num rows: 7 Data size: 1372 Basic stats: COMPLETE Column stats: COMPLETE
- Reduce Output Operator
- key expressions: _col0 (type: string), _col1 (type: string)
- sort order: ++
- Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
- Statistics: Num rows: 7 Data size: 1372 Basic stats: COMPLETE Column stats: COMPLETE
+ Select Operator
+ expressions: _col0 (type: string), _col1 (type: string), true (type: boolean)
+ outputColumnNames: _col0, _col1, _col2
+ Statistics: Num rows: 7 Data size: 1400 Basic stats: COMPLETE Column stats: COMPLETE
+ Reduce Output Operator
+ key expressions: _col0 (type: string), _col1 (type: string)
+ sort order: ++
+ Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
+ Statistics: Num rows: 7 Data size: 1400 Basic stats: COMPLETE Column stats: COMPLETE
+ value expressions: _col2 (type: boolean)
Stage: Stage-0
Fetch Operator
@@ -2855,12 +2809,11 @@ STAGE PLANS:
#### A masked pattern was here ####
Edges:
Reducer 10 <- Map 9 (CUSTOM_SIMPLE_EDGE)
- Reducer 11 <- Map 9 (CUSTOM_SIMPLE_EDGE)
Reducer 2 <- Map 1 (SIMPLE_EDGE)
Reducer 3 <- Map 5 (SIMPLE_EDGE), Reducer 2 (ONE_TO_ONE_EDGE)
Reducer 4 <- Reducer 3 (SIMPLE_EDGE), Reducer 8 (SIMPLE_EDGE)
Reducer 7 <- Map 6 (XPROD_EDGE), Reducer 10 (XPROD_EDGE)
- Reducer 8 <- Reducer 11 (SIMPLE_EDGE), Reducer 7 (SIMPLE_EDGE)
+ Reducer 8 <- Reducer 10 (SIMPLE_EDGE), Reducer 7 (SIMPLE_EDGE)
#### A masked pattern was here ####
Vertices:
Map 1
@@ -2939,10 +2892,6 @@ STAGE PLANS:
sort order:
Statistics: Num rows: 1 Data size: 80 Basic stats: COMPLETE Column stats: COMPLETE
value expressions: _col0 (type: struct<count:bigint,sum:double,input:double>)
- Reduce Output Operator
- sort order:
- Statistics: Num rows: 1 Data size: 80 Basic stats: COMPLETE Column stats: COMPLETE
- value expressions: _col0 (type: struct<count:bigint,sum:double,input:double>)
Execution mode: llap
LLAP IO: no inputs
Reducer 10
@@ -2962,14 +2911,6 @@ STAGE PLANS:
sort order:
Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: COMPLETE
value expressions: _col0 (type: bigint), _col1 (type: bigint)
- Reducer 11
- Execution mode: llap
- Reduce Operator Tree:
- Group By Operator
- aggregations: avg(VALUE._col0)
- mode: mergepartial
- outputColumnNames: _col0
- Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
Select Operator
expressions: _col0 (type: double), true (type: boolean)
outputColumnNames: _col0, _col1