You are viewing a plain-text version of this content. (The canonical link to the original was not preserved in this plain-text copy.)
Posted to commits@pinot.apache.org by xi...@apache.org on 2020/07/23 04:06:41 UTC

[incubator-pinot] 01/01: Move lambda expression to inner function in pinot-spark

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch pinot_spark_move_lambda_expression_to_inner_function
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 8f63792287253dd6412e2caaf113806563ae1ef3
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Wed Jul 22 21:06:25 2020 -0700

    Move lambda expression to inner function in pinot-spark
---
 .../spark/SparkSegmentGenerationJobRunner.java     | 164 +++++++++++----------
 1 file changed, 85 insertions(+), 79 deletions(-)

diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
index b0aac11..dc28dfb 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
@@ -54,6 +54,7 @@ import org.apache.pinot.spi.utils.DataSizeUtils;
 import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.VoidFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -209,93 +210,98 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri
               .get(PLUGINS_INCLUDE_PROPERTY_NAME) : null;
       final URI finalInputDirURI = inputDirURI;
       final URI finalOutputDirURI = (stagingDirURI == null) ? outputDirURI : stagingDirURI;
-      pathRDD.foreach(pathAndIdx -> {
-        for (PinotFSSpec pinotFSSpec : _spec.getPinotFSSpecs()) {
-          PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
-        }
-        PinotFS finalOutputDirFS = PinotFSFactory.create(finalOutputDirURI.getScheme());
-        String[] splits = pathAndIdx.split(" ");
-        String path = splits[0];
-        int idx = Integer.valueOf(splits[1]);
-        // Load Pinot Plugins copied from Distributed cache.
-        File localPluginsTarFile = new File(PINOT_PLUGINS_TAR_GZ);
-        if (localPluginsTarFile.exists()) {
-          File pluginsDirFile = new File(PINOT_PLUGINS_DIR + "-" + idx);
-          try {
-            TarGzCompressionUtils.untar(localPluginsTarFile, pluginsDirFile);
-          } catch (Exception e) {
-            LOGGER.error("Failed to untar local Pinot plugins tarball file [{}]", localPluginsTarFile, e);
-            throw new RuntimeException(e);
+      pathRDD.foreach(new VoidFunction<String>() {
+        @Override
+        public void call(String pathAndIdx)
+            throws Exception {
+          PluginManager.get().init();
+          for (PinotFSSpec pinotFSSpec : _spec.getPinotFSSpecs()) {
+            PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
           }
-          LOGGER.info("Trying to set System Property: [{}={}]", PLUGINS_DIR_PROPERTY_NAME,
-              pluginsDirFile.getAbsolutePath());
-          System.setProperty(PLUGINS_DIR_PROPERTY_NAME, pluginsDirFile.getAbsolutePath());
-          if (pluginsInclude != null) {
-            LOGGER.info("Trying to set System Property: [{}={}]", PLUGINS_INCLUDE_PROPERTY_NAME, pluginsInclude);
-            System.setProperty(PLUGINS_INCLUDE_PROPERTY_NAME, pluginsInclude);
+          PinotFS finalOutputDirFS = PinotFSFactory.create(finalOutputDirURI.getScheme());
+          String[] splits = pathAndIdx.split(" ");
+          String path = splits[0];
+          int idx = Integer.valueOf(splits[1]);
+          // Load Pinot Plugins copied from Distributed cache.
+          File localPluginsTarFile = new File(PINOT_PLUGINS_TAR_GZ);
+          if (localPluginsTarFile.exists()) {
+            File pluginsDirFile = new File(PINOT_PLUGINS_DIR + "-" + idx);
+            try {
+              TarGzCompressionUtils.untar(localPluginsTarFile, pluginsDirFile);
+            } catch (Exception e) {
+              LOGGER.error("Failed to untar local Pinot plugins tarball file [{}]", localPluginsTarFile, e);
+              throw new RuntimeException(e);
+            }
+            LOGGER.info("Trying to set System Property: [{}={}]", PLUGINS_DIR_PROPERTY_NAME,
+                pluginsDirFile.getAbsolutePath());
+            System.setProperty(PLUGINS_DIR_PROPERTY_NAME, pluginsDirFile.getAbsolutePath());
+            if (pluginsInclude != null) {
+              LOGGER.info("Trying to set System Property: [{}={}]", PLUGINS_INCLUDE_PROPERTY_NAME, pluginsInclude);
+              System.setProperty(PLUGINS_INCLUDE_PROPERTY_NAME, pluginsInclude);
+            }
+            LOGGER.info("Pinot plugins System Properties are set at [{}], plugins includes [{}]",
+                System.getProperty(PLUGINS_DIR_PROPERTY_NAME), System.getProperty(PLUGINS_INCLUDE_PROPERTY_NAME));
+          } else {
+            LOGGER.warn("Cannot find local Pinot plugins tar file at [{}]", localPluginsTarFile.getAbsolutePath());
+          }
+          URI inputFileURI = URI.create(path);
+          if (inputFileURI.getScheme() == null) {
+            inputFileURI =
+                new URI(finalInputDirURI.getScheme(), inputFileURI.getSchemeSpecificPart(), inputFileURI.getFragment());
           }
-          LOGGER.info("Pinot plugins System Properties are set at [{}], plugins includes [{}]",
-              System.getProperty(PLUGINS_DIR_PROPERTY_NAME), System.getProperty(PLUGINS_INCLUDE_PROPERTY_NAME));
-        } else {
-          LOGGER.warn("Cannot find local Pinot plugins tar file at [{}]", localPluginsTarFile.getAbsolutePath());
-        }
-        URI inputFileURI = URI.create(path);
-        if (inputFileURI.getScheme() == null) {
-          inputFileURI =
-              new URI(finalInputDirURI.getScheme(), inputFileURI.getSchemeSpecificPart(), inputFileURI.getFragment());
-        }
 
-        //create localTempDir for input and output
-        File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-" + UUID.randomUUID());
-        File localInputTempDir = new File(localTempDir, "input");
-        FileUtils.forceMkdir(localInputTempDir);
-        File localOutputTempDir = new File(localTempDir, "output");
-        FileUtils.forceMkdir(localOutputTempDir);
+          //create localTempDir for input and output
+          File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-" + UUID.randomUUID());
+          File localInputTempDir = new File(localTempDir, "input");
+          FileUtils.forceMkdir(localInputTempDir);
+          File localOutputTempDir = new File(localTempDir, "output");
+          FileUtils.forceMkdir(localOutputTempDir);
 
-        //copy input path to local
-        File localInputDataFile = new File(localInputTempDir, getFileName(inputFileURI));
-        LOGGER.info("Trying to copy input file from {} to {}", inputFileURI, localInputDataFile);
-        PinotFSFactory.create(inputFileURI.getScheme()).copyToLocalFile(inputFileURI, localInputDataFile);
+          //copy input path to local
+          File localInputDataFile = new File(localInputTempDir, getFileName(inputFileURI));
+          LOGGER.info("Trying to copy input file from {} to {}", inputFileURI, localInputDataFile);
+          PinotFSFactory.create(inputFileURI.getScheme()).copyToLocalFile(inputFileURI, localInputDataFile);
 
-        //create task spec
-        SegmentGenerationTaskSpec taskSpec = new SegmentGenerationTaskSpec();
-        taskSpec.setInputFilePath(localInputDataFile.getAbsolutePath());
-        taskSpec.setOutputDirectoryPath(localOutputTempDir.getAbsolutePath());
-        taskSpec.setRecordReaderSpec(_spec.getRecordReaderSpec());
-        taskSpec.setSchema(SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI()));
-        taskSpec.setTableConfig(
-            SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI()).toJsonNode());
-        taskSpec.setSequenceId(idx);
-        taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec());
+          //create task spec
+          SegmentGenerationTaskSpec taskSpec = new SegmentGenerationTaskSpec();
+          taskSpec.setInputFilePath(localInputDataFile.getAbsolutePath());
+          taskSpec.setOutputDirectoryPath(localOutputTempDir.getAbsolutePath());
+          taskSpec.setRecordReaderSpec(_spec.getRecordReaderSpec());
+          taskSpec.setSchema(SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI()));
+          taskSpec.setTableConfig(
+              SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI()).toJsonNode());
+          taskSpec.setSequenceId(idx);
+          taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec());
 
-        SegmentGenerationTaskRunner taskRunner = new SegmentGenerationTaskRunner(taskSpec);
-        String segmentName = taskRunner.run();
+          SegmentGenerationTaskRunner taskRunner = new SegmentGenerationTaskRunner(taskSpec);
+          String segmentName = taskRunner.run();
 
-        // Tar segment directory to compress file
-        File localSegmentDir = new File(localOutputTempDir, segmentName);
-        String segmentTarFileName = segmentName + Constants.TAR_GZ_FILE_EXT;
-        File localSegmentTarFile = new File(localOutputTempDir, segmentTarFileName);
-        LOGGER.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile);
-        TarGzCompressionUtils.createTarGzFile(localSegmentDir, localSegmentTarFile);
-        long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
-        long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
-        LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName,
-            DataSizeUtils.fromBytes(uncompressedSegmentSize), DataSizeUtils.fromBytes(compressedSegmentSize));
-        //move segment to output PinotFS
-        URI outputSegmentTarURI =
-            SegmentGenerationUtils.getRelativeOutputPath(finalInputDirURI, inputFileURI, finalOutputDirURI)
-                .resolve(segmentTarFileName);
-        LOGGER.info("Trying to move segment tar file from: [{}] to [{}]", localSegmentTarFile, outputSegmentTarURI);
-        if (!_spec.isOverwriteOutput() && PinotFSFactory.create(outputSegmentTarURI.getScheme())
-            .exists(outputSegmentTarURI)) {
-          LOGGER
-              .warn("Not overwrite existing output segment tar file: {}", finalOutputDirFS.exists(outputSegmentTarURI));
-        } else {
-          finalOutputDirFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI);
+          // Tar segment directory to compress file
+          File localSegmentDir = new File(localOutputTempDir, segmentName);
+          String segmentTarFileName = segmentName + Constants.TAR_GZ_FILE_EXT;
+          File localSegmentTarFile = new File(localOutputTempDir, segmentTarFileName);
+          LOGGER.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile);
+          TarGzCompressionUtils.createTarGzFile(localSegmentDir, localSegmentTarFile);
+          long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
+          long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
+          LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName,
+              DataSizeUtils.fromBytes(uncompressedSegmentSize), DataSizeUtils.fromBytes(compressedSegmentSize));
+          //move segment to output PinotFS
+          URI outputSegmentTarURI =
+              SegmentGenerationUtils.getRelativeOutputPath(finalInputDirURI, inputFileURI, finalOutputDirURI)
+                  .resolve(segmentTarFileName);
+          LOGGER.info("Trying to move segment tar file from: [{}] to [{}]", localSegmentTarFile, outputSegmentTarURI);
+          if (!_spec.isOverwriteOutput() && PinotFSFactory.create(outputSegmentTarURI.getScheme())
+              .exists(outputSegmentTarURI)) {
+            LOGGER
+                .warn("Not overwrite existing output segment tar file: {}", finalOutputDirFS.exists(outputSegmentTarURI));
+          } else {
+            finalOutputDirFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI);
+          }
+          FileUtils.deleteQuietly(localSegmentDir);
+          FileUtils.deleteQuietly(localSegmentTarFile);
+          FileUtils.deleteQuietly(localInputDataFile);
         }
-        FileUtils.deleteQuietly(localSegmentDir);
-        FileUtils.deleteQuietly(localSegmentTarFile);
-        FileUtils.deleteQuietly(localInputDataFile);
       });
       if (stagingDirURI != null) {
         LOGGER.info("Trying to copy segment tars from staging directory: [{}] to output directory [{}]", stagingDirURI,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org