You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2018/04/19 18:44:35 UTC
hive git commit: HIVE-19009 : Retain and use runtime statistics
during hs2 lifetime (Zoltan Haindrich via Ashutosh Chauhan)
Repository: hive
Updated Branches:
refs/heads/master 046bc646b -> 9f15e22f4
HIVE-19009 : Retain and use runtime statistics during hs2 lifetime (Zoltan Haindrich via Ashutosh Chauhan)
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9f15e22f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9f15e22f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9f15e22f
Branch: refs/heads/master
Commit: 9f15e22f4aea99891a37aa1e54d490921e6e1174
Parents: 046bc64
Author: Zoltan Haindrich <ki...@rxd.hu>
Authored: Tue Apr 3 08:51:00 2018 -0700
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Thu Apr 19 11:44:04 2018 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 11 +-
.../test/resources/testconfiguration.properties | 1 +
.../org/apache/hadoop/hive/ql/QTestUtil.java | 10 +-
.../java/org/apache/hadoop/hive/ql/Context.java | 12 +-
.../java/org/apache/hadoop/hive/ql/Driver.java | 4 +
.../hive/ql/optimizer/physical/Vectorizer.java | 12 +-
.../apache/hadoop/hive/ql/plan/JoinDesc.java | 2 +-
.../hive/ql/plan/mapper/CachingStatsSource.java | 68 +++++++++
.../hive/ql/plan/mapper/EmptyStatsSource.java | 11 ++
.../plan/mapper/SimpleRuntimeStatsSource.java | 6 +
.../hadoop/hive/ql/plan/mapper/StatsSource.java | 5 +-
.../hive/ql/plan/mapper/StatsSources.java | 122 ++++++++++++++++
.../hive/ql/reexec/IReExecutionPlugin.java | 1 +
.../hadoop/hive/ql/reexec/ReExecDriver.java | 20 ++-
.../ql/reexec/ReExecutionOverlayPlugin.java | 4 +
.../hadoop/hive/ql/reexec/ReOptimizePlugin.java | 48 +++++--
.../signature/TestOperatorSignature.java | 9 +-
.../ql/plan/mapping/TestCounterMapping.java | 1 -
.../queries/clientpositive/runtime_stats_hs2.q | 22 +++
.../clientpositive/llap/runtime_stats_hs2.q.out | 141 +++++++++++++++++++
20 files changed, 479 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/9f15e22f/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 73492ff..536c7b4 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4263,10 +4263,19 @@ public class HiveConf extends Configuration {
"comma separated list of plugin can be used:\n"
+ " overlay: hiveconf subtree 'reexec.overlay' is used as an overlay in case of an execution errors out\n"
+ " reoptimize: collects operator statistics during execution and recompile the query after a failure"),
+ HIVE_QUERY_REEXECUTION_STATS_PERSISTENCE("hive.query.reexecution.stats.persist.scope", "query",
+ new StringSet("query", "hiveserver", "metastore"),
+ "Sets the persistence scope of runtime statistics\n"
+ + " query: runtime statistics are only used during re-execution\n"
+ + " hiveserver: runtime statistics are persisted in the hiveserver - all sessions share it"),
+
HIVE_QUERY_MAX_REEXECUTION_COUNT("hive.query.reexecution.max.count", 1,
"Maximum number of re-executions for a single query."),
HIVE_QUERY_REEXECUTION_ALWAYS_COLLECT_OPERATOR_STATS("hive.query.reexecution.always.collect.operator.stats", false,
- "Used during testing"),
+ "If sessionstats are enabled; this option can be used to collect statistics all the time"),
+ HIVE_QUERY_REEXECUTION_STATS_CACHE_SIZE("hive.query.reexecution.stats.cache.size", 100_000,
+ "Size of the runtime statistics cache. Unit is: OperatorStat entry; a query plan consist ~100"),
+
HIVE_QUERY_RESULTS_CACHE_ENABLED("hive.query.results.cache.enabled", true,
"If the query results cache is enabled. This will keep results of previously executed queries " +
http://git-wip-us.apache.org/repos/asf/hive/blob/9f15e22f/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index d26f0cc..4e7c519 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -480,6 +480,7 @@ minillaplocal.query.files=\
retry_failure.q,\
retry_failure_stat_changes.q,\
retry_failure_oom.q,\
+ runtime_stats_hs2.q,\
bucketsortoptimize_insert_2.q,\
check_constraint.q,\
cbo_gby.q,\
http://git-wip-us.apache.org/repos/asf/hive/blob/9f15e22f/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
index fad8c0f..88022be 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
@@ -98,6 +98,7 @@ import org.apache.hadoop.hive.ql.parse.ParseDriver;
import org.apache.hadoop.hive.ql.parse.ParseException;
import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.mapper.StatsSources;
import org.apache.hadoop.hive.ql.processors.CommandProcessor;
import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
@@ -208,7 +209,7 @@ public class QTestUtil {
getSrcTables().add(table);
storeSrcTables();
}
-
+
public static Set<String> initSrcTables() {
if (srcTables == null){
initSrcTablesFromSystemProperty();
@@ -1066,13 +1067,14 @@ public class QTestUtil {
clearTablesCreatedDuringTests();
clearUDFsCreatedDuringTests();
clearKeysCreatedInTests();
+ StatsSources.clearAllStats();
}
-
+
protected void clearSettingsCreatedInTests() throws IOException {
getCliDriver().processLine(String.format("set hive.security.authorization.enabled=false;"));
getCliDriver().processLine(String.format("set user.name=%s;",
System.getProperty(TEST_HIVE_USER_PROPERTY, "hive_test_user")));
-
+
getCliDriver().processLine("set hive.metastore.partition.name.whitelist.pattern=;");
getCliDriver().processLine("set hive.test.mode=false;");
getCliDriver().processLine("set hive.mapred.mode=nonstrict;");
@@ -1098,7 +1100,7 @@ public class QTestUtil {
clearTablesCreatedDuringTests();
clearUDFsCreatedDuringTests();
clearKeysCreatedInTests();
-
+
cleanupFromFile();
// delete any contents in the warehouse dir
http://git-wip-us.apache.org/repos/asf/hive/blob/9f15e22f/ql/src/java/org/apache/hadoop/hive/ql/Context.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index 9ca8b00..70846ac 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -159,7 +159,7 @@ public class Context {
private boolean isExplainPlan = false;
private PlanMapper planMapper = new PlanMapper();
- private StatsSource runtimeStatsSource;
+ private StatsSource statsSource;
private int executionIndex;
public void setOperation(Operation operation) {
@@ -1047,16 +1047,16 @@ public class Context {
return planMapper;
}
- public void setStatsSource(StatsSource runtimeStatsSource) {
- this.runtimeStatsSource = runtimeStatsSource;
+ public void setStatsSource(StatsSource statsSource) {
+ this.statsSource = statsSource;
}
public StatsSource getStatsSource() {
- if (runtimeStatsSource != null) {
- return runtimeStatsSource;
+ if (statsSource != null) {
+ return statsSource;
} else {
// hierarchical; add def stats also here
- return new EmptyStatsSource();
+ return EmptyStatsSource.INSTANCE;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9f15e22f/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index f761fff..9cb2ff1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -2748,6 +2748,10 @@ public class Driver implements IDriver {
this.statsSource = runtimeStatsSource;
}
+ public StatsSource getStatsSource() {
+ return statsSource;
+ }
+
@Override
public boolean hasResultSet() {
http://git-wip-us.apache.org/repos/asf/hive/blob/9f15e22f/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index e15c5b7..068f25e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -161,6 +161,7 @@ import org.apache.hadoop.hive.ql.plan.VectorReduceSinkInfo;
import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc;
import org.apache.hadoop.hive.ql.plan.VectorSelectDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper;
import org.apache.hadoop.hive.ql.plan.ptf.OrderExpressionDef;
import org.apache.hadoop.hive.ql.plan.ptf.PTFExpressionDef;
import org.apache.hadoop.hive.ql.plan.ptf.PartitionedTableFunctionDef;
@@ -375,6 +376,8 @@ public class Vectorizer implements PhysicalPlanResolver {
private Set<VirtualColumn> availableVectorizedVirtualColumnSet = null;
private Set<VirtualColumn> neededVirtualColumnSet = null;
+ private PlanMapper planMapper;
+
public class VectorizerCannotVectorizeException extends Exception {
}
@@ -867,7 +870,7 @@ public class Vectorizer implements PhysicalPlanResolver {
}
private void runDelayedFixups() {
- for (Entry<Operator<? extends OperatorDesc>, Set<ImmutablePair<Operator<? extends OperatorDesc>, Operator<? extends OperatorDesc>>>> delayed
+ for (Entry<Operator<? extends OperatorDesc>, Set<ImmutablePair<Operator<? extends OperatorDesc>, Operator<? extends OperatorDesc>>>> delayed
: delayedFixups.entrySet()) {
Operator<? extends OperatorDesc> key = delayed.getKey();
Set<ImmutablePair<Operator<? extends OperatorDesc>, Operator<? extends OperatorDesc>>> value =
@@ -1470,7 +1473,7 @@ public class Vectorizer implements PhysicalPlanResolver {
enabledConditionsNotMetList.add(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_ROW_DESERIALIZE.varname);
}
}
-
+
return false;
}
@@ -2247,6 +2250,7 @@ public class Vectorizer implements PhysicalPlanResolver {
public PhysicalContext resolve(PhysicalContext physicalContext) throws SemanticException {
hiveConf = physicalContext.getConf();
+ planMapper = physicalContext.getContext().getPlanMapper();
String vectorizationEnabledOverrideString =
HiveConf.getVar(hiveConf,
@@ -2776,7 +2780,7 @@ public class Vectorizer implements PhysicalPlanResolver {
}
if (exprNodeDescList != null) {
ExprNodeDesc exprNodeDesc = exprNodeDescList.get(0);
-
+
if (containsLeadLag(exprNodeDesc)) {
setOperatorIssue("lead and lag function not supported in argument expression of aggregation function " + functionName);
return false;
@@ -5019,6 +5023,8 @@ public class Vectorizer implements PhysicalPlanResolver {
LOG.debug("vectorizeOperator " + vectorOp.getClass().getName());
LOG.debug("vectorizeOperator " + vectorOp.getConf().getClass().getName());
+ // These operators need to be linked to enable runtime statistics to be gathered/used correctly
+ planMapper.link(op, vectorOp);
return vectorOp;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9f15e22f/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
index 5b7f4c3..e7ca7f6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
@@ -556,10 +556,10 @@ public class JoinDesc extends AbstractOperatorDesc {
}
protected Map<Integer, String> toCompactString(int[][] filterMap) {
+ filterMap = compactFilter(filterMap);
if (filterMap == null) {
return null;
}
- filterMap = compactFilter(filterMap);
Map<Integer, String> result = new LinkedHashMap<Integer, String>();
for (int i = 0 ; i < filterMap.length; i++) {
if (filterMap[i] == null) {
http://git-wip-us.apache.org/repos/asf/hive/blob/9f15e22f/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/CachingStatsSource.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/CachingStatsSource.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/CachingStatsSource.java
new file mode 100644
index 0000000..c515276
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/CachingStatsSource.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan.mapper;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.optimizer.signature.OpTreeSignature;
+import org.apache.hadoop.hive.ql.stats.OperatorStats;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+public class CachingStatsSource implements StatsSource {
+
+
+ private final Cache<OpTreeSignature, OperatorStats> cache;
+
+ public CachingStatsSource(HiveConf conf) {
+ int size = conf.getIntVar(ConfVars.HIVE_QUERY_REEXECUTION_STATS_CACHE_SIZE);
+ cache = CacheBuilder.newBuilder().maximumSize(size).build();
+ }
+
+ public void put(OpTreeSignature sig, OperatorStats opStat) {
+ cache.put(sig, opStat);
+ }
+
+ @Override
+ public Optional<OperatorStats> lookup(OpTreeSignature treeSig) {
+ return Optional.ofNullable(cache.getIfPresent(treeSig));
+ }
+
+ @Override
+ public boolean canProvideStatsFor(Class<?> clazz) {
+ if (Operator.class.isAssignableFrom(clazz)) {
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public void putAll(Map<OpTreeSignature, OperatorStats> map) {
+ for (Entry<OpTreeSignature, OperatorStats> entry : map.entrySet()) {
+ put(entry.getKey(), entry.getValue());
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9f15e22f/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/EmptyStatsSource.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/EmptyStatsSource.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/EmptyStatsSource.java
index 72092ce..19df13a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/EmptyStatsSource.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/EmptyStatsSource.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.plan.mapper;
+import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.hive.ql.optimizer.signature.OpTreeSignature;
@@ -25,6 +26,11 @@ import org.apache.hadoop.hive.ql.stats.OperatorStats;
public class EmptyStatsSource implements StatsSource {
+ public static StatsSource INSTANCE = new EmptyStatsSource();
+
+ private EmptyStatsSource() {
+ }
+
@Override
public boolean canProvideStatsFor(Class<?> class1) {
return false;
@@ -35,4 +41,9 @@ public class EmptyStatsSource implements StatsSource {
return Optional.empty();
}
+ @Override
+ public void putAll(Map<OpTreeSignature, OperatorStats> map) {
+ throw new RuntimeException("This is an empty source!");
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9f15e22f/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/SimpleRuntimeStatsSource.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/SimpleRuntimeStatsSource.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/SimpleRuntimeStatsSource.java
index b5a3c24..3d6c257 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/SimpleRuntimeStatsSource.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/SimpleRuntimeStatsSource.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.plan.mapper;
import java.util.List;
+import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
@@ -56,4 +57,9 @@ public class SimpleRuntimeStatsSource implements StatsSource {
return false;
}
+ @Override
+ public void putAll(Map<OpTreeSignature, OperatorStats> map) {
+ throw new RuntimeException();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9f15e22f/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSource.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSource.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSource.java
index df5aa0c..e8d51c9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSource.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSource.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.plan.mapper;
+import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.hive.ql.optimizer.signature.OpTreeSignature;
@@ -25,8 +26,10 @@ import org.apache.hadoop.hive.ql.stats.OperatorStats;
public interface StatsSource {
- boolean canProvideStatsFor(Class<?> class1);
+ boolean canProvideStatsFor(Class<?> clazz);
Optional<OperatorStats> lookup(OpTreeSignature treeSig);
+ void putAll(Map<OpTreeSignature, OperatorStats> map);
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9f15e22f/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSources.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSources.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSources.java
new file mode 100644
index 0000000..a4e33c3
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSources.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan.mapper;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.optimizer.signature.OpTreeSignature;
+import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper.EquivGroup;
+import org.apache.hadoop.hive.ql.stats.OperatorStats;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class StatsSources {
+
+ public static class MapBackedStatsSource implements StatsSource {
+
+ private Map<OpTreeSignature, OperatorStats> map = new HashMap<>();
+
+ @Override
+ public boolean canProvideStatsFor(Class<?> clazz) {
+ if (Operator.class.isAssignableFrom(clazz)) {
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public Optional<OperatorStats> lookup(OpTreeSignature treeSig) {
+ return Optional.ofNullable(map.get(treeSig));
+ }
+
+ @Override
+ public void putAll(Map<OpTreeSignature, OperatorStats> map) {
+ map.putAll(map);
+ }
+
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(StatsSources.class);
+
+ public static StatsSource getStatsSourceContaining(StatsSource currentStatsSource, PlanMapper pm) {
+ if (currentStatsSource instanceof CachingStatsSource) {
+ CachingStatsSource sessionStatsSource = (CachingStatsSource) currentStatsSource;
+ loadFromPlanMapper(sessionStatsSource, pm);
+ return sessionStatsSource;
+ } else {
+ return new SimpleRuntimeStatsSource(pm);
+ }
+ }
+
+ public static void loadFromPlanMapper(CachingStatsSource sessionStatsSource, PlanMapper pm) {
+ Map<OpTreeSignature, OperatorStats> map = extractStatMapFromPlanMapper(pm);
+ sessionStatsSource.putAll(map);
+ }
+
+
+ private static Map<OpTreeSignature, OperatorStats> extractStatMapFromPlanMapper(PlanMapper pm) {
+ Map<OpTreeSignature, OperatorStats> map = new HashMap<OpTreeSignature, OperatorStats>();
+ Iterator<EquivGroup> it = pm.iterateGroups();
+ while (it.hasNext()) {
+ EquivGroup e = it.next();
+ List<OperatorStats> stat = e.getAll(OperatorStats.class);
+ List<OpTreeSignature> sig = e.getAll(OpTreeSignature.class);
+
+ if (stat.size() > 1 || sig.size() > 1) {
+ StringBuffer sb = new StringBuffer();
+ sb.append(String.format("expected(stat-sig) 1-1, got {}-{} ;", stat.size(), sig.size()));
+ for (OperatorStats s : stat) {
+ sb.append(s);
+ sb.append(";");
+ }
+ for (OpTreeSignature s : sig) {
+ sb.append(s);
+ sb.append(";");
+ }
+ LOG.debug(sb.toString());
+ }
+ if (stat.size() >= 1 && sig.size() >= 1) {
+ map.put(sig.get(0), stat.get(0));
+ }
+ }
+ return map;
+ }
+
+ private static StatsSource globalStatsSource;
+
+ public static StatsSource globalStatsSource(HiveConf conf) {
+ if (globalStatsSource == null) {
+ globalStatsSource = new CachingStatsSource(conf);
+ }
+ return globalStatsSource;
+ }
+
+ @VisibleForTesting
+ public static void clearAllStats() {
+ globalStatsSource = null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9f15e22f/ql/src/java/org/apache/hadoop/hive/ql/reexec/IReExecutionPlugin.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/reexec/IReExecutionPlugin.java b/ql/src/java/org/apache/hadoop/hive/ql/reexec/IReExecutionPlugin.java
index 2b0d23c..be62fc0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/reexec/IReExecutionPlugin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/reexec/IReExecutionPlugin.java
@@ -59,6 +59,7 @@ public interface IReExecutionPlugin {
*/
boolean shouldReExecute(int executionNum, PlanMapper oldPlanMapper, PlanMapper newPlanMapper);
+ void afterExecute(PlanMapper planMapper, boolean successfull);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9f15e22f/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java
index 8a5595d..501f0b4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java
@@ -156,6 +156,9 @@ public class ReExecDriver implements IDriver {
LOG.info("Execution #{} of query", executionIndex);
CommandProcessorResponse cpr = coreDriver.run();
+ PlanMapper oldPlanMapper = coreDriver.getPlanMapper();
+ afterExecute(oldPlanMapper, cpr.getResponseCode() == 0);
+
boolean shouldReExecute = explainReOptimization && executionIndex==1;
shouldReExecute |= cpr.getResponseCode() != 0 && shouldReExecute();
@@ -164,25 +167,34 @@ public class ReExecDriver implements IDriver {
}
LOG.info("Preparing to re-execute query");
prepareToReExecute();
- PlanMapper oldPlanMapper = coreDriver.getPlanMapper();
CommandProcessorResponse compile_resp = coreDriver.compileAndRespond(currentQuery);
if (compile_resp.failed()) {
+ LOG.error("Recompilation of the query failed; this is unexpected.");
// FIXME: somehow place pointers that re-execution compilation have failed; the query have been successfully compiled before?
return compile_resp;
}
PlanMapper newPlanMapper = coreDriver.getPlanMapper();
if (!explainReOptimization && !shouldReExecuteAfterCompile(oldPlanMapper, newPlanMapper)) {
+ LOG.info("re-running the query would probably not yield better results; returning with last error");
// FIXME: retain old error; or create a new one?
return cpr;
}
}
}
+ private void afterExecute(PlanMapper planMapper, boolean success) {
+ for (IReExecutionPlugin p : plugins) {
+ p.afterExecute(planMapper, success);
+ }
+ }
+
private boolean shouldReExecuteAfterCompile(PlanMapper oldPlanMapper, PlanMapper newPlanMapper) {
boolean ret = false;
for (IReExecutionPlugin p : plugins) {
- ret |= p.shouldReExecute(executionIndex, oldPlanMapper, newPlanMapper);
+ boolean shouldReExecute = p.shouldReExecute(executionIndex, oldPlanMapper, newPlanMapper);
+ LOG.debug("{}.shouldReExecuteAfterCompile = {}", p, shouldReExecute);
+ ret |= shouldReExecute;
}
return ret;
}
@@ -190,7 +202,9 @@ public class ReExecDriver implements IDriver {
private boolean shouldReExecute() {
boolean ret = false;
for (IReExecutionPlugin p : plugins) {
- ret |= p.shouldReExecute(executionIndex);
+ boolean shouldReExecute = p.shouldReExecute(executionIndex);
+ LOG.debug("{}.shouldReExecute = {}", p, shouldReExecute);
+ ret |= shouldReExecute;
}
return ret;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9f15e22f/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionOverlayPlugin.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionOverlayPlugin.java b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionOverlayPlugin.java
index 4ee3c14..950903c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionOverlayPlugin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionOverlayPlugin.java
@@ -80,4 +80,8 @@ public class ReExecutionOverlayPlugin implements IReExecutionPlugin {
public void beforeExecute(int executionIndex, boolean explainReOptimization) {
}
+ @Override
+ public void afterExecute(PlanMapper planMapper, boolean success) {
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9f15e22f/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java
index f731315..409cc73 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.reexec;
import java.util.Iterator;
import java.util.List;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.exec.Operator;
@@ -29,7 +30,8 @@ import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
import org.apache.hadoop.hive.ql.hooks.HookContext;
import org.apache.hadoop.hive.ql.hooks.HookContext.HookType;
import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper;
-import org.apache.hadoop.hive.ql.plan.mapper.SimpleRuntimeStatsSource;
+import org.apache.hadoop.hive.ql.plan.mapper.StatsSource;
+import org.apache.hadoop.hive.ql.plan.mapper.StatsSources;
import org.apache.hadoop.hive.ql.stats.OperatorStatsReaderHook;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,6 +49,8 @@ public class ReOptimizePlugin implements IReExecutionPlugin {
private OperatorStatsReaderHook statsReaderHook;
+ private boolean alwaysCollectStats;
+
class LocalHook implements ExecuteWithHookContext {
@Override
@@ -62,10 +66,10 @@ public class ReOptimizePlugin implements IReExecutionPlugin {
if (message.contains("Vertex failed,") && isOOM) {
retryPossible = true;
}
- System.out.println(exception);
}
}
}
+ LOG.info("ReOptimization: retryPossible: {}", retryPossible);
}
}
}
@@ -77,9 +81,25 @@ public class ReOptimizePlugin implements IReExecutionPlugin {
statsReaderHook = new OperatorStatsReaderHook();
coreDriver.getHookRunner().addOnFailureHook(statsReaderHook);
coreDriver.getHookRunner().addPostHook(statsReaderHook);
- // statsReaderHook.setCollectOnSuccess(true);
- statsReaderHook.setCollectOnSuccess(
- driver.getConf().getBoolVar(ConfVars.HIVE_QUERY_REEXECUTION_ALWAYS_COLLECT_OPERATOR_STATS));
+ alwaysCollectStats = driver.getConf().getBoolVar(ConfVars.HIVE_QUERY_REEXECUTION_ALWAYS_COLLECT_OPERATOR_STATS);
+ statsReaderHook.setCollectOnSuccess(alwaysCollectStats);
+
+ coreDriver.setStatsSource(getStatsSource(driver.getConf()));
+ }
+
+ static enum StatsSourceMode {
+ query, hiveserver;
+ }
+
+ private StatsSource getStatsSource(HiveConf conf) {
+ StatsSourceMode mode = StatsSourceMode.valueOf(conf.getVar(ConfVars.HIVE_QUERY_REEXECUTION_STATS_PERSISTENCE));
+ switch (mode) {
+ case query:
+ return new StatsSources.MapBackedStatsSource();
+ case hiveserver:
+ return StatsSources.globalStatsSource(conf);
+ }
+ throw new RuntimeException("Unknown StatsSource setting: " + mode);
}
@Override
@@ -90,17 +110,19 @@ public class ReOptimizePlugin implements IReExecutionPlugin {
@Override
public void prepareToReExecute() {
statsReaderHook.setCollectOnSuccess(true);
- PlanMapper pm = coreDriver.getContext().getPlanMapper();
- coreDriver.setStatsSource(new SimpleRuntimeStatsSource(pm));
retryPossible = false;
+ coreDriver.setStatsSource(
+ StatsSources.getStatsSourceContaining(coreDriver.getStatsSource(), coreDriver.getPlanMapper()));
}
@Override
public boolean shouldReExecute(int executionNum, PlanMapper oldPlanMapper, PlanMapper newPlanMapper) {
- return planDidChange(oldPlanMapper, newPlanMapper);
+ boolean planDidChange = !planEquals(oldPlanMapper, newPlanMapper);
+ LOG.info("planDidChange: {}", planDidChange);
+ return planDidChange;
}
- private boolean planDidChange(PlanMapper pmL, PlanMapper pmR) {
+ private boolean planEquals(PlanMapper pmL, PlanMapper pmR) {
List<Operator> opsL = getRootOps(pmL);
List<Operator> opsR = getRootOps(pmR);
for (Iterator<Operator> itL = opsL.iterator(); itL.hasNext();) {
@@ -135,4 +157,12 @@ public class ReOptimizePlugin implements IReExecutionPlugin {
}
}
+ @Override
+ public void afterExecute(PlanMapper planMapper, boolean success) {
+ if (alwaysCollectStats) {
+ coreDriver.setStatsSource(
+ StatsSources.getStatsSourceContaining(coreDriver.getStatsSource(), planMapper));
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9f15e22f/ql/src/test/org/apache/hadoop/hive/ql/optimizer/signature/TestOperatorSignature.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/optimizer/signature/TestOperatorSignature.java b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/signature/TestOperatorSignature.java
index 0afc533..b09aafb 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/optimizer/signature/TestOperatorSignature.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/signature/TestOperatorSignature.java
@@ -77,12 +77,16 @@ public class TestOperatorSignature {
Operator<TableScanDesc> ts = getTsOp(i);
Operator<? extends OperatorDesc> fil = getFilterOp(j);
- ts.getChildOperators().add(fil);
- fil.getParentOperators().add(ts);
+ connectOperators(ts, fil);
return fil;
}
+ private void connectOperators(Operator<?> parent, Operator<?> child) {
+ parent.getChildOperators().add(child);
+ child.getParentOperators().add(parent);
+ }
+
@Test
public void testTableScand() {
Operator<TableScanDesc> t1 = getTsOp(3);
@@ -157,4 +161,5 @@ public class TestOperatorSignature {
}
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9f15e22f/ql/src/test/org/apache/hadoop/hive/ql/plan/mapping/TestCounterMapping.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/plan/mapping/TestCounterMapping.java b/ql/src/test/org/apache/hadoop/hive/ql/plan/mapping/TestCounterMapping.java
index 18aeb33..8126970 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/plan/mapping/TestCounterMapping.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/plan/mapping/TestCounterMapping.java
@@ -172,7 +172,6 @@ public class TestCounterMapping {
}
private static IDriver createDriver() {
- // HiveConf conf = new HiveConf(Driver.class);
HiveConf conf = env_setup.getTestCtx().hiveConf;
conf.setBoolVar(ConfVars.HIVE_QUERY_REEXECUTION_ENABLED, true);
conf.setBoolVar(ConfVars.HIVE_QUERY_REEXECUTION_ALWAYS_COLLECT_OPERATOR_STATS, true);
http://git-wip-us.apache.org/repos/asf/hive/blob/9f15e22f/ql/src/test/queries/clientpositive/runtime_stats_hs2.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/runtime_stats_hs2.q b/ql/src/test/queries/clientpositive/runtime_stats_hs2.q
new file mode 100644
index 0000000..34a8dd3
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/runtime_stats_hs2.q
@@ -0,0 +1,22 @@
+
+create table tx(a int,u int);
+insert into tx values (1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(10,10);
+
+create table px(a int,p int);
+insert into px values (2,2),(3,3),(5,5),(7,7),(11,11);
+
+set hive.explain.user=true;
+set hive.query.reexecution.enabled=true;
+set hive.query.reexecution.always.collect.operator.stats=true;
+set hive.query.reexecution.strategies=overlay,reoptimize;
+set hive.query.reexecution.stats.persist.scope=hiveserver;
+
+-- join output estimate is underestimated: 1 row
+explain
+select sum(u*p) from tx join px on (u=p) where u<10 and p>2;
+
+select sum(u*p) from tx join px on (u=p) where u<10 and p>2;
+
+-- join output estimate is 3 rows ; all the operators stats are "runtime"
+explain
+select sum(u*p) from tx join px on (u=p) where u<10 and p>2;
http://git-wip-us.apache.org/repos/asf/hive/blob/9f15e22f/ql/src/test/results/clientpositive/llap/runtime_stats_hs2.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/runtime_stats_hs2.q.out b/ql/src/test/results/clientpositive/llap/runtime_stats_hs2.q.out
new file mode 100644
index 0000000..4d60b8c
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap/runtime_stats_hs2.q.out
@@ -0,0 +1,141 @@
+PREHOOK: query: create table tx(a int,u int)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tx
+POSTHOOK: query: create table tx(a int,u int)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tx
+PREHOOK: query: insert into tx values (1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(10,10)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@tx
+POSTHOOK: query: insert into tx values (1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(10,10)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@tx
+POSTHOOK: Lineage: tx.a SCRIPT []
+POSTHOOK: Lineage: tx.u SCRIPT []
+PREHOOK: query: create table px(a int,p int)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@px
+POSTHOOK: query: create table px(a int,p int)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@px
+PREHOOK: query: insert into px values (2,2),(3,3),(5,5),(7,7),(11,11)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@px
+POSTHOOK: query: insert into px values (2,2),(3,3),(5,5),(7,7),(11,11)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@px
+POSTHOOK: Lineage: px.a SCRIPT []
+POSTHOOK: Lineage: px.p SCRIPT []
+PREHOOK: query: explain
+select sum(u*p) from tx join px on (u=p) where u<10 and p>2
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select sum(u*p) from tx join px on (u=p) where u<10 and p>2
+POSTHOOK: type: QUERY
+Plan optimized by CBO.
+
+Vertex dependency in root stage
+Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE)
+Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
+
+Stage-0
+ Fetch Operator
+ limit:-1
+ Stage-1
+ Reducer 3 llap
+ File Output Operator [FS_15]
+ Group By Operator [GBY_13] (rows=1 width=8)
+ Output:["_col0"],aggregations:["sum(VALUE._col0)"]
+ <-Reducer 2 [CUSTOM_SIMPLE_EDGE] llap
+ PARTITION_ONLY_SHUFFLE [RS_12]
+ Group By Operator [GBY_11] (rows=1 width=8)
+ Output:["_col0"],aggregations:["sum(_col0)"]
+ Select Operator [SEL_9] (rows=1 width=8)
+ Output:["_col0"]
+ Merge Join Operator [MERGEJOIN_20] (rows=1 width=8)
+ Conds:RS_6._col0=RS_7._col0(Inner),Output:["_col0","_col1"]
+ <-Map 1 [SIMPLE_EDGE] llap
+ SHUFFLE [RS_6]
+ PartitionCols:_col0
+ Select Operator [SEL_2] (rows=1 width=4)
+ Output:["_col0"]
+ Filter Operator [FIL_18] (rows=1 width=4)
+ predicate:((u < 10) and (u > 2))
+ TableScan [TS_0] (rows=8 width=4)
+ default@tx,tx,Tbl:COMPLETE,Col:COMPLETE,Output:["u"]
+ <-Map 4 [SIMPLE_EDGE] llap
+ SHUFFLE [RS_7]
+ PartitionCols:_col0
+ Select Operator [SEL_5] (rows=1 width=4)
+ Output:["_col0"]
+ Filter Operator [FIL_19] (rows=1 width=4)
+ predicate:((p < 10) and (p > 2))
+ TableScan [TS_3] (rows=5 width=4)
+ default@px,px,Tbl:COMPLETE,Col:COMPLETE,Output:["p"]
+
+PREHOOK: query: select sum(u*p) from tx join px on (u=p) where u<10 and p>2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@px
+PREHOOK: Input: default@tx
+#### A masked pattern was here ####
+POSTHOOK: query: select sum(u*p) from tx join px on (u=p) where u<10 and p>2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@px
+POSTHOOK: Input: default@tx
+#### A masked pattern was here ####
+83
+PREHOOK: query: explain
+select sum(u*p) from tx join px on (u=p) where u<10 and p>2
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select sum(u*p) from tx join px on (u=p) where u<10 and p>2
+POSTHOOK: type: QUERY
+Plan optimized by CBO.
+
+Vertex dependency in root stage
+Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE)
+Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
+
+Stage-0
+ Fetch Operator
+ limit:-1
+ Stage-1
+ Reducer 3 llap
+ File Output Operator [FS_15]
+ Group By Operator [GBY_13] (runtime: rows=1 width=8)
+ Output:["_col0"],aggregations:["sum(VALUE._col0)"]
+ <-Reducer 2 [CUSTOM_SIMPLE_EDGE] llap
+ PARTITION_ONLY_SHUFFLE [RS_12]
+ Group By Operator [GBY_11] (runtime: rows=1 width=8)
+ Output:["_col0"],aggregations:["sum(_col0)"]
+ Select Operator [SEL_9] (runtime: rows=3 width=8)
+ Output:["_col0"]
+ Merge Join Operator [MERGEJOIN_20] (runtime: rows=3 width=8)
+ Conds:RS_6._col0=RS_7._col0(Inner),Output:["_col0","_col1"]
+ <-Map 1 [SIMPLE_EDGE] llap
+ SHUFFLE [RS_6]
+ PartitionCols:_col0
+ Select Operator [SEL_2] (runtime: rows=5 width=4)
+ Output:["_col0"]
+ Filter Operator [FIL_18] (runtime: rows=5 width=4)
+ predicate:((u < 10) and (u > 2))
+ TableScan [TS_0] (runtime: rows=8 width=4)
+ default@tx,tx,Tbl:COMPLETE,Col:COMPLETE,Output:["u"]
+ <-Map 4 [SIMPLE_EDGE] llap
+ SHUFFLE [RS_7]
+ PartitionCols:_col0
+ Select Operator [SEL_5] (runtime: rows=3 width=4)
+ Output:["_col0"]
+ Filter Operator [FIL_19] (runtime: rows=3 width=4)
+ predicate:((p < 10) and (p > 2))
+ TableScan [TS_3] (runtime: rows=5 width=4)
+ default@px,px,Tbl:COMPLETE,Col:COMPLETE,Output:["p"]
+