You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ay...@apache.org on 2022/02/23 15:59:41 UTC
[hive] branch master updated: HIVE-25958: Optimise BasicStatsNoJobTask. (#3037). (Ayush Saxena, reviewed by Rajesh Balamohan)
This is an automated email from the ASF dual-hosted git repository.
ayushsaxena pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new c97a491 HIVE-25958: Optimise BasicStatsNoJobTask. (#3037). (Ayush Saxena, reviewed by Rajesh Balamohan)
c97a491 is described below
commit c97a4914828986a78e4af6c5c82d162a23097c13
Author: Ayush Saxena <ay...@apache.org>
AuthorDate: Wed Feb 23 21:29:17 2022 +0530
HIVE-25958: Optimise BasicStatsNoJobTask. (#3037). (Ayush Saxena, reviewed by Rajesh Balamohan)
---
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 4 +
.../hadoop/hive/ql/stats/BasicStatsNoJobTask.java | 135 ++++++++++++++++++---
2 files changed, 122 insertions(+), 17 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 3ac60f3..28f309c 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -753,6 +753,10 @@ public class HiveConf extends Configuration {
"Comma-separated list of statistics publishers to be invoked on counters on each job. \n" +
"A client stats publisher is specified as the name of a Java class which implements the \n" +
"org.apache.hadoop.hive.ql.stats.ClientStatsPublisher interface."),
+ BASICSTATSTASKSMAXTHREADSFACTOR("hive.basic.stats.max.threads.factor", 2, "Determines the maximum number of "
+ + "threads that can be used for collection of file level statistics. If the value configured is x, then the "
+ + "maximum number of threads that can be used is x multiplied by the number of available processors. A value"
+ + " of less than 1, makes stats collection sequential."),
EXECPARALLEL("hive.exec.parallel", false, "Whether to execute jobs in parallel"),
EXECPARALLETHREADNUMBER("hive.exec.parallel.thread.number", 8,
"How many jobs at most can be executed in parallel"),
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsNoJobTask.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsNoJobTask.java
index 919a243..145bdd6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsNoJobTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/BasicStatsNoJobTask.java
@@ -23,7 +23,12 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -65,6 +70,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimaps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* StatsNoJobTask is used in cases where stats collection is the only task for the given query (no
@@ -223,6 +229,18 @@ public class BasicStatsNoJobTask implements IStatsProcessor {
} else {
fileList = HiveStatsUtils.getFileStatusRecurse(dir, -1, fs);
}
+        // Optional worker pool for reading file level stats in parallel. It stays null when there is at most one
+        // file or the configured factor is below 1; the files are then processed sequentially on this thread.
+        ThreadPoolExecutor tpE = null;
+        List<Future<FileStats>> futures = null;
+        int numThreadsFactor = HiveConf.getIntVar(jc, HiveConf.ConfVars.BASICSTATSTASKSMAXTHREADSFACTOR);
+        if (fileList.size() > 1 && numThreadsFactor > 0) {
+          // Multiply in long so that a very large factor cannot overflow into a non-positive pool size; the
+          // result never exceeds fileList.size(), so narrowing back to int is safe.
+          int numThreads = (int) Math.min((long) fileList.size(),
+              (long) numThreadsFactor * Runtime.getRuntime().availableProcessors());
+          ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("Basic-Stats-Thread-%d").build();
+          // allowCoreThreadTimeOut(true) rejects a zero keep-alive, hence 60 seconds instead of 0.
+          tpE = new ThreadPoolExecutor(numThreads, numThreads, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
+              threadFactory);
+          // allowsCoreThreadTimeOut() only queries the flag; this is the call that actually lets idle core
+          // threads terminate, so the workers do not linger if the pool is never shut down.
+          tpE.allowCoreThreadTimeOut(true);
+          futures = new ArrayList<>(fileList.size());
+          LOG.info("Processing Stats for {} file using {} threads", fileList.size(), numThreads);
+        }
for (FileStatus file : fileList) {
Utilities.FILE_OP_LOGGER.debug("Computing stats for {}", file);
@@ -232,28 +250,41 @@ public class BasicStatsNoJobTask implements IStatsProcessor {
if (file.getLen() == 0) {
numFiles += 1;
} else {
- org.apache.hadoop.mapred.RecordReader<?, ?> recordReader = inputFormat.getRecordReader(dummySplit, jc, Reporter.NULL);
- try {
- if (recordReader instanceof StatsProvidingRecordReader) {
- StatsProvidingRecordReader statsRR;
- statsRR = (StatsProvidingRecordReader) recordReader;
- rawDataSize += statsRR.getStats().getRawDataSize();
- numRows += statsRR.getStats().getRowCount();
- fileSize += file.getLen();
- numFiles += 1;
- if (file.isErasureCoded()) {
- numErasureCodedFiles++;
- }
- } else {
- throw new HiveException(String.format("Unexpected file found during reading footers for: %s ", file));
- }
- } finally {
- recordReader.close();
+ FileStatProcessor fsp = new FileStatProcessor(file, inputFormat, dummySplit, jc);
+ if (tpE != null) {
+ futures.add(tpE.submit(fsp));
+ } else {
+ // No parallel processing, just call the method normally & update the stats.
+ FileStats fileStat = fsp.call();
+ rawDataSize += fileStat.getRawDataSize();
+ numRows += fileStat.getNumRows();
+ fileSize += fileStat.getFileSize();
+ numFiles += 1;
+ numErasureCodedFiles += fileStat.getNumErasureCodedFiles();
}
}
}
}
+        if (tpE != null) {
+          // Parallel mode: wait for each submitted task in submission order and fold its result into the totals.
+          try {
+            for (Future<FileStats> future : futures) {
+              FileStats fileStat = future.get();
+              rawDataSize += fileStat.getRawDataSize();
+              numRows += fileStat.getNumRows();
+              fileSize += fileStat.getFileSize();
+              numFiles += 1;
+              numErasureCodedFiles += fileStat.getNumErasureCodedFiles();
+            }
+          } catch (Exception e) {
+            if (e instanceof InterruptedException) {
+              // Catching the interrupt clears the thread's interrupt status; restore it so that code further up
+              // the stack can still observe the interruption.
+              Thread.currentThread().interrupt();
+            }
+            LOG.error("Encountered exception while collecting stats for file lists as {}", fileList, e);
+            // Cancel all the futures in the list & throw the caught exception post that.
+            futures.forEach(x -> x.cancel(true));
+            throw e;
+          } finally {
+            // Always release the worker threads, whether the collection succeeded or failed.
+            tpE.shutdown();
+          }
+        }
StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.TRUE);
parameters.put(StatsSetupConst.ROW_COUNT, String.valueOf(numRows));
@@ -446,4 +477,74 @@ public class BasicStatsNoJobTask implements IStatsProcessor {
@Override
public void setDpPartSpecs(Collection<Partition> dpPartSpecs) {
}
+
+ /**
+ * Utility class to process file level stats in parallel.
+ */
+ private static class FileStatProcessor implements Callable <FileStats> {
+
+ private final InputSplit dummySplit;
+ private final InputFormat<?, ?> inputFormat;
+ private final JobConf jc;
+ private final FileStatus file;
+
+ FileStatProcessor(FileStatus file, InputFormat<?, ?> inputFormat, InputSplit dummySplit, JobConf jc) {
+ this.file = file;
+ this.dummySplit = dummySplit;
+ this.inputFormat = inputFormat;
+ this.jc = jc;
+ }
+
+ @Override
+ public FileStats call() throws Exception {
+ try (org.apache.hadoop.mapred.RecordReader<?, ?> recordReader = inputFormat
+ .getRecordReader(dummySplit, jc, Reporter.NULL)) {
+ if (recordReader instanceof StatsProvidingRecordReader) {
+ StatsProvidingRecordReader statsRR;
+ statsRR = (StatsProvidingRecordReader) recordReader;
+ final FileStats fileStats =
+ new FileStats(statsRR.getStats().getRawDataSize(), statsRR.getStats().getRowCount(), file.getLen(),
+ file.isErasureCoded());
+ return fileStats;
+ } else {
+ throw new HiveException(String.format("Unexpected file found during reading footers for: %s ", file));
+ }
+ }
+ }
+ }
+
+  /**
+   * Immutable holder for the statistics collected from a single file.
+   */
+  private static class FileStats {
+
+    private final long rawDataSize;
+    private final long numRows;
+    private final long fileSize;
+    // 1 when the file is erasure coded, 0 otherwise, so callers can add it straight to a running count.
+    private final long numErasureCodedFiles;
+
+    /**
+     * @param rawDataSize raw data size reported for the file
+     * @param numRows number of rows reported for the file
+     * @param fileSize length of the file
+     * @param isErasureCoded whether the file is erasure coded
+     */
+    public FileStats(long rawDataSize, long numRows, long fileSize, boolean isErasureCoded) {
+      this.rawDataSize = rawDataSize;
+      this.numRows = numRows;
+      this.fileSize = fileSize;
+      if (isErasureCoded) {
+        this.numErasureCodedFiles = 1;
+      } else {
+        this.numErasureCodedFiles = 0;
+      }
+    }
+
+    /** @return the raw data size passed at construction */
+    public long getRawDataSize() {
+      return this.rawDataSize;
+    }
+
+    /** @return the row count passed at construction */
+    public long getNumRows() {
+      return this.numRows;
+    }
+
+    /** @return the file length passed at construction */
+    public long getFileSize() {
+      return this.fileSize;
+    }
+
+    /** @return 1 if the file was flagged as erasure coded, otherwise 0 */
+    public long getNumErasureCodedFiles() {
+      return this.numErasureCodedFiles;
+    }
+  }
}