You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by rb...@apache.org on 2019/03/05 03:06:32 UTC
[hive] branch master updated: HIVE-21312: FSStatsAggregator::connect is slow (Rajesh Balamohan, reviewed by Zoltan Haindrich)
This is an automated email from the ASF dual-hosted git repository.
rbalamohan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new fc3eefa HIVE-21312: FSStatsAggregator::connect is slow (Rajesh Balamohan, reviewed by Zoltan Haindrich)
fc3eefa is described below
commit fc3eefad503a2cca4796703ff7debf9d2055682b
Author: Rajesh Balamohan <rb...@apache.org>
AuthorDate: Tue Mar 5 08:35:59 2019 +0530
HIVE-21312: FSStatsAggregator::connect is slow (Rajesh Balamohan, reviewed by Zoltan Haindrich)
---
.../hadoop/hive/ql/stats/fs/FSStatsAggregator.java | 65 +++++++++++++++++-----
1 file changed, 50 insertions(+), 15 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java
index cb5fee4..595735e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java
@@ -21,14 +21,21 @@ package org.apache.hadoop.hive.ql.stats.fs;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.stats.StatsAggregator;
@@ -42,7 +49,6 @@ import com.esotericsoftware.kryo.io.Input;
public class FSStatsAggregator implements StatsAggregator {
private final Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
private List<Map<String,Map<String,String>>> statsList;
- private Map<String, Map<String,String>> statsMap;
private FileSystem fs;
@Override
@@ -51,34 +57,63 @@ public class FSStatsAggregator implements StatsAggregator {
assert statsDirs.size() == 1 : "Found multiple stats dirs: " + statsDirs;
Path statsDir = new Path(statsDirs.get(0));
Utilities.FILE_OP_LOGGER.trace("About to read stats from {}", statsDir);
- statsMap = new HashMap<String, Map<String,String>>();
+ int poolSize = HiveConf.getIntVar(scc.getHiveConf(), HiveConf.ConfVars.HIVE_MOVE_FILES_THREAD_COUNT);
+ // In case thread count is set to 0, use single thread.
+ poolSize = Math.max(poolSize, 1);
+ final ExecutorService pool = Executors.newFixedThreadPool(poolSize,
+ new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("stats-updater-thread-%d")
+ .build());;
+
+ final List<Future<Map<String, Map<String,String>>>> futureList = new LinkedList<>();
try {
fs = statsDir.getFileSystem(scc.getHiveConf());
- statsList = new ArrayList<Map<String,Map<String,String>>>();
+ statsList = new ArrayList<>();
FileStatus[] status = fs.listStatus(statsDir, new PathFilter() {
@Override
public boolean accept(Path file) {
return file.getName().startsWith(StatsSetupConst.STATS_FILE_PREFIX);
}
});
- for (FileStatus file : status) {
- Utilities.FILE_OP_LOGGER.trace("About to read stats file {} ", file.getPath());
- Input in = new Input(fs.open(file.getPath()));
- Kryo kryo = SerializationUtilities.borrowKryo();
- try {
- statsMap = kryo.readObject(in, statsMap.getClass());
- } finally {
- SerializationUtilities.releaseKryo(kryo);
+ Map<String, Map<String,String>> statsMap = new HashMap<>();
+ for (final FileStatus file : status) {
+ futureList.add(pool.submit(() -> {
+ Kryo kryo = null;
+ try (Input in = new Input(fs.open(file.getPath()))) {
+ kryo = SerializationUtilities.borrowKryo();
+ Map<String, Map<String,String>> stats = kryo.readObject(in, statsMap.getClass());
+ Utilities.FILE_OP_LOGGER.trace("Read stats {}", stats);
+ return stats;
+ } finally {
+ SerializationUtilities.releaseKryo(kryo);
+ }
+ }));
+ }
+ for(Future<Map<String, Map<String,String>>> future : futureList) {
+ Map<String, Map<String,String>> stats = future.get();
+ if (stats != null) {
+ statsList.add(stats);
}
- Utilities.FILE_OP_LOGGER.trace("Read : {}", statsMap);
- statsList.add(statsMap);
- in.close();
}
return true;
- } catch (IOException e) {
+ } catch (IOException | ExecutionException e) {
Utilities.FILE_OP_LOGGER.error("Failed to read stats from filesystem ", e);
+ cancelRunningTasks(futureList);
return false;
+ } catch (InterruptedException e) {
+ cancelRunningTasks(futureList);
+ //reset interrupt state
+ Thread.currentThread().interrupt();
+ } finally {
+ pool.shutdownNow();
+ }
+ return false;
+ }
+
+ private void cancelRunningTasks(List<Future<Map<String, Map<String,String>>>> tasks) {
+ for(Future<Map<String, Map<String,String>>> task: tasks) {
+ task.cancel(true);
}
}