You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "hvanhovell (via GitHub)" <gi...@apache.org> on 2023/08/08 00:58:58 UTC

[GitHub] [spark] hvanhovell opened a new pull request, #42384: [SPARK-44710][CONNECT] Add Dataset.dropDuplicatesWithinWatermark to Spark Connect Scala Client

hvanhovell opened a new pull request, #42384:
URL: https://github.com/apache/spark/pull/42384

   ### What changes were proposed in this pull request?
   This PR adds `Dataset.dropDuplicatesWithinWatermark` to the Spark Connect Scala Client.
   
   ### Why are the changes needed?
   Increase compatibility with the current sql/core APIs.
   
   ### Does this PR introduce _any_ user-facing change?
   Yes. It adds a new method to the scala client.
   
   ### How was this patch tested?
   Added a new (rudimentary) test to `ClientStreamingQuerySuite`.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on pull request #42384: [SPARK-44710][CONNECT] Add Dataset.dropDuplicatesWithinWatermark to Spark Connect Scala Client

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on PR #42384:
URL: https://github.com/apache/spark/pull/42384#issuecomment-1668759250

   cc @bogao007 PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] bogao007 commented on a diff in pull request #42384: [SPARK-44710][CONNECT] Add Dataset.dropDuplicatesWithinWatermark to Spark Connect Scala Client

Posted by "bogao007 (via GitHub)" <gi...@apache.org>.
bogao007 commented on code in PR #42384:
URL: https://github.com/apache/spark/pull/42384#discussion_r1286500284


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala:
##########
@@ -352,6 +352,26 @@ class ClientStreamingQuerySuite extends QueryTest with SQLHelper with Logging {
       q.stop()
     }
   }
+
+  test("deduplicate in batch DataFrame") {

Review Comment:
   Since this is a batch test, is it possible to move it to a batch test suite?



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -2443,16 +2449,14 @@ class Dataset[T] private[sql] (
    */
   @scala.annotation.varargs
   def dropDuplicates(col1: String, cols: String*): Dataset[T] = {
-    val colNames: Seq[String] = col1 +: cols
-    dropDuplicates(colNames)
+    dropDuplicates(col1 +: cols)
   }
 
-  def dropDuplicatesWithinWatermark(): Dataset[T] = {
-    dropDuplicatesWithinWatermark(this.columns)
-  }
+  def dropDuplicatesWithinWatermark(): Dataset[T] =
+    buildDropDuplicates(None, withWaterMark = true)
 
   def dropDuplicatesWithinWatermark(colNames: Seq[String]): Dataset[T] = {
-    throw new UnsupportedOperationException("dropDuplicatesWithinWatermark is not implemented.")
+    buildDropDuplicates(Option(colNames), withWaterMark = false)

Review Comment:
   should this be `withWaterMark = true`?



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -2399,18 +2399,27 @@ class Dataset[T] private[sql] (
         .addAllColumnNames(cols.asJava)
   }
 
+  private def buildDropDuplicates(
+      columns: Option[Seq[String]],
+      withWaterMark: Boolean): Dataset[T] = sparkSession.newDataset(encoder) { builder =>
+    val dropBuilder = builder.getDeduplicateBuilder
+      .setInput(plan.getRoot)
+      .setWithinWatermark(true)

Review Comment:
   We need to use `withWaterMark` here instead of `true` right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell closed pull request #42384: [SPARK-44710][CONNECT] Add Dataset.dropDuplicatesWithinWatermark to Spark Connect Scala Client

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell closed pull request #42384: [SPARK-44710][CONNECT] Add Dataset.dropDuplicatesWithinWatermark to Spark Connect Scala Client
URL: https://github.com/apache/spark/pull/42384


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org