Posted to commits@spark.apache.org by rx...@apache.org on 2015/09/22 08:46:05 UTC

spark git commit: [SPARK-10446][SQL] Support to specify join type when calling join with usingColumns

Repository: spark
Updated Branches:
  refs/heads/master 781b21ba2 -> 1fcefef06


[SPARK-10446][SQL] Support to specify join type when calling join with usingColumns

JIRA: https://issues.apache.org/jira/browse/SPARK-10446

Currently the method `join(right: DataFrame, usingColumns: Seq[String])` only supports inner joins. It would be more convenient if it also supported other join types.
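
For illustration, a minimal usage sketch of the new overload (the data, column names, and `left`/`right` variables here are made up for this example, not taken from the patch):

    // Assumes a SQLContext in scope (e.g. the spark-shell's sqlContext).
    import sqlContext.implicits._

    val left  = Seq((1, "a"), (2, "b")).toDF("id", "v1")
    val right = Seq((1, "x"), (3, "y")).toDF("id", "v2")

    // Before this patch, join-with-usingColumns was always an inner join:
    left.join(right, Seq("id"))                // (1, "a", "x")

    // With this patch, the join type can be passed explicitly:
    left.join(right, Seq("id"), "left_outer")  // (1, "a", "x"), (2, "b", null)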

Author: Liang-Chi Hsieh <vi...@appier.com>

Closes #8600 from viirya/usingcolumns_df.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1fcefef0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1fcefef0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1fcefef0

Branch: refs/heads/master
Commit: 1fcefef06950e2f03477282368ca835bbf40ff24
Parents: 781b21b
Author: Liang-Chi Hsieh <vi...@appier.com>
Authored: Mon Sep 21 23:46:00 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Mon Sep 21 23:46:00 2015 -0700

----------------------------------------------------------------------
 python/pyspark/sql/dataframe.py                 |  6 +++++-
 .../scala/org/apache/spark/sql/DataFrame.scala  | 22 +++++++++++++++++++-
 .../apache/spark/sql/DataFrameJoinSuite.scala   | 13 ++++++++++++
 3 files changed, 39 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1fcefef0/python/pyspark/sql/dataframe.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index fb995fa..80f8d8a 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -567,7 +567,11 @@ class DataFrame(object):
         if on is None or len(on) == 0:
             jdf = self._jdf.join(other._jdf)
         elif isinstance(on[0], basestring):
-            jdf = self._jdf.join(other._jdf, self._jseq(on))
+            if how is None:
+                jdf = self._jdf.join(other._jdf, self._jseq(on), "inner")
+            else:
+                assert isinstance(how, basestring), "how should be basestring"
+                jdf = self._jdf.join(other._jdf, self._jseq(on), how)
         else:
             assert isinstance(on[0], Column), "on should be Column or list of Column"
             if len(on) > 1:
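
With this change, the `how` argument is now forwarded when `on` is a list of column names, so a PySpark call such as `df.join(df2, on=["int", "str"], how="left")` should behave like the new Scala overload shown below; when `how` is omitted, the join still defaults to inner.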

http://git-wip-us.apache.org/repos/asf/spark/blob/1fcefef0/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 8f737c2..ba94d77 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -484,6 +484,26 @@ class DataFrame private[sql](
    * @since 1.4.0
    */
   def join(right: DataFrame, usingColumns: Seq[String]): DataFrame = {
+    join(right, usingColumns, "inner")
+  }
+
+  /**
+   * Equi-join with another [[DataFrame]] using the given columns.
+   *
+   * Different from other join functions, the join columns will only appear once in the output,
+   * i.e. similar to SQL's `JOIN USING` syntax.
+   *
+   * Note that if you perform a self-join using this function without aliasing the input
+   * [[DataFrame]]s, you will NOT be able to reference any columns after the join, since
+   * there is no way to disambiguate which side of the join you would like to reference.
+   *
+   * @param right Right side of the join operation.
+   * @param usingColumns Names of the columns to join on. These columns must exist on both sides.
+   * @param joinType One of: `inner`, `outer`, `left_outer`, `right_outer`, `leftsemi`.
+   * @group dfops
+   * @since 1.6.0
+   */
+  def join(right: DataFrame, usingColumns: Seq[String], joinType: String): DataFrame = {
     // Analyze the self join. The assumption is that the analyzer will disambiguate left vs right
     // by creating a new instance for one of the branches.
     val joined = sqlContext.executePlan(
@@ -502,7 +522,7 @@ class DataFrame private[sql](
       Join(
         joined.left,
         joined.right,
-        joinType = Inner,
+        joinType = JoinType(joinType),
         condition)
     )
   }
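
`JoinType(joinType)` resolves the user-supplied string to a Catalyst join type. A simplified sketch of that mapping, assuming the behavior of `JoinType.apply` in `org.apache.spark.sql.catalyst.plans` around this release (the real code may differ in detail):

    import org.apache.spark.sql.catalyst.plans._

    // Illustrative only: normalize the string, then match the supported types.
    def toJoinType(typ: String): JoinType =
      typ.toLowerCase.replace("_", "") match {
        case "inner"                        => Inner
        case "outer" | "full" | "fullouter" => FullOuter
        case "leftouter" | "left"           => LeftOuter
        case "rightouter" | "right"         => RightOuter
        case "leftsemi"                     => LeftSemi
        case other =>
          throw new IllegalArgumentException(s"Unsupported join type '$other'")
      }

Since underscores are stripped before matching, `left` and `left_outer` both resolve to the same join type, even though the doc comment above lists only the underscore forms.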

http://git-wip-us.apache.org/repos/asf/spark/blob/1fcefef0/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index e2716d7..56ad71e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -42,6 +42,19 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
       Row(1, 2, "1", "2") :: Row(2, 3, "2", "3") :: Row(3, 4, "3", "4") :: Nil)
   }
 
+  test("join - join using multiple columns and specifying join type") {
+    val df = Seq(1, 2, 3).map(i => (i, i + 1, i.toString)).toDF("int", "int2", "str")
+    val df2 = Seq(1, 2, 3).map(i => (i, i + 1, (i + 1).toString)).toDF("int", "int2", "str")
+
+    checkAnswer(
+      df.join(df2, Seq("int", "str"), "left"),
+      Row(1, 2, "1", null) :: Row(2, 3, "2", null) :: Row(3, 4, "3", null) :: Nil)
+
+    checkAnswer(
+      df.join(df2, Seq("int", "str"), "right"),
+      Row(null, null, null, 2) :: Row(null, null, null, 3) :: Row(null, null, null, 4) :: Nil)
+  }
+
   test("join - join using self join") {
     val df = Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str")
 

