You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2020/07/23 04:06:41 UTC
[incubator-pinot] 01/01: Move lambda expression to inner function in pinot-spark
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch pinot_spark_move_lambda_expression_to_inner_function
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 8f63792287253dd6412e2caaf113806563ae1ef3
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Wed Jul 22 21:06:25 2020 -0700
Move lambda expression to inner function in pinot-spark
---
.../spark/SparkSegmentGenerationJobRunner.java | 164 +++++++++++----------
1 file changed, 85 insertions(+), 79 deletions(-)
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
index b0aac11..dc28dfb 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
@@ -54,6 +54,7 @@ import org.apache.pinot.spi.utils.DataSizeUtils;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.VoidFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -209,93 +210,98 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri
.get(PLUGINS_INCLUDE_PROPERTY_NAME) : null;
final URI finalInputDirURI = inputDirURI;
final URI finalOutputDirURI = (stagingDirURI == null) ? outputDirURI : stagingDirURI;
- pathRDD.foreach(pathAndIdx -> {
- for (PinotFSSpec pinotFSSpec : _spec.getPinotFSSpecs()) {
- PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
- }
- PinotFS finalOutputDirFS = PinotFSFactory.create(finalOutputDirURI.getScheme());
- String[] splits = pathAndIdx.split(" ");
- String path = splits[0];
- int idx = Integer.valueOf(splits[1]);
- // Load Pinot Plugins copied from Distributed cache.
- File localPluginsTarFile = new File(PINOT_PLUGINS_TAR_GZ);
- if (localPluginsTarFile.exists()) {
- File pluginsDirFile = new File(PINOT_PLUGINS_DIR + "-" + idx);
- try {
- TarGzCompressionUtils.untar(localPluginsTarFile, pluginsDirFile);
- } catch (Exception e) {
- LOGGER.error("Failed to untar local Pinot plugins tarball file [{}]", localPluginsTarFile, e);
- throw new RuntimeException(e);
+ pathRDD.foreach(new VoidFunction<String>() {
+ @Override
+ public void call(String pathAndIdx)
+ throws Exception {
+ PluginManager.get().init();
+ for (PinotFSSpec pinotFSSpec : _spec.getPinotFSSpecs()) {
+ PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
}
- LOGGER.info("Trying to set System Property: [{}={}]", PLUGINS_DIR_PROPERTY_NAME,
- pluginsDirFile.getAbsolutePath());
- System.setProperty(PLUGINS_DIR_PROPERTY_NAME, pluginsDirFile.getAbsolutePath());
- if (pluginsInclude != null) {
- LOGGER.info("Trying to set System Property: [{}={}]", PLUGINS_INCLUDE_PROPERTY_NAME, pluginsInclude);
- System.setProperty(PLUGINS_INCLUDE_PROPERTY_NAME, pluginsInclude);
+ PinotFS finalOutputDirFS = PinotFSFactory.create(finalOutputDirURI.getScheme());
+ String[] splits = pathAndIdx.split(" ");
+ String path = splits[0];
+ int idx = Integer.valueOf(splits[1]);
+ // Load Pinot Plugins copied from Distributed cache.
+ File localPluginsTarFile = new File(PINOT_PLUGINS_TAR_GZ);
+ if (localPluginsTarFile.exists()) {
+ File pluginsDirFile = new File(PINOT_PLUGINS_DIR + "-" + idx);
+ try {
+ TarGzCompressionUtils.untar(localPluginsTarFile, pluginsDirFile);
+ } catch (Exception e) {
+ LOGGER.error("Failed to untar local Pinot plugins tarball file [{}]", localPluginsTarFile, e);
+ throw new RuntimeException(e);
+ }
+ LOGGER.info("Trying to set System Property: [{}={}]", PLUGINS_DIR_PROPERTY_NAME,
+ pluginsDirFile.getAbsolutePath());
+ System.setProperty(PLUGINS_DIR_PROPERTY_NAME, pluginsDirFile.getAbsolutePath());
+ if (pluginsInclude != null) {
+ LOGGER.info("Trying to set System Property: [{}={}]", PLUGINS_INCLUDE_PROPERTY_NAME, pluginsInclude);
+ System.setProperty(PLUGINS_INCLUDE_PROPERTY_NAME, pluginsInclude);
+ }
+ LOGGER.info("Pinot plugins System Properties are set at [{}], plugins includes [{}]",
+ System.getProperty(PLUGINS_DIR_PROPERTY_NAME), System.getProperty(PLUGINS_INCLUDE_PROPERTY_NAME));
+ } else {
+ LOGGER.warn("Cannot find local Pinot plugins tar file at [{}]", localPluginsTarFile.getAbsolutePath());
+ }
+ URI inputFileURI = URI.create(path);
+ if (inputFileURI.getScheme() == null) {
+ inputFileURI =
+ new URI(finalInputDirURI.getScheme(), inputFileURI.getSchemeSpecificPart(), inputFileURI.getFragment());
}
- LOGGER.info("Pinot plugins System Properties are set at [{}], plugins includes [{}]",
- System.getProperty(PLUGINS_DIR_PROPERTY_NAME), System.getProperty(PLUGINS_INCLUDE_PROPERTY_NAME));
- } else {
- LOGGER.warn("Cannot find local Pinot plugins tar file at [{}]", localPluginsTarFile.getAbsolutePath());
- }
- URI inputFileURI = URI.create(path);
- if (inputFileURI.getScheme() == null) {
- inputFileURI =
- new URI(finalInputDirURI.getScheme(), inputFileURI.getSchemeSpecificPart(), inputFileURI.getFragment());
- }
- //create localTempDir for input and output
- File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-" + UUID.randomUUID());
- File localInputTempDir = new File(localTempDir, "input");
- FileUtils.forceMkdir(localInputTempDir);
- File localOutputTempDir = new File(localTempDir, "output");
- FileUtils.forceMkdir(localOutputTempDir);
+ //create localTempDir for input and output
+ File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-" + UUID.randomUUID());
+ File localInputTempDir = new File(localTempDir, "input");
+ FileUtils.forceMkdir(localInputTempDir);
+ File localOutputTempDir = new File(localTempDir, "output");
+ FileUtils.forceMkdir(localOutputTempDir);
- //copy input path to local
- File localInputDataFile = new File(localInputTempDir, getFileName(inputFileURI));
- LOGGER.info("Trying to copy input file from {} to {}", inputFileURI, localInputDataFile);
- PinotFSFactory.create(inputFileURI.getScheme()).copyToLocalFile(inputFileURI, localInputDataFile);
+ //copy input path to local
+ File localInputDataFile = new File(localInputTempDir, getFileName(inputFileURI));
+ LOGGER.info("Trying to copy input file from {} to {}", inputFileURI, localInputDataFile);
+ PinotFSFactory.create(inputFileURI.getScheme()).copyToLocalFile(inputFileURI, localInputDataFile);
- //create task spec
- SegmentGenerationTaskSpec taskSpec = new SegmentGenerationTaskSpec();
- taskSpec.setInputFilePath(localInputDataFile.getAbsolutePath());
- taskSpec.setOutputDirectoryPath(localOutputTempDir.getAbsolutePath());
- taskSpec.setRecordReaderSpec(_spec.getRecordReaderSpec());
- taskSpec.setSchema(SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI()));
- taskSpec.setTableConfig(
- SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI()).toJsonNode());
- taskSpec.setSequenceId(idx);
- taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec());
+ //create task spec
+ SegmentGenerationTaskSpec taskSpec = new SegmentGenerationTaskSpec();
+ taskSpec.setInputFilePath(localInputDataFile.getAbsolutePath());
+ taskSpec.setOutputDirectoryPath(localOutputTempDir.getAbsolutePath());
+ taskSpec.setRecordReaderSpec(_spec.getRecordReaderSpec());
+ taskSpec.setSchema(SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI()));
+ taskSpec.setTableConfig(
+ SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI()).toJsonNode());
+ taskSpec.setSequenceId(idx);
+ taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec());
- SegmentGenerationTaskRunner taskRunner = new SegmentGenerationTaskRunner(taskSpec);
- String segmentName = taskRunner.run();
+ SegmentGenerationTaskRunner taskRunner = new SegmentGenerationTaskRunner(taskSpec);
+ String segmentName = taskRunner.run();
- // Tar segment directory to compress file
- File localSegmentDir = new File(localOutputTempDir, segmentName);
- String segmentTarFileName = segmentName + Constants.TAR_GZ_FILE_EXT;
- File localSegmentTarFile = new File(localOutputTempDir, segmentTarFileName);
- LOGGER.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile);
- TarGzCompressionUtils.createTarGzFile(localSegmentDir, localSegmentTarFile);
- long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
- long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
- LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName,
- DataSizeUtils.fromBytes(uncompressedSegmentSize), DataSizeUtils.fromBytes(compressedSegmentSize));
- //move segment to output PinotFS
- URI outputSegmentTarURI =
- SegmentGenerationUtils.getRelativeOutputPath(finalInputDirURI, inputFileURI, finalOutputDirURI)
- .resolve(segmentTarFileName);
- LOGGER.info("Trying to move segment tar file from: [{}] to [{}]", localSegmentTarFile, outputSegmentTarURI);
- if (!_spec.isOverwriteOutput() && PinotFSFactory.create(outputSegmentTarURI.getScheme())
- .exists(outputSegmentTarURI)) {
- LOGGER
- .warn("Not overwrite existing output segment tar file: {}", finalOutputDirFS.exists(outputSegmentTarURI));
- } else {
- finalOutputDirFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI);
+ // Tar segment directory to compress file
+ File localSegmentDir = new File(localOutputTempDir, segmentName);
+ String segmentTarFileName = segmentName + Constants.TAR_GZ_FILE_EXT;
+ File localSegmentTarFile = new File(localOutputTempDir, segmentTarFileName);
+ LOGGER.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile);
+ TarGzCompressionUtils.createTarGzFile(localSegmentDir, localSegmentTarFile);
+ long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
+ long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
+ LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName,
+ DataSizeUtils.fromBytes(uncompressedSegmentSize), DataSizeUtils.fromBytes(compressedSegmentSize));
+ //move segment to output PinotFS
+ URI outputSegmentTarURI =
+ SegmentGenerationUtils.getRelativeOutputPath(finalInputDirURI, inputFileURI, finalOutputDirURI)
+ .resolve(segmentTarFileName);
+ LOGGER.info("Trying to move segment tar file from: [{}] to [{}]", localSegmentTarFile, outputSegmentTarURI);
+ if (!_spec.isOverwriteOutput() && PinotFSFactory.create(outputSegmentTarURI.getScheme())
+ .exists(outputSegmentTarURI)) {
+ LOGGER
+ .warn("Not overwrite existing output segment tar file: {}", finalOutputDirFS.exists(outputSegmentTarURI));
+ } else {
+ finalOutputDirFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI);
+ }
+ FileUtils.deleteQuietly(localSegmentDir);
+ FileUtils.deleteQuietly(localSegmentTarFile);
+ FileUtils.deleteQuietly(localInputDataFile);
}
- FileUtils.deleteQuietly(localSegmentDir);
- FileUtils.deleteQuietly(localSegmentTarFile);
- FileUtils.deleteQuietly(localInputDataFile);
});
if (stagingDirURI != null) {
LOGGER.info("Trying to copy segment tars from staging directory: [{}] to output directory [{}]", stagingDirURI,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org