Posted to reviews@spark.apache.org by "zhenlineo (via GitHub)" <gi...@apache.org> on 2023/04/01 00:08:34 UTC

[GitHub] [spark] zhenlineo opened a new pull request, #40628: [SPARK-42999][Connect] Dataset#foreach, foreachPartition

zhenlineo opened a new pull request, #40628:
URL: https://github.com/apache/spark/pull/40628

   ### What changes were proposed in this pull request?
   Implements the missing `Dataset` methods `foreach` and `foreachPartition`.
   This PR is based on top of https://github.com/apache/spark/pull/40581
   `foreachPartition` is implemented on top of `mapPartitions` followed by a `count`.
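   
   The following is a minimal sketch of that idea in plain Scala, not the code from this PR. `MiniDataset`, its thunk-based partitions, and its `count` are hypothetical stand-ins for the Connect client `Dataset`. It only illustrates that wrapping the side-effecting function in a lazy `mapPartitions` and then calling an action such as `count` forces the function to run once per partition.
   
   ```scala
   // Hypothetical stand-in for a partitioned Dataset; NOT the Spark Connect API.
   // Each partition is a thunk, so nothing is evaluated until an action runs.
   final class MiniDataset[T](val partitions: Seq[() => Iterator[T]]) {
   
     // Lazy transformation: only records how each partition will be produced.
     def mapPartitions[U](f: Iterator[T] => Iterator[U]): MiniDataset[U] =
       new MiniDataset[U](partitions.map(p => () => f(p())))
   
     // Action: evaluates every partition and counts the elements it yields.
     def count(): Long = partitions.map(p => p().size.toLong).sum
   
     // foreachPartition expressed as mapPartitions + count; the count is discarded.
     def foreachPartition(f: Iterator[T] => Unit): Unit = {
       mapPartitions[Boolean] { it => f(it); Iterator(true) }.count()
       ()
     }
   }
   ```
   
   In this sketch, a `MiniDataset[Int]` with partitions `1, 2` and `3` invokes the function passed to `foreachPartition` twice, once per partition.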
   
   ### Why are the changes needed?
   Add missing methods in Dataset.
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   E2E tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] xinrong-meng commented on pull request #40628: [SPARK-42999][Connect] Dataset#foreach, foreachPartition

Posted by "xinrong-meng (via GitHub)" <gi...@apache.org>.
xinrong-meng commented on PR #40628:
URL: https://github.com/apache/spark/pull/40628#issuecomment-1499733331

   Thanks for working on that! We will enable `foreach` and `foreachPartition` in Python Client's DataFrame based on that.


[GitHub] [spark] HyukjinKwon commented on pull request #40628: [SPARK-42999][Connect] Dataset#foreach, foreachPartition

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #40628:
URL: https://github.com/apache/spark/pull/40628#issuecomment-1493484560

   cc @xinrong-meng 


[GitHub] [spark] hvanhovell commented on a diff in pull request #40628: [SPARK-42999][Connect] Dataset#foreach, foreachPartition

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40628:
URL: https://github.com/apache/spark/pull/40628#discussion_r1156364194


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/UdfUtils.scala:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.api.java.function._
+
+/**
+ * Util functions to help convert input functions between typed filter, map, flatMap,
+ * mapPartitions etc. These functions cannot be defined inside the client Dataset class as it will
+ * cause Dataset sync conflicts when used together with UDFs. Thus we define them outside, in the
+ * client package.
+ */
+private[sql] object UdfUtils {
+
+  def mapFuncToMapPartitionsAdaptor[T, U](f: T => U): Iterator[T] => Iterator[U] = _.map(f(_))
+
+  def foreachFuncToForeachPartitionsAdaptor[T](f: T => Unit): Iterator[T] => Unit =
+    _.foreach(f(_))
+
+  def foreachPartitionFuncToMapPartitionsAdaptor[T](
+      f: Iterator[T] => Unit): Iterator[T] => Iterator[Boolean] = x => {
+    f(x)
+    // The return constructs a minimal return iterator for mapPartitions
+    Iterator(true)

Review Comment:
   Return an empty iterator? There is no point in sending anything here.
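   
   A minimal sketch of what this suggestion could look like, assuming the adaptor keeps the `Iterator[T] => Iterator[Boolean]` shape shown in the diff. `AdaptorSketch` and the method name are hypothetical and are not the code that was merged.
   
   ```scala
   // Hypothetical variant of the adaptor from the diff; names are illustrative only.
   object AdaptorSketch {
   
     // Runs `f` over the partition for its side effects, then yields no elements,
     // so nothing has to be sent back for this partition.
     def foreachPartitionToMapPartitions[T](
         f: Iterator[T] => Unit): Iterator[T] => Iterator[Boolean] = it => {
       f(it)
       Iterator.empty
     }
   }
   ```
   
   The side effect still happens when the returned function is applied to a partition; only the placeholder `true` element is gone.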



[GitHub] [spark] hvanhovell commented on pull request #40628: [SPARK-42999][Connect] Dataset#foreach, foreachPartition

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on PR #40628:
URL: https://github.com/apache/spark/pull/40628#issuecomment-1499822466

   Merging, thanks!


[GitHub] [spark] hvanhovell commented on a diff in pull request #40628: [SPARK-42999][Connect] Dataset#foreach, foreachPartition

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40628:
URL: https://github.com/apache/spark/pull/40628#discussion_r1156367020


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -2468,6 +2470,153 @@ class Dataset[T] private[sql] (
    */
   def transform[U](t: Dataset[T] => Dataset[U]): Dataset[U] = t(this)
 
+  /**
+   * (Scala-specific) Returns a new Dataset that only contains elements where `func` returns
+   * `true`.
+   *
+   * @group typedrel
+   * @since 3.5.0
+   */
+  def filter(func: T => Boolean): Dataset[T] = {
+    val udf = ScalarUserDefinedFunction(
+      function = func,
+      inputEncoders = encoder :: Nil,
+      outputEncoder = PrimitiveBooleanEncoder,
+      name = None,
+      nullable = false,
+      deterministic = true)
+    sparkSession.newDataset[T](encoder) { builder =>
+      builder.getFilterBuilder
+        .setInput(plan.getRoot)
+        .setCondition(udf.apply().expr)
+    }
+  }
+
+  /**
+   * (Java-specific) Returns a new Dataset that only contains elements where `func` returns
+   * `true`.
+   *
+   * @group typedrel
+   * @since 3.5.0
+   */
+  def filter(f: FilterFunction[T]): Dataset[T] = {
+    filter(UdfUtils.filterFuncToScalaFunc(f))
+  }
+
+  /**
+   * (Scala-specific) Returns a new Dataset that contains the result of applying `func` to each
+   * element.
+   *
+   * @group typedrel
+   * @since 3.5.0
+   */
+  def map[U: Encoder](f: T => U): Dataset[U] = {
+    mapPartitions(UdfUtils.mapFuncToMapPartitionsAdaptor(f))
+  }
+
+  /**
+   * (Java-specific) Returns a new Dataset that contains the result of applying `func` to each
+   * element.
+   *
+   * @group typedrel
+   * @since 3.5.0
+   */
+  def map[U](f: MapFunction[T, U], encoder: Encoder[U]): Dataset[U] = {
+    map(UdfUtils.mapFunctionToScalaFunc(f))(encoder)
+  }
+
+  /**
+   * (Scala-specific) Returns a new Dataset that contains the result of applying `func` to each
+   * partition.
+   *
+   * @group typedrel
+   * @since 3.5.0
+   */
+  def mapPartitions[U: Encoder](func: Iterator[T] => Iterator[U]): Dataset[U] = {
+    val outputEncoder = encoderFor[U]
+    val udf = ScalarUserDefinedFunction(
+      function = func,
+      inputEncoders = encoder :: Nil,
+      outputEncoder = outputEncoder)
+    sparkSession.newDataset(outputEncoder) { builder =>
+      builder.getMapPartitionsBuilder
+        .setInput(plan.getRoot)
+        .setFunc(udf.apply().expr.getCommonInlineUserDefinedFunction)
+    }
+  }
+
+  /**
+   * (Java-specific) Returns a new Dataset that contains the result of applying `f` to each
+   * partition.
+   *
+   * @group typedrel
+   * @since 3.5.0
+   */
+  def mapPartitions[U](f: MapPartitionsFunction[T, U], encoder: Encoder[U]): Dataset[U] = {
+    mapPartitions(UdfUtils.mapPartitionsFuncToScalaFunc(f))(encoder)
+  }
+
+  /**
+   * (Scala-specific) Returns a new Dataset by first applying a function to all elements of this
+   * Dataset, and then flattening the results.
+   *
+   * @group typedrel
+   * @since 3.5.0
+   */
+  def flatMap[U: Encoder](func: T => TraversableOnce[U]): Dataset[U] =
+    mapPartitions(UdfUtils.flatMapFuncToMapPartitionsAdaptor(func))
+
+  /**
+   * (Java-specific) Returns a new Dataset by first applying a function to all elements of this
+   * Dataset, and then flattening the results.
+   *
+   * @group typedrel
+   * @since 3.5.0
+   */
+  def flatMap[U](f: FlatMapFunction[T, U], encoder: Encoder[U]): Dataset[U] = {
+    flatMap(UdfUtils.flatMapFuncToScalaFunc(f))(encoder)
+  }
+
+  /**
+   * Applies a function `f` to all rows.
+   *
+   * @group action
+   * @since 3.5.0
+   */
+  def foreach(f: T => Unit): Unit = {
+    foreachPartition(UdfUtils.foreachFuncToForeachPartitionsAdaptor(f))
+  }
+
+  /**
+   * (Java-specific) Runs `func` on each element of this Dataset.
+   *
+   * @group action
+   * @since 3.5.0
+   */
+  def foreach(func: ForeachFunction[T]): Unit = foreach(UdfUtils.foreachFuncToScalaFunc(func))
+
+  /**
+   * Applies a function `f` to each partition of this Dataset.
+   *
+   * @group action
+   * @since 3.5.0
+   */
+  def foreachPartition(f: Iterator[T] => Unit): Unit = {
+    // Delegate to mapPartition followed by a count to drop any result.
+    mapPartitions(UdfUtils.foreachPartitionFuncToMapPartitionsAdaptor(f))(PrimitiveBooleanEncoder)

Review Comment:
   Can you use an empty `RowEncoder` here? That is a bit more expected.



[GitHub] [spark] LuciferYang commented on a diff in pull request #40628: [SPARK-42999][Connect] Dataset#foreach, foreachPartition

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40628:
URL: https://github.com/apache/spark/pull/40628#discussion_r1172014785


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UserDefinedFunctionE2ETestSuite.scala:
##########
@@ -128,4 +130,72 @@ class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession {
       .collect()
     assert(result.sorted.toSeq === Seq(23, 25, 25, 27))
   }
+
+  test("Dataset foreach") {

Review Comment:
   https://github.com/LuciferYang/spark/actions/runs/4749019479/jobs/8435837245
   https://github.com/LuciferYang/spark/actions/runs/4749127823/jobs/8436326385
   
   same issue
   
   ```
   [info] - Dataset foreachPartition - java *** FAILED *** (92 milliseconds)
   [info]   "INTERNAL: RST_STREAM closed stream. HTTP/2 error code: PROTOCOL_ERROR" did not contain "45 did not equal -1" (UserDefinedFunctionE2ETestSuite.scala:190)
   [info]   org.scalatest.exceptions.TestFailedException:
   [info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
   [info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
   [info]   at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
   [info]   at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
   [info]   at org.apache.spark.sql.UserDefinedFunctionE2ETestSuite.$anonfun$new$29(UserDefinedFunctionE2ETestSuite.scala:190)
   [info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
   [info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
   [info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
   [info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
   [info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
   [info]   at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
   [info]   at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
   [info]   at org.scalatest.funsuite.AnyFunSuite.withFixture(AnyFunSuite.scala:1564)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
   [info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
   [info]   at org.scalatest.funsuite.AnyFunSuite.runTest(AnyFunSuite.scala:1564)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
   [info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
   [info]   at scala.collection.immutable.List.foreach(List.scala:431)
   [info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
   [info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
   [info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
   [info]   at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
   [info]   at org.scalatest.Suite.run(Suite.scala:1114)
   [info]   at org.scalatest.Suite.run$(Suite.scala:1096)
   [info]   at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
   [info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
   [info]   at org.apache.spark.sql.UserDefinedFunctionE2ETestSuite.org$scalatest$BeforeAndAfterAll$$super$run(UserDefinedFunctionE2ETestSuite.scala:36)
   [info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
   [info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
   [info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
   [info]   at org.apache.spark.sql.UserDefinedFunctionE2ETestSuite.run(UserDefinedFunctionE2ETestSuite.scala:36)
   [info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
   [info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
   [info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:413)
   [info]   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
   [info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   [info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   [info]   at java.lang.Thread.run(Thread.java:750)
   ```
   
   



[GitHub] [spark] zhenlineo commented on a diff in pull request #40628: [SPARK-42999][Connect] Dataset#foreach, foreachPartition

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on code in PR #40628:
URL: https://github.com/apache/spark/pull/40628#discussion_r1161813090


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UserDefinedFunctionE2ETestSuite.scala:
##########
@@ -128,4 +130,72 @@ class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession {
       .collect()
     assert(result.sorted.toSeq === Seq(23, 25, 25, 27))
   }
+
+  test("Dataset foreach") {

Review Comment:
   Fixing now. Will "at" you on the followup PR.



[GitHub] [spark] HyukjinKwon commented on a diff in pull request #40628: [SPARK-42999][Connect] Dataset#foreach, foreachPartition

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #40628:
URL: https://github.com/apache/spark/pull/40628#discussion_r1171972625


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UserDefinedFunctionE2ETestSuite.scala:
##########
@@ -128,4 +130,72 @@ class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession {
       .collect()
     assert(result.sorted.toSeq === Seq(23, 25, 25, 27))
   }
+
+  test("Dataset foreach") {

Review Comment:
   This actually seems pretty flaky.



[GitHub] [spark] HyukjinKwon commented on a diff in pull request #40628: [SPARK-42999][Connect] Dataset#foreach, foreachPartition

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #40628:
URL: https://github.com/apache/spark/pull/40628#discussion_r1171963721


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UserDefinedFunctionE2ETestSuite.scala:
##########
@@ -128,4 +130,72 @@ class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession {
       .collect()
     assert(result.sorted.toSeq === Seq(23, 25, 25, 27))
   }
+
+  test("Dataset foreach") {

Review Comment:
   FYI, it seems flaky:
   
   ```
   [info] - Dataset foreach *** FAILED *** (133 milliseconds)
   [info]   "INTERNAL: RST_STREAM closed stream. HTTP/2 error code: PROTOCOL_ERROR" did not contain "Hello foreach" (UserDefinedFunctionE2ETestSuite.scala:141)
   [info]   org.scalatest.exceptions.TestFailedException:
   [info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
   [info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
   [info]   at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
   [info]   at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
   [info]   at org.apache.spark.sql.UserDefinedFunctionE2ETestSuite.$anonfun$new$19(UserDefinedFunctionE2ETestSuite.scala:141)
   [info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
   [info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
   [info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
   [info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
   [info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
   [info]   at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
   [info]   at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
   [info]   at org.scalatest.funsuite.AnyFunSuite.withFixture(AnyFunSuite.scala:1564)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
   [info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
   [info]   at org.scalatest.funsuite.AnyFunSuite.runTest(AnyFunSuite.scala:1564)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
   [info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
   [info]   at scala.collection.immutable.List.foreach(List.scala:431)
   [info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
   [info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
   [info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
   [info]   at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
   [info]   at org.scalatest.Suite.run(Suite.scala:1114)
   [info]   at org.scalatest.Suite.run$(Suite.scala:1096)
   [info]   at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
   [info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
   [info]   at org.apache.spark.sql.UserDefinedFunctionE2ETestSuite.org$scalatest$BeforeAndAfterAll$$super$run(UserDefinedFunctionE2ETestSuite.scala:36)
   [info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
   [info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
   [info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
   [info]   at org.apache.spark.sql.UserDefinedFunctionE2ETestSuite.run(UserDefinedFunctionE2ETestSuite.scala:36)
   [info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
   [info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
   [info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:413)
   [info]   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
   [info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   [info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   [info]   at java.lang.Thread.run(Thread.java:750)
   [info] - Dataset foreach - java *** FAILED *** (123 milliseconds)
   [info]   "INTERNAL: RST_STREAM closed stream. HTTP/2 error code: PROTOCOL_ERROR" did not contain "Hello foreach" (UserDefinedFunctionE2ETestSuite.scala:154)
   [info]   org.scalatest.exceptions.TestFailedException:
   [info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
   [info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
   [info]   at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
   ```
   
   https://github.com/apache/spark/actions/runs/4748841722/jobs/8435481918



[GitHub] [spark] zhenlineo commented on a diff in pull request #40628: [SPARK-42999][Connect] Dataset#foreach, foreachPartition

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on code in PR #40628:
URL: https://github.com/apache/spark/pull/40628#discussion_r1174040639


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UserDefinedFunctionE2ETestSuite.scala:
##########
@@ -128,4 +130,72 @@ class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession {
       .collect()
     assert(result.sorted.toSeq === Seq(23, 25, 25, 27))
   }
+
+  test("Dataset foreach") {

Review Comment:
   With the latest master, I ran the test locally 100 times and could not reproduce this error. Do we have a way to see how flaky this test is? We can certainly mute these tests if they continue to be flaky, but my local runs indicate that they are not.
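   
   One way to quantify this locally is to run the test body repeatedly and count the failures. The helper below is a hedged sketch, not part of the Spark test utilities; `FlakinessProbe` and `countFailures` are hypothetical names, and it only catches what `scala.util.Try` catches (non-fatal throwables, which includes assertion failures).
   
   ```scala
   import scala.util.Try
   
   // Hypothetical helper for estimating flakiness; not part of Spark or ScalaTest.
   object FlakinessProbe {
   
     // Runs `body` `runs` times and returns how many of those runs threw.
     def countFailures(runs: Int)(body: => Unit): Int =
       (1 to runs).count(_ => Try(body).isFailure)
   }
   ```
   
   For example, `FlakinessProbe.countFailures(100) { ... }` around the test body would return 0 if none of the 100 runs throws. A local result of 0 does not rule out failures that only show up in the CI environment.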



[GitHub] [spark] LuciferYang commented on a diff in pull request #40628: [SPARK-42999][Connect] Dataset#foreach, foreachPartition

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40628:
URL: https://github.com/apache/spark/pull/40628#discussion_r1174270420


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UserDefinedFunctionE2ETestSuite.scala:
##########
@@ -128,4 +130,72 @@ class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession {
       .collect()
     assert(result.sorted.toSeq === Seq(23, 25, 25, 27))
   }
+
+  test("Dataset foreach") {

Review Comment:
   @zhenlineo Sorry for forgetting to notify you. After reverting https://github.com/apache/spark/commit/09a43531d30346bb7c8d213822513dc35c70f82e, everything is OK now.
   



[GitHub] [spark] hvanhovell closed pull request #40628: [SPARK-42999][Connect] Dataset#foreach, foreachPartition

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell closed pull request #40628: [SPARK-42999][Connect] Dataset#foreach, foreachPartition
URL: https://github.com/apache/spark/pull/40628




[GitHub] [spark] hvanhovell commented on a diff in pull request #40628: [SPARK-42999][Connect] Dataset#foreach, foreachPartition

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40628:
URL: https://github.com/apache/spark/pull/40628#discussion_r1156366512


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -2468,6 +2470,153 @@ class Dataset[T] private[sql] (
    */
   def transform[U](t: Dataset[T] => Dataset[U]): Dataset[U] = t(this)
 
+  /**
+   * (Scala-specific) Returns a new Dataset that only contains elements where `func` returns
+   * `true`.
+   *
+   * @group typedrel
+   * @since 3.5.0
+   */
+  def filter(func: T => Boolean): Dataset[T] = {
+    val udf = ScalarUserDefinedFunction(
+      function = func,
+      inputEncoders = encoder :: Nil,
+      outputEncoder = PrimitiveBooleanEncoder,
+      name = None,
+      nullable = false,
+      deterministic = true)
+    sparkSession.newDataset[T](encoder) { builder =>
+      builder.getFilterBuilder
+        .setInput(plan.getRoot)
+        .setCondition(udf.apply().expr)
+    }
+  }
+
+  /**
+   * (Java-specific) Returns a new Dataset that only contains elements where `func` returns
+   * `true`.
+   *
+   * @group typedrel
+   * @since 3.5.0
+   */
+  def filter(f: FilterFunction[T]): Dataset[T] = {
+    filter(UdfUtils.filterFuncToScalaFunc(f))
+  }
+
+  /**
+   * (Scala-specific) Returns a new Dataset that contains the result of applying `func` to each
+   * element.
+   *
+   * @group typedrel
+   * @since 3.5.0
+   */
+  def map[U: Encoder](f: T => U): Dataset[U] = {
+    mapPartitions(UdfUtils.mapFuncToMapPartitionsAdaptor(f))
+  }
+
+  /**
+   * (Java-specific) Returns a new Dataset that contains the result of applying `func` to each
+   * element.
+   *
+   * @group typedrel
+   * @since 3.5.0
+   */
+  def map[U](f: MapFunction[T, U], encoder: Encoder[U]): Dataset[U] = {
+    map(UdfUtils.mapFunctionToScalaFunc(f))(encoder)
+  }
+
+  /**
+   * (Scala-specific) Returns a new Dataset that contains the result of applying `func` to each
+   * partition.
+   *
+   * @group typedrel
+   * @since 3.5.0
+   */
+  def mapPartitions[U: Encoder](func: Iterator[T] => Iterator[U]): Dataset[U] = {
+    val outputEncoder = encoderFor[U]
+    val udf = ScalarUserDefinedFunction(
+      function = func,
+      inputEncoders = encoder :: Nil,
+      outputEncoder = outputEncoder)
+    sparkSession.newDataset(outputEncoder) { builder =>
+      builder.getMapPartitionsBuilder
+        .setInput(plan.getRoot)
+        .setFunc(udf.apply().expr.getCommonInlineUserDefinedFunction)
+    }
+  }
+
+  /**
+   * (Java-specific) Returns a new Dataset that contains the result of applying `f` to each
+   * partition.
+   *
+   * @group typedrel
+   * @since 3.5.0
+   */
+  def mapPartitions[U](f: MapPartitionsFunction[T, U], encoder: Encoder[U]): Dataset[U] = {
+    mapPartitions(UdfUtils.mapPartitionsFuncToScalaFunc(f))(encoder)
+  }
+
+  /**
+   * (Scala-specific) Returns a new Dataset by first applying a function to all elements of this
+   * Dataset, and then flattening the results.
+   *
+   * @group typedrel
+   * @since 3.5.0
+   */
+  def flatMap[U: Encoder](func: T => TraversableOnce[U]): Dataset[U] =
+    mapPartitions(UdfUtils.flatMapFuncToMapPartitionsAdaptor(func))
+
+  /**
+   * (Java-specific) Returns a new Dataset by first applying a function to all elements of this
+   * Dataset, and then flattening the results.
+   *
+   * @group typedrel
+   * @since 3.5.0
+   */
+  def flatMap[U](f: FlatMapFunction[T, U], encoder: Encoder[U]): Dataset[U] = {
+    flatMap(UdfUtils.flatMapFuncToScalaFunc(f))(encoder)
+  }
+
+  /**
+   * Applies a function `f` to all rows.
+   *
+   * @group action
+   * @since 3.5.0
+   */
+  def foreach(f: T => Unit): Unit = {
+    foreachPartition(UdfUtils.foreachFuncToForeachPartitionsAdaptor(f))
+  }
+
+  /**
+   * (Java-specific) Runs `func` on each element of this Dataset.
+   *
+   * @group action
+   * @since 3.5.0
+   */
+  def foreach(func: ForeachFunction[T]): Unit = foreach(UdfUtils.foreachFuncToScalaFunc(func))
+
+  /**
+   * Applies a function `f` to each partition of this Dataset.
+   *
+   * @group action
+   * @since 3.5.0
+   */
+  def foreachPartition(f: Iterator[T] => Unit): Unit = {
+    // Delegate to mapPartition followed by a count to drop any result.
+    mapPartitions(UdfUtils.foreachPartitionFuncToMapPartitionsAdaptor(f))(PrimitiveBooleanEncoder)

Review Comment:
   There is no count here. However, I don't think you need one. If you make the adaptor return an empty iterator, no batches are produced on the executor, and the stream handler then sends back an empty batch with a schema. I think that is as efficient as we are going to get.
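   
   A minimal sketch of that idea, for illustration only. `ForeachPartitionSketch` and `foreachPartitionAdaptor` are hypothetical names, not the actual `UdfUtils` helper from the PR, and the real adaptor may differ. It only shows how a side-effecting partition function can be wrapped so that it yields no rows:
   
   ```scala
   // Hypothetical stand-in for the adaptor discussed above; not the PR's code.
   object ForeachPartitionSketch {
     // Wraps a side-effecting partition function into the shape mapPartitions
     // expects, but yields no output rows.
     def foreachPartitionAdaptor[T](f: Iterator[T] => Unit): Iterator[T] => Iterator[Boolean] =
       (it: Iterator[T]) => {
         f(it)          // run the side effect over the whole partition
         Iterator.empty // emit nothing, so there are no result rows to send back
       }
   
     def main(args: Array[String]): Unit = {
       var sum = 0L
       val adapted = foreachPartitionAdaptor[Int](it => it.foreach(sum += _))
       val out = adapted(Iterator(1, 2, 3))
       println(s"rows produced: ${out.size}, side effect total: $sum")
     }
   }
   ```
   
   Because the adapted function always returns an empty iterator, no follow-up `count` is needed to discard results.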





[GitHub] [spark] LuciferYang commented on a diff in pull request #40628: [SPARK-42999][Connect] Dataset#foreach, foreachPartition

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40628:
URL: https://github.com/apache/spark/pull/40628#discussion_r1161399737


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UserDefinedFunctionE2ETestSuite.scala:
##########
@@ -128,4 +130,72 @@ class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession {
       .collect()
     assert(result.sorted.toSeq === Seq(23, 25, 25, 27))
   }
+
+  test("Dataset foreach") {

Review Comment:
   How do I test the new case with Maven? @zhenlineo
   
   I ran the following commands:
   
   ```
   build/mvn clean install -pl connector/connect/server -am -DskipTests
   build/mvn clean install -pl assembly -am -DskipTests
   build/mvn clean install -pl connector/connect/client
   ```
   
   15 tests failed in the last command:
   
   ```
   - Dataset typed filter *** FAILED ***
     io.grpc.StatusRuntimeException: INTERNAL: org.apache.spark.sql.UserDefinedFunctionE2ETestSuite
     at io.grpc.Status.asRuntimeException(Status.java:535)
     at io.grpc.stub.ClientCalls$BlockingResponseStream.hasNext(ClientCalls.java:660)
     at org.apache.spark.sql.connect.client.SparkResult.org$apache$spark$sql$connect$client$SparkResult$$processResponses(SparkResult.scala:61)
     at org.apache.spark.sql.connect.client.SparkResult.length(SparkResult.scala:106)
     at org.apache.spark.sql.connect.client.SparkResult.toArray(SparkResult.scala:123)
     at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2687)
     at org.apache.spark.sql.Dataset.withResult(Dataset.scala:3088)
     at org.apache.spark.sql.Dataset.collect(Dataset.scala:2686)
     at org.apache.spark.sql.Dataset.collectAsList(Dataset.scala:2700)
     at org.apache.spark.sql.UserDefinedFunctionE2ETestSuite.$anonfun$new$1(UserDefinedFunctionE2ETestSuite.scala:38)
     ...
   
   *** 15 TESTS FAILED ***
   ```


