You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2020/09/15 09:20:30 UTC
[incubator-pinot] branch master updated: Adding field 'segmentCreationJobParallelism' to allow users to set segment generation job parallelism. Default to the number of input files. (#6012)
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 054faf7 Adding field 'segmentCreationJobParallelism' to allow users to set segment generation job parallelism. Default to the number of input files. (#6012)
054faf7 is described below
commit 054faf76cdef5a625d042a4d435d428529c8d798
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Tue Sep 15 02:20:12 2020 -0700
Adding field 'segmentCreationJobParallelism' to allow users to set segment generation job parallelism. Default to the number of input files. (#6012)
---
.../batch/hadoop/HadoopSegmentGenerationJobRunner.java | 6 +++++-
.../batch/spark/SparkSegmentGenerationJobRunner.java | 12 +++++++++---
.../main/resources/segmentCreationAndTarPushJobSpec.yaml | 1 +
.../spi/ingestion/batch/spec/SegmentGenerationJobSpec.java | 13 +++++++++++++
.../pinot/spi/ingestion/batch/IngestionJobLauncherTest.java | 2 ++
.../src/test/resources/ingestion_job_spec_template.yaml | 1 +
6 files changed, 31 insertions(+), 4 deletions(-)
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
index dee13e5..59beadc 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
@@ -230,7 +230,11 @@ public class HadoopSegmentGenerationJobRunner extends Configured implements Inge
if (hadoopTokenFileLocation != null) {
jobConf.set("mapreduce.job.credentials.binary", hadoopTokenFileLocation);
}
- jobConf.setInt(JobContext.NUM_MAPS, numDataFiles);
+ int jobParallelism = _spec.getSegmentCreationJobParallelism();
+ if (jobParallelism <= 0 || jobParallelism > numDataFiles) {
+ jobParallelism = numDataFiles;
+ }
+ jobConf.setInt(JobContext.NUM_MAPS, jobParallelism);
// Pinot plugins are necessary to launch Pinot ingestion job from every mapper.
// In order to ensure pinot plugins would be loaded to each worker, this method
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
index c1b3f25..cdee5cf 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
@@ -205,7 +205,8 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri
}
List<String> pathAndIdxList = new ArrayList<>();
- String localDirectorySequenceIdString = _spec.getSegmentNameGeneratorSpec().getConfigs().get(LOCAL_DIRECTORY_SEQUENCE_ID);
+ String localDirectorySequenceIdString =
+ _spec.getSegmentNameGeneratorSpec().getConfigs().get(LOCAL_DIRECTORY_SEQUENCE_ID);
boolean localDirectorySequenceId = false;
if (localDirectorySequenceIdString != null) {
localDirectorySequenceId = Boolean.parseBoolean(localDirectorySequenceIdString);
@@ -219,7 +220,7 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri
}
localDirIndex.get(filteredParentPath.toString()).add(filteredFile);
}
- for (String parentPath: localDirIndex.keySet()){
+ for (String parentPath : localDirIndex.keySet()) {
List<String> siblingFiles = localDirIndex.get(parentPath);
Collections.sort(siblingFiles);
for (int i = 0; i < siblingFiles.size(); i++) {
@@ -231,7 +232,12 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri
pathAndIdxList.add(String.format("%s %d", filteredFiles.get(i), i));
}
}
- JavaRDD<String> pathRDD = sparkContext.parallelize(pathAndIdxList, pathAndIdxList.size());
+ int numDataFiles = pathAndIdxList.size();
+ int jobParallelism = _spec.getSegmentCreationJobParallelism();
+ if (jobParallelism <= 0 || jobParallelism > numDataFiles) {
+ jobParallelism = numDataFiles;
+ }
+ JavaRDD<String> pathRDD = sparkContext.parallelize(pathAndIdxList, jobParallelism);
final String pluginsInclude =
(sparkContext.getConf().contains(PLUGINS_INCLUDE_PROPERTY_NAME)) ? sparkContext.getConf()
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/resources/segmentCreationAndTarPushJobSpec.yaml b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/resources/segmentCreationAndTarPushJobSpec.yaml
index 9b4da30..f4dd4b2 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/resources/segmentCreationAndTarPushJobSpec.yaml
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/resources/segmentCreationAndTarPushJobSpec.yaml
@@ -28,6 +28,7 @@ includeFileNamePattern: 'glob:**/*.parquet'
excludeFileNamePattern: 'glob:**/*.avro'
outputDirURI: 'file:///path/to/output'
overwriteOutput: true
+segmentCreationJobParallelism: 10000
pinotFSSpecs:
- scheme: file
className: org.apache.pinot.spi.filesystem.LocalPinotFS
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java
index 2e532c2..e41d5a4 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java
@@ -70,6 +70,11 @@ public class SegmentGenerationJobSpec implements Serializable {
private String _outputDirURI;
/**
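+ * Segment creation job parallelism; the Hadoop and Spark runners use the number of input files when this is <= 0 or larger than that number.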
+ */
+ private int _segmentCreationJobParallelism;
+
+ /**
* Should overwrite output segments if existed.
*/
private boolean _overwriteOutput;
@@ -232,6 +237,14 @@ public class SegmentGenerationJobSpec implements Serializable {
public void setPushJobSpec(PushJobSpec pushJobSpec) {
_pushJobSpec = pushJobSpec;
}
+
+ public int getSegmentCreationJobParallelism() {
+ return _segmentCreationJobParallelism;
+ }
+
+ public void setSegmentCreationJobParallelism(int segmentCreationJobParallelism) {
+ _segmentCreationJobParallelism = segmentCreationJobParallelism;
+ }
}
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncherTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncherTest.java
index a00a15f..23110e8 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncherTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncherTest.java
@@ -60,6 +60,7 @@ public class IngestionJobLauncherTest {
GroovyTemplateUtils.class.getClassLoader().getResource("job.config").getFile(), context);
Assert.assertEquals(spec.getInputDirURI(), "file:///path/to/input/2020/06/07");
Assert.assertEquals(spec.getOutputDirURI(), "file:///path/to/output/2020/06/07");
+ Assert.assertEquals(spec.getSegmentCreationJobParallelism(), 100);
}
@Test
@@ -70,5 +71,6 @@ public class IngestionJobLauncherTest {
GroovyTemplateUtils.class.getClassLoader().getResource("job_json.config").getFile(), null);
Assert.assertEquals(spec.getInputDirURI(), "file:///path/to/input/2020/07/22");
Assert.assertEquals(spec.getOutputDirURI(), "file:///path/to/output/2020/07/22");
+ Assert.assertEquals(spec.getSegmentCreationJobParallelism(), 0);
}
}
diff --git a/pinot-spi/src/test/resources/ingestion_job_spec_template.yaml b/pinot-spi/src/test/resources/ingestion_job_spec_template.yaml
index c20e88a..a786506 100644
--- a/pinot-spi/src/test/resources/ingestion_job_spec_template.yaml
+++ b/pinot-spi/src/test/resources/ingestion_job_spec_template.yaml
@@ -28,6 +28,7 @@ includeFileNamePattern: 'glob:**/*.parquet'
excludeFileNamePattern: 'glob:**/*.avro'
outputDirURI: 'file:///path/to/output/${year}/${month}/${day}'
overwriteOutput: true
+segmentCreationJobParallelism: 100
pinotFSSpecs:
- scheme: file
className: org.apache.pinot.spi.filesystem.LocalPinotFS
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org