You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text version).
Posted to commits@pinot.apache.org by xi...@apache.org on 2020/10/30 00:52:59 UTC

[incubator-pinot] 01/01: Support running pinot batch ingestion standalone job in a parallel mode

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch parallelize_batch_ingestion_standalone_job
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 3ea8a59a8efc90087421a2acd34c434eacf0dee6
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Thu Oct 29 17:52:29 2020 -0700

    Support running pinot batch ingestion standalone job in a parallel mode
---
 .../standalone/SegmentGenerationJobRunner.java     | 118 +++++++++++++--------
 1 file changed, 75 insertions(+), 43 deletions(-)

diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
index 661134f..518e0d1 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
@@ -20,6 +20,7 @@ package org.apache.pinot.plugin.ingestion.batch.standalone;
 
 import java.io.File;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.nio.file.FileSystems;
 import java.nio.file.PathMatcher;
 import java.nio.file.Paths;
@@ -27,6 +28,9 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner;
@@ -52,6 +56,7 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
   private static final Logger LOGGER = LoggerFactory.getLogger(SegmentGenerationJobRunner.class);
 
   private SegmentGenerationJobSpec _spec;
+  private ExecutorService _executorService;
 
   public SegmentGenerationJobRunner() {
   }
@@ -83,8 +88,8 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
         throw new RuntimeException("Missing property 'schemaURI' in 'tableSpec'");
       }
       PinotClusterSpec pinotClusterSpec = _spec.getPinotClusterSpecs()[0];
-      String schemaURI = SegmentGenerationUtils.generateSchemaURI(pinotClusterSpec.getControllerURI(),
-          _spec.getTableSpec().getTableName());
+      String schemaURI = SegmentGenerationUtils
+          .generateSchemaURI(pinotClusterSpec.getControllerURI(), _spec.getTableSpec().getTableName());
       _spec.getTableSpec().setSchemaURI(schemaURI);
     }
     if (_spec.getTableSpec().getTableConfigURI() == null) {
@@ -96,6 +101,9 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
           .generateTableConfigURI(pinotClusterSpec.getControllerURI(), _spec.getTableSpec().getTableName());
       _spec.getTableSpec().setTableConfigURI(tableConfigURI);
     }
+    int numThreads = (_spec.getSegmentCreationJobParallelism() > 0) ? _spec.getSegmentCreationJobParallelism() : 1;
+    LOGGER.info("Creating an executor service with {} threads.", numThreads);
+    _executorService = Executors.newFixedThreadPool(numThreads);
   }
 
   @Override
@@ -108,17 +116,11 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
     }
 
     //Get pinotFS for input
-    URI inputDirURI = new URI(_spec.getInputDirURI());
-    if (inputDirURI.getScheme() == null) {
-      inputDirURI = new File(_spec.getInputDirURI()).toURI();
-    }
+    final URI inputDirURI = getDirectoryUri(_spec.getInputDirURI());
     PinotFS inputDirFS = PinotFSFactory.create(inputDirURI.getScheme());
 
     //Get outputFS for writing output pinot segments
-    URI outputDirURI = new URI(_spec.getOutputDirURI());
-    if (outputDirURI.getScheme() == null) {
-      outputDirURI = new File(_spec.getOutputDirURI()).toURI();
-    }
+    final URI outputDirURI = getDirectoryUri(_spec.getOutputDirURI());
     PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme());
     outputDirFS.mkdir(outputDirURI);
 
@@ -151,7 +153,6 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
         filteredFiles.add(file);
       }
     }
-
     File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-" + UUID.randomUUID());
     try {
       //create localTempDir for input and output
@@ -164,13 +165,12 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
       Schema schema = SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI());
       TableConfig tableConfig = SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI());
 
+      int numInputFiles = filteredFiles.size();
+      CountDownLatch segmentCreationTaskCountDownLatch = new CountDownLatch(numInputFiles);
       //iterate on the file list, for each
-      for (int i = 0; i < filteredFiles.size(); i++) {
-        URI inputFileURI = URI.create(filteredFiles.get(i));
-        if (inputFileURI.getScheme() == null) {
-          inputFileURI =
-              new URI(inputDirURI.getScheme(), inputFileURI.getSchemeSpecificPart(), inputFileURI.getFragment());
-        }
+      for (int i = 0; i < numInputFiles; i++) {
+        final URI inputFileURI = getFileURI(filteredFiles.get(i), inputDirURI.getScheme());
+
         //copy input path to local
         File localInputDataFile = new File(localInputTempDir, new File(inputFileURI.getPath()).getName());
         inputDirFS.copyToLocalFile(inputFileURI, localInputDataFile);
@@ -185,35 +185,67 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
         taskSpec.setSequenceId(i);
         taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec());
 
-        //invoke segmentGenerationTask
-        SegmentGenerationTaskRunner taskRunner = new SegmentGenerationTaskRunner(taskSpec);
-        String segmentName = taskRunner.run();
-
-        // Tar segment directory to compress file
-        File localSegmentDir = new File(localOutputTempDir, segmentName);
-        String segmentTarFileName = segmentName + Constants.TAR_GZ_FILE_EXT;
-        File localSegmentTarFile = new File(localOutputTempDir, segmentTarFileName);
-        LOGGER.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile);
-        TarGzCompressionUtils.createTarGzFile(localSegmentDir, localSegmentTarFile);
-        long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
-        long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
-        LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName,
-            DataSizeUtils.fromBytes(uncompressedSegmentSize), DataSizeUtils.fromBytes(compressedSegmentSize));
-        //move segment to output PinotFS
-        URI outputSegmentTarURI = SegmentGenerationUtils.getRelativeOutputPath(inputDirURI, inputFileURI, outputDirURI)
-            .resolve(segmentTarFileName);
-        if (!_spec.isOverwriteOutput() && outputDirFS.exists(outputSegmentTarURI)) {
-          LOGGER.warn("Not overwrite existing output segment tar file: {}", outputDirFS.exists(outputSegmentTarURI));
-        } else {
-          outputDirFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI);
-        }
-        FileUtils.deleteQuietly(localSegmentDir);
-        FileUtils.deleteQuietly(localSegmentTarFile);
-        FileUtils.deleteQuietly(localInputDataFile);
+        LOGGER.info("Submitting one Segment Generation Task for {}", inputFileURI);
+        _executorService.submit(() -> {
+          File localSegmentDir = null;
+          File localSegmentTarFile = null;
+          try {
+            //invoke segmentGenerationTask
+            SegmentGenerationTaskRunner taskRunner = new SegmentGenerationTaskRunner(taskSpec);
+            String segmentName = taskRunner.run();
+            // Tar segment directory to compress file
+            localSegmentDir = new File(localOutputTempDir, segmentName);
+            String segmentTarFileName = segmentName + Constants.TAR_GZ_FILE_EXT;
+            localSegmentTarFile = new File(localOutputTempDir, segmentTarFileName);
+            LOGGER.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile);
+            TarGzCompressionUtils.createTarGzFile(localSegmentDir, localSegmentTarFile);
+            long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
+            long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
+            LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName,
+                DataSizeUtils.fromBytes(uncompressedSegmentSize), DataSizeUtils.fromBytes(compressedSegmentSize));
+            //move segment to output PinotFS
+            URI outputSegmentTarURI =
+                SegmentGenerationUtils.getRelativeOutputPath(inputDirURI, inputFileURI, outputDirURI)
+                    .resolve(segmentTarFileName);
+            if (!_spec.isOverwriteOutput() && outputDirFS.exists(outputSegmentTarURI)) {
+              LOGGER
+                  .warn("Not overwrite existing output segment tar file: {}", outputDirFS.exists(outputSegmentTarURI));
+            } else {
+              outputDirFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI);
+            }
+          } catch (Exception e) {
+            LOGGER.error("Failed to generate Pinot segment for file - {}", inputFileURI, e);
+          } finally {
+            segmentCreationTaskCountDownLatch.countDown();
+            FileUtils.deleteQuietly(localSegmentDir);
+            FileUtils.deleteQuietly(localSegmentTarFile);
+            FileUtils.deleteQuietly(localInputDataFile);
+          }
+        });
       }
+      segmentCreationTaskCountDownLatch.await();
     } finally {
       //clean up
-      FileUtils.deleteDirectory(localTempDir);
+      FileUtils.deleteQuietly(localTempDir);
+      _executorService.shutdown();
+    }
+  }
+
+  private URI getDirectoryUri(String uriStr) // Parses a job directory location (inputDirURI / outputDirURI from the spec) into a URI.
+      throws URISyntaxException { // Propagated from new URI(uriStr) when uriStr is not valid URI syntax.
+    URI uri = new URI(uriStr);
+    if (uri.getScheme() == null) { // No scheme given: treat uriStr as a local filesystem path instead.
+      uri = new File(uriStr).toURI(); // NOTE(review): a Windows path like "C:/in" parses with scheme "C" and never reaches this fallback — confirm.
+    }
+    return uri;
+  }
+
+  private URI getFileURI(String uriStr, String fallbackScheme) // Resolves one input file path to a URI, borrowing fallbackScheme (the input dir's scheme) when the path has none.
+      throws URISyntaxException { // Declared for the scheme-substituting new URI(...) call below.
+    URI fileURI = URI.create(uriStr); // NOTE(review): URI.create reports bad syntax as IllegalArgumentException, not URISyntaxException — confirm callers expect that.
+    if (fileURI.getScheme() == null) {
+      return new URI(fallbackScheme, fileURI.getSchemeSpecificPart(), fileURI.getFragment()); // Keep the scheme-specific part and fragment; only the scheme is filled in.
     }
+    return fileURI; // Path already carries a scheme: use it unchanged.
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org