Posted to commits@spark.apache.org by rx...@apache.org on 2015/06/04 20:30:15 UTC

spark git commit: [SPARK-7969] [SQL] Added a DataFrame.drop function that accepts a Column reference.

Repository: spark
Updated Branches:
  refs/heads/master c8709dcfd -> df7da07a8


[SPARK-7969] [SQL] Added a DataFrame.drop function that accepts a Column reference.

Added a `DataFrame.drop` function that accepts a `Column` reference rather than a `String`, along with associated unit tests. The implementation iterates over the `DataFrame`'s output attributes to find a column whose expression is equivalent to that of the supplied `Column` argument.
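
For illustration, a minimal usage sketch of the new overload (a sketch only; the data and variable names are hypothetical, not taken from the patch):

    // Scala sketch, assuming a Spark 1.4-era SQLContext with implicits imported.
    import sqlContext.implicits._

    val df = Seq((1, "Alice"), (2, "Bob")).toDF("id", "name")

    // Existing overload: drop by column name (String).
    val byName = df.drop("id")     // schema: [name]

    // New overload: drop by Column reference; matches on the column's
    // underlying expression rather than on its name.
    val byRef = df.drop(df("id"))  // schema: [name]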

Author: Mike Dusenberry <du...@gmail.com>

Closes #6585 from dusenberrymw/SPARK-7969_Drop_method_on_Dataframes_should_handle_Column and squashes the following commits:

514727a [Mike Dusenberry] Updating the @since tag of the drop(Column) function doc to reflect version 1.4.1 instead of 1.4.0.
2f1bb4e [Mike Dusenberry] Adding an additional assert statement to the 'drop column after join' unit test in order to make sure the correct column was indeed left over.
6bf7c0e [Mike Dusenberry] Minor code formatting change.
e583888 [Mike Dusenberry] Adding more Python doctests for `df.drop` with a column reference, covering joined datasets that have columns with the same name.
5f74401 [Mike Dusenberry] Updating `DataFrame.drop` with a column reference to use `logicalPlan.output`, preventing ambiguities from columns with the same name (see the sketch after this list). Also added associated unit tests for joined datasets with duplicate column names.
4b8bbe8 [Mike Dusenberry] Adding Python support for `DataFrame.drop` with a `Column` reference.
986129c [Mike Dusenberry] Added a `DataFrame.drop` function that accepts a `Column` reference rather than a `String`, and added associated unit tests. The implementation iterates over the `DataFrame`'s columns to find one with an expression equivalent to the one supplied to the function.
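
As referenced in the 5f74401 entry above, the following sketch illustrates the duplicate-column case that `logicalPlan.output` addresses (hypothetical data, not from the patch):

    // Scala sketch: a join that yields two columns named "name".
    val df1 = Seq((5, "Bob")).toDF("age", "name")
    val df2 = Seq(("Bob", 85)).toDF("name", "height")

    val joined = df1.join(df2, df1("name") === df2("name"), "inner")

    // A String-based drop("name") cannot distinguish the duplicates,
    // but a Column reference pins down exactly one of them:
    val keepRight = joined.drop(df1("name"))  // keeps df2's "name" column
    val keepLeft  = joined.drop(df2("name"))  // keeps df1's "name" column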


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/df7da07a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/df7da07a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/df7da07a

Branch: refs/heads/master
Commit: df7da07a86a30c684d5b07d955f1045a66715e3a
Parents: c8709dc
Author: Mike Dusenberry <du...@gmail.com>
Authored: Thu Jun 4 11:30:07 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Thu Jun 4 11:30:07 2015 -0700

----------------------------------------------------------------------
 python/pyspark/sql/dataframe.py                 | 21 +++++++--
 .../scala/org/apache/spark/sql/DataFrame.scala  | 16 +++++++
 .../org/apache/spark/sql/DataFrameSuite.scala   | 45 ++++++++++++++++++++
 3 files changed, 79 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/df7da07a/python/pyspark/sql/dataframe.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 7673153..03b01a1 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -1189,15 +1189,30 @@ class DataFrame(object):
 
     @since(1.4)
     @ignore_unicode_prefix
-    def drop(self, colName):
+    def drop(self, col):
         """Returns a new :class:`DataFrame` that drops the specified column.
 
-        :param colName: string, name of the column to drop.
+        :param col: a string name of the column to drop, or a
+            :class:`Column` to drop.
 
         >>> df.drop('age').collect()
         [Row(name=u'Alice'), Row(name=u'Bob')]
+
+        >>> df.drop(df.age).collect()
+        [Row(name=u'Alice'), Row(name=u'Bob')]
+
+        >>> df.join(df2, df.name == df2.name, 'inner').drop(df.name).collect()
+        [Row(age=5, height=85, name=u'Bob')]
+
+        >>> df.join(df2, df.name == df2.name, 'inner').drop(df2.name).collect()
+        [Row(age=5, name=u'Bob', height=85)]
         """
-        jdf = self._jdf.drop(colName)
+        if isinstance(col, basestring):
+            jdf = self._jdf.drop(col)
+        elif isinstance(col, Column):
+            jdf = self._jdf.drop(col._jc)
+        else:
+            raise TypeError("col should be a string or a Column")
         return DataFrame(jdf, self.sql_ctx)
 
     @since(1.3)

http://git-wip-us.apache.org/repos/asf/spark/blob/df7da07a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 034d887..d1a54ad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1083,6 +1083,22 @@ class DataFrame private[sql](
   }
 
   /**
+   * Returns a new [[DataFrame]] with a column dropped.
+   * This version of drop accepts a Column rather than a name.
+   * This is a no-op if the DataFrame doesn't have a column
+   * with an equivalent expression.
+   * @group dfops
+   * @since 1.4.1
+   */
+  def drop(col: Column): DataFrame = {
+    val attrs = this.logicalPlan.output
+    val colsAfterDrop = attrs.filter { attr =>
+      attr != col.expr
+    }.map(attr => Column(attr))
+    select(colsAfterDrop : _*)
+  }
+
+  /**
    * Returns a new [[DataFrame]] that contains only the unique rows from this [[DataFrame]].
    * This is an alias for `distinct`.
    * @group dfops

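A brief note on the matching semantics above: `attr != col.expr` compares resolved expressions, so `drop(Column)` only removes a column whose underlying attribute appears in this DataFrame's output; any other argument leaves the DataFrame untouched, as the tests below exercise. An illustrative sketch (hypothetical data, not part of the patch):

    // Scala sketch of the no-op behavior.
    val df = Seq((1, "a")).toDF("key", "value")

    df.drop(df("key"))             // drops "key": df("key") resolves to an output attribute
    df.drop(new Column("random"))  // no-op: no output attribute has an equivalent expression
    df.drop(new Column("key"))     // also a no-op: an unresolved Column named "key" is not
                                   // equal to the resolved "key" attribute
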
http://git-wip-us.apache.org/repos/asf/spark/blob/df7da07a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index b41b1b7..8e81dac 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -334,6 +334,51 @@ class DataFrameSuite extends QueryTest {
     assert(df.schema.map(_.name) === Seq("key", "value"))
   }
 
+  test("drop column using drop with column reference") {
+    val col = testData("key")
+    val df = testData.drop(col)
+    checkAnswer(
+      df,
+      testData.collect().map(x => Row(x.getString(1))).toSeq)
+    assert(df.schema.map(_.name) === Seq("value"))
+  }
+
+  test("drop unknown column (no-op) with column reference") {
+    val col = Column("random")
+    val df = testData.drop(col)
+    checkAnswer(
+      df,
+      testData.collect().toSeq)
+    assert(df.schema.map(_.name) === Seq("key", "value"))
+  }
+
+  test("drop unknown column with same name (no-op) with column reference") {
+    val col = Column("key")
+    val df = testData.drop(col)
+    checkAnswer(
+      df,
+      testData.collect().toSeq)
+    assert(df.schema.map(_.name) === Seq("key", "value"))
+  }
+
+  test("drop column after join with duplicate columns using column reference") {
+    val newSalary = salary.withColumnRenamed("personId", "id")
+    val col = newSalary("id")
+    // this join will result in duplicate "id" columns
+    val joinedDf = person.join(newSalary,
+      person("id") === newSalary("id"), "inner")
+    // remove only the "id" column that was associated with newSalary
+    val df = joinedDf.drop(col)
+    checkAnswer(
+      df,
+      joinedDf.collect().map {
+        case Row(id: Int, name: String, age: Int, idToDrop: Int, salary: Double) =>
+          Row(id, name, age, salary)
+      }.toSeq)
+    assert(df.schema.map(_.name) === Seq("id", "name", "age", "salary"))
+    assert(df("id") == person("id"))
+  }
+
   test("withColumnRenamed") {
     val df = testData.toDF().withColumn("newCol", col("key") + 1)
       .withColumnRenamed("value", "valueRenamed")

