You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "mridulm (via GitHub)" <gi...@apache.org> on 2023/07/19 04:59:31 UTC

[GitHub] [spark] mridulm commented on a diff in pull request #42037: [SPARK-44305][SQL] Dynamically choose whether to broadcast hadoop conf

mridulm commented on code in PR #42037:
URL: https://github.com/apache/spark/pull/42037#discussion_r1267527890


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala:
##########
@@ -152,24 +153,34 @@ class OrcFileFormat
       assert(supportBatch(sparkSession, resultSchema))
     }
 
-    OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(hadoopConf, sqlConf.caseSensitiveAnalysis)
-
-    val broadcastedConf =
-      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
     val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+    OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(hadoopConf, isCaseSensitive)
+    val broadcastedHadoopConf = if (options.isEmpty) {
+      Option.empty
+    } else {
+      Option(sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)))
+    }

Review Comment:
   ```suggestion
       val broadcastedHadoopConf = options.map(_ => sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)))
   ```



##########
sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala:
##########
@@ -136,20 +137,37 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
       options: Map[String, String],
       hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
 
-    if (sparkSession.sessionState.conf.orcFilterPushDown) {
+    val orcFilterPushDown = sparkSession.sessionState.conf.orcFilterPushDown
+    if (orcFilterPushDown) {
       // Sets pushed predicates
       OrcFilters.createFilter(requiredSchema, filters).foreach { f =>
         hadoopConf.set(OrcFileFormat.SARG_PUSHDOWN, toKryo(f))
         hadoopConf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true)
       }
     }
 
-    val broadcastedHadoopConf =
-      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+    val broadcastedHadoopConf = if (options.isEmpty) {
+      Option.empty
+    } else {
+      Option(sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)))
+    }
     val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
+    val sparkConf = sparkSession.sparkContext.conf
 
     (file: PartitionedFile) => {
-      val conf = broadcastedHadoopConf.value.value
+      val conf = if (broadcastedHadoopConf.isDefined) {
+        broadcastedHadoopConf.get.value.value
+      } else {
+        val conf = SparkHadoopUtil.newConfiguration(sparkConf)
+        if (orcFilterPushDown) {
+          // Sets pushed predicates
+          OrcFilters.createFilter(requiredSchema, filters).foreach { f =>
+            conf.set(OrcFileFormat.SARG_PUSHDOWN, toKryo(f))
+            conf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true)
+          }
+        }
+        new SerializableConfiguration(conf).value
+      }

Review Comment:
   My comments on the sql/core `OrcFileFormat` (`sql/core/.../datasources/orc/OrcFileFormat.scala`) apply here as well.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala:
##########
@@ -152,24 +153,34 @@ class OrcFileFormat
       assert(supportBatch(sparkSession, resultSchema))
     }
 
-    OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(hadoopConf, sqlConf.caseSensitiveAnalysis)
-
-    val broadcastedConf =
-      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
     val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+    OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(hadoopConf, isCaseSensitive)
+    val broadcastedHadoopConf = if (options.isEmpty) {
+      Option.empty
+    } else {
+      Option(sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)))
+    }
+
     val orcFilterPushDown = sparkSession.sessionState.conf.orcFilterPushDown
+    val sparkConf = sparkSession.sparkContext.conf
 
     (file: PartitionedFile) => {
-      val conf = broadcastedConf.value.value
+      val sharedConf = if (broadcastedHadoopConf.isDefined) {
+        broadcastedHadoopConf.get.value.value
+      } else {
+        val conf = SparkHadoopUtil.newConfiguration(sparkConf)
+        OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(conf, isCaseSensitive)
+        new SerializableConfiguration(conf).value
+      }

Review Comment:
   Simply use `conf` here? Why wrap it in `new SerializableConfiguration(conf).value`?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala:
##########
@@ -94,6 +94,14 @@ trait FileFormat {
     false
   }
 
+  /**
+   * Returns whether Hadoop configuration needs to be broadcasted.
+   */
+  def isBroadcastHadoopConf(

Review Comment:
   Rename `isBroadcastHadoopConf` -> `shouldBroadcastHadoopConf`.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala:
##########
@@ -152,24 +153,34 @@ class OrcFileFormat
       assert(supportBatch(sparkSession, resultSchema))
     }
 
-    OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(hadoopConf, sqlConf.caseSensitiveAnalysis)
-
-    val broadcastedConf =
-      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
     val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+    OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(hadoopConf, isCaseSensitive)
+    val broadcastedHadoopConf = if (options.isEmpty) {
+      Option.empty
+    } else {
+      Option(sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)))
+    }
+
     val orcFilterPushDown = sparkSession.sessionState.conf.orcFilterPushDown
+    val sparkConf = sparkSession.sparkContext.conf
 
     (file: PartitionedFile) => {
-      val conf = broadcastedConf.value.value
+      val sharedConf = if (broadcastedHadoopConf.isDefined) {
+        broadcastedHadoopConf.get.value.value
+      } else {
+        val conf = SparkHadoopUtil.newConfiguration(sparkConf)

Review Comment:
   You are assuming `hadoopConf` == `SparkHadoopUtil.newConfiguration(sparkConf)`.
   This does not hold in general, particularly as the code evolves.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala:
##########
@@ -152,24 +153,34 @@ class OrcFileFormat
       assert(supportBatch(sparkSession, resultSchema))
     }
 
-    OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(hadoopConf, sqlConf.caseSensitiveAnalysis)
-
-    val broadcastedConf =
-      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
     val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+    OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(hadoopConf, isCaseSensitive)
+    val broadcastedHadoopConf = if (options.isEmpty) {
+      Option.empty
+    } else {
+      Option(sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)))
+    }
+
     val orcFilterPushDown = sparkSession.sessionState.conf.orcFilterPushDown
+    val sparkConf = sparkSession.sparkContext.conf
 
     (file: PartitionedFile) => {
-      val conf = broadcastedConf.value.value
+      val sharedConf = if (broadcastedHadoopConf.isDefined) {

Review Comment:
   Rename `sharedConf` to `conf` and minimize unnecessary diffs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org