Posted to reviews@spark.apache.org by "anishshri-db (via GitHub)" <gi...@apache.org> on 2024/03/28 19:37:55 UTC

[PR] [SPARK-47553] Add Java support for transformWithState operator APIs [spark]

anishshri-db opened a new pull request, #45758:
URL: https://github.com/apache/spark/pull/45758

   ### What changes were proposed in this pull request?
   Add Java support for transformWithState operator APIs
   
   ### Why are the changes needed?
   To add support for using the transformWithState operator in Java.
   
   
   ### Does this PR introduce _any_ user-facing change?
   Yes
   
   ### How was this patch tested?
   Added unit tests
   
   ```
   [info] Test test.org.apache.spark.sql.JavaDatasetSuite#testGroupByKey() started
   [info] Test test.org.apache.spark.sql.JavaDatasetSuite#testCollect() started
   [info] Test test.org.apache.spark.sql.JavaDatasetSuite#testKryoEncoderErrorMessageForPrivateClass() started
   [info] Test test.org.apache.spark.sql.JavaDatasetSuite#testJavaBeanEncoder() started
   [info] Test test.org.apache.spark.sql.JavaDatasetSuite#testTupleEncoder() started
   [info] Test test.org.apache.spark.sql.JavaDatasetSuite#testPeriodEncoder() started
   [info] Test test.org.apache.spark.sql.JavaDatasetSuite#testRowEncoder() started
   [info] Test test.org.apache.spark.sql.JavaDatasetSuite#testNestedTupleEncoder() started
   [info] Test test.org.apache.spark.sql.JavaDatasetSuite#testTupleEncoderSchema() started
   [info] Test test.org.apache.spark.sql.JavaDatasetSuite#testMappingFunctionWithTestGroupState() started
   [info] Test test.org.apache.spark.sql.JavaDatasetSuite#testReduce() started
   [info] Test test.org.apache.spark.sql.JavaDatasetSuite#testSelect() started
   [info] Test test.org.apache.spark.sql.JavaDatasetSuite#testInitialStateFlatMapGroupsWithState() started
   [info] Test test.org.apache.spark.sql.JavaDatasetSuite#testJavaEncoderErrorMessageForPrivateClass() started
   [info] Test run finished: 0 failed, 0 ignored, 45 total, 14.73s
   [info] Passed: Total 45, Failed 0, Errors 0, Passed 45
   [success] Total time: 20 s, completed Mar 28, 2024, 12:37:30 PM
   ```
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47553][SS] Add Java support for transformWithState operator APIs [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45758:
URL: https://github.com/apache/spark/pull/45758#discussion_r1544092354


##########
sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala:
##########
@@ -28,7 +28,7 @@ import org.apache.spark.sql.errors.ExecutionErrors
  */
 @Experimental
 @Evolving

Review Comment:
   The API is still evolving and under active development. That's why we have kept it as `package private` until we decide which Spark release it will become public in.





Re: [PR] [SPARK-47553][SS] Add Java support for transformWithState operator APIs [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #45758:
URL: https://github.com/apache/spark/pull/45758#issuecomment-2033813895

   There seems to be a valid compilation failure.




Re: [PR] [WIP][SPARK-47553][SS] Add Java support for transformWithState operator APIs [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #45758:
URL: https://github.com/apache/spark/pull/45758#discussion_r1543989348


##########
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala:
##########
@@ -690,6 +718,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
    * @param initialState      User provided initial state that will be used to initiate state for
    *                          the query in the first batch.
    *
+   * See [[Encoder]] for more details on what types are encodable to Spark SQL.
    */
   private[sql] def transformWithState[U: Encoder, S: Encoder](

Review Comment:
   Is this an API? If so, we should remove `private[sql]`.





Re: [PR] [SPARK-47553][SS] Add Java support for transformWithState operator APIs [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on PR #45758:
URL: https://github.com/apache/spark/pull/45758#issuecomment-2026694871

   @HeartSaVioR @sahnib - could you folks please take a look? Thx




Re: [PR] [SPARK-47553][SS] Add Java support for transformWithState operator APIs [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #45758:
URL: https://github.com/apache/spark/pull/45758#issuecomment-2035459986

   Thanks! Merging to master.




Re: [PR] [SPARK-47553][SS] Add Java support for transformWithState operator APIs [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #45758:
URL: https://github.com/apache/spark/pull/45758#discussion_r1544096683


##########
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala:
##########
@@ -690,6 +718,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
    * @param initialState      User provided initial state that will be used to initiate state for
    *                          the query in the first batch.
    *
+   * See [[Encoder]] for more details on what types are encodable to Spark SQL.
    */
   private[sql] def transformWithState[U: Encoder, S: Encoder](

Review Comment:
   We want to open everything at once when we are ready to release the feature, hence we guard the API via `private[sql]`.





Re: [PR] [SPARK-47553][SS] Add Java support for transformWithState operator APIs [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45758:
URL: https://github.com/apache/spark/pull/45758#discussion_r1544093028


##########
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala:
##########
@@ -690,6 +718,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
    * @param initialState      User provided initial state that will be used to initiate state for
    *                          the query in the first batch.
    *
+   * See [[Encoder]] for more details on what types are encodable to Spark SQL.
    */
   private[sql] def transformWithState[U: Encoder, S: Encoder](

Review Comment:
   Same as above. We have a bunch of sub-features that we are working on that might need small changes to the APIs here. So we have kept this as `package private` for now and will make it public once all the changes are finalized.
   cc - @HeartSaVioR 





Re: [PR] [SPARK-47553][SS] Add Java support for transformWithState operator APIs [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #45758:
URL: https://github.com/apache/spark/pull/45758#discussion_r1548917934


##########
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala:
##########
@@ -676,6 +677,33 @@ class KeyValueGroupedDataset[K, V] private[sql](
     )
   }
 
+  /**
+   * (Java-specific)
+   * Invokes methods defined in the stateful processor used in arbitrary state API v2.
+   * We allow the user to act on per-group set of input rows along with keyed state and the
+   * user can choose to output/return 0 or more rows.
+   * For a static/batch dataset, this operator is not supported and will throw an exception.

Review Comment:
   nit: This is no longer true, right? You fixed it in the Spark Connect method. Please fix it in the method above (Scala-specific) as well.



##########
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala:
##########
@@ -712,6 +741,35 @@ class KeyValueGroupedDataset[K, V] private[sql](
     )
   }
 
+  /**
+   * (Java-specific)
+   * Invokes methods defined in the stateful processor used in arbitrary state API v2.
+   * Functions as the function above, but with additional initial state.
+   *
+   * @tparam U The type of the output objects. Must be encodable to Spark SQL types.
+   * @tparam S The type of initial state objects. Must be encodable to Spark SQL types.
+   * @param statefulProcessor Instance of statefulProcessor whose functions will
+   *                          be invoked by the operator.
+   * @param timeoutMode       The timeout mode of the stateful processor.
+   * @param outputMode        The output mode of the stateful processor. Defaults to APPEND mode.

Review Comment:
   nit: remove default



##########
sql/core/src/test/java/test/org/apache/spark/sql/TestStatefulProcessorWithInitialState.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.org.apache.spark.sql;
+
+import java.util.*;
+
+import scala.jdk.javaapi.CollectionConverters;
+
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.streaming.*;
+
+/**
+ * A test stateful processor  concatenates all input rows for a key and emits the result.

Review Comment:
   nit: while we are here, double spaces
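
The doc comment quoted above describes a processor that concatenates all input rows for a key and emits the result. As a rough illustration of that idea only, here is a minimal plain-Java sketch. It does not use Spark at all: `ConcatPerKeySketch`, `initState`, `handleInputRows`, the `HashMap` standing in for keyed state, and the `key=value` output format are all assumptions made for this example, not the code under review in the PR.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch of per-key concatenation; none of these names are Spark APIs. */
public class ConcatPerKeySketch {
    // Hypothetical stand-in for keyed state: one accumulated string per grouping key.
    private final Map<String, String> stateByKey = new HashMap<>();

    /** Seeds the state for a key, loosely mirroring the "initial state" variant. */
    public void initState(String key, String initial) {
        stateByKey.put(key, initial);
    }

    /**
     * Consumes the rows that arrived for one key, appends them to that key's
     * stored string, and returns zero or more output rows.
     */
    public List<String> handleInputRows(String key, Iterator<String> rows) {
        StringBuilder sb = new StringBuilder(stateByKey.getOrDefault(key, ""));
        boolean sawInput = false;
        while (rows.hasNext()) {
            sb.append(rows.next());
            sawInput = true;
        }
        List<String> out = new ArrayList<>();
        if (sawInput) {
            // Persist the new value and emit exactly one row for this key.
            stateByKey.put(key, sb.toString());
            out.add(key + "=" + sb);
        }
        // With no input rows the state is left untouched and nothing is emitted.
        return out;
    }

    public static void main(String[] args) {
        ConcatPerKeySketch p = new ConcatPerKeySketch();
        p.initState("a", "init-");
        System.out.println(p.handleInputRows("a", List.of("x", "y").iterator()));
        System.out.println(p.handleInputRows("a", List.of("z").iterator()));
        System.out.println(p.handleInputRows("b", List.<String>of().iterator()));
    }
}
```

Running `main` prints `[a=init-xy]`, `[a=init-xyz]`, and `[]`: the seeded value is extended across calls for the same key, and a key with no input rows produces no output. In the real operator the state would presumably live in Spark's state store rather than an in-memory map.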





Re: [PR] [SPARK-47553][SS] Add Java support for transformWithState operator APIs [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on PR #45758:
URL: https://github.com/apache/spark/pull/45758#issuecomment-2035230353

   > There seems to be a "valid" compilation failure. @anishshri-db Could you please look into this?
   
   Yup thx - fixed this




Re: [PR] [WIP][SPARK-47553][SS] Add Java support for transformWithState operator APIs [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #45758:
URL: https://github.com/apache/spark/pull/45758#discussion_r1543988477


##########
sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala:
##########
@@ -28,7 +28,7 @@ import org.apache.spark.sql.errors.ExecutionErrors
  */
 @Experimental
 @Evolving

Review Comment:
   Can we add `since`?





Re: [PR] [SPARK-47553][SS] Add Java support for transformWithState operator APIs [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR closed pull request #45758: [SPARK-47553][SS] Add Java support for transformWithState operator APIs
URL: https://github.com/apache/spark/pull/45758




Re: [PR] [SPARK-47553][SS] Add Java support for transformWithState operator APIs [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45758:
URL: https://github.com/apache/spark/pull/45758#discussion_r1548984946


##########
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala:
##########
@@ -676,6 +677,33 @@ class KeyValueGroupedDataset[K, V] private[sql](
     )
   }
 
+  /**
+   * (Java-specific)
+   * Invokes methods defined in the stateful processor used in arbitrary state API v2.
+   * We allow the user to act on per-group set of input rows along with keyed state and the
+   * user can choose to output/return 0 or more rows.
+   * For a static/batch dataset, this operator is not supported and will throw an exception.

Review Comment:
   Done



##########
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala:
##########
@@ -712,6 +741,35 @@ class KeyValueGroupedDataset[K, V] private[sql](
     )
   }
 
+  /**
+   * (Java-specific)
+   * Invokes methods defined in the stateful processor used in arbitrary state API v2.
+   * Functions as the function above, but with additional initial state.
+   *
+   * @tparam U The type of the output objects. Must be encodable to Spark SQL types.
+   * @tparam S The type of initial state objects. Must be encodable to Spark SQL types.
+   * @param statefulProcessor Instance of statefulProcessor whose functions will
+   *                          be invoked by the operator.
+   * @param timeoutMode       The timeout mode of the stateful processor.
+   * @param outputMode        The output mode of the stateful processor. Defaults to APPEND mode.

Review Comment:
   Done



##########
sql/core/src/test/java/test/org/apache/spark/sql/TestStatefulProcessorWithInitialState.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.org.apache.spark.sql;
+
+import java.util.*;
+
+import scala.jdk.javaapi.CollectionConverters;
+
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.streaming.*;
+
+/**
+ * A test stateful processor  concatenates all input rows for a key and emits the result.

Review Comment:
   Done


