Posted to commits@spark.apache.org by gu...@apache.org on 2017/09/03 12:03:24 UTC

spark git commit: [SPARK-21897][PYTHON][R] Add unionByName API to DataFrame in Python and R

Repository: spark
Updated Branches:
  refs/heads/master acb7fed23 -> 07fd68a29


[SPARK-21897][PYTHON][R] Add unionByName API to DataFrame in Python and R

## What changes were proposed in this pull request?

This PR adds wrappers for the `unionByName` API to R and Python.

**Python**

```python
df1 = spark.createDataFrame([[1, 2, 3]], ["col0", "col1", "col2"])
df2 = spark.createDataFrame([[4, 5, 6]], ["col1", "col2", "col0"])
df1.unionByName(df2).show()
```

```
+----+----+----+
|col0|col1|col2|
+----+----+----+
|   1|   2|   3|
|   6|   4|   5|
+----+----+----+
```
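The row `6, 4, 5` in the output above follows from resolving columns by name rather than by position. As a rough illustration only, the following plain-Python sketch models both strategies on `(columns, rows)` pairs. The helpers `union_by_position` and `union_by_name` are hypothetical names introduced here; they are not part of PySpark and say nothing about how Spark implements the operation.

```python
# Plain-Python model of the two resolution strategies; this is not PySpark code.
# Each "frame" is a (columns, rows) pair, and both helpers are hypothetical.

def union_by_position(left, right):
    """Append right's rows unchanged, ignoring right's column names."""
    left_cols, left_rows = left
    _, right_rows = right
    return left_cols, left_rows + right_rows


def union_by_name(left, right):
    """Reorder each of right's rows to follow left's column order."""
    left_cols, left_rows = left
    right_cols, right_rows = right
    if sorted(left_cols) != sorted(right_cols):
        raise ValueError("both frames must have the same set of column names")
    # For each of left's columns, find its position in right.
    index = [right_cols.index(name) for name in left_cols]
    reordered = [tuple(row[i] for i in index) for row in right_rows]
    return left_cols, left_rows + reordered


df1 = (["col0", "col1", "col2"], [(1, 2, 3)])
df2 = (["col1", "col2", "col0"], [(4, 5, 6)])

by_position = union_by_position(df1, df2)
by_name = union_by_name(df1, df2)
print(by_position[1])  # [(1, 2, 3), (4, 5, 6)]
print(by_name[1])      # [(1, 2, 3), (6, 4, 5)]
```

In this model the positional union would silently place `4` under `col0`, whereas the by-name union places `6` there, matching the table above.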

**R**

```R
df1 <- select(createDataFrame(mtcars), "carb", "am", "gear")
df2 <- select(createDataFrame(mtcars), "am", "gear", "carb")
head(unionByName(limit(df1, 2), limit(df2, 2)))
```

```
  carb am gear
1    4  1    4
2    4  1    4
3    4  1    4
4    4  1    4
```
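The four identical rows above show that `unionByName` keeps duplicates; per the docstrings in this patch, a SQL-style set union needs a `distinct` afterwards. A minimal plain-Python sketch of that behaviour, assuming rows are modelled as tuples already aligned to the `(carb, am, gear)` column order (the variable names are illustrative, not Spark API):

```python
# Plain-Python model; tuples stand in for rows in (carb, am, gear) order.
rows_df1 = [(4, 1, 4), (4, 1, 4)]
rows_df2_aligned = [(4, 1, 4), (4, 1, 4)]  # df2's rows after alignment by name

unioned = rows_df1 + rows_df2_aligned        # duplicates are kept
deduplicated = list(dict.fromkeys(unioned))  # order-preserving "distinct"

print(len(unioned))   # 4
print(deduplicated)   # [(4, 1, 4)]
```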

## How was this patch tested?

Added doctests for Python and a unit test in `test_sparkSQL.R` for R.

Author: hyukjinkwon <gu...@gmail.com>

Closes #19105 from HyukjinKwon/unionByName-r-python.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/07fd68a2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/07fd68a2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/07fd68a2

Branch: refs/heads/master
Commit: 07fd68a29fb6cad960b5ac72718bb05decf28a1a
Parents: acb7fed
Author: hyukjinkwon <gu...@gmail.com>
Authored: Sun Sep 3 21:03:21 2017 +0900
Committer: hyukjinkwon <gu...@gmail.com>
Committed: Sun Sep 3 21:03:21 2017 +0900

----------------------------------------------------------------------
 R/pkg/NAMESPACE                       |  1 +
 R/pkg/R/DataFrame.R                   | 38 ++++++++++++++++++++++++++++--
 R/pkg/R/generics.R                    |  4 ++++
 R/pkg/tests/fulltests/test_sparkSQL.R |  9 ++++++-
 python/pyspark/sql/dataframe.py       | 28 +++++++++++++++++++---
 5 files changed, 74 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/07fd68a2/R/pkg/NAMESPACE
----------------------------------------------------------------------
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index a1dd1af..3fc756b 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -169,6 +169,7 @@ exportMethods("arrange",
               "transform",
               "union",
               "unionAll",
+              "unionByName",
               "unique",
               "unpersist",
               "where",

http://git-wip-us.apache.org/repos/asf/spark/blob/07fd68a2/R/pkg/R/DataFrame.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 80526cd..1b46c1e 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -2683,7 +2683,7 @@ generateAliasesForIntersectedCols <- function (x, intersectedColNames, suffix) {
 #' @rdname union
 #' @name union
 #' @aliases union,SparkDataFrame,SparkDataFrame-method
-#' @seealso \link{rbind}
+#' @seealso \link{rbind} \link{unionByName}
 #' @export
 #' @examples
 #'\dontrun{
@@ -2714,6 +2714,40 @@ setMethod("unionAll",
             union(x, y)
           })
 
+#' Return a new SparkDataFrame containing the union of rows, matched by column names
+#'
+#' Return a new SparkDataFrame containing the union of rows in this SparkDataFrame
+#' and another SparkDataFrame. This is different from \code{union} function, and both
+#' \code{UNION ALL} and \code{UNION DISTINCT} in SQL as column positions are not taken
+#' into account. Input SparkDataFrames can have different data types in the schema.
+#'
+#' Note: This does not remove duplicate rows across the two SparkDataFrames.
+#' This function resolves columns by name (not by position).
+#'
+#' @param x A SparkDataFrame
+#' @param y A SparkDataFrame
+#' @return A SparkDataFrame containing the result of the union.
+#' @family SparkDataFrame functions
+#' @rdname unionByName
+#' @name unionByName
+#' @aliases unionByName,SparkDataFrame,SparkDataFrame-method
+#' @seealso \link{rbind} \link{union}
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' df1 <- select(createDataFrame(mtcars), "carb", "am", "gear")
+#' df2 <- select(createDataFrame(mtcars), "am", "gear", "carb")
+#' head(unionByName(df1, df2))
+#' }
+#' @note unionByName since 2.3.0
+setMethod("unionByName",
+          signature(x = "SparkDataFrame", y = "SparkDataFrame"),
+          function(x, y) {
+            unioned <- callJMethod(x@sdf, "unionByName", y@sdf)
+            dataFrame(unioned)
+          })
+
 #' Union two or more SparkDataFrames
 #'
 #' Union two or more SparkDataFrames by row. As in R's \code{rbind}, this method
@@ -2730,7 +2764,7 @@ setMethod("unionAll",
 #' @aliases rbind,SparkDataFrame-method
 #' @rdname rbind
 #' @name rbind
-#' @seealso \link{union}
+#' @seealso \link{union} \link{unionByName}
 #' @export
 #' @examples
 #'\dontrun{

http://git-wip-us.apache.org/repos/asf/spark/blob/07fd68a2/R/pkg/R/generics.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index f0cc2dc..603ff4e 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -769,6 +769,10 @@ setGeneric("union", function(x, y) { standardGeneric("union") })
 #' @export
 setGeneric("unionAll", function(x, y) { standardGeneric("unionAll") })
 
+#' @rdname unionByName
+#' @export
+setGeneric("unionByName", function(x, y) { standardGeneric("unionByName") })
+
 #' @rdname unpersist
 #' @export
 setGeneric("unpersist", function(x, ...) { standardGeneric("unpersist") })

http://git-wip-us.apache.org/repos/asf/spark/blob/07fd68a2/R/pkg/tests/fulltests/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index d477fc6..7abc872 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -2255,7 +2255,7 @@ test_that("isLocal()", {
   expect_false(isLocal(df))
 })
 
-test_that("union(), rbind(), except(), and intersect() on a DataFrame", {
+test_that("union(), unionByName(), rbind(), except(), and intersect() on a DataFrame", {
   df <- read.json(jsonPath)
 
   lines <- c("{\"name\":\"Bob\", \"age\":24}",
@@ -2271,6 +2271,13 @@ test_that("union(), rbind(), except(), and intersect() on a DataFrame", {
   expect_equal(first(unioned)$name, "Michael")
   expect_equal(count(arrange(suppressWarnings(unionAll(df, df2)), df$age)), 6)
 
+  df1 <- select(df2, "age", "name")
+  unioned1 <- arrange(unionByName(df1, df), df1$age)
+  expect_is(unioned1, "SparkDataFrame")
+  expect_equal(count(unioned1), 6)
+  # Here, we test if 'Michael' in df is correctly mapped to the same name.
+  expect_equal(first(unioned1)$name, "Michael")
+
   unioned2 <- arrange(rbind(unioned, df, df2), df$age)
   expect_is(unioned2, "SparkDataFrame")
   expect_equal(count(unioned2), 12)

http://git-wip-us.apache.org/repos/asf/spark/blob/07fd68a2/python/pyspark/sql/dataframe.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index c19e599..1cea130 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -1290,7 +1290,7 @@ class DataFrame(object):
         """ Return a new :class:`DataFrame` containing union of rows in this and another frame.
 
         This is equivalent to `UNION ALL` in SQL. To do a SQL-style set union
-        (that does deduplication of elements), use this function followed by a distinct.
+        (that does deduplication of elements), use this function followed by :func:`distinct`.
 
         Also as standard in SQL, this function resolves columns by position (not by name).
         """
@@ -1301,14 +1301,36 @@ class DataFrame(object):
         """ Return a new :class:`DataFrame` containing union of rows in this and another frame.
 
         This is equivalent to `UNION ALL` in SQL. To do a SQL-style set union
-        (that does deduplication of elements), use this function followed by a distinct.
+        (that does deduplication of elements), use this function followed by :func:`distinct`.
 
         Also as standard in SQL, this function resolves columns by position (not by name).
 
-        .. note:: Deprecated in 2.0, use union instead.
+        .. note:: Deprecated in 2.0, use :func:`union` instead.
         """
         return self.union(other)
 
+    @since(2.3)
+    def unionByName(self, other):
+        """ Returns a new :class:`DataFrame` containing union of rows in this and another frame.
+
+        This is different from both `UNION ALL` and `UNION DISTINCT` in SQL. To do a SQL-style set
+        union (that does deduplication of elements), use this function followed by :func:`distinct`.
+
+        The difference between this function and :func:`union` is that this function
+        resolves columns by name (not by position):
+
+        >>> df1 = spark.createDataFrame([[1, 2, 3]], ["col0", "col1", "col2"])
+        >>> df2 = spark.createDataFrame([[4, 5, 6]], ["col1", "col2", "col0"])
+        >>> df1.unionByName(df2).show()
+        +----+----+----+
+        |col0|col1|col2|
+        +----+----+----+
+        |   1|   2|   3|
+        |   6|   4|   5|
+        +----+----+----+
+        """
+        return DataFrame(self._jdf.unionByName(other._jdf), self.sql_ctx)
+
     @since(1.3)
     def intersect(self, other):
         """ Return a new :class:`DataFrame` containing rows only in

