You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "zhenlineo (via GitHub)" <gi...@apache.org> on 2023/03/29 01:16:00 UTC

[GitHub] [spark] zhenlineo opened a new pull request, #40581: [SPARK-42953][Connect] Typed map, flatMap, mapPartitions

zhenlineo opened a new pull request, #40581:
URL: https://github.com/apache/spark/pull/40581

   ### What changes were proposed in this pull request?
   Implemented new missing methods in the client Dataset API: map, flatMap, mapPartitions.
   
   ### Why are the changes needed?
   Missing APIs
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   ### How was this patch tested?
   Integration tests.
   TODO: Need to fix the test for maven.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] xinrong-meng commented on a diff in pull request #40581: [SPARK-42953][Connect] Typed filter, map, flatMap, mapPartitions

Posted by "xinrong-meng (via GitHub)" <gi...@apache.org>.
xinrong-meng commented on code in PR #40581:
URL: https://github.com/apache/spark/pull/40581#discussion_r1151338333


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -482,27 +482,63 @@ class SparkConnectPlanner(val session: SparkSession) {
   }
 
   private def transformMapPartitions(rel: proto.MapPartitions): LogicalPlan = {
+    val baseRel = transformRelation(rel.getInput)
     val commonUdf = rel.getFunc
-    val pythonUdf = transformPythonUDF(commonUdf)
-    val isBarrier = if (rel.hasIsBarrier) rel.getIsBarrier else false
-    pythonUdf.evalType match {
-      case PythonEvalType.SQL_MAP_PANDAS_ITER_UDF =>
-        logical.MapInPandas(
-          pythonUdf,
-          pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
-          transformRelation(rel.getInput),
-          isBarrier)
-      case PythonEvalType.SQL_MAP_ARROW_ITER_UDF =>
-        logical.PythonMapInArrow(
-          pythonUdf,
-          pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
-          transformRelation(rel.getInput),
-          isBarrier)
+    commonUdf.getFunctionCase match {
+      case proto.CommonInlineUserDefinedFunction.FunctionCase.SCALAR_SCALA_UDF =>
+        transformTypedMapPartitions(commonUdf, baseRel)
       case _ =>

Review Comment:
   nit: I am wondering if we shall use an explicit
   ```
   case proto.CommonInlineUserDefinedFunction.FunctionCase.PYTHON_UDF
   ```
   and add a `case _` to block other `function` field values.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on a diff in pull request #40581: [SPARK-42953][Connect] Typed filter, map, flatMap, mapPartitions

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40581:
URL: https://github.com/apache/spark/pull/40581#discussion_r1151339162


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -482,27 +482,63 @@ class SparkConnectPlanner(val session: SparkSession) {
   }
 
   private def transformMapPartitions(rel: proto.MapPartitions): LogicalPlan = {
+    val baseRel = transformRelation(rel.getInput)
     val commonUdf = rel.getFunc
-    val pythonUdf = transformPythonUDF(commonUdf)
-    val isBarrier = if (rel.hasIsBarrier) rel.getIsBarrier else false
-    pythonUdf.evalType match {
-      case PythonEvalType.SQL_MAP_PANDAS_ITER_UDF =>
-        logical.MapInPandas(
-          pythonUdf,
-          pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
-          transformRelation(rel.getInput),
-          isBarrier)
-      case PythonEvalType.SQL_MAP_ARROW_ITER_UDF =>
-        logical.PythonMapInArrow(
-          pythonUdf,
-          pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
-          transformRelation(rel.getInput),
-          isBarrier)
+    commonUdf.getFunctionCase match {
+      case proto.CommonInlineUserDefinedFunction.FunctionCase.SCALAR_SCALA_UDF =>
+        transformTypedMapPartitions(commonUdf, baseRel)
       case _ =>

Review Comment:
   +1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on pull request #40581: [SPARK-42953][Connect] Typed filter, map, flatMap, mapPartitions

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #40581:
URL: https://github.com/apache/spark/pull/40581#issuecomment-1487847175

   ```
   TODO: Need to fix the test for maven.
   ```
   
   Should we make this one 3.5.0 only? I think Maven test failure will become a blocker for 3.4.0 release
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhenlineo commented on a diff in pull request #40581: [SPARK-42953][Connect] Typed filter, map, flatMap, mapPartitions

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on code in PR #40581:
URL: https://github.com/apache/spark/pull/40581#discussion_r1156202358


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UserDefinedFunctionE2ETestSuite.scala:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import java.lang.{Long => JLong}
+import java.util.{Iterator => JIterator}
+import java.util.Arrays
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.api.java.function._
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{PrimitiveIntEncoder, PrimitiveLongEncoder}
+import org.apache.spark.sql.connect.client.util.RemoteSparkSession
+import org.apache.spark.sql.functions.udf
+
+/**
+ * All tests in this class requires client UDF artifacts synced with the server. TODO: It means
+ * these tests only works with SBT for now.
+ */
+class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession {
+  test("simple udf") {
+    def dummyUdf(x: Int): Int = x + 5
+    val myUdf = udf(dummyUdf _)

Review Comment:
   I moved the original udf test from E2E into this class, as all tests in this class cannot run with maven. So I would leave this to @vicennial to address this comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhenlineo commented on a diff in pull request #40581: [SPARK-42953][Connect] Typed map, flatMap, mapPartitions

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on code in PR #40581:
URL: https://github.com/apache/spark/pull/40581#discussion_r1151299712


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UserDefinedFunctionE2ETestSuite.scala:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import java.lang.{Long => JLong}
+import java.util.{Iterator => JIterator}
+import java.util.Arrays
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.api.java.function._
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{PrimitiveIntEncoder, PrimitiveLongEncoder}
+import org.apache.spark.sql.connect.client.util.RemoteSparkSession
+import org.apache.spark.sql.functions.udf
+
+/**
+ * All tests in this class requires client UDF artifacts synced with the server. TODO: It means
+ * these tests only works with SBT for now.
+ */
+class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession {
+  test("simple udf") {

Review Comment:
   @vicennial I need your help to fix this for maven.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on a diff in pull request #40581: [SPARK-42953][Connect] Typed filter, map, flatMap, mapPartitions

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40581:
URL: https://github.com/apache/spark/pull/40581#discussion_r1151324209


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/IntegrationTestUtils.scala:
##########
@@ -43,7 +45,27 @@ object IntegrationTestUtils {
     }
     sys.props.getOrElse("spark.test.home", sys.env("SPARK_HOME"))
   }
-  private[connect] val isDebug = System.getProperty(DEBUG_SC_JVM_CLIENT, "false").toBoolean
+
+  private[connect] lazy val debugConfig: Seq[String] = {
+    val log4j2 = s"$sparkHome/connector/connect/client/jvm/src/test/resources/log4j2.properties"
+    if (isDebug) {
+      Seq(
+        // Enable to see the server plan change log
+        // "--conf",
+        // "spark.sql.planChangeLog.level=WARN",
+
+        // Enable to see the server grpc received
+        // "--conf",
+        // "spark.connect.grpc.interceptor.classes=" +
+        //  "org.apache.spark.sql.connect.service.LoggingInterceptor",
+
+        // Redirect server log into console
+        "--conf",
+        s"spark.driver.extraJavaOptions=-Dlog4j.configuration=$log4j2",
+        "--conf",

Review Comment:
   should be `log4j.configurationFile` for log4j2



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] amaliujia commented on a diff in pull request #40581: [SPARK-42953][Connect] Typed filter, map, flatMap, mapPartitions

Posted by "amaliujia (via GitHub)" <gi...@apache.org>.
amaliujia commented on code in PR #40581:
URL: https://github.com/apache/spark/pull/40581#discussion_r1152385742


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -482,27 +482,66 @@ class SparkConnectPlanner(val session: SparkSession) {
   }
 
   private def transformMapPartitions(rel: proto.MapPartitions): LogicalPlan = {
+    val baseRel = transformRelation(rel.getInput)
     val commonUdf = rel.getFunc
-    val pythonUdf = transformPythonUDF(commonUdf)
-    val isBarrier = if (rel.hasIsBarrier) rel.getIsBarrier else false
-    pythonUdf.evalType match {
-      case PythonEvalType.SQL_MAP_PANDAS_ITER_UDF =>
-        logical.MapInPandas(
-          pythonUdf,
-          pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
-          transformRelation(rel.getInput),
-          isBarrier)
-      case PythonEvalType.SQL_MAP_ARROW_ITER_UDF =>
-        logical.PythonMapInArrow(
-          pythonUdf,
-          pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
-          transformRelation(rel.getInput),
-          isBarrier)
+    commonUdf.getFunctionCase match {
+      case proto.CommonInlineUserDefinedFunction.FunctionCase.SCALAR_SCALA_UDF =>
+        transformTypedMapPartitions(commonUdf, baseRel)
+      case proto.CommonInlineUserDefinedFunction.FunctionCase.PYTHON_UDF =>
+        val pythonUdf = transformPythonUDF(commonUdf)
+        val isBarrier = if (rel.hasIsBarrier) rel.getIsBarrier else false
+        pythonUdf.evalType match {
+          case PythonEvalType.SQL_MAP_PANDAS_ITER_UDF =>
+            logical.MapInPandas(
+              pythonUdf,
+              pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
+              baseRel,
+              isBarrier)
+          case PythonEvalType.SQL_MAP_ARROW_ITER_UDF =>
+            logical.PythonMapInArrow(
+              pythonUdf,
+              pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
+              baseRel,
+              isBarrier)
+          case _ =>
+            throw InvalidPlanInput(
+              s"Function with EvalType: ${pythonUdf.evalType} is not supported")
+        }
       case _ =>
-        throw InvalidPlanInput(s"Function with EvalType: ${pythonUdf.evalType} is not supported")
+        throw InvalidPlanInput(
+          s"Function with ID: ${commonUdf.getFunctionCase.getNumber} is not supported")
     }
   }
 
+  private def generateObjAttr[T](enc: ExpressionEncoder[T]): Attribute = {
+    val dataType = enc.deserializer.dataType
+    val nullable = !enc.clsTag.runtimeClass.isPrimitive
+    AttributeReference("obj", dataType, nullable)()

Review Comment:
   How is `obj` defined as an attribute? This looks like a magic name?



##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/IntegrationTestUtils.scala:
##########
@@ -43,7 +45,25 @@ object IntegrationTestUtils {
     }
     sys.props.getOrElse("spark.test.home", sys.env("SPARK_HOME"))
   }
-  private[connect] val isDebug = System.getProperty(DEBUG_SC_JVM_CLIENT, "false").toBoolean
+
+  private[connect] lazy val debugConfig: Seq[String] = {
+    val log4j2 = s"$sparkHome/connector/connect/client/jvm/src/test/resources/log4j2.properties"

Review Comment:
   where is `$sparkHome` defined?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhenlineo commented on a diff in pull request #40581: [SPARK-42953][Connect] Typed filter, map, flatMap, mapPartitions

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on code in PR #40581:
URL: https://github.com/apache/spark/pull/40581#discussion_r1158884111


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UserDefinedFunctionE2ETestSuite.scala:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import java.lang.{Long => JLong}
+import java.util.{Iterator => JIterator}
+import java.util.Arrays
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.api.java.function._
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{PrimitiveIntEncoder, PrimitiveLongEncoder}
+import org.apache.spark.sql.connect.client.util.RemoteSparkSession
+import org.apache.spark.sql.functions.{col, udf}
+
+/**
+ * All tests in this class requires client UDF artifacts synced with the server. TODO: It means
+ * these tests only works with SBT for now.
+ */
+class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession {
+  test("Dataset typed filter") {
+    val rows = spark.range(10).filter(n => n % 2 == 0).collectAsList()
+    assert(rows == Arrays.asList[Long](0, 2, 4, 6, 8))
+  }
+
+  test("Dataset typed filter - java") {
+    val rows = spark
+      .range(10)
+      .filter(new FilterFunction[JLong] {
+        override def call(value: JLong): Boolean = value % 2 == 0
+      })
+      .collectAsList()
+    assert(rows == Arrays.asList[Long](0, 2, 4, 6, 8))
+  }
+
+  test("Dataset typed map") {
+    val rows = spark.range(10).map(n => n / 2)(PrimitiveLongEncoder).collectAsList()
+    assert(rows == Arrays.asList[Long](0, 0, 1, 1, 2, 2, 3, 3, 4, 4))
+  }
+
+  test("filter with condition") {
+    // This should go via `def filter(condition: Column)` rather than
+    // `def filter(func: T => Boolean)`
+    def func(i: Long): Boolean = i < 5
+    val under5 = udf(func _)
+    val longs = spark.range(10).filter(under5(col("id") * 2)).collectAsList()
+    assert(longs == Arrays.asList[Long](0, 1, 2))
+  }
+
+  test("filter with col(*)") {

Review Comment:
   Note this test will run the udf as typed filter instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhenlineo commented on a diff in pull request #40581: [SPARK-42953][Connect] Typed map, flatMap, mapPartitions

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on code in PR #40581:
URL: https://github.com/apache/spark/pull/40581#discussion_r1151299712


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UserDefinedFunctionE2ETestSuite.scala:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import java.lang.{Long => JLong}
+import java.util.{Iterator => JIterator}
+import java.util.Arrays
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.api.java.function._
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{PrimitiveIntEncoder, PrimitiveLongEncoder}
+import org.apache.spark.sql.connect.client.util.RemoteSparkSession
+import org.apache.spark.sql.functions.udf
+
+/**
+ * All tests in this class requires client UDF artifacts synced with the server. TODO: It means
+ * these tests only works with SBT for now.
+ */
+class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession {
+  test("simple udf") {

Review Comment:
   @vicennial I need your help to fix all tests in this suite for maven.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhenlineo commented on a diff in pull request #40581: [SPARK-42953][Connect] Typed filter, map, flatMap, mapPartitions

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on code in PR #40581:
URL: https://github.com/apache/spark/pull/40581#discussion_r1152469860


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/IntegrationTestUtils.scala:
##########
@@ -43,7 +45,25 @@ object IntegrationTestUtils {
     }
     sys.props.getOrElse("spark.test.home", sys.env("SPARK_HOME"))
   }
-  private[connect] val isDebug = System.getProperty(DEBUG_SC_JVM_CLIENT, "false").toBoolean
+
+  private[connect] lazy val debugConfig: Seq[String] = {
+    val log4j2 = s"$sparkHome/connector/connect/client/jvm/src/test/resources/log4j2.properties"

Review Comment:
   It is at line 42 in this file. We obtained it from env var SPARK_HOME.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] amaliujia commented on a diff in pull request #40581: [SPARK-42953][Connect] Typed filter, map, flatMap, mapPartitions

Posted by "amaliujia (via GitHub)" <gi...@apache.org>.
amaliujia commented on code in PR #40581:
URL: https://github.com/apache/spark/pull/40581#discussion_r1152606377


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -482,27 +482,66 @@ class SparkConnectPlanner(val session: SparkSession) {
   }
 
   private def transformMapPartitions(rel: proto.MapPartitions): LogicalPlan = {
+    val baseRel = transformRelation(rel.getInput)
     val commonUdf = rel.getFunc
-    val pythonUdf = transformPythonUDF(commonUdf)
-    val isBarrier = if (rel.hasIsBarrier) rel.getIsBarrier else false
-    pythonUdf.evalType match {
-      case PythonEvalType.SQL_MAP_PANDAS_ITER_UDF =>
-        logical.MapInPandas(
-          pythonUdf,
-          pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
-          transformRelation(rel.getInput),
-          isBarrier)
-      case PythonEvalType.SQL_MAP_ARROW_ITER_UDF =>
-        logical.PythonMapInArrow(
-          pythonUdf,
-          pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
-          transformRelation(rel.getInput),
-          isBarrier)
+    commonUdf.getFunctionCase match {
+      case proto.CommonInlineUserDefinedFunction.FunctionCase.SCALAR_SCALA_UDF =>
+        transformTypedMapPartitions(commonUdf, baseRel)
+      case proto.CommonInlineUserDefinedFunction.FunctionCase.PYTHON_UDF =>
+        val pythonUdf = transformPythonUDF(commonUdf)
+        val isBarrier = if (rel.hasIsBarrier) rel.getIsBarrier else false
+        pythonUdf.evalType match {
+          case PythonEvalType.SQL_MAP_PANDAS_ITER_UDF =>
+            logical.MapInPandas(
+              pythonUdf,
+              pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
+              baseRel,
+              isBarrier)
+          case PythonEvalType.SQL_MAP_ARROW_ITER_UDF =>
+            logical.PythonMapInArrow(
+              pythonUdf,
+              pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
+              baseRel,
+              isBarrier)
+          case _ =>
+            throw InvalidPlanInput(
+              s"Function with EvalType: ${pythonUdf.evalType} is not supported")
+        }
       case _ =>
-        throw InvalidPlanInput(s"Function with EvalType: ${pythonUdf.evalType} is not supported")
+        throw InvalidPlanInput(
+          s"Function with ID: ${commonUdf.getFunctionCase.getNumber} is not supported")
     }
   }
 
+  private def generateObjAttr[T](enc: ExpressionEncoder[T]): Attribute = {
+    val dataType = enc.deserializer.dataType
+    val nullable = !enc.clsTag.runtimeClass.isPrimitive
+    AttributeReference("obj", dataType, nullable)()

Review Comment:
   To confirm this won't cause duplicate attribute issue if there is another attribute come from child plan which is also named `obj`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhenlineo commented on a diff in pull request #40581: [SPARK-42953][Connect] Typed filter, map, flatMap, mapPartitions

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on code in PR #40581:
URL: https://github.com/apache/spark/pull/40581#discussion_r1152474793


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -482,27 +482,66 @@ class SparkConnectPlanner(val session: SparkSession) {
   }
 
   private def transformMapPartitions(rel: proto.MapPartitions): LogicalPlan = {
+    val baseRel = transformRelation(rel.getInput)
     val commonUdf = rel.getFunc
-    val pythonUdf = transformPythonUDF(commonUdf)
-    val isBarrier = if (rel.hasIsBarrier) rel.getIsBarrier else false
-    pythonUdf.evalType match {
-      case PythonEvalType.SQL_MAP_PANDAS_ITER_UDF =>
-        logical.MapInPandas(
-          pythonUdf,
-          pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
-          transformRelation(rel.getInput),
-          isBarrier)
-      case PythonEvalType.SQL_MAP_ARROW_ITER_UDF =>
-        logical.PythonMapInArrow(
-          pythonUdf,
-          pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
-          transformRelation(rel.getInput),
-          isBarrier)
+    commonUdf.getFunctionCase match {
+      case proto.CommonInlineUserDefinedFunction.FunctionCase.SCALAR_SCALA_UDF =>
+        transformTypedMapPartitions(commonUdf, baseRel)
+      case proto.CommonInlineUserDefinedFunction.FunctionCase.PYTHON_UDF =>
+        val pythonUdf = transformPythonUDF(commonUdf)
+        val isBarrier = if (rel.hasIsBarrier) rel.getIsBarrier else false
+        pythonUdf.evalType match {
+          case PythonEvalType.SQL_MAP_PANDAS_ITER_UDF =>
+            logical.MapInPandas(
+              pythonUdf,
+              pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
+              baseRel,
+              isBarrier)
+          case PythonEvalType.SQL_MAP_ARROW_ITER_UDF =>
+            logical.PythonMapInArrow(
+              pythonUdf,
+              pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
+              baseRel,
+              isBarrier)
+          case _ =>
+            throw InvalidPlanInput(
+              s"Function with EvalType: ${pythonUdf.evalType} is not supported")
+        }
       case _ =>
-        throw InvalidPlanInput(s"Function with EvalType: ${pythonUdf.evalType} is not supported")
+        throw InvalidPlanInput(
+          s"Function with ID: ${commonUdf.getFunctionCase.getNumber} is not supported")
     }
   }
 
+  private def generateObjAttr[T](enc: ExpressionEncoder[T]): Attribute = {
+    val dataType = enc.deserializer.dataType
+    val nullable = !enc.clsTag.runtimeClass.isPrimitive
+    AttributeReference("obj", dataType, nullable)()

Review Comment:
   This code logic is in objects.scala. I basically expanded the logic there to be able to accept the encoders directly. The original code used "obj". This is fine as it is only used in the middle of ser/deser to provide the datatype and nullability info.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhenlineo commented on a diff in pull request #40581: [SPARK-42953][Connect] Typed filter, map, flatMap, mapPartitions

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on code in PR #40581:
URL: https://github.com/apache/spark/pull/40581#discussion_r1156521094


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -846,7 +885,28 @@ class SparkConnectPlanner(val session: SparkSession) {
   private def transformFilter(rel: proto.Filter): LogicalPlan = {
     assert(rel.hasInput)
     val baseRel = transformRelation(rel.getInput)
-    logical.Filter(condition = transformExpression(rel.getCondition), child = baseRel)
+    val cond = rel.getCondition
+    cond.getExprTypeCase match {
+      case proto.Expression.ExprTypeCase.COMMON_INLINE_USER_DEFINED_FUNCTION

Review Comment:
   Looking at the proto, the minimal chance needed is:
   1. Always pass the Scala UDF inputs and return types and 
   2. Leave the CommonInlineUDF arguments as provided when available or not provided e.g. for typed filter function and map partitions function.
   
   I feel this approach a. dose not make any proto change. b. does not add any extra data. c. can tell the typed filter udf clearly from others.
   `col("*")` would do a similar result, but I would prefer avoid a magical marker if possible.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on a diff in pull request #40581: [SPARK-42953][Connect] Typed filter, map, flatMap, mapPartitions

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40581:
URL: https://github.com/apache/spark/pull/40581#discussion_r1156174966


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UserDefinedFunctionE2ETestSuite.scala:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import java.lang.{Long => JLong}
+import java.util.{Iterator => JIterator}
+import java.util.Arrays
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.api.java.function._
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{PrimitiveIntEncoder, PrimitiveLongEncoder}
+import org.apache.spark.sql.connect.client.util.RemoteSparkSession
+import org.apache.spark.sql.functions.udf
+
+/**
+ * All tests in this class requires client UDF artifacts synced with the server. TODO: It means
+ * these tests only works with SBT for now.
+ */
+class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession {
+  test("simple udf") {
+    def dummyUdf(x: Int): Int = x + 5
+    val myUdf = udf(dummyUdf _)
+    val df = spark.range(5).select(myUdf(Column("id")))
+    val result = df.collect()
+    assert(result.length == 5)
+    result.zipWithIndex.foreach { case (v, idx) =>
+      assert(v.getInt(0) == idx + 5)
+    }
+  }
+
+  test("Dataset typed filter") {
+    val rows = spark.range(10).filter(n => n % 2 == 0).collectAsList()
+    assert(rows == Arrays.asList[Long](0, 2, 4, 6, 8))
+  }
+
+  test("Dataset typed filter - java") {
+    val rows = spark
+      .range(10)
+      .filter(new FilterFunction[JLong] {
+        override def call(value: JLong): Boolean = value % 2 == 0
+      })
+      .collectAsList()
+    assert(rows == Arrays.asList[Long](0, 2, 4, 6, 8))
+  }
+
+  test("Dataset typed map") {
+    val rows = spark.range(10).map(n => n / 2)(PrimitiveLongEncoder).collectAsList()

Review Comment:
   Can you import `spark.implicits` and see if we don't have to pass the encoder?



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -482,27 +482,66 @@ class SparkConnectPlanner(val session: SparkSession) {
   }
 
   private def transformMapPartitions(rel: proto.MapPartitions): LogicalPlan = {
+    val baseRel = transformRelation(rel.getInput)
     val commonUdf = rel.getFunc
-    val pythonUdf = transformPythonUDF(commonUdf)
-    val isBarrier = if (rel.hasIsBarrier) rel.getIsBarrier else false
-    pythonUdf.evalType match {
-      case PythonEvalType.SQL_MAP_PANDAS_ITER_UDF =>
-        logical.MapInPandas(
-          pythonUdf,
-          pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
-          transformRelation(rel.getInput),
-          isBarrier)
-      case PythonEvalType.SQL_MAP_ARROW_ITER_UDF =>
-        logical.PythonMapInArrow(
-          pythonUdf,
-          pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
-          transformRelation(rel.getInput),
-          isBarrier)
+    commonUdf.getFunctionCase match {
+      case proto.CommonInlineUserDefinedFunction.FunctionCase.SCALAR_SCALA_UDF =>
+        transformTypedMapPartitions(commonUdf, baseRel)
+      case proto.CommonInlineUserDefinedFunction.FunctionCase.PYTHON_UDF =>
+        val pythonUdf = transformPythonUDF(commonUdf)
+        val isBarrier = if (rel.hasIsBarrier) rel.getIsBarrier else false
+        pythonUdf.evalType match {
+          case PythonEvalType.SQL_MAP_PANDAS_ITER_UDF =>
+            logical.MapInPandas(
+              pythonUdf,
+              pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
+              baseRel,
+              isBarrier)
+          case PythonEvalType.SQL_MAP_ARROW_ITER_UDF =>
+            logical.PythonMapInArrow(
+              pythonUdf,
+              pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
+              baseRel,
+              isBarrier)
+          case _ =>
+            throw InvalidPlanInput(
+              s"Function with EvalType: ${pythonUdf.evalType} is not supported")
+        }
       case _ =>
-        throw InvalidPlanInput(s"Function with EvalType: ${pythonUdf.evalType} is not supported")
+        throw InvalidPlanInput(
+          s"Function with ID: ${commonUdf.getFunctionCase.getNumber} is not supported")
     }
   }
 
+  private def generateObjAttr[T](enc: ExpressionEncoder[T]): Attribute = {
+    val dataType = enc.deserializer.dataType
+    val nullable = !enc.clsTag.runtimeClass.isPrimitive
+    AttributeReference("obj", dataType, nullable)()
+  }
+
+  private def transformTypedMapPartitions(
+      fun: proto.CommonInlineUserDefinedFunction,
+      child: LogicalPlan): LogicalPlan = {
+    val udf = fun.getScalarScalaUdf
+    val udfPacket =
+      Utils.deserialize[UdfPacket](
+        udf.getPayload.toByteArray,
+        SparkConnectArtifactManager.classLoaderWithArtifacts)
+    assert(udfPacket.inputEncoders.size == 1)
+    implicit val iEnc = ExpressionEncoder(udfPacket.inputEncoders.head)
+    val rEnc = ExpressionEncoder(udfPacket.outputEncoder)
+
+    val deserializer = UnresolvedDeserializer(iEnc.deserializer)
+    val deserialized = DeserializeToObject(deserializer, generateObjAttr(iEnc), child)
+    val mapped = MapPartitions(
+      udfPacket.function.asInstanceOf[Iterator[Any] => Iterator[Any]],
+      generateObjAttr(rEnc),
+      deserialized)
+    val serialized = SerializeFromObject(rEnc.namedExpressions, mapped)
+
+    new Dataset(session, serialized, rEnc).logicalPlan

Review Comment:
   Why construct a dataframe here? You should be able to return `serialized`.



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -482,27 +482,66 @@ class SparkConnectPlanner(val session: SparkSession) {
   }
 
   private def transformMapPartitions(rel: proto.MapPartitions): LogicalPlan = {
+    val baseRel = transformRelation(rel.getInput)
     val commonUdf = rel.getFunc
-    val pythonUdf = transformPythonUDF(commonUdf)
-    val isBarrier = if (rel.hasIsBarrier) rel.getIsBarrier else false
-    pythonUdf.evalType match {
-      case PythonEvalType.SQL_MAP_PANDAS_ITER_UDF =>
-        logical.MapInPandas(
-          pythonUdf,
-          pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
-          transformRelation(rel.getInput),
-          isBarrier)
-      case PythonEvalType.SQL_MAP_ARROW_ITER_UDF =>
-        logical.PythonMapInArrow(
-          pythonUdf,
-          pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
-          transformRelation(rel.getInput),
-          isBarrier)
+    commonUdf.getFunctionCase match {
+      case proto.CommonInlineUserDefinedFunction.FunctionCase.SCALAR_SCALA_UDF =>
+        transformTypedMapPartitions(commonUdf, baseRel)
+      case proto.CommonInlineUserDefinedFunction.FunctionCase.PYTHON_UDF =>
+        val pythonUdf = transformPythonUDF(commonUdf)
+        val isBarrier = if (rel.hasIsBarrier) rel.getIsBarrier else false
+        pythonUdf.evalType match {
+          case PythonEvalType.SQL_MAP_PANDAS_ITER_UDF =>
+            logical.MapInPandas(
+              pythonUdf,
+              pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
+              baseRel,
+              isBarrier)
+          case PythonEvalType.SQL_MAP_ARROW_ITER_UDF =>
+            logical.PythonMapInArrow(
+              pythonUdf,
+              pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
+              baseRel,
+              isBarrier)
+          case _ =>
+            throw InvalidPlanInput(
+              s"Function with EvalType: ${pythonUdf.evalType} is not supported")
+        }
       case _ =>
-        throw InvalidPlanInput(s"Function with EvalType: ${pythonUdf.evalType} is not supported")
+        throw InvalidPlanInput(
+          s"Function with ID: ${commonUdf.getFunctionCase.getNumber} is not supported")
     }
   }
 
+  private def generateObjAttr[T](enc: ExpressionEncoder[T]): Attribute = {
+    val dataType = enc.deserializer.dataType
+    val nullable = !enc.clsTag.runtimeClass.isPrimitive
+    AttributeReference("obj", dataType, nullable)()
+  }
+
+  private def transformTypedMapPartitions(
+      fun: proto.CommonInlineUserDefinedFunction,
+      child: LogicalPlan): LogicalPlan = {
+    val udf = fun.getScalarScalaUdf
+    val udfPacket =

Review Comment:
   Shall we put this in a helper function? It is also needed for UDFs.



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -846,7 +885,28 @@ class SparkConnectPlanner(val session: SparkSession) {
   private def transformFilter(rel: proto.Filter): LogicalPlan = {
     assert(rel.hasInput)
     val baseRel = transformRelation(rel.getInput)
-    logical.Filter(condition = transformExpression(rel.getCondition), child = baseRel)
+    val cond = rel.getCondition
+    cond.getExprTypeCase match {
+      case proto.Expression.ExprTypeCase.COMMON_INLINE_USER_DEFINED_FUNCTION
+          if cond.getCommonInlineUserDefinedFunction.getFunctionCase ==
+            proto.CommonInlineUserDefinedFunction.FunctionCase.SCALAR_SCALA_UDF =>
+        transformTypedFilter(cond.getCommonInlineUserDefinedFunction, baseRel)
+      case _ =>
+        logical.Filter(condition = transformExpression(cond), child = baseRel)
+    }
+  }
+
+  private def transformTypedFilter(
+      fun: proto.CommonInlineUserDefinedFunction,
+      child: LogicalPlan): TypedFilter = {
+    val udf = fun.getScalarScalaUdf
+    val udfPacket =
+      Utils.deserialize[UdfPacket](
+        udf.getPayload.toByteArray,
+        SparkConnectArtifactManager.classLoaderWithArtifacts)
+    assert(udfPacket.inputEncoders.size == 1)
+    implicit val iEnc = ExpressionEncoder(udfPacket.inputEncoders.head)

Review Comment:
   You can also explicitly pass the encoder.



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -482,27 +482,66 @@ class SparkConnectPlanner(val session: SparkSession) {
   }
 
   private def transformMapPartitions(rel: proto.MapPartitions): LogicalPlan = {
+    val baseRel = transformRelation(rel.getInput)
     val commonUdf = rel.getFunc
-    val pythonUdf = transformPythonUDF(commonUdf)
-    val isBarrier = if (rel.hasIsBarrier) rel.getIsBarrier else false
-    pythonUdf.evalType match {
-      case PythonEvalType.SQL_MAP_PANDAS_ITER_UDF =>
-        logical.MapInPandas(
-          pythonUdf,
-          pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
-          transformRelation(rel.getInput),
-          isBarrier)
-      case PythonEvalType.SQL_MAP_ARROW_ITER_UDF =>
-        logical.PythonMapInArrow(
-          pythonUdf,
-          pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
-          transformRelation(rel.getInput),
-          isBarrier)
+    commonUdf.getFunctionCase match {
+      case proto.CommonInlineUserDefinedFunction.FunctionCase.SCALAR_SCALA_UDF =>
+        transformTypedMapPartitions(commonUdf, baseRel)
+      case proto.CommonInlineUserDefinedFunction.FunctionCase.PYTHON_UDF =>
+        val pythonUdf = transformPythonUDF(commonUdf)
+        val isBarrier = if (rel.hasIsBarrier) rel.getIsBarrier else false
+        pythonUdf.evalType match {
+          case PythonEvalType.SQL_MAP_PANDAS_ITER_UDF =>
+            logical.MapInPandas(
+              pythonUdf,
+              pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
+              baseRel,
+              isBarrier)
+          case PythonEvalType.SQL_MAP_ARROW_ITER_UDF =>
+            logical.PythonMapInArrow(
+              pythonUdf,
+              pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
+              baseRel,
+              isBarrier)
+          case _ =>
+            throw InvalidPlanInput(
+              s"Function with EvalType: ${pythonUdf.evalType} is not supported")
+        }
       case _ =>
-        throw InvalidPlanInput(s"Function with EvalType: ${pythonUdf.evalType} is not supported")
+        throw InvalidPlanInput(
+          s"Function with ID: ${commonUdf.getFunctionCase.getNumber} is not supported")
     }
   }
 
+  private def generateObjAttr[T](enc: ExpressionEncoder[T]): Attribute = {
+    val dataType = enc.deserializer.dataType
+    val nullable = !enc.clsTag.runtimeClass.isPrimitive
+    AttributeReference("obj", dataType, nullable)()
+  }
+
+  private def transformTypedMapPartitions(
+      fun: proto.CommonInlineUserDefinedFunction,
+      child: LogicalPlan): LogicalPlan = {
+    val udf = fun.getScalarScalaUdf
+    val udfPacket =
+      Utils.deserialize[UdfPacket](
+        udf.getPayload.toByteArray,
+        SparkConnectArtifactManager.classLoaderWithArtifacts)
+    assert(udfPacket.inputEncoders.size == 1)
+    implicit val iEnc = ExpressionEncoder(udfPacket.inputEncoders.head)

Review Comment:
   Why implicit?



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -846,7 +885,28 @@ class SparkConnectPlanner(val session: SparkSession) {
   private def transformFilter(rel: proto.Filter): LogicalPlan = {
     assert(rel.hasInput)
     val baseRel = transformRelation(rel.getInput)
-    logical.Filter(condition = transformExpression(rel.getCondition), child = baseRel)
+    val cond = rel.getCondition
+    cond.getExprTypeCase match {
+      case proto.Expression.ExprTypeCase.COMMON_INLINE_USER_DEFINED_FUNCTION

Review Comment:
   Does this mean the following would hit this code path:
   ```scala
   import spark.implicits._
   val under5 = udf(i: Long => i < 5)
   spark.range(10).filter(under5($"id")).collect()
   ```
   If it does then that would be wrong. We should check if it has an unresolved star as its input.



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -482,27 +482,66 @@ class SparkConnectPlanner(val session: SparkSession) {
   }
 
   private def transformMapPartitions(rel: proto.MapPartitions): LogicalPlan = {
+    val baseRel = transformRelation(rel.getInput)
     val commonUdf = rel.getFunc
-    val pythonUdf = transformPythonUDF(commonUdf)
-    val isBarrier = if (rel.hasIsBarrier) rel.getIsBarrier else false
-    pythonUdf.evalType match {
-      case PythonEvalType.SQL_MAP_PANDAS_ITER_UDF =>
-        logical.MapInPandas(
-          pythonUdf,
-          pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
-          transformRelation(rel.getInput),
-          isBarrier)
-      case PythonEvalType.SQL_MAP_ARROW_ITER_UDF =>
-        logical.PythonMapInArrow(
-          pythonUdf,
-          pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
-          transformRelation(rel.getInput),
-          isBarrier)
+    commonUdf.getFunctionCase match {
+      case proto.CommonInlineUserDefinedFunction.FunctionCase.SCALAR_SCALA_UDF =>
+        transformTypedMapPartitions(commonUdf, baseRel)
+      case proto.CommonInlineUserDefinedFunction.FunctionCase.PYTHON_UDF =>
+        val pythonUdf = transformPythonUDF(commonUdf)
+        val isBarrier = if (rel.hasIsBarrier) rel.getIsBarrier else false
+        pythonUdf.evalType match {
+          case PythonEvalType.SQL_MAP_PANDAS_ITER_UDF =>
+            logical.MapInPandas(
+              pythonUdf,
+              pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
+              baseRel,
+              isBarrier)
+          case PythonEvalType.SQL_MAP_ARROW_ITER_UDF =>
+            logical.PythonMapInArrow(
+              pythonUdf,
+              pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
+              baseRel,
+              isBarrier)
+          case _ =>
+            throw InvalidPlanInput(
+              s"Function with EvalType: ${pythonUdf.evalType} is not supported")
+        }
       case _ =>
-        throw InvalidPlanInput(s"Function with EvalType: ${pythonUdf.evalType} is not supported")
+        throw InvalidPlanInput(
+          s"Function with ID: ${commonUdf.getFunctionCase.getNumber} is not supported")
     }
   }
 
+  private def generateObjAttr[T](enc: ExpressionEncoder[T]): Attribute = {
+    val dataType = enc.deserializer.dataType
+    val nullable = !enc.clsTag.runtimeClass.isPrimitive
+    AttributeReference("obj", dataType, nullable)()

Review Comment:
   You can give it any name, an empty name also works. What matter is that the attribute has a stable ID (which is does).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on pull request #40581: [SPARK-42953][Connect] Typed filter, map, flatMap, mapPartitions

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on PR #40581:
URL: https://github.com/apache/spark/pull/40581#issuecomment-1499487211

   Merging this one.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhenlineo commented on a diff in pull request #40581: [SPARK-42953][Connect] Typed filter, map, flatMap, mapPartitions

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on code in PR #40581:
URL: https://github.com/apache/spark/pull/40581#discussion_r1156523516


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -482,27 +482,66 @@ class SparkConnectPlanner(val session: SparkSession) {
   }
 
   private def transformMapPartitions(rel: proto.MapPartitions): LogicalPlan = {
+    val baseRel = transformRelation(rel.getInput)
     val commonUdf = rel.getFunc
-    val pythonUdf = transformPythonUDF(commonUdf)
-    val isBarrier = if (rel.hasIsBarrier) rel.getIsBarrier else false
-    pythonUdf.evalType match {
-      case PythonEvalType.SQL_MAP_PANDAS_ITER_UDF =>
-        logical.MapInPandas(
-          pythonUdf,
-          pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
-          transformRelation(rel.getInput),
-          isBarrier)
-      case PythonEvalType.SQL_MAP_ARROW_ITER_UDF =>
-        logical.PythonMapInArrow(
-          pythonUdf,
-          pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
-          transformRelation(rel.getInput),
-          isBarrier)
+    commonUdf.getFunctionCase match {
+      case proto.CommonInlineUserDefinedFunction.FunctionCase.SCALAR_SCALA_UDF =>
+        transformTypedMapPartitions(commonUdf, baseRel)
+      case proto.CommonInlineUserDefinedFunction.FunctionCase.PYTHON_UDF =>
+        val pythonUdf = transformPythonUDF(commonUdf)
+        val isBarrier = if (rel.hasIsBarrier) rel.getIsBarrier else false
+        pythonUdf.evalType match {
+          case PythonEvalType.SQL_MAP_PANDAS_ITER_UDF =>
+            logical.MapInPandas(
+              pythonUdf,
+              pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
+              baseRel,
+              isBarrier)
+          case PythonEvalType.SQL_MAP_ARROW_ITER_UDF =>
+            logical.PythonMapInArrow(
+              pythonUdf,
+              pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
+              baseRel,
+              isBarrier)
+          case _ =>
+            throw InvalidPlanInput(
+              s"Function with EvalType: ${pythonUdf.evalType} is not supported")
+        }
       case _ =>
-        throw InvalidPlanInput(s"Function with EvalType: ${pythonUdf.evalType} is not supported")
+        throw InvalidPlanInput(
+          s"Function with ID: ${commonUdf.getFunctionCase.getNumber} is not supported")
     }
   }
 
+  private def generateObjAttr[T](enc: ExpressionEncoder[T]): Attribute = {
+    val dataType = enc.deserializer.dataType
+    val nullable = !enc.clsTag.runtimeClass.isPrimitive
+    AttributeReference("obj", dataType, nullable)()
+  }
+
+  private def transformTypedMapPartitions(
+      fun: proto.CommonInlineUserDefinedFunction,
+      child: LogicalPlan): LogicalPlan = {
+    val udf = fun.getScalarScalaUdf
+    val udfPacket =

Review Comment:
   Now that I have the input and return types, can I use reflection to find the encoders for them rather than use the encoders obtained from udfPacket?
   
   I cannot easily test out using a `Person` class currently as I am missing the class sync logic. 
   
   But in theory, if the class is synced to the server correctly, I should be able to directly use reflection to find the Person encoders, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] amaliujia commented on a diff in pull request #40581: [SPARK-42953][Connect] Typed filter, map, flatMap, mapPartitions

Posted by "amaliujia (via GitHub)" <gi...@apache.org>.
amaliujia commented on code in PR #40581:
URL: https://github.com/apache/spark/pull/40581#discussion_r1152606377


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -482,27 +482,66 @@ class SparkConnectPlanner(val session: SparkSession) {
   }
 
   private def transformMapPartitions(rel: proto.MapPartitions): LogicalPlan = {
+    val baseRel = transformRelation(rel.getInput)
     val commonUdf = rel.getFunc
-    val pythonUdf = transformPythonUDF(commonUdf)
-    val isBarrier = if (rel.hasIsBarrier) rel.getIsBarrier else false
-    pythonUdf.evalType match {
-      case PythonEvalType.SQL_MAP_PANDAS_ITER_UDF =>
-        logical.MapInPandas(
-          pythonUdf,
-          pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
-          transformRelation(rel.getInput),
-          isBarrier)
-      case PythonEvalType.SQL_MAP_ARROW_ITER_UDF =>
-        logical.PythonMapInArrow(
-          pythonUdf,
-          pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
-          transformRelation(rel.getInput),
-          isBarrier)
+    commonUdf.getFunctionCase match {
+      case proto.CommonInlineUserDefinedFunction.FunctionCase.SCALAR_SCALA_UDF =>
+        transformTypedMapPartitions(commonUdf, baseRel)
+      case proto.CommonInlineUserDefinedFunction.FunctionCase.PYTHON_UDF =>
+        val pythonUdf = transformPythonUDF(commonUdf)
+        val isBarrier = if (rel.hasIsBarrier) rel.getIsBarrier else false
+        pythonUdf.evalType match {
+          case PythonEvalType.SQL_MAP_PANDAS_ITER_UDF =>
+            logical.MapInPandas(
+              pythonUdf,
+              pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
+              baseRel,
+              isBarrier)
+          case PythonEvalType.SQL_MAP_ARROW_ITER_UDF =>
+            logical.PythonMapInArrow(
+              pythonUdf,
+              pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
+              baseRel,
+              isBarrier)
+          case _ =>
+            throw InvalidPlanInput(
+              s"Function with EvalType: ${pythonUdf.evalType} is not supported")
+        }
       case _ =>
-        throw InvalidPlanInput(s"Function with EvalType: ${pythonUdf.evalType} is not supported")
+        throw InvalidPlanInput(
+          s"Function with ID: ${commonUdf.getFunctionCase.getNumber} is not supported")
     }
   }
 
+  private def generateObjAttr[T](enc: ExpressionEncoder[T]): Attribute = {
+    val dataType = enc.deserializer.dataType
+    val nullable = !enc.clsTag.runtimeClass.isPrimitive
+    AttributeReference("obj", dataType, nullable)()

Review Comment:
   To confirm this won't cause duplicate attribute issue if there is another attribute come from child plan?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on a diff in pull request #40581: [SPARK-42953][Connect] Typed filter, map, flatMap, mapPartitions

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40581:
URL: https://github.com/apache/spark/pull/40581#discussion_r1156174063


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UserDefinedFunctionE2ETestSuite.scala:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import java.lang.{Long => JLong}
+import java.util.{Iterator => JIterator}
+import java.util.Arrays
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.api.java.function._
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{PrimitiveIntEncoder, PrimitiveLongEncoder}
+import org.apache.spark.sql.connect.client.util.RemoteSparkSession
+import org.apache.spark.sql.functions.udf
+
+/**
+ * All tests in this class requires client UDF artifacts synced with the server. TODO: It means
+ * these tests only works with SBT for now.
+ */
+class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession {
+  test("simple udf") {
+    def dummyUdf(x: Int): Int = x + 5
+    val myUdf = udf(dummyUdf _)

Review Comment:
   For the record this requires us to capture the context because we are using a function in the enclosing class. We definitely should test this, however the more canonical way of writing this would be:
   ```scala
   val myUdf = udf { x: int = >
     x + 5
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhenlineo commented on a diff in pull request #40581: [SPARK-42953][Connect] Typed filter, map, flatMap, mapPartitions

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on code in PR #40581:
URL: https://github.com/apache/spark/pull/40581#discussion_r1158887600


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -846,7 +885,28 @@ class SparkConnectPlanner(val session: SparkSession) {
   private def transformFilter(rel: proto.Filter): LogicalPlan = {
     assert(rel.hasInput)
     val baseRel = transformRelation(rel.getInput)
-    logical.Filter(condition = transformExpression(rel.getCondition), child = baseRel)
+    val cond = rel.getCondition
+    cond.getExprTypeCase match {
+      case proto.Expression.ExprTypeCase.COMMON_INLINE_USER_DEFINED_FUNCTION

Review Comment:
   Changed to Col(*)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on a diff in pull request #40581: [SPARK-42953][Connect] Typed filter, map, flatMap, mapPartitions

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40581:
URL: https://github.com/apache/spark/pull/40581#discussion_r1151329434


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -2468,6 +2470,113 @@ class Dataset[T] private[sql] (
    */
   def transform[U](t: Dataset[T] => Dataset[U]): Dataset[U] = t(this)
 
+  /**
+   * (Scala-specific) Returns a new Dataset that only contains elements where `func` returns
+   * `true`.
+   *
+   * @group typedrel
+   * @since 3.4.0

Review Comment:
   I suggest changing to `@since 3.5.0`
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhenlineo commented on a diff in pull request #40581: [SPARK-42953][Connect] Typed filter, map, flatMap, mapPartitions

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on code in PR #40581:
URL: https://github.com/apache/spark/pull/40581#discussion_r1152274921


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/IntegrationTestUtils.scala:
##########
@@ -43,7 +45,27 @@ object IntegrationTestUtils {
     }
     sys.props.getOrElse("spark.test.home", sys.env("SPARK_HOME"))
   }
-  private[connect] val isDebug = System.getProperty(DEBUG_SC_JVM_CLIENT, "false").toBoolean
+
+  private[connect] lazy val debugConfig: Seq[String] = {
+    val log4j2 = s"$sparkHome/connector/connect/client/jvm/src/test/resources/log4j2.properties"
+    if (isDebug) {
+      Seq(
+        // Enable to see the server plan change log
+        // "--conf",
+        // "spark.sql.planChangeLog.level=WARN",
+
+        // Enable to see the server grpc received
+        // "--conf",
+        // "spark.connect.grpc.interceptor.classes=" +
+        //  "org.apache.spark.sql.connect.service.LoggingInterceptor",
+
+        // Redirect server log into console
+        "--conf",
+        s"spark.driver.extraJavaOptions=-Dlog4j.configuration=$log4j2",
+        "--conf",

Review Comment:
   I tried `s"spark.driver.extraJavaOptions=-Dlog4j.configurationFile=$log4j2"`, but the logs is not picked up? Do you know what is the correct way to use `log4j.configurationFile`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on a diff in pull request #40581: [SPARK-42953][Connect] Typed filter, map, flatMap, mapPartitions

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40581:
URL: https://github.com/apache/spark/pull/40581#discussion_r1151324703


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/IntegrationTestUtils.scala:
##########
@@ -43,7 +45,27 @@ object IntegrationTestUtils {
     }
     sys.props.getOrElse("spark.test.home", sys.env("SPARK_HOME"))
   }
-  private[connect] val isDebug = System.getProperty(DEBUG_SC_JVM_CLIENT, "false").toBoolean
+
+  private[connect] lazy val debugConfig: Seq[String] = {
+    val log4j2 = s"$sparkHome/connector/connect/client/jvm/src/test/resources/log4j2.properties"
+    if (isDebug) {
+      Seq(
+        // Enable to see the server plan change log
+        // "--conf",
+        // "spark.sql.planChangeLog.level=WARN",
+
+        // Enable to see the server grpc received
+        // "--conf",
+        // "spark.connect.grpc.interceptor.classes=" +
+        //  "org.apache.spark.sql.connect.service.LoggingInterceptor",
+
+        // Redirect server log into console
+        "--conf",
+        s"spark.driver.extraJavaOptions=-Dlog4j.configuration=$log4j2",
+        "--conf",
+        s"spark.executor.extraJavaOptions=-Dlog4j.configuration=$log4j2")

Review Comment:
   `SimpleSparkConnectService` start with `local` module, really need to configure `spark.executor.extraJavaOptions`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on a diff in pull request #40581: [SPARK-42953][Connect] Typed filter, map, flatMap, mapPartitions

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40581:
URL: https://github.com/apache/spark/pull/40581#discussion_r1156102601


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -2468,6 +2470,113 @@ class Dataset[T] private[sql] (
    */
   def transform[U](t: Dataset[T] => Dataset[U]): Dataset[U] = t(this)
 
+  /**
+   * (Scala-specific) Returns a new Dataset that only contains elements where `func` returns
+   * `true`.
+   *
+   * @group typedrel
+   * @since 3.5.0
+   */
+  def filter(func: T => Boolean): Dataset[T] = {
+    val udf = ScalarUserDefinedFunction(
+      function = func,
+      inputEncoders = encoder :: Nil,
+      outputEncoder = PrimitiveBooleanEncoder,
+      name = None,
+      nullable = false,
+      deterministic = true)
+    sparkSession.newDataset[T](encoder) { builder =>
+      builder.getFilterBuilder
+        .setInput(plan.getRoot)
+        .setCondition(udf.apply().expr)

Review Comment:
   This makes me a tad nervous. I think it is possible for users to provide a udf without arguments.  Can we use some marker expression here? UnresolvedStar would be fine I guess.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell closed pull request #40581: [SPARK-42953][Connect] Typed filter, map, flatMap, mapPartitions

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell closed pull request #40581: [SPARK-42953][Connect] Typed filter, map, flatMap, mapPartitions
URL: https://github.com/apache/spark/pull/40581


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on a diff in pull request #40581: [SPARK-42953][Connect] Typed filter, map, flatMap, mapPartitions

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40581:
URL: https://github.com/apache/spark/pull/40581#discussion_r1158830161


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -482,27 +482,66 @@ class SparkConnectPlanner(val session: SparkSession) {
   }
 
   private def transformMapPartitions(rel: proto.MapPartitions): LogicalPlan = {
+    val baseRel = transformRelation(rel.getInput)
     val commonUdf = rel.getFunc
-    val pythonUdf = transformPythonUDF(commonUdf)
-    val isBarrier = if (rel.hasIsBarrier) rel.getIsBarrier else false
-    pythonUdf.evalType match {
-      case PythonEvalType.SQL_MAP_PANDAS_ITER_UDF =>
-        logical.MapInPandas(
-          pythonUdf,
-          pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
-          transformRelation(rel.getInput),
-          isBarrier)
-      case PythonEvalType.SQL_MAP_ARROW_ITER_UDF =>
-        logical.PythonMapInArrow(
-          pythonUdf,
-          pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
-          transformRelation(rel.getInput),
-          isBarrier)
+    commonUdf.getFunctionCase match {
+      case proto.CommonInlineUserDefinedFunction.FunctionCase.SCALAR_SCALA_UDF =>
+        transformTypedMapPartitions(commonUdf, baseRel)
+      case proto.CommonInlineUserDefinedFunction.FunctionCase.PYTHON_UDF =>
+        val pythonUdf = transformPythonUDF(commonUdf)
+        val isBarrier = if (rel.hasIsBarrier) rel.getIsBarrier else false
+        pythonUdf.evalType match {
+          case PythonEvalType.SQL_MAP_PANDAS_ITER_UDF =>
+            logical.MapInPandas(
+              pythonUdf,
+              pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
+              baseRel,
+              isBarrier)
+          case PythonEvalType.SQL_MAP_ARROW_ITER_UDF =>
+            logical.PythonMapInArrow(
+              pythonUdf,
+              pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
+              baseRel,
+              isBarrier)
+          case _ =>
+            throw InvalidPlanInput(
+              s"Function with EvalType: ${pythonUdf.evalType} is not supported")
+        }
       case _ =>
-        throw InvalidPlanInput(s"Function with EvalType: ${pythonUdf.evalType} is not supported")
+        throw InvalidPlanInput(
+          s"Function with ID: ${commonUdf.getFunctionCase.getNumber} is not supported")
     }
   }
 
+  private def generateObjAttr[T](enc: ExpressionEncoder[T]): Attribute = {
+    val dataType = enc.deserializer.dataType
+    val nullable = !enc.clsTag.runtimeClass.isPrimitive
+    AttributeReference("obj", dataType, nullable)()
+  }
+
+  private def transformTypedMapPartitions(
+      fun: proto.CommonInlineUserDefinedFunction,
+      child: LogicalPlan): LogicalPlan = {
+    val udf = fun.getScalarScalaUdf
+    val udfPacket =

Review Comment:
   No you cannot. That would mean you cannot use case classes or java beans. Rows would work though.
   
   The reason we propagate types is to make the sure the engine can supply correctly typed data.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhenlineo commented on pull request #40581: [SPARK-42953][Connect] Typed map, flatMap, mapPartitions

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on PR #40581:
URL: https://github.com/apache/spark/pull/40581#issuecomment-1487814020

   cc @xinrong-meng 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on a diff in pull request #40581: [SPARK-42953][Connect] Typed filter, map, flatMap, mapPartitions

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40581:
URL: https://github.com/apache/spark/pull/40581#discussion_r1151352449


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/UdfUtils.scala:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.api.java.function._
+
+/**
+ * Util functions to help convert input functions between typed filter, map, flatMap,
+ * mapPartitions etc. These functions cannot be defined inside the client Dataset class as it will
+ * cause Dataset sync conflicts when used together with UDFs. Thus we define them outside, in the
+ * client package.
+ */
+object UdfUtils {

Review Comment:
   A public API or Internal utils?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhenlineo commented on a diff in pull request #40581: [SPARK-42953][Connect] Typed filter, map, flatMap, mapPartitions

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on code in PR #40581:
URL: https://github.com/apache/spark/pull/40581#discussion_r1156511273


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -482,27 +482,66 @@ class SparkConnectPlanner(val session: SparkSession) {
   }
 
   private def transformMapPartitions(rel: proto.MapPartitions): LogicalPlan = {
+    val baseRel = transformRelation(rel.getInput)
     val commonUdf = rel.getFunc
-    val pythonUdf = transformPythonUDF(commonUdf)
-    val isBarrier = if (rel.hasIsBarrier) rel.getIsBarrier else false
-    pythonUdf.evalType match {
-      case PythonEvalType.SQL_MAP_PANDAS_ITER_UDF =>
-        logical.MapInPandas(
-          pythonUdf,
-          pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
-          transformRelation(rel.getInput),
-          isBarrier)
-      case PythonEvalType.SQL_MAP_ARROW_ITER_UDF =>
-        logical.PythonMapInArrow(
-          pythonUdf,
-          pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
-          transformRelation(rel.getInput),
-          isBarrier)
+    commonUdf.getFunctionCase match {
+      case proto.CommonInlineUserDefinedFunction.FunctionCase.SCALAR_SCALA_UDF =>
+        transformTypedMapPartitions(commonUdf, baseRel)
+      case proto.CommonInlineUserDefinedFunction.FunctionCase.PYTHON_UDF =>
+        val pythonUdf = transformPythonUDF(commonUdf)
+        val isBarrier = if (rel.hasIsBarrier) rel.getIsBarrier else false
+        pythonUdf.evalType match {
+          case PythonEvalType.SQL_MAP_PANDAS_ITER_UDF =>
+            logical.MapInPandas(
+              pythonUdf,
+              pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
+              baseRel,
+              isBarrier)
+          case PythonEvalType.SQL_MAP_ARROW_ITER_UDF =>
+            logical.PythonMapInArrow(
+              pythonUdf,
+              pythonUdf.dataType.asInstanceOf[StructType].toAttributes,
+              baseRel,
+              isBarrier)
+          case _ =>
+            throw InvalidPlanInput(
+              s"Function with EvalType: ${pythonUdf.evalType} is not supported")
+        }
       case _ =>
-        throw InvalidPlanInput(s"Function with EvalType: ${pythonUdf.evalType} is not supported")
+        throw InvalidPlanInput(
+          s"Function with ID: ${commonUdf.getFunctionCase.getNumber} is not supported")
     }
   }
 
+  private def generateObjAttr[T](enc: ExpressionEncoder[T]): Attribute = {
+    val dataType = enc.deserializer.dataType
+    val nullable = !enc.clsTag.runtimeClass.isPrimitive
+    AttributeReference("obj", dataType, nullable)()
+  }
+
+  private def transformTypedMapPartitions(
+      fun: proto.CommonInlineUserDefinedFunction,
+      child: LogicalPlan): LogicalPlan = {
+    val udf = fun.getScalarScalaUdf
+    val udfPacket =

Review Comment:
   The shared code is like one line. The main problem here is that the udf for filter and map cannot really construct a `ScalaUDF` due to the absence of `children: Seq[Expression]`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on a diff in pull request #40581: [SPARK-42953][Connect] Typed filter, map, flatMap, mapPartitions

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40581:
URL: https://github.com/apache/spark/pull/40581#discussion_r1160142283


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -2468,6 +2470,110 @@ class Dataset[T] private[sql] (
    */
   def transform[U](t: Dataset[T] => Dataset[U]): Dataset[U] = t(this)
 
+  /**
+   * (Scala-specific) Returns a new Dataset that only contains elements where `func` returns
+   * `true`.
+   *
+   * @group typedrel
+   * @since 3.5.0
+   */
+  def filter(func: T => Boolean): Dataset[T] = {
+    val udf = ScalarUserDefinedFunction(
+      function = func,
+      inputEncoders = encoder :: Nil,
+      outputEncoder = PrimitiveBooleanEncoder)
+    sparkSession.newDataset[T](encoder) { builder =>
+      builder.getFilterBuilder
+        .setInput(plan.getRoot)
+        .setCondition(udf.apply(col("*")).expr)
+    }
+  }
+
+  /**
+   * (Java-specific) Returns a new Dataset that only contains elements where `func` returns
+   * `true`.
+   *
+   * @group typedrel
+   * @since 3.5.0
+   */
+  def filter(f: FilterFunction[T]): Dataset[T] = {
+    filter(UdfUtils.filterFuncToScalaFunc(f))
+  }
+
+  /**
+   * (Scala-specific) Returns a new Dataset that contains the result of applying `func` to each
+   * element.
+   *
+   * @group typedrel
+   * @since 3.5.0
+   */
+  def map[U: Encoder](f: T => U): Dataset[U] = {
+    mapPartitions(UdfUtils.mapFuncToMapPartitionsAdaptor(f))
+  }
+
+  /**
+   * (Java-specific) Returns a new Dataset that contains the result of applying `func` to each
+   * element.
+   *
+   * @group typedrel
+   * @since 3.5.0
+   */
+  def map[U](f: MapFunction[T, U], encoder: Encoder[U]): Dataset[U] = {
+    map(UdfUtils.mapFunctionToScalaFunc(f))(encoder)
+  }
+
+  /**
+   * (Scala-specific) Returns a new Dataset that contains the result of applying `func` to each
+   * partition.
+   *
+   * @group typedrel
+   * @since 3.5.0
+   */
+  def mapPartitions[U: Encoder](func: Iterator[T] => Iterator[U]): Dataset[U] = {
+    val outputEncoder = encoderFor[U]
+    val udf = ScalarUserDefinedFunction(
+      function = func,
+      inputEncoders = encoder :: Nil,
+      outputEncoder = outputEncoder)
+    sparkSession.newDataset(outputEncoder) { builder =>
+      builder.getMapPartitionsBuilder
+        .setInput(plan.getRoot)
+        .setFunc(udf.apply().expr.getCommonInlineUserDefinedFunction)

Review Comment:
   Nit we should also `col("*")` here to be consistent. I know that on the server side we ignore this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org