Posted to commits@hive.apache.org by se...@apache.org on 2016/10/28 02:33:18 UTC

[10/11] hive git commit: HIVE-15065: SimpleFetchOptimizer should decide based on metastore stats when available (Prasanth Jayachandran reviewed by Ashutosh Chauhan)

HIVE-15065: SimpleFetchOptimizer should decide based on metastore stats when available (Prasanth Jayachandran reviewed by Ashutosh Chauhan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d7a43c7a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d7a43c7a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d7a43c7a

Branch: refs/heads/hive-14535
Commit: d7a43c7a0f85d0529430df6e4190ad58072d4bbe
Parents: a977c36
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Thu Oct 27 14:22:48 2016 -0700
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Thu Oct 27 14:22:48 2016 -0700

----------------------------------------------------------------------
 .../test/resources/testconfiguration.properties |   1 +
 .../hive/ql/optimizer/SimpleFetchOptimizer.java | 169 +++++++++++-------
 .../clientpositive/stats_based_fetch_decision.q |  15 ++
 .../llap/stats_based_fetch_decision.q.out       | 176 +++++++++++++++++++
 4 files changed, 300 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
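In short: SimpleFetchOptimizer previously decided fetch-task conversion by scanning the
filesystem to measure input size. With this patch it first consults the basic stats
(totalSize) recorded in the metastore and turns them into a three-way verdict (PASS,
FAIL, or UNAVAILABLE), falling back to the filesystem scan only when the stats are
missing. A short sketch of the decision logic follows the SimpleFetchOptimizer.java
diff below.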


http://git-wip-us.apache.org/repos/asf/hive/blob/d7a43c7a/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 4e91452..f30152b 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -558,6 +558,7 @@ minillaplocal.query.files=acid_globallimit.q,\
   semijoin.q,\
   smb_cache.q,\
   special_character_in_tabnames_1.q,\
+  stats_based_fetch_decision.q,\
   subquery_notin.q,\
   table_access_keys_stats.q,\
   tez_bmj_schema_evolution.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/d7a43c7a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
index 0481110..a68ceb4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
@@ -36,9 +36,10 @@ import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.ql.stats.StatsUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -55,7 +56,6 @@ import org.apache.hadoop.hive.ql.exec.ScriptOperator;
 import org.apache.hadoop.hive.ql.exec.SelectOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.exec.UDTFOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat;
@@ -171,7 +171,7 @@ public class SimpleFetchOptimizer extends Transform {
         return true;
       }
     }
-    return data.isDataLengthWitInThreshold(pctx, threshold);
+    return data.isDataLengthWithInThreshold(pctx, threshold);
   }
 
   // all we can handle is LimitOperator, FilterOperator SelectOperator and final FS
@@ -321,6 +321,12 @@ public class SimpleFetchOptimizer extends Transform {
     return true;
   }
 
+  enum Status {
+    PASS,
+    FAIL,
+    UNAVAILABLE
+  }
+
   private class FetchData {
 
     // source table scan
@@ -417,7 +423,7 @@ public class SimpleFetchOptimizer extends Transform {
       return replaceFSwithLS(fileSink, work.getSerializationNullFormat());
     }
 
-    private boolean isDataLengthWitInThreshold(ParseContext pctx, final long threshold)
+    private boolean isDataLengthWithInThreshold(ParseContext pctx, final long threshold)
         throws Exception {
       if (splitSample != null && splitSample.getTotalLength() != null) {
         if (LOG.isDebugEnabled()) {
@@ -426,74 +432,115 @@ public class SimpleFetchOptimizer extends Transform {
         return (threshold - splitSample.getTotalLength()) > 0;
       }
 
-      final JobConf jobConf = new JobConf(pctx.getConf());
-      Utilities.setColumnNameList(jobConf, scanOp, true);
-      Utilities.setColumnTypeList(jobConf, scanOp, true);
-      HiveStorageHandler handler = table.getStorageHandler();
-      if (handler instanceof InputEstimator) {
-        InputEstimator estimator = (InputEstimator) handler;
-        TableDesc tableDesc = Utilities.getTableDesc(table);
-        PlanUtils.configureInputJobPropertiesForStorageHandler(tableDesc);
-        Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf);
-        long len = estimator.estimate(jobConf, scanOp, threshold).getTotalLength();
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Threshold " + len + " exceeded for pseudoMR mode");
+      Status status = checkThresholdWithMetastoreStats(table, partsList, threshold);
+      if (status.equals(Status.PASS)) {
+        return true;
+      } else if (status.equals(Status.FAIL)) {
+        return false;
+      } else {
+        LOG.info("Cannot fetch stats from metastore for table: {}. Falling back to filesystem scan..",
+          table.getCompleteName());
+        // metastore stats is unavailable, fallback to old way
+        final JobConf jobConf = new JobConf(pctx.getConf());
+        Utilities.setColumnNameList(jobConf, scanOp, true);
+        Utilities.setColumnTypeList(jobConf, scanOp, true);
+        HiveStorageHandler handler = table.getStorageHandler();
+        if (handler instanceof InputEstimator) {
+          InputEstimator estimator = (InputEstimator) handler;
+          TableDesc tableDesc = Utilities.getTableDesc(table);
+          PlanUtils.configureInputJobPropertiesForStorageHandler(tableDesc);
+          Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf);
+          long len = estimator.estimate(jobConf, scanOp, threshold).getTotalLength();
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Threshold " + len + " exceeded for pseudoMR mode");
+          }
+          return (threshold - len) > 0;
         }
-        return (threshold - len) > 0;
-      }
-      if (table.isNonNative()) {
-        return true; // nothing can be done
-      }
-      if (!table.isPartitioned()) {
-        long len = getPathLength(jobConf, table.getPath(), table.getInputFormatClass(), threshold);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Threshold " + len + " exceeded for pseudoMR mode");
+        if (table.isNonNative()) {
+          return true; // nothing can be done
         }
-        return (threshold - len) > 0;
-      }
-      final AtomicLong total = new AtomicLong(0);
-      //TODO: use common thread pool later?
-      int threadCount = HiveConf.getIntVar(pctx.getConf(),
+        if (!table.isPartitioned()) {
+          long len = getPathLength(jobConf, table.getPath(), table.getInputFormatClass(), threshold);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Threshold " + len + " exceeded for pseudoMR mode");
+          }
+          return (threshold - len) > 0;
+        }
+        final AtomicLong total = new AtomicLong(0);
+        //TODO: use common thread pool later?
+        int threadCount = HiveConf.getIntVar(pctx.getConf(),
           HiveConf.ConfVars.HIVE_STATS_GATHER_NUM_THREADS);
-      final ExecutorService pool = (threadCount > 0) ?
+        final ExecutorService pool = (threadCount > 0) ?
           Executors.newFixedThreadPool(threadCount,
-              new ThreadFactoryBuilder()
-                  .setDaemon(true)
-                  .setNameFormat("SimpleFetchOptimizer-FileLength-%d").build()) : null;
-      try {
-        List<Future> futures = Lists.newLinkedList();
-        for (final Partition partition : partsList.getNotDeniedPartns()) {
-          final Path path = partition.getDataLocation();
+            new ThreadFactoryBuilder()
+              .setDaemon(true)
+              .setNameFormat("SimpleFetchOptimizer-FileLength-%d").build()) : null;
+        try {
+          List<Future> futures = Lists.newLinkedList();
+          for (final Partition partition : partsList.getNotDeniedPartns()) {
+            final Path path = partition.getDataLocation();
+            if (pool != null) {
+              futures.add(pool.submit(new Callable<Long>() {
+                @Override
+                public Long call() throws Exception {
+                  long len = getPathLength(jobConf, path, partition.getInputFormatClass(), threshold);
+                  LOG.trace(path + ", length=" + len);
+                  return total.addAndGet(len);
+                }
+              }));
+            } else {
+              total.addAndGet(getPathLength(jobConf, path, partition.getInputFormatClass(), threshold));
+            }
+          }
           if (pool != null) {
-            futures.add(pool.submit(new Callable<Long>() {
-              @Override
-              public Long call() throws Exception {
-                long len = getPathLength(jobConf, path, partition.getInputFormatClass(), threshold);
-                LOG.trace(path  + ", length=" + len);
-                return total.addAndGet(len);
+            pool.shutdown();
+            for (Future<Long> future : futures) {
+              long totalLen = future.get();
+              if ((threshold - totalLen) <= 0) {
+                // early exit, as getting file lengths can be expensive in object stores.
+                return false;
               }
-            }));
-          } else {
-            total.addAndGet(getPathLength(jobConf, path, partition.getInputFormatClass(), threshold));
-          }
-        }
-        if (pool != null) {
-          pool.shutdown();
-          for (Future<Long> future : futures) {
-            long totalLen = future.get();
-            if ((threshold - totalLen) <= 0) {
-              // early exit, as getting file lengths can be expensive in object stores.
-              return false;
             }
           }
+          return (threshold - total.get()) >= 0;
+        } finally {
+          LOG.info("Data set size=" + total.get() + ", threshold=" + threshold);
+          if (pool != null) {
+            pool.shutdownNow();
+          }
         }
-        return (threshold - total.get()) >= 0;
-      } finally {
-        LOG.info("Data set size=" + total.get() + ", threshold=" + threshold);
-        if (pool != null) {
-          pool.shutdownNow();
+      }
+    }
+
+    // This method gets the basic stats from metastore for table/partitions. This will make use of the statistics from
+    // AnnotateWithStatistics optimizer when available. If execution engine is tez or spark, AnnotateWithStatistics
+    // optimization is applied only during physical compilation because of DPP changing the stats. In such cases,
+    // we will get the basic stats from metastore. When statistics are absent in metastore we will use the fallback of
+    // scanning the filesystem to get file lengths.
+    private Status checkThresholdWithMetastoreStats(final Table table, final PrunedPartitionList partsList,
+      final long threshold) {
+      if (table != null && !table.isPartitioned()) {
+        long dataSize = StatsUtils.getTotalSize(table);
+        if (dataSize <= 0) {
+          LOG.warn("Cannot determine basic stats for table: {} from metastore. Falling back.", table.getCompleteName());
+          return Status.UNAVAILABLE;
         }
+
+        return (threshold - dataSize) >= 0 ? Status.PASS : Status.FAIL;
+      } else if (table != null && table.isPartitioned() && partsList != null) {
+        List<Long> dataSizes = StatsUtils.getBasicStatForPartitions(table, partsList.getNotDeniedPartns(),
+          StatsSetupConst.TOTAL_SIZE);
+        long totalDataSize = StatsUtils.getSumIgnoreNegatives(dataSizes);
+        if (totalDataSize <= 0) {
+          LOG.warn("Cannot determine basic stats for partitioned table: {} from metastore. Falling back.",
+            table.getCompleteName());
+          return Status.UNAVAILABLE;
+        }
+
+        return (threshold - totalDataSize) >= 0 ? Status.PASS : Status.FAIL;
       }
+
+      return Status.UNAVAILABLE;
     }
 
     private long getPathLength(JobConf conf, Path path,
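
Below is a minimal, self-contained sketch of the decision flow introduced above. It
collapses the table and partition branches of checkThresholdWithMetastoreStats into a
single method over plain longs: the class name FetchDecisionSketch, the method
checkThreshold, and the sample sizes are invented for illustration; only the Status
outcomes, the sum-ignoring-negatives step, and the threshold arithmetic mirror the
patch.

    import java.util.Arrays;
    import java.util.List;

    public class FetchDecisionSketch {

      enum Status { PASS, FAIL, UNAVAILABLE }

      // Like StatsUtils.getSumIgnoreNegatives in the patch: sum the metastore
      // totalSize values, skipping non-positive entries (missing stats). A
      // non-positive total means the metastore has no usable basic stats, so
      // the caller must fall back to a filesystem scan.
      static Status checkThreshold(List<Long> totalSizes, long threshold) {
        long total = 0;
        for (long size : totalSizes) {
          if (size > 0) {
            total += size;
          }
        }
        if (total <= 0) {
          return Status.UNAVAILABLE;
        }
        return (threshold - total) >= 0 ? Status.PASS : Status.FAIL;
      }

      public static void main(String[] args) {
        long threshold = 1000L; // matches the lowered hive.fetch.task.conversion.threshold in the test below
        // Hypothetical totalSize values, one entry per partition:
        System.out.println(checkThreshold(Arrays.asList(89000L), threshold));     // FAIL: too large, launch a task
        System.out.println(checkThreshold(Arrays.asList(100L, 200L), threshold)); // PASS: convert to a fetch task
        System.out.println(checkThreshold(Arrays.asList(-1L, -1L), threshold));   // UNAVAILABLE: fall back to a filesystem scan
      }
    }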

http://git-wip-us.apache.org/repos/asf/hive/blob/d7a43c7a/ql/src/test/queries/clientpositive/stats_based_fetch_decision.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/stats_based_fetch_decision.q b/ql/src/test/queries/clientpositive/stats_based_fetch_decision.q
new file mode 100644
index 0000000..c66cafc
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/stats_based_fetch_decision.q
@@ -0,0 +1,15 @@
+SET hive.fetch.task.conversion=more;
+SET hive.explain.user=false;
+
+-- will not print tez counters as tasks will not be launched
+select * from src where key is null;
+select * from srcpart where key is null;
+explain select * from src where key is null;
+explain select * from srcpart where key is null;
+
+SET hive.fetch.task.conversion.threshold=1000;
+-- will print tez counters as tasks will be launched
+select * from src where key is null;
+select * from srcpart where key is null;
+explain select * from src where key is null;
+explain select * from srcpart where key is null;
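
The test flips a single knob: at the default (large) value of
hive.fetch.task.conversion.threshold, both src and srcpart fit under the limit, the
stats check passes, and the queries run as plain fetch tasks with no Tez counters.
After lowering the threshold to 1000 bytes, the metastore sizes exceed it, the check
fails, and the same queries compile to Tez stages, as the golden file below confirms.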

http://git-wip-us.apache.org/repos/asf/hive/blob/d7a43c7a/ql/src/test/results/clientpositive/llap/stats_based_fetch_decision.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/stats_based_fetch_decision.q.out b/ql/src/test/results/clientpositive/llap/stats_based_fetch_decision.q.out
new file mode 100644
index 0000000..f61483b
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap/stats_based_fetch_decision.q.out
@@ -0,0 +1,176 @@
+PREHOOK: query: -- will not print tez counters as tasks will not be launched
+select * from src where key is null
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: -- will not print tez counters as tasks will not be launched
+select * from src where key is null
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+PREHOOK: query: select * from srcpart where key is null
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: select * from srcpart where key is null
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+PREHOOK: query: explain select * from src where key is null
+PREHOOK: type: QUERY
+POSTHOOK: query: explain select * from src where key is null
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        TableScan
+          alias: src
+          Filter Operator
+            predicate: key is null (type: boolean)
+            Select Operator
+              expressions: null (type: string), value (type: string)
+              outputColumnNames: _col0, _col1
+              ListSink
+
+PREHOOK: query: explain select * from srcpart where key is null
+PREHOOK: type: QUERY
+POSTHOOK: query: explain select * from srcpart where key is null
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        TableScan
+          alias: srcpart
+          Filter Operator
+            predicate: key is null (type: boolean)
+            Select Operator
+              expressions: null (type: string), value (type: string), ds (type: string), hr (type: string)
+              outputColumnNames: _col0, _col1, _col2, _col3
+              ListSink
+
+PREHOOK: query: -- will print tez counters as tasks will be launched
+select * from src where key is null
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: -- will print tez counters as tasks will be launched
+select * from src where key is null
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+PREHOOK: query: select * from srcpart where key is null
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcpart
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+POSTHOOK: query: select * from srcpart where key is null
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcpart
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
+POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
+#### A masked pattern was here ####
+PREHOOK: query: explain select * from src where key is null
+PREHOOK: type: QUERY
+POSTHOOK: query: explain select * from src where key is null
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: src
+                  Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: key is null (type: boolean)
+                    Statistics: Num rows: 1 Data size: 178 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: null (type: string), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 1 Data size: 175 Basic stats: COMPLETE Column stats: COMPLETE
+                      File Output Operator
+                        compressed: false
+                        Statistics: Num rows: 1 Data size: 175 Basic stats: COMPLETE Column stats: COMPLETE
+                        table:
+                            input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                            output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+            Execution mode: llap
+            LLAP IO: no inputs
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: explain select * from srcpart where key is null
+PREHOOK: type: QUERY
+POSTHOOK: query: explain select * from srcpart where key is null
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: srcpart
+                  Statistics: Num rows: 2000 Data size: 1092000 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: key is null (type: boolean)
+                    Statistics: Num rows: 1 Data size: 546 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: null (type: string), value (type: string), ds (type: string), hr (type: string)
+                      outputColumnNames: _col0, _col1, _col2, _col3
+                      Statistics: Num rows: 1 Data size: 543 Basic stats: COMPLETE Column stats: COMPLETE
+                      File Output Operator
+                        compressed: false
+                        Statistics: Num rows: 1 Data size: 543 Basic stats: COMPLETE Column stats: COMPLETE
+                        table:
+                            input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                            output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+            Execution mode: llap
+            LLAP IO: no inputs
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
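
Reading the two pairs of explain outputs against each other shows the decision in
effect: under the default threshold the plan is a single Stage-0 holding a Fetch
Operator with the full processor tree inlined, while under the 1000-byte threshold
the plan gains a Stage-1 Tez vertex (Execution mode: llap) and Stage-0 shrinks to a
bare ListSink that fetches the vertex's output.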