You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text copy).
Posted to commits@pinot.apache.org by xi...@apache.org on 2020/10/31 03:25:49 UTC
[incubator-pinot] branch master updated: Support running pinot
batch ingestion standalone job in a parallel mode (#6214)
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new cd5f812 Support running pinot batch ingestion standalone job in a parallel mode (#6214)
cd5f812 is described below
commit cd5f8129e126108a1bdb49c9ebf660167e62b5af
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Fri Oct 30 20:25:27 2020 -0700
Support running pinot batch ingestion standalone job in a parallel mode (#6214)
---
.../spark/SparkSegmentGenerationJobRunner.java | 16 +--
.../ingestion/batch/standalone/JobUtils.java | 30 ++++++
.../standalone/SegmentGenerationJobRunner.java | 120 +++++++++++++--------
.../batch/airlineStats/ingestionJobSpec.yaml | 3 +
4 files changed, 119 insertions(+), 50 deletions(-)
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
index cdee5cf..9b36a91 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
@@ -53,6 +53,7 @@ import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationTaskSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentNameGeneratorSpec;
import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.utils.DataSizeUtils;
import org.apache.spark.SparkContext;
@@ -205,13 +206,7 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri
}
List<String> pathAndIdxList = new ArrayList<>();
- String localDirectorySequenceIdString =
- _spec.getSegmentNameGeneratorSpec().getConfigs().get(LOCAL_DIRECTORY_SEQUENCE_ID);
- boolean localDirectorySequenceId = false;
- if (localDirectorySequenceIdString != null) {
- localDirectorySequenceId = Boolean.parseBoolean(localDirectorySequenceIdString);
- }
- if (localDirectorySequenceId) {
+ if (getLocalDirectorySequenceId(_spec.getSegmentNameGeneratorSpec())) {
Map<String, List<String>> localDirIndex = new HashMap<>();
for (String filteredFile : filteredFiles) {
Path filteredParentPath = Paths.get(filteredFile).getParent();
@@ -352,6 +347,13 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri
}
}
+ /**
+ * Reads the LOCAL_DIRECTORY_SEQUENCE_ID flag from the job's segment name generator spec.
+ *
+ * @param spec the segment name generator spec from the job spec; may be null
+ * @return true only if the spec and its configs are non-null and the LOCAL_DIRECTORY_SEQUENCE_ID
+ *     entry parses as true with Boolean.parseBoolean (case-insensitive "true"); false otherwise
+ */
+ private static boolean getLocalDirectorySequenceId(SegmentNameGeneratorSpec spec) {
+ // A missing spec or configs object is treated the same as the flag being unset.
+ if (spec == null || spec.getConfigs() == null) {
+ return false;
+ }
+ // Boolean.parseBoolean(null) returns false, so a missing (null) entry also yields false.
+ return Boolean.parseBoolean(spec.getConfigs().get(LOCAL_DIRECTORY_SEQUENCE_ID));
+ }
+
protected void addDepsJarToDistributedCache(JavaSparkContext sparkContext, String depsJarDir)
throws IOException {
if (depsJarDir != null) {
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/JobUtils.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/JobUtils.java
new file mode 100644
index 0000000..506e38b
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/JobUtils.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.ingestion.batch.standalone;
+
+/**
+ * Static helpers for the standalone batch ingestion job runners.
+ */
+public class JobUtils {
+ // NOTE(review): static-only helper; consider making the class final with a private constructor.
+
+ /**
+ * Computes how many threads to use for segment generation.
+ *
+ * @param jobParallelism requested parallelism, e.g. segmentCreationJobParallelism from the job spec
+ * @return min(available processors, jobParallelism) when jobParallelism is positive; otherwise 1
+ */
+ public static int getNumThreads(int jobParallelism) {
+ // availableProcessors() is documented to never return less than 1; the max(...) is a safety floor.
+ int numCores = Math.max(Runtime.getRuntime().availableProcessors(), 1);
+ if (jobParallelism > 0) {
+ // Honor the requested parallelism, capped at the processors available to the JVM.
+ return Math.min(numCores, jobParallelism);
+ }
+ // Non-positive parallelism (presumably the unset/default value -- confirm in the spec class)
+ // falls back to a single thread.
+ return 1;
+ }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
index 661134f..f60a7e3 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
@@ -20,6 +20,7 @@ package org.apache.pinot.plugin.ingestion.batch.standalone;
import java.io.File;
import java.net.URI;
+import java.net.URISyntaxException;
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
@@ -27,6 +28,9 @@ import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner;
@@ -52,6 +56,7 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
private static final Logger LOGGER = LoggerFactory.getLogger(SegmentGenerationJobRunner.class);
private SegmentGenerationJobSpec _spec;
+ private ExecutorService _executorService;
+ /** No-arg constructor; does no initialization itself (e.g. _executorService is assigned elsewhere). */
public SegmentGenerationJobRunner() {
}
@@ -83,8 +88,8 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
throw new RuntimeException("Missing property 'schemaURI' in 'tableSpec'");
}
PinotClusterSpec pinotClusterSpec = _spec.getPinotClusterSpecs()[0];
- String schemaURI = SegmentGenerationUtils.generateSchemaURI(pinotClusterSpec.getControllerURI(),
- _spec.getTableSpec().getTableName());
+ String schemaURI = SegmentGenerationUtils
+ .generateSchemaURI(pinotClusterSpec.getControllerURI(), _spec.getTableSpec().getTableName());
_spec.getTableSpec().setSchemaURI(schemaURI);
}
if (_spec.getTableSpec().getTableConfigURI() == null) {
@@ -96,6 +101,11 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
.generateTableConfigURI(pinotClusterSpec.getControllerURI(), _spec.getTableSpec().getTableName());
_spec.getTableSpec().setTableConfigURI(tableConfigURI);
}
+ final int jobParallelism = _spec.getSegmentCreationJobParallelism();
+ int numThreads = JobUtils.getNumThreads(jobParallelism);
+ LOGGER.info("Creating an executor service with {} threads(Job parallelism: {}, available cores: {}.)", numThreads,
+ jobParallelism, Runtime.getRuntime().availableProcessors());
+ _executorService = Executors.newFixedThreadPool(numThreads);
}
@Override
@@ -108,17 +118,11 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
}
//Get pinotFS for input
- URI inputDirURI = new URI(_spec.getInputDirURI());
- if (inputDirURI.getScheme() == null) {
- inputDirURI = new File(_spec.getInputDirURI()).toURI();
- }
+ final URI inputDirURI = getDirectoryUri(_spec.getInputDirURI());
PinotFS inputDirFS = PinotFSFactory.create(inputDirURI.getScheme());
//Get outputFS for writing output pinot segments
- URI outputDirURI = new URI(_spec.getOutputDirURI());
- if (outputDirURI.getScheme() == null) {
- outputDirURI = new File(_spec.getOutputDirURI()).toURI();
- }
+ final URI outputDirURI = getDirectoryUri(_spec.getOutputDirURI());
PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme());
outputDirFS.mkdir(outputDirURI);
@@ -151,7 +155,6 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
filteredFiles.add(file);
}
}
-
File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-" + UUID.randomUUID());
try {
//create localTempDir for input and output
@@ -164,13 +167,12 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
Schema schema = SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI());
TableConfig tableConfig = SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI());
+ int numInputFiles = filteredFiles.size();
+ CountDownLatch segmentCreationTaskCountDownLatch = new CountDownLatch(numInputFiles);
//iterate on the file list, for each
- for (int i = 0; i < filteredFiles.size(); i++) {
- URI inputFileURI = URI.create(filteredFiles.get(i));
- if (inputFileURI.getScheme() == null) {
- inputFileURI =
- new URI(inputDirURI.getScheme(), inputFileURI.getSchemeSpecificPart(), inputFileURI.getFragment());
- }
+ for (int i = 0; i < numInputFiles; i++) {
+ final URI inputFileURI = getFileURI(filteredFiles.get(i), inputDirURI.getScheme());
+
//copy input path to local
File localInputDataFile = new File(localInputTempDir, new File(inputFileURI.getPath()).getName());
inputDirFS.copyToLocalFile(inputFileURI, localInputDataFile);
@@ -185,35 +187,67 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
taskSpec.setSequenceId(i);
taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec());
- //invoke segmentGenerationTask
- SegmentGenerationTaskRunner taskRunner = new SegmentGenerationTaskRunner(taskSpec);
- String segmentName = taskRunner.run();
-
- // Tar segment directory to compress file
- File localSegmentDir = new File(localOutputTempDir, segmentName);
- String segmentTarFileName = segmentName + Constants.TAR_GZ_FILE_EXT;
- File localSegmentTarFile = new File(localOutputTempDir, segmentTarFileName);
- LOGGER.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile);
- TarGzCompressionUtils.createTarGzFile(localSegmentDir, localSegmentTarFile);
- long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
- long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
- LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName,
- DataSizeUtils.fromBytes(uncompressedSegmentSize), DataSizeUtils.fromBytes(compressedSegmentSize));
- //move segment to output PinotFS
- URI outputSegmentTarURI = SegmentGenerationUtils.getRelativeOutputPath(inputDirURI, inputFileURI, outputDirURI)
- .resolve(segmentTarFileName);
- if (!_spec.isOverwriteOutput() && outputDirFS.exists(outputSegmentTarURI)) {
- LOGGER.warn("Not overwrite existing output segment tar file: {}", outputDirFS.exists(outputSegmentTarURI));
- } else {
- outputDirFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI);
- }
- FileUtils.deleteQuietly(localSegmentDir);
- FileUtils.deleteQuietly(localSegmentTarFile);
- FileUtils.deleteQuietly(localInputDataFile);
+ LOGGER.info("Submitting one Segment Generation Task for {}", inputFileURI);
+ _executorService.submit(() -> {
+ File localSegmentDir = null;
+ File localSegmentTarFile = null;
+ try {
+ //invoke segmentGenerationTask
+ SegmentGenerationTaskRunner taskRunner = new SegmentGenerationTaskRunner(taskSpec);
+ String segmentName = taskRunner.run();
+ // Tar segment directory to compress file
+ localSegmentDir = new File(localOutputTempDir, segmentName);
+ String segmentTarFileName = segmentName + Constants.TAR_GZ_FILE_EXT;
+ localSegmentTarFile = new File(localOutputTempDir, segmentTarFileName);
+ LOGGER.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile);
+ TarGzCompressionUtils.createTarGzFile(localSegmentDir, localSegmentTarFile);
+ long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
+ long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
+ LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName,
+ DataSizeUtils.fromBytes(uncompressedSegmentSize), DataSizeUtils.fromBytes(compressedSegmentSize));
+ //move segment to output PinotFS
+ URI outputSegmentTarURI =
+ SegmentGenerationUtils.getRelativeOutputPath(inputDirURI, inputFileURI, outputDirURI)
+ .resolve(segmentTarFileName);
+ if (!_spec.isOverwriteOutput() && outputDirFS.exists(outputSegmentTarURI)) {
+ LOGGER
+ .warn("Not overwrite existing output segment tar file: {}", outputDirFS.exists(outputSegmentTarURI));
+ } else {
+ outputDirFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI);
+ }
+ } catch (Exception e) {
+ LOGGER.error("Failed to generate Pinot segment for file - {}", inputFileURI, e);
+ } finally {
+ segmentCreationTaskCountDownLatch.countDown();
+ FileUtils.deleteQuietly(localSegmentDir);
+ FileUtils.deleteQuietly(localSegmentTarFile);
+ FileUtils.deleteQuietly(localInputDataFile);
+ }
+ });
}
+ segmentCreationTaskCountDownLatch.await();
} finally {
//clean up
- FileUtils.deleteDirectory(localTempDir);
+ FileUtils.deleteQuietly(localTempDir);
+ _executorService.shutdown();
+ }
+ }
+
+ /**
+ * Converts a directory location from the job spec (input or output dir) into a URI.
+ *
+ * <p>A string with no scheme is treated as a local filesystem path and converted with
+ * File.toURI(), which produces an absolute file: URI.
+ *
+ * @param uriStr directory location, with or without a scheme
+ * @return the parsed URI, or a file: URI built from the path when uriStr has no scheme
+ * @throws URISyntaxException if uriStr is not valid URI syntax
+ */
+ private URI getDirectoryUri(String uriStr)
+ throws URISyntaxException {
+ // NOTE(review): this parse runs before the local-path fallback, so a scheme-less path that
+ // contains a character illegal in a URI (e.g. a space) throws here instead of being converted
+ // by File.toURI(). Naming also differs from getFileURI ("Uri" vs "URI").
+ URI uri = new URI(uriStr);
+ if (uri.getScheme() == null) {
+ // No scheme: treat as a local path; relative paths resolve against the working directory.
+ uri = new File(uriStr).toURI();
+ }
+ return uri;
+ }
+
+ /**
+ * Converts one input file path into a URI, applying a fallback scheme when the path has none.
+ *
+ * @param uriStr input file path or URI
+ * @param fallbackScheme scheme applied when uriStr has no scheme (the caller passes the input
+ *     directory URI's scheme)
+ * @return uriStr parsed as a URI, or a URI rebuilt from fallbackScheme plus uriStr's
+ *     scheme-specific part and fragment
+ * @throws IllegalArgumentException if uriStr is not valid URI syntax (thrown by URI.create)
+ * @throws URISyntaxException if the rebuilt URI violates URI syntax
+ */
+ private URI getFileURI(String uriStr, String fallbackScheme)
+ throws URISyntaxException {
+ // NOTE(review): URI.create reports bad syntax with an unchecked exception, unlike
+ // getDirectoryUri's new URI(...), which throws the checked URISyntaxException.
+ URI fileURI = URI.create(uriStr);
+ if (fileURI.getScheme() == null) {
+ // Re-attach the input directory's scheme so the file URI matches the PinotFS used to read it.
+ return new URI(fallbackScheme, fileURI.getSchemeSpecificPart(), fileURI.getFragment());
}
+ return fileURI;
}
}
diff --git a/pinot-tools/src/main/resources/examples/batch/airlineStats/ingestionJobSpec.yaml b/pinot-tools/src/main/resources/examples/batch/airlineStats/ingestionJobSpec.yaml
index ef5413c..0ec698c 100644
--- a/pinot-tools/src/main/resources/examples/batch/airlineStats/ingestionJobSpec.yaml
+++ b/pinot-tools/src/main/resources/examples/batch/airlineStats/ingestionJobSpec.yaml
@@ -59,6 +59,9 @@ includeFileNamePattern: 'glob:**/*.avro'
# outputDirURI: Root directory of output segments, expected to have scheme configured in PinotFS.
outputDirURI: 'examples/batch/airlineStats/segments'
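+# segmentCreationJobParallelism: Number of segment generation tasks to run in parallel; the standalone runner caps it at the available processors and uses one thread when it is not positive.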
+segmentCreationJobParallelism: 4
+
# overwriteOutput: Overwrite output segments if existed.
overwriteOutput: true
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org