Posted to commits@hive.apache.org by ay...@apache.org on 2022/02/23 15:59:41 UTC

[hive] branch master updated: HIVE-25958: Optimise BasicStatsNoJobTask. (#3037). (Ayush Saxena, reviewed by Rajesh Balamohan)

This is an automated email from the ASF dual-hosted git repository.

ayushsaxena pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new c97a491  HIVE-25958: Optimise BasicStatsNoJobTask. (#3037). (Ayush Saxena, reviewed by Rajesh Balamohan)
c97a491 is described below

commit c97a4914828986a78e4af6c5c82d162a23097c13
Author: Ayush Saxena <ay...@apache.org>
AuthorDate: Wed Feb 23 21:29:17 2022 +0530

    HIVE-25958: Optimise BasicStatsNoJobTask. (#3037). (Ayush Saxena, reviewed by Rajesh Balamohan)
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |   4 +
 .../hadoop/hive/ql/stats/BasicStatsNoJobTask.java  | 135 ++++++++++++++++++---
 2 files changed, 122 insertions(+), 17 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 3ac60f3..28f309c 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -753,6 +753,10 @@ public class HiveConf extends Configuration {
         "Comma-separated list of statistics publishers to be invoked on counters on each job. \n" +
         "A client stats publisher is specified as the name of a Java class which implements the \n" +
         "org.apache.hadoop.hive.ql.stats.ClientStatsPublisher interface."),
+    BASICSTATSTASKSMAXTHREADSFACTOR("hive.basic.stats.max.threads.factor", 2, "Determines the maximum number of "
+        + "threads that can be used for collection of file level statistics. If the value configured is x, then the "
+        + "maximum number of threads that can be used is x multiplied by the number of available processors.  A value"
+        + " of less than 1, makes stats collection sequential."),
     EXECPARALLEL("hive.exec.parallel", false, "Whether to execute jobs in parallel"),
     EXECPARALLETHREADNUMBER("hive.exec.parallel.thread.number", 8,
         "How many jobs at most can be executed in parallel"),
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsNoJobTask.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsNoJobTask.java
index 919a243..145bdd6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsNoJobTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsNoJobTask.java
@@ -23,7 +23,12 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -65,6 +70,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimaps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * StatsNoJobTask is used in cases where stats collection is the only task for the given query (no
@@ -223,6 +229,18 @@ public class BasicStatsNoJobTask implements IStatsProcessor {
         } else {
           fileList = HiveStatsUtils.getFileStatusRecurse(dir, -1, fs);
         }
+        ThreadPoolExecutor tpE = null;
+        List<Future<FileStats>> futures = null;
+        int numThreadsFactor = HiveConf.getIntVar(jc, HiveConf.ConfVars.BASICSTATSTASKSMAXTHREADSFACTOR);
+        if (fileList.size() > 1 && numThreadsFactor > 0) {
+          int numThreads = Math.min(fileList.size(), numThreadsFactor * Runtime.getRuntime().availableProcessors());
+          ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("Basic-Stats-Thread-%d").build();
+          tpE = new ThreadPoolExecutor(numThreads, numThreads, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
+              threadFactory);
+          tpE.allowsCoreThreadTimeOut();
+          futures = new ArrayList<>();
+          LOG.info("Processing Stats for {} file using {} threads", fileList.size(), numThreads);
+        }
 
         for (FileStatus file : fileList) {
           Utilities.FILE_OP_LOGGER.debug("Computing stats for {}", file);
@@ -232,28 +250,41 @@ public class BasicStatsNoJobTask implements IStatsProcessor {
             if (file.getLen() == 0) {
               numFiles += 1;
             } else {
-              org.apache.hadoop.mapred.RecordReader<?, ?> recordReader = inputFormat.getRecordReader(dummySplit, jc, Reporter.NULL);
-              try {
-                if (recordReader instanceof StatsProvidingRecordReader) {
-                  StatsProvidingRecordReader statsRR;
-                  statsRR = (StatsProvidingRecordReader) recordReader;
-                  rawDataSize += statsRR.getStats().getRawDataSize();
-                  numRows += statsRR.getStats().getRowCount();
-                  fileSize += file.getLen();
-                  numFiles += 1;
-                  if (file.isErasureCoded()) {
-                    numErasureCodedFiles++;
-                  }
-                } else {
-                  throw new HiveException(String.format("Unexpected file found during reading footers for: %s ", file));
-                }
-              } finally {
-                recordReader.close();
+              FileStatProcessor fsp = new FileStatProcessor(file, inputFormat, dummySplit, jc);
+              if (tpE != null) {
+                futures.add(tpE.submit(fsp));
+              } else {
+                // No parallel processing, just call the method normally & update the stats.
+                FileStats fileStat = fsp.call();
+                rawDataSize += fileStat.getRawDataSize();
+                numRows += fileStat.getNumRows();
+                fileSize += fileStat.getFileSize();
+                numFiles += 1;
+                numErasureCodedFiles += fileStat.getNumErasureCodedFiles();
               }
             }
           }
         }
 
+        if (tpE != null) {
+          try {
+            for (Future<FileStats> future : futures) {
+              FileStats fileStat = future.get();
+              rawDataSize += fileStat.getRawDataSize();
+              numRows += fileStat.getNumRows();
+              fileSize += fileStat.getFileSize();
+              numFiles += 1;
+              numErasureCodedFiles += fileStat.getNumErasureCodedFiles();
+            }
+          } catch (Exception e) {
+            LOG.error("Encountered exception while collecting stats for file lists as {}", fileList, e);
+            // Cancel all the futures in the list & throw the caught exception post that.
+            futures.forEach(x -> x.cancel(true));
+            throw e;
+          } finally {
+            tpE.shutdown();
+          }
+        }
         StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.TRUE);
 
         parameters.put(StatsSetupConst.ROW_COUNT, String.valueOf(numRows));
@@ -446,4 +477,74 @@ public class BasicStatsNoJobTask implements IStatsProcessor {
   @Override
   public void setDpPartSpecs(Collection<Partition> dpPartSpecs) {
   }
+
+  /**
+   * Utility class to process file level stats in parallel.
+   */
+  private static class FileStatProcessor implements Callable <FileStats> {
+
+    private final InputSplit dummySplit;
+    private final InputFormat<?, ?> inputFormat;
+    private final JobConf jc;
+    private final FileStatus file;
+
+    FileStatProcessor(FileStatus file, InputFormat<?, ?> inputFormat, InputSplit dummySplit, JobConf jc) {
+      this.file = file;
+      this.dummySplit = dummySplit;
+      this.inputFormat = inputFormat;
+      this.jc = jc;
+    }
+
+    @Override
+    public FileStats call() throws Exception {
+      try (org.apache.hadoop.mapred.RecordReader<?, ?> recordReader = inputFormat
+          .getRecordReader(dummySplit, jc, Reporter.NULL)) {
+        if (recordReader instanceof StatsProvidingRecordReader) {
+          StatsProvidingRecordReader statsRR;
+          statsRR = (StatsProvidingRecordReader) recordReader;
+          final FileStats fileStats =
+              new FileStats(statsRR.getStats().getRawDataSize(), statsRR.getStats().getRowCount(), file.getLen(),
+                  file.isErasureCoded());
+          return fileStats;
+        } else {
+          throw new HiveException(String.format("Unexpected file found during reading footers for: %s ", file));
+        }
+      }
+    }
+  }
+
+  /**
+   * Utility class for holding the file level statistics.
+   */
+  private static class FileStats {
+
+    private long numRows = 0;
+    private long rawDataSize = 0;
+    private long fileSize = 0;
+    private long numErasureCodedFiles = 0;
+
+    public FileStats(long rawDataSize, long numRows, long fileSize, boolean isErasureCoded) {
+      this.rawDataSize = rawDataSize;
+      this.numRows = numRows;
+      this.fileSize = fileSize;
+      this.numErasureCodedFiles = isErasureCoded ? 1 : 0;
+    }
+
+    public long getNumRows() {
+      return numRows;
+    }
+
+    public long getRawDataSize() {
+      return rawDataSize;
+    }
+
+    public long getFileSize() {
+      return fileSize;
+    }
+
+    public long getNumErasureCodedFiles() {
+      return numErasureCodedFiles;
+    }
+
+  }
 }
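
For reference, a condensed, self-contained sketch of the submit-and-aggregate pattern the patch introduces: one Callable per unit of work, a pool bounded by the factor above, and Future results folded back into running totals, with outstanding work cancelled if any task fails. The class and the Long-valued tasks below are simplified stand-ins, not Hive code:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.Future;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    /**
     * Illustrative sketch of the submit-and-aggregate pattern used above:
     * one Callable per work item, a bounded pool sized from a factor of the
     * available processors, and aggregation of Futures with cancellation on
     * failure. Names and types are simplified and are not Hive classes.
     */
    public class ParallelAggregationSketch {

      public static void main(String[] args) throws Exception {
        List<Long> workItems = List.of(10L, 20L, 30L, 40L);
        int factor = 2; // analogous to hive.basic.stats.max.threads.factor
        System.out.println("total = " + aggregate(workItems, factor));
      }

      static long aggregate(List<Long> workItems, int factor) throws Exception {
        long total = 0;
        ThreadPoolExecutor pool = null;
        List<Future<Long>> futures = new ArrayList<>();
        if (workItems.size() > 1 && factor > 0) {
          int numThreads = Math.min(workItems.size(), factor * Runtime.getRuntime().availableProcessors());
          pool = new ThreadPoolExecutor(numThreads, numThreads, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
          // Core threads only time out when this is explicitly enabled and the keep-alive is positive.
          pool.allowCoreThreadTimeOut(true);
        }

        for (Long item : workItems) {
          Callable<Long> task = () -> item * 2; // stand-in for per-file stats collection
          if (pool != null) {
            futures.add(pool.submit(task));
          } else {
            total += task.call(); // sequential fallback, mirroring the factor < 1 case
          }
        }

        if (pool != null) {
          try {
            for (Future<Long> future : futures) {
              total += future.get(); // propagates the first failure
            }
          } catch (Exception e) {
            futures.forEach(f -> f.cancel(true)); // stop outstanding work on error
            throw e;
          } finally {
            pool.shutdown();
          }
        }
        return total;
      }
    }

The sequential branch preserves the pre-patch behaviour when the factor is below 1 or there is only a single file to read, so the change is opt-out via configuration.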