You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/07/16 13:12:46 UTC

[spark] branch master updated: [SPARK-32234][SQL] Spark sql commands are failing on selecting the orc tables

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6be8b93  [SPARK-32234][SQL] Spark sql commands are failing on selecting the orc tables
6be8b93 is described below

commit 6be8b935a4f7ce0dea2d7aaaf747c2e8e1a9f47a
Author: SaurabhChawla <sa...@qubole.com>
AuthorDate: Thu Jul 16 13:11:47 2020 +0000

    [SPARK-32234][SQL] Spark sql commands are failing on selecting the orc tables
    
    ### What changes were proposed in this pull request?
    Spark sql commands are failing on selecting the orc tables
    Steps to reproduce
    Example 1 -
    Prerequisite -  This is the location(/Users/test/tpcds_scale5data/date_dim) for orc data which is generated by the hive.
    ```
    val table = """CREATE TABLE `date_dim` (
      `d_date_sk` INT,
      `d_date_id` STRING,
      `d_date` TIMESTAMP,
      `d_month_seq` INT,
      `d_week_seq` INT,
      `d_quarter_seq` INT,
      `d_year` INT,
      `d_dow` INT,
      `d_moy` INT,
      `d_dom` INT,
      `d_qoy` INT,
      `d_fy_year` INT,
      `d_fy_quarter_seq` INT,
      `d_fy_week_seq` INT,
      `d_day_name` STRING,
      `d_quarter_name` STRING,
      `d_holiday` STRING,
      `d_weekend` STRING,
      `d_following_holiday` STRING,
      `d_first_dom` INT,
      `d_last_dom` INT,
      `d_same_day_ly` INT,
      `d_same_day_lq` INT,
      `d_current_day` STRING,
      `d_current_week` STRING,
      `d_current_month` STRING,
      `d_current_quarter` STRING,
      `d_current_year` STRING)
    USING orc
    LOCATION '/Users/test/tpcds_scale5data/date_dim'"""
    
    spark.sql(table).collect
    
    val u = """select date_dim.d_date_id from date_dim limit 5"""
    
    spark.sql(u).collect
    ```
    Example 2
    
    ```
      val table = """CREATE TABLE `test_orc_data` (
      `_col1` INT,
      `_col2` STRING,
      `_col3` INT)
      USING orc"""
    
    spark.sql(table).collect
    
    spark.sql("insert into test_orc_data values(13, '155', 2020)").collect
    
    val df = """select _col2 from test_orc_data limit 5"""
    spark.sql(df).collect
    
    ```
    
    Its Failing with below error
    ```
    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, 192.168.0.103, executor driver): java.lang.ArrayIndexOutOfBoundsException: 1
        at org.apache.spark.sql.execution.datasources.orc.OrcColumnarBatchReader.initBatch(OrcColumnarBatchReader.java:156)
        at org.apache.spark.sql.execution.datasources.orc.OrcFileFormat.$anonfun$buildReaderWithPartitionValues$7(OrcFileFormat.scala:258)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:141)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:203)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:116)
        at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:620)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:343)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:895)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:895)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:372)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:336)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:133)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:445)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1489)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:448)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)`
    ```
    
    The reason behind this initBatch is not getting the schema that is needed to find out the column value in OrcFileFormat.scala
    ```
    batchReader.initBatch(
     TypeDescription.fromString(resultSchemaString)
    ```
    
    ### Why are the changes needed?
    Spark sql queries for orc tables are failing
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Unit test is added for this .Also Tested through spark shell and spark submit the failing queries
    
    Closes #29045 from SaurabhChawla100/SPARK-32234.
    
    Lead-authored-by: SaurabhChawla <sa...@qubole.com>
    Co-authored-by: SaurabhChawla <s....@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../execution/datasources/orc/OrcFileFormat.scala  | 10 +++---
 .../sql/execution/datasources/orc/OrcUtils.scala   | 41 ++++++++++++++++++----
 .../v2/orc/OrcPartitionReaderFactory.scala         | 21 +++++------
 .../spark/sql/hive/orc/HiveOrcQuerySuite.scala     | 28 +++++++++++++++
 4 files changed, 78 insertions(+), 22 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index fd791ce..4dff1ec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -164,8 +164,6 @@ class OrcFileFormat
     val enableVectorizedReader = supportBatch(sparkSession, resultSchema)
     val capacity = sqlConf.orcVectorizedReaderBatchSize
 
-    val resultSchemaString = OrcUtils.orcTypeDescriptionString(resultSchema)
-    OrcConf.MAPRED_INPUT_SCHEMA.setString(hadoopConf, resultSchemaString)
     OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(hadoopConf, sqlConf.caseSensitiveAnalysis)
 
     val broadcastedConf =
@@ -179,16 +177,18 @@ class OrcFileFormat
 
       val fs = filePath.getFileSystem(conf)
       val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
-      val requestedColIdsOrEmptyFile =
+      val resultedColPruneInfo =
         Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader =>
           OrcUtils.requestedColumnIds(
             isCaseSensitive, dataSchema, requiredSchema, reader, conf)
         }
 
-      if (requestedColIdsOrEmptyFile.isEmpty) {
+      if (resultedColPruneInfo.isEmpty) {
         Iterator.empty
       } else {
-        val requestedColIds = requestedColIdsOrEmptyFile.get
+        val (requestedColIds, canPruneCols) = resultedColPruneInfo.get
+        val resultSchemaString = OrcUtils.orcResultSchemaString(canPruneCols,
+          dataSchema, resultSchema, partitionSchema, conf)
         assert(requestedColIds.length == requiredSchema.length,
           "[BUG] requested column IDs do not match required schema")
         val taskConf = new Configuration(conf)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
index d274bcd..e102539 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.orc.{OrcFile, Reader, TypeDescription, Writer}
+import org.apache.orc.{OrcConf, OrcFile, Reader, TypeDescription, Writer}
 
 import org.apache.spark.{SPARK_VERSION_SHORT, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -116,15 +116,17 @@ object OrcUtils extends Logging {
   }
 
   /**
-   * Returns the requested column ids from the given ORC file. Column id can be -1, which means the
-   * requested column doesn't exist in the ORC file. Returns None if the given ORC file is empty.
+   * @return Returns the combination of requested column ids from the given ORC file and
+   *         boolean flag to find if the pruneCols is allowed or not. Requested Column id can be
+   *         -1, which means the requested column doesn't exist in the ORC file. Returns None
+   *         if the given ORC file is empty.
    */
   def requestedColumnIds(
       isCaseSensitive: Boolean,
       dataSchema: StructType,
       requiredSchema: StructType,
       reader: Reader,
-      conf: Configuration): Option[Array[Int]] = {
+      conf: Configuration): Option[(Array[Int], Boolean)] = {
     val orcFieldNames = reader.getSchema.getFieldNames.asScala
     if (orcFieldNames.isEmpty) {
       // SPARK-8501: Some old empty ORC files always have an empty schema stored in their footer.
@@ -136,6 +138,10 @@ object OrcUtils extends Logging {
         assert(orcFieldNames.length <= dataSchema.length, "The given data schema " +
           s"${dataSchema.catalogString} has less fields than the actual ORC physical schema, " +
           "no idea which columns were dropped, fail to read.")
+        // for ORC file written by Hive, no field names
+        // in the physical schema, there is a need to send the
+        // entire dataSchema instead of required schema.
+        // So pruneCols is not done in this case
         Some(requiredSchema.fieldNames.map { name =>
           val index = dataSchema.fieldIndex(name)
           if (index < orcFieldNames.length) {
@@ -143,7 +149,7 @@ object OrcUtils extends Logging {
           } else {
             -1
           }
-        })
+        }, false)
       } else {
         if (isCaseSensitive) {
           Some(requiredSchema.fieldNames.zipWithIndex.map { case (name, idx) =>
@@ -152,7 +158,7 @@ object OrcUtils extends Logging {
             } else {
               -1
             }
-          })
+          }, true)
         } else {
           // Do case-insensitive resolution only if in case-insensitive mode
           val caseInsensitiveOrcFieldMap = orcFieldNames.groupBy(_.toLowerCase(Locale.ROOT))
@@ -170,7 +176,7 @@ object OrcUtils extends Logging {
                   idx
                 }
               }.getOrElse(-1)
-          })
+          }, true)
         }
       }
     }
@@ -199,4 +205,25 @@ object OrcUtils extends Logging {
       s"map<${orcTypeDescriptionString(m.keyType)},${orcTypeDescriptionString(m.valueType)}>"
     case _ => dt.catalogString
   }
+
+  /**
+   * @return Returns the result schema string based on the canPruneCols flag.
+   *         resultSchemaString will be created using resultsSchema in case of
+   *         canPruneCols is true and for canPruneCols as false value
+   *         resultSchemaString will be created using the actual dataSchema.
+   */
+  def orcResultSchemaString(
+      canPruneCols: Boolean,
+      dataSchema: StructType,
+      resultSchema: StructType,
+      partitionSchema: StructType,
+      conf: Configuration): String = {
+    val resultSchemaString = if (canPruneCols) {
+      OrcUtils.orcTypeDescriptionString(resultSchema)
+    } else {
+      OrcUtils.orcTypeDescriptionString(StructType(dataSchema.fields ++ partitionSchema.fields))
+    }
+    OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString)
+    resultSchemaString
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
index 03d58fd..7f25f7bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
@@ -66,24 +66,24 @@ case class OrcPartitionReaderFactory(
   override def buildReader(file: PartitionedFile): PartitionReader[InternalRow] = {
     val conf = broadcastedConf.value.value
 
-    val resultSchemaString = OrcUtils.orcTypeDescriptionString(resultSchema)
-    OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString)
     OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(conf, isCaseSensitive)
 
     val filePath = new Path(new URI(file.filePath))
 
     val fs = filePath.getFileSystem(conf)
     val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
-    val requestedColIdsOrEmptyFile =
+    val resultedColPruneInfo =
       Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader =>
         OrcUtils.requestedColumnIds(
           isCaseSensitive, dataSchema, readDataSchema, reader, conf)
       }
 
-    if (requestedColIdsOrEmptyFile.isEmpty) {
+    if (resultedColPruneInfo.isEmpty) {
       new EmptyPartitionReader[InternalRow]
     } else {
-      val requestedColIds = requestedColIdsOrEmptyFile.get
+      val (requestedColIds, canPruneCols) = resultedColPruneInfo.get
+      val resultSchemaString = OrcUtils.orcResultSchemaString(canPruneCols,
+        dataSchema, resultSchema, partitionSchema, conf)
       assert(requestedColIds.length == readDataSchema.length,
         "[BUG] requested column IDs do not match required schema")
 
@@ -112,24 +112,25 @@ case class OrcPartitionReaderFactory(
   override def buildColumnarReader(file: PartitionedFile): PartitionReader[ColumnarBatch] = {
     val conf = broadcastedConf.value.value
 
-    val resultSchemaString = OrcUtils.orcTypeDescriptionString(resultSchema)
-    OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString)
     OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(conf, isCaseSensitive)
 
     val filePath = new Path(new URI(file.filePath))
 
     val fs = filePath.getFileSystem(conf)
     val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
-    val requestedColIdsOrEmptyFile =
+    val resultedColPruneInfo =
       Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader =>
         OrcUtils.requestedColumnIds(
           isCaseSensitive, dataSchema, readDataSchema, reader, conf)
       }
 
-    if (requestedColIdsOrEmptyFile.isEmpty) {
+    if (resultedColPruneInfo.isEmpty) {
       new EmptyPartitionReader
     } else {
-      val requestedColIds = requestedColIdsOrEmptyFile.get ++ Array.fill(partitionSchema.length)(-1)
+      val (requestedDataColIds, canPruneCols) = resultedColPruneInfo.get
+      val resultSchemaString = OrcUtils.orcResultSchemaString(canPruneCols,
+        dataSchema, resultSchema, partitionSchema, conf)
+      val requestedColIds = requestedDataColIds ++ Array.fill(partitionSchema.length)(-1)
       assert(requestedColIds.length == resultSchema.length,
         "[BUG] requested column IDs do not match required schema")
       val taskConf = new Configuration(conf)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala
index 990d942..12ee5be 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala
@@ -288,4 +288,32 @@ class HiveOrcQuerySuite extends OrcQueryTest with TestHiveSingleton {
       }
     }
   }
+
+  test("SPARK-32234 read ORC table with column names all starting with '_col'") {
+    Seq("native", "hive").foreach { orcImpl =>
+      Seq("false", "true").foreach { vectorized =>
+        withSQLConf(
+          SQLConf.ORC_IMPLEMENTATION.key -> orcImpl,
+          SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> vectorized) {
+          withTable("test_hive_orc_impl") {
+            spark.sql(
+              s"""
+                 | CREATE TABLE test_hive_orc_impl
+                 | (_col1 INT, _col2 STRING, _col3 INT)
+                 | STORED AS ORC
+               """.stripMargin)
+            spark.sql(
+              s"""
+                 | INSERT INTO
+                 | test_hive_orc_impl
+                 | VALUES(9, '12', 2020)
+               """.stripMargin)
+
+            val df = spark.sql("SELECT _col2 FROM test_hive_orc_impl")
+            checkAnswer(df, Row("12"))
+          }
+        }
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org