You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2019/07/16 23:35:31 UTC
[spark] branch master updated: [SPARK-18299][SQL] Allow more
aggregations on KeyValueGroupedDataset
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 1134fae [SPARK-18299][SQL] Allow more aggregations on KeyValueGroupedDataset
1134fae is described below
commit 1134faecf4fe2cc1bf7c3670f5f8f2b9d0c6f2e7
Author: nooberfsh <no...@gmail.com>
AuthorDate: Tue Jul 16 16:35:04 2019 -0700
[SPARK-18299][SQL] Allow more aggregations on KeyValueGroupedDataset
## What changes were proposed in this pull request?
Add 4 additional `agg` methods (for 5 to 8 typed columns) to KeyValueGroupedDataset
## How was this patch tested?
New tests in DatasetSuite for typed aggregation
Closes #24993 from nooberfsh/sqlagg.
Authored-by: nooberfsh <no...@gmail.com>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
.../apache/spark/sql/KeyValueGroupedDataset.scala | 65 ++++++++++++++++++++++
.../scala/org/apache/spark/sql/DatasetSuite.scala | 64 +++++++++++++++++++++
2 files changed, 129 insertions(+)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
index a3cbea9..0da52d4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
@@ -521,6 +521,71 @@ class KeyValueGroupedDataset[K, V] private[sql](
aggUntyped(col1, col2, col3, col4).asInstanceOf[Dataset[(K, U1, U2, U3, U4)]]
/**
+ * Computes the given aggregations, returning a [[Dataset]] of tuples for each unique key
+ * and the result of computing these aggregations over all elements in the group.
+ *
+ * @since 3.0.0
+ */
+ def agg[U1, U2, U3, U4, U5](
+ col1: TypedColumn[V, U1],
+ col2: TypedColumn[V, U2],
+ col3: TypedColumn[V, U3],
+ col4: TypedColumn[V, U4],
+ col5: TypedColumn[V, U5]): Dataset[(K, U1, U2, U3, U4, U5)] =
+ aggUntyped(col1, col2, col3, col4, col5).asInstanceOf[Dataset[(K, U1, U2, U3, U4, U5)]]
+
+ /**
+ * Computes the given aggregations, returning a [[Dataset]] of tuples for each unique key
+ * and the result of computing these aggregations over all elements in the group.
+ *
+ * @since 3.0.0
+ */
+ def agg[U1, U2, U3, U4, U5, U6](
+ col1: TypedColumn[V, U1],
+ col2: TypedColumn[V, U2],
+ col3: TypedColumn[V, U3],
+ col4: TypedColumn[V, U4],
+ col5: TypedColumn[V, U5],
+ col6: TypedColumn[V, U6]): Dataset[(K, U1, U2, U3, U4, U5, U6)] =
+ aggUntyped(col1, col2, col3, col4, col5, col6)
+ .asInstanceOf[Dataset[(K, U1, U2, U3, U4, U5, U6)]]
+
+ /**
+ * Computes the given aggregations, returning a [[Dataset]] of tuples for each unique key
+ * and the result of computing these aggregations over all elements in the group.
+ *
+ * @since 3.0.0
+ */
+ def agg[U1, U2, U3, U4, U5, U6, U7](
+ col1: TypedColumn[V, U1],
+ col2: TypedColumn[V, U2],
+ col3: TypedColumn[V, U3],
+ col4: TypedColumn[V, U4],
+ col5: TypedColumn[V, U5],
+ col6: TypedColumn[V, U6],
+ col7: TypedColumn[V, U7]): Dataset[(K, U1, U2, U3, U4, U5, U6, U7)] =
+ aggUntyped(col1, col2, col3, col4, col5, col6, col7)
+ .asInstanceOf[Dataset[(K, U1, U2, U3, U4, U5, U6, U7)]]
+
+ /**
+ * Computes the given aggregations, returning a [[Dataset]] of tuples for each unique key
+ * and the result of computing these aggregations over all elements in the group.
+ *
+ * @since 3.0.0
+ */
+ def agg[U1, U2, U3, U4, U5, U6, U7, U8](
+ col1: TypedColumn[V, U1],
+ col2: TypedColumn[V, U2],
+ col3: TypedColumn[V, U3],
+ col4: TypedColumn[V, U4],
+ col5: TypedColumn[V, U5],
+ col6: TypedColumn[V, U6],
+ col7: TypedColumn[V, U7],
+ col8: TypedColumn[V, U8]): Dataset[(K, U1, U2, U3, U4, U5, U6, U7, U8)] =
+ aggUntyped(col1, col2, col3, col4, col5, col6, col7, col8)
+ .asInstanceOf[Dataset[(K, U1, U2, U3, U4, U5, U6, U7, U8)]]
+
+ /**
* Returns a [[Dataset]] that contains a tuple with each key and the number of items present
* for that key.
*
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 4b08a4b..ff61431 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -603,6 +603,70 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
("a", 30L, 32L, 2L, 15.0), ("b", 3L, 5L, 2L, 1.5), ("c", 1L, 2L, 1L, 1.0))
}
+ test("typed aggregation: expr, expr, expr, expr, expr") {
+ val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
+
+ checkDatasetUnorderly(
+ ds.groupByKey(_._1).agg(
+ sum("_2").as[Long],
+ sum($"_2" + 1).as[Long],
+ count("*").as[Long],
+ avg("_2").as[Double],
+ countDistinct("*").as[Long]),
+ ("a", 30L, 32L, 2L, 15.0, 2L), ("b", 3L, 5L, 2L, 1.5, 2L), ("c", 1L, 2L, 1L, 1.0, 1L))
+ }
+
+ test("typed aggregation: expr, expr, expr, expr, expr, expr") {
+ val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
+
+ checkDatasetUnorderly(
+ ds.groupByKey(_._1).agg(
+ sum("_2").as[Long],
+ sum($"_2" + 1).as[Long],
+ count("*").as[Long],
+ avg("_2").as[Double],
+ countDistinct("*").as[Long],
+ max("_2").as[Long]),
+ ("a", 30L, 32L, 2L, 15.0, 2L, 20L),
+ ("b", 3L, 5L, 2L, 1.5, 2L, 2L),
+ ("c", 1L, 2L, 1L, 1.0, 1L, 1L))
+ }
+
+ test("typed aggregation: expr, expr, expr, expr, expr, expr, expr") {
+ val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
+
+ checkDatasetUnorderly(
+ ds.groupByKey(_._1).agg(
+ sum("_2").as[Long],
+ sum($"_2" + 1).as[Long],
+ count("*").as[Long],
+ avg("_2").as[Double],
+ countDistinct("*").as[Long],
+ max("_2").as[Long],
+ min("_2").as[Long]),
+ ("a", 30L, 32L, 2L, 15.0, 2L, 20L, 10L),
+ ("b", 3L, 5L, 2L, 1.5, 2L, 2L, 1L),
+ ("c", 1L, 2L, 1L, 1.0, 1L, 1L, 1L))
+ }
+
+ test("typed aggregation: expr, expr, expr, expr, expr, expr, expr, expr") {
+ val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
+
+ checkDatasetUnorderly(
+ ds.groupByKey(_._1).agg(
+ sum("_2").as[Long],
+ sum($"_2" + 1).as[Long],
+ count("*").as[Long],
+ avg("_2").as[Double],
+ countDistinct("*").as[Long],
+ max("_2").as[Long],
+ min("_2").as[Long],
+ mean("_2").as[Double]),
+ ("a", 30L, 32L, 2L, 15.0, 2L, 20L, 10L, 15.0),
+ ("b", 3L, 5L, 2L, 1.5, 2L, 2L, 1L, 1.5),
+ ("c", 1L, 2L, 1L, 1.0, 1L, 1L, 1L, 1.0))
+ }
+
test("cogroup") {
val ds1 = Seq(1 -> "a", 3 -> "abc", 5 -> "hello", 3 -> "foo").toDS()
val ds2 = Seq(2 -> "q", 3 -> "w", 5 -> "e", 5 -> "r").toDS()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org