You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ab...@apache.org on 2023/02/13 12:11:34 UTC

[tez] branch master updated: TEZ-4397: Open Tez Input splits asynchronously (#263) (Syed Shameerur Rahman reviewed by Laszlo Bodor, original patch by Ramesh Kumar)

This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/master by this push:
     new e3e91a150 TEZ-4397: Open Tez Input splits asynchronously (#263) (Syed Shameerur Rahman reviewed by Laszlo Bodor, original patch by Ramesh Kumar)
e3e91a150 is described below

commit e3e91a150dad44a9daa3102da04542e2e365203d
Author: Syed Shameerur Rahman <rh...@amazon.com>
AuthorDate: Mon Feb 13 17:41:28 2023 +0530

    TEZ-4397: Open Tez Input splits asynchronously (#263) (Syed Shameerur Rahman reviewed by Laszlo Bodor, original patch by Ramesh Kumar)
---
 .../mapred/split/TezGroupedSplitsInputFormat.java  | 100 +++++++++++++++++++--
 .../tez/mapreduce/grouper/TezSplitGrouper.java     |  15 ++++
 2 files changed, 108 insertions(+), 7 deletions(-)

diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
index 61ba56030..bdeba2a0a 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java
@@ -20,7 +20,16 @@ package org.apache.hadoop.mapred.split;
 
 import java.io.IOException;
 import java.util.Objects;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.tez.mapreduce.grouper.TezSplitGrouper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
@@ -129,14 +138,69 @@ public class TezGroupedSplitsInputFormat<K, V>
     int idx = 0;
     long progress;
     RecordReader<K, V> curReader;
-    
+    private final AtomicInteger initIndex; // index of the next wrapped split that an init task will claim
+    private final int numReaders; // init tasks submitted per preInitReaders() call; capped at the wrapped split count
+    private ExecutorService initReaderExecService; // stays null unless numReaders > 1 (async opening enabled)
+    private BlockingDeque<Future<RecordReader<K, V>>> initedReaders; // futures of readers being opened, in submission order
+    private AtomicBoolean failureOccurred = new AtomicBoolean(false); // set on any init failure; later init tasks then return null
+
     public TezGroupedSplitsRecordReader(TezGroupedSplit split, JobConf job,
         Reporter reporter) throws IOException { // Stores the inputs and, when numReaders > 1, creates the async reader-init pool.
       this.groupedSplit = split;
       this.job = job;
       this.reporter = reporter;
+      this.initIndex = new AtomicInteger(0); // init tasks claim split indexes starting from 0
+      int numThreads = conf.getInt(TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_THREADS,
+          TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_THREADS_DEFAULT); // NOTE(review): conf is not declared in the visible diff - presumably an enclosing-class field; confirm
+      this.numReaders = Math.min(groupedSplit.wrappedSplits.size(),
+          conf.getInt(TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_RECORDREADERS,
+              TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_RECORDREADERS_DEFAULT)); // configured count, capped at the number of wrapped splits
+      // init the async split opening executor service only if numReaders is greater than 1
+      if (numReaders > 1) {
+        this.initReaderExecService = Executors.newFixedThreadPool(numThreads,
+            new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setPriority(Thread.MAX_PRIORITY)
+                .setNameFormat("TEZ-Split-Init-Thread-%d")
+                .build());
+        this.initedReaders = new LinkedBlockingDeque<>(); // filled by preInitReaders(), drained by initNextRecordReader() via take()
+      }
       initNextRecordReader(); // set up the first reader before the constructor returns
     }
+
+    private void preInitReaders() { // Submits numReaders reader-opening tasks and queues their futures.
+      if (initReaderExecService == null) { // async opening disabled: nothing to submit
+        return;
+      }
+      for (int i = 0; i < numReaders; i++) {
+        initedReaders.offer(this.initReaderExecService.submit(() -> {
+          if (failureOccurred.get()) { // an earlier initialization failed; do not open more readers
+            return null;
+          }
+          try {
+            int index = initIndex.getAndIncrement(); // NOTE(review): index is claimed when the task runs, not at submission, so future order may differ from split order - confirm
+            if (index >= groupedSplit.wrappedSplits.size()) { // every split has already been claimed
+              return null;
+            }
+            InputSplit s = groupedSplit.wrappedSplits.get(index);
+            RecordReader<K, V> reader = wrappedInputFormat.getRecordReader(s, job, reporter);
+            LOG.debug("Init Thread processed reader number {} initialization", index);
+            return reader;
+          } catch (Exception e) {
+            failureOccurred.set(true); // makes tasks that start later return null immediately
+            if (e instanceof InterruptedException) {
+              Thread.currentThread().interrupt(); // restore the interrupt flag for the pool thread
+            }
+            cancelFutures(); // cancel every future still in the deque
+            throw new RuntimeException(e); // reaches the consumer wrapped by Future.get()
+          }
+        }));
+      }
+    }
+
+    public RecordReader<K, V> getCurReader() { // Exposes the reader currently in use; may be null.
+      return curReader;
+    }
     
     @Override
     public boolean next(K key, V value) throws IOException {
@@ -171,7 +235,7 @@ public class TezGroupedSplitsInputFormat<K, V>
         curReader = null;
       }
     }
-    
+
     protected boolean initNextRecordReader() throws IOException {
       if (curReader != null) {
         curReader.close();
@@ -183,23 +247,45 @@ public class TezGroupedSplitsInputFormat<K, V>
 
       // if all chunks have been processed, nothing more to do.
       if (idx == groupedSplit.wrappedSplits.size()) {
+        if (initReaderExecService != null) {
+          LOG.info("Shutting down the init record reader threadpool");
+          initReaderExecService.shutdownNow();
+        }
         return false;
       }
 
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Init record reader for index " + idx + " of " + 
+        LOG.debug("Init record reader for index " + idx + " of " +
                   groupedSplit.wrappedSplits.size());
       }
 
       // get a record reader for the idx-th chunk
       try {
-        curReader = wrappedInputFormat.getRecordReader(
-            groupedSplit.wrappedSplits.get(idx), job, reporter);
+        // get the cur reader directly when async split opening is disabled
+        if (initReaderExecService == null) {
+          curReader = wrappedInputFormat.getRecordReader(groupedSplit.wrappedSplits.get(idx), job, reporter);
+        } else {
+          preInitReaders();
+          curReader = initedReaders.take().get();
+        }
       } catch (Exception e) {
-        throw new RuntimeException (e);
+        failureOccurred.set(true);
+        if (e instanceof InterruptedException) {
+          Thread.currentThread().interrupt();
+        }
+        if (initedReaders != null) {
+          cancelFutures();
+        }
+        throw new RuntimeException(e);
       }
       idx++;
-      return true;
+      return curReader != null;
+    }
+
+    private void cancelFutures() { // Cancels every future still held in initedReaders.
+      for (Future<RecordReader<K, V>> f : initedReaders) { // NOTE(review): readers of already-completed futures are not closed here - confirm they are released elsewhere
+        f.cancel(true); // true = interrupt the worker thread if the task is already running
+      }
     }
 
     @Override
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java
index a1d6b6c80..b4143494f 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java
@@ -26,6 +26,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.TreeMap;
 
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.tez.common.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.util.RackResolver;
@@ -102,6 +103,20 @@ public abstract class TezSplitGrouper {
   public static final String TEZ_GROUPING_NODE_LOCAL_ONLY = "tez.grouping.node.local.only";
   public static final boolean TEZ_GROUPING_NODE_LOCAL_ONLY_DEFAULT = false;
 
+  /**
+   * Number of threads used to initialize the grouped splits, to asynchronously open the readers.
+   */
+  public static final String TEZ_GROUPING_SPLIT_INIT_THREADS = "tez.grouping.split.init.threads";
+  public static final int TEZ_GROUPING_SPLIT_INIT_THREADS_DEFAULT = 4;
+
+  /**
+   * Number of record readers to asynchronously and proactively init.
+   * In order for upstream apps to use this feature, the objects created in the
+   * upstream apps as part of the TezGroupedSplitsRecordReader call should be thread-safe.
+   */
+  @InterfaceStability.Unstable
+  public static final String TEZ_GROUPING_SPLIT_INIT_RECORDREADERS = "tez.grouping.split.init.recordreaders";
+  public static final int TEZ_GROUPING_SPLIT_INIT_RECORDREADERS_DEFAULT = 1;
 
   static class LocationHolder {
     List<SplitContainer> splits;