You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by viirya <gi...@git.apache.org> on 2018/11/05 09:02:46 UTC

[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...

GitHub user viirya opened a pull request:

    https://github.com/apache/spark/pull/22944

    [SPARK-25942][SQL] Fix Dataset.groupByKey to make it work on primitive data

    ## What changes were proposed in this pull request?
    
    `Dataset.groupByKey` can't work on primitive data now. When we access `value` in aggregate function, it conflicts with the name of grouping attribute.
    
    ## How was this patch tested?
    
    Added test.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/viirya/spark-1 dataset_agg

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22944.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22944
    
----
commit 7564d98a9b1242f1eea167152ed97371416c204d
Author: Liang-Chi Hsieh <vi...@...>
Date:   2018-11-05T00:40:52Z

    Fix groupByKey.

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Aggregate expressions shouldn't be re...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    **[Test build #98774 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/98774/testReport)** for PR 22944 at commit [`f6017ea`](https://github.com/apache/spark/commit/f6017ea1b53d0d41d0967ac57343dd969ec1cf2f).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make it wor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    **[Test build #98556 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/98556/testReport)** for PR 22944 at commit [`a8dbd9f`](https://github.com/apache/spark/commit/a8dbd9fcd3bd8c74ca18c76ea240d27ea36c3655).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make it wor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make it wor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    **[Test build #98557 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/98557/testReport)** for PR 22944 at commit [`588f40e`](https://github.com/apache/spark/commit/588f40e7e5a8beac5ebdf87f48448d86a0783946).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Aggregate expressions shouldn't be re...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Aggregate expressions shouldn't be re...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    **[Test build #98752 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/98752/testReport)** for PR 22944 at commit [`71dff40`](https://github.com/apache/spark/commit/71dff408a3da828e628aa29f81e30cdcb822fd37).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22944#discussion_r231170974
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala ---
    @@ -262,25 +262,39 @@ object AppendColumns {
       def apply[T : Encoder, U : Encoder](
           func: T => U,
           child: LogicalPlan): AppendColumns = {
    +    val outputEncoder = encoderFor[U]
    +    val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
    +      assert(outputEncoder.namedExpressions.length == 1)
    +      outputEncoder.namedExpressions.map(Alias(_, "key")())
    +    } else {
    +      outputEncoder.namedExpressions
    +    }
         new AppendColumns(
           func.asInstanceOf[Any => Any],
           implicitly[Encoder[T]].clsTag.runtimeClass,
           implicitly[Encoder[T]].schema,
           UnresolvedDeserializer(encoderFor[T].deserializer),
    -      encoderFor[U].namedExpressions,
    +      namedExpressions,
           child)
       }
     
       def apply[T : Encoder, U : Encoder](
           func: T => U,
           inputAttributes: Seq[Attribute],
           child: LogicalPlan): AppendColumns = {
    +    val outputEncoder = encoderFor[U]
    +    val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
    +      assert(outputEncoder.namedExpressions.length == 1)
    +      outputEncoder.namedExpressions.map(Alias(_, "key")())
    +    } else {
    +      outputEncoder.namedExpressions
    --- End diff --
    
    Can you elaborate it more?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Aggregate expressions shouldn't be re...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/4976/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Aggregate expressions shouldn't be re...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/98776/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22944#discussion_r230983077
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala ---
    @@ -262,25 +262,39 @@ object AppendColumns {
       def apply[T : Encoder, U : Encoder](
           func: T => U,
           child: LogicalPlan): AppendColumns = {
    +    val outputEncoder = encoderFor[U]
    +    val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
    +      assert(outputEncoder.namedExpressions.length == 1)
    +      outputEncoder.namedExpressions.map(Alias(_, "key")())
    +    } else {
    +      outputEncoder.namedExpressions
    +    }
         new AppendColumns(
           func.asInstanceOf[Any => Any],
           implicitly[Encoder[T]].clsTag.runtimeClass,
           implicitly[Encoder[T]].schema,
           UnresolvedDeserializer(encoderFor[T].deserializer),
    -      encoderFor[U].namedExpressions,
    +      namedExpressions,
           child)
       }
     
       def apply[T : Encoder, U : Encoder](
           func: T => U,
           inputAttributes: Seq[Attribute],
           child: LogicalPlan): AppendColumns = {
    +    val outputEncoder = encoderFor[U]
    +    val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
    +      assert(outputEncoder.namedExpressions.length == 1)
    +      outputEncoder.namedExpressions.map(Alias(_, "key")())
    +    } else {
    +      outputEncoder.namedExpressions
    --- End diff --
    
    yea, but for such cases, seems it is more complicated as we can't simply create aliases for serializer fields. Because in methods like `mapGroups`, we need to access original fields of key.
    



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make it wor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/4766/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Aggregate expressions shouldn't be re...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    **[Test build #98752 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/98752/testReport)** for PR 22944 at commit [`71dff40`](https://github.com/apache/spark/commit/71dff408a3da828e628aa29f81e30cdcb822fd37).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22944: [SPARK-25942][SQL] Aggregate expressions shouldn'...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22944#discussion_r232699342
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala ---
    @@ -1556,6 +1556,20 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
           df.where($"city".contains(new java.lang.Character('A'))),
           Seq(Row("Amsterdam")))
       }
    +
    +  test("SPARK-25942: typed aggregation on primitive type") {
    +    val ds = Seq(1, 2, 3).toDS()
    +
    +    val agg = ds.groupByKey(_ >= 2)
    +      .agg(sum("value").as[Long], sum($"value" + 1).as[Long])
    --- End diff --
    
    BTW, if we have to do it in analyzer, can we remove `TypedAggregateExpression.withInputInfo` and put all the logic in analyzer?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22944#discussion_r230989073
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala ---
    @@ -262,25 +262,39 @@ object AppendColumns {
       def apply[T : Encoder, U : Encoder](
           func: T => U,
           child: LogicalPlan): AppendColumns = {
    +    val outputEncoder = encoderFor[U]
    +    val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
    +      assert(outputEncoder.namedExpressions.length == 1)
    +      outputEncoder.namedExpressions.map(Alias(_, "key")())
    +    } else {
    +      outputEncoder.namedExpressions
    +    }
         new AppendColumns(
           func.asInstanceOf[Any => Any],
           implicitly[Encoder[T]].clsTag.runtimeClass,
           implicitly[Encoder[T]].schema,
           UnresolvedDeserializer(encoderFor[T].deserializer),
    -      encoderFor[U].namedExpressions,
    +      namedExpressions,
           child)
       }
     
       def apply[T : Encoder, U : Encoder](
           func: T => U,
           inputAttributes: Seq[Attribute],
           child: LogicalPlan): AppendColumns = {
    +    val outputEncoder = encoderFor[U]
    +    val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
    +      assert(outputEncoder.namedExpressions.length == 1)
    +      outputEncoder.namedExpressions.map(Alias(_, "key")())
    +    } else {
    +      outputEncoder.namedExpressions
    --- End diff --
    
    We can improve the `CheckAnalysis` to detect this case, and improve the error message to ask users to do alias.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22944#discussion_r230997935
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala ---
    @@ -262,25 +262,39 @@ object AppendColumns {
       def apply[T : Encoder, U : Encoder](
           func: T => U,
           child: LogicalPlan): AppendColumns = {
    +    val outputEncoder = encoderFor[U]
    +    val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
    +      assert(outputEncoder.namedExpressions.length == 1)
    +      outputEncoder.namedExpressions.map(Alias(_, "key")())
    +    } else {
    +      outputEncoder.namedExpressions
    +    }
         new AppendColumns(
           func.asInstanceOf[Any => Any],
           implicitly[Encoder[T]].clsTag.runtimeClass,
           implicitly[Encoder[T]].schema,
           UnresolvedDeserializer(encoderFor[T].deserializer),
    -      encoderFor[U].namedExpressions,
    +      namedExpressions,
           child)
       }
     
       def apply[T : Encoder, U : Encoder](
           func: T => U,
           inputAttributes: Seq[Attribute],
           child: LogicalPlan): AppendColumns = {
    +    val outputEncoder = encoderFor[U]
    +    val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
    +      assert(outputEncoder.namedExpressions.length == 1)
    +      outputEncoder.namedExpressions.map(Alias(_, "key")())
    +    } else {
    +      outputEncoder.namedExpressions
    --- End diff --
    
    is this a special case of option of product? can you try pritimive type and product type?



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22944: [SPARK-25942][SQL] Aggregate expressions shouldn'...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22944#discussion_r232926151
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala ---
    @@ -1556,6 +1556,20 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
           df.where($"city".contains(new java.lang.Character('A'))),
           Seq(Row("Amsterdam")))
       }
    +
    +  test("SPARK-25942: typed aggregation on primitive type") {
    +    val ds = Seq(1, 2, 3).toDS()
    +
    +    val agg = ds.groupByKey(_ >= 2)
    +      .agg(sum("value").as[Long], sum($"value" + 1).as[Long])
    +    assert(agg.collect() === Seq((false, 1, 2), (true, 5, 7)))
    +  }
    +
    +  test("SPARK-25942: typed aggregation on product type") {
    +    val ds = Seq((1, 2), (2, 3), (3, 4)).toDS()
    +    val agg = ds.groupByKey(x => x).agg(sum("_1").as[Long], sum($"_2" + 1).as[Long])
    +    assert(agg.collect().sorted === Seq(((1, 2), 1, 3), ((2, 3), 2, 4), ((3, 4), 3, 5)))
    --- End diff --
    
    Using `checkDataset` comes out an error:
    ```
    [error]  found   : org.apache.spark.sql.Dataset[((Int, Int), Long, Long)]
    [error]  required: org.apache.spark.sql.Dataset[((Int, Int), AnyVal, AnyVal)]
    [error] Note: ((Int, Int), Long, Long) <: ((Int, Int), AnyVal, AnyVal), but class Dataset is invariant in type T.
    [error] You may wish to define T as +T instead. (SLS 4.5)
    [error]     checkDataset(agg, ((1, 2), 1, 3), ((2, 3), 2, 4), ((3, 4), 3, 5))
    ```


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make it wor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make it wor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make it wor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/4764/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Aggregate expressions shouldn't be re...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Aggregate expressions shouldn't be re...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make it wor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22944: [SPARK-25942][SQL] Aggregate expressions shouldn'...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22944#discussion_r232905784
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala ---
    @@ -1556,6 +1556,20 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
           df.where($"city".contains(new java.lang.Character('A'))),
           Seq(Row("Amsterdam")))
       }
    +
    +  test("SPARK-25942: typed aggregation on primitive type") {
    +    val ds = Seq(1, 2, 3).toDS()
    +
    +    val agg = ds.groupByKey(_ >= 2)
    +      .agg(sum("value").as[Long], sum($"value" + 1).as[Long])
    +    assert(agg.collect() === Seq((false, 1, 2), (true, 5, 7)))
    +  }
    +
    +  test("SPARK-25942: typed aggregation on product type") {
    +    val ds = Seq((1, 2), (2, 3), (3, 4)).toDS()
    +    val agg = ds.groupByKey(x => x).agg(sum("_1").as[Long], sum($"_2" + 1).as[Long])
    +    assert(agg.collect().sorted === Seq(((1, 2), 1, 3), ((2, 3), 2, 4), ((3, 4), 3, 5)))
    --- End diff --
    
    can we use `checkAnswer`/`CheckDataset`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Aggregate expressions shouldn't be re...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    retest this please.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Aggregate expressions shouldn't be re...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/98766/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22944#discussion_r231001655
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala ---
    @@ -262,25 +262,39 @@ object AppendColumns {
       def apply[T : Encoder, U : Encoder](
           func: T => U,
           child: LogicalPlan): AppendColumns = {
    +    val outputEncoder = encoderFor[U]
    +    val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
    +      assert(outputEncoder.namedExpressions.length == 1)
    +      outputEncoder.namedExpressions.map(Alias(_, "key")())
    +    } else {
    +      outputEncoder.namedExpressions
    +    }
         new AppendColumns(
           func.asInstanceOf[Any => Any],
           implicitly[Encoder[T]].clsTag.runtimeClass,
           implicitly[Encoder[T]].schema,
           UnresolvedDeserializer(encoderFor[T].deserializer),
    -      encoderFor[U].namedExpressions,
    +      namedExpressions,
           child)
       }
     
       def apply[T : Encoder, U : Encoder](
           func: T => U,
           inputAttributes: Seq[Attribute],
           child: LogicalPlan): AppendColumns = {
    +    val outputEncoder = encoderFor[U]
    +    val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
    +      assert(outputEncoder.namedExpressions.length == 1)
    +      outputEncoder.namedExpressions.map(Alias(_, "key")())
    +    } else {
    +      outputEncoder.namedExpressions
    --- End diff --
    
    For primitive type and product type, looks like it works:
    ```scala
    test("typed aggregation on primitive data") {
      val ds = Seq(1, 2, 3).toDS()
      val agg = ds.select(expr("value").as("data").as[Int])
        .groupByKey(_ >= 2)
        .agg(sum("data").as[Long], sum($"data" + 1).as[Long])
      agg.show()
    }
    ```
    ```
    +-----+---------+---------------+
    |value|sum(data)|sum((data + 1))|
    +-----+---------+---------------+
    |false|        1|              2|
    | true|        5|              7|
    +-----+---------+---------------+
    ```
    
    ```scala
    test("typed aggregation on product data") {
      val ds = Seq((1, 2), (2, 3), (3, 4)).toDS()
      val agg = ds.select(expr("_1").as("a").as[Int], expr("_2").as("b").as[Int])
        .groupByKey(_._1).agg(sum("a").as[Int], sum($"b" + 1).as[Int])
      agg.show
    }
    ```
    ```
    [info] - typed aggregation on primitive data (192 milliseconds)
    +-----+------+------------+
    |value|sum(a)|sum((b + 1))|
    +-----+------+------------+
    |    3|     3|           5|
    |    1|     1|           3|
    |    2|     2|           4|
    +-----+------+------------+
    
    ```


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22944#discussion_r230989493
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala ---
    @@ -262,25 +262,39 @@ object AppendColumns {
       def apply[T : Encoder, U : Encoder](
           func: T => U,
           child: LogicalPlan): AppendColumns = {
    +    val outputEncoder = encoderFor[U]
    +    val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
    +      assert(outputEncoder.namedExpressions.length == 1)
    +      outputEncoder.namedExpressions.map(Alias(_, "key")())
    +    } else {
    +      outputEncoder.namedExpressions
    +    }
         new AppendColumns(
           func.asInstanceOf[Any => Any],
           implicitly[Encoder[T]].clsTag.runtimeClass,
           implicitly[Encoder[T]].schema,
           UnresolvedDeserializer(encoderFor[T].deserializer),
    -      encoderFor[U].namedExpressions,
    +      namedExpressions,
           child)
       }
     
       def apply[T : Encoder, U : Encoder](
           func: T => U,
           inputAttributes: Seq[Attribute],
           child: LogicalPlan): AppendColumns = {
    +    val outputEncoder = encoderFor[U]
    +    val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
    +      assert(outputEncoder.namedExpressions.length == 1)
    +      outputEncoder.namedExpressions.map(Alias(_, "key")())
    +    } else {
    +      outputEncoder.namedExpressions
    --- End diff --
    
    Ok. Sounds good. Let me do the change.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make it wor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/98557/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22944#discussion_r230986226
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala ---
    @@ -262,25 +262,39 @@ object AppendColumns {
       def apply[T : Encoder, U : Encoder](
           func: T => U,
           child: LogicalPlan): AppendColumns = {
    +    val outputEncoder = encoderFor[U]
    +    val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
    +      assert(outputEncoder.namedExpressions.length == 1)
    +      outputEncoder.namedExpressions.map(Alias(_, "key")())
    +    } else {
    +      outputEncoder.namedExpressions
    +    }
         new AppendColumns(
           func.asInstanceOf[Any => Any],
           implicitly[Encoder[T]].clsTag.runtimeClass,
           implicitly[Encoder[T]].schema,
           UnresolvedDeserializer(encoderFor[T].deserializer),
    -      encoderFor[U].namedExpressions,
    +      namedExpressions,
           child)
       }
     
       def apply[T : Encoder, U : Encoder](
           func: T => U,
           inputAttributes: Seq[Attribute],
           child: LogicalPlan): AppendColumns = {
    +    val outputEncoder = encoderFor[U]
    +    val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
    +      assert(outputEncoder.namedExpressions.length == 1)
    +      outputEncoder.namedExpressions.map(Alias(_, "key")())
    +    } else {
    +      outputEncoder.namedExpressions
    --- End diff --
    
    if that is the case, I feel it better to ask users to resolve conflict manually, by adding alias.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make it wor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make it wor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    **[Test build #98469 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/98469/testReport)** for PR 22944 at commit [`5ef23f2`](https://github.com/apache/spark/commit/5ef23f27593efa968bed764a21988ccd7583a44c).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22944#discussion_r231350156
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala ---
    @@ -262,25 +262,39 @@ object AppendColumns {
       def apply[T : Encoder, U : Encoder](
           func: T => U,
           child: LogicalPlan): AppendColumns = {
    +    val outputEncoder = encoderFor[U]
    +    val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
    +      assert(outputEncoder.namedExpressions.length == 1)
    +      outputEncoder.namedExpressions.map(Alias(_, "key")())
    +    } else {
    +      outputEncoder.namedExpressions
    +    }
         new AppendColumns(
           func.asInstanceOf[Any => Any],
           implicitly[Encoder[T]].clsTag.runtimeClass,
           implicitly[Encoder[T]].schema,
           UnresolvedDeserializer(encoderFor[T].deserializer),
    -      encoderFor[U].namedExpressions,
    +      namedExpressions,
           child)
       }
     
       def apply[T : Encoder, U : Encoder](
           func: T => U,
           inputAttributes: Seq[Attribute],
           child: LogicalPlan): AppendColumns = {
    +    val outputEncoder = encoderFor[U]
    +    val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
    +      assert(outputEncoder.namedExpressions.length == 1)
    +      outputEncoder.namedExpressions.map(Alias(_, "key")())
    +    } else {
    +      outputEncoder.namedExpressions
    --- End diff --
    
    Thanks, I see. For this primitive type case, is current fix ok? Or we should deal with case classes together?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22944: [SPARK-25942][SQL] Aggregate expressions shouldn'...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22944#discussion_r232698607
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala ---
    @@ -1556,6 +1556,20 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
           df.where($"city".contains(new java.lang.Character('A'))),
           Seq(Row("Amsterdam")))
       }
    +
    +  test("SPARK-25942: typed aggregation on primitive type") {
    +    val ds = Seq(1, 2, 3).toDS()
    +
    +    val agg = ds.groupByKey(_ >= 2)
    +      .agg(sum("value").as[Long], sum($"value" + 1).as[Long])
    --- End diff --
    
    ah i see. Can you add a comment in `TypedColumn.withInputType` and say that untyped normal aggregate expressions are handled in the analyzer directly?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22944: [SPARK-25942][SQL] Aggregate expressions shouldn'...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22944#discussion_r232556359
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala ---
    @@ -1556,6 +1556,20 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
           df.where($"city".contains(new java.lang.Character('A'))),
           Seq(Row("Amsterdam")))
       }
    +
    +  test("SPARK-25942: typed aggregation on primitive type") {
    +    val ds = Seq(1, 2, 3).toDS()
    +
    +    val agg = ds.groupByKey(_ >= 2)
    +      .agg(sum("value").as[Long], sum($"value" + 1).as[Long])
    --- End diff --
    
    I think we should not make decisions for users. For untyped APIs, users can refer the grouping columns in the aggregate expressions, I think the typed APIs should be same.
    
    For this particular case, currrently spark allows grouping columns inside aggregate functions, so the `value` here is indeed ambiguous. There is nothing we can do, but fail and ask users to add alias.
    
    BTW, we should check other databases and see if "grouping columns inside aggregate functions" should be allowed,


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22944: [SPARK-25942][SQL] Aggregate expressions shouldn'...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22944#discussion_r232666996
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala ---
    @@ -1556,6 +1556,20 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
           df.where($"city".contains(new java.lang.Character('A'))),
           Seq(Row("Amsterdam")))
       }
    +
    +  test("SPARK-25942: typed aggregation on primitive type") {
    +    val ds = Seq(1, 2, 3).toDS()
    +
    +    val agg = ds.groupByKey(_ >= 2)
    +      .agg(sum("value").as[Long], sum($"value" + 1).as[Long])
    --- End diff --
    
    looking at `KeyValueGroupedDataset.dataAttributes`, seems we do want to resolve things based on the plan below `AppendColumns`. And in `KeyValueGroupedDataset#aggUntyped`, we do resolve expressions based on `dataAttributes`. Do you why it doesn't work?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Aggregate expressions shouldn't be re...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make it wor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    **[Test build #98557 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/98557/testReport)** for PR 22944 at commit [`588f40e`](https://github.com/apache/spark/commit/588f40e7e5a8beac5ebdf87f48448d86a0783946).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22944: [SPARK-25942][SQL] Aggregate expressions shouldn'...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22944#discussion_r232928066
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala ---
    @@ -1556,6 +1556,20 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
           df.where($"city".contains(new java.lang.Character('A'))),
           Seq(Row("Amsterdam")))
       }
    +
    +  test("SPARK-25942: typed aggregation on primitive type") {
    +    val ds = Seq(1, 2, 3).toDS()
    +
    +    val agg = ds.groupByKey(_ >= 2)
    +      .agg(sum("value").as[Long], sum($"value" + 1).as[Long])
    +    assert(agg.collect() === Seq((false, 1, 2), (true, 5, 7)))
    +  }
    +
    +  test("SPARK-25942: typed aggregation on product type") {
    +    val ds = Seq((1, 2), (2, 3), (3, 4)).toDS()
    +    val agg = ds.groupByKey(x => x).agg(sum("_1").as[Long], sum($"_2" + 1).as[Long])
    +    assert(agg.collect().sorted === Seq(((1, 2), 1, 3), ((2, 3), 2, 4), ((3, 4), 3, 5)))
    --- End diff --
    
    Is there any suggestion?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22944: [SPARK-25942][SQL] Aggregate expressions shouldn'...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22944#discussion_r232967650
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala ---
    @@ -1556,6 +1556,20 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
           df.where($"city".contains(new java.lang.Character('A'))),
           Seq(Row("Amsterdam")))
       }
    +
    +  test("SPARK-25942: typed aggregation on primitive type") {
    +    val ds = Seq(1, 2, 3).toDS()
    +
    +    val agg = ds.groupByKey(_ >= 2)
    +      .agg(sum("value").as[Long], sum($"value" + 1).as[Long])
    +    assert(agg.collect() === Seq((false, 1, 2), (true, 5, 7)))
    +  }
    +
    +  test("SPARK-25942: typed aggregation on product type") {
    +    val ds = Seq((1, 2), (2, 3), (3, 4)).toDS()
    +    val agg = ds.groupByKey(x => x).agg(sum("_1").as[Long], sum($"_2" + 1).as[Long])
    +    assert(agg.collect().sorted === Seq(((1, 2), 1, 3), ((2, 3), 2, 4), ((3, 4), 3, 5)))
    --- End diff --
    
    ah, cool, I see.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make it wor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/98558/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22944#discussion_r230781583
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala ---
    @@ -1556,6 +1556,14 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
           df.where($"city".contains(new java.lang.Character('A'))),
           Seq(Row("Amsterdam")))
       }
    +
    +  test("SPARK-25942: typed aggregation on primitive data") {
    --- End diff --
    
    It should not be introduced recently. I think we encode primitive data as a `value` field at beginning. 
    
    Because `AppendColumns` takes serializers' named expressions as `newColumns`. For primitive data, it conflicts with original data attribute `value`.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22944: [SPARK-25942][SQL] Aggregate expressions shouldn'...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22944#discussion_r232571836
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala ---
    @@ -1556,6 +1556,20 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
           df.where($"city".contains(new java.lang.Character('A'))),
           Seq(Row("Amsterdam")))
       }
    +
    +  test("SPARK-25942: typed aggregation on primitive type") {
    +    val ds = Seq(1, 2, 3).toDS()
    +
    +    val agg = ds.groupByKey(_ >= 2)
    +      .agg(sum("value").as[Long], sum($"value" + 1).as[Long])
    --- End diff --
    
    I think typed groupByKey is somehow different to untyped groupBy API.
    
    For untyped groupBy API, the grouping attributes are from original relation and visible to users before grouping. It is reasonable to access grouping attributes in aggregate expressions for untype groupBy.
    
    The key attributes of typed groupByKey API are appended by users in addition to original data attributes. The key attributes are from typed user-provided function. It can be primitive type or complex type like case class.
    
    It makes difficulties to have general and reasonable access to key attributes in aggregate expressions for typed groupByKey API. For example, for case class type, key attributes are flatten alongside original data attributes. So aggregate expressions should access single typed key or flatten attributes? In other words, if key object is `(Int, String)`, how aggregate expressions use it?
    
    From the detail of `aggUntyped`, it also shows that key attributes should not be accessed in aggregate expressions in typed groupByKey API. In `aggUntyped`, we call `withInputType` to bind value encoders on data attributes. So typed aggregate expressions can't access key attributes too.



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Aggregate expressions shouldn't be re...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22944#discussion_r231171593
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala ---
    @@ -262,25 +262,39 @@ object AppendColumns {
       def apply[T : Encoder, U : Encoder](
           func: T => U,
           child: LogicalPlan): AppendColumns = {
    +    val outputEncoder = encoderFor[U]
    +    val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
    +      assert(outputEncoder.namedExpressions.length == 1)
    +      outputEncoder.namedExpressions.map(Alias(_, "key")())
    +    } else {
    +      outputEncoder.namedExpressions
    +    }
         new AppendColumns(
           func.asInstanceOf[Any => Any],
           implicitly[Encoder[T]].clsTag.runtimeClass,
           implicitly[Encoder[T]].schema,
           UnresolvedDeserializer(encoderFor[T].deserializer),
    -      encoderFor[U].namedExpressions,
    +      namedExpressions,
           child)
       }
     
       def apply[T : Encoder, U : Encoder](
           func: T => U,
           inputAttributes: Seq[Attribute],
           child: LogicalPlan): AppendColumns = {
    +    val outputEncoder = encoderFor[U]
    +    val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
    +      assert(outputEncoder.namedExpressions.length == 1)
    +      outputEncoder.namedExpressions.map(Alias(_, "key")())
    +    } else {
    +      outputEncoder.namedExpressions
    --- End diff --
    
    Oh, I see. Do you mean we don't do such check in `AttributeSeq.resolve` but leave it to `CheckAnalysis`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Aggregate expressions shouldn't be re...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    ping @cloud-fan 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Aggregate expressions shouldn't be re...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    **[Test build #98766 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/98766/testReport)** for PR 22944 at commit [`f6017ea`](https://github.com/apache/spark/commit/f6017ea1b53d0d41d0967ac57343dd969ec1cf2f).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make it wor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Aggregate expressions shouldn't be re...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Aggregate expressions shouldn't be re...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22944#discussion_r231358690
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala ---
    @@ -262,25 +262,39 @@ object AppendColumns {
       def apply[T : Encoder, U : Encoder](
           func: T => U,
           child: LogicalPlan): AppendColumns = {
    +    val outputEncoder = encoderFor[U]
    +    val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
    +      assert(outputEncoder.namedExpressions.length == 1)
    +      outputEncoder.namedExpressions.map(Alias(_, "key")())
    +    } else {
    +      outputEncoder.namedExpressions
    +    }
         new AppendColumns(
           func.asInstanceOf[Any => Any],
           implicitly[Encoder[T]].clsTag.runtimeClass,
           implicitly[Encoder[T]].schema,
           UnresolvedDeserializer(encoderFor[T].deserializer),
    -      encoderFor[U].namedExpressions,
    +      namedExpressions,
           child)
       }
     
       def apply[T : Encoder, U : Encoder](
           func: T => U,
           inputAttributes: Seq[Attribute],
           child: LogicalPlan): AppendColumns = {
    +    val outputEncoder = encoderFor[U]
    +    val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
    +      assert(outputEncoder.namedExpressions.length == 1)
    +      outputEncoder.namedExpressions.map(Alias(_, "key")())
    +    } else {
    +      outputEncoder.namedExpressions
    --- End diff --
    
    I wouldn't special-case primitive type while this is a general problem.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22944: [SPARK-25942][SQL] Aggregate expressions shouldn'...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22944#discussion_r232665302
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala ---
    @@ -1556,6 +1556,20 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
           df.where($"city".contains(new java.lang.Character('A'))),
           Seq(Row("Amsterdam")))
       }
    +
    +  test("SPARK-25942: typed aggregation on primitive type") {
    +    val ds = Seq(1, 2, 3).toDS()
    +
    +    val agg = ds.groupByKey(_ >= 2)
    +      .agg(sum("value").as[Long], sum($"value" + 1).as[Long])
    --- End diff --
    
    ah i see your point.
    
    For untyped API, `df.groupBy...agg...` produces one plan node, but the typed API `df.groupByKey...agg...` produces 2 plan nodes.
    
    makes sense to me.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make it wor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    **[Test build #98469 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/98469/testReport)** for PR 22944 at commit [`5ef23f2`](https://github.com/apache/spark/commit/5ef23f27593efa968bed764a21988ccd7583a44c).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Aggregate expressions shouldn't be re...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Aggregate expressions shouldn't be re...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/98761/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Aggregate expressions shouldn't be re...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    **[Test build #98776 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/98776/testReport)** for PR 22944 at commit [`f6017ea`](https://github.com/apache/spark/commit/f6017ea1b53d0d41d0967ac57343dd969ec1cf2f).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make it wor...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    @cloud-fan I have a simpler fix for this issue. Can you take another look? Thanks.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22944#discussion_r230995240
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala ---
    @@ -262,25 +262,39 @@ object AppendColumns {
       def apply[T : Encoder, U : Encoder](
           func: T => U,
           child: LogicalPlan): AppendColumns = {
    +    val outputEncoder = encoderFor[U]
    +    val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
    +      assert(outputEncoder.namedExpressions.length == 1)
    +      outputEncoder.namedExpressions.map(Alias(_, "key")())
    +    } else {
    +      outputEncoder.namedExpressions
    +    }
         new AppendColumns(
           func.asInstanceOf[Any => Any],
           implicitly[Encoder[T]].clsTag.runtimeClass,
           implicitly[Encoder[T]].schema,
           UnresolvedDeserializer(encoderFor[T].deserializer),
    -      encoderFor[U].namedExpressions,
    +      namedExpressions,
           child)
       }
     
       def apply[T : Encoder, U : Encoder](
           func: T => U,
           inputAttributes: Seq[Attribute],
           child: LogicalPlan): AppendColumns = {
    +    val outputEncoder = encoderFor[U]
    +    val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
    +      assert(outputEncoder.namedExpressions.length == 1)
    +      outputEncoder.namedExpressions.map(Alias(_, "key")())
    +    } else {
    +      outputEncoder.namedExpressions
    --- End diff --
    
    I just tried to manually add alias. It seems not working as we expect:
    
    ```scala
    val ds = Seq(Some(("a", 10)), Some(("a", 20)), Some(("b", 1)), Some(("b", 2)), Some(("c", 1)), None).toDS()
    
    val newDS = ds.select(expr("value").as("opt").as[Option[(String, Int)]])
    
    // schema
    root                                                                                                                                                   
     |-- value: struct (nullable = true)                                                                                                                   
     |    |-- _1: string (nullable = true)                                                                                                          
     |    |-- _2: integer (nullable = false)
    
    // physical plan
    *(1) SerializeFromObject [if (isnull(unwrapoption(ObjectType(class scala.Tuple2), input[0, scala.Option, true]))) null else named_struct(_1, staticinvo
    ke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(unwrapoption(ObjectType(class scala.Tuple2), input[0, scala.Op
    tion, true]))._1, true, false), _2, assertnotnull(unwrapoption(ObjectType(class scala.Tuple2), input[0, scala.Option, true]))._2) AS value#5482]
    +- *(1) MapElements <function1>, obj#5481: scala.Option                                             
       +- *(1) DeserializeToObject newInstance(class scala.Tuple1), obj#5480: scala.Tuple1                                  
          +- *(1) Project [value#5473 AS opt#5476]                                                                         
             +- LocalTableScan [value#5473]           
    ```
    
    So even we add alias before `groupByKey`, it can't change dataset's output names.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make it wor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    **[Test build #98558 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/98558/testReport)** for PR 22944 at commit [`ce761fc`](https://github.com/apache/spark/commit/ce761fc27f56c6d716f5b5125b2e308ec9a410c2).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22944: [SPARK-25942][SQL] Aggregate expressions shouldn'...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22944#discussion_r232573269
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala ---
    @@ -1556,6 +1556,20 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
           df.where($"city".contains(new java.lang.Character('A'))),
           Seq(Row("Amsterdam")))
       }
    +
    +  test("SPARK-25942: typed aggregation on primitive type") {
    +    val ds = Seq(1, 2, 3).toDS()
    +
    +    val agg = ds.groupByKey(_ >= 2)
    +      .agg(sum("value").as[Long], sum($"value" + 1).as[Long])
    --- End diff --
    
    Btw, the definition of `agg` functions in `KeyValueGroupedDataset` look like:
    
    ```scala
    def agg[U1, U2](col1: TypedColumn[V, U1], col2: TypedColumn[V, U2]): Dataset[(K, U1, U2)]
    ```
    
    So the inputs to `agg` are `TypedColumn` with `V` as input type.



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Aggregate expressions shouldn't be re...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/4985/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Aggregate expressions shouldn't be re...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/4964/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make it wor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    **[Test build #98556 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/98556/testReport)** for PR 22944 at commit [`a8dbd9f`](https://github.com/apache/spark/commit/a8dbd9fcd3bd8c74ca18c76ea240d27ea36c3655).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Aggregate expressions shouldn't be re...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/4973/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22944#discussion_r231201502
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala ---
    @@ -262,25 +262,39 @@ object AppendColumns {
       def apply[T : Encoder, U : Encoder](
           func: T => U,
           child: LogicalPlan): AppendColumns = {
    +    val outputEncoder = encoderFor[U]
    +    val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
    +      assert(outputEncoder.namedExpressions.length == 1)
    +      outputEncoder.namedExpressions.map(Alias(_, "key")())
    +    } else {
    +      outputEncoder.namedExpressions
    +    }
         new AppendColumns(
           func.asInstanceOf[Any => Any],
           implicitly[Encoder[T]].clsTag.runtimeClass,
           implicitly[Encoder[T]].schema,
           UnresolvedDeserializer(encoderFor[T].deserializer),
    -      encoderFor[U].namedExpressions,
    +      namedExpressions,
           child)
       }
     
       def apply[T : Encoder, U : Encoder](
           func: T => U,
           inputAttributes: Seq[Attribute],
           child: LogicalPlan): AppendColumns = {
    +    val outputEncoder = encoderFor[U]
    +    val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
    +      assert(outputEncoder.namedExpressions.length == 1)
    +      outputEncoder.namedExpressions.map(Alias(_, "key")())
    +    } else {
    +      outputEncoder.namedExpressions
    --- End diff --
    
    I mean, maybe we should just leave this problem. I'm not sure how hacky it is to detect the `AppendColumns` in this case. Maybe we can have more confidence if you have a PR ready.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Aggregate expressions shouldn't be re...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22944#discussion_r231159305
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala ---
    @@ -262,25 +262,39 @@ object AppendColumns {
       def apply[T : Encoder, U : Encoder](
           func: T => U,
           child: LogicalPlan): AppendColumns = {
    +    val outputEncoder = encoderFor[U]
    +    val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
    +      assert(outputEncoder.namedExpressions.length == 1)
    +      outputEncoder.namedExpressions.map(Alias(_, "key")())
    +    } else {
    +      outputEncoder.namedExpressions
    +    }
         new AppendColumns(
           func.asInstanceOf[Any => Any],
           implicitly[Encoder[T]].clsTag.runtimeClass,
           implicitly[Encoder[T]].schema,
           UnresolvedDeserializer(encoderFor[T].deserializer),
    -      encoderFor[U].namedExpressions,
    +      namedExpressions,
           child)
       }
     
       def apply[T : Encoder, U : Encoder](
           func: T => U,
           inputAttributes: Seq[Attribute],
           child: LogicalPlan): AppendColumns = {
    +    val outputEncoder = encoderFor[U]
    +    val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
    +      assert(outputEncoder.namedExpressions.length == 1)
    +      outputEncoder.namedExpressions.map(Alias(_, "key")())
    +    } else {
    +      outputEncoder.namedExpressions
    --- End diff --
    
    ah i see. Then maybe we should just leave it instead of hacking the `AttributeSeq.resolve`.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22944#discussion_r231002129
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala ---
    @@ -262,25 +262,39 @@ object AppendColumns {
       def apply[T : Encoder, U : Encoder](
           func: T => U,
           child: LogicalPlan): AppendColumns = {
    +    val outputEncoder = encoderFor[U]
    +    val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
    +      assert(outputEncoder.namedExpressions.length == 1)
    +      outputEncoder.namedExpressions.map(Alias(_, "key")())
    +    } else {
    +      outputEncoder.namedExpressions
    +    }
         new AppendColumns(
           func.asInstanceOf[Any => Any],
           implicitly[Encoder[T]].clsTag.runtimeClass,
           implicitly[Encoder[T]].schema,
           UnresolvedDeserializer(encoderFor[T].deserializer),
    -      encoderFor[U].namedExpressions,
    +      namedExpressions,
           child)
       }
     
       def apply[T : Encoder, U : Encoder](
           func: T => U,
           inputAttributes: Seq[Attribute],
           child: LogicalPlan): AppendColumns = {
    +    val outputEncoder = encoderFor[U]
    +    val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
    +      assert(outputEncoder.namedExpressions.length == 1)
    +      outputEncoder.namedExpressions.map(Alias(_, "key")())
    +    } else {
    +      outputEncoder.namedExpressions
    --- End diff --
    
    For option of product, I think it is due to typed select. I will address it at #21732.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22944#discussion_r231038560
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala ---
    @@ -262,25 +262,39 @@ object AppendColumns {
       def apply[T : Encoder, U : Encoder](
           func: T => U,
           child: LogicalPlan): AppendColumns = {
    +    val outputEncoder = encoderFor[U]
    +    val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
    +      assert(outputEncoder.namedExpressions.length == 1)
    +      outputEncoder.namedExpressions.map(Alias(_, "key")())
    +    } else {
    +      outputEncoder.namedExpressions
    +    }
         new AppendColumns(
           func.asInstanceOf[Any => Any],
           implicitly[Encoder[T]].clsTag.runtimeClass,
           implicitly[Encoder[T]].schema,
           UnresolvedDeserializer(encoderFor[T].deserializer),
    -      encoderFor[U].namedExpressions,
    +      namedExpressions,
           child)
       }
     
       def apply[T : Encoder, U : Encoder](
           func: T => U,
           inputAttributes: Seq[Attribute],
           child: LogicalPlan): AppendColumns = {
    +    val outputEncoder = encoderFor[U]
    +    val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
    +      assert(outputEncoder.namedExpressions.length == 1)
    +      outputEncoder.namedExpressions.map(Alias(_, "key")())
    +    } else {
    +      outputEncoder.namedExpressions
    --- End diff --
    
    I tried to add check into `CheckAnalysis` to detect such `AppendColumns`. I found that we have many such use cases in `DatasetSuite`, e.g. `map and group by with class data`:
    
    ```scala
    val ds: Dataset[(ClassData, Long)] = Seq(ClassData("one", 1), ClassData("two", 2)).toDS()
      .map(c => ClassData(c.a, c.b + 1))
      .groupByKey(p => p).count()
    ```
    
    If users don't access original output like this patch shows, it won't cause problem. So I'm thinking if we disallow it at all, it is a behavior change. Should we only fail the `groupByKey` query accessing ambiguous field names? Or we should disallow at all if there is any conflicting name?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22944: [SPARK-25942][SQL] Aggregate expressions shouldn'...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/22944


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Aggregate expressions shouldn't be re...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    retest this please.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Aggregate expressions shouldn't be re...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/4983/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22944#discussion_r230986888
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala ---
    @@ -262,25 +262,39 @@ object AppendColumns {
       def apply[T : Encoder, U : Encoder](
           func: T => U,
           child: LogicalPlan): AppendColumns = {
    +    val outputEncoder = encoderFor[U]
    +    val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
    +      assert(outputEncoder.namedExpressions.length == 1)
    +      outputEncoder.namedExpressions.map(Alias(_, "key")())
    +    } else {
    +      outputEncoder.namedExpressions
    +    }
         new AppendColumns(
           func.asInstanceOf[Any => Any],
           implicitly[Encoder[T]].clsTag.runtimeClass,
           implicitly[Encoder[T]].schema,
           UnresolvedDeserializer(encoderFor[T].deserializer),
    -      encoderFor[U].namedExpressions,
    +      namedExpressions,
           child)
       }
     
       def apply[T : Encoder, U : Encoder](
           func: T => U,
           inputAttributes: Seq[Attribute],
           child: LogicalPlan): AppendColumns = {
    +    val outputEncoder = encoderFor[U]
    +    val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
    +      assert(outputEncoder.namedExpressions.length == 1)
    +      outputEncoder.namedExpressions.map(Alias(_, "key")())
    +    } else {
    +      outputEncoder.namedExpressions
    --- End diff --
    
    You mean we detect conflicting case, and show some error messages to ask users for that?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22944: [SPARK-25942][SQL] Aggregate expressions shouldn'...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22944#discussion_r232941317
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala ---
    @@ -1556,6 +1556,20 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
           df.where($"city".contains(new java.lang.Character('A'))),
           Seq(Row("Amsterdam")))
       }
    +
    +  test("SPARK-25942: typed aggregation on primitive type") {
    +    val ds = Seq(1, 2, 3).toDS()
    +
    +    val agg = ds.groupByKey(_ >= 2)
    +      .agg(sum("value").as[Long], sum($"value" + 1).as[Long])
    +    assert(agg.collect() === Seq((false, 1, 2), (true, 5, 7)))
    +  }
    +
    +  test("SPARK-25942: typed aggregation on product type") {
    +    val ds = Seq((1, 2), (2, 3), (3, 4)).toDS()
    +    val agg = ds.groupByKey(x => x).agg(sum("_1").as[Long], sum($"_2" + 1).as[Long])
    +    assert(agg.collect().sorted === Seq(((1, 2), 1, 3), ((2, 3), 2, 4), ((3, 4), 3, 5)))
    --- End diff --
    
    can you try `((1, 2), 1L, 3L)` instead of `((1, 2), 1, 3)`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Aggregate expressions shouldn't be re...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    **[Test build #98761 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/98761/testReport)** for PR 22944 at commit [`71dff40`](https://github.com/apache/spark/commit/71dff408a3da828e628aa29f81e30cdcb822fd37).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22944#discussion_r230977212
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala ---
    @@ -262,25 +262,39 @@ object AppendColumns {
       def apply[T : Encoder, U : Encoder](
           func: T => U,
           child: LogicalPlan): AppendColumns = {
    +    val outputEncoder = encoderFor[U]
    +    val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
    +      assert(outputEncoder.namedExpressions.length == 1)
    +      outputEncoder.namedExpressions.map(Alias(_, "key")())
    +    } else {
    +      outputEncoder.namedExpressions
    +    }
         new AppendColumns(
           func.asInstanceOf[Any => Any],
           implicitly[Encoder[T]].clsTag.runtimeClass,
           implicitly[Encoder[T]].schema,
           UnresolvedDeserializer(encoderFor[T].deserializer),
    -      encoderFor[U].namedExpressions,
    +      namedExpressions,
           child)
       }
     
       def apply[T : Encoder, U : Encoder](
           func: T => U,
           inputAttributes: Seq[Attribute],
           child: LogicalPlan): AppendColumns = {
    +    val outputEncoder = encoderFor[U]
    +    val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
    +      assert(outputEncoder.namedExpressions.length == 1)
    +      outputEncoder.namedExpressions.map(Alias(_, "key")())
    +    } else {
    +      outputEncoder.namedExpressions
    --- End diff --
    
    so we may still fail if `T` and `U` are case classes and have conflict field names?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make it wor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Aggregate expressions shouldn't be re...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    **[Test build #98774 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/98774/testReport)** for PR 22944 at commit [`f6017ea`](https://github.com/apache/spark/commit/f6017ea1b53d0d41d0967ac57343dd969ec1cf2f).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22944: [SPARK-25942][SQL] Aggregate expressions shouldn'...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22944#discussion_r232894304
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala ---
    @@ -1556,6 +1556,20 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
           df.where($"city".contains(new java.lang.Character('A'))),
           Seq(Row("Amsterdam")))
       }
    +
    +  test("SPARK-25942: typed aggregation on primitive type") {
    +    val ds = Seq(1, 2, 3).toDS()
    +
    +    val agg = ds.groupByKey(_ >= 2)
    +      .agg(sum("value").as[Long], sum($"value" + 1).as[Long])
    --- End diff --
    
    `TypedAggregateExpression.withInputInfo` needs the `UnresolvedDeserializer` which depends on input encoder and input attributes. In analyzer, we can't have such inputs.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22944#discussion_r230772018
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala ---
    @@ -1556,6 +1556,14 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
           df.where($"city".contains(new java.lang.Character('A'))),
           Seq(Row("Amsterdam")))
       }
    +
    +  test("SPARK-25942: typed aggregation on primitive data") {
    --- End diff --
    
    how was this bug introduced?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make it wor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    **[Test build #98558 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/98558/testReport)** for PR 22944 at commit [`ce761fc`](https://github.com/apache/spark/commit/ce761fc27f56c6d716f5b5125b2e308ec9a410c2).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make it wor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/4820/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make it wor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    **[Test build #98466 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/98466/testReport)** for PR 22944 at commit [`7564d98`](https://github.com/apache/spark/commit/7564d98a9b1242f1eea167152ed97371416c204d).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Aggregate expressions shouldn't be re...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/98752/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Aggregate expressions shouldn't be re...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    **[Test build #98761 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/98761/testReport)** for PR 22944 at commit [`71dff40`](https://github.com/apache/spark/commit/71dff408a3da828e628aa29f81e30cdcb822fd37).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Aggregate expressions shouldn't be re...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    **[Test build #98766 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/98766/testReport)** for PR 22944 at commit [`f6017ea`](https://github.com/apache/spark/commit/f6017ea1b53d0d41d0967ac57343dd969ec1cf2f).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make it wor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22944: [SPARK-25942][SQL] Aggregate expressions shouldn'...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22944#discussion_r232689555
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala ---
    @@ -1556,6 +1556,20 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
           df.where($"city".contains(new java.lang.Character('A'))),
           Seq(Row("Amsterdam")))
       }
    +
    +  test("SPARK-25942: typed aggregation on primitive type") {
    +    val ds = Seq(1, 2, 3).toDS()
    +
    +    val agg = ds.groupByKey(_ >= 2)
    +      .agg(sum("value").as[Long], sum($"value" + 1).as[Long])
    --- End diff --
    
    For `TypedAggregateExpression` input, `TypedColumn.withInputType` will create `UnresolvedDeserializer` to resolve on `dataAttributes`.
    
    But for non `TypedAggregateExpression`, `withInputType` actually doesn't do anything. So such non typed aggregate expressions, we still resolve expressions on `dataAttributes` + `groupingAttributes`.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22944#discussion_r231129654
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala ---
    @@ -262,25 +262,39 @@ object AppendColumns {
       def apply[T : Encoder, U : Encoder](
           func: T => U,
           child: LogicalPlan): AppendColumns = {
    +    val outputEncoder = encoderFor[U]
    +    val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
    +      assert(outputEncoder.namedExpressions.length == 1)
    +      outputEncoder.namedExpressions.map(Alias(_, "key")())
    +    } else {
    +      outputEncoder.namedExpressions
    +    }
         new AppendColumns(
           func.asInstanceOf[Any => Any],
           implicitly[Encoder[T]].clsTag.runtimeClass,
           implicitly[Encoder[T]].schema,
           UnresolvedDeserializer(encoderFor[T].deserializer),
    -      encoderFor[U].namedExpressions,
    +      namedExpressions,
           child)
       }
     
       def apply[T : Encoder, U : Encoder](
           func: T => U,
           inputAttributes: Seq[Attribute],
           child: LogicalPlan): AppendColumns = {
    +    val outputEncoder = encoderFor[U]
    +    val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
    +      assert(outputEncoder.namedExpressions.length == 1)
    +      outputEncoder.namedExpressions.map(Alias(_, "key")())
    +    } else {
    +      outputEncoder.namedExpressions
    --- End diff --
    
    > Should we only fail the groupByKey query accessing ambiguous field names?
    
    Yes. When we have unresolved attributes, check if the child plan is `AppendColumns`.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Aggregate expressions shouldn't be re...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/98774/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Aggregate expressions shouldn't be re...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    retest this please.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make it wor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/98469/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make it wor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    **[Test build #98466 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/98466/testReport)** for PR 22944 at commit [`7564d98`](https://github.com/apache/spark/commit/7564d98a9b1242f1eea167152ed97371416c204d).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make it wor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/98556/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make it wor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/4819/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22944#discussion_r231358749
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala ---
    @@ -262,25 +262,39 @@ object AppendColumns {
       def apply[T : Encoder, U : Encoder](
           func: T => U,
           child: LogicalPlan): AppendColumns = {
    +    val outputEncoder = encoderFor[U]
    +    val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
    +      assert(outputEncoder.namedExpressions.length == 1)
    +      outputEncoder.namedExpressions.map(Alias(_, "key")())
    +    } else {
    +      outputEncoder.namedExpressions
    +    }
         new AppendColumns(
           func.asInstanceOf[Any => Any],
           implicitly[Encoder[T]].clsTag.runtimeClass,
           implicitly[Encoder[T]].schema,
           UnresolvedDeserializer(encoderFor[T].deserializer),
    -      encoderFor[U].namedExpressions,
    +      namedExpressions,
           child)
       }
     
       def apply[T : Encoder, U : Encoder](
           func: T => U,
           inputAttributes: Seq[Attribute],
           child: LogicalPlan): AppendColumns = {
    +    val outputEncoder = encoderFor[U]
    +    val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
    +      assert(outputEncoder.namedExpressions.length == 1)
    +      outputEncoder.namedExpressions.map(Alias(_, "key")())
    +    } else {
    +      outputEncoder.namedExpressions
    --- End diff --
    
    I wouldn't special-case primitive type while this is a general problem.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make it wor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/98466/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22944#discussion_r231359624
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala ---
    @@ -262,25 +262,39 @@ object AppendColumns {
       def apply[T : Encoder, U : Encoder](
           func: T => U,
           child: LogicalPlan): AppendColumns = {
    +    val outputEncoder = encoderFor[U]
    +    val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
    +      assert(outputEncoder.namedExpressions.length == 1)
    +      outputEncoder.namedExpressions.map(Alias(_, "key")())
    +    } else {
    +      outputEncoder.namedExpressions
    +    }
         new AppendColumns(
           func.asInstanceOf[Any => Any],
           implicitly[Encoder[T]].clsTag.runtimeClass,
           implicitly[Encoder[T]].schema,
           UnresolvedDeserializer(encoderFor[T].deserializer),
    -      encoderFor[U].namedExpressions,
    +      namedExpressions,
           child)
       }
     
       def apply[T : Encoder, U : Encoder](
           func: T => U,
           inputAttributes: Seq[Attribute],
           child: LogicalPlan): AppendColumns = {
    +    val outputEncoder = encoderFor[U]
    +    val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
    +      assert(outputEncoder.namedExpressions.length == 1)
    +      outputEncoder.namedExpressions.map(Alias(_, "key")())
    +    } else {
    +      outputEncoder.namedExpressions
    --- End diff --
    
    Ok. I will try to make a PR and see if we can have better fix for this. Thanks for suggestion.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make it wor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22944: [SPARK-25942][SQL] Aggregate expressions shouldn't be re...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22944
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org