Posted to reviews@spark.apache.org by "bogao007 (via GitHub)" <gi...@apache.org> on 2023/05/09 22:20:48 UTC

[GitHub] [spark] bogao007 commented on a diff in pull request #40959: [CONNECT][SS]Implemented MapGroupsWithState and FlatMapGroupsWithState APIs for Spark Connect

bogao007 commented on code in PR #40959:
URL: https://github.com/apache/spark/pull/40959#discussion_r1177633899


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -632,6 +636,52 @@ class SparkConnectPlanner(val session: SparkSession) {
     SerializeFromObject(udf.outputNamedExpression, mapped)
   }
 
+  private def transformFlatMapGroupsWithState(rel: proto.FlatMapGroupsWithState): LogicalPlan = {
+    val commonUdf = rel.getFunc
+    val udf = ScalaUdf(commonUdf)
+    val ds = UntypedKeyValueGroupedDataset(
+      rel.getInput,
+      rel.getGroupingExpressionsList,
+      new util.ArrayList[proto.Expression]())
+
+    val initialDs = UntypedKeyValueGroupedDataset(
+      rel.getInput,
+      rel.getInitialGroupingExpressionsList,
+      new util.ArrayList[proto.Expression]())
+
+    val hasInitialState = !rel.getInitialGroupingExpressionsList.isEmpty
+    val timeoutConf = if (rel.getTimeoutConf.isEmpty) {
+      GroupStateTimeout.NoTimeout
+    } else {
+      org.apache.spark.sql.execution.streaming.GroupStateImpl
+        .groupStateTimeoutFromString(rel.getTimeoutConf)
+    }
+    val outputMode = if (rel.getOutputMode.isEmpty) {
+      OutputMode.Update
+    } else {
+      InternalOutputModes(rel.getOutputMode)
+    }
+
+    val flatMapGroupsWithState = new FlatMapGroupsWithState(
+      udf.function.asInstanceOf[(Any, Iterator[Any], LogicalGroupState[Any]) => Iterator[Any]],
+      udf.inputDeserializer(ds.groupingAttributes),
+      ds.valueDeserializer,
+      ds.groupingAttributes,
+      ds.dataAttributes,
+      udf.outputObjAttr,
+      ds.kEncoder.asInstanceOf[ExpressionEncoder[Any]],

Review Comment:
   This is a placeholder for the state encoder. I still need to investigate how to pass the state encoder here and will do that when I'm back.



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala:
##########
@@ -312,6 +313,154 @@ abstract class KeyValueGroupedDataset[K, V] private[sql] () extends Serializable
     cogroupSorted(other)(thisSortExprs: _*)(otherSortExprs: _*)(
       UdfUtils.coGroupFunctionToScalaFunc(f))(encoder)
   }
+
+  /**
+   * (Scala-specific) Applies the given function to each group of data, while maintaining a
+   * user-defined per-group state. The result Dataset will represent the objects returned by the
+   * function. For a static batch Dataset, the function will be invoked once per group. For a
+   * streaming Dataset, the function will be invoked for each group repeatedly in every trigger,
+   * and updates to each group's state will be saved across invocations. See
+   * [[org.apache.spark.sql.streaming.GroupState]] for more details.
+   *
+   * @tparam S
+   *   The type of the user-defined state. Must be encodable to Spark SQL types.
+   * @tparam U
+   *   The type of the output objects. Must be encodable to Spark SQL types.
+   * @param func
+   *   Function to be called on every group.
+   *
+   * See [[Encoder]] for more details on what types are encodable to Spark SQL.
+   * @since 2.2.0
+   */
+  def mapGroupsWithState[S: Encoder, U: Encoder](

Review Comment:
   I didn't get a chance to add all of the overloads of `mapGroupsWithState` and `flatMapGroupsWithState`. I'll add the remaining ones when I'm back.
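
   For context, here is a minimal sketch of how one existing overload (the one taking a `GroupStateTimeout`) is called through the regular `Dataset` API on a static batch Dataset. It is not the Spark Connect client code from this PR. The case classes, `updateTotal`, and `totals` are hypothetical names chosen for illustration, and a local Spark session is assumed.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

// Hypothetical types for illustration; they are not part of the PR.
case class Event(user: String, value: Long)
case class RunningTotal(total: Long)
case class UserTotal(user: String, total: Long)

object MapGroupsWithStateSketch {

  // Called once per group on a batch Dataset. On a streaming Dataset it
  // would be called in every trigger with the saved state.
  def updateTotal(
      user: String,
      events: Iterator[Event],
      state: GroupState[RunningTotal]): UserTotal = {
    val previous = state.getOption.getOrElse(RunningTotal(0L))
    val updated = RunningTotal(previous.total + events.map(_.value).sum)
    state.update(updated)
    UserTotal(user, updated.total)
  }

  // Runs the stateful map over a small in-memory Dataset and collects the
  // per-user totals to the driver.
  def totals(spark: SparkSession): Map[String, Long] = {
    import spark.implicits._
    val events = Seq(Event("a", 1L), Event("a", 2L), Event("b", 5L)).toDS()
    events
      .groupByKey(_.user)
      .mapGroupsWithState[RunningTotal, UserTotal](GroupStateTimeout.NoTimeout)(updateTotal)
      .collect()
      .map(t => t.user -> t.total)
      .toMap
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local session is sufficient for this sketch.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("mapGroupsWithState-sketch")
      .getOrCreate()
    try println(totals(spark))
    finally spark.stop()
  }
}
```

   The state type `S` and output type `U` are passed explicitly here only to keep type inference out of the picture; both need an implicit `Encoder`, which `spark.implicits._` provides for case classes.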



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

For additional commands, e-mail: reviews-help@spark.apache.org