You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2015/04/17 04:49:29 UTC

spark git commit: [SPARK-6972][SQL] Add Coalesce to DataFrame

Repository: spark
Updated Branches:
  refs/heads/master e5949c287 -> 8220d5265


[SPARK-6972][SQL] Add Coalesce to DataFrame

Author: Michael Armbrust <mi...@databricks.com>

Closes #5545 from marmbrus/addCoalesce and squashes the following commits:

9fdf3f6 [Michael Armbrust] [SPARK-6972][SQL] Add Coalesce to DataFrame


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8220d526
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8220d526
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8220d526

Branch: refs/heads/master
Commit: 8220d5265f1bbea9dfdaeec4f2d06d7fe24c0bc3
Parents: e5949c2
Author: Michael Armbrust <mi...@databricks.com>
Authored: Thu Apr 16 21:49:26 2015 -0500
Committer: Reynold Xin <rx...@databricks.com>
Committed: Thu Apr 16 21:49:26 2015 -0500

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/sql/DataFrame.scala   | 14 ++++++++++++++
 .../src/main/scala/org/apache/spark/sql/RDDApi.scala  |  2 ++
 .../scala/org/apache/spark/sql/DataFrameSuite.scala   |  8 ++++++++
 3 files changed, 24 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8220d526/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 3235f85..17c21f6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -909,6 +909,20 @@ class DataFrame private[sql](
   }
 
   /**
+   * Returns a new [[DataFrame]] that has exactly `numPartitions` partitions when that is fewer
+   * than the current number; a larger request leaves the partition count unchanged. Like coalesce
+   * on an [[RDD]], this results in a narrow dependency: e.g. if you go from 1000 partitions to 100,
+   * there will not be a shuffle; instead, each of the 100 new partitions claims 10 current ones.
+   * @group rdd
+   */
+  override def coalesce(numPartitions: Int): DataFrame = {
+    sqlContext.createDataFrame(
+      queryExecution.toRdd.coalesce(numPartitions),
+      schema,
+      needsConversion = false)
+  }
+
+  /**
    * Returns a new [[DataFrame]] that contains only the unique rows from this [[DataFrame]].
    * @group dfops
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/8220d526/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala b/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala
index ba4373f..63dbab1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala
@@ -61,5 +61,7 @@ private[sql] trait RDDApi[T] {
 
   def repartition(numPartitions: Int): DataFrame
 
+  def coalesce(numPartitions: Int): DataFrame
+
   def distinct: DataFrame
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/8220d526/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 44a7d1e..3250ab4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -178,6 +178,14 @@ class DataFrameSuite extends QueryTest {
       testData.select('key).collect().toSeq)
   }
 
+  test("coalesce") {
+    assert(testData.select('key).coalesce(1).rdd.partitions.size === 1)
+
+    checkAnswer(
+      testData.select('key).coalesce(1).select('key),
+      testData.select('key).collect().toSeq)
+  }
+
   test("groupBy") {
     checkAnswer(
       testData2.groupBy("a").agg($"a", sum($"b")),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org