Posted to commits@hive.apache.org by se...@apache.org on 2016/10/11 23:10:12 UTC

[17/22] hive git commit: HIVE-14803: S3: Stats gathering for insert queries can be expensive for partitioned dataset (Rajesh Balamohan via Pengcheng Xiong)

HIVE-14803: S3: Stats gathering for insert queries can be expensive for partitioned dataset (Rajesh Balamohan via Pengcheng Xiong)

Signed-off-by: Pengcheng Xiong <px...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c52c17b2
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c52c17b2
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c52c17b2

Branch: refs/heads/hive-14535
Commit: c52c17b227671ad1cb44b1fe24134c7ccb4a06e1
Parents: 1876723
Author: Pengcheng Xiong <px...@apache.org>
Authored: Mon Oct 10 10:46:38 2016 -0700
Committer: Pengcheng Xiong <px...@apache.org>
Committed: Mon Oct 10 10:46:38 2016 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hive/ql/exec/StatsTask.java   | 134 ++++++++++++-------
 1 file changed, 82 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
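
What the patch does, mechanically: instead of walking the partitions in a single
sequential loop, each partition's stats update (quick-stats via the Warehouse, an
optional aggregator read, and the parameter rewrite) is wrapped in a Callable and
submitted to a fixed-size pool of daemon threads sized from
ConfVars.HIVE_MOVE_FILES_THREAD_COUNT; the futures are then joined with get(), and
an interrupt cancels whatever is still outstanding. Below is a minimal,
self-contained sketch of that submit-then-join pattern. The class name, the
partition strings, and updatePartitionStats are illustrative stand-ins, not Hive
APIs; only the ExecutorService and Guava ThreadFactoryBuilder usage mirrors the
patch.

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class ParallelStatsSketch {

  // Illustrative stand-in for the real per-partition work in StatsTask
  // (quick-stats computation, aggregator reads, parameter updates).
  static void updatePartitionStats(String partition) {
    System.out.println("updating stats for " + partition);
  }

  public static void main(String[] args) {
    // In the patch the pool size comes from ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.
    final ExecutorService pool = Executors.newFixedThreadPool(4,
        new ThreadFactoryBuilder()
            .setDaemon(true)                          // don't block JVM shutdown
            .setNameFormat("stats-updater-thread-%d") // same name format as the patch
            .build());
    final List<Future<Void>> futures = Lists.newLinkedList();

    try {
      // Submit one task per partition; each partition is processed independently.
      for (final String partn : Arrays.asList("ds=2016-10-01", "ds=2016-10-02")) {
        futures.add(pool.submit(new Callable<Void>() {
          @Override
          public Void call() throws Exception {
            updatePartitionStats(partn);
            return null;
          }
        }));
      }
      pool.shutdown(); // no new submissions; already-queued tasks still run

      // Join: get() blocks until each task finishes and rethrows its failure.
      for (Future<Void> future : futures) {
        future.get();
      }
    } catch (Exception e) {
      // On interrupt or failure, cancel everything still outstanding.
      for (Future<Void> future : futures) {
        future.cancel(true);
      }
    }
  }
}

Shutting the pool down right after submission lets queued tasks drain on the
success path, while the daemon flag keeps a cancelled run from pinning the JVM.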


http://git-wip-us.apache.org/repos/asf/hive/blob/c52c17b2/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
index 9e528b5..51bafc4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
@@ -24,7 +24,13 @@ import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FileStatus;
@@ -143,10 +149,10 @@ public class StatsTask extends Task<StatsWork> implements Serializable {
     StatsAggregator statsAggregator = null;
     int ret = 0;
     StatsCollectionContext scc = null;
-    EnvironmentContext environmentContext = null;
+    final EnvironmentContext environmentContext = new EnvironmentContext();
     try {
       // Stats setup:
-      Warehouse wh = new Warehouse(conf);
+      final Warehouse wh = new Warehouse(conf);
       if (!getWork().getNoStatsAggregator() && !getWork().isNoScanAnalyzeCommand()) {
         try {
           scc = getContext();
@@ -160,9 +166,8 @@ public class StatsTask extends Task<StatsWork> implements Serializable {
       }
 
       List<Partition> partitions = getPartitionsList(db);
-      boolean atomic = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_ATOMIC);
-
-      String tableFullName = table.getDbName() + "." + table.getTableName();
+      final boolean atomic = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_ATOMIC);
+      final String tableFullName = table.getDbName() + "." + table.getTableName();
 
       if (partitions == null) {
         org.apache.hadoop.hive.metastore.api.Table tTable = table.getTTable();
@@ -200,7 +205,6 @@ public class StatsTask extends Task<StatsWork> implements Serializable {
           }
           // write table stats to metastore
           if (!getWork().getNoStatsAggregator()) {
-            environmentContext = new EnvironmentContext();
             environmentContext.putToProperties(StatsSetupConst.STATS_GENERATED,
                 StatsSetupConst.TASK);
           }
@@ -212,61 +216,87 @@ public class StatsTask extends Task<StatsWork> implements Serializable {
         }
         LOG.info("Table " + tableFullName + " stats: [" + toString(parameters) + ']');
       } else {
+        int poolSize = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 1);
+        final ExecutorService pool = Executors.newFixedThreadPool(poolSize,
+            new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setNameFormat("stats-updater-thread-%d")
+                .build());
+        final List<Future<Void>> futures = Lists.newLinkedList();
+        final StatsAggregator statsAgg = statsAggregator;
+
         // Partitioned table:
         // Need to get the old stats of the partition
         // and update the table stats based on the old and new stats.
-        List<Partition> updates = new ArrayList<Partition>();
-        for (Partition partn : partitions) {
-          //
-          // get the old partition stats
-          //
-          org.apache.hadoop.hive.metastore.api.Partition tPart = partn.getTPartition();
-          Map<String, String> parameters = tPart.getParameters();
-          if (work.getTableSpecs() == null && AcidUtils.isAcidTable(table)) {
-            StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.FALSE);
-          } else if (work.getTableSpecs() != null
-              || (work.getLoadTableDesc() != null && work.getLoadTableDesc().getReplace())
-              || (work.getLoadFileDesc() != null && !work.getLoadFileDesc()
-                  .getDestinationCreateTable().isEmpty())) {
-            StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.TRUE);
-          }
-          if (!existStats(parameters) && atomic) {
-            continue;
-          }
-
-          // The collectable stats for the aggregator needs to be cleared.
-          // For eg. if a file is being loaded, the old number of rows are not valid
-          if (work.isClearAggregatorStats()) {
-            // we choose to keep the invalid stats and only change the setting.
-            StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.FALSE);
+        final List<Partition> updates = new ArrayList<Partition>();
+        try {
+          for (final Partition partn : partitions) {
+            futures.add(pool.submit(new Callable<Void>() {
+              @Override
+              public Void call() throws Exception {
+                //
+                // get the old partition stats
+                //
+                org.apache.hadoop.hive.metastore.api.Partition tPart = partn.getTPartition();
+                Map<String, String> parameters = tPart.getParameters();
+                if (work.getTableSpecs() == null && AcidUtils.isAcidTable(table)) {
+                  StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.FALSE);
+                } else if (work.getTableSpecs() != null
+                    || (work.getLoadTableDesc() != null && work.getLoadTableDesc().getReplace())
+                    || (work.getLoadFileDesc() != null && !work.getLoadFileDesc()
+                    .getDestinationCreateTable().isEmpty())) {
+                  StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.TRUE);
+                }
+                if (!existStats(parameters) && atomic) {
+                  return null;
+                }
+
+                // The collectable stats for the aggregator need to be cleared.
+                // For example, if a file is being loaded, the old row count is no longer valid.
+                if (work.isClearAggregatorStats()) {
+                  // we choose to keep the invalid stats and only change the setting.
+                  StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.FALSE);
+                }
+
+                updateQuickStats(wh, parameters, tPart.getSd());
+                if (StatsSetupConst.areBasicStatsUptoDate(parameters)) {
+                  if (statsAgg != null) {
+                    String prefix = getAggregationPrefix(table, partn);
+                    updateStats(statsAgg, parameters, prefix, atomic);
+                  }
+                  if (!getWork().getNoStatsAggregator()) {
+                    environmentContext.putToProperties(StatsSetupConst.STATS_GENERATED,
+                        StatsSetupConst.TASK);
+                  }
+                }
+                updates.add(new Partition(table, tPart));
+
+                if (conf.getBoolVar(ConfVars.TEZ_EXEC_SUMMARY)) {
+                  console.printInfo("Partition " + tableFullName + partn.getSpec() +
+                      " stats: [" + StatsTask.toString(parameters) + ']');
+                }
+                LOG.info("Partition " + tableFullName + partn.getSpec() +
+                    " stats: [" + StatsTask.toString(parameters) + ']');
+                return null;
+              }
+            }));
           }
+          pool.shutdown();
 
-          updateQuickStats(wh, parameters, tPart.getSd());
-          if (StatsSetupConst.areBasicStatsUptoDate(parameters)) {
-            if (statsAggregator != null) {
-              String prefix = getAggregationPrefix(table, partn);
-              updateStats(statsAggregator, parameters, prefix, atomic);
-            }
-            if (!getWork().getNoStatsAggregator()) {
-              environmentContext = new EnvironmentContext();
-              environmentContext.putToProperties(StatsSetupConst.STATS_GENERATED,
-                  StatsSetupConst.TASK);
-            }
+          for (Future future : futures) {
+            future.get();
           }
-          updates.add(new Partition(table, tPart));
-
-          if (conf.getBoolVar(ConfVars.TEZ_EXEC_SUMMARY)) {
-            console.printInfo("Partition " + tableFullName + partn.getSpec() +
-            " stats: [" + toString(parameters) + ']');
+        } catch (InterruptedException e) {
+          LOG.debug("Cancelling " + futures.size() + " for partition stats update");
+          //cancel other futures
+          for (Future future : futures) {
+            future.cancel(true);
           }
-          LOG.info("Partition " + tableFullName + partn.getSpec() +
-              " stats: [" + toString(parameters) + ']');
         }
         if (!updates.isEmpty()) {
           db.alterPartitions(tableFullName, updates, environmentContext);
+        }
         }
-      }
-
     } catch (Exception e) {
       console.printInfo("[Warning] could not update stats.",
           "Failed with exception " + e.getMessage() + "\n"
@@ -374,7 +404,7 @@ public class StatsTask extends Task<StatsWork> implements Serializable {
     MetaStoreUtils.populateQuickStats(partfileStatus, parameters);
   }
 
-  private String toString(Map<String, String> parameters) {
+  static String toString(Map<String, String> parameters) {
     StringBuilder builder = new StringBuilder();
     for (String statType : StatsSetupConst.supportedStats) {
       String value = parameters.get(statType);
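
----------------------------------------------------------------------

One caveat for anyone adapting the pattern above: the submitted tasks all call
updates.add(...) on a shared plain ArrayList (and write to the shared
environmentContext) from pool threads, and java.util.ArrayList is not safe for
concurrent modification. A synchronized wrapper is the smallest fix; the
following is a minimal sketch assuming nothing beyond the JDK, not code from the
patch.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SafeCollectSketch {
  public static void main(String[] args) throws InterruptedException {
    // The synchronized wrapper makes concurrent add() calls safe; a plain
    // ArrayList gives no such guarantee and can silently lose elements.
    final List<String> updates = Collections.synchronizedList(new ArrayList<String>());

    ExecutorService pool = Executors.newFixedThreadPool(4);
    for (int i = 0; i < 1000; i++) {
      final int n = i;
      pool.submit(new Runnable() {
        @Override
        public void run() {
          updates.add("partition-" + n);
        }
      });
    }
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.MINUTES);
    System.out.println(updates.size()); // reliably 1000 with the wrapper
  }
}

Collections.synchronizedList serializes the add() calls; without it, concurrent
adds can race on the backing array and drop elements or throw.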