Posted to commits@spark.apache.org by we...@apache.org on 2019/04/05 05:35:25 UTC
[spark] branch master updated: [SPARK-27356][SQL] File source V2:
Fix the case that data columns overlap with partition schema
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 568db94 [SPARK-27356][SQL] File source V2: Fix the case that data columns overlap with partition schema
568db94 is described below
commit 568db94e0c3787305fea76c546c558d695ae2951
Author: Gengliang Wang <ge...@databricks.com>
AuthorDate: Fri Apr 5 13:34:46 2019 +0800
[SPARK-27356][SQL] File source V2: Fix the case that data columns overlap with partition schema
## What changes were proposed in this pull request?
In the current file source V2 framework, the schema of `FileScan` is not returned correctly if there are overlapping columns between `dataSchema` and `partitionSchema`. The actual schema should be
`dataSchema - overlapSchema + partitionSchema`, which might have a different column order from the pushed-down `requiredSchema` in `SupportsPushDownRequiredColumns.pruneColumns`.
For example, if the data schema is `[a: String, b: String, c: String]` and the partition schema is `[b: Int, d: Int]`, the resulting schema is `[a: String, b: Int, c: String, d: Int]` in the current `FileTable` and `HadoopFsRelation`, while the actual scan schema is `[a: String, c: String, b: Int, d: Int]` in `FileScan`.
To fix this corner case, this PR proposes that the output schema of `FileTable` be `dataSchema - overlapSchema + partitionSchema`, so that the column order is consistent with `FileScan`.
Putting all the partition columns at the end of the table schema is also more reasonable.
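To make the rule concrete, here is a minimal, self-contained sketch of the merge using Spark's public `StructType` API. The actual patch implements this inside `FileTable.schema` via the private helper `PartitioningUtils.getColName`; the standalone `colName` and `tableSchema` helpers below are illustrative stand-ins, not the committed code.

```scala
import java.util.Locale

import org.apache.spark.sql.types._

// Illustrative stand-in for the private PartitioningUtils.getColName helper:
// normalize the column name when the analysis is case-insensitive.
def colName(field: StructField, caseSensitive: Boolean): String =
  if (caseSensitive) field.name else field.name.toLowerCase(Locale.ROOT)

// tableSchema = dataSchema - overlapSchema + partitionSchema
def tableSchema(
    dataSchema: StructType,
    partitionSchema: StructType,
    caseSensitive: Boolean): StructType = {
  val partitionNames = partitionSchema.fields.map(colName(_, caseSensitive)).toSet
  // Drop data columns that are shadowed by partition columns, then append
  // the partition columns at the end, matching the FileScan column order.
  val dataFields = dataSchema.fields.filterNot { f =>
    partitionNames.contains(colName(f, caseSensitive))
  }
  StructType(dataFields ++ partitionSchema.fields)
}

// The example from above: data schema [a, b, c], partition schema [b, d].
val data = StructType.fromDDL("a STRING, b STRING, c STRING")
val partition = StructType.fromDDL("b INT, d INT")
val merged = tableSchema(data, partition, caseSensitive = false)
merged.fields.map(f => s"${f.name}: ${f.dataType.simpleString}").mkString(", ")
// => "a: string, c: string, b: int, d: int" -- the FileScan order, with the
//    overlapping column `b` taking its type from the partition schema.
```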
## How was this patch tested?
Unit test.
Closes #24284 from gengliangwang/FixReadSchema.
Authored-by: Gengliang Wang <ge...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../sql/execution/datasources/v2/FileTable.scala | 12 +-
.../spark/sql/FileBasedDataSourceSuite.scala | 14 ++
.../orc/OrcPartitionDiscoverySuite.scala | 165 +++++++++++++++------
3 files changed, 147 insertions(+), 44 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
index d7284fd..cb816d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
@@ -70,8 +70,16 @@ abstract class FileTable(
val partitionSchema = fileIndex.partitionSchema
SchemaUtils.checkColumnNameDuplication(partitionSchema.fieldNames,
"in the partition schema", caseSensitive)
- PartitioningUtils.mergeDataAndPartitionSchema(dataSchema,
- partitionSchema, caseSensitive)._1
+ val partitionNameSet: Set[String] =
+ partitionSchema.fields.map(PartitioningUtils.getColName(_, caseSensitive)).toSet
+
+ // When data and partition schemas have overlapping columns,
+ // tableSchema = dataSchema - overlapSchema + partitionSchema
+ val fields = dataSchema.fields.filterNot { field =>
+ val colName = PartitioningUtils.getColName(field, caseSensitive)
+ partitionNameSet.contains(colName)
+ } ++ partitionSchema.fields
+ StructType(fields)
}
override def capabilities(): java.util.Set[TableCapability] = FileTable.CAPABILITIES
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 1d30cbf..add8a30 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -493,6 +493,20 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
}
}
}
+
+ test("Return correct results when data columns overlap with partition columns") {
+ Seq("parquet", "orc", "json").foreach { format =>
+ withTempPath { path =>
+ val tablePath = new File(s"${path.getCanonicalPath}/cOl3=c/cOl1=a/cOl5=e")
+ Seq((1, 2, 3, 4, 5)).toDF("cOl1", "cOl2", "cOl3", "cOl4", "cOl5")
+ .write.format(format).save(tablePath.getCanonicalPath)
+
+ val df = spark.read.format(format).load(path.getCanonicalPath)
+ .select("CoL1", "Col2", "CoL5", "CoL3")
+ checkAnswer(df, Row("a", 2, "e", "c"))
+ }
+ }
+ }
}
object TestingUDT {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala
index e1d0254..143e3f0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala
@@ -107,6 +107,68 @@ abstract class OrcPartitionDiscoveryTest extends OrcTest {
}
}
+ test("read partitioned table - with nulls") {
+ withTempDir { base =>
+ for {
+ // Must be `Integer` rather than `Int` here. `null.asInstanceOf[Int]` results in a zero...
+ pi <- Seq(1, null.asInstanceOf[Integer])
+ ps <- Seq("foo", null.asInstanceOf[String])
+ } {
+ makeOrcFile(
+ (1 to 10).map(i => OrcParData(i, i.toString)),
+ makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
+ }
+
+ spark.read
+ .option("hive.exec.default.partition.name", defaultPartitionName)
+ .orc(base.getCanonicalPath)
+ .createOrReplaceTempView("t")
+
+ withTempTable("t") {
+ checkAnswer(
+ sql("SELECT * FROM t"),
+ for {
+ i <- 1 to 10
+ pi <- Seq(1, null.asInstanceOf[Integer])
+ ps <- Seq("foo", null.asInstanceOf[String])
+ } yield Row(i, i.toString, pi, ps))
+
+ checkAnswer(
+ sql("SELECT * FROM t WHERE pi IS NULL"),
+ for {
+ i <- 1 to 10
+ ps <- Seq("foo", null.asInstanceOf[String])
+ } yield Row(i, i.toString, null, ps))
+
+ checkAnswer(
+ sql("SELECT * FROM t WHERE ps IS NULL"),
+ for {
+ i <- 1 to 10
+ pi <- Seq(1, null.asInstanceOf[Integer])
+ } yield Row(i, i.toString, pi, null))
+ }
+ }
+ }
+
+ test("SPARK-27162: handle pathfilter configuration correctly") {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+
+ val df = spark.range(2)
+ df.write.orc(path + "/p=1")
+ df.write.orc(path + "/p=2")
+ assert(spark.read.orc(path).count() === 4)
+
+ val extraOptions = Map(
+ "mapred.input.pathFilter.class" -> classOf[TestFileFilter].getName,
+ "mapreduce.input.pathFilter.class" -> classOf[TestFileFilter].getName
+ )
+ assert(spark.read.options(extraOptions).orc(path).count() === 2)
+ }
+ }
+}
+
+class OrcPartitionDiscoverySuite extends OrcPartitionDiscoveryTest with SharedSQLContext {
test("read partitioned table - partition key included in orc file") {
withTempDir { base =>
for {
@@ -127,7 +189,7 @@ abstract class OrcPartitionDiscoveryTest extends OrcTest {
i <- 1 to 10
pi <- Seq(1, 2)
ps <- Seq("foo", "bar")
- } yield Row(i, pi, i.toString, ps))
+ } yield Row(i, i.toString, pi, ps))
checkAnswer(
sql("SELECT intField, pi FROM t"),
@@ -142,28 +204,26 @@ abstract class OrcPartitionDiscoveryTest extends OrcTest {
for {
i <- 1 to 10
ps <- Seq("foo", "bar")
- } yield Row(i, 1, i.toString, ps))
+ } yield Row(i, i.toString, 1, ps))
checkAnswer(
sql("SELECT * FROM t WHERE ps = 'foo'"),
for {
i <- 1 to 10
pi <- Seq(1, 2)
- } yield Row(i, pi, i.toString, "foo"))
+ } yield Row(i, i.toString, pi, "foo"))
}
}
}
-
- test("read partitioned table - with nulls") {
+ test("read partitioned table - with nulls and partition keys are included in Orc file") {
withTempDir { base =>
for {
- // Must be `Integer` rather than `Int` here. `null.asInstanceOf[Int]` results in a zero...
- pi <- Seq(1, null.asInstanceOf[Integer])
+ pi <- Seq(1, 2)
ps <- Seq("foo", null.asInstanceOf[String])
} {
makeOrcFile(
- (1 to 10).map(i => OrcParData(i, i.toString)),
+ (1 to 10).map(i => OrcParDataWithKey(i, pi, i.toString, ps)),
makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
}
@@ -177,23 +237,71 @@ abstract class OrcPartitionDiscoveryTest extends OrcTest {
sql("SELECT * FROM t"),
for {
i <- 1 to 10
- pi <- Seq(1, null.asInstanceOf[Integer])
+ pi <- Seq(1, 2)
ps <- Seq("foo", null.asInstanceOf[String])
} yield Row(i, i.toString, pi, ps))
checkAnswer(
- sql("SELECT * FROM t WHERE pi IS NULL"),
+ sql("SELECT * FROM t WHERE ps IS NULL"),
for {
i <- 1 to 10
- ps <- Seq("foo", null.asInstanceOf[String])
- } yield Row(i, i.toString, null, ps))
+ pi <- Seq(1, 2)
+ } yield Row(i, i.toString, pi, null))
+ }
+ }
+ }
+}
+class OrcV1PartitionDiscoverySuite extends OrcPartitionDiscoveryTest with SharedSQLContext {
+ override protected def sparkConf: SparkConf =
+ super
+ .sparkConf
+ .set(SQLConf.USE_V1_SOURCE_READER_LIST, "orc")
+ .set(SQLConf.USE_V1_SOURCE_WRITER_LIST, "orc")
+
+ test("read partitioned table - partition key included in orc file") {
+ withTempDir { base =>
+ for {
+ pi <- Seq(1, 2)
+ ps <- Seq("foo", "bar")
+ } {
+ makeOrcFile(
+ (1 to 10).map(i => OrcParDataWithKey(i, pi, i.toString, ps)),
+ makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
+ }
+
+ spark.read.orc(base.getCanonicalPath).createOrReplaceTempView("t")
+
+ withTempTable("t") {
checkAnswer(
- sql("SELECT * FROM t WHERE ps IS NULL"),
+ sql("SELECT * FROM t"),
for {
i <- 1 to 10
- pi <- Seq(1, null.asInstanceOf[Integer])
- } yield Row(i, i.toString, pi, null))
+ pi <- Seq(1, 2)
+ ps <- Seq("foo", "bar")
+ } yield Row(i, pi, i.toString, ps))
+
+ checkAnswer(
+ sql("SELECT intField, pi FROM t"),
+ for {
+ i <- 1 to 10
+ pi <- Seq(1, 2)
+ _ <- Seq("foo", "bar")
+ } yield Row(i, pi))
+
+ checkAnswer(
+ sql("SELECT * FROM t WHERE pi = 1"),
+ for {
+ i <- 1 to 10
+ ps <- Seq("foo", "bar")
+ } yield Row(i, 1, i.toString, ps))
+
+ checkAnswer(
+ sql("SELECT * FROM t WHERE ps = 'foo'"),
+ for {
+ i <- 1 to 10
+ pi <- Seq(1, 2)
+ } yield Row(i, pi, i.toString, "foo"))
}
}
}
@@ -232,31 +340,4 @@ abstract class OrcPartitionDiscoveryTest extends OrcTest {
}
}
}
-
- test("SPARK-27162: handle pathfilter configuration correctly") {
- withTempPath { dir =>
- val path = dir.getCanonicalPath
-
- val df = spark.range(2)
- df.write.orc(path + "/p=1")
- df.write.orc(path + "/p=2")
- assert(spark.read.orc(path).count() === 4)
-
- val extraOptions = Map(
- "mapred.input.pathFilter.class" -> classOf[TestFileFilter].getName,
- "mapreduce.input.pathFilter.class" -> classOf[TestFileFilter].getName
- )
- assert(spark.read.options(extraOptions).orc(path).count() === 2)
- }
- }
-}
-
-class OrcPartitionDiscoverySuite extends OrcPartitionDiscoveryTest with SharedSQLContext
-
-class OrcV1PartitionDiscoverySuite extends OrcPartitionDiscoveryTest with SharedSQLContext {
- override protected def sparkConf: SparkConf =
- super
- .sparkConf
- .set(SQLConf.USE_V1_SOURCE_READER_LIST, "orc")
- .set(SQLConf.USE_V1_SOURCE_WRITER_LIST, "orc")
}