You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ge...@apache.org on 2020/07/01 22:13:33 UTC

[spark] branch branch-3.0 updated: [SPARK-31935][SQL][FOLLOWUP] Hadoop file system config should be effective in data source options

This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new fef3379  [SPARK-31935][SQL][FOLLOWUP] Hadoop file system config should be effective in data source options
fef3379 is described below

commit fef337935179248e88aa4be84a4214fea1c3a11a
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Thu Jul 2 06:09:54 2020 +0800

    [SPARK-31935][SQL][FOLLOWUP] Hadoop file system config should be effective in data source options
    
    ### What changes were proposed in this pull request?
    
    This is a followup of https://github.com/apache/spark/pull/28760 to fix the remaining issues:
    1. should consider data source options when refreshing cache by path at the end of `InsertIntoHadoopFsRelationCommand`
    2. should consider data source options when inferring schema for file source
    3. should consider data source options when getting the qualified path in file source v2.
    
    ### Why are the changes needed?
    
    We didn't catch these issues in https://github.com/apache/spark/pull/28760, because the test case is to check error when initializing the file system. If we initialize the file system multiple times during a simple read/write action, the test case actually only test the first time.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    rewrite the test to make sure the entire data source read/write action can succeed.
    
    Closes #28948 from cloud-fan/fix.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: Gengliang Wang <ge...@databricks.com>
    (cherry picked from commit 6edb20df834f7f9b85c1559ef78a3d0d2272e4df)
    Signed-off-by: Gengliang Wang <ge...@databricks.com>
---
 .../spark/sql/v2/avro/AvroDataSourceV2.scala       |  4 +--
 .../apache/spark/sql/execution/CacheManager.scala  | 15 +++++++----
 .../InsertIntoHadoopFsRelationCommand.scala        |  2 +-
 .../execution/datasources/SchemaMergeUtils.scala   |  4 ++-
 .../sql/execution/datasources/orc/OrcUtils.scala   |  2 +-
 .../datasources/parquet/ParquetFileFormat.scala    |  3 ++-
 .../datasources/parquet/ParquetUtils.scala         |  2 +-
 .../datasources/v2/FileDataSourceV2.scala          | 13 ++++++---
 .../datasources/v2/csv/CSVDataSourceV2.scala       |  4 +--
 .../datasources/v2/json/JsonDataSourceV2.scala     |  4 +--
 .../datasources/v2/orc/OrcDataSourceV2.scala       |  4 +--
 .../v2/parquet/ParquetDataSourceV2.scala           |  4 +--
 .../datasources/v2/text/TextDataSourceV2.scala     |  4 +--
 .../spark/sql/FileBasedDataSourceSuite.scala       | 31 +++++++++++++---------
 .../execution/datasources/orc/OrcSourceSuite.scala |  4 +--
 .../apache/spark/sql/hive/orc/OrcFileFormat.scala  |  4 +--
 16 files changed, 59 insertions(+), 45 deletions(-)

diff --git a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroDataSourceV2.scala b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroDataSourceV2.scala
index c6f52d6..969dee0 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroDataSourceV2.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroDataSourceV2.scala
@@ -31,13 +31,13 @@ class AvroDataSourceV2 extends FileDataSourceV2 {
 
   override def getTable(options: CaseInsensitiveStringMap): Table = {
     val paths = getPaths(options)
-    val tableName = getTableName(paths)
+    val tableName = getTableName(options, paths)
     AvroTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
   }
 
   override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
     val paths = getPaths(options)
-    val tableName = getTableName(paths)
+    val tableName = getTableName(options, paths)
     AvroTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 52cec8b..7d86c48 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -248,12 +248,17 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
    * `HadoopFsRelation` node(s) as part of its logical plan.
    */
   def recacheByPath(spark: SparkSession, resourcePath: String): Unit = {
-    val (fs, qualifiedPath) = {
-      val path = new Path(resourcePath)
-      val fs = path.getFileSystem(spark.sessionState.newHadoopConf())
-      (fs, fs.makeQualified(path))
-    }
+    val path = new Path(resourcePath)
+    val fs = path.getFileSystem(spark.sessionState.newHadoopConf())
+    recacheByPath(spark, path, fs)
+  }
 
+  /**
+   * Tries to re-cache all the cache entries that contain `resourcePath` in one or more
+   * `HadoopFsRelation` node(s) as part of its logical plan.
+   */
+  def recacheByPath(spark: SparkSession, resourcePath: Path, fs: FileSystem): Unit = {
+    val qualifiedPath = fs.makeQualified(resourcePath)
     recacheByCondition(spark, _.plan.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined)
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index f119721..fe733f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -192,7 +192,7 @@ case class InsertIntoHadoopFsRelationCommand(
       // refresh cached files in FileIndex
       fileIndex.foreach(_.refresh())
       // refresh data cache if table is cached
-      sparkSession.catalog.refreshByPath(outputPath.toString)
+      sparkSession.sharedState.cacheManager.recacheByPath(sparkSession, outputPath, fs)
 
       if (catalogTable.nonEmpty) {
         CommandUtils.updateTableStats(sparkSession, catalogTable.get)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala
index 99882b0..28097c3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala
@@ -32,10 +32,12 @@ object SchemaMergeUtils extends Logging {
    */
   def mergeSchemasInParallel(
       sparkSession: SparkSession,
+      parameters: Map[String, String],
       files: Seq[FileStatus],
       schemaReader: (Seq[FileStatus], Configuration, Boolean) => Seq[StructType])
       : Option[StructType] = {
-    val serializedConf = new SerializableConfiguration(sparkSession.sessionState.newHadoopConf())
+    val serializedConf = new SerializableConfiguration(
+      sparkSession.sessionState.newHadoopConfWithOptions(parameters))
 
     // !! HACK ALERT !!
     // Here is a hack for Parquet, but it can be used by Orc as well.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
index eea9b2a..d274bcd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
@@ -109,7 +109,7 @@ object OrcUtils extends Logging {
     val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf)
     if (orcOptions.mergeSchema) {
       SchemaMergeUtils.mergeSchemasInParallel(
-        sparkSession, files, OrcUtils.readOrcSchemasInParallel)
+        sparkSession, options, files, OrcUtils.readOrcSchemasInParallel)
     } else {
       OrcUtils.readSchema(sparkSession, files)
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 7187410..68f49f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -475,6 +475,7 @@ object ParquetFileFormat extends Logging {
    *     S3 nodes).
    */
   def mergeSchemasInParallel(
+      parameters: Map[String, String],
       filesToTouch: Seq[FileStatus],
       sparkSession: SparkSession): Option[StructType] = {
     val assumeBinaryIsString = sparkSession.sessionState.conf.isParquetBinaryAsString
@@ -490,7 +491,7 @@ object ParquetFileFormat extends Logging {
         .map(ParquetFileFormat.readSchemaFromFooter(_, converter))
     }
 
-    SchemaMergeUtils.mergeSchemasInParallel(sparkSession, filesToTouch, reader)
+    SchemaMergeUtils.mergeSchemasInParallel(sparkSession, parameters, filesToTouch, reader)
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
index 7e7dba9..b91d75c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
@@ -104,7 +104,7 @@ object ParquetUtils {
           .orElse(filesByType.data.headOption)
           .toSeq
       }
-    ParquetFileFormat.mergeSchemasInParallel(filesToTouch, sparkSession)
+    ParquetFileFormat.mergeSchemasInParallel(parameters, filesToTouch, sparkSession)
   }
 
   case class FileTypes(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
index 30a964d..bbe8835 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
@@ -18,7 +18,10 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import java.util
 
+import scala.collection.JavaConverters._
+
 import com.fasterxml.jackson.databind.ObjectMapper
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.SparkSession
@@ -53,14 +56,16 @@ trait FileDataSourceV2 extends TableProvider with DataSourceRegister {
     paths ++ Option(map.get("path")).toSeq
   }
 
-  protected def getTableName(paths: Seq[String]): String = {
-    val name = shortName() + " " + paths.map(qualifiedPathName).mkString(",")
+  protected def getTableName(map: CaseInsensitiveStringMap, paths: Seq[String]): String = {
+    val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(
+      map.asCaseSensitiveMap().asScala.toMap)
+    val name = shortName() + " " + paths.map(qualifiedPathName(_, hadoopConf)).mkString(",")
     Utils.redact(sparkSession.sessionState.conf.stringRedactionPattern, name)
   }
 
-  private def qualifiedPathName(path: String): String = {
+  private def qualifiedPathName(path: String, hadoopConf: Configuration): String = {
     val hdfsPath = new Path(path)
-    val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+    val fs = hdfsPath.getFileSystem(hadoopConf)
     hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toString
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVDataSourceV2.scala
index 1f99d42..69d001b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVDataSourceV2.scala
@@ -31,13 +31,13 @@ class CSVDataSourceV2 extends FileDataSourceV2 {
 
   override def getTable(options: CaseInsensitiveStringMap): Table = {
     val paths = getPaths(options)
-    val tableName = getTableName(paths)
+    val tableName = getTableName(options, paths)
     CSVTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
   }
 
   override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
     val paths = getPaths(options)
-    val tableName = getTableName(paths)
+    val tableName = getTableName(options, paths)
     CSVTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonDataSourceV2.scala
index 7a0949e..9c4e3b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonDataSourceV2.scala
@@ -31,13 +31,13 @@ class JsonDataSourceV2 extends FileDataSourceV2 {
 
   override def getTable(options: CaseInsensitiveStringMap): Table = {
     val paths = getPaths(options)
-    val tableName = getTableName(paths)
+    val tableName = getTableName(options, paths)
     JsonTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
   }
 
   override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
     val paths = getPaths(options)
-    val tableName = getTableName(paths)
+    val tableName = getTableName(options, paths)
     JsonTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala
index 8665af3..fa2febd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala
@@ -31,13 +31,13 @@ class OrcDataSourceV2 extends FileDataSourceV2 {
 
   override def getTable(options: CaseInsensitiveStringMap): Table = {
     val paths = getPaths(options)
-    val tableName = getTableName(paths)
+    val tableName = getTableName(options, paths)
     OrcTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
   }
 
   override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
     val paths = getPaths(options)
-    val tableName = getTableName(paths)
+    val tableName = getTableName(options, paths)
     OrcTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetDataSourceV2.scala
index 8cb6186..7e7ca96 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetDataSourceV2.scala
@@ -31,13 +31,13 @@ class ParquetDataSourceV2 extends FileDataSourceV2 {
 
   override def getTable(options: CaseInsensitiveStringMap): Table = {
     val paths = getPaths(options)
-    val tableName = getTableName(paths)
+    val tableName = getTableName(options, paths)
     ParquetTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
   }
 
   override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
     val paths = getPaths(options)
-    val tableName = getTableName(paths)
+    val tableName = getTableName(options, paths)
     ParquetTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextDataSourceV2.scala
index 049c717..43bcb61 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextDataSourceV2.scala
@@ -31,13 +31,13 @@ class TextDataSourceV2 extends FileDataSourceV2 {
 
   override def getTable(options: CaseInsensitiveStringMap): Table = {
     val paths = getPaths(options)
-    val tableName = getTableName(paths)
+    val tableName = getTableName(options, paths)
     TextTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
   }
 
   override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
     val paths = getPaths(options)
-    val tableName = getTableName(paths)
+    val tableName = getTableName(options, paths)
     TextTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat)
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index d8157d3..231a8f2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -18,12 +18,14 @@
 package org.apache.spark.sql
 
 import java.io.{File, FileNotFoundException}
+import java.net.URI
 import java.nio.file.{Files, StandardOpenOption}
 import java.util.Locale
 
 import scala.collection.mutable
 
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{LocalFileSystem, Path}
 
 import org.apache.spark.SparkException
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
@@ -845,19 +847,15 @@ class FileBasedDataSourceSuite extends QueryTest
 
   test("SPARK-31935: Hadoop file system config should be effective in data source options") {
     Seq("parquet", "").foreach { format =>
-      withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> format) {
+      withSQLConf(
+        SQLConf.USE_V1_SOURCE_LIST.key -> format,
+        "fs.file.impl" -> classOf[FakeFileSystemRequiringDSOption].getName,
+        "fs.file.impl.disable.cache" -> "true") {
         withTempDir { dir =>
-          val path = dir.getCanonicalPath
-          val defaultFs = "nonexistFS://nonexistFS"
-          val expectMessage = "No FileSystem for scheme nonexistFS"
-          val message1 = intercept[java.io.IOException] {
-            spark.range(10).write.option("fs.defaultFS", defaultFs).parquet(path)
-          }.getMessage
-          assert(message1.filterNot(Set(':', '"').contains) == expectMessage)
-          val message2 = intercept[java.io.IOException] {
-            spark.read.option("fs.defaultFS", defaultFs).parquet(path)
-          }.getMessage
-          assert(message2.filterNot(Set(':', '"').contains) == expectMessage)
+          val path = "file:" + dir.getCanonicalPath.stripPrefix("file:")
+          spark.range(10).write.option("ds_option", "value").mode("overwrite").parquet(path)
+          checkAnswer(
+            spark.read.option("ds_option", "value").parquet(path), spark.range(10).toDF())
         }
       }
     }
@@ -932,3 +930,10 @@ object TestingUDT {
     override def userClass: Class[NullData] = classOf[NullData]
   }
 }
+
+class FakeFileSystemRequiringDSOption extends LocalFileSystem {
+  override def initialize(name: URI, conf: Configuration): Unit = {
+    super.initialize(name, conf)
+    require(conf.get("ds_option", "") == "value")
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
index 7387368..b70fd74 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
@@ -213,9 +213,7 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll {
           Seq(fs.listStatus(path1), fs.listStatus(path2), fs.listStatus(path3)).flatten
 
         val schema = SchemaMergeUtils.mergeSchemasInParallel(
-          spark,
-          fileStatuses,
-          schemaReader)
+          spark, Map.empty, fileStatuses, schemaReader)
 
         assert(schema.isDefined)
         assert(schema.get == StructType(Seq(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index 7f2eb14..356b92b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -70,9 +70,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
     val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf)
     if (orcOptions.mergeSchema) {
       SchemaMergeUtils.mergeSchemasInParallel(
-        sparkSession,
-        files,
-        OrcFileOperator.readOrcSchemasInParallel)
+        sparkSession, options, files, OrcFileOperator.readOrcSchemasInParallel)
     } else {
       val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
       OrcFileOperator.readSchema(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org