Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2020/07/23 09:34:24 UTC

[GitHub] [incubator-pinot] fx19880617 opened a new pull request #5741: Move lambda expression to inner function in pinot-spark

fx19880617 opened a new pull request #5741:
URL: https://github.com/apache/incubator-pinot/pull/5741


   ## Description
   Users reported that Spark cannot serialize lambda expressions in some environments, so we need to explicitly use an inner function instead.
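The change can be sketched outside Spark as follows. Note this is a simplified stand-in, not the real API: `VoidFunction` and `foreachPath` below mimic Spark's `org.apache.spark.api.java.function.VoidFunction` and `JavaRDD.foreach` just to show the structural difference between the lambda form and the anonymous-inner-class form used in this PR.

```java
import java.io.Serializable;
import java.util.List;

public class LambdaVsInnerClassSketch {

  // Stand-in for Spark's VoidFunction<T>; Spark's version also extends Serializable.
  interface VoidFunction<T> extends Serializable {
    void call(T t) throws Exception;
  }

  // Stand-in for JavaRDD<String>.foreach(...): applies fn to each element.
  static void foreachPath(List<String> paths, VoidFunction<String> fn) throws Exception {
    for (String p : paths) {
      fn.call(p);
    }
  }

  public static void main(String[] args) throws Exception {
    List<String> pathAndIdxList = List.of("/data/input/file1 0", "/data/input/file2 1");

    // Before: lambda form. Some Spark versions/clusters failed to serialize this closure.
    foreachPath(pathAndIdxList, pathAndIdx -> {
      String[] splits = pathAndIdx.split(" ");
      System.out.println("lambda: path=" + splits[0] + " idx=" + splits[1]);
    });

    // After: explicit anonymous inner class, as done in this PR. The compiler emits a
    // stable named class instead of an invokedynamic lambda, sidestepping the issue.
    foreachPath(pathAndIdxList, new VoidFunction<String>() {
      @Override
      public void call(String pathAndIdx) {
        String[] splits = pathAndIdx.split(" ");
        System.out.println("inner: path=" + splits[0] + " idx=" + splits[1]);
      }
    });
  }
}
```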
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] fx19880617 commented on a change in pull request #5741: Move lambda expression to inner function in pinot-spark

Posted by GitBox <gi...@apache.org>.
fx19880617 commented on a change in pull request #5741:
URL: https://github.com/apache/incubator-pinot/pull/5741#discussion_r459671619



##########
File path: pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
##########
@@ -209,93 +210,99 @@ public void run()
               .get(PLUGINS_INCLUDE_PROPERTY_NAME) : null;
       final URI finalInputDirURI = inputDirURI;
       final URI finalOutputDirURI = (stagingDirURI == null) ? outputDirURI : stagingDirURI;
-      pathRDD.foreach(pathAndIdx -> {
-        for (PinotFSSpec pinotFSSpec : _spec.getPinotFSSpecs()) {
-          PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
-        }
-        PinotFS finalOutputDirFS = PinotFSFactory.create(finalOutputDirURI.getScheme());
-        String[] splits = pathAndIdx.split(" ");
-        String path = splits[0];
-        int idx = Integer.valueOf(splits[1]);
-        // Load Pinot Plugins copied from Distributed cache.
-        File localPluginsTarFile = new File(PINOT_PLUGINS_TAR_GZ);
-        if (localPluginsTarFile.exists()) {
-          File pluginsDirFile = new File(PINOT_PLUGINS_DIR + "-" + idx);
-          try {
-            TarGzCompressionUtils.untar(localPluginsTarFile, pluginsDirFile);
-          } catch (Exception e) {
-            LOGGER.error("Failed to untar local Pinot plugins tarball file [{}]", localPluginsTarFile, e);
-            throw new RuntimeException(e);
+      // Avoid using lambda expressions here: some Spark versions fail to serialize them. Use an anonymous inner class instead.
+      pathRDD.foreach(new VoidFunction<String>() {
+        @Override
+        public void call(String pathAndIdx)
+            throws Exception {
+          PluginManager.get().init();
+          for (PinotFSSpec pinotFSSpec : _spec.getPinotFSSpecs()) {
+            PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
           }
-          LOGGER.info("Trying to set System Property: [{}={}]", PLUGINS_DIR_PROPERTY_NAME,
-              pluginsDirFile.getAbsolutePath());
-          System.setProperty(PLUGINS_DIR_PROPERTY_NAME, pluginsDirFile.getAbsolutePath());
-          if (pluginsInclude != null) {
-            LOGGER.info("Trying to set System Property: [{}={}]", PLUGINS_INCLUDE_PROPERTY_NAME, pluginsInclude);
-            System.setProperty(PLUGINS_INCLUDE_PROPERTY_NAME, pluginsInclude);
+          PinotFS finalOutputDirFS = PinotFSFactory.create(finalOutputDirURI.getScheme());
+          String[] splits = pathAndIdx.split(" ");
+          String path = splits[0];
+          int idx = Integer.valueOf(splits[1]);
+          // Load Pinot Plugins copied from Distributed cache.
+          File localPluginsTarFile = new File(PINOT_PLUGINS_TAR_GZ);
+          if (localPluginsTarFile.exists()) {
+            File pluginsDirFile = new File(PINOT_PLUGINS_DIR + "-" + idx);
+            try {
+              TarGzCompressionUtils.untar(localPluginsTarFile, pluginsDirFile);
+            } catch (Exception e) {
+              LOGGER.error("Failed to untar local Pinot plugins tarball file [{}]", localPluginsTarFile, e);
+              throw new RuntimeException(e);
+            }
+            LOGGER.info("Trying to set System Property: [{}={}]", PLUGINS_DIR_PROPERTY_NAME,
+                pluginsDirFile.getAbsolutePath());
+            System.setProperty(PLUGINS_DIR_PROPERTY_NAME, pluginsDirFile.getAbsolutePath());
+            if (pluginsInclude != null) {
+              LOGGER.info("Trying to set System Property: [{}={}]", PLUGINS_INCLUDE_PROPERTY_NAME, pluginsInclude);
+              System.setProperty(PLUGINS_INCLUDE_PROPERTY_NAME, pluginsInclude);
+            }
+            LOGGER.info("Pinot plugins System Properties are set at [{}], plugins includes [{}]",
+                System.getProperty(PLUGINS_DIR_PROPERTY_NAME), System.getProperty(PLUGINS_INCLUDE_PROPERTY_NAME));
+          } else {
+            LOGGER.warn("Cannot find local Pinot plugins tar file at [{}]", localPluginsTarFile.getAbsolutePath());
+          }
+          URI inputFileURI = URI.create(path);
+          if (inputFileURI.getScheme() == null) {
+            inputFileURI =
+                new URI(finalInputDirURI.getScheme(), inputFileURI.getSchemeSpecificPart(), inputFileURI.getFragment());
           }
-          LOGGER.info("Pinot plugins System Properties are set at [{}], plugins includes [{}]",
-              System.getProperty(PLUGINS_DIR_PROPERTY_NAME), System.getProperty(PLUGINS_INCLUDE_PROPERTY_NAME));
-        } else {
-          LOGGER.warn("Cannot find local Pinot plugins tar file at [{}]", localPluginsTarFile.getAbsolutePath());
-        }
-        URI inputFileURI = URI.create(path);
-        if (inputFileURI.getScheme() == null) {
-          inputFileURI =
-              new URI(finalInputDirURI.getScheme(), inputFileURI.getSchemeSpecificPart(), inputFileURI.getFragment());
-        }
 
-        //create localTempDir for input and output
-        File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-" + UUID.randomUUID());
-        File localInputTempDir = new File(localTempDir, "input");
-        FileUtils.forceMkdir(localInputTempDir);
-        File localOutputTempDir = new File(localTempDir, "output");
-        FileUtils.forceMkdir(localOutputTempDir);
+          //create localTempDir for input and output

Review comment:
       updated!






[GitHub] [incubator-pinot] mayankshriv commented on a change in pull request #5741: Move lambda expression to inner function in pinot-spark

Posted by GitBox <gi...@apache.org>.
mayankshriv commented on a change in pull request #5741:
URL: https://github.com/apache/incubator-pinot/pull/5741#discussion_r459467397



##########
File path: pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
##########
@@ -209,93 +210,99 @@ public void run()
               .get(PLUGINS_INCLUDE_PROPERTY_NAME) : null;
       final URI finalInputDirURI = inputDirURI;
       final URI finalOutputDirURI = (stagingDirURI == null) ? outputDirURI : stagingDirURI;
-      pathRDD.foreach(pathAndIdx -> {
-        for (PinotFSSpec pinotFSSpec : _spec.getPinotFSSpecs()) {
-          PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
-        }
-        PinotFS finalOutputDirFS = PinotFSFactory.create(finalOutputDirURI.getScheme());
-        String[] splits = pathAndIdx.split(" ");
-        String path = splits[0];
-        int idx = Integer.valueOf(splits[1]);
-        // Load Pinot Plugins copied from Distributed cache.
-        File localPluginsTarFile = new File(PINOT_PLUGINS_TAR_GZ);
-        if (localPluginsTarFile.exists()) {
-          File pluginsDirFile = new File(PINOT_PLUGINS_DIR + "-" + idx);
-          try {
-            TarGzCompressionUtils.untar(localPluginsTarFile, pluginsDirFile);
-          } catch (Exception e) {
-            LOGGER.error("Failed to untar local Pinot plugins tarball file [{}]", localPluginsTarFile, e);
-            throw new RuntimeException(e);
+      // Avoid using lambda expressions here: some Spark versions fail to serialize them. Use an anonymous inner class instead.
+      pathRDD.foreach(new VoidFunction<String>() {
+        @Override
+        public void call(String pathAndIdx)
+            throws Exception {
+          PluginManager.get().init();
+          for (PinotFSSpec pinotFSSpec : _spec.getPinotFSSpecs()) {
+            PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
           }
-          LOGGER.info("Trying to set System Property: [{}={}]", PLUGINS_DIR_PROPERTY_NAME,
-              pluginsDirFile.getAbsolutePath());
-          System.setProperty(PLUGINS_DIR_PROPERTY_NAME, pluginsDirFile.getAbsolutePath());
-          if (pluginsInclude != null) {
-            LOGGER.info("Trying to set System Property: [{}={}]", PLUGINS_INCLUDE_PROPERTY_NAME, pluginsInclude);
-            System.setProperty(PLUGINS_INCLUDE_PROPERTY_NAME, pluginsInclude);
+          PinotFS finalOutputDirFS = PinotFSFactory.create(finalOutputDirURI.getScheme());
+          String[] splits = pathAndIdx.split(" ");
+          String path = splits[0];
+          int idx = Integer.valueOf(splits[1]);
+          // Load Pinot Plugins copied from Distributed cache.
+          File localPluginsTarFile = new File(PINOT_PLUGINS_TAR_GZ);
+          if (localPluginsTarFile.exists()) {
+            File pluginsDirFile = new File(PINOT_PLUGINS_DIR + "-" + idx);
+            try {
+              TarGzCompressionUtils.untar(localPluginsTarFile, pluginsDirFile);
+            } catch (Exception e) {
+              LOGGER.error("Failed to untar local Pinot plugins tarball file [{}]", localPluginsTarFile, e);
+              throw new RuntimeException(e);
+            }
+            LOGGER.info("Trying to set System Property: [{}={}]", PLUGINS_DIR_PROPERTY_NAME,
+                pluginsDirFile.getAbsolutePath());
+            System.setProperty(PLUGINS_DIR_PROPERTY_NAME, pluginsDirFile.getAbsolutePath());
+            if (pluginsInclude != null) {
+              LOGGER.info("Trying to set System Property: [{}={}]", PLUGINS_INCLUDE_PROPERTY_NAME, pluginsInclude);
+              System.setProperty(PLUGINS_INCLUDE_PROPERTY_NAME, pluginsInclude);
+            }
+            LOGGER.info("Pinot plugins System Properties are set at [{}], plugins includes [{}]",
+                System.getProperty(PLUGINS_DIR_PROPERTY_NAME), System.getProperty(PLUGINS_INCLUDE_PROPERTY_NAME));
+          } else {
+            LOGGER.warn("Cannot find local Pinot plugins tar file at [{}]", localPluginsTarFile.getAbsolutePath());
+          }
+          URI inputFileURI = URI.create(path);
+          if (inputFileURI.getScheme() == null) {
+            inputFileURI =
+                new URI(finalInputDirURI.getScheme(), inputFileURI.getSchemeSpecificPart(), inputFileURI.getFragment());
           }
-          LOGGER.info("Pinot plugins System Properties are set at [{}], plugins includes [{}]",
-              System.getProperty(PLUGINS_DIR_PROPERTY_NAME), System.getProperty(PLUGINS_INCLUDE_PROPERTY_NAME));
-        } else {
-          LOGGER.warn("Cannot find local Pinot plugins tar file at [{}]", localPluginsTarFile.getAbsolutePath());
-        }
-        URI inputFileURI = URI.create(path);
-        if (inputFileURI.getScheme() == null) {
-          inputFileURI =
-              new URI(finalInputDirURI.getScheme(), inputFileURI.getSchemeSpecificPart(), inputFileURI.getFragment());
-        }
 
-        //create localTempDir for input and output
-        File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-" + UUID.randomUUID());
-        File localInputTempDir = new File(localTempDir, "input");
-        FileUtils.forceMkdir(localInputTempDir);
-        File localOutputTempDir = new File(localTempDir, "output");
-        FileUtils.forceMkdir(localOutputTempDir);
+          //create localTempDir for input and output

Review comment:
       I can't tell whether the original or the new formatting adheres to the Pinot style; please ensure the new one does.

##########
File path: pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
##########
@@ -209,93 +210,99 @@ public void run()
               .get(PLUGINS_INCLUDE_PROPERTY_NAME) : null;
       final URI finalInputDirURI = inputDirURI;
       final URI finalOutputDirURI = (stagingDirURI == null) ? outputDirURI : stagingDirURI;
-      pathRDD.foreach(pathAndIdx -> {
-        for (PinotFSSpec pinotFSSpec : _spec.getPinotFSSpecs()) {
-          PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
-        }
-        PinotFS finalOutputDirFS = PinotFSFactory.create(finalOutputDirURI.getScheme());
-        String[] splits = pathAndIdx.split(" ");
-        String path = splits[0];
-        int idx = Integer.valueOf(splits[1]);
-        // Load Pinot Plugins copied from Distributed cache.
-        File localPluginsTarFile = new File(PINOT_PLUGINS_TAR_GZ);
-        if (localPluginsTarFile.exists()) {
-          File pluginsDirFile = new File(PINOT_PLUGINS_DIR + "-" + idx);
-          try {
-            TarGzCompressionUtils.untar(localPluginsTarFile, pluginsDirFile);
-          } catch (Exception e) {
-            LOGGER.error("Failed to untar local Pinot plugins tarball file [{}]", localPluginsTarFile, e);
-            throw new RuntimeException(e);
+      // Avoid using lambda expressions here: some Spark versions fail to serialize them. Use an anonymous inner class instead.

Review comment:
       Thanks for fixing this. How can we prevent this from happening in the future? Is this unit-testable?






[GitHub] [incubator-pinot] fx19880617 merged pull request #5741: Move lambda expression to inner function in pinot-spark

Posted by GitBox <gi...@apache.org>.
fx19880617 merged pull request #5741:
URL: https://github.com/apache/incubator-pinot/pull/5741


   




[GitHub] [incubator-pinot] fx19880617 commented on a change in pull request #5741: Move lambda expression to inner function in pinot-spark

Posted by GitBox <gi...@apache.org>.
fx19880617 commented on a change in pull request #5741:
URL: https://github.com/apache/incubator-pinot/pull/5741#discussion_r459630478



##########
File path: pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
##########
@@ -209,93 +210,99 @@ public void run()
               .get(PLUGINS_INCLUDE_PROPERTY_NAME) : null;
       final URI finalInputDirURI = inputDirURI;
       final URI finalOutputDirURI = (stagingDirURI == null) ? outputDirURI : stagingDirURI;
-      pathRDD.foreach(pathAndIdx -> {
-        for (PinotFSSpec pinotFSSpec : _spec.getPinotFSSpecs()) {
-          PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
-        }
-        PinotFS finalOutputDirFS = PinotFSFactory.create(finalOutputDirURI.getScheme());
-        String[] splits = pathAndIdx.split(" ");
-        String path = splits[0];
-        int idx = Integer.valueOf(splits[1]);
-        // Load Pinot Plugins copied from Distributed cache.
-        File localPluginsTarFile = new File(PINOT_PLUGINS_TAR_GZ);
-        if (localPluginsTarFile.exists()) {
-          File pluginsDirFile = new File(PINOT_PLUGINS_DIR + "-" + idx);
-          try {
-            TarGzCompressionUtils.untar(localPluginsTarFile, pluginsDirFile);
-          } catch (Exception e) {
-            LOGGER.error("Failed to untar local Pinot plugins tarball file [{}]", localPluginsTarFile, e);
-            throw new RuntimeException(e);
+      // Avoid using lambda expressions here: some Spark versions fail to serialize them. Use an anonymous inner class instead.

Review comment:
       This does not happen on every Spark version/cluster; it is likely a bug in Spark, but we need to accommodate it.
   We didn't hit this issue until some users reported it.
   
   I don't have a good way to prevent it apart from code review :(
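On the unit-testability question, one partial guard (a hedged sketch, not something in this PR) is to round-trip the function object through plain Java serialization, which is roughly what Spark's closure serializer requires. The `isJavaSerializable` helper below is hypothetical; this check catches non-serializable captures but cannot reproduce the cluster-specific lambda bug discussed above.

```java
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializabilityCheckSketch {

  // Stand-in for Spark's VoidFunction<T> functional interface.
  interface VoidFunction<T> extends Serializable {
    void call(T t) throws Exception;
  }

  // Returns true if the object survives Java serialization, false otherwise.
  static boolean isJavaSerializable(Object o) {
    try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
         ObjectOutputStream oos = new ObjectOutputStream(bos)) {
      oos.writeObject(o);
      return true;
    } catch (Exception e) {
      return false;
    }
  }

  public static void main(String[] args) {
    // Anonymous inner class in a static context: no enclosing-instance capture,
    // so the serialization round-trip should succeed.
    VoidFunction<String> fn = new VoidFunction<String>() {
      @Override
      public void call(String s) {
        System.out.println(s);
      }
    };
    System.out.println("serializable=" + isJavaSerializable(fn));
  }
}
```

A unit test could assert this for each function object passed to `foreach`, guarding against accidentally capturing a non-serializable field in a future refactor.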



