Posted to commits@pinot.apache.org by xi...@apache.org on 2020/10/30 00:52:58 UTC

[incubator-pinot] branch parallelize_batch_ingestion_standalone_job created (now 3ea8a59)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a change to branch parallelize_batch_ingestion_standalone_job
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at 3ea8a59  Support running pinot batch ingestion standalone job in a parallel mode

This branch includes the following new commits:

     new 3ea8a59  Support running pinot batch ingestion standalone job in a parallel mode

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Support running pinot batch ingestion standalone job in a parallel mode

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch parallelize_batch_ingestion_standalone_job
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 3ea8a59a8efc90087421a2acd34c434eacf0dee6
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Thu Oct 29 17:52:29 2020 -0700

    Support running pinot batch ingestion standalone job in a parallel mode
---
 .../standalone/SegmentGenerationJobRunner.java     | 118 +++++++++++++--------
 1 file changed, 75 insertions(+), 43 deletions(-)

diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
index 661134f..518e0d1 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
@@ -20,6 +20,7 @@ package org.apache.pinot.plugin.ingestion.batch.standalone;
 
 import java.io.File;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.nio.file.FileSystems;
 import java.nio.file.PathMatcher;
 import java.nio.file.Paths;
@@ -27,6 +28,9 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner;
@@ -52,6 +56,7 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
   private static final Logger LOGGER = LoggerFactory.getLogger(SegmentGenerationJobRunner.class);
 
   private SegmentGenerationJobSpec _spec;
+  private ExecutorService _executorService;
 
   public SegmentGenerationJobRunner() {
   }
@@ -83,8 +88,8 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
         throw new RuntimeException("Missing property 'schemaURI' in 'tableSpec'");
       }
       PinotClusterSpec pinotClusterSpec = _spec.getPinotClusterSpecs()[0];
-      String schemaURI = SegmentGenerationUtils.generateSchemaURI(pinotClusterSpec.getControllerURI(),
-          _spec.getTableSpec().getTableName());
+      String schemaURI = SegmentGenerationUtils
+          .generateSchemaURI(pinotClusterSpec.getControllerURI(), _spec.getTableSpec().getTableName());
       _spec.getTableSpec().setSchemaURI(schemaURI);
     }
     if (_spec.getTableSpec().getTableConfigURI() == null) {
@@ -96,6 +101,9 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
           .generateTableConfigURI(pinotClusterSpec.getControllerURI(), _spec.getTableSpec().getTableName());
       _spec.getTableSpec().setTableConfigURI(tableConfigURI);
     }
+    int numThreads = (_spec.getSegmentCreationJobParallelism() > 0) ? _spec.getSegmentCreationJobParallelism() : 1;
+    LOGGER.info("Creating an executor service with {} threads.", numThreads);
+    _executorService = Executors.newFixedThreadPool(numThreads);
   }
 
   @Override
@@ -108,17 +116,11 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
     }
 
     //Get pinotFS for input
-    URI inputDirURI = new URI(_spec.getInputDirURI());
-    if (inputDirURI.getScheme() == null) {
-      inputDirURI = new File(_spec.getInputDirURI()).toURI();
-    }
+    final URI inputDirURI = getDirectoryUri(_spec.getInputDirURI());
     PinotFS inputDirFS = PinotFSFactory.create(inputDirURI.getScheme());
 
     //Get outputFS for writing output pinot segments
-    URI outputDirURI = new URI(_spec.getOutputDirURI());
-    if (outputDirURI.getScheme() == null) {
-      outputDirURI = new File(_spec.getOutputDirURI()).toURI();
-    }
+    final URI outputDirURI = getDirectoryUri(_spec.getOutputDirURI());
     PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme());
     outputDirFS.mkdir(outputDirURI);
 
@@ -151,7 +153,6 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
         filteredFiles.add(file);
       }
     }
-
     File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-" + UUID.randomUUID());
     try {
       //create localTempDir for input and output
@@ -164,13 +165,12 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
       Schema schema = SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI());
       TableConfig tableConfig = SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI());
 
+      int numInputFiles = filteredFiles.size();
+      CountDownLatch segmentCreationTaskCountDownLatch = new CountDownLatch(numInputFiles);
       //iterate on the file list, for each
-      for (int i = 0; i < filteredFiles.size(); i++) {
-        URI inputFileURI = URI.create(filteredFiles.get(i));
-        if (inputFileURI.getScheme() == null) {
-          inputFileURI =
-              new URI(inputDirURI.getScheme(), inputFileURI.getSchemeSpecificPart(), inputFileURI.getFragment());
-        }
+      for (int i = 0; i < numInputFiles; i++) {
+        final URI inputFileURI = getFileURI(filteredFiles.get(i), inputDirURI.getScheme());
+
         //copy input path to local
         File localInputDataFile = new File(localInputTempDir, new File(inputFileURI.getPath()).getName());
         inputDirFS.copyToLocalFile(inputFileURI, localInputDataFile);
@@ -185,35 +185,67 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
         taskSpec.setSequenceId(i);
         taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec());
 
-        //invoke segmentGenerationTask
-        SegmentGenerationTaskRunner taskRunner = new SegmentGenerationTaskRunner(taskSpec);
-        String segmentName = taskRunner.run();
-
-        // Tar segment directory to compress file
-        File localSegmentDir = new File(localOutputTempDir, segmentName);
-        String segmentTarFileName = segmentName + Constants.TAR_GZ_FILE_EXT;
-        File localSegmentTarFile = new File(localOutputTempDir, segmentTarFileName);
-        LOGGER.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile);
-        TarGzCompressionUtils.createTarGzFile(localSegmentDir, localSegmentTarFile);
-        long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
-        long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
-        LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName,
-            DataSizeUtils.fromBytes(uncompressedSegmentSize), DataSizeUtils.fromBytes(compressedSegmentSize));
-        //move segment to output PinotFS
-        URI outputSegmentTarURI = SegmentGenerationUtils.getRelativeOutputPath(inputDirURI, inputFileURI, outputDirURI)
-            .resolve(segmentTarFileName);
-        if (!_spec.isOverwriteOutput() && outputDirFS.exists(outputSegmentTarURI)) {
-          LOGGER.warn("Not overwrite existing output segment tar file: {}", outputDirFS.exists(outputSegmentTarURI));
-        } else {
-          outputDirFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI);
-        }
-        FileUtils.deleteQuietly(localSegmentDir);
-        FileUtils.deleteQuietly(localSegmentTarFile);
-        FileUtils.deleteQuietly(localInputDataFile);
+        LOGGER.info("Submitting one Segment Generation Task for {}", inputFileURI);
+        _executorService.submit(() -> {
+          File localSegmentDir = null;
+          File localSegmentTarFile = null;
+          try {
+            //invoke segmentGenerationTask
+            SegmentGenerationTaskRunner taskRunner = new SegmentGenerationTaskRunner(taskSpec);
+            String segmentName = taskRunner.run();
+            // Tar segment directory to compress file
+            localSegmentDir = new File(localOutputTempDir, segmentName);
+            String segmentTarFileName = segmentName + Constants.TAR_GZ_FILE_EXT;
+            localSegmentTarFile = new File(localOutputTempDir, segmentTarFileName);
+            LOGGER.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile);
+            TarGzCompressionUtils.createTarGzFile(localSegmentDir, localSegmentTarFile);
+            long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
+            long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
+            LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName,
+                DataSizeUtils.fromBytes(uncompressedSegmentSize), DataSizeUtils.fromBytes(compressedSegmentSize));
+            //move segment to output PinotFS
+            URI outputSegmentTarURI =
+                SegmentGenerationUtils.getRelativeOutputPath(inputDirURI, inputFileURI, outputDirURI)
+                    .resolve(segmentTarFileName);
+            if (!_spec.isOverwriteOutput() && outputDirFS.exists(outputSegmentTarURI)) {
+              LOGGER
+                  .warn("Not overwriting existing output segment tar file: {}", outputSegmentTarURI);
+            } else {
+              outputDirFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI);
+            }
+          } catch (Exception e) {
+            LOGGER.error("Failed to generate Pinot segment for file - {}", inputFileURI, e);
+          } finally {
+            segmentCreationTaskCountDownLatch.countDown();
+            FileUtils.deleteQuietly(localSegmentDir);
+            FileUtils.deleteQuietly(localSegmentTarFile);
+            FileUtils.deleteQuietly(localInputDataFile);
+          }
+        });
       }
+      segmentCreationTaskCountDownLatch.await();
     } finally {
       //clean up
-      FileUtils.deleteDirectory(localTempDir);
+      FileUtils.deleteQuietly(localTempDir);
+      _executorService.shutdown();
+    }
+  }
+
+  private URI getDirectoryUri(String uriStr)
+      throws URISyntaxException {
+    URI uri = new URI(uriStr);
+    if (uri.getScheme() == null) {
+      uri = new File(uriStr).toURI();
+    }
+    return uri;
+  }
+
+  private URI getFileURI(String uriStr, String fallbackScheme)
+      throws URISyntaxException {
+    URI fileURI = URI.create(uriStr);
+    if (fileURI.getScheme() == null) {
+      return new URI(fallbackScheme, fileURI.getSchemeSpecificPart(), fileURI.getFragment());
     }
+    return fileURI;
   }
 }
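
For readers skimming the diff above: the change applies a standard fan-out/join
pattern. A fixed-size ExecutorService (sized by segmentCreationJobParallelism,
falling back to 1) receives one submitted task per input file, each task counts
down a shared CountDownLatch in its finally block, and the driver thread awaits
the latch so the standalone job does not exit before every segment is built.
The following is a minimal, self-contained sketch of that pattern; the file
list and the per-file work are placeholders for illustration, not the Pinot
code itself.

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FanOutJoinSketch {
  public static void main(String[] args) throws InterruptedException {
    // Placeholder inputs; the real job lists files from the input PinotFS.
    List<String> inputFiles = Arrays.asList("a.csv", "b.csv", "c.csv");

    // Mirrors the diff: use the configured parallelism if positive, else 1.
    int configuredParallelism = 0;
    int numThreads = configuredParallelism > 0 ? configuredParallelism : 1;
    ExecutorService executorService = Executors.newFixedThreadPool(numThreads);

    // One count per input file; the driver blocks until all tasks finish.
    CountDownLatch latch = new CountDownLatch(inputFiles.size());
    for (String inputFile : inputFiles) {
      executorService.submit(() -> {
        try {
          // Placeholder for the per-file work (generate, tar, upload segment).
          System.out.println("Processing " + inputFile);
        } catch (Exception e) {
          // Log and continue: a failed file must still count down the latch,
          // otherwise await() below would hang forever.
          e.printStackTrace();
        } finally {
          latch.countDown();
        }
      });
    }
    latch.await();               // join: wait for every submitted task
    executorService.shutdown();  // then release the worker threads
  }
}

Counting down in the finally block is what makes the error handling in the diff
safe: exceptions are logged per file rather than rethrown, so one bad input
cannot hang the whole job.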


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org