You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2020/09/15 09:20:30 UTC

[incubator-pinot] branch master updated: Adding field 'segmentCreationJobParallelism' to allow users to set segment generation job parallelism. Default to the number of input files. (#6012)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 054faf7  Adding field 'segmentCreationJobParallelism' to allow users to set segment generation job parallelism. Default to the number of input files. (#6012)
054faf7 is described below

commit 054faf76cdef5a625d042a4d435d428529c8d798
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Tue Sep 15 02:20:12 2020 -0700

    Adding field 'segmentCreationJobParallelism' to allow users to set segment generation job parallelism. Default to the number of input files. (#6012)
---
 .../batch/hadoop/HadoopSegmentGenerationJobRunner.java      |  6 +++++-
 .../batch/spark/SparkSegmentGenerationJobRunner.java        | 12 +++++++++---
 .../main/resources/segmentCreationAndTarPushJobSpec.yaml    |  1 +
 .../spi/ingestion/batch/spec/SegmentGenerationJobSpec.java  | 13 +++++++++++++
 .../pinot/spi/ingestion/batch/IngestionJobLauncherTest.java |  2 ++
 .../src/test/resources/ingestion_job_spec_template.yaml     |  1 +
 6 files changed, 31 insertions(+), 4 deletions(-)

diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
index dee13e5..59beadc 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
@@ -230,7 +230,11 @@ public class HadoopSegmentGenerationJobRunner extends Configured implements Inge
       if (hadoopTokenFileLocation != null) {
         jobConf.set("mapreduce.job.credentials.binary", hadoopTokenFileLocation);
       }
-      jobConf.setInt(JobContext.NUM_MAPS, numDataFiles);
+      int jobParallelism = _spec.getSegmentCreationJobParallelism();
+      if (jobParallelism <= 0 || jobParallelism > numDataFiles) {
+        jobParallelism = numDataFiles;
+      }
+      jobConf.setInt(JobContext.NUM_MAPS, jobParallelism);
 
       // Pinot plugins are necessary to launch Pinot ingestion job from every mapper.
       // In order to ensure pinot plugins would be loaded to each worker, this method
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
index c1b3f25..cdee5cf 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
@@ -205,7 +205,8 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri
       }
 
       List<String> pathAndIdxList = new ArrayList<>();
-      String localDirectorySequenceIdString = _spec.getSegmentNameGeneratorSpec().getConfigs().get(LOCAL_DIRECTORY_SEQUENCE_ID);
+      String localDirectorySequenceIdString =
+          _spec.getSegmentNameGeneratorSpec().getConfigs().get(LOCAL_DIRECTORY_SEQUENCE_ID);
       boolean localDirectorySequenceId = false;
       if (localDirectorySequenceIdString != null) {
         localDirectorySequenceId = Boolean.parseBoolean(localDirectorySequenceIdString);
@@ -219,7 +220,7 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri
           }
           localDirIndex.get(filteredParentPath.toString()).add(filteredFile);
         }
-        for (String parentPath: localDirIndex.keySet()){
+        for (String parentPath : localDirIndex.keySet()) {
           List<String> siblingFiles = localDirIndex.get(parentPath);
           Collections.sort(siblingFiles);
           for (int i = 0; i < siblingFiles.size(); i++) {
@@ -231,7 +232,12 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri
           pathAndIdxList.add(String.format("%s %d", filteredFiles.get(i), i));
         }
       }
-      JavaRDD<String> pathRDD = sparkContext.parallelize(pathAndIdxList, pathAndIdxList.size());
+      int numDataFiles = pathAndIdxList.size();
+      int jobParallelism = _spec.getSegmentCreationJobParallelism();
+      if (jobParallelism <= 0 || jobParallelism > numDataFiles) {
+        jobParallelism = numDataFiles;
+      }
+      JavaRDD<String> pathRDD = sparkContext.parallelize(pathAndIdxList, jobParallelism);
 
       final String pluginsInclude =
           (sparkContext.getConf().contains(PLUGINS_INCLUDE_PROPERTY_NAME)) ? sparkContext.getConf()
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/resources/segmentCreationAndTarPushJobSpec.yaml b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/resources/segmentCreationAndTarPushJobSpec.yaml
index 9b4da30..f4dd4b2 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/resources/segmentCreationAndTarPushJobSpec.yaml
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/resources/segmentCreationAndTarPushJobSpec.yaml
@@ -28,6 +28,7 @@ includeFileNamePattern: 'glob:**/*.parquet'
 excludeFileNamePattern: 'glob:**/*.avro'
 outputDirURI: 'file:///path/to/output'
 overwriteOutput: true
+segmentCreationJobParallelism: 10000
 pinotFSSpecs:
   - scheme: file
     className: org.apache.pinot.spi.filesystem.LocalPinotFS
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java
index 2e532c2..e41d5a4 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java
@@ -70,6 +70,11 @@ public class SegmentGenerationJobSpec implements Serializable {
   private String _outputDirURI;
 
   /**
+   * Segment creation job parallelism.
+   */
+  private int _segmentCreationJobParallelism;
+
+  /**
    * Should overwrite output segments if existed.
    */
   private boolean _overwriteOutput;
@@ -232,6 +237,14 @@ public class SegmentGenerationJobSpec implements Serializable {
   public void setPushJobSpec(PushJobSpec pushJobSpec) {
     _pushJobSpec = pushJobSpec;
   }
+
+  public int getSegmentCreationJobParallelism() {
+    return _segmentCreationJobParallelism;
+  }
+
+  public void setSegmentCreationJobParallelism(int segmentCreationJobParallelism) {
+    _segmentCreationJobParallelism = segmentCreationJobParallelism;
+  }
 }
 
 
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncherTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncherTest.java
index a00a15f..23110e8 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncherTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncherTest.java
@@ -60,6 +60,7 @@ public class IngestionJobLauncherTest {
         GroovyTemplateUtils.class.getClassLoader().getResource("job.config").getFile(), context);
     Assert.assertEquals(spec.getInputDirURI(), "file:///path/to/input/2020/06/07");
     Assert.assertEquals(spec.getOutputDirURI(), "file:///path/to/output/2020/06/07");
+    Assert.assertEquals(spec.getSegmentCreationJobParallelism(), 100);
   }
 
   @Test
@@ -70,5 +71,6 @@ public class IngestionJobLauncherTest {
         GroovyTemplateUtils.class.getClassLoader().getResource("job_json.config").getFile(), null);
     Assert.assertEquals(spec.getInputDirURI(), "file:///path/to/input/2020/07/22");
     Assert.assertEquals(spec.getOutputDirURI(), "file:///path/to/output/2020/07/22");
+    Assert.assertEquals(spec.getSegmentCreationJobParallelism(), 0);
   }
 }
diff --git a/pinot-spi/src/test/resources/ingestion_job_spec_template.yaml b/pinot-spi/src/test/resources/ingestion_job_spec_template.yaml
index c20e88a..a786506 100644
--- a/pinot-spi/src/test/resources/ingestion_job_spec_template.yaml
+++ b/pinot-spi/src/test/resources/ingestion_job_spec_template.yaml
@@ -28,6 +28,7 @@ includeFileNamePattern: 'glob:**/*.parquet'
 excludeFileNamePattern: 'glob:**/*.avro'
 outputDirURI: 'file:///path/to/output/${year}/${month}/${day}'
 overwriteOutput: true
+segmentCreationJobParallelism: 100
 pinotFSSpecs:
   - scheme: file
     className: org.apache.pinot.spi.filesystem.LocalPinotFS


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org