You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by px...@apache.org on 2016/11/18 22:35:28 UTC

hive git commit: HIVE-14803: S3: Stats gathering for insert queries can be expensive for partitioned dataset (Rajesh Balamohan reviewed by Pengcheng Xiong)

Repository: hive
Updated Branches:
  refs/heads/master cf87b0e24 -> 3baca6cea


HIVE-14803: S3: Stats gathering for insert queries can be expensive for partitioned dataset (Rajesh Balamohan reviewed by Pengcheng Xiong)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3baca6ce
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3baca6ce
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3baca6ce

Branch: refs/heads/master
Commit: 3baca6ceaaff69d2d367cb742251e069a1daaff8
Parents: cf87b0e
Author: Pengcheng Xiong <px...@apache.org>
Authored: Fri Nov 18 14:35:09 2016 -0800
Committer: Pengcheng Xiong <px...@apache.org>
Committed: Fri Nov 18 14:35:09 2016 -0800

----------------------------------------------------------------------
 .../apache/hadoop/hive/ql/exec/StatsTask.java   | 71 +++++++++++++++++++-
 1 file changed, 68 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/3baca6ce/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
index 9e528b5..aa5d914 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
@@ -24,6 +24,11 @@ import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,6 +61,9 @@ import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
 import org.apache.hadoop.util.StringUtils;
 
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * StatsTask implementation. StatsTask mainly deals with "collectable" stats. These are
  * stats that require data scanning and are collected during query execution (unless the user
@@ -146,7 +154,7 @@ public class StatsTask extends Task<StatsWork> implements Serializable {
     EnvironmentContext environmentContext = null;
     try {
       // Stats setup:
-      Warehouse wh = new Warehouse(conf);
+      final Warehouse wh = new Warehouse(conf);
       if (!getWork().getNoStatsAggregator() && !getWork().isNoScanAnalyzeCommand()) {
         try {
           scc = getContext();
@@ -216,6 +224,57 @@ public class StatsTask extends Task<StatsWork> implements Serializable {
         // Need to get the old stats of the partition
         // and update the table stats based on the old and new stats.
         List<Partition> updates = new ArrayList<Partition>();
+
+        //Get the file status up-front for all partitions. Beneficial in cases of blob storage systems
+        final Map<String, FileStatus[]> fileStatusMap = new ConcurrentHashMap<String, FileStatus[]>();
+        int poolSize = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 1);
+        // In case thread count is set to 0, use single thread.
+        poolSize = Math.max(poolSize, 1);
+        final ExecutorService pool = Executors.newFixedThreadPool(poolSize,
+          new ThreadFactoryBuilder().setDaemon(true)
+            .setNameFormat("stats-updater-thread-%d")
+            .build());
+        final List<Future<Void>> futures = Lists.newLinkedList();
+        LOG.debug("Getting file stats of all partitions. threadpool size:" + poolSize);
+        try {
+          for(final Partition partn : partitions) {
+            final String partitionName = partn.getName();
+            final org.apache.hadoop.hive.metastore.api.Partition tPart = partn.getTPartition();
+            Map<String, String> parameters = tPart.getParameters();
+
+            if (!existStats(parameters) && atomic) {
+              continue;
+            }
+            futures.add(pool.submit(new Callable<Void>() {
+              @Override
+              public Void call() throws Exception {
+                FileStatus[] partfileStatus = wh.getFileStatusesForSD(tPart.getSd());
+                fileStatusMap.put(partitionName,  partfileStatus);
+                return null;
+              }
+            }));
+          }
+          pool.shutdown();
+          for(Future<Void> future : futures) {
+            future.get();
+          }
+        } catch (InterruptedException e) {
+          LOG.debug("Cancelling " + futures.size() + " file stats lookup tasks");
+          //cancel other futures
+          for (Future future : futures) {
+            future.cancel(true);
+          }
+          // Fail the query if the stats are supposed to be reliable
+          if (work.isStatsReliable()) {
+            ret = 1;
+          }
+        } finally {
+          if (pool != null) {
+            pool.shutdownNow();
+          }
+          LOG.debug("Finished getting file stats of all partitions");
+        }
+
         for (Partition partn : partitions) {
           //
           // get the old partition stats
@@ -230,7 +289,8 @@ public class StatsTask extends Task<StatsWork> implements Serializable {
                   .getDestinationCreateTable().isEmpty())) {
             StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.TRUE);
           }
-          if (!existStats(parameters) && atomic) {
+          // partitions filtered out by the stats check above have no entry in fileStatusMap
+          if (!fileStatusMap.containsKey(partn.getName())) {
             continue;
           }
 
@@ -241,7 +301,7 @@ public class StatsTask extends Task<StatsWork> implements Serializable {
             StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.FALSE);
           }
 
-          updateQuickStats(wh, parameters, tPart.getSd());
+          updateQuickStats(parameters, fileStatusMap.get(partn.getName()));
           if (StatsSetupConst.areBasicStatsUptoDate(parameters)) {
             if (statsAggregator != null) {
               String prefix = getAggregationPrefix(table, partn);
@@ -371,6 +431,11 @@ public class StatsTask extends Task<StatsWork> implements Serializable {
      * calculate fast statistics
      */
     FileStatus[] partfileStatus = wh.getFileStatusesForSD(desc);
+    updateQuickStats(parameters, partfileStatus);
+  }
+
+  private void updateQuickStats(Map<String, String> parameters,
+      FileStatus[] partfileStatus) throws MetaException {
     MetaStoreUtils.populateQuickStats(partfileStatus, parameters);
   }