You are viewing a plain text version of this content. The canonical link was a hyperlink in the original page and is not preserved in this plain-text copy.
Posted to commits@hive.apache.org by mu...@apache.org on 2020/12/02 00:48:11 UTC

[hive] branch master updated: HIVE-24380: NullScanTaskDispatcher should liststatus in parallel (Mustafa Iman, reviewed by Rajesh Balamohan)

This is an automated email from the ASF dual-hosted git repository.

mustafaiman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 2597088  HIVE-24380: NullScanTaskDispatcher should liststatus in parallel (Mustafa Iman, reviewed by Rajesh Balamohan)
2597088 is described below

commit 25970885d5f401171e0e1e30b48b0f3aaf86aaed
Author: Mustafa Iman <mu...@gmail.com>
AuthorDate: Tue Nov 3 00:06:23 2020 -0800

    HIVE-24380: NullScanTaskDispatcher should liststatus in parallel (Mustafa Iman, reviewed by Rajesh Balamohan)
    
    Change-Id: I033dec5a74224ebc1fcd7d1cad4ab718d872d229
---
 .../optimizer/physical/NullScanTaskDispatcher.java | 43 ++++++++++++++++++++--
 .../physical/TestNullScanTaskDispatcher.java       |  5 +++
 2 files changed, 45 insertions(+), 3 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java
index a224bb9..e43fbd3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java
@@ -29,10 +29,16 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.Stack;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.common.StringInternUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
@@ -70,6 +76,7 @@ public class NullScanTaskDispatcher implements SemanticDispatcher {
   static final Logger LOG =
       LoggerFactory.getLogger(NullScanTaskDispatcher.class);
 
+  private final int maxAsyncLookupCount;
   private final PhysicalContext physicalContext;
   private final Map<SemanticRule, SemanticNodeProcessor> rules;
 
@@ -78,6 +85,8 @@ public class NullScanTaskDispatcher implements SemanticDispatcher {
     super();
     this.physicalContext = context;
     this.rules = rules;
+    this.maxAsyncLookupCount = physicalContext.getConf()
+        .getIntVar(HiveConf.ConfVars.HIVE_COMPUTE_SPLITS_NUM_THREADS);
   }
 
   private String getAliasForTableScanOperator(MapWork work,
@@ -102,8 +111,8 @@ public class NullScanTaskDispatcher implements SemanticDispatcher {
     }
   }
 
-  private void processPath(MapWork work, Path path, Collection<String> aliasesToOptimize,
-                           boolean isEmpty) {
+  private synchronized void processPath(MapWork work, Path path, Collection<String> aliasesToOptimize,
+                                        boolean isEmpty) {
     PartitionDesc partDesc = work.getPathToPartitionInfo().get(path).clone();
     partDesc.setInputFileFormatClass(isEmpty ? ZeroRowsInputFormat.class : OneNullRowInputFormat.class);
     partDesc.setOutputFileFormatClass(HiveIgnoreKeyTextOutputFormat.class);
@@ -128,6 +137,7 @@ public class NullScanTaskDispatcher implements SemanticDispatcher {
     Map<Path, Collection<String>> candidatePathsToAliases = new HashMap<>();
     Map<String, Boolean> aliasTypeMap = new HashMap<>();
     Map<String, Map<Path, Partition>> allowedAliasesToPartitions = new HashMap<>();
+    int numberOfUnmanagedPaths = 0;
     for (TableScanOperator tso : tableScans) {
       // use LinkedHashMap<String, Operator<? extends OperatorDesc>>
       // getAliasToWork() should not apply this for non-native table
@@ -159,21 +169,48 @@ public class NullScanTaskDispatcher implements SemanticDispatcher {
           } else {
             managedEmptyPathMap.put(path, false);
           }
+        } else {
+          numberOfUnmanagedPaths++;
         }
       }
     }
 
+    int fetcherPoolParallelism = Math.min(maxAsyncLookupCount, numberOfUnmanagedPaths);
+    ExecutorService executorService = null;
+    if (fetcherPoolParallelism > 1) { // don't create executor service for one partition
+      executorService = Executors.newFixedThreadPool(fetcherPoolParallelism,
+          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("NullScanOptimizer_lookup#%d").build());
+    }
+    List<Future<?>> lookupFutures = new ArrayList<>(numberOfUnmanagedPaths);
     for (Entry<Path, Collection<String>> entry : candidatePathsToAliases.entrySet()) {
       Path path = entry.getKey();
       Collection<String> allowed = entry.getValue();
       Boolean isEmpty = managedEmptyPathMap.get(path);
       // if isEmpty is null, either stats are not up to date or this is external table
       if (isEmpty == null) {
-        lookupAndProcessPath(work, path, allowed);
+        if (executorService != null) {
+          Future<?> f = executorService.submit(() -> lookupAndProcessPath(work, path, allowed));
+          lookupFutures.add(f);
+        } else {
+          lookupAndProcessPath(work, path, allowed);
+        }
       } else {
         processPath(work, path, allowed, isEmpty);
       }
     }
+    try {
+      for (Future<?> future : lookupFutures) {
+          future.get();
+      }
+    } catch (ExecutionException | InterruptedException e) {
+      for (Future<?> f : lookupFutures) {
+        f.cancel(true);
+      }
+      LOG.error("NullScanOptimizer could not complete. It may miss eliminating some null scans", e);
+    }
+    if (executorService != null) {
+      executorService.shutdown();
+    }
   }
 
   private Map<Path, Partition> getPathToPartitionMap(String alias, TableScanOperator tso) {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/optimizer/physical/TestNullScanTaskDispatcher.java b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/physical/TestNullScanTaskDispatcher.java
index 4758ce6..5f6535a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/optimizer/physical/TestNullScanTaskDispatcher.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/physical/TestNullScanTaskDispatcher.java
@@ -136,6 +136,11 @@ public class TestNullScanTaskDispatcher {
   }
 
   @Test
+  public void testNumberOfListStatusCalls_whenExternalLookupRunsInCaller() throws IOException, SemanticException {
+    verifyNumberOfReads(1, 0, 0, 1);
+  }
+
+  @Test
   public void testTwoManagedTables() throws IOException, SemanticException {
     final String managedTable1 = "table1";
     final String managedTable2 = "table2";