You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2016/10/11 23:10:12 UTC
[17/22] hive git commit: HIVE-14803: S3: Stats gathering for insert queries can be expensive for partitioned dataset (Rajesh Balamohan via Pengcheng Xiong)
HIVE-14803: S3: Stats gathering for insert queries can be expensive for partitioned dataset (Rajesh Balamohan via Pengcheng Xiong)
Signed-off-by: Pengcheng Xiong <px...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c52c17b2
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c52c17b2
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c52c17b2
Branch: refs/heads/hive-14535
Commit: c52c17b227671ad1cb44b1fe24134c7ccb4a06e1
Parents: 1876723
Author: Pengcheng Xiong <px...@apache.org>
Authored: Mon Oct 10 10:46:38 2016 -0700
Committer: Pengcheng Xiong <px...@apache.org>
Committed: Mon Oct 10 10:46:38 2016 -0700
----------------------------------------------------------------------
.../apache/hadoop/hive/ql/exec/StatsTask.java | 134 ++++++++++++-------
1 file changed, 82 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/c52c17b2/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
index 9e528b5..51bafc4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
@@ -24,7 +24,13 @@ import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.FileStatus;
@@ -143,10 +149,10 @@ public class StatsTask extends Task<StatsWork> implements Serializable {
StatsAggregator statsAggregator = null;
int ret = 0;
StatsCollectionContext scc = null;
- EnvironmentContext environmentContext = null;
+ final EnvironmentContext environmentContext = new EnvironmentContext();;
try {
// Stats setup:
- Warehouse wh = new Warehouse(conf);
+ final Warehouse wh = new Warehouse(conf);
if (!getWork().getNoStatsAggregator() && !getWork().isNoScanAnalyzeCommand()) {
try {
scc = getContext();
@@ -160,9 +166,8 @@ public class StatsTask extends Task<StatsWork> implements Serializable {
}
List<Partition> partitions = getPartitionsList(db);
- boolean atomic = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_ATOMIC);
-
- String tableFullName = table.getDbName() + "." + table.getTableName();
+ final boolean atomic = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_ATOMIC);
+ final String tableFullName = table.getDbName() + "." + table.getTableName();
if (partitions == null) {
org.apache.hadoop.hive.metastore.api.Table tTable = table.getTTable();
@@ -200,7 +205,6 @@ public class StatsTask extends Task<StatsWork> implements Serializable {
}
// write table stats to metastore
if (!getWork().getNoStatsAggregator()) {
- environmentContext = new EnvironmentContext();
environmentContext.putToProperties(StatsSetupConst.STATS_GENERATED,
StatsSetupConst.TASK);
}
@@ -212,61 +216,87 @@ public class StatsTask extends Task<StatsWork> implements Serializable {
}
LOG.info("Table " + tableFullName + " stats: [" + toString(parameters) + ']');
} else {
+ int poolSize = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 1);
+ final ExecutorService pool = Executors.newFixedThreadPool(poolSize,
+ new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat("stats-updater-thread-%d")
+ .build());
+ final List<Future<Void>> futures = Lists.newLinkedList();
+ final StatsAggregator statsAgg = statsAggregator;
+
// Partitioned table:
// Need to get the old stats of the partition
// and update the table stats based on the old and new stats.
- List<Partition> updates = new ArrayList<Partition>();
- for (Partition partn : partitions) {
- //
- // get the old partition stats
- //
- org.apache.hadoop.hive.metastore.api.Partition tPart = partn.getTPartition();
- Map<String, String> parameters = tPart.getParameters();
- if (work.getTableSpecs() == null && AcidUtils.isAcidTable(table)) {
- StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.FALSE);
- } else if (work.getTableSpecs() != null
- || (work.getLoadTableDesc() != null && work.getLoadTableDesc().getReplace())
- || (work.getLoadFileDesc() != null && !work.getLoadFileDesc()
- .getDestinationCreateTable().isEmpty())) {
- StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.TRUE);
- }
- if (!existStats(parameters) && atomic) {
- continue;
- }
-
- // The collectable stats for the aggregator needs to be cleared.
- // For eg. if a file is being loaded, the old number of rows are not valid
- if (work.isClearAggregatorStats()) {
- // we choose to keep the invalid stats and only change the setting.
- StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.FALSE);
+ final List<Partition> updates = new ArrayList<Partition>();
+ try {
+ for (final Partition partn : partitions) {
+ futures.add(pool.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ //
+ // get the old partition stats
+ //
+ org.apache.hadoop.hive.metastore.api.Partition tPart = partn.getTPartition();
+ Map<String, String> parameters = tPart.getParameters();
+ if (work.getTableSpecs() == null && AcidUtils.isAcidTable(table)) {
+ StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.FALSE);
+ } else if (work.getTableSpecs() != null
+ || (work.getLoadTableDesc() != null && work.getLoadTableDesc().getReplace())
+ || (work.getLoadFileDesc() != null && !work.getLoadFileDesc()
+ .getDestinationCreateTable().isEmpty())) {
+ StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.TRUE);
+ }
+ if (!existStats(parameters) && atomic) {
+ return null;
+ }
+
+ // The collectable stats for the aggregator needs to be cleared.
+ // For eg. if a file is being loaded, the old number of rows are not valid
+ if (work.isClearAggregatorStats()) {
+ // we choose to keep the invalid stats and only change the setting.
+ StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.FALSE);
+ }
+
+ updateQuickStats(wh, parameters, tPart.getSd());
+ if (StatsSetupConst.areBasicStatsUptoDate(parameters)) {
+ if (statsAgg != null) {
+ String prefix = getAggregationPrefix(table, partn);
+ updateStats(statsAgg, parameters, prefix, atomic);
+ }
+ if (!getWork().getNoStatsAggregator()) {
+ environmentContext.putToProperties(StatsSetupConst.STATS_GENERATED,
+ StatsSetupConst.TASK);
+ }
+ }
+ updates.add(new Partition(table, tPart));
+
+ if (conf.getBoolVar(ConfVars.TEZ_EXEC_SUMMARY)) {
+ console.printInfo("Partition " + tableFullName + partn.getSpec() +
+ " stats: [" + StatsTask.toString(parameters) + ']');
+ }
+ LOG.info("Partition " + tableFullName + partn.getSpec() +
+ " stats: [" + StatsTask.toString(parameters) + ']');
+ return null;
+ }
+ }));
}
+ pool.shutdown();
- updateQuickStats(wh, parameters, tPart.getSd());
- if (StatsSetupConst.areBasicStatsUptoDate(parameters)) {
- if (statsAggregator != null) {
- String prefix = getAggregationPrefix(table, partn);
- updateStats(statsAggregator, parameters, prefix, atomic);
- }
- if (!getWork().getNoStatsAggregator()) {
- environmentContext = new EnvironmentContext();
- environmentContext.putToProperties(StatsSetupConst.STATS_GENERATED,
- StatsSetupConst.TASK);
- }
+ for (Future future : futures) {
+ future.get();
}
- updates.add(new Partition(table, tPart));
-
- if (conf.getBoolVar(ConfVars.TEZ_EXEC_SUMMARY)) {
- console.printInfo("Partition " + tableFullName + partn.getSpec() +
- " stats: [" + toString(parameters) + ']');
+ } catch (InterruptedException e) {
+ LOG.debug("Cancelling " + futures.size() + " for partition stats update");
+ //cancel other futures
+ for (Future future : futures) {
+ future.cancel(true);
}
- LOG.info("Partition " + tableFullName + partn.getSpec() +
- " stats: [" + toString(parameters) + ']');
}
if (!updates.isEmpty()) {
- db.alterPartitions(tableFullName, updates, environmentContext);
+ db.alterPartitions(tableFullName, updates, environmentContext);
+ }
}
- }
-
} catch (Exception e) {
console.printInfo("[Warning] could not update stats.",
"Failed with exception " + e.getMessage() + "\n"
@@ -374,7 +404,7 @@ public class StatsTask extends Task<StatsWork> implements Serializable {
MetaStoreUtils.populateQuickStats(partfileStatus, parameters);
}
- private String toString(Map<String, String> parameters) {
+ static String toString(Map<String, String> parameters) {
StringBuilder builder = new StringBuilder();
for (String statType : StatsSetupConst.supportedStats) {
String value = parameters.get(statType);