Posted to commits@spark.apache.org by gu...@apache.org on 2017/09/03 12:03:24 UTC
spark git commit: [SPARK-21897][PYTHON][R] Add unionByName API to DataFrame in Python and R
Repository: spark
Updated Branches:
refs/heads/master acb7fed23 -> 07fd68a29
[SPARK-21897][PYTHON][R] Add unionByName API to DataFrame in Python and R
## What changes were proposed in this pull request?
This PR proposes to add a wrapper for the `unionByName` API to Python and R as well.
**Python**
```python
df1 = spark.createDataFrame([[1, 2, 3]], ["col0", "col1", "col2"])
df2 = spark.createDataFrame([[4, 5, 6]], ["col1", "col2", "col0"])
df1.unionByName(df2).show()
```
```
+----+----+----+
|col0|col1|col2|
+----+----+----+
| 1| 2| 3|
| 6| 4| 5|
+----+----+----+
```
**R**
```R
df1 <- select(createDataFrame(mtcars), "carb", "am", "gear")
df2 <- select(createDataFrame(mtcars), "am", "gear", "carb")
head(unionByName(limit(df1, 2), limit(df2, 2)))
```
```
carb am gear
1 4 1 4
2 4 1 4
3 4 1 4
4 4 1 4
```
## How was this patch tested?
Doctests for Python, and a unit test added in `test_sparkSQL.R` for R.
Author: hyukjinkwon <gu...@gmail.com>
Closes #19105 from HyukjinKwon/unionByName-r-python.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/07fd68a2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/07fd68a2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/07fd68a2
Branch: refs/heads/master
Commit: 07fd68a29fb6cad960b5ac72718bb05decf28a1a
Parents: acb7fed
Author: hyukjinkwon <gu...@gmail.com>
Authored: Sun Sep 3 21:03:21 2017 +0900
Committer: hyukjinkwon <gu...@gmail.com>
Committed: Sun Sep 3 21:03:21 2017 +0900
----------------------------------------------------------------------
R/pkg/NAMESPACE | 1 +
R/pkg/R/DataFrame.R | 38 ++++++++++++++++++++++++++++--
R/pkg/R/generics.R | 4 ++++
R/pkg/tests/fulltests/test_sparkSQL.R | 9 ++++++-
python/pyspark/sql/dataframe.py | 28 +++++++++++++++++++---
5 files changed, 74 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/07fd68a2/R/pkg/NAMESPACE
----------------------------------------------------------------------
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index a1dd1af..3fc756b 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -169,6 +169,7 @@ exportMethods("arrange",
"transform",
"union",
"unionAll",
+ "unionByName",
"unique",
"unpersist",
"where",
http://git-wip-us.apache.org/repos/asf/spark/blob/07fd68a2/R/pkg/R/DataFrame.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 80526cd..1b46c1e 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -2683,7 +2683,7 @@ generateAliasesForIntersectedCols <- function (x, intersectedColNames, suffix) {
#' @rdname union
#' @name union
#' @aliases union,SparkDataFrame,SparkDataFrame-method
-#' @seealso \link{rbind}
+#' @seealso \link{rbind} \link{unionByName}
#' @export
#' @examples
#'\dontrun{
@@ -2714,6 +2714,40 @@ setMethod("unionAll",
union(x, y)
})
+#' Return a new SparkDataFrame containing the union of rows, matched by column names
+#'
+#' Return a new SparkDataFrame containing the union of rows in this SparkDataFrame
+#' and another SparkDataFrame. This is different from \code{union} function, and both
+#' \code{UNION ALL} and \code{UNION DISTINCT} in SQL as column positions are not taken
+#' into account. Input SparkDataFrames can have different data types in the schema.
+#'
+#' Note: This does not remove duplicate rows across the two SparkDataFrames.
+#' This function resolves columns by name (not by position).
+#'
+#' @param x A SparkDataFrame
+#' @param y A SparkDataFrame
+#' @return A SparkDataFrame containing the result of the union.
+#' @family SparkDataFrame functions
+#' @rdname unionByName
+#' @name unionByName
+#' @aliases unionByName,SparkDataFrame,SparkDataFrame-method
+#' @seealso \link{rbind} \link{union}
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' df1 <- select(createDataFrame(mtcars), "carb", "am", "gear")
+#' df2 <- select(createDataFrame(mtcars), "am", "gear", "carb")
+#' head(unionByName(df1, df2))
+#' }
+#' @note unionByName since 2.3.0
+setMethod("unionByName",
+ signature(x = "SparkDataFrame", y = "SparkDataFrame"),
+ function(x, y) {
+ unioned <- callJMethod(x@sdf, "unionByName", y@sdf)
+ dataFrame(unioned)
+ })
+
#' Union two or more SparkDataFrames
#'
#' Union two or more SparkDataFrames by row. As in R's \code{rbind}, this method
@@ -2730,7 +2764,7 @@ setMethod("unionAll",
#' @aliases rbind,SparkDataFrame-method
#' @rdname rbind
#' @name rbind
-#' @seealso \link{union}
+#' @seealso \link{union} \link{unionByName}
#' @export
#' @examples
#'\dontrun{
http://git-wip-us.apache.org/repos/asf/spark/blob/07fd68a2/R/pkg/R/generics.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index f0cc2dc..603ff4e 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -769,6 +769,10 @@ setGeneric("union", function(x, y) { standardGeneric("union") })
#' @export
setGeneric("unionAll", function(x, y) { standardGeneric("unionAll") })
+#' @rdname unionByName
+#' @export
+setGeneric("unionByName", function(x, y) { standardGeneric("unionByName") })
+
#' @rdname unpersist
#' @export
setGeneric("unpersist", function(x, ...) { standardGeneric("unpersist") })
http://git-wip-us.apache.org/repos/asf/spark/blob/07fd68a2/R/pkg/tests/fulltests/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index d477fc6..7abc872 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -2255,7 +2255,7 @@ test_that("isLocal()", {
expect_false(isLocal(df))
})
-test_that("union(), rbind(), except(), and intersect() on a DataFrame", {
+test_that("union(), unionByName(), rbind(), except(), and intersect() on a DataFrame", {
df <- read.json(jsonPath)
lines <- c("{\"name\":\"Bob\", \"age\":24}",
@@ -2271,6 +2271,13 @@ test_that("union(), rbind(), except(), and intersect() on a DataFrame", {
expect_equal(first(unioned)$name, "Michael")
expect_equal(count(arrange(suppressWarnings(unionAll(df, df2)), df$age)), 6)
+ df1 <- select(df2, "age", "name")
+ unioned1 <- arrange(unionByName(df1, df), df1$age)
+ expect_is(unioned1, "SparkDataFrame")
+ expect_equal(count(unioned1), 6)
+ # Here, we test if 'Michael' in df is correctly mapped to the same name.
+ expect_equal(first(unioned1)$name, "Michael")
+
unioned2 <- arrange(rbind(unioned, df, df2), df$age)
expect_is(unioned2, "SparkDataFrame")
expect_equal(count(unioned2), 12)
http://git-wip-us.apache.org/repos/asf/spark/blob/07fd68a2/python/pyspark/sql/dataframe.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index c19e599..1cea130 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -1290,7 +1290,7 @@ class DataFrame(object):
""" Return a new :class:`DataFrame` containing union of rows in this and another frame.
This is equivalent to `UNION ALL` in SQL. To do a SQL-style set union
- (that does deduplication of elements), use this function followed by a distinct.
+ (that does deduplication of elements), use this function followed by :func:`distinct`.
Also as standard in SQL, this function resolves columns by position (not by name).
"""
@@ -1301,14 +1301,36 @@ class DataFrame(object):
""" Return a new :class:`DataFrame` containing union of rows in this and another frame.
This is equivalent to `UNION ALL` in SQL. To do a SQL-style set union
- (that does deduplication of elements), use this function followed by a distinct.
+ (that does deduplication of elements), use this function followed by :func:`distinct`.
Also as standard in SQL, this function resolves columns by position (not by name).
- .. note:: Deprecated in 2.0, use union instead.
+ .. note:: Deprecated in 2.0, use :func:`union` instead.
"""
return self.union(other)
+ @since(2.3)
+ def unionByName(self, other):
+ """ Returns a new :class:`DataFrame` containing union of rows in this and another frame.
+
+ This is different from both `UNION ALL` and `UNION DISTINCT` in SQL. To do a SQL-style set
+ union (that does deduplication of elements), use this function followed by :func:`distinct`.
+
+ The difference between this function and :func:`union` is that this function
+ resolves columns by name (not by position):
+
+ >>> df1 = spark.createDataFrame([[1, 2, 3]], ["col0", "col1", "col2"])
+ >>> df2 = spark.createDataFrame([[4, 5, 6]], ["col1", "col2", "col0"])
+ >>> df1.unionByName(df2).show()
+ +----+----+----+
+ |col0|col1|col2|
+ +----+----+----+
+ | 1| 2| 3|
+ | 6| 4| 5|
+ +----+----+----+
+ """
+ return DataFrame(self._jdf.unionByName(other._jdf), self.sql_ctx)
+
@since(1.3)
def intersect(self, other):
""" Return a new :class:`DataFrame` containing rows only in