Posted to reviews@spark.apache.org by nongli <gi...@git.apache.org> on 2015/10/29 23:02:19 UTC

[GitHub] spark pull request: [SPARK-11410] [SQL] Add APIs to provide functi...

GitHub user nongli opened a pull request:

    https://github.com/apache/spark/pull/9364

    [SPARK-11410] [SQL] Add APIs to provide functionality similar to Hive's DISTRIBUTE BY
    
    DISTRIBUTE BY allows the user to hash partition the data by specified exprs. It also allows for
    optionally sorting within each resulting partition. There is no required relationship between the
    exprs for partitioning and sorting (i.e. one does not need to be a prefix of the other).
    
    This patch adds two APIs to DataFrame which can be used together to provide this functionality:
      1. distributeBy(), which partitions the data frame into a specified number of partitions using
         the partitioning exprs.
      2. localSort(), which sorts each partition using the provided sorting exprs.
    
    To get the DISTRIBUTE BY functionality, the user simply calls: df.distributeBy(...).localSort(...)
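
The semantics described above can be illustrated without Spark. The following is only a minimal sketch on plain Scala collections: `Rec`, `distributeBy` and `localSort` are hypothetical stand-ins written for illustration, not the methods added by this patch, and the bucketing by `hashCode` modulo the partition count is an assumption about what "hash partition" means here.

```scala
// Plain-Scala model of DISTRIBUTE BY followed by a per-partition sort; no Spark involved.
object DistributeBySketch {
  final case class Rec(a: Int, b: Int)

  // Hash-partition rows into numPartitions buckets by a key (hypothetical helper).
  def distributeBy[K](rows: Seq[Rec], numPartitions: Int)(key: Rec => K): Vector[Seq[Rec]] = {
    require(numPartitions > 0, "numPartitions must be positive")
    val buckets = rows.groupBy { r =>
      // Non-negative modulus so negative hash codes still map to a valid bucket.
      val h = key(r).hashCode % numPartitions
      if (h < 0) h + numPartitions else h
    }
    Vector.tabulate(numPartitions)(i => buckets.getOrElse(i, Seq.empty))
  }

  // Sort each partition independently; no ordering is imposed across partitions.
  def localSort[S: Ordering](parts: Vector[Seq[Rec]])(sortKey: Rec => S): Vector[Seq[Rec]] =
    parts.map(_.sortBy(sortKey))
}
```

For instance, `localSort(distributeBy(rows, 4)(_.a))(r => -r.b)` partitions on `a` and sorts each partition by `b` descending, which shows that the partitioning and sorting expressions need not be related.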

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/nongli/spark spark-11410

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/9364.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #9364
    
----
commit 71a159f8844ecafb964c67547e5ce88658aa5c25
Author: Nong Li <no...@gmail.com>
Date:   2015-10-29T21:09:24Z

    [SPARK-11410] [SQL] Add APIs to provide functionality similar to Hive's DISTRIBUTE BY
    
    DISTRIBUTE BY allows the user to hash partition the data by specified exprs. It also allows for
    optionally sorting within each resulting partition. There is no required relationship between the
    exprs for partitioning and sorting (i.e. one does not need to be a prefix of the other).
    
    This patch adds two APIs to DataFrame which can be used together to provide this functionality:
      1. distributeBy(), which partitions the data frame into a specified number of partitions using
         the partitioning exprs.
      2. localSort(), which sorts each partition using the provided sorting exprs.
    
    To get the DISTRIBUTE BY functionality, the user simply calls: df.distributeBy(...).localSort(...)

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-11410] [SQL] Add APIs to provide functi...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9364#discussion_r43558572
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala ---
    @@ -31,10 +31,11 @@ case class SortPartitions(sortExpressions: Seq[SortOrder], child: LogicalPlan)
       extends RedistributeData
     
     /**
    - * This method repartitions data using [[Expression]]s, and receives information about the
    - * number of partitions during execution. Used when a specific ordering or distribution is
    - * expected by the consumer of the query result. Use [[Repartition]] for RDD-like
    + * This method repartitions data using [[Expression]]s into `numPartitions`, and receives
    + * information about the number of partitions during execution. Used when a specific ordering or
    + * distribution is expected by the consumer of the query result. Use [[Repartition]] for RDD-like
      * `coalesce` and `repartition`.
    + * If `numPartitions` is not specified, the partition in `child` is preserved.
      */
    -case class RepartitionByExpression(partitionExpressions: Seq[Expression], child: LogicalPlan)
    -  extends RedistributeData
    +case class RepartitionByExpression(partitionExpressions: Seq[Expression],
    +    child: LogicalPlan, numPartitions: Option[Int] = None) extends RedistributeData
    --- End diff --
    
    Change the format to the following one?
    ```
    case class RepartitionByExpression(
        partitionExpressions: Seq[Expression],
        child: LogicalPlan,
        numPartitions: Option[Int] = None) extends RedistributeData
    ```




[GitHub] spark pull request: [SPARK-11410] [SQL] Add APIs to provide functi...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9364#discussion_r43466845
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---
    @@ -987,4 +992,116 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
         val df = (1 to 10).map(Tuple1.apply).toDF("i").as("src")
         assert(df.select($"src.i".cast(StringType)).columns.head === "i")
       }
    +
    +  /**
    +   * Verifies that there is no Exchange between the Aggregations for `df`
    +   */
    +  private def verifyNonExchangingAgg(df: DataFrame) = {
    +    var atFirstAgg: Boolean = false
    +    df.queryExecution.executedPlan.foreach {
    +      case agg: TungstenAggregate => {
    +        atFirstAgg = !atFirstAgg
    +      }
    +      case _ => {
    +        if (atFirstAgg) {
    +          fail("Should not have operators between the two aggregations")
    +        }
    +      }
    +    }
    --- End diff --
    
    yeah, you are right.




[GitHub] spark pull request: [SPARK-11410] [SQL] Add APIs to provide functi...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9364#discussion_r43452264
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala ---
    @@ -663,6 +667,35 @@ class DataFrame private[sql](
       def orderBy(sortExprs: Column*): DataFrame = sort(sortExprs : _*)
     
       /**
    +   * Returns a new [[DataFrame]] partitioned by the given partitioning expressions into
    +   * `numPartitions`
    +   * `numPartitions` can be < 0 to preserve the current number of partitions.
    +   * @group dfops
    +   * @since 1.6.0
    +   */
    +  def distributeBy(partitionExprs: Seq[Column], numPartitions: Int = -1): DataFrame = {
    +    PartitionByExpression(partitionExprs.map { _.expr }, logicalPlan, numPartitions)
    +  }
    --- End diff --
    
    Let's avoid having default values in the public API (see https://github.com/databricks/scala-style-guide#java-default-param-values). We can overload this instead.
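
The overloading suggested here can be sketched as follows. This is a minimal illustration under stated assumptions, not the code that was merged: `Frame` is a hypothetical stand-in for `DataFrame`, expressions are modelled as plain strings, and "no count given" is assumed to mean "keep the current number of partitions".

```scala
object OverloadSketch {
  // Hypothetical stand-in for a DataFrame-like class.
  final case class Frame(numPartitions: Int) {
    // Overload 1: explicit partition count (the parameter shadows the field here).
    def distributeBy(exprs: Seq[String], numPartitions: Int): Frame = {
      require(numPartitions > 0, s"numPartitions ($numPartitions) must be positive")
      Frame(numPartitions)
    }

    // Overload 2: no count given; assumed to keep the current partition count.
    def distributeBy(exprs: Seq[String]): Frame = distributeBy(exprs, numPartitions)
  }
}
```

Both overloads are ordinary methods, so a Java caller can use either one directly, whereas a Scala default argument is not visible as an optional parameter from Java.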




[GitHub] spark pull request: [SPARK-11410] [SQL] Add APIs to provide functi...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9364#discussion_r43460731
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---
    @@ -987,4 +992,116 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
         val df = (1 to 10).map(Tuple1.apply).toDF("i").as("src")
         assert(df.select($"src.i".cast(StringType)).columns.head === "i")
       }
    +
    +  /**
    +   * Verifies that there is no Exchange between the Aggregations for `df`
    +   */
    +  private def verifyNonExchangingAgg(df: DataFrame) = {
    +    var atFirstAgg: Boolean = false
    +    df.queryExecution.executedPlan.foreach {
    +      case agg: TungstenAggregate => {
    +        atFirstAgg = !atFirstAgg
    +      }
    +      case _ => {
    +        if (atFirstAgg) {
    +          fail("Should not have operators between the two aggregations")
    +        }
    +      }
    +    }
    --- End diff --
    
    Never mind. I don't think checking the number of exchanges is sufficient. It will pass if distributeBy did nothing. The key is that the exchanges happen where we expect them to.
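
The difference between counting exchanges and checking their position can be shown on a toy model. The sketch below is an illustration only: the plan is flattened to a top-down list of hypothetical operator markers (`Aggregate`, `Exchange`, `Scan`), and the two plan shapes used in the usage note are assumed, not taken from real Spark output.

```scala
object PlanCheckSketch {
  sealed trait Op
  case object Aggregate extends Op
  case object Exchange extends Op
  case object Scan extends Op

  // Number of Exchange operators anywhere in the (linear, top-down) plan.
  def numExchanges(plan: Seq[Op]): Int = plan.count(_ == Exchange)

  // True when two Aggregates are directly adjacent, i.e. nothing
  // (in particular no Exchange) sits between them.
  def aggregatesAdjacent(plan: Seq[Op]): Boolean =
    plan.zip(plan.drop(1)).exists { case (x, y) => x == Aggregate && y == Aggregate }
}
```

Under these assumptions, `Seq(Aggregate, Aggregate, Exchange, Scan)` and `Seq(Aggregate, Exchange, Aggregate, Scan)` both contain exactly one exchange, so a count cannot tell them apart, while `aggregatesAdjacent` holds only for the first.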




[GitHub] spark pull request: [SPARK-11410] [SQL] Add APIs to provide functi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9364#issuecomment-152853348
  
    **[Test build #44764 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44764/consoleFull)** for PR 9364 at commit [`98c05ae`](https://github.com/apache/spark/commit/98c05ae08281b14dd67151bf8edd55cb884d1061).




[GitHub] spark pull request: [SPARK-11410] [SQL] Add APIs to provide functi...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9364#discussion_r43453219
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala ---
    @@ -38,3 +38,11 @@ case class SortPartitions(sortExpressions: Seq[SortOrder], child: LogicalPlan)
      */
     case class RepartitionByExpression(partitionExpressions: Seq[Expression], child: LogicalPlan)
       extends RedistributeData
    +
    +/**
    + * This method repartitions data using [[Expression]]s into `numPartitions`. If numPartitions is
    + * less than zero, a default is used. Otherwise this behaves identically to RepartitionByExpression.
    + */
    +case class PartitionByExpression(partitionExpressions: Seq[Expression],
    +                                 child: LogicalPlan, numPartitions: Int = -1)
    +  extends RedistributeData
    --- End diff --
    
    Oh, we need to avoid using `Option` in the public API `distributeBy` because it is not Java friendly. So, I am fine with using `numPartitions: Int` internally as well.




[GitHub] spark pull request: [SPARK-11410] [SQL] Add APIs to provide functi...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9364#discussion_r43452473
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---
    @@ -987,4 +992,116 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
         val df = (1 to 10).map(Tuple1.apply).toDF("i").as("src")
         assert(df.select($"src.i".cast(StringType)).columns.head === "i")
       }
    +
    +  /**
    +   * Verifies that there is no Exchange between the Aggregations for `df`
    +   */
    +  private def verifyNonExchangingAgg(df: DataFrame) = {
    +    var atFirstAgg: Boolean = false
    +    df.queryExecution.executedPlan.foreach {
    +      case agg: TungstenAggregate => {
    +        atFirstAgg = !atFirstAgg
    +      }
    +      case _ => {
    +        if (atFirstAgg) {
    +          fail("Should not have operators between the two aggregations")
    +        }
    +      }
    +    }
    --- End diff --
    
    Maybe we can do
    ```
    val numExchanges = df.queryExecution.executedPlan.collect {
      case e: Exchange => e
    }.size
    ```
    Then, we can check if `numExchanges` is `0`. 




[GitHub] spark pull request: [SPARK-11410] [SQL] Add APIs to provide functi...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9364#discussion_r43559349
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala ---
    @@ -31,10 +31,11 @@ case class SortPartitions(sortExpressions: Seq[SortOrder], child: LogicalPlan)
       extends RedistributeData
     
     /**
    - * This method repartitions data using [[Expression]]s, and receives information about the
    - * number of partitions during execution. Used when a specific ordering or distribution is
    - * expected by the consumer of the query result. Use [[Repartition]] for RDD-like
    + * This method repartitions data using [[Expression]]s into `numPartitions`, and receives
    + * information about the number of partitions during execution. Used when a specific ordering or
    + * distribution is expected by the consumer of the query result. Use [[Repartition]] for RDD-like
      * `coalesce` and `repartition`.
    + * If `numPartitions` is not specified, the partition in `child` is preserved.
      */
    -case class RepartitionByExpression(partitionExpressions: Seq[Expression], child: LogicalPlan)
    -  extends RedistributeData
    +case class RepartitionByExpression(partitionExpressions: Seq[Expression],
    +    child: LogicalPlan, numPartitions: Option[Int] = None) extends RedistributeData
    --- End diff --
    
    Also, how about we add a `require` to make sure we get a valid numPartitions?
    
    ```
    case class RepartitionByExpression(
        partitionExpressions: Seq[Expression],
        child: LogicalPlan,
        numPartitions: Option[Int] = None) extends RedistributeData {
    
      numPartitions match {
        case Some(n) => require(n > 0, "error message")
        case None => // OK
      }
    }
    ```




[GitHub] spark pull request: [SPARK-11410] [SQL] Add APIs to provide functi...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on the pull request:

    https://github.com/apache/spark/pull/9364#issuecomment-152857767
  
    LGTM pending jenkins.




[GitHub] spark pull request: [SPARK-11410] [SQL] Add APIs to provide functi...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9364#discussion_r43452642
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---
    @@ -987,4 +992,116 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
         val df = (1 to 10).map(Tuple1.apply).toDF("i").as("src")
         assert(df.select($"src.i".cast(StringType)).columns.head === "i")
       }
    +
    +  /**
    +   * Verifies that there is no Exchange between the Aggregations for `df`
    +   */
    +  private def verifyNonExchangingAgg(df: DataFrame) = {
    +    var atFirstAgg: Boolean = false
    +    df.queryExecution.executedPlan.foreach {
    +      case agg: TungstenAggregate => {
    +        atFirstAgg = !atFirstAgg
    +      }
    +      case _ => {
    +        if (atFirstAgg) {
    +          fail("Should not have operators between the two aggregations")
    +        }
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Verifies that there is an Exchange between the Aggregations for `df`
    +   */
    +  private def verifyExchangingAgg(df: DataFrame) = {
    +    var atFirstAgg: Boolean = false
    +    df.queryExecution.executedPlan.foreach {
    +      case agg: TungstenAggregate => {
    +        if (atFirstAgg) {
    +          fail("Should not have back to back Aggregates")
    +        }
    +        atFirstAgg = true
    +      }
    +      case e: Exchange => atFirstAgg = false
    +      case _ =>
    +    }
    +  }
    +
    +  test("distributeBy") {
    --- End diff --
    
    `distributeBy and localSort`?




[GitHub] spark pull request: [SPARK-11410] [SQL] Add APIs to provide functi...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on the pull request:

    https://github.com/apache/spark/pull/9364#issuecomment-152852984
  
    test this please




[GitHub] spark pull request: [SPARK-11410] [SQL] Add APIs to provide functi...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9364#discussion_r43823876
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---
    @@ -997,4 +1001,116 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
           }
         }
       }
    +
    +  /**
    +   * Verifies that there is no Exchange between the Aggregations for `df`
    +   */
    +  private def verifyNonExchangingAgg(df: DataFrame) = {
    +    var atFirstAgg: Boolean = false
    +    df.queryExecution.executedPlan.foreach {
    +      case agg: TungstenAggregate => {
    +        atFirstAgg = !atFirstAgg
    +      }
    +      case _ => {
    +        if (atFirstAgg) {
    +          fail("Should not have operators between the two aggregations")
    +        }
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Verifies that there is an Exchange between the Aggregations for `df`
    +   */
    +  private def verifyExchangingAgg(df: DataFrame) = {
    +    var atFirstAgg: Boolean = false
    +    df.queryExecution.executedPlan.foreach {
    +      case agg: TungstenAggregate => {
    +        if (atFirstAgg) {
    +          fail("Should not have back to back Aggregates")
    +        }
    +        atFirstAgg = true
    +      }
    +      case e: Exchange => atFirstAgg = false
    +      case _ =>
    +    }
    +  }
    +
    +  test("distributeBy and localSort") {
    +    val original = testData.repartition(1)
    +    assert(original.rdd.partitions.length == 1)
    +    val df = original.distributeBy(Column("key") :: Nil, 5)
    +    assert(df.rdd.partitions.length  == 5)
    +    checkAnswer(original.select(), df.select())
    +
    +    val df2 = original.distributeBy(Column("key") :: Nil, 10)
    +    assert(df2.rdd.partitions.length  == 10)
    +    checkAnswer(original.select(), df2.select())
    +
    +    // Group by the column we are distributed by. This should generate a plan with no exchange
    +    // between the aggregates
    +    val df3 = testData.distributeBy(Column("key") :: Nil).groupBy("key").count()
    +    verifyNonExchangingAgg(df3)
    +    verifyNonExchangingAgg(testData.distributeBy(Column("key") :: Column("value") :: Nil)
    +      .groupBy("key", "value").count())
    +
    +    // Grouping by just the first distributeBy expr, need to exchange.
    +    verifyExchangingAgg(testData.distributeBy(Column("key") :: Column("value") :: Nil)
    +      .groupBy("key").count())
    +
    +    val data = sqlContext.sparkContext.parallelize(
    +      (1 to 100).map(i => TestData2(i % 10, i))).toDF()
    +
    +    // Distribute and order by.
    +    val df4 = data.distributeBy(Column("a") :: Nil).localSort($"b".desc)
    +    // Walk each partition and verify that it is sorted descending and does not contain all
    +    // the values.
    +    df4.rdd.foreachPartition(p => {
    --- End diff --
    
    for future reference, we usually do
    
    ```scala
    df4.rdd.foreachPartition { p =>
      ...
    }
    ```
    rather than `(p => {`





[GitHub] spark pull request: [SPARK-11410] [SQL] Add APIs to provide functi...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9364#discussion_r43453065
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---
    @@ -987,4 +992,116 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
         val df = (1 to 10).map(Tuple1.apply).toDF("i").as("src")
         assert(df.select($"src.i".cast(StringType)).columns.head === "i")
       }
    +
    +  /**
    +   * Verifies that there is no Exchange between the Aggregations for `df`
    +   */
    +  private def verifyNonExchangingAgg(df: DataFrame) = {
    +    var atFirstAgg: Boolean = false
    +    df.queryExecution.executedPlan.foreach {
    +      case agg: TungstenAggregate => {
    +        atFirstAgg = !atFirstAgg
    +      }
    +      case _ => {
    +        if (atFirstAgg) {
    +          fail("Should not have operators between the two aggregations")
    +        }
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Verifies that there is an Exchange between the Aggregations for `df`
    +   */
    +  private def verifyExchangingAgg(df: DataFrame) = {
    +    var atFirstAgg: Boolean = false
    +    df.queryExecution.executedPlan.foreach {
    +      case agg: TungstenAggregate => {
    +        if (atFirstAgg) {
    +          fail("Should not have back to back Aggregates")
    +        }
    +        atFirstAgg = true
    +      }
    +      case e: Exchange => atFirstAgg = false
    +      case _ =>
    +    }
    +  }
    +
    +  test("distributeBy") {
    +    val original = testData.repartition(1)
    +    assert(original.rdd.partitions.length == 1)
    +    val df = original.distributeBy(Column("key") :: Nil, 5)
    +    assert(df.rdd.partitions.length  == 5)
    +    checkAnswer(original.select(), df.select())
    +
    +    val df2 = original.distributeBy(Column("key") :: Nil, 10)
    +    assert(df2.rdd.partitions.length  == 10)
    +    checkAnswer(original.select(), df2.select())
    +
    +    // Group by the column we are distributed by. This should generate a plan with no exchange
    +    // between the aggregates
    +    val df3 = testData.distributeBy(Column("key") :: Nil).groupBy("key").count()
    +    verifyNonExchangingAgg(df3)
    +    verifyNonExchangingAgg(testData.distributeBy(Column("key") :: Column("value") :: Nil)
    +      .groupBy("key", "value").count())
    +
    +    // Grouping by just the first distributeBy expr, need to exchange.
    +    verifyExchangingAgg(testData.distributeBy(Column("key") :: Column("value") :: Nil)
    +      .groupBy("key").count())
    +
    +    val data = sqlContext.sparkContext.parallelize(
    +      (1 to 100).map(i => TestData2(i % 10, i))).toDF()
    +
    +    // Distribute and order by.
    +    val df4 = data.distributeBy(Column("a") :: Nil).localSort($"b".desc)
    +    // Walk each partition and verify that it is sorted descending and not globally sorted.
    +    df4.rdd.foreachPartition(p => {
    +      var previousValue: Int = -1
    +      var globallyOrdered: Boolean = true
    +      p.foreach(r => {
    +        val v: Int = r.getInt(1)
    +        if (previousValue != -1) {
    +          if (previousValue < v) throw new SparkException("Partition is not ordered.")
    +          if (v + 1 != previousValue) globallyOrdered = false
    +        }
    +        previousValue = v
    +      })
    +      if (globallyOrdered) throw new SparkException("Partition should not be globally ordered")
    +    })
    +
    +    // Distribute and order by with multiple order bys
    +    val df5 = data.distributeBy(Column("a") :: Nil, 2).localSort($"b".asc, $"a".asc)
    +    // Walk each partition and verify that it is sorted descending and not globally sorted.
    +    df5.rdd.foreachPartition(p => {
    +      var previousValue: Int = -1
    +      var globallyOrdered: Boolean = true
    +      p.foreach(r => {
    +        val v: Int = r.getInt(1)
    +        if (previousValue != -1) {
    +          if (previousValue > v) throw new SparkException("Partition is not ordered.")
    +          if (v - 1 != previousValue) globallyOrdered = false
    +        }
    +        previousValue = v
    +      })
    +      if (globallyOrdered) throw new SparkException("Partition should not be globally ordered")
    +    })
    --- End diff --
    
    I think the function provided to `foreachPartition` will be executed in parallel. How about we get the `max` and `min` values of every partition and then check on the driver side whether the values are globally ordered?
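
One way to read this suggestion is sketched below on plain Scala collections, where each inner `Seq` models one partition. All helper names are hypothetical. In Spark, the per-partition summary could presumably be produced with `mapPartitions` and brought to the driver with `collect`; that step is assumed here rather than shown.

```scala
object PartitionOrderSketch {
  // True when one partition's values are in descending order.
  def isSortedDesc(p: Seq[Int]): Boolean =
    p.zip(p.drop(1)).forall { case (prev, next) => prev >= next }

  // (min, max) per non-empty partition: the small summary gathered on the driver.
  def ranges(parts: Seq[Seq[Int]]): Seq[(Int, Int)] =
    parts.filter(_.nonEmpty).map(p => (p.min, p.max))

  // Ranges are disjoint (as a global sort would produce) when, after ordering
  // them by minimum, each range ends before the next one begins.
  def rangesDisjoint(rs: Seq[(Int, Int)]): Boolean = {
    val sorted = rs.sortBy(_._1)
    sorted.zip(sorted.drop(1)).forall { case ((_, hi), (lo, _)) => hi < lo }
  }
}
```

A test along these lines would assert that every partition passes `isSortedDesc` and that `rangesDisjoint(ranges(parts))` is false, so the "not globally ordered" decision is made in one place instead of inside each parallel task.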




[GitHub] spark pull request: [SPARK-11410] [SQL] Add APIs to provide functi...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9364#discussion_r43460822
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---
    @@ -987,4 +992,116 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
         val df = (1 to 10).map(Tuple1.apply).toDF("i").as("src")
         assert(df.select($"src.i".cast(StringType)).columns.head === "i")
       }
    +
    +  /**
    +   * Verifies that there is no Exchange between the Aggregations for `df`
    +   */
    +  private def verifyNonExchangingAgg(df: DataFrame) = {
    +    var atFirstAgg: Boolean = false
    +    df.queryExecution.executedPlan.foreach {
    +      case agg: TungstenAggregate => {
    +        atFirstAgg = !atFirstAgg
    +      }
    +      case _ => {
    +        if (atFirstAgg) {
    +          fail("Should not have operators between the two aggregations")
    +        }
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Verifies that there is an Exchange between the Aggregations for `df`
    +   */
    +  private def verifyExchangingAgg(df: DataFrame) = {
    +    var atFirstAgg: Boolean = false
    +    df.queryExecution.executedPlan.foreach {
    +      case agg: TungstenAggregate => {
    +        if (atFirstAgg) {
    +          fail("Should not have back to back Aggregates")
    +        }
    +        atFirstAgg = true
    +      }
    +      case e: Exchange => atFirstAgg = false
    +      case _ =>
    +    }
    +  }
    +
    +  test("distributeBy") {
    +    val original = testData.repartition(1)
    +    assert(original.rdd.partitions.length == 1)
    +    val df = original.distributeBy(Column("key") :: Nil, 5)
    +    assert(df.rdd.partitions.length  == 5)
    +    checkAnswer(original.select(), df.select())
    +
    +    val df2 = original.distributeBy(Column("key") :: Nil, 10)
    +    assert(df2.rdd.partitions.length  == 10)
    +    checkAnswer(original.select(), df2.select())
    +
    +    // Group by the column we are distributed by. This should generate a plan with no exchange
    +    // between the aggregates
    +    val df3 = testData.distributeBy(Column("key") :: Nil).groupBy("key").count()
    +    verifyNonExchangingAgg(df3)
    +    verifyNonExchangingAgg(testData.distributeBy(Column("key") :: Column("value") :: Nil)
    +      .groupBy("key", "value").count())
    +
    +    // Grouping by just the first distributeBy expr, need to exchange.
    +    verifyExchangingAgg(testData.distributeBy(Column("key") :: Column("value") :: Nil)
    +      .groupBy("key").count())
    +
    +    val data = sqlContext.sparkContext.parallelize(
    +      (1 to 100).map(i => TestData2(i % 10, i))).toDF()
    +
    +    // Distribute and order by.
    +    val df4 = data.distributeBy(Column("a") :: Nil).localSort($"b".desc)
    +    // Walk each partition and verify that it is sorted descending and not globally sorted.
    +    df4.rdd.foreachPartition(p => {
    +      var previousValue: Int = -1
    +      var globallyOrdered: Boolean = true
    +      p.foreach(r => {
    +        val v: Int = r.getInt(1)
    +        if (previousValue != -1) {
    +          if (previousValue < v) throw new SparkException("Partition is not ordered.")
    +          if (v + 1 != previousValue) globallyOrdered = false
    +        }
    +        previousValue = v
    +      })
    +      if (globallyOrdered) throw new SparkException("Partition should not be globally ordered")
    +    })
    +
    +    // Distribute and order by with multiple order bys
    +    val df5 = data.distributeBy(Column("a") :: Nil, 2).localSort($"b".asc, $"a".asc)
    +    // Walk each partition and verify that it is sorted descending and not globally sorted.
    +    df5.rdd.foreachPartition(p => {
    +      var previousValue: Int = -1
    +      var globallyOrdered: Boolean = true
    +      p.foreach(r => {
    +        val v: Int = r.getInt(1)
    +        if (previousValue != -1) {
    +          if (previousValue > v) throw new SparkException("Partition is not ordered.")
    +          if (v - 1 != previousValue) globallyOrdered = false
    +        }
    +        previousValue = v
    +      })
    +      if (globallyOrdered) throw new SparkException("Partition should not be globally ordered")
    +    })
    --- End diff --
    
    What do you mean by in parallel? That's what I want to verify, that each partition is ordered.


[GitHub] spark pull request: [SPARK-11410] [SQL] Add APIs to provide functi...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9364#discussion_r43558527
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala ---
    @@ -31,10 +31,11 @@ case class SortPartitions(sortExpressions: Seq[SortOrder], child: LogicalPlan)
       extends RedistributeData
     
     /**
    - * This method repartitions data using [[Expression]]s, and receives information about the
    - * number of partitions during execution. Used when a specific ordering or distribution is
    - * expected by the consumer of the query result. Use [[Repartition]] for RDD-like
    + * This method repartitions data using [[Expression]]s into `numPartitions`, and receives
    + * information about the number of partitions during execution. Used when a specific ordering or
    + * distribution is expected by the consumer of the query result. Use [[Repartition]] for RDD-like
      * `coalesce` and `repartition`.
    + * If `numPartitions` is not specified, the partition in `child` is preserved.
    --- End diff --
    
    the partition in `child` is preserved. => the number of partitions of the `child` is preserved?


[GitHub] spark pull request: [SPARK-11410] [SQL] Add APIs to provide functi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9364#issuecomment-152864581
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44764/
    Test PASSed.


[GitHub] spark pull request: [SPARK-11410] [SQL] Add APIs to provide functi...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9364#discussion_r43452622
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---
    @@ -987,4 +992,116 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
         val df = (1 to 10).map(Tuple1.apply).toDF("i").as("src")
         assert(df.select($"src.i".cast(StringType)).columns.head === "i")
       }
    +
    +  /**
    +   * Verifies that there is no Exchange between the Aggregations for `df`
    +   */
    +  private def verifyNonExchangingAgg(df: DataFrame) = {
    +    var atFirstAgg: Boolean = false
    +    df.queryExecution.executedPlan.foreach {
    +      case agg: TungstenAggregate => {
    +        atFirstAgg = !atFirstAgg
    +      }
    +      case _ => {
    +        if (atFirstAgg) {
    +          fail("Should not have operators between the two aggregations")
    +        }
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Verifies that there is an Exchange between the Aggregations for `df`
    +   */
    +  private def verifyExchangingAgg(df: DataFrame) = {
    --- End diff --
    
    If we can verify the plan by checking the number of `Exchange`s, how about merging `verifyNonExchangingAgg` and `verifyExchangingAgg` into one helper that takes the expected number of `Exchange`s as an argument?
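
A minimal sketch of the merged helper suggested above, assuming a toy plan tree. The `Plan`, `Scan`, `Exchange` and `Aggregate` types here are hypothetical stand-ins defined only for illustration; they are not Spark's `SparkPlan` classes, and `countExchanges` / `verifyExchangeCount` are names invented for this sketch.

```scala
object ExchangeCount {
  // Hypothetical stand-in for a physical plan tree; not Spark's SparkPlan.
  sealed trait Plan { def children: Seq[Plan] }
  case class Scan(name: String) extends Plan { val children: Seq[Plan] = Nil }
  case class Exchange(child: Plan) extends Plan { val children: Seq[Plan] = Seq(child) }
  case class Aggregate(child: Plan) extends Plan { val children: Seq[Plan] = Seq(child) }

  // Counts Exchange nodes anywhere in the tree, including the root.
  def countExchanges(plan: Plan): Int = {
    val here = plan match {
      case _: Exchange => 1
      case _ => 0
    }
    here + plan.children.map(countExchanges).sum
  }

  // One helper replaces the two verify functions: the caller states the expected count.
  def verifyExchangeCount(plan: Plan, expected: Int): Unit = {
    val actual = countExchanges(plan)
    require(actual == expected, s"Expected $expected Exchange(s) but found $actual")
  }
}
```

With a helper of this shape, the "no exchange between the aggregates" case becomes a call with an expected count of 0, and the "needs an exchange" case a call with the count the plan is expected to contain.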


[GitHub] spark pull request: [SPARK-11410] [SQL] Add APIs to provide functi...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on the pull request:

    https://github.com/apache/spark/pull/9364#issuecomment-152341432
  
    ok to test.


[GitHub] spark pull request: [SPARK-11410] [SQL] Add APIs to provide functi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9364#issuecomment-152864579
  
    Merged build finished. Test PASSed.


[GitHub] spark pull request: [SPARK-11410] [SQL] Add APIs to provide functi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9364#issuecomment-152853284
  
    Merged build started.


[GitHub] spark pull request: [SPARK-11410] [SQL] Add APIs to provide functi...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9364#discussion_r43559392
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---
    @@ -19,15 +19,18 @@ package org.apache.spark.sql
     
     import java.io.File
     
    -import scala.language.postfixOps
    -import scala.util.Random
    -
    -import org.scalatest.Matchers._
    -
    +import org.apache.spark.SparkException
     import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation
    +import org.apache.spark.sql.execution.Exchange
    +import org.apache.spark.sql.execution.aggregate.TungstenAggregate
     import org.apache.spark.sql.functions._
    +import org.apache.spark.sql.test.SQLTestData.TestData2
    +import org.apache.spark.sql.test.{ExamplePoint, ExamplePointUDT, SharedSQLContext}
     import org.apache.spark.sql.types._
    -import org.apache.spark.sql.test.{ExamplePointUDT, ExamplePoint, SharedSQLContext}
    +import org.scalatest.Matchers._
    +
    +import scala.language.postfixOps
    +import scala.util.Random
    --- End diff --
    
    import ordering


[GitHub] spark pull request: [SPARK-11410] [SQL] Add APIs to provide functi...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9364#discussion_r43452005
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala ---
    @@ -38,3 +38,11 @@ case class SortPartitions(sortExpressions: Seq[SortOrder], child: LogicalPlan)
      */
     case class RepartitionByExpression(partitionExpressions: Seq[Expression], child: LogicalPlan)
       extends RedistributeData
    +
    +/**
    + * This method repartitions data using [[Expression]]s into `numPartitions`. If numPartitions is
    + * less than zero, a default is used. Otherwise this behaves identically to RepartitionByExpression.
    + */
    +case class PartitionByExpression(partitionExpressions: Seq[Expression],
    +                                 child: LogicalPlan, numPartitions: Int = -1)
    +  extends RedistributeData
    --- End diff --
    
    How about we just use `RepartitionByExpression`? Since users do not use classes in catalyst directly, it is fine to change the signature of the constructor. For the type of `numPartitions`, how about `Option[Int]`? If it is `None`, the number of partitions is not specified. We can add a `require` clause to make sure the value is greater than 0 if `numPartitions` is a `Some`.
    
    btw, regarding the format, the following one is preferred.
    ```
    case class PartitionByExpression(
        partitionExpressions: Seq[Expression],
        numPartitions: Option[Int] = None,
        child: LogicalPlan)
      extends RedistributeData
    ```
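
The `Option[Int]` plus `require` idea above could look roughly like the following. This is a sketch under stated assumptions: `RepartitionByExpr` is a hypothetical simplified node, and plain strings stand in for Catalyst `Expression`s and the child `LogicalPlan`, so that the example compiles without Spark.

```scala
object RepartitionSketch {
  // Hypothetical simplified node: strings stand in for Catalyst expressions and the child plan.
  case class RepartitionByExpr(
      partitionExpressions: Seq[String],
      child: String,
      numPartitions: Option[Int] = None) {
    // None means "not specified"; a Some must carry a positive partition count.
    numPartitions.foreach { n =>
      require(n > 0, s"numPartitions ($n) must be greater than 0")
    }
  }
}
```

In this sketch the defaulted parameter is placed last so that positional construction still works; that ordering is a choice made for the example, not something the thread settles.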



[GitHub] spark pull request: [SPARK-11410] [SQL] Add APIs to provide functi...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9364#discussion_r43467544
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---
    @@ -987,4 +992,116 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
         val df = (1 to 10).map(Tuple1.apply).toDF("i").as("src")
         assert(df.select($"src.i".cast(StringType)).columns.head === "i")
       }
    +
    +  /**
    +   * Verifies that there is no Exchange between the Aggregations for `df`
    +   */
    +  private def verifyNonExchangingAgg(df: DataFrame) = {
    +    var atFirstAgg: Boolean = false
    +    df.queryExecution.executedPlan.foreach {
    +      case agg: TungstenAggregate => {
    +        atFirstAgg = !atFirstAgg
    +      }
    +      case _ => {
    +        if (atFirstAgg) {
    +          fail("Should not have operators between the two aggregations")
    +        }
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Verifies that there is an Exchange between the Aggregations for `df`
    +   */
    +  private def verifyExchangingAgg(df: DataFrame) = {
    +    var atFirstAgg: Boolean = false
    +    df.queryExecution.executedPlan.foreach {
    +      case agg: TungstenAggregate => {
    +        if (atFirstAgg) {
    +          fail("Should not have back to back Aggregates")
    +        }
    +        atFirstAgg = true
    +      }
    +      case e: Exchange => atFirstAgg = false
    +      case _ =>
    +    }
    +  }
    +
    +  test("distributeBy") {
    +    val original = testData.repartition(1)
    +    assert(original.rdd.partitions.length == 1)
    +    val df = original.distributeBy(Column("key") :: Nil, 5)
    +    assert(df.rdd.partitions.length  == 5)
    +    checkAnswer(original.select(), df.select())
    +
    +    val df2 = original.distributeBy(Column("key") :: Nil, 10)
    +    assert(df2.rdd.partitions.length  == 10)
    +    checkAnswer(original.select(), df2.select())
    +
    +    // Group by the column we are distributed by. This should generate a plan with no exchange
    +    // between the aggregates
    +    val df3 = testData.distributeBy(Column("key") :: Nil).groupBy("key").count()
    +    verifyNonExchangingAgg(df3)
    +    verifyNonExchangingAgg(testData.distributeBy(Column("key") :: Column("value") :: Nil)
    +      .groupBy("key", "value").count())
    +
    +    // Grouping by just the first distributeBy expr, need to exchange.
    +    verifyExchangingAgg(testData.distributeBy(Column("key") :: Column("value") :: Nil)
    +      .groupBy("key").count())
    +
    +    val data = sqlContext.sparkContext.parallelize(
    +      (1 to 100).map(i => TestData2(i % 10, i))).toDF()
    +
    +    // Distribute and order by.
    +    val df4 = data.distributeBy(Column("a") :: Nil).localSort($"b".desc)
    +    // Walk each partition and verify that it is sorted descending and not globally sorted.
    +    df4.rdd.foreachPartition(p => {
    +      var previousValue: Int = -1
    +      var globallyOrdered: Boolean = true
    +      p.foreach(r => {
    +        val v: Int = r.getInt(1)
    +        if (previousValue != -1) {
    +          if (previousValue < v) throw new SparkException("Partition is not ordered.")
    +          if (v + 1 != previousValue) globallyOrdered = false
    +        }
    +        previousValue = v
    +      })
    +      if (globallyOrdered) throw new SparkException("Partition should not be globally ordered")
    +    })
    +
    +    // Distribute and order by with multiple order bys
    +    val df5 = data.distributeBy(Column("a") :: Nil, 2).localSort($"b".asc, $"a".asc)
    +    // Walk each partition and verify that it is sorted descending and not globally sorted.
    --- End diff --
    
    descending => ascending 


[GitHub] spark pull request: [SPARK-11410] [SQL] Add APIs to provide functi...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9364#discussion_r43459695
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---
    @@ -987,4 +992,116 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
         val df = (1 to 10).map(Tuple1.apply).toDF("i").as("src")
         assert(df.select($"src.i".cast(StringType)).columns.head === "i")
       }
    +
    +  /**
    +   * Verifies that there is no Exchange between the Aggregations for `df`
    +   */
    +  private def verifyNonExchangingAgg(df: DataFrame) = {
    +    var atFirstAgg: Boolean = false
    +    df.queryExecution.executedPlan.foreach {
    +      case agg: TungstenAggregate => {
    +        atFirstAgg = !atFirstAgg
    +      }
    +      case _ => {
    +        if (atFirstAgg) {
    +          fail("Should not have operators between the two aggregations")
    +        }
    +      }
    +    }
    --- End diff --
    
    I changed it to check the number of exchanges and moved it to QueryTest. This is generally useful (I already use it in another PR I'm working on).


[GitHub] spark pull request: [SPARK-11410] [SQL] Add APIs to provide functi...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9364#discussion_r43471991
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---
    @@ -987,4 +992,116 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
         val df = (1 to 10).map(Tuple1.apply).toDF("i").as("src")
         assert(df.select($"src.i".cast(StringType)).columns.head === "i")
       }
    +
    +  /**
    +   * Verifies that there is no Exchange between the Aggregations for `df`
    +   */
    +  private def verifyNonExchangingAgg(df: DataFrame) = {
    +    var atFirstAgg: Boolean = false
    +    df.queryExecution.executedPlan.foreach {
    +      case agg: TungstenAggregate => {
    +        atFirstAgg = !atFirstAgg
    +      }
    +      case _ => {
    +        if (atFirstAgg) {
    +          fail("Should not have operators between the two aggregations")
    +        }
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Verifies that there is an Exchange between the Aggregations for `df`
    +   */
    +  private def verifyExchangingAgg(df: DataFrame) = {
    +    var atFirstAgg: Boolean = false
    +    df.queryExecution.executedPlan.foreach {
    +      case agg: TungstenAggregate => {
    +        if (atFirstAgg) {
    +          fail("Should not have back to back Aggregates")
    +        }
    +        atFirstAgg = true
    +      }
    +      case e: Exchange => atFirstAgg = false
    +      case _ =>
    +    }
    +  }
    +
    +  test("distributeBy") {
    +    val original = testData.repartition(1)
    +    assert(original.rdd.partitions.length == 1)
    +    val df = original.distributeBy(Column("key") :: Nil, 5)
    +    assert(df.rdd.partitions.length  == 5)
    +    checkAnswer(original.select(), df.select())
    +
    +    val df2 = original.distributeBy(Column("key") :: Nil, 10)
    +    assert(df2.rdd.partitions.length  == 10)
    +    checkAnswer(original.select(), df2.select())
    +
    +    // Group by the column we are distributed by. This should generate a plan with no exchange
    +    // between the aggregates
    +    val df3 = testData.distributeBy(Column("key") :: Nil).groupBy("key").count()
    +    verifyNonExchangingAgg(df3)
    +    verifyNonExchangingAgg(testData.distributeBy(Column("key") :: Column("value") :: Nil)
    +      .groupBy("key", "value").count())
    +
    +    // Grouping by just the first distributeBy expr, need to exchange.
    +    verifyExchangingAgg(testData.distributeBy(Column("key") :: Column("value") :: Nil)
    +      .groupBy("key").count())
    +
    +    val data = sqlContext.sparkContext.parallelize(
    +      (1 to 100).map(i => TestData2(i % 10, i))).toDF()
    +
    +    // Distribute and order by.
    +    val df4 = data.distributeBy(Column("a") :: Nil).localSort($"b".desc)
    +    // Walk each partition and verify that it is sorted descending and not globally sorted.
    +    df4.rdd.foreachPartition(p => {
    +      var previousValue: Int = -1
    +      var globallyOrdered: Boolean = true
    +      p.foreach(r => {
    +        val v: Int = r.getInt(1)
    +        if (previousValue != -1) {
    +          if (previousValue < v) throw new SparkException("Partition is not ordered.")
    +          if (v + 1 != previousValue) globallyOrdered = false
    +        }
    +        previousValue = v
    +      })
    +      if (globallyOrdered) throw new SparkException("Partition should not be globally ordered")
    +    })
    +
    +    // Distribute and order by with multiple order bys
    +    val df5 = data.distributeBy(Column("a") :: Nil, 2).localSort($"b".asc, $"a".asc)
    +    // Walk each partition and verify that it is sorted descending and not globally sorted.
    +    df5.rdd.foreachPartition(p => {
    +      var previousValue: Int = -1
    +      var globallyOrdered: Boolean = true
    +      p.foreach(r => {
    +        val v: Int = r.getInt(1)
    +        if (previousValue != -1) {
    +          if (previousValue > v) throw new SparkException("Partition is not ordered.")
    +          if (v - 1 != previousValue) globallyOrdered = false
    +        }
    +        previousValue = v
    +      })
    +      if (globallyOrdered) throw new SparkException("Partition should not be globally ordered")
    +    })
    --- End diff --
    
    I renamed it to allSequential. It's testing that the partitioning is doing something.
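
To make the two per-partition properties in this exchange concrete, here is a small sketch over plain sequences. It assumes each partition's `b` values have already been collected into a `Seq[Int]`; `isSortedAscending` is a hypothetical helper name, and `allSequential` borrows the name from the comment above without claiming to match the patch's implementation.

```scala
object PartitionOrderSketch {
  // True when every adjacent pair is in non-decreasing order.
  def isSortedAscending(values: Seq[Int]): Boolean =
    values.zip(values.drop(1)).forall { case (prev, next) => prev <= next }

  // True when every adjacent pair differs by exactly 1, i.e. no values were
  // routed to another partition from the middle of the run.
  def allSequential(values: Seq[Int]): Boolean =
    values.zip(values.drop(1)).forall { case (prev, next) => next - prev == 1 }
}
```

For the test data in the diff (`TestData2(i % 10, i)` for `i` from 1 to 100), the rows with `a == 1` have `b` values 1, 11, 21, and so on, which are sorted but not sequential. Note that in this sketch an empty or single-element partition is vacuously both sorted and sequential, so a test built on it has to decide how to treat such partitions.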


[GitHub] spark pull request: [SPARK-11410] [SQL] Add APIs to provide functi...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9364#issuecomment-152864530
  
    **[Test build #44764 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44764/consoleFull)** for PR 9364 at commit [`98c05ae`](https://github.com/apache/spark/commit/98c05ae08281b14dd67151bf8edd55cb884d1061).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
       * `case class RepartitionByExpression(`


[GitHub] spark pull request: [SPARK-11410] [SQL] Add APIs to provide functi...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9364#discussion_r43453259
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala ---
    @@ -663,6 +667,35 @@ class DataFrame private[sql](
       def orderBy(sortExprs: Column*): DataFrame = sort(sortExprs : _*)
     
       /**
    +   * Returns a new [[DataFrame]] partitioned by the given partitioning expressions into
    +   * `numPartitions`
    +   * `numPartitions` can be < 0 to preserve the current number of partitions.
    +   * @group dfops
    --- End diff --
    
    Let's add scaladoc to explain that we are using hash partitioning.
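
As background for the scaladoc request, hash partitioning in general can be sketched as below. This is an illustration of the technique only: it uses the JVM `hashCode` and a non-negative modulo, and does not claim to reproduce the hash function or row layout Spark SQL actually uses. `partitionFor` and `distribute` are hypothetical names.

```scala
object HashPartitionSketch {
  // Maps a key to a partition index in [0, numPartitions).
  // The adjustment keeps negative hash codes inside the valid range.
  def partitionFor(key: Any, numPartitions: Int): Int = {
    require(numPartitions > 0, "numPartitions must be positive")
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // Groups (key, value) rows by the partition their key hashes to.
  def distribute[K, V](rows: Seq[(K, V)], numPartitions: Int): Map[Int, Seq[(K, V)]] =
    rows.groupBy { case (k, _) => partitionFor(k, numPartitions) }
}
```

The property that matters for `distributeBy` followed by `groupBy` on the same expressions is that rows with equal keys always land in the same partition, which is why no further exchange is needed in that case.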


[GitHub] spark pull request: [SPARK-11410] [SQL] Add APIs to provide functi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9364#issuecomment-152341865
  
    Can one of the admins verify this patch?


[GitHub] spark pull request: [SPARK-11410] [SQL] Add APIs to provide functi...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9364#discussion_r43467527
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---
    @@ -987,4 +992,116 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
         val df = (1 to 10).map(Tuple1.apply).toDF("i").as("src")
         assert(df.select($"src.i".cast(StringType)).columns.head === "i")
       }
    +
    +  /**
    +   * Verifies that there is no Exchange between the Aggregations for `df`
    +   */
    +  private def verifyNonExchangingAgg(df: DataFrame) = {
    +    var atFirstAgg: Boolean = false
    +    df.queryExecution.executedPlan.foreach {
    +      case agg: TungstenAggregate => {
    +        atFirstAgg = !atFirstAgg
    +      }
    +      case _ => {
    +        if (atFirstAgg) {
    +          fail("Should not have operators between the two aggregations")
    +        }
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Verifies that there is an Exchange between the Aggregations for `df`
    +   */
    +  private def verifyExchangingAgg(df: DataFrame) = {
    +    var atFirstAgg: Boolean = false
    +    df.queryExecution.executedPlan.foreach {
    +      case agg: TungstenAggregate => {
    +        if (atFirstAgg) {
    +          fail("Should not have back to back Aggregates")
    +        }
    +        atFirstAgg = true
    +      }
    +      case e: Exchange => atFirstAgg = false
    +      case _ =>
    +    }
    +  }
    +
    +  test("distributeBy") {
    +    val original = testData.repartition(1)
    +    assert(original.rdd.partitions.length == 1)
    +    val df = original.distributeBy(Column("key") :: Nil, 5)
    +    assert(df.rdd.partitions.length  == 5)
    +    checkAnswer(original.select(), df.select())
    +
    +    val df2 = original.distributeBy(Column("key") :: Nil, 10)
    +    assert(df2.rdd.partitions.length  == 10)
    +    checkAnswer(original.select(), df2.select())
    +
    +    // Group by the column we are distributed by. This should generate a plan with no exchange
    +    // between the aggregates
    +    val df3 = testData.distributeBy(Column("key") :: Nil).groupBy("key").count()
    +    verifyNonExchangingAgg(df3)
    +    verifyNonExchangingAgg(testData.distributeBy(Column("key") :: Column("value") :: Nil)
    +      .groupBy("key", "value").count())
    +
    +    // Grouping by just the first distributeBy expr, need to exchange.
    +    verifyExchangingAgg(testData.distributeBy(Column("key") :: Column("value") :: Nil)
    +      .groupBy("key").count())
    +
    +    val data = sqlContext.sparkContext.parallelize(
    +      (1 to 100).map(i => TestData2(i % 10, i))).toDF()
    +
    +    // Distribute and order by.
    +    val df4 = data.distributeBy(Column("a") :: Nil).localSort($"b".desc)
    +    // Walk each partition and verify that it is sorted descending and not globally sorted.
    +    df4.rdd.foreachPartition(p => {
    +      var previousValue: Int = -1
    +      var globallyOrdered: Boolean = true
    +      p.foreach(r => {
    +        val v: Int = r.getInt(1)
    +        if (previousValue != -1) {
    +          if (previousValue < v) throw new SparkException("Partition is not ordered.")
    +          if (v + 1 != previousValue) globallyOrdered = false
    +        }
    +        previousValue = v
    +      })
    +      if (globallyOrdered) throw new SparkException("Partition should not be globally ordered")
    +    })
    +
    +    // Distribute and order by with multiple order bys
    +    val df5 = data.distributeBy(Column("a") :: Nil, 2).localSort($"b".asc, $"a".asc)
    +    // Walk each partition and verify that it is sorted ascending and not globally sorted.
    +    df5.rdd.foreachPartition(p => {
    +      var previousValue: Int = -1
    +      var globallyOrdered: Boolean = true
    +      p.foreach(r => {
    +        val v: Int = r.getInt(1)
    +        if (previousValue != -1) {
    +          if (previousValue > v) throw new SparkException("Partition is not ordered.")
    +          if (v - 1 != previousValue) globallyOrdered = false
    +        }
    +        previousValue = v
    +      })
    +      if (globallyOrdered) throw new SparkException("Partition should not be globally ordered")
    +    })
    --- End diff --
    
    I am confused by the name `globallyOrdered`. What does it mean? It looks like you want to check that `a` is sorted in ascending order within every group of `b`?
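
One possible reading of the flag, offered as an assumption rather than the author's stated intent: the data is `(i % 10, i)` for `i` in 1 to 100, so a partition taken from a globally sorted result would hold consecutive `b` values, while a hash-partitioned and locally sorted partition would not. The sketch below simulates that in plain Scala without Spark. `Row2`, `LocalSortCheck`, and the modulo partitioner are hypothetical stand-ins for `TestData2` and `distributeBy`/`localSort`.

```scala
// Hypothetical stand-in for the test's TestData2(a, b) rows.
final case class Row2(a: Int, b: Int)

object LocalSortCheck {
  // Simulates hash partitioning on `a` followed by a per-partition descending sort on `b`.
  // The modulo partitioner is an assumption; Spark's real hash function differs.
  def distributeAndSort(rows: Seq[Row2], numPartitions: Int): Seq[Seq[Row2]] =
    (0 until numPartitions).map { p =>
      rows.filter(r => Math.floorMod(r.a, numPartitions) == p).sortBy(r => -r.b)
    }

  // True when `b` never increases from one row to the next.
  def isSortedDesc(part: Seq[Row2]): Boolean =
    part.zip(part.drop(1)).forall { case (x, y) => x.b >= y.b }

  // True when every step down is exactly 1, which is what a contiguous slice of a
  // globally sorted sequence of distinct consecutive values would look like.
  def looksGloballyOrdered(part: Seq[Row2]): Boolean =
    part.zip(part.drop(1)).forall { case (x, y) => x.b - 1 == y.b }

  def main(args: Array[String]): Unit = {
    val rows = (1 to 100).map(i => Row2(i % 10, i))
    distributeAndSort(rows, 5).foreach { p =>
      assert(isSortedDesc(p), "Partition is not ordered.")
      assert(!looksGloballyOrdered(p), "Partition should not be globally ordered")
    }
  }
}
```

Under this reading, a name such as `consecutive` might describe the flag more directly than `globallyOrdered`.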


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-11410] [SQL] Add APIs to provide functi...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/9364




[GitHub] spark pull request: [SPARK-11410] [SQL] Add APIs to provide functi...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on the pull request:

    https://github.com/apache/spark/pull/9364#issuecomment-152852982
  
    ok to test




[GitHub] spark pull request: [SPARK-11410] [SQL] Add APIs to provide functi...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on the pull request:

    https://github.com/apache/spark/pull/9364#issuecomment-152396487
  
    How about we update the title to `[SPARK-11410] [SQL] Add APIs to provide functionality similar to Hive's DISTRIBUTE BY and SORT BY`?




[GitHub] spark pull request: [SPARK-11410] [SQL] Add APIs to provide functi...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9364#issuecomment-152853278
  
     Merged build triggered.




[GitHub] spark pull request: [SPARK-11410] [SQL] Add APIs to provide functi...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9364#discussion_r43452307
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---
    @@ -19,6 +19,11 @@ package org.apache.spark.sql
     
     import java.io.File
     
    +import org.apache.spark.SparkException
    +import org.apache.spark.sql.execution.Exchange
    +import org.apache.spark.sql.execution.aggregate.TungstenAggregate
    +import org.apache.spark.sql.test.SQLTestData.TestData2
    +
    --- End diff --
    
    Let's order these imports based on https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide#SparkCodeStyleGuide-Imports.
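
As a rough illustration of the ordering that guide describes (java/javax first, then scala, then third-party libraries, then org.apache.spark, alphabetical within each group), here is a small plain-Scala sketch. `ImportOrder` and its helpers are hypothetical and only model the rule; the project's scalastyle configuration remains the authority on exact tie-breaking.

```scala
object ImportOrder {
  // Group rank: java/javax = 0, scala = 1, third-party = 2, org.apache.spark = 3.
  def group(imp: String): Int =
    if (imp.startsWith("java.") || imp.startsWith("javax.")) 0
    else if (imp.startsWith("scala.")) 1
    else if (imp.startsWith("org.apache.spark.")) 3
    else 2

  // Sorts by group first, then by plain string order within a group.
  def ordered(imports: Seq[String]): Seq[String] =
    imports.sortBy(i => (group(i), i))
}
```

Applied to the diff above, the four new `org.apache.spark` imports would fall into the last group, after any third-party imports, instead of directly below `java.io.File`.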




[GitHub] spark pull request: [SPARK-11410] [SQL] Add APIs to provide functi...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9364#discussion_r43586777
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala ---
    @@ -31,10 +31,18 @@ case class SortPartitions(sortExpressions: Seq[SortOrder], child: LogicalPlan)
       extends RedistributeData
     
     /**
    - * This method repartitions data using [[Expression]]s, and receives information about the
    - * number of partitions during execution. Used when a specific ordering or distribution is
    - * expected by the consumer of the query result. Use [[Repartition]] for RDD-like
    + * This method repartitions data using [[Expression]]s into `numPartitions`, and receives
    + * information about the number of partitions during execution. Used when a specific ordering or
    + * distribution is expected by the consumer of the query result. Use [[Repartition]] for RDD-like
      * `coalesce` and `repartition`.
    + * If `numPartitions` is not specified, the partitioning of `child` is preserved.
    --- End diff --
    
    It seems we need to update this comment to something like `If numPartitions is not specified, the value of spark.sql.shuffle.partitions will be used as the number of partitions.` I can fix it when I merge it.
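
The fallback described in that suggested wording can be sketched in plain Scala as follows. This is a minimal model, not the patch's code: `PartitionCount`, `resolve`, and the `Map`-based configuration are hypothetical, and the fallback of 200 assumes the stock default of `spark.sql.shuffle.partitions`.

```scala
object PartitionCount {
  val ShufflePartitionsKey = "spark.sql.shuffle.partitions"

  // Uses the explicit partition count when given; otherwise reads the
  // configured shuffle partition count (assumed default: 200).
  def resolve(numPartitions: Option[Int], conf: Map[String, String]): Int =
    numPartitions.getOrElse(conf.getOrElse(ShufflePartitionsKey, "200").toInt)
}
```

With this model, `resolve(Some(5), conf)` ignores the configuration, while `resolve(None, conf)` follows it, which is different from preserving the partitioning of `child`.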


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-11410] [SQL] Add APIs to provide functi...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9364#discussion_r43452517
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---
    @@ -987,4 +992,116 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
         val df = (1 to 10).map(Tuple1.apply).toDF("i").as("src")
         assert(df.select($"src.i".cast(StringType)).columns.head === "i")
       }
    +
    +  /**
    +   * Verifies that there is no Exchange between the Aggregations for `df`
    +   */
    +  private def verifyNonExchangingAgg(df: DataFrame) = {
    +    var atFirstAgg: Boolean = false
    +    df.queryExecution.executedPlan.foreach {
    +      case agg: TungstenAggregate => {
    +        atFirstAgg = !atFirstAgg
    +      }
    +      case _ => {
    +        if (atFirstAgg) {
    +          fail("Should not have operators between the two aggregations")
    +        }
    +      }
    +    }
    --- End diff --
    
    Also, if this method is small, we can move it into `test("distributeBy")` since it is only used by this test.
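
To show the toggle logic of the quoted helper in isolation, here is a plain-Scala sketch over a toy plan tree. `Node`, `Agg`, `Exch`, `Scan`, and `PlanCheck` are hypothetical stand-ins for Spark's physical operators, and the pre-order walk is an assumption about how `foreach` visits the plan. The check is written as a local `def`, the shape the suggestion above would give it inside the test body.

```scala
sealed trait Node { def children: Seq[Node] }
final case class Agg(child: Node) extends Node { def children: Seq[Node] = Seq(child) }
final case class Exch(child: Node) extends Node { def children: Seq[Node] = Seq(child) }
case object Scan extends Node { def children: Seq[Node] = Nil }

object PlanCheck {
  // Visits the node itself first, then its children, left to right.
  def preOrder(n: Node): Seq[Node] = n +: n.children.flatMap(preOrder)

  def run(plan: Node): Boolean = {
    // Local helper: flips a flag at each aggregate and reports failure if any
    // other operator is seen while the flag is set (between a pair of aggregates).
    def noOperatorBetweenAggs(p: Node): Boolean = {
      var betweenAggs = false
      var ok = true
      preOrder(p).foreach {
        case _: Agg => betweenAggs = !betweenAggs
        case _ => if (betweenAggs) ok = false
      }
      ok
    }
    noOperatorBetweenAggs(plan)
  }
}
```

Because the flag is toggled rather than set, the check only behaves as described when aggregates come in pairs.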


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org