Posted to commits@hive.apache.org by rb...@apache.org on 2019/03/05 03:06:32 UTC

[hive] branch master updated: HIVE-21312: FSStatsAggregator::connect is slow (Rajesh Balamohan, reviewed by Zoltan Haindrich)

This is an automated email from the ASF dual-hosted git repository.

rbalamohan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new fc3eefa  HIVE-21312: FSStatsAggregator::connect is slow (Rajesh Balamohan, reviewed by Zoltan Haindrich)
fc3eefa is described below

commit fc3eefad503a2cca4796703ff7debf9d2055682b
Author: Rajesh Balamohan <rb...@apache.org>
AuthorDate: Tue Mar 5 08:35:59 2019 +0530

    HIVE-21312: FSStatsAggregator::connect is slow (Rajesh Balamohan, reviewed by Zoltan Haindrich)
---
 .../hadoop/hive/ql/stats/fs/FSStatsAggregator.java | 65 +++++++++++++++++-----
 1 file changed, 50 insertions(+), 15 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java
index cb5fee4..595735e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java
@@ -21,14 +21,21 @@ package org.apache.hadoop.hive.ql.stats.fs;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.stats.StatsAggregator;
@@ -42,7 +49,6 @@ import com.esotericsoftware.kryo.io.Input;
 public class FSStatsAggregator implements StatsAggregator {
   private final Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
   private List<Map<String,Map<String,String>>> statsList;
-  private Map<String, Map<String,String>> statsMap;
   private FileSystem fs;
 
   @Override
@@ -51,34 +57,63 @@ public class FSStatsAggregator implements StatsAggregator {
     assert statsDirs.size() == 1 : "Found multiple stats dirs: " + statsDirs;
     Path statsDir = new Path(statsDirs.get(0));
     Utilities.FILE_OP_LOGGER.trace("About to read stats from {}", statsDir);
-    statsMap  = new HashMap<String, Map<String,String>>();
+    int poolSize = HiveConf.getIntVar(scc.getHiveConf(), HiveConf.ConfVars.HIVE_MOVE_FILES_THREAD_COUNT);
+    // In case thread count is set to 0, use single thread.
+    poolSize = Math.max(poolSize, 1);
+    final ExecutorService pool = Executors.newFixedThreadPool(poolSize,
+        new ThreadFactoryBuilder().setDaemon(true)
+            .setNameFormat("stats-updater-thread-%d")
+            .build());
+
+    final List<Future<Map<String, Map<String,String>>>> futureList = new LinkedList<>();
 
     try {
       fs = statsDir.getFileSystem(scc.getHiveConf());
-      statsList = new ArrayList<Map<String,Map<String,String>>>();
+      statsList = new ArrayList<>();
       FileStatus[] status = fs.listStatus(statsDir, new PathFilter() {
         @Override
         public boolean accept(Path file) {
           return file.getName().startsWith(StatsSetupConst.STATS_FILE_PREFIX);
         }
       });
-      for (FileStatus file : status) {
-        Utilities.FILE_OP_LOGGER.trace("About to read stats file {} ", file.getPath());
-        Input in = new Input(fs.open(file.getPath()));
-        Kryo kryo = SerializationUtilities.borrowKryo();
-        try {
-          statsMap = kryo.readObject(in, statsMap.getClass());
-        } finally {
-          SerializationUtilities.releaseKryo(kryo);
+      Map<String, Map<String,String>> statsMap  = new HashMap<>();
+      for (final FileStatus file : status) {
+        futureList.add(pool.submit(() -> {
+            Kryo kryo = null;
+            try (Input in = new Input(fs.open(file.getPath()))) {
+              kryo = SerializationUtilities.borrowKryo();
+              Map<String, Map<String,String>> stats = kryo.readObject(in, statsMap.getClass());
+              Utilities.FILE_OP_LOGGER.trace("Read stats {}", stats);
+              return stats;
+            } finally {
+              SerializationUtilities.releaseKryo(kryo);
+            }
+          }));
+      }
+      for(Future<Map<String, Map<String,String>>> future : futureList) {
+        Map<String, Map<String,String>> stats = future.get();
+        if (stats != null) {
+          statsList.add(stats);
         }
-        Utilities.FILE_OP_LOGGER.trace("Read : {}", statsMap);
-        statsList.add(statsMap);
-        in.close();
       }
       return true;
-    } catch (IOException e) {
+    } catch (IOException | ExecutionException e) {
       Utilities.FILE_OP_LOGGER.error("Failed to read stats from filesystem ", e);
+      cancelRunningTasks(futureList);
       return false;
+    } catch (InterruptedException e) {
+      cancelRunningTasks(futureList);
+      //reset interrupt state
+      Thread.currentThread().interrupt();
+    }  finally {
+      pool.shutdownNow();
+    }
+    return false;
+  }
+
+  private void cancelRunningTasks(List<Future<Map<String, Map<String,String>>>> tasks) {
+    for(Future<Map<String, Map<String,String>>> task: tasks) {
+      task.cancel(true);
     }
   }
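
For readers skimming the patch: the gist is that FSStatsAggregator#connect no longer deserializes each stats file serially. It submits one Kryo read per stats file to a fixed thread pool (sized from HiveConf.ConfVars.HIVE_MOVE_FILES_THREAD_COUNT, with a floor of one thread) and then collects the Future results, cancelling outstanding tasks and shutting the pool down on failure or interruption. Below is a minimal, self-contained sketch of that fan-out/collect pattern; ParallelStatsRead, readOne() and the example file names are illustrative stand-ins rather than Hive APIs, and sizing the pool from availableProcessors() is only a placeholder for the Hive config lookup.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelStatsRead {

  // Hypothetical per-file read; in the commit this is the Kryo deserialization of one stats file.
  static Map<String, String> readOne(String file) {
    return Map.of("source", file);
  }

  public static void main(String[] args) throws InterruptedException {
    // The commit sizes the pool from HIVE_MOVE_FILES_THREAD_COUNT; availableProcessors() is a stand-in.
    int poolSize = Math.max(Runtime.getRuntime().availableProcessors(), 1);
    ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    List<Future<Map<String, String>>> futures = new ArrayList<>();
    try {
      // Fan out: one task per stats file.
      for (String file : Arrays.asList("stats-file-0", "stats-file-1", "stats-file-2")) {
        futures.add(pool.submit(() -> readOne(file)));
      }
      // Collect: block on each Future in submission order.
      List<Map<String, String>> results = new ArrayList<>();
      for (Future<Map<String, String>> f : futures) {
        results.add(f.get());
      }
      System.out.println(results);
    } catch (ExecutionException e) {
      // Mirror the commit's error path: cancel whatever is still running.
      futures.forEach(t -> t.cancel(true));
    } finally {
      pool.shutdownNow();
    }
  }
}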