You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by px...@apache.org on 2016/11/18 22:35:28 UTC
hive git commit: HIVE-14803: S3: Stats gathering for insert queries
can be expensive for partitioned dataset (Rajesh Balamohan reviewed by
Pengcheng Xiong)
Repository: hive
Updated Branches:
refs/heads/master cf87b0e24 -> 3baca6cea
HIVE-14803: S3: Stats gathering for insert queries can be expensive for partitioned dataset (Rajesh Balamohan reviewed by Pengcheng Xiong)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3baca6ce
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3baca6ce
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3baca6ce
Branch: refs/heads/master
Commit: 3baca6ceaaff69d2d367cb742251e069a1daaff8
Parents: cf87b0e
Author: Pengcheng Xiong <px...@apache.org>
Authored: Fri Nov 18 14:35:09 2016 -0800
Committer: Pengcheng Xiong <px...@apache.org>
Committed: Fri Nov 18 14:35:09 2016 -0800
----------------------------------------------------------------------
.../apache/hadoop/hive/ql/exec/StatsTask.java | 71 +++++++++++++++++++-
1 file changed, 68 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/3baca6ce/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
index 9e528b5..aa5d914 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
@@ -24,6 +24,11 @@ import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,6 +61,9 @@ import org.apache.hadoop.hive.ql.stats.StatsFactory;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
import org.apache.hadoop.util.StringUtils;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
/**
* StatsTask implementation. StatsTask mainly deals with "collectable" stats. These are
* stats that require data scanning and are collected during query execution (unless the user
@@ -146,7 +154,7 @@ public class StatsTask extends Task<StatsWork> implements Serializable {
EnvironmentContext environmentContext = null;
try {
// Stats setup:
- Warehouse wh = new Warehouse(conf);
+ final Warehouse wh = new Warehouse(conf);
if (!getWork().getNoStatsAggregator() && !getWork().isNoScanAnalyzeCommand()) {
try {
scc = getContext();
@@ -216,6 +224,57 @@ public class StatsTask extends Task<StatsWork> implements Serializable {
// Need to get the old stats of the partition
// and update the table stats based on the old and new stats.
List<Partition> updates = new ArrayList<Partition>();
+
+ //Get the file status up-front for all partitions. Beneficial in cases of blob storage systems
+ final Map<String, FileStatus[]> fileStatusMap = new ConcurrentHashMap<String, FileStatus[]>();
+ int poolSize = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 1);
+ // If the thread count is set to 0, use a single thread.
+ poolSize = Math.max(poolSize, 1);
+ final ExecutorService pool = Executors.newFixedThreadPool(poolSize,
+ new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("stats-updater-thread-%d")
+ .build());
+ final List<Future<Void>> futures = Lists.newLinkedList();
+ LOG.debug("Getting file stats of all partitions. threadpool size:" + poolSize);
+ try {
+ for(final Partition partn : partitions) {
+ final String partitionName = partn.getName();
+ final org.apache.hadoop.hive.metastore.api.Partition tPart = partn.getTPartition();
+ Map<String, String> parameters = tPart.getParameters();
+
+ if (!existStats(parameters) && atomic) {
+ continue;
+ }
+ futures.add(pool.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ FileStatus[] partfileStatus = wh.getFileStatusesForSD(tPart.getSd());
+ fileStatusMap.put(partitionName, partfileStatus);
+ return null;
+ }
+ }));
+ }
+ pool.shutdown();
+ for(Future<Void> future : futures) {
+ future.get();
+ }
+ } catch (InterruptedException e) {
+ LOG.debug("Cancelling " + futures.size() + " file stats lookup tasks");
+ //cancel other futures
+ for (Future future : futures) {
+ future.cancel(true);
+ }
+ // Fail the query if the stats are supposed to be reliable
+ if (work.isStatsReliable()) {
+ ret = 1;
+ }
+ } finally {
+ if (pool != null) {
+ pool.shutdownNow();
+ }
+ LOG.debug("Finished getting file stats of all partitions");
+ }
+
for (Partition partn : partitions) {
//
// get the old partition stats
@@ -230,7 +289,8 @@ public class StatsTask extends Task<StatsWork> implements Serializable {
.getDestinationCreateTable().isEmpty())) {
StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.TRUE);
}
- if (!existStats(parameters) && atomic) {
+ // A partition is present in fileStatusMap only when its stats exist.
+ if (!fileStatusMap.containsKey(partn.getName())) {
continue;
}
@@ -241,7 +301,7 @@ public class StatsTask extends Task<StatsWork> implements Serializable {
StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.FALSE);
}
- updateQuickStats(wh, parameters, tPart.getSd());
+ updateQuickStats(parameters, fileStatusMap.get(partn.getName()));
if (StatsSetupConst.areBasicStatsUptoDate(parameters)) {
if (statsAggregator != null) {
String prefix = getAggregationPrefix(table, partn);
@@ -371,6 +431,11 @@ public class StatsTask extends Task<StatsWork> implements Serializable {
* calculate fast statistics
*/
FileStatus[] partfileStatus = wh.getFileStatusesForSD(desc);
+ updateQuickStats(parameters, partfileStatus);
+ }
+
+ private void updateQuickStats(Map<String, String> parameters,
+ FileStatus[] partfileStatus) throws MetaException {
MetaStoreUtils.populateQuickStats(partfileStatus, parameters);
}