Posted to commits@spark.apache.org by hv...@apache.org on 2023/06/27 02:54:34 UTC

[spark] branch master updated: [SPARK-43136][CONNECT][FOLLOWUP] Adding tests for KeyAs

This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a9199c3d815 [SPARK-43136][CONNECT][FOLLOWUP] Adding tests for KeyAs
a9199c3d815 is described below

commit a9199c3d815b0f63ba2ddbad2532e6d12180aafe
Author: Zhen Li <zh...@users.noreply.github.com>
AuthorDate: Mon Jun 26 22:54:23 2023 -0400

    [SPARK-43136][CONNECT][FOLLOWUP] Adding tests for KeyAs
    
    ### What changes were proposed in this pull request?
    The current implementation of `keyAs` for the Scala client is a purely client-side encoder operation, so we can end up with duplicate keys.
    
    This patch adds tests for both the Scala client and the Spark Dataset API, showing that the client and server currently behave the same way. A minimal, self-contained sketch of the behavior follows (the standalone `main`/`local[*]` boilerplate is illustrative only and not part of this patch; `K1` and `K2` are the case classes the patch adds):
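    
    ```scala
    import org.apache.spark.sql.SparkSession
    
    case class K1(a: Long)
    case class K2(a: Long, b: Long)
    
    object KeyAsDuplicatesSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").getOrCreate()
        import spark.implicits._
    
        // ids 0..9 grouped by K2(id % 2, id % 4) form four groups:
        // K2(0,0), K2(1,1), K2(0,2), K2(1,3).
        val keys = spark
          .range(10)
          .as[Long]
          .groupByKey(id => K2(id % 2, id % 4))
          // keyAs only swaps the key encoder; the grouping is still by K2,
          // so projecting the key down to K1(a) leaves duplicate K1 values.
          .keyAs[K1]
          .keys
          .collect()
    
        // Prints K1(0), K1(0), K1(1), K1(1) (in some order).
        keys.sortBy(_.a).foreach(println)
        spark.stop()
      }
    }
    ```
    
    The tests below pin this down on both the Connect client and the classic Dataset API.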
    
    ### Why are the changes needed?
    More test coverage to verify that the client and the server behave consistently.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    New unit tests in `KeyValueGroupedDatasetE2ETestSuite` and `DatasetSuite`.
    
    Closes #40980 from zhenlineo/keys.
    
    Authored-by: Zhen Li <zh...@users.noreply.github.com>
    Signed-off-by: Herman van Hovell <he...@databricks.com>
---
 .../sql/KeyValueGroupedDatasetE2ETestSuite.scala   | 29 ++++++++++++++++++++++
 .../scala/org/apache/spark/sql/DatasetSuite.scala  | 24 ++++++++++++++++++
 2 files changed, 53 insertions(+)

diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/KeyValueGroupedDatasetE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/KeyValueGroupedDatasetE2ETestSuite.scala
index 404239f7e14..173867060bd 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/KeyValueGroupedDatasetE2ETestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/KeyValueGroupedDatasetE2ETestSuite.scala
@@ -79,6 +79,32 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
     assert(values == Arrays.asList[Double](0, 1))
   }
 
+  test("groupByKey, keyAs - duplicates") {
+    val session: SparkSession = spark
+    import session.implicits._
+    val result = spark
+      .range(10)
+      .as[Long]
+      .groupByKey(id => K2(id % 2, id % 4))
+      .keyAs[K1]
+      .flatMapGroups((_, it) => Seq(it.toSeq.size))
+      .collect()
+    assert(result.sorted === Seq(2, 2, 3, 3))
+  }
+
+  test("groupByKey, keyAs, keys - duplicates") {
+    val session: SparkSession = spark
+    import session.implicits._
+    val result = spark
+      .range(10)
+      .as[Long]
+      .groupByKey(id => K2(id % 2, id % 4))
+      .keyAs[K1]
+      .keys
+      .collect()
+    assert(result.sortBy(_.a) === Seq(K1(0), K1(0), K1(1), K1(1)))
+  }
+
   test("keyAs - flatGroupMap") {
     val values = spark
       .range(10)
@@ -555,3 +581,6 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
     checkDataset(values, ClickState("a", 5), ClickState("b", 3), ClickState("c", 1))
   }
 }
+
+case class K1(a: Long)
+case class K2(a: Long, b: Long)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 167aea79209..0766dd2e772 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -668,6 +668,27 @@ class DatasetSuite extends QueryTest
     )
   }
 
+  test("groupByKey, keyAs - duplicates") {
+    val ds = spark
+      .range(10)
+      .as[Long]
+      .groupByKey(id => K2(id % 2, id % 4))
+      .keyAs[K1]
+      .flatMapGroups((_, it) => Seq(it.toSeq.size))
+    checkDatasetUnorderly(ds, 3, 2, 3, 2)
+  }
+
+  test("groupByKey, keyAs, keys - duplicates") {
+    val result = spark
+      .range(10)
+      .as[Long]
+      .groupByKey(id => K2(id % 2, id % 4))
+      .keyAs[K1]
+      .keys
+      .collect()
+    assert(result.sortBy(_.a) === Seq(K1(0), K1(0), K1(1), K1(1)))
+  }
+
   test("groupBy function, mapValues, flatMap") {
     val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
     val keyValue = ds.groupByKey(_._1).mapValues(_._2)
@@ -2626,3 +2647,6 @@ case class SpecialCharClass(`field.1`: String, `field 2`: String)
 /** Used to test Java Enums from Scala code */
 case class SaveModeCase(mode: SaveMode)
 case class SaveModeArrayCase(modes: Array[SaveMode])
+
+case class K1(a: Long)
+case class K2(a: Long, b: Long)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org