You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2016/11/10 05:04:09 UTC
spark git commit: [SPARK-18147][SQL] do not fail for very complex aggregator result type
Repository: spark
Updated Branches:
refs/heads/master 3f62e1b5d -> 6021c95a3
[SPARK-18147][SQL] do not fail for very complex aggregator result type
## What changes were proposed in this pull request?
~In `TypedAggregateExpression.evaluateExpression`, we may create `ReferenceToExpressions` with `CreateStruct`, and `CreateStruct` may generate too many codes and split them into several methods. `ReferenceToExpressions` will replace `BoundReference` in `CreateStruct` with `LambdaVariable`, which can only be used as local variables and doesn't work if we split the generated code.~
It's already fixed by #15693 , this pr adds regression test
## How was this patch tested?
new test in `DatasetAggregatorSuite`
Author: Wenchen Fan <we...@databricks.com>
Closes #15807 from cloud-fan/typed-agg.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6021c95a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6021c95a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6021c95a
Branch: refs/heads/master
Commit: 6021c95a3aa3858b0499782b23b08ef92c73245d
Parents: 3f62e1b
Author: Wenchen Fan <we...@databricks.com>
Authored: Thu Nov 10 13:03:59 2016 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Nov 10 13:03:59 2016 +0800
----------------------------------------------------------------------
.../spark/sql/DatasetAggregatorSuite.scala | 21 ++++++++++++++++++++
1 file changed, 21 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/6021c95a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
index b117fbd..36b2651 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
@@ -134,6 +134,19 @@ object NullResultAgg extends Aggregator[AggData, AggData, AggData] {
override def outputEncoder: Encoder[AggData] = Encoders.product[AggData]
}
+case class ComplexAggData(d1: AggData, d2: AggData)
+
+object VeryComplexResultAgg extends Aggregator[Row, String, ComplexAggData] {
+ override def zero: String = ""
+ override def reduce(buffer: String, input: Row): String = buffer + input.getString(1)
+ override def merge(b1: String, b2: String): String = b1 + b2
+ override def finish(reduction: String): ComplexAggData = {
+ ComplexAggData(AggData(reduction.length, reduction), AggData(reduction.length, reduction))
+ }
+ override def bufferEncoder: Encoder[String] = Encoders.STRING
+ override def outputEncoder: Encoder[ComplexAggData] = Encoders.product[ComplexAggData]
+}
+
class DatasetAggregatorSuite extends QueryTest with SharedSQLContext {
import testImplicits._
@@ -312,4 +325,12 @@ class DatasetAggregatorSuite extends QueryTest with SharedSQLContext {
val ds3 = sql("SELECT 'Some String' AS b, 1279869254 AS a").as[AggData]
assert(ds3.select(NameAgg.toColumn).schema.head.nullable === true)
}
+
+ test("SPARK-18147: very complex aggregator result type") {
+ val df = Seq(1 -> "a", 2 -> "b", 2 -> "c").toDF("i", "j")
+
+ checkAnswer(
+ df.groupBy($"i").agg(VeryComplexResultAgg.toColumn),
+ Row(1, Row(Row(1, "a"), Row(1, "a"))) :: Row(2, Row(Row(2, "bc"), Row(2, "bc"))) :: Nil)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org