You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2019/04/05 05:35:25 UTC

[spark] branch master updated: [SPARK-27356][SQL] File source V2: Fix the case that data columns overlap with partition schema

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 568db94  [SPARK-27356][SQL] File source V2: Fix the case that data columns overlap with partition schema
568db94 is described below

commit 568db94e0c3787305fea76c546c558d695ae2951
Author: Gengliang Wang <ge...@databricks.com>
AuthorDate: Fri Apr 5 13:34:46 2019 +0800

    [SPARK-27356][SQL] File source V2: Fix the case that data columns overlap with partition schema
    
    ## What changes were proposed in this pull request?
    
    In the current file source V2 framework, the schema of `FileScan` is not returned correctly if there are overlapping columns between `dataSchema` and `partitionSchema`. The actual schema should be
    `dataSchema - overlapSchema + partitionSchema`, which might have a different column order from the pushed-down `requiredSchema` in `SupportsPushDownRequiredColumns.pruneColumns`.
    
    For example, if the data schema is `[a: String, b: String, c: String]` and the partition schema is `[b: Int, d: Int]`, the result schema is `[a: String, b: Int, c: String, d: Int]` in the current `FileTable` and `HadoopFsRelation`, while the actual scan schema is `[a: String, c: String, b: Int, d: Int]` in `FileScan`.
    
    To fix this corner case, this PR proposes that the output schema of `FileTable` should be `dataSchema - overlapSchema + partitionSchema`, so that the column order is consistent with `FileScan`.
    Putting all the partition columns at the end of the table schema is more reasonable.
    
    ## How was this patch tested?
    
    Unit test.
    
    Closes #24284 from gengliangwang/FixReadSchema.
    
    Authored-by: Gengliang Wang <ge...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../sql/execution/datasources/v2/FileTable.scala   |  12 +-
 .../spark/sql/FileBasedDataSourceSuite.scala       |  14 ++
 .../orc/OrcPartitionDiscoverySuite.scala           | 165 +++++++++++++++------
 3 files changed, 147 insertions(+), 44 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
index d7284fd..cb816d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
@@ -70,8 +70,16 @@ abstract class FileTable(
     val partitionSchema = fileIndex.partitionSchema
     SchemaUtils.checkColumnNameDuplication(partitionSchema.fieldNames,
       "in the partition schema", caseSensitive)
-    PartitioningUtils.mergeDataAndPartitionSchema(dataSchema,
-      partitionSchema, caseSensitive)._1
+    val partitionNameSet: Set[String] =
+      partitionSchema.fields.map(PartitioningUtils.getColName(_, caseSensitive)).toSet
+
+    // When data and partition schemas have overlapping columns,
+    // tableSchema = dataSchema - overlapSchema + partitionSchema
+    val fields = dataSchema.fields.filterNot { field =>
+      val colName = PartitioningUtils.getColName(field, caseSensitive)
+      partitionNameSet.contains(colName)
+    } ++ partitionSchema.fields
+    StructType(fields)
   }
 
   override def capabilities(): java.util.Set[TableCapability] = FileTable.CAPABILITIES
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 1d30cbf..add8a30 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -493,6 +493,20 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
       }
     }
   }
+
+  test("Return correct results when data columns overlap with partition columns") {
+    Seq("parquet", "orc", "json").foreach { format =>
+      withTempPath { path =>
+        val tablePath = new File(s"${path.getCanonicalPath}/cOl3=c/cOl1=a/cOl5=e")
+        Seq((1, 2, 3, 4, 5)).toDF("cOl1", "cOl2", "cOl3", "cOl4", "cOl5")
+          .write.format(format).save(tablePath.getCanonicalPath)
+
+        val df = spark.read.format(format).load(path.getCanonicalPath)
+          .select("CoL1", "Col2", "CoL5", "CoL3")
+        checkAnswer(df, Row("a", 2, "e", "c"))
+      }
+    }
+  }
 }
 
 object TestingUDT {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala
index e1d0254..143e3f0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala
@@ -107,6 +107,68 @@ abstract class OrcPartitionDiscoveryTest extends OrcTest {
     }
   }
 
+  test("read partitioned table - with nulls") {
+    withTempDir { base =>
+      for {
+      // Must be `Integer` rather than `Int` here. `null.asInstanceOf[Int]` results in a zero...
+        pi <- Seq(1, null.asInstanceOf[Integer])
+        ps <- Seq("foo", null.asInstanceOf[String])
+      } {
+        makeOrcFile(
+          (1 to 10).map(i => OrcParData(i, i.toString)),
+          makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
+      }
+
+      spark.read
+        .option("hive.exec.default.partition.name", defaultPartitionName)
+        .orc(base.getCanonicalPath)
+        .createOrReplaceTempView("t")
+
+      withTempTable("t") {
+        checkAnswer(
+          sql("SELECT * FROM t"),
+          for {
+            i <- 1 to 10
+            pi <- Seq(1, null.asInstanceOf[Integer])
+            ps <- Seq("foo", null.asInstanceOf[String])
+          } yield Row(i, i.toString, pi, ps))
+
+        checkAnswer(
+          sql("SELECT * FROM t WHERE pi IS NULL"),
+          for {
+            i <- 1 to 10
+            ps <- Seq("foo", null.asInstanceOf[String])
+          } yield Row(i, i.toString, null, ps))
+
+        checkAnswer(
+          sql("SELECT * FROM t WHERE ps IS NULL"),
+          for {
+            i <- 1 to 10
+            pi <- Seq(1, null.asInstanceOf[Integer])
+          } yield Row(i, i.toString, pi, null))
+      }
+    }
+  }
+
+  test("SPARK-27162: handle pathfilter configuration correctly") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+
+      val df = spark.range(2)
+      df.write.orc(path + "/p=1")
+      df.write.orc(path + "/p=2")
+      assert(spark.read.orc(path).count() === 4)
+
+      val extraOptions = Map(
+        "mapred.input.pathFilter.class" -> classOf[TestFileFilter].getName,
+        "mapreduce.input.pathFilter.class" -> classOf[TestFileFilter].getName
+      )
+      assert(spark.read.options(extraOptions).orc(path).count() === 2)
+    }
+  }
+}
+
+class OrcPartitionDiscoverySuite extends OrcPartitionDiscoveryTest with SharedSQLContext {
   test("read partitioned table - partition key included in orc file") {
     withTempDir { base =>
       for {
@@ -127,7 +189,7 @@ abstract class OrcPartitionDiscoveryTest extends OrcTest {
             i <- 1 to 10
             pi <- Seq(1, 2)
             ps <- Seq("foo", "bar")
-          } yield Row(i, pi, i.toString, ps))
+          } yield Row(i, i.toString, pi, ps))
 
         checkAnswer(
           sql("SELECT intField, pi FROM t"),
@@ -142,28 +204,26 @@ abstract class OrcPartitionDiscoveryTest extends OrcTest {
           for {
             i <- 1 to 10
             ps <- Seq("foo", "bar")
-          } yield Row(i, 1, i.toString, ps))
+          } yield Row(i, i.toString, 1, ps))
 
         checkAnswer(
           sql("SELECT * FROM t WHERE ps = 'foo'"),
           for {
             i <- 1 to 10
             pi <- Seq(1, 2)
-          } yield Row(i, pi, i.toString, "foo"))
+          } yield Row(i, i.toString, pi, "foo"))
       }
     }
   }
 
-
-  test("read partitioned table - with nulls") {
+  test("read partitioned table - with nulls and partition keys are included in Orc file") {
     withTempDir { base =>
       for {
-      // Must be `Integer` rather than `Int` here. `null.asInstanceOf[Int]` results in a zero...
-        pi <- Seq(1, null.asInstanceOf[Integer])
+        pi <- Seq(1, 2)
         ps <- Seq("foo", null.asInstanceOf[String])
       } {
         makeOrcFile(
-          (1 to 10).map(i => OrcParData(i, i.toString)),
+          (1 to 10).map(i => OrcParDataWithKey(i, pi, i.toString, ps)),
           makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
       }
 
@@ -177,23 +237,71 @@ abstract class OrcPartitionDiscoveryTest extends OrcTest {
           sql("SELECT * FROM t"),
           for {
             i <- 1 to 10
-            pi <- Seq(1, null.asInstanceOf[Integer])
+            pi <- Seq(1, 2)
             ps <- Seq("foo", null.asInstanceOf[String])
           } yield Row(i, i.toString, pi, ps))
 
         checkAnswer(
-          sql("SELECT * FROM t WHERE pi IS NULL"),
+          sql("SELECT * FROM t WHERE ps IS NULL"),
           for {
             i <- 1 to 10
-            ps <- Seq("foo", null.asInstanceOf[String])
-          } yield Row(i, i.toString, null, ps))
+            pi <- Seq(1, 2)
+          } yield Row(i, i.toString, pi, null))
+      }
+    }
+  }
+}
 
+class OrcV1PartitionDiscoverySuite extends OrcPartitionDiscoveryTest with SharedSQLContext {
+  override protected def sparkConf: SparkConf =
+    super
+      .sparkConf
+      .set(SQLConf.USE_V1_SOURCE_READER_LIST, "orc")
+      .set(SQLConf.USE_V1_SOURCE_WRITER_LIST, "orc")
+
+  test("read partitioned table - partition key included in orc file") {
+    withTempDir { base =>
+      for {
+        pi <- Seq(1, 2)
+        ps <- Seq("foo", "bar")
+      } {
+        makeOrcFile(
+          (1 to 10).map(i => OrcParDataWithKey(i, pi, i.toString, ps)),
+          makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
+      }
+
+      spark.read.orc(base.getCanonicalPath).createOrReplaceTempView("t")
+
+      withTempTable("t") {
         checkAnswer(
-          sql("SELECT * FROM t WHERE ps IS NULL"),
+          sql("SELECT * FROM t"),
           for {
             i <- 1 to 10
-            pi <- Seq(1, null.asInstanceOf[Integer])
-          } yield Row(i, i.toString, pi, null))
+            pi <- Seq(1, 2)
+            ps <- Seq("foo", "bar")
+          } yield Row(i, pi, i.toString, ps))
+
+        checkAnswer(
+          sql("SELECT intField, pi FROM t"),
+          for {
+            i <- 1 to 10
+            pi <- Seq(1, 2)
+            _ <- Seq("foo", "bar")
+          } yield Row(i, pi))
+
+        checkAnswer(
+          sql("SELECT * FROM t WHERE pi = 1"),
+          for {
+            i <- 1 to 10
+            ps <- Seq("foo", "bar")
+          } yield Row(i, 1, i.toString, ps))
+
+        checkAnswer(
+          sql("SELECT * FROM t WHERE ps = 'foo'"),
+          for {
+            i <- 1 to 10
+            pi <- Seq(1, 2)
+          } yield Row(i, pi, i.toString, "foo"))
       }
     }
   }
@@ -232,31 +340,4 @@ abstract class OrcPartitionDiscoveryTest extends OrcTest {
       }
     }
   }
-
-  test("SPARK-27162: handle pathfilter configuration correctly") {
-    withTempPath { dir =>
-      val path = dir.getCanonicalPath
-
-      val df = spark.range(2)
-      df.write.orc(path + "/p=1")
-      df.write.orc(path + "/p=2")
-      assert(spark.read.orc(path).count() === 4)
-
-      val extraOptions = Map(
-        "mapred.input.pathFilter.class" -> classOf[TestFileFilter].getName,
-        "mapreduce.input.pathFilter.class" -> classOf[TestFileFilter].getName
-      )
-      assert(spark.read.options(extraOptions).orc(path).count() === 2)
-    }
-  }
-}
-
-class OrcPartitionDiscoverySuite extends OrcPartitionDiscoveryTest with SharedSQLContext
-
-class OrcV1PartitionDiscoverySuite extends OrcPartitionDiscoveryTest with SharedSQLContext {
-  override protected def sparkConf: SparkConf =
-    super
-      .sparkConf
-      .set(SQLConf.USE_V1_SOURCE_READER_LIST, "orc")
-      .set(SQLConf.USE_V1_SOURCE_WRITER_LIST, "orc")
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org