You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2016/10/27 21:23:16 UTC
hive git commit: HIVE-15065: SimpleFetchOptimizer should decide based
on metastore stats when available (Prasanth Jayachandran reviewed by Ashutosh
Chauhan)
Repository: hive
Updated Branches:
refs/heads/master a977c3680 -> d7a43c7a0
HIVE-15065: SimpleFetchOptimizer should decide based on metastore stats when available (Prasanth Jayachandran reviewed by Ashutosh Chauhan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d7a43c7a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d7a43c7a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d7a43c7a
Branch: refs/heads/master
Commit: d7a43c7a0f85d0529430df6e4190ad58072d4bbe
Parents: a977c36
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Thu Oct 27 14:22:48 2016 -0700
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Thu Oct 27 14:22:48 2016 -0700
----------------------------------------------------------------------
.../test/resources/testconfiguration.properties | 1 +
.../hive/ql/optimizer/SimpleFetchOptimizer.java | 169 +++++++++++-------
.../clientpositive/stats_based_fetch_decision.q | 15 ++
.../llap/stats_based_fetch_decision.q.out | 176 +++++++++++++++++++
4 files changed, 300 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/d7a43c7a/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 4e91452..f30152b 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -558,6 +558,7 @@ minillaplocal.query.files=acid_globallimit.q,\
semijoin.q,\
smb_cache.q,\
special_character_in_tabnames_1.q,\
+ stats_based_fetch_decision.q,\
subquery_notin.q,\
table_access_keys_stats.q,\
tez_bmj_schema_evolution.q,\
http://git-wip-us.apache.org/repos/asf/hive/blob/d7a43c7a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
index 0481110..a68ceb4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
@@ -36,9 +36,10 @@ import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.ql.stats.StatsUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -55,7 +56,6 @@ import org.apache.hadoop.hive.ql.exec.ScriptOperator;
import org.apache.hadoop.hive.ql.exec.SelectOperator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.exec.UDTFOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat;
@@ -171,7 +171,7 @@ public class SimpleFetchOptimizer extends Transform {
return true;
}
}
- return data.isDataLengthWitInThreshold(pctx, threshold);
+ return data.isDataLengthWithInThreshold(pctx, threshold);
}
// all we can handle is LimitOperator, FilterOperator SelectOperator and final FS
@@ -321,6 +321,12 @@ public class SimpleFetchOptimizer extends Transform {
return true;
}
+ enum Status {
+ PASS,
+ FAIL,
+ UNAVAILABLE
+ }
+
private class FetchData {
// source table scan
@@ -417,7 +423,7 @@ public class SimpleFetchOptimizer extends Transform {
return replaceFSwithLS(fileSink, work.getSerializationNullFormat());
}
- private boolean isDataLengthWitInThreshold(ParseContext pctx, final long threshold)
+ private boolean isDataLengthWithInThreshold(ParseContext pctx, final long threshold)
throws Exception {
if (splitSample != null && splitSample.getTotalLength() != null) {
if (LOG.isDebugEnabled()) {
@@ -426,74 +432,115 @@ public class SimpleFetchOptimizer extends Transform {
return (threshold - splitSample.getTotalLength()) > 0;
}
- final JobConf jobConf = new JobConf(pctx.getConf());
- Utilities.setColumnNameList(jobConf, scanOp, true);
- Utilities.setColumnTypeList(jobConf, scanOp, true);
- HiveStorageHandler handler = table.getStorageHandler();
- if (handler instanceof InputEstimator) {
- InputEstimator estimator = (InputEstimator) handler;
- TableDesc tableDesc = Utilities.getTableDesc(table);
- PlanUtils.configureInputJobPropertiesForStorageHandler(tableDesc);
- Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf);
- long len = estimator.estimate(jobConf, scanOp, threshold).getTotalLength();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Threshold " + len + " exceeded for pseudoMR mode");
+ Status status = checkThresholdWithMetastoreStats(table, partsList, threshold);
+ if (status.equals(Status.PASS)) {
+ return true;
+ } else if (status.equals(Status.FAIL)) {
+ return false;
+ } else {
+ LOG.info("Cannot fetch stats from metastore for table: {}. Falling back to filesystem scan..",
+ table.getCompleteName());
+ // metastore stats is unavailable, fallback to old way
+ final JobConf jobConf = new JobConf(pctx.getConf());
+ Utilities.setColumnNameList(jobConf, scanOp, true);
+ Utilities.setColumnTypeList(jobConf, scanOp, true);
+ HiveStorageHandler handler = table.getStorageHandler();
+ if (handler instanceof InputEstimator) {
+ InputEstimator estimator = (InputEstimator) handler;
+ TableDesc tableDesc = Utilities.getTableDesc(table);
+ PlanUtils.configureInputJobPropertiesForStorageHandler(tableDesc);
+ Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf);
+ long len = estimator.estimate(jobConf, scanOp, threshold).getTotalLength();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Threshold " + len + " exceeded for pseudoMR mode");
+ }
+ return (threshold - len) > 0;
}
- return (threshold - len) > 0;
- }
- if (table.isNonNative()) {
- return true; // nothing can be done
- }
- if (!table.isPartitioned()) {
- long len = getPathLength(jobConf, table.getPath(), table.getInputFormatClass(), threshold);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Threshold " + len + " exceeded for pseudoMR mode");
+ if (table.isNonNative()) {
+ return true; // nothing can be done
}
- return (threshold - len) > 0;
- }
- final AtomicLong total = new AtomicLong(0);
- //TODO: use common thread pool later?
- int threadCount = HiveConf.getIntVar(pctx.getConf(),
+ if (!table.isPartitioned()) {
+ long len = getPathLength(jobConf, table.getPath(), table.getInputFormatClass(), threshold);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Threshold " + len + " exceeded for pseudoMR mode");
+ }
+ return (threshold - len) > 0;
+ }
+ final AtomicLong total = new AtomicLong(0);
+ //TODO: use common thread pool later?
+ int threadCount = HiveConf.getIntVar(pctx.getConf(),
HiveConf.ConfVars.HIVE_STATS_GATHER_NUM_THREADS);
- final ExecutorService pool = (threadCount > 0) ?
+ final ExecutorService pool = (threadCount > 0) ?
Executors.newFixedThreadPool(threadCount,
- new ThreadFactoryBuilder()
- .setDaemon(true)
- .setNameFormat("SimpleFetchOptimizer-FileLength-%d").build()) : null;
- try {
- List<Future> futures = Lists.newLinkedList();
- for (final Partition partition : partsList.getNotDeniedPartns()) {
- final Path path = partition.getDataLocation();
+ new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat("SimpleFetchOptimizer-FileLength-%d").build()) : null;
+ try {
+ List<Future> futures = Lists.newLinkedList();
+ for (final Partition partition : partsList.getNotDeniedPartns()) {
+ final Path path = partition.getDataLocation();
+ if (pool != null) {
+ futures.add(pool.submit(new Callable<Long>() {
+ @Override
+ public Long call() throws Exception {
+ long len = getPathLength(jobConf, path, partition.getInputFormatClass(), threshold);
+ LOG.trace(path + ", length=" + len);
+ return total.addAndGet(len);
+ }
+ }));
+ } else {
+ total.addAndGet(getPathLength(jobConf, path, partition.getInputFormatClass(), threshold));
+ }
+ }
if (pool != null) {
- futures.add(pool.submit(new Callable<Long>() {
- @Override
- public Long call() throws Exception {
- long len = getPathLength(jobConf, path, partition.getInputFormatClass(), threshold);
- LOG.trace(path + ", length=" + len);
- return total.addAndGet(len);
+ pool.shutdown();
+ for (Future<Long> future : futures) {
+ long totalLen = future.get();
+ if ((threshold - totalLen) <= 0) {
+ // early exit, as getting file lengths can be expensive in object stores.
+ return false;
}
- }));
- } else {
- total.addAndGet(getPathLength(jobConf, path, partition.getInputFormatClass(), threshold));
- }
- }
- if (pool != null) {
- pool.shutdown();
- for (Future<Long> future : futures) {
- long totalLen = future.get();
- if ((threshold - totalLen) <= 0) {
- // early exit, as getting file lengths can be expensive in object stores.
- return false;
}
}
+ return (threshold - total.get()) >= 0;
+ } finally {
+ LOG.info("Data set size=" + total.get() + ", threshold=" + threshold);
+ if (pool != null) {
+ pool.shutdownNow();
+ }
}
- return (threshold - total.get()) >= 0;
- } finally {
- LOG.info("Data set size=" + total.get() + ", threshold=" + threshold);
- if (pool != null) {
- pool.shutdownNow();
+ }
+ }
+
+ // This method gets the basic stats from metastore for table/partitions. This will make use of the statistics from
+ // AnnotateWithStatistics optimizer when available. If execution engine is tez or spark, AnnotateWithStatistics
// optimization is applied only during physical compilation because of DPP changing the stats. In such a case,
// we will get the basic stats from metastore. When statistics are absent in metastore we will use the fallback of
+ // scanning the filesystem to get file lengths.
+ private Status checkThresholdWithMetastoreStats(final Table table, final PrunedPartitionList partsList,
+ final long threshold) {
+ if (table != null && !table.isPartitioned()) {
+ long dataSize = StatsUtils.getTotalSize(table);
+ if (dataSize <= 0) {
+ LOG.warn("Cannot determine basic stats for table: {} from metastore. Falling back.", table.getCompleteName());
+ return Status.UNAVAILABLE;
}
+
+ return (threshold - dataSize) >= 0 ? Status.PASS : Status.FAIL;
+ } else if (table != null && table.isPartitioned() && partsList != null) {
+ List<Long> dataSizes = StatsUtils.getBasicStatForPartitions(table, partsList.getNotDeniedPartns(),
+ StatsSetupConst.TOTAL_SIZE);
+ long totalDataSize = StatsUtils.getSumIgnoreNegatives(dataSizes);
+ if (totalDataSize <= 0) {
+ LOG.warn("Cannot determine basic stats for partitioned table: {} from metastore. Falling back.",
+ table.getCompleteName());
+ return Status.UNAVAILABLE;
+ }
+
+ return (threshold - totalDataSize) >= 0 ? Status.PASS : Status.FAIL;
}
+
+ return Status.UNAVAILABLE;
}
private long getPathLength(JobConf conf, Path path,
http://git-wip-us.apache.org/repos/asf/hive/blob/d7a43c7a/ql/src/test/queries/clientpositive/stats_based_fetch_decision.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/stats_based_fetch_decision.q b/ql/src/test/queries/clientpositive/stats_based_fetch_decision.q
new file mode 100644
index 0000000..c66cafc
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/stats_based_fetch_decision.q
@@ -0,0 +1,15 @@
+SET hive.fetch.task.conversion=more;
+SET hive.explain.user=false;
+
+-- will not print tez counters as tasks will not be launched
+select * from src where key is null;
+select * from srcpart where key is null;
+explain select * from src where key is null;
+explain select * from srcpart where key is null;
+
+SET hive.fetch.task.conversion.threshold=1000;
+-- will print tez counters as tasks will be launched
+select * from src where key is null;
+select * from srcpart where key is null;
+explain select * from src where key is null;
+explain select * from srcpart where key is null;
http://git-wip-us.apache.org/repos/asf/hive/blob/d7a43c7a/ql/src/test/results/clientpositive/llap/stats_based_fetch_decision.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/stats_based_fetch_decision.q.out b/ql/src/test/results/clientpositive/llap/stats_based_fetch_decision.q.out
new file mode 100644
index 0000000..f61483b
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap/stats_based_fetch_decision.q.out
@@ -0,0 +1,176 @@
+PREHOOK: query: -- will not print tez counters as tasks will not be launched
+select * from src where key is null
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: -- will not print tez counters as tasks will not be launched
+select * from src where key is null
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+PREHOOK: query: select * from srcpart where key is null
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: select * from srcpart where key is null
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+PREHOOK: query: explain select * from src where key is null
+PREHOOK: type: QUERY
+POSTHOOK: query: explain select * from src where key is null
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ TableScan
+ alias: src
+ Filter Operator
+ predicate: key is null (type: boolean)
+ Select Operator
+ expressions: null (type: string), value (type: string)
+ outputColumnNames: _col0, _col1
+ ListSink
+
+PREHOOK: query: explain select * from srcpart where key is null
+PREHOOK: type: QUERY
+POSTHOOK: query: explain select * from srcpart where key is null
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ TableScan
+ alias: srcpart
+ Filter Operator
+ predicate: key is null (type: boolean)
+ Select Operator
+ expressions: null (type: string), value (type: string), ds (type: string), hr (type: string)
+ outputColumnNames: _col0, _col1, _col2, _col3
+ ListSink
+
+PREHOOK: query: -- will print tez counters as tasks will be launched
+select * from src where key is null
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: -- will print tez counters as tasks will be launched
+select * from src where key is null
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+PREHOOK: query: select * from srcpart where key is null
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: select * from srcpart where key is null
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+PREHOOK: query: explain select * from src where key is null
+PREHOOK: type: QUERY
+POSTHOOK: query: explain select * from src where key is null
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-1
+ Tez
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: src
+ Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+ Filter Operator
+ predicate: key is null (type: boolean)
+ Statistics: Num rows: 1 Data size: 178 Basic stats: COMPLETE Column stats: COMPLETE
+ Select Operator
+ expressions: null (type: string), value (type: string)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 1 Data size: 175 Basic stats: COMPLETE Column stats: COMPLETE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 1 Data size: 175 Basic stats: COMPLETE Column stats: COMPLETE
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ Execution mode: llap
+ LLAP IO: no inputs
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ ListSink
+
+PREHOOK: query: explain select * from srcpart where key is null
+PREHOOK: type: QUERY
+POSTHOOK: query: explain select * from srcpart where key is null
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-1
+ Tez
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: srcpart
+ Statistics: Num rows: 2000 Data size: 1092000 Basic stats: COMPLETE Column stats: COMPLETE
+ Filter Operator
+ predicate: key is null (type: boolean)
+ Statistics: Num rows: 1 Data size: 546 Basic stats: COMPLETE Column stats: COMPLETE
+ Select Operator
+ expressions: null (type: string), value (type: string), ds (type: string), hr (type: string)
+ outputColumnNames: _col0, _col1, _col2, _col3
+ Statistics: Num rows: 1 Data size: 543 Basic stats: COMPLETE Column stats: COMPLETE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 1 Data size: 543 Basic stats: COMPLETE Column stats: COMPLETE
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ Execution mode: llap
+ LLAP IO: no inputs
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ ListSink
+