You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/07/11 03:16:33 UTC
spark git commit: [SPARK-21043][SQL] Add unionByName in Dataset
Repository: spark
Updated Branches:
refs/heads/master c3713fde8 -> a2bec6c92
[SPARK-21043][SQL] Add unionByName in Dataset
## What changes were proposed in this pull request?
This PR adds `unionByName` in `Dataset`.
Here is how to use:
```
val df1 = Seq((1, 2, 3)).toDF("col0", "col1", "col2")
val df2 = Seq((4, 5, 6)).toDF("col1", "col2", "col0")
df1.unionByName(df2).show
// output:
// +----+----+----+
// |col0|col1|col2|
// +----+----+----+
// | 1| 2| 3|
// | 6| 4| 5|
// +----+----+----+
```
## How was this patch tested?
Added tests in `DataFrameSuite`.
Author: Takeshi Yamamuro <ya...@apache.org>
Closes #18300 from maropu/SPARK-21043-2.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a2bec6c9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a2bec6c9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a2bec6c9
Branch: refs/heads/master
Commit: a2bec6c92a063f4a8e9ed75a9f3f06808485b6d7
Parents: c3713fd
Author: Takeshi Yamamuro <ya...@apache.org>
Authored: Mon Jul 10 20:16:29 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Mon Jul 10 20:16:29 2017 -0700
----------------------------------------------------------------------
.../scala/org/apache/spark/sql/Dataset.scala | 60 ++++++++++++++
.../org/apache/spark/sql/DataFrameSuite.scala | 87 ++++++++++++++++++++
2 files changed, 147 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/a2bec6c9/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index a777383..7f3ae05 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -53,6 +53,7 @@ import org.apache.spark.sql.execution.python.EvaluatePython
import org.apache.spark.sql.execution.stat.StatFunctions
import org.apache.spark.sql.streaming.DataStreamWriter
import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.SchemaUtils
import org.apache.spark.storage.StorageLevel
import org.apache.spark.unsafe.types.CalendarInterval
import org.apache.spark.util.Utils
@@ -1735,6 +1736,65 @@ class Dataset[T] private[sql](
}
/**
+ * Returns a new Dataset containing union of rows in this Dataset and another Dataset.
+ *
+ * This is different from both `UNION ALL` and `UNION DISTINCT` in SQL. To do a SQL-style set
+ * union (that does deduplication of elements), use this function followed by a [[distinct]].
+ *
+ * The difference between this function and [[union]] is that this function
+ * resolves columns by name (not by position):
+ *
+ * {{{
+ * val df1 = Seq((1, 2, 3)).toDF("col0", "col1", "col2")
+ * val df2 = Seq((4, 5, 6)).toDF("col1", "col2", "col0")
+ * df1.unionByName(df2).show
+ *
+ * // output:
+ * // +----+----+----+
+ * // |col0|col1|col2|
+ * // +----+----+----+
+ * // | 1| 2| 3|
+ * // | 6| 4| 5|
+ * // +----+----+----+
+ * }}}
+ *
+ * @group typedrel
+ * @since 2.3.0
+ */
+ def unionByName(other: Dataset[T]): Dataset[T] = withSetOperator {
+ // Check column name duplication
+ val resolver = sparkSession.sessionState.analyzer.resolver
+ val leftOutputAttrs = logicalPlan.output
+ val rightOutputAttrs = other.logicalPlan.output
+
+ SchemaUtils.checkColumnNameDuplication(
+ leftOutputAttrs.map(_.name),
+ "in the left attributes",
+ sparkSession.sessionState.conf.caseSensitiveAnalysis)
+ SchemaUtils.checkColumnNameDuplication(
+ rightOutputAttrs.map(_.name),
+ "in the right attributes",
+ sparkSession.sessionState.conf.caseSensitiveAnalysis)
+
+ // Builds a project list for `other` based on `logicalPlan` output names
+ val rightProjectList = leftOutputAttrs.map { lattr =>
+ rightOutputAttrs.find { rattr => resolver(lattr.name, rattr.name) }.getOrElse {
+ throw new AnalysisException(
+ s"""Cannot resolve column name "${lattr.name}" among """ +
+ s"""(${rightOutputAttrs.map(_.name).mkString(", ")})""")
+ }
+ }
+
+ // Delegates failure checks to `CheckAnalysis`
+ val notFoundAttrs = rightOutputAttrs.diff(rightProjectList)
+ val rightChild = Project(rightProjectList ++ notFoundAttrs, other.logicalPlan)
+
+ // This breaks caching, but it's usually ok because it addresses a very specific use case:
+ // using union to union many files or partitions.
+ CombineUnions(Union(logicalPlan, rightChild))
+ }
+
+ /**
* Returns a new Dataset containing rows only in both this Dataset and another Dataset.
* This is equivalent to `INTERSECT` in SQL.
*
http://git-wip-us.apache.org/repos/asf/spark/blob/a2bec6c9/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index a5a2e1c..5ae2703 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -111,6 +111,93 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
)
}
+ test("union by name") {
+ var df1 = Seq((1, 2, 3)).toDF("a", "b", "c")
+ var df2 = Seq((3, 1, 2)).toDF("c", "a", "b")
+ val df3 = Seq((2, 3, 1)).toDF("b", "c", "a")
+ val unionDf = df1.unionByName(df2.unionByName(df3))
+ checkAnswer(unionDf,
+ Row(1, 2, 3) :: Row(1, 2, 3) :: Row(1, 2, 3) :: Nil
+ )
+
+ // Check if adjacent unions are combined into a single one
+ assert(unionDf.queryExecution.optimizedPlan.collect { case u: Union => true }.size == 1)
+
+ // Check failure cases
+ df1 = Seq((1, 2)).toDF("a", "c")
+ df2 = Seq((3, 4, 5)).toDF("a", "b", "c")
+ var errMsg = intercept[AnalysisException] {
+ df1.unionByName(df2)
+ }.getMessage
+ assert(errMsg.contains(
+ "Union can only be performed on tables with the same number of columns, " +
+ "but the first table has 2 columns and the second table has 3 columns"))
+
+ df1 = Seq((1, 2, 3)).toDF("a", "b", "c")
+ df2 = Seq((4, 5, 6)).toDF("a", "c", "d")
+ errMsg = intercept[AnalysisException] {
+ df1.unionByName(df2)
+ }.getMessage
+ assert(errMsg.contains("""Cannot resolve column name "b" among (a, c, d)"""))
+ }
+
+ test("union by name - type coercion") {
+ var df1 = Seq((1, "a")).toDF("c0", "c1")
+ var df2 = Seq((3, 1L)).toDF("c1", "c0")
+ checkAnswer(df1.unionByName(df2), Row(1L, "a") :: Row(1L, "3") :: Nil)
+
+ df1 = Seq((1, 1.0)).toDF("c0", "c1")
+ df2 = Seq((8L, 3.0)).toDF("c1", "c0")
+ checkAnswer(df1.unionByName(df2), Row(1.0, 1.0) :: Row(3.0, 8.0) :: Nil)
+
+ df1 = Seq((2.0f, 7.4)).toDF("c0", "c1")
+ df2 = Seq(("a", 4.0)).toDF("c1", "c0")
+ checkAnswer(df1.unionByName(df2), Row(2.0, "7.4") :: Row(4.0, "a") :: Nil)
+
+ df1 = Seq((1, "a", 3.0)).toDF("c0", "c1", "c2")
+ df2 = Seq((1.2, 2, "bc")).toDF("c2", "c0", "c1")
+ val df3 = Seq(("def", 1.2, 3)).toDF("c1", "c2", "c0")
+ checkAnswer(df1.unionByName(df2.unionByName(df3)),
+ Row(1, "a", 3.0) :: Row(2, "bc", 1.2) :: Row(3, "def", 1.2) :: Nil
+ )
+ }
+
+ test("union by name - check case sensitivity") {
+ def checkCaseSensitiveTest(): Unit = {
+ val df1 = Seq((1, 2, 3)).toDF("ab", "cd", "ef")
+ val df2 = Seq((4, 5, 6)).toDF("cd", "ef", "AB")
+ checkAnswer(df1.unionByName(df2), Row(1, 2, 3) :: Row(6, 4, 5) :: Nil)
+ }
+ withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+ val errMsg2 = intercept[AnalysisException] {
+ checkCaseSensitiveTest()
+ }.getMessage
+ assert(errMsg2.contains("""Cannot resolve column name "ab" among (cd, ef, AB)"""))
+ }
+ withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+ checkCaseSensitiveTest()
+ }
+ }
+
+ test("union by name - check name duplication") {
+ Seq((true, ("a", "a")), (false, ("aA", "Aa"))).foreach { case (caseSensitive, (c0, c1)) =>
+ withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
+ var df1 = Seq((1, 1)).toDF(c0, c1)
+ var df2 = Seq((1, 1)).toDF("c0", "c1")
+ var errMsg = intercept[AnalysisException] {
+ df1.unionByName(df2)
+ }.getMessage
+ assert(errMsg.contains("Found duplicate column(s) in the left attributes:"))
+ df1 = Seq((1, 1)).toDF("c0", "c1")
+ df2 = Seq((1, 1)).toDF(c0, c1)
+ errMsg = intercept[AnalysisException] {
+ df1.unionByName(df2)
+ }.getMessage
+ assert(errMsg.contains("Found duplicate column(s) in the right attributes:"))
+ }
+ }
+ }
+
test("empty data frame") {
assert(spark.emptyDataFrame.columns.toSeq === Seq.empty[String])
assert(spark.emptyDataFrame.count() === 0)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org