You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@hive.apache.org by mu...@apache.org on 2020/12/02 00:48:11 UTC
[hive] branch master updated: HIVE-24380: NullScanTaskDispatcher should liststatus in parallel (Mustafa Iman, reviewed by Rajesh Balamohan)
This is an automated email from the ASF dual-hosted git repository.
mustafaiman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 2597088 HIVE-24380: NullScanTaskDispatcher should liststatus in parallel (Mustafa Iman, reviewed by Rajesh Balamohan)
2597088 is described below
commit 25970885d5f401171e0e1e30b48b0f3aaf86aaed
Author: Mustafa Iman <mu...@gmail.com>
AuthorDate: Tue Nov 3 00:06:23 2020 -0800
HIVE-24380: NullScanTaskDispatcher should liststatus in parallel (Mustafa Iman, reviewed by Rajesh Balamohan)
Change-Id: I033dec5a74224ebc1fcd7d1cad4ab718d872d229
---
.../optimizer/physical/NullScanTaskDispatcher.java | 43 ++++++++++++++++++++--
.../physical/TestNullScanTaskDispatcher.java | 5 +++
2 files changed, 45 insertions(+), 3 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java
index a224bb9..e43fbd3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java
@@ -29,10 +29,16 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.Stack;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.common.StringInternUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
@@ -70,6 +76,7 @@ public class NullScanTaskDispatcher implements SemanticDispatcher {
static final Logger LOG =
LoggerFactory.getLogger(NullScanTaskDispatcher.class);
+ private final int maxAsyncLookupCount;
private final PhysicalContext physicalContext;
private final Map<SemanticRule, SemanticNodeProcessor> rules;
@@ -78,6 +85,8 @@ public class NullScanTaskDispatcher implements SemanticDispatcher {
super();
this.physicalContext = context;
this.rules = rules;
+ this.maxAsyncLookupCount = physicalContext.getConf()
+ .getIntVar(HiveConf.ConfVars.HIVE_COMPUTE_SPLITS_NUM_THREADS);
}
private String getAliasForTableScanOperator(MapWork work,
@@ -102,8 +111,8 @@ public class NullScanTaskDispatcher implements SemanticDispatcher {
}
}
- private void processPath(MapWork work, Path path, Collection<String> aliasesToOptimize,
- boolean isEmpty) {
+ private synchronized void processPath(MapWork work, Path path, Collection<String> aliasesToOptimize,
+ boolean isEmpty) {
PartitionDesc partDesc = work.getPathToPartitionInfo().get(path).clone();
partDesc.setInputFileFormatClass(isEmpty ? ZeroRowsInputFormat.class : OneNullRowInputFormat.class);
partDesc.setOutputFileFormatClass(HiveIgnoreKeyTextOutputFormat.class);
@@ -128,6 +137,7 @@ public class NullScanTaskDispatcher implements SemanticDispatcher {
Map<Path, Collection<String>> candidatePathsToAliases = new HashMap<>();
Map<String, Boolean> aliasTypeMap = new HashMap<>();
Map<String, Map<Path, Partition>> allowedAliasesToPartitions = new HashMap<>();
+ int numberOfUnmanagedPaths = 0;
for (TableScanOperator tso : tableScans) {
// use LinkedHashMap<String, Operator<? extends OperatorDesc>>
// getAliasToWork() should not apply this for non-native table
@@ -159,21 +169,48 @@ public class NullScanTaskDispatcher implements SemanticDispatcher {
} else {
managedEmptyPathMap.put(path, false);
}
+ } else {
+ numberOfUnmanagedPaths++;
}
}
}
+ int fetcherPoolParallelism = Math.min(maxAsyncLookupCount, numberOfUnmanagedPaths);
+ ExecutorService executorService = null;
+ if (fetcherPoolParallelism > 1) { // don't create executor service for one partition
+ executorService = Executors.newFixedThreadPool(fetcherPoolParallelism,
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("NullScanOptimizer_lookup#%d").build());
+ }
+ List<Future<?>> lookupFutures = new ArrayList<>(numberOfUnmanagedPaths);
for (Entry<Path, Collection<String>> entry : candidatePathsToAliases.entrySet()) {
Path path = entry.getKey();
Collection<String> allowed = entry.getValue();
Boolean isEmpty = managedEmptyPathMap.get(path);
// if isEmpty is null, either stats are not up to date or this is external table
if (isEmpty == null) {
- lookupAndProcessPath(work, path, allowed);
+ if (executorService != null) {
+ Future<?> f = executorService.submit(() -> lookupAndProcessPath(work, path, allowed));
+ lookupFutures.add(f);
+ } else {
+ lookupAndProcessPath(work, path, allowed);
+ }
} else {
processPath(work, path, allowed, isEmpty);
}
}
+ try {
+ for (Future<?> future : lookupFutures) {
+ future.get();
+ }
+ } catch (ExecutionException | InterruptedException e) {
+ for (Future<?> f : lookupFutures) {
+ f.cancel(true);
+ }
+ LOG.error("NullScanOptimizer could not complete. It may miss eliminating some null scans", e);
+ }
+ if (executorService != null) {
+ executorService.shutdown();
+ }
}
private Map<Path, Partition> getPathToPartitionMap(String alias, TableScanOperator tso) {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/optimizer/physical/TestNullScanTaskDispatcher.java b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/physical/TestNullScanTaskDispatcher.java
index 4758ce6..5f6535a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/optimizer/physical/TestNullScanTaskDispatcher.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/physical/TestNullScanTaskDispatcher.java
@@ -136,6 +136,11 @@ public class TestNullScanTaskDispatcher {
}
@Test
+ public void testNumberOfListStatusCalls_whenExternalLookupRunsInCaller() throws IOException, SemanticException {
+ verifyNumberOfReads(1, 0, 0, 1);
+ }
+
+ @Test
public void testTwoManagedTables() throws IOException, SemanticException {
final String managedTable1 = "table1";
final String managedTable2 = "table2";