You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2020/10/31 03:25:49 UTC

[incubator-pinot] branch master updated: Support running pinot batch ingestion standalone job in a parallel mode (#6214)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new cd5f812  Support running pinot batch ingestion standalone job in a parallel mode (#6214)
cd5f812 is described below

commit cd5f8129e126108a1bdb49c9ebf660167e62b5af
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Fri Oct 30 20:25:27 2020 -0700

    Support running pinot batch ingestion standalone job in a parallel mode (#6214)
---
 .../spark/SparkSegmentGenerationJobRunner.java     |  16 +--
 .../ingestion/batch/standalone/JobUtils.java       |  30 ++++++
 .../standalone/SegmentGenerationJobRunner.java     | 120 +++++++++++++--------
 .../batch/airlineStats/ingestionJobSpec.yaml       |   3 +
 4 files changed, 119 insertions(+), 50 deletions(-)

diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
index cdee5cf..9b36a91 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
@@ -53,6 +53,7 @@ import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
 import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
 import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
 import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationTaskSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentNameGeneratorSpec;
 import org.apache.pinot.spi.plugin.PluginManager;
 import org.apache.pinot.spi.utils.DataSizeUtils;
 import org.apache.spark.SparkContext;
@@ -205,13 +206,7 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri
       }
 
       List<String> pathAndIdxList = new ArrayList<>();
-      String localDirectorySequenceIdString =
-          _spec.getSegmentNameGeneratorSpec().getConfigs().get(LOCAL_DIRECTORY_SEQUENCE_ID);
-      boolean localDirectorySequenceId = false;
-      if (localDirectorySequenceIdString != null) {
-        localDirectorySequenceId = Boolean.parseBoolean(localDirectorySequenceIdString);
-      }
-      if (localDirectorySequenceId) {
+      if (getLocalDirectorySequenceId(_spec.getSegmentNameGeneratorSpec())) {
         Map<String, List<String>> localDirIndex = new HashMap<>();
         for (String filteredFile : filteredFiles) {
           Path filteredParentPath = Paths.get(filteredFile).getParent();
@@ -352,6 +347,13 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri
     }
   }
 
+  // Reads the LOCAL_DIRECTORY_SEQUENCE_ID flag from the generator configs; false if spec or configs are absent.
+  private static boolean getLocalDirectorySequenceId(SegmentNameGeneratorSpec spec) {
+    boolean hasConfigs = spec != null && spec.getConfigs() != null;
+    // Boolean.parseBoolean treats a missing (null) entry as false.
+    return hasConfigs && Boolean.parseBoolean(spec.getConfigs().get(LOCAL_DIRECTORY_SEQUENCE_ID));
+  }
+
   protected void addDepsJarToDistributedCache(JavaSparkContext sparkContext, String depsJarDir)
       throws IOException {
     if (depsJarDir != null) {
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/JobUtils.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/JobUtils.java
new file mode 100644
index 0000000..506e38b
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/JobUtils.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.ingestion.batch.standalone;
+
+public class JobUtils {
+
+  /** Thread count for a job: requested parallelism capped at the core count, or 1 when it is not positive. */
+  public static int getNumThreads(int jobParallelism) {
+    if (jobParallelism <= 0) {
+      return 1;
+    }
+    return Math.min(Math.max(Runtime.getRuntime().availableProcessors(), 1), jobParallelism);
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
index 661134f..f60a7e3 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
@@ -20,6 +20,7 @@ package org.apache.pinot.plugin.ingestion.batch.standalone;
 
 import java.io.File;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.nio.file.FileSystems;
 import java.nio.file.PathMatcher;
 import java.nio.file.Paths;
@@ -27,6 +28,9 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner;
@@ -52,6 +56,7 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
   private static final Logger LOGGER = LoggerFactory.getLogger(SegmentGenerationJobRunner.class);
 
   private SegmentGenerationJobSpec _spec;
+  private ExecutorService _executorService;
 
   public SegmentGenerationJobRunner() {
   }
@@ -83,8 +88,8 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
         throw new RuntimeException("Missing property 'schemaURI' in 'tableSpec'");
       }
       PinotClusterSpec pinotClusterSpec = _spec.getPinotClusterSpecs()[0];
-      String schemaURI = SegmentGenerationUtils.generateSchemaURI(pinotClusterSpec.getControllerURI(),
-          _spec.getTableSpec().getTableName());
+      String schemaURI = SegmentGenerationUtils
+          .generateSchemaURI(pinotClusterSpec.getControllerURI(), _spec.getTableSpec().getTableName());
       _spec.getTableSpec().setSchemaURI(schemaURI);
     }
     if (_spec.getTableSpec().getTableConfigURI() == null) {
@@ -96,6 +101,11 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
           .generateTableConfigURI(pinotClusterSpec.getControllerURI(), _spec.getTableSpec().getTableName());
       _spec.getTableSpec().setTableConfigURI(tableConfigURI);
     }
+    final int jobParallelism = _spec.getSegmentCreationJobParallelism();
+    int numThreads = JobUtils.getNumThreads(jobParallelism);
+    LOGGER.info("Creating an executor service with {} threads(Job parallelism: {}, available cores: {}.)", numThreads,
+        jobParallelism, Runtime.getRuntime().availableProcessors());
+    _executorService = Executors.newFixedThreadPool(numThreads);
   }
 
   @Override
@@ -108,17 +118,11 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
     }
 
     //Get pinotFS for input
-    URI inputDirURI = new URI(_spec.getInputDirURI());
-    if (inputDirURI.getScheme() == null) {
-      inputDirURI = new File(_spec.getInputDirURI()).toURI();
-    }
+    final URI inputDirURI = getDirectoryUri(_spec.getInputDirURI());
     PinotFS inputDirFS = PinotFSFactory.create(inputDirURI.getScheme());
 
     //Get outputFS for writing output pinot segments
-    URI outputDirURI = new URI(_spec.getOutputDirURI());
-    if (outputDirURI.getScheme() == null) {
-      outputDirURI = new File(_spec.getOutputDirURI()).toURI();
-    }
+    final URI outputDirURI = getDirectoryUri(_spec.getOutputDirURI());
     PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme());
     outputDirFS.mkdir(outputDirURI);
 
@@ -151,7 +155,6 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
         filteredFiles.add(file);
       }
     }
-
     File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-" + UUID.randomUUID());
     try {
       //create localTempDir for input and output
@@ -164,13 +167,12 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
       Schema schema = SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI());
       TableConfig tableConfig = SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI());
 
+      int numInputFiles = filteredFiles.size();
+      CountDownLatch segmentCreationTaskCountDownLatch = new CountDownLatch(numInputFiles);
       //iterate on the file list, for each
-      for (int i = 0; i < filteredFiles.size(); i++) {
-        URI inputFileURI = URI.create(filteredFiles.get(i));
-        if (inputFileURI.getScheme() == null) {
-          inputFileURI =
-              new URI(inputDirURI.getScheme(), inputFileURI.getSchemeSpecificPart(), inputFileURI.getFragment());
-        }
+      for (int i = 0; i < numInputFiles; i++) {
+        final URI inputFileURI = getFileURI(filteredFiles.get(i), inputDirURI.getScheme());
+
         //copy input path to local
         File localInputDataFile = new File(localInputTempDir, new File(inputFileURI.getPath()).getName());
         inputDirFS.copyToLocalFile(inputFileURI, localInputDataFile);
@@ -185,35 +187,67 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
         taskSpec.setSequenceId(i);
         taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec());
 
-        //invoke segmentGenerationTask
-        SegmentGenerationTaskRunner taskRunner = new SegmentGenerationTaskRunner(taskSpec);
-        String segmentName = taskRunner.run();
-
-        // Tar segment directory to compress file
-        File localSegmentDir = new File(localOutputTempDir, segmentName);
-        String segmentTarFileName = segmentName + Constants.TAR_GZ_FILE_EXT;
-        File localSegmentTarFile = new File(localOutputTempDir, segmentTarFileName);
-        LOGGER.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile);
-        TarGzCompressionUtils.createTarGzFile(localSegmentDir, localSegmentTarFile);
-        long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
-        long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
-        LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName,
-            DataSizeUtils.fromBytes(uncompressedSegmentSize), DataSizeUtils.fromBytes(compressedSegmentSize));
-        //move segment to output PinotFS
-        URI outputSegmentTarURI = SegmentGenerationUtils.getRelativeOutputPath(inputDirURI, inputFileURI, outputDirURI)
-            .resolve(segmentTarFileName);
-        if (!_spec.isOverwriteOutput() && outputDirFS.exists(outputSegmentTarURI)) {
-          LOGGER.warn("Not overwrite existing output segment tar file: {}", outputDirFS.exists(outputSegmentTarURI));
-        } else {
-          outputDirFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI);
-        }
-        FileUtils.deleteQuietly(localSegmentDir);
-        FileUtils.deleteQuietly(localSegmentTarFile);
-        FileUtils.deleteQuietly(localInputDataFile);
+        LOGGER.info("Submitting one Segment Generation Task for {}", inputFileURI);
+        _executorService.submit(() -> {
+          File localSegmentDir = null;
+          File localSegmentTarFile = null;
+          try {
+            //invoke segmentGenerationTask
+            SegmentGenerationTaskRunner taskRunner = new SegmentGenerationTaskRunner(taskSpec);
+            String segmentName = taskRunner.run();
+            // Tar segment directory to compress file
+            localSegmentDir = new File(localOutputTempDir, segmentName);
+            String segmentTarFileName = segmentName + Constants.TAR_GZ_FILE_EXT;
+            localSegmentTarFile = new File(localOutputTempDir, segmentTarFileName);
+            LOGGER.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile);
+            TarGzCompressionUtils.createTarGzFile(localSegmentDir, localSegmentTarFile);
+            long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
+            long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
+            LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName,
+                DataSizeUtils.fromBytes(uncompressedSegmentSize), DataSizeUtils.fromBytes(compressedSegmentSize));
+            //move segment to output PinotFS
+            URI outputSegmentTarURI =
+                SegmentGenerationUtils.getRelativeOutputPath(inputDirURI, inputFileURI, outputDirURI)
+                    .resolve(segmentTarFileName);
+            if (!_spec.isOverwriteOutput() && outputDirFS.exists(outputSegmentTarURI)) {
+              LOGGER
+                  .warn("Not overwrite existing output segment tar file: {}", outputDirFS.exists(outputSegmentTarURI));
+            } else {
+              outputDirFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI);
+            }
+          } catch (Exception e) {
+            LOGGER.error("Failed to generate Pinot segment for file - {}", inputFileURI, e);
+          } finally {
+            segmentCreationTaskCountDownLatch.countDown();
+            FileUtils.deleteQuietly(localSegmentDir);
+            FileUtils.deleteQuietly(localSegmentTarFile);
+            FileUtils.deleteQuietly(localInputDataFile);
+          }
+        });
       }
+      segmentCreationTaskCountDownLatch.await();
     } finally {
       //clean up
-      FileUtils.deleteDirectory(localTempDir);
+      FileUtils.deleteQuietly(localTempDir);
+      _executorService.shutdown();
+    }
+  }
+
+  // Parses a directory location; a string without a URI scheme is resolved as a local file path.
+  private URI getDirectoryUri(String uriStr)
+      throws URISyntaxException {
+    final URI parsed = new URI(uriStr);
+    // File#toURI yields an absolute file: URI, so the result always carries a scheme.
+    boolean hasScheme = parsed.getScheme() != null;
+    return hasScheme ? parsed : new File(uriStr).toURI();
+  }
+
+  // Resolves a listed file path to a URI, borrowing fallbackScheme when the path itself has no scheme.
+  private URI getFileURI(String uriStr, String fallbackScheme) throws URISyntaxException {
+    final URI candidate = URI.create(uriStr);
+    if (candidate.getScheme() == null) {
+      return new URI(fallbackScheme, candidate.getSchemeSpecificPart(), candidate.getFragment());
     }
+    return candidate;
   }
 }
diff --git a/pinot-tools/src/main/resources/examples/batch/airlineStats/ingestionJobSpec.yaml b/pinot-tools/src/main/resources/examples/batch/airlineStats/ingestionJobSpec.yaml
index ef5413c..0ec698c 100644
--- a/pinot-tools/src/main/resources/examples/batch/airlineStats/ingestionJobSpec.yaml
+++ b/pinot-tools/src/main/resources/examples/batch/airlineStats/ingestionJobSpec.yaml
@@ -59,6 +59,9 @@ includeFileNamePattern: 'glob:**/*.avro'
 # outputDirURI: Root directory of output segments, expected to have scheme configured in PinotFS.
 outputDirURI: 'examples/batch/airlineStats/segments'
 
+# segmentCreationJobParallelism: Number of segment creation tasks to run in parallel (the standalone runner caps it at the number of available cores).
+segmentCreationJobParallelism: 4
+
 # overwriteOutput: Overwrite output segments if existed.
 overwriteOutput: true
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org