Posted to commits@spark.apache.org by we...@apache.org on 2020/07/16 13:12:46 UTC
[spark] branch master updated: [SPARK-32234][SQL] Spark sql commands are failing on selecting the orc tables
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 6be8b93 [SPARK-32234][SQL] Spark sql commands are failing on selecting the orc tables
6be8b93 is described below
commit 6be8b935a4f7ce0dea2d7aaaf747c2e8e1a9f47a
Author: SaurabhChawla <sa...@qubole.com>
AuthorDate: Thu Jul 16 13:11:47 2020 +0000
[SPARK-32234][SQL] Spark sql commands are failing on selecting the orc tables
### What changes were proposed in this pull request?
Spark SQL commands fail when selecting from ORC tables.
Steps to reproduce:
Example 1:
Prerequisite: `/Users/test/tpcds_scale5data/date_dim` is the location of ORC data generated by Hive.
```
val table = """CREATE TABLE `date_dim` (
`d_date_sk` INT,
`d_date_id` STRING,
`d_date` TIMESTAMP,
`d_month_seq` INT,
`d_week_seq` INT,
`d_quarter_seq` INT,
`d_year` INT,
`d_dow` INT,
`d_moy` INT,
`d_dom` INT,
`d_qoy` INT,
`d_fy_year` INT,
`d_fy_quarter_seq` INT,
`d_fy_week_seq` INT,
`d_day_name` STRING,
`d_quarter_name` STRING,
`d_holiday` STRING,
`d_weekend` STRING,
`d_following_holiday` STRING,
`d_first_dom` INT,
`d_last_dom` INT,
`d_same_day_ly` INT,
`d_same_day_lq` INT,
`d_current_day` STRING,
`d_current_week` STRING,
`d_current_month` STRING,
`d_current_quarter` STRING,
`d_current_year` STRING)
USING orc
LOCATION '/Users/test/tpcds_scale5data/date_dim'"""
spark.sql(table).collect
val u = """select date_dim.d_date_id from date_dim limit 5"""
spark.sql(u).collect
```
Example 2:
```
val table = """CREATE TABLE `test_orc_data` (
`_col1` INT,
`_col2` STRING,
`_col3` INT)
USING orc"""
spark.sql(table).collect
spark.sql("insert into test_orc_data values(13, '155', 2020)").collect
val df = """select _col2 from test_orc_data limit 5"""
spark.sql(df).collect
```
These queries fail with the error below:
```
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, 192.168.0.103, executor driver): java.lang.ArrayIndexOutOfBoundsException: 1
at org.apache.spark.sql.execution.datasources.orc.OrcColumnarBatchReader.initBatch(OrcColumnarBatchReader.java:156)
at org.apache.spark.sql.execution.datasources.orc.OrcFileFormat.$anonfun$buildReaderWithPartitionValues$7(OrcFileFormat.scala:258)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:141)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:203)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:116)
at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:620)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:343)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:895)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:895)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:372)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:336)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:133)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:445)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1489)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:448)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
```
The reason is that `initBatch` does not receive the schema it needs to resolve the column values; the call site in OrcFileFormat.scala is:
```
batchReader.initBatch(
TypeDescription.fromString(resultSchemaString)
```
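To make the mismatch concrete, here is a minimal spark-shell sketch (illustrative only, not part of this patch; the schemas are made up to mirror Example 1, and it assumes orc-core is on the classpath, as in any Spark build):
```scala
import org.apache.orc.TypeDescription

// Hive writes positional field names (_col0, _col1, ...) into the ORC
// physical schema, so they cannot be matched by name against the table's
// logical columns.
val physical = TypeDescription.fromString(
  "struct<_col0:int,_col1:string,_col2:int>")

// Before this patch, MAPRED_INPUT_SCHEMA was always set to the pruned
// result schema, e.g. a single projected column:
val pruned = TypeDescription.fromString("struct<d_date_id:string>")

// initBatch then looks up batch columns by the positional requested
// column id (e.g. 1), which walks past the end of the one-column pruned
// schema -> java.lang.ArrayIndexOutOfBoundsException: 1.
println(physical.getFieldNames) // [_col0, _col1, _col2]
println(pruned.getFieldNames)   // [d_date_id]
```
The fix below therefore skips pruning for such files: when the physical schema carries only positional names, the result schema string is built from the full `dataSchema` plus partition columns.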
### Why are the changes needed?
Spark SQL queries on ORC tables are failing.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
A unit test is added for this. The failing queries were also tested via spark-shell and spark-submit.
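For reference, a quick spark-shell check reusing Example 2 above (a sketch; with this patch the projection returns the inserted row instead of throwing):
```scala
spark.sql("""CREATE TABLE `test_orc_data` (
  `_col1` INT,
  `_col2` STRING,
  `_col3` INT)
USING orc""")
spark.sql("insert into test_orc_data values(13, '155', 2020)")

// Before the fix: java.lang.ArrayIndexOutOfBoundsException in initBatch.
// After the fix: the projected column comes back as expected.
spark.sql("select _col2 from test_orc_data limit 5").show()
// +-----+
// |_col2|
// +-----+
// |  155|
// +-----+
```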
Closes #29045 from SaurabhChawla100/SPARK-32234.
Lead-authored-by: SaurabhChawla <sa...@qubole.com>
Co-authored-by: SaurabhChawla <s....@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../execution/datasources/orc/OrcFileFormat.scala | 10 +++---
.../sql/execution/datasources/orc/OrcUtils.scala | 41 ++++++++++++++++++----
.../v2/orc/OrcPartitionReaderFactory.scala | 21 +++++------
.../spark/sql/hive/orc/HiveOrcQuerySuite.scala | 28 +++++++++++++++
4 files changed, 78 insertions(+), 22 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index fd791ce..4dff1ec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -164,8 +164,6 @@ class OrcFileFormat
val enableVectorizedReader = supportBatch(sparkSession, resultSchema)
val capacity = sqlConf.orcVectorizedReaderBatchSize
- val resultSchemaString = OrcUtils.orcTypeDescriptionString(resultSchema)
- OrcConf.MAPRED_INPUT_SCHEMA.setString(hadoopConf, resultSchemaString)
OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(hadoopConf, sqlConf.caseSensitiveAnalysis)
val broadcastedConf =
@@ -179,16 +177,18 @@ class OrcFileFormat
val fs = filePath.getFileSystem(conf)
val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
- val requestedColIdsOrEmptyFile =
+ val resultedColPruneInfo =
Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader =>
OrcUtils.requestedColumnIds(
isCaseSensitive, dataSchema, requiredSchema, reader, conf)
}
- if (requestedColIdsOrEmptyFile.isEmpty) {
+ if (resultedColPruneInfo.isEmpty) {
Iterator.empty
} else {
- val requestedColIds = requestedColIdsOrEmptyFile.get
+ val (requestedColIds, canPruneCols) = resultedColPruneInfo.get
+ val resultSchemaString = OrcUtils.orcResultSchemaString(canPruneCols,
+ dataSchema, resultSchema, partitionSchema, conf)
assert(requestedColIds.length == requiredSchema.length,
"[BUG] requested column IDs do not match required schema")
val taskConf = new Configuration(conf)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
index d274bcd..e102539 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.orc.{OrcFile, Reader, TypeDescription, Writer}
+import org.apache.orc.{OrcConf, OrcFile, Reader, TypeDescription, Writer}
import org.apache.spark.{SPARK_VERSION_SHORT, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
@@ -116,15 +116,17 @@ object OrcUtils extends Logging {
}
/**
- * Returns the requested column ids from the given ORC file. Column id can be -1, which means the
- * requested column doesn't exist in the ORC file. Returns None if the given ORC file is empty.
+ * @return Returns the combination of requested column ids from the given ORC file and
+ * boolean flag to find if the pruneCols is allowed or not. Requested Column id can be
+ * -1, which means the requested column doesn't exist in the ORC file. Returns None
+ * if the given ORC file is empty.
*/
def requestedColumnIds(
isCaseSensitive: Boolean,
dataSchema: StructType,
requiredSchema: StructType,
reader: Reader,
- conf: Configuration): Option[Array[Int]] = {
+ conf: Configuration): Option[(Array[Int], Boolean)] = {
val orcFieldNames = reader.getSchema.getFieldNames.asScala
if (orcFieldNames.isEmpty) {
// SPARK-8501: Some old empty ORC files always have an empty schema stored in their footer.
@@ -136,6 +138,10 @@ object OrcUtils extends Logging {
assert(orcFieldNames.length <= dataSchema.length, "The given data schema " +
s"${dataSchema.catalogString} has less fields than the actual ORC physical schema, " +
"no idea which columns were dropped, fail to read.")
+ // for ORC file written by Hive, no field names
+ // in the physical schema, there is a need to send the
+ // entire dataSchema instead of required schema.
+ // So pruneCols is not done in this case
Some(requiredSchema.fieldNames.map { name =>
val index = dataSchema.fieldIndex(name)
if (index < orcFieldNames.length) {
@@ -143,7 +149,7 @@ object OrcUtils extends Logging {
} else {
-1
}
- })
+ }, false)
} else {
if (isCaseSensitive) {
Some(requiredSchema.fieldNames.zipWithIndex.map { case (name, idx) =>
@@ -152,7 +158,7 @@ object OrcUtils extends Logging {
} else {
-1
}
- })
+ }, true)
} else {
// Do case-insensitive resolution only if in case-insensitive mode
val caseInsensitiveOrcFieldMap = orcFieldNames.groupBy(_.toLowerCase(Locale.ROOT))
@@ -170,7 +176,7 @@ object OrcUtils extends Logging {
idx
}
}.getOrElse(-1)
- })
+ }, true)
}
}
}
@@ -199,4 +205,25 @@ object OrcUtils extends Logging {
s"map<${orcTypeDescriptionString(m.keyType)},${orcTypeDescriptionString(m.valueType)}>"
case _ => dt.catalogString
}
+
+ /**
+ * @return Returns the result schema string based on the canPruneCols flag.
+ * resultSchemaString will be created using resultsSchema in case of
+ * canPruneCols is true and for canPruneCols as false value
+ * resultSchemaString will be created using the actual dataSchema.
+ */
+ def orcResultSchemaString(
+ canPruneCols: Boolean,
+ dataSchema: StructType,
+ resultSchema: StructType,
+ partitionSchema: StructType,
+ conf: Configuration): String = {
+ val resultSchemaString = if (canPruneCols) {
+ OrcUtils.orcTypeDescriptionString(resultSchema)
+ } else {
+ OrcUtils.orcTypeDescriptionString(StructType(dataSchema.fields ++ partitionSchema.fields))
+ }
+ OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString)
+ resultSchemaString
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
index 03d58fd..7f25f7bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
@@ -66,24 +66,24 @@ case class OrcPartitionReaderFactory(
override def buildReader(file: PartitionedFile): PartitionReader[InternalRow] = {
val conf = broadcastedConf.value.value
- val resultSchemaString = OrcUtils.orcTypeDescriptionString(resultSchema)
- OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString)
OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(conf, isCaseSensitive)
val filePath = new Path(new URI(file.filePath))
val fs = filePath.getFileSystem(conf)
val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
- val requestedColIdsOrEmptyFile =
+ val resultedColPruneInfo =
Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader =>
OrcUtils.requestedColumnIds(
isCaseSensitive, dataSchema, readDataSchema, reader, conf)
}
- if (requestedColIdsOrEmptyFile.isEmpty) {
+ if (resultedColPruneInfo.isEmpty) {
new EmptyPartitionReader[InternalRow]
} else {
- val requestedColIds = requestedColIdsOrEmptyFile.get
+ val (requestedColIds, canPruneCols) = resultedColPruneInfo.get
+ val resultSchemaString = OrcUtils.orcResultSchemaString(canPruneCols,
+ dataSchema, resultSchema, partitionSchema, conf)
assert(requestedColIds.length == readDataSchema.length,
"[BUG] requested column IDs do not match required schema")
@@ -112,24 +112,25 @@ case class OrcPartitionReaderFactory(
override def buildColumnarReader(file: PartitionedFile): PartitionReader[ColumnarBatch] = {
val conf = broadcastedConf.value.value
- val resultSchemaString = OrcUtils.orcTypeDescriptionString(resultSchema)
- OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString)
OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(conf, isCaseSensitive)
val filePath = new Path(new URI(file.filePath))
val fs = filePath.getFileSystem(conf)
val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
- val requestedColIdsOrEmptyFile =
+ val resultedColPruneInfo =
Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader =>
OrcUtils.requestedColumnIds(
isCaseSensitive, dataSchema, readDataSchema, reader, conf)
}
- if (requestedColIdsOrEmptyFile.isEmpty) {
+ if (resultedColPruneInfo.isEmpty) {
new EmptyPartitionReader
} else {
- val requestedColIds = requestedColIdsOrEmptyFile.get ++ Array.fill(partitionSchema.length)(-1)
+ val (requestedDataColIds, canPruneCols) = resultedColPruneInfo.get
+ val resultSchemaString = OrcUtils.orcResultSchemaString(canPruneCols,
+ dataSchema, resultSchema, partitionSchema, conf)
+ val requestedColIds = requestedDataColIds ++ Array.fill(partitionSchema.length)(-1)
assert(requestedColIds.length == resultSchema.length,
"[BUG] requested column IDs do not match required schema")
val taskConf = new Configuration(conf)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala
index 990d942..12ee5be 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala
@@ -288,4 +288,32 @@ class HiveOrcQuerySuite extends OrcQueryTest with TestHiveSingleton {
}
}
}
+
+ test("SPARK-32234 read ORC table with column names all starting with '_col'") {
+ Seq("native", "hive").foreach { orcImpl =>
+ Seq("false", "true").foreach { vectorized =>
+ withSQLConf(
+ SQLConf.ORC_IMPLEMENTATION.key -> orcImpl,
+ SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> vectorized) {
+ withTable("test_hive_orc_impl") {
+ spark.sql(
+ s"""
+ | CREATE TABLE test_hive_orc_impl
+ | (_col1 INT, _col2 STRING, _col3 INT)
+ | STORED AS ORC
+ """.stripMargin)
+ spark.sql(
+ s"""
+ | INSERT INTO
+ | test_hive_orc_impl
+ | VALUES(9, '12', 2020)
+ """.stripMargin)
+
+ val df = spark.sql("SELECT _col2 FROM test_hive_orc_impl")
+ checkAnswer(df, Row("12"))
+ }
+ }
+ }
+ }
+ }
}