Posted to reviews@spark.apache.org by "WweiL (via GitHub)" <gi...@apache.org> on 2023/09/01 20:03:14 UTC

[GitHub] [spark] WweiL opened a new pull request, #42779: [SPARK-45056][PYTHON][SS][CONNECT] Termination tests for streamingQueryListener and foreachBatch

WweiL opened a new pull request, #42779:
URL: https://github.com/apache/spark/pull/42779

   
   ### What changes were proposed in this pull request?
   Add termination tests for StreamingQueryListener and foreachBatch. The tests mimic client behavior by creating, on the server side, the same query that running the equivalent Python query from the client side would create. The code is also refactored slightly for testing purposes.
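   
   A condensed sketch of the flow the new tests exercise (the real code is in `SparkConnectSessionHodlerSuite`; `spark` and `sessionHolder` come from the test fixture):
   
   ```scala
   // Build a pickled Python function and wrap it the way the Connect planner would.
   val pythonFn = dummyPythonFunction(sessionHolder)(streamingForeachBatchFunction)
   val (fn, cleaner) =
     StreamingForeachBatchHelper.pythonForeachBatchWrapper(pythonFn, sessionHolder)
   
   // Start a query with the wrapped function and register the cleaner exactly as
   // would happen for a query submitted by a real Python client.
   val query = spark.readStream
     .format("rate")
     .load()
     .writeStream
     .format("memory")
     .queryName("sketch_q")
     .foreachBatch(fn)
     .start()
   sessionHolder.streamingForeachBatchRunnerCleanerCache
     .registerCleanerForQuery(query, cleaner)
   
   // Stopping the query should terminate the Python worker backing `fn`.
   query.stop()
   ```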
   
   ### Why are the changes needed?
   These tests are needed to verify that the Python worker processes terminate when their query is stopped or their listener is removed.
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   Test-only addition.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No




[GitHub] [spark] LuciferYang commented on a diff in pull request #42779: [SPARK-45056][PYTHON][SS][CONNECT] Termination tests for streamingQueryListener and foreachBatch

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #42779:
URL: https://github.com/apache/spark/pull/42779#discussion_r1328453510


##########
connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHodlerSuite.scala:
##########
@@ -79,4 +92,196 @@ class SparkConnectSessionHolderSuite extends SharedSparkSession {
       sessionHolder.getDataFrameOrThrow(key1)
     }
   }
+
+  private def streamingForeachBatchFunction(pysparkPythonPath: String): Array[Byte] = {
+    var binaryFunc: Array[Byte] = null
+    withTempPath { path =>
+      Process(
+        Seq(
+          IntegratedUDFTestUtils.pythonExec,
+          "-c",
+          "from pyspark.serializers import CloudPickleSerializer; " +
+            s"f = open('$path', 'wb');" +
+            "f.write(CloudPickleSerializer().dumps((" +
+            "lambda df, batchId: batchId)))"),
+        None,
+        "PYTHONPATH" -> pysparkPythonPath).!!
+      binaryFunc = Files.readAllBytes(path.toPath)
+    }
+    assert(binaryFunc != null)
+    binaryFunc
+  }
+
+  private def streamingQueryListenerFunction(pysparkPythonPath: String): Array[Byte] = {
+    var binaryFunc: Array[Byte] = null
+    val pythonScript =
+      """
+        |from pyspark.sql.streaming.listener import StreamingQueryListener
+        |
+        |class MyListener(StreamingQueryListener):
+        |    def onQueryStarted(e):
+        |        pass
+        |
+        |    def onQueryIdle(e):
+        |        pass
+        |
+        |    def onQueryProgress(e):
+        |        pass
+        |
+        |    def onQueryTerminated(e):
+        |        pass
+        |
+        |listener = MyListener()
+      """.stripMargin
+    withTempPath { codePath =>
+      Files.write(codePath.toPath, pythonScript.getBytes(StandardCharsets.UTF_8))
+      withTempPath { path =>
+        Process(
+          Seq(
+            IntegratedUDFTestUtils.pythonExec,
+            "-c",
+            "from pyspark.serializers import CloudPickleSerializer; " +
+              s"f = open('$path', 'wb');" +
+              s"exec(open('$codePath', 'r').read());" +
+              "f.write(CloudPickleSerializer().dumps(listener))"),
+          None,
+          "PYTHONPATH" -> pysparkPythonPath).!!
+        binaryFunc = Files.readAllBytes(path.toPath)
+      }
+    }
+    assert(binaryFunc != null)
+    binaryFunc
+  }
+
+  private def dummyPythonFunction(sessionHolder: SessionHolder)(
+      fcn: String => Array[Byte]): SimplePythonFunction = {
+    val sparkPythonPath =
+      s"${IntegratedUDFTestUtils.pysparkPythonPath}:${IntegratedUDFTestUtils.pythonPath}"
+
+    SimplePythonFunction(
+      command = fcn(sparkPythonPath),
+      envVars = mutable.Map("PYTHONPATH" -> sparkPythonPath).asJava,
+      pythonIncludes = sessionHolder.artifactManager.getSparkConnectPythonIncludes.asJava,
+      pythonExec = IntegratedUDFTestUtils.pythonExec,
+      pythonVer = IntegratedUDFTestUtils.pythonVer,
+      broadcastVars = Lists.newArrayList(),
+      accumulator = null)
+  }
+
+  test("python foreachBatch process: process terminates after query is stopped") {
+    // scalastyle:off assume
+    assume(IntegratedUDFTestUtils.shouldTestPythonUDFs)
+    // scalastyle:on assume
+
+    val sessionHolder = SessionHolder.forTesting(spark)
+    try {
+      SparkConnectService.start(spark.sparkContext)
+
+      val pythonFn = dummyPythonFunction(sessionHolder)(streamingForeachBatchFunction)
+      val (fn1, cleaner1) =
+        StreamingForeachBatchHelper.pythonForeachBatchWrapper(pythonFn, sessionHolder)
+      val (fn2, cleaner2) =
+        StreamingForeachBatchHelper.pythonForeachBatchWrapper(pythonFn, sessionHolder)
+
+      val query1 = spark.readStream
+        .format("rate")
+        .load()
+        .writeStream
+        .format("memory")
+        .queryName("foreachBatch_termination_test_q1")
+        .foreachBatch(fn1)
+        .start()
+
+      val query2 = spark.readStream
+        .format("rate")
+        .load()
+        .writeStream
+        .format("memory")
+        .queryName("foreachBatch_termination_test_q2")
+        .foreachBatch(fn2)
+        .start()
+
+      sessionHolder.streamingForeachBatchRunnerCleanerCache
+        .registerCleanerForQuery(query1, cleaner1)
+      sessionHolder.streamingForeachBatchRunnerCleanerCache
+        .registerCleanerForQuery(query2, cleaner2)
+
+      val (runner1, runner2) = (cleaner1.runner, cleaner2.runner)
+
+      // assert both python processes are running
+      assert(!runner1.isWorkerStopped().get)
+      assert(!runner2.isWorkerStopped().get)
+      // stop query1
+      query1.stop()
+      // assert query1's python process is not running
+      eventually(timeout(30.seconds)) {
+        assert(runner1.isWorkerStopped().get)
+        assert(!runner2.isWorkerStopped().get)
+      }
+
+      // stop query2
+      query2.stop()
+      eventually(timeout(30.seconds)) {
+        // assert query2's python process is not running
+        assert(runner2.isWorkerStopped().get)
+      }
+
+      assert(spark.streams.active.isEmpty) // no running query
+      assert(spark.streams.listListeners().length == 1) // only process termination listener
+    } finally {
+      SparkConnectService.stop()
+      // remove process termination listener
+      spark.streams.removeListener(spark.streams.listListeners()(0))
+    }
+  }
+
+  test("python listener process: process terminates after listener is removed") {

Review Comment:
   It seems that the new test requires `pandas`, so should we change the check to
   
   ```
   assume(IntegratedUDFTestUtils.shouldTestPandasUDFs)
   ```
   ?
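   
   (Presumably the pickling round trip on the Connect path pulls in `pandas`/`pyarrow`-dependent modules, which `assume(IntegratedUDFTestUtils.shouldTestPythonUDFs)` alone does not check for.)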





[GitHub] [spark] yaooqinn commented on a diff in pull request #42779: [SPARK-45056][PYTHON][SS][CONNECT] Termination tests for streamingQueryListener and foreachBatch

Posted by "yaooqinn (via GitHub)" <gi...@apache.org>.
yaooqinn commented on code in PR #42779:
URL: https://github.com/apache/spark/pull/42779#discussion_r1325551540


##########
connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHodlerSuite.scala:
##########
@@ -79,4 +92,196 @@ class SparkConnectSessionHolderSuite extends SharedSparkSession {
       sessionHolder.getDataFrameOrThrow(key1)
     }
   }
+
+  private def streamingForeachBatchFunction(pysparkPythonPath: String): Array[Byte] = {
+    var binaryFunc: Array[Byte] = null
+    withTempPath { path =>
+      Process(
+        Seq(
+          IntegratedUDFTestUtils.pythonExec,
+          "-c",
+          "from pyspark.serializers import CloudPickleSerializer; " +
+            s"f = open('$path', 'wb');" +
+            "f.write(CloudPickleSerializer().dumps((" +
+            "lambda df, batchId: batchId)))"),
+        None,
+        "PYTHONPATH" -> pysparkPythonPath).!!
+      binaryFunc = Files.readAllBytes(path.toPath)
+    }
+    assert(binaryFunc != null)
+    binaryFunc
+  }
+
+  private def streamingQueryListenerFunction(pysparkPythonPath: String): Array[Byte] = {
+    var binaryFunc: Array[Byte] = null
+    val pythonScript =
+      """
+        |from pyspark.sql.streaming.listener import StreamingQueryListener
+        |
+        |class MyListener(StreamingQueryListener):
+        |    def onQueryStarted(e):
+        |        pass
+        |
+        |    def onQueryIdle(e):
+        |        pass
+        |
+        |    def onQueryProgress(e):
+        |        pass
+        |
+        |    def onQueryTerminated(e):
+        |        pass
+        |
+        |listener = MyListener()
+      """.stripMargin
+    withTempPath { codePath =>
+      Files.write(codePath.toPath, pythonScript.getBytes(StandardCharsets.UTF_8))
+      withTempPath { path =>
+        Process(
+          Seq(
+            IntegratedUDFTestUtils.pythonExec,
+            "-c",
+            "from pyspark.serializers import CloudPickleSerializer; " +
+              s"f = open('$path', 'wb');" +
+              s"exec(open('$codePath', 'r').read());" +
+              "f.write(CloudPickleSerializer().dumps(listener))"),
+          None,
+          "PYTHONPATH" -> pysparkPythonPath).!!
+        binaryFunc = Files.readAllBytes(path.toPath)
+      }
+    }
+    assert(binaryFunc != null)
+    binaryFunc
+  }
+
+  private def dummyPythonFunction(sessionHolder: SessionHolder)(
+      fcn: String => Array[Byte]): SimplePythonFunction = {
+    val sparkPythonPath =
+      s"${IntegratedUDFTestUtils.pysparkPythonPath}:${IntegratedUDFTestUtils.pythonPath}"
+
+    SimplePythonFunction(
+      command = fcn(sparkPythonPath),
+      envVars = mutable.Map("PYTHONPATH" -> sparkPythonPath).asJava,
+      pythonIncludes = sessionHolder.artifactManager.getSparkConnectPythonIncludes.asJava,
+      pythonExec = IntegratedUDFTestUtils.pythonExec,
+      pythonVer = IntegratedUDFTestUtils.pythonVer,
+      broadcastVars = Lists.newArrayList(),
+      accumulator = null)
+  }
+
+  test("python foreachBatch process: process terminates after query is stopped") {
+    // scalastyle:off assume
+    assume(IntegratedUDFTestUtils.shouldTestPythonUDFs)
+    // scalastyle:on assume
+
+    val sessionHolder = SessionHolder.forTesting(spark)
+    try {
+      SparkConnectService.start(spark.sparkContext)
+
+      val pythonFn = dummyPythonFunction(sessionHolder)(streamingForeachBatchFunction)
+      val (fn1, cleaner1) =
+        StreamingForeachBatchHelper.pythonForeachBatchWrapper(pythonFn, sessionHolder)
+      val (fn2, cleaner2) =
+        StreamingForeachBatchHelper.pythonForeachBatchWrapper(pythonFn, sessionHolder)
+
+      val query1 = spark.readStream
+        .format("rate")
+        .load()
+        .writeStream
+        .format("memory")
+        .queryName("foreachBatch_termination_test_q1")
+        .foreachBatch(fn1)
+        .start()
+
+      val query2 = spark.readStream
+        .format("rate")
+        .load()
+        .writeStream
+        .format("memory")
+        .queryName("foreachBatch_termination_test_q2")
+        .foreachBatch(fn2)
+        .start()
+
+      sessionHolder.streamingForeachBatchRunnerCleanerCache
+        .registerCleanerForQuery(query1, cleaner1)
+      sessionHolder.streamingForeachBatchRunnerCleanerCache
+        .registerCleanerForQuery(query2, cleaner2)
+
+      val (runner1, runner2) = (cleaner1.runner, cleaner2.runner)
+
+      // assert both python processes are running
+      assert(!runner1.isWorkerStopped().get)
+      assert(!runner2.isWorkerStopped().get)
+      // stop query1
+      query1.stop()
+      // assert query1's python process is not running
+      eventually(timeout(30.seconds)) {
+        assert(runner1.isWorkerStopped().get)
+        assert(!runner2.isWorkerStopped().get)
+      }
+
+      // stop query2
+      query2.stop()
+      eventually(timeout(30.seconds)) {
+        // assert query2's python process is not running
+        assert(runner2.isWorkerStopped().get)
+      }
+
+      assert(spark.streams.active.isEmpty) // no running query
+      assert(spark.streams.listListeners().length == 1) // only process termination listener
+    } finally {
+      SparkConnectService.stop()
+      // remove process termination listener
+      spark.streams.removeListener(spark.streams.listListeners()(0))

Review Comment:
   Hi @WweiL @HyukjinKwon,  we encountered a test failure with this line 
   
   ```scala
   python foreachBatch process: process terminates after query is stopped *** FAILED *** (1 second, 431 milliseconds)
   [info]   java.lang.ArrayIndexOutOfBoundsException: 0
   [info]   at org.apache.spark.sql.connect.service.SparkConnectSessionHolderSuite.$anonfun$new$7(SparkConnectSessionHodlerSuite.scala:234)
   ```
   
   How about changing it to
   ```scala
   spark.streams.listListeners().foreach(spark.streams.removeListener)
   ```
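   
   Iterating over whatever listeners remain avoids indexing into a possibly empty array, so the cleanup becomes a no-op when the listener has already been removed.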
   





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42779: [SPARK-45056][PYTHON][SS][CONNECT] Termination tests for streamingQueryListener and foreachBatch

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #42779:
URL: https://github.com/apache/spark/pull/42779#discussion_r1323973632


##########
.github/workflows/build_and_test.yml:
##########
@@ -256,14 +256,14 @@ jobs:
       # We should install one Python that is higher then 3+ for SQL and Yarn because:
       # - SQL component also has Python related tests, for example, IntegratedUDFTestUtils.
       # - Yarn has a Python specific test too, for example, YarnClusterSuite.
-      if: contains(matrix.modules, 'yarn') || (contains(matrix.modules, 'sql') && !contains(matrix.modules, 'sql-'))
+      if: contains(matrix.modules, 'yarn') || (contains(matrix.modules, 'sql') && !contains(matrix.modules, 'sql-')) || contains(matrix.modules, 'connect')
       with:
         python-version: 3.8
         architecture: x64
     - name: Install Python packages (Python 3.8)
-      if: (contains(matrix.modules, 'sql') && !contains(matrix.modules, 'sql-'))
+      if: (contains(matrix.modules, 'sql') && !contains(matrix.modules, 'sql-')) || contains(matrix.modules, 'connect')
       run: |
-        python3.8 -m pip install 'numpy>=1.20.0' pyarrow pandas scipy unittest-xml-reporting 'grpcio==1.56.0' 'protobuf==3.20.3'
+        python3.8 -m pip install 'numpy>=1.20.0' pyarrow pandas scipy unittest-xml-reporting 'grpcio==1.56.0' 'protobuf==3.20.3' grpcio-status

Review Comment:
   ```suggestion
           python3.8 -m pip install 'numpy>=1.20.0' pyarrow pandas scipy unittest-xml-reporting 'grpcio==1.56.0' 'protobuf==3.20.3' grpcio-status=='1.56.0'
   ```
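   
   (`grpcio-status` releases track `grpcio` one-to-one, so pinning both to `1.56.0` keeps the pair in sync instead of letting pip pull the latest `grpcio-status`.)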





[GitHub] [spark] HyukjinKwon closed pull request #42779: [SPARK-45056][PYTHON][SS][CONNECT] Termination tests for streamingQueryListener and foreachBatch

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon closed pull request #42779: [SPARK-45056][PYTHON][SS][CONNECT] Termination tests for streamingQueryListener and foreachBatch
URL: https://github.com/apache/spark/pull/42779




[GitHub] [spark] WweiL commented on a diff in pull request #42779: [SPARK-45056][PYTHON][SS][CONNECT] Termination tests for streamingQueryListener and foreachBatch

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #42779:
URL: https://github.com/apache/spark/pull/42779#discussion_r1317722057


##########
python/pyspark/sql/tests/connect/streaming/worker_for_testing.py:
##########
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A worker for streaming foreachBatch in Spark Connect.
+Usually this is run on the driver side of the Spark Connect Server.
+"""
+import os
+
+from pyspark.java_gateway import local_connect_and_auth
+from pyspark.serializers import (
+    write_int,
+    UTF8Deserializer,
+    CPickleSerializer,
+)
+from typing import IO
+from pyspark.worker_util import check_python_version
+
+pickle_ser = CPickleSerializer()
+utf8_deserializer = UTF8Deserializer()
+
+
+def main(infile: IO, outfile: IO) -> None:
+    check_python_version(infile)
+
+    connect_url = os.environ["SPARK_CONNECT_LOCAL_URL"]
+    session_id = utf8_deserializer.loads(infile)
+
+    print(
+        "Streaming test worker is starting with " f"url {connect_url} and sessionId {session_id}."
+    )
+
+    write_int(0, outfile)  # Indicate successful initialization
+    outfile.flush()
+
+    while True:

Review Comment:
   Thanks. Removed the test worker. Found a way to test the actual Python code.





[GitHub] [spark] rangadi commented on a diff in pull request #42779: [SPARK-45056][PYTHON][SS][CONNECT] Termination tests for streamingQueryListener and foreachBatch

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #42779:
URL: https://github.com/apache/spark/pull/42779#discussion_r1316564483


##########
python/pyspark/sql/tests/connect/streaming/worker_for_testing.py:
##########
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A worker for streaming foreachBatch in Spark Connect.
+Usually this is run on the driver side of the Spark Connect Server.

Review Comment:
   Why do we need this? Better to add a description of that here.



##########
python/pyspark/sql/tests/connect/streaming/worker_for_testing.py:
##########
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A worker for streaming foreachBatch in Spark Connect.
+Usually this is run on the driver side of the Spark Connect Server.
+"""
+import os
+
+from pyspark.java_gateway import local_connect_and_auth
+from pyspark.serializers import (
+    write_int,
+    UTF8Deserializer,
+    CPickleSerializer,
+)
+from typing import IO
+from pyspark.worker_util import check_python_version
+
+pickle_ser = CPickleSerializer()
+utf8_deserializer = UTF8Deserializer()
+
+
+def main(infile: IO, outfile: IO) -> None:
+    check_python_version(infile)
+
+    connect_url = os.environ["SPARK_CONNECT_LOCAL_URL"]
+    session_id = utf8_deserializer.loads(infile)
+
+    print(
+        "Streaming test worker is starting with " f"url {connect_url} and sessionId {session_id}."
+    )
+
+    write_int(0, outfile)  # Indicate successful initialization
+    outfile.flush()
+
+    while True:

Review Comment:
   A busy loop like this can put a lot of burden on the test infra and make tests flaky.
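   Blocking on a read from `infile`, the way the regular PySpark worker loops do, would avoid spinning.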



##########
connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHodlerSuite.scala:
##########
@@ -79,4 +90,135 @@ class SparkConnectSessionHolderSuite extends SharedSparkSession {
       sessionHolder.getDataFrameOrThrow(key1)
     }
   }
+
+  private def dummyPythonFunction(sessionHolder: SessionHolder): SimplePythonFunction = {
+    // Needed because pythonPath in PythonWorkerFactory doesn't include test spark home
+    val sparkPythonPath = PythonUtils.mergePythonPaths(
+      Seq(sparkHome, "python", "lib", "pyspark.zip").mkString(File.separator),
+      Seq(sparkHome, "python", "lib", PythonUtils.PY4J_ZIP_NAME).mkString(File.separator))
+
+    SimplePythonFunction(
+      command = Array.emptyByteArray,
+      envVars = mutable.Map("PYTHONPATH" -> sparkPythonPath).asJava,
+      pythonIncludes = sessionHolder.artifactManager.getSparkConnectPythonIncludes.asJava,
+      pythonExec = IntegratedUDFTestUtils.pythonExec,
+      pythonVer = IntegratedUDFTestUtils.pythonVer,
+      broadcastVars = Lists.newArrayList(),
+      accumulator = null)
+  }
+
+  test("python foreachBatch process: process terminates after query is stopped") {
+    val sessionHolder = SessionHolder.forTesting(spark)

Review Comment:
   How does this verify that the actual Python process is not running?





[GitHub] [spark] WweiL commented on pull request #42779: [SPARK-45056][PYTHON][SS][CONNECT] Termination tests for streamingQueryListener and foreachBatch

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on PR #42779:
URL: https://github.com/apache/spark/pull/42779#issuecomment-1703372774

   Reminder to myself: https://github.com/apache/spark/pull/42687 and this PR will have conflicts in `streamingForeachBatchRunnerCleanerCache`




[GitHub] [spark] HyukjinKwon commented on pull request #42779: [SPARK-45056][PYTHON][SS][CONNECT] Termination tests for streamingQueryListener and foreachBatch

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #42779:
URL: https://github.com/apache/spark/pull/42779#issuecomment-1718652170

   Merged to master.




[GitHub] [spark] WweiL commented on a diff in pull request #42779: [SPARK-45056][PYTHON][SS][CONNECT] Termination tests for streamingQueryListener and foreachBatch

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #42779:
URL: https://github.com/apache/spark/pull/42779#discussion_r1317722427


##########
connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHodlerSuite.scala:
##########
@@ -79,4 +90,135 @@ class SparkConnectSessionHolderSuite extends SharedSparkSession {
       sessionHolder.getDataFrameOrThrow(key1)
     }
   }
+
+  private def dummyPythonFunction(sessionHolder: SessionHolder): SimplePythonFunction = {
+    // Needed because pythonPath in PythonWorkerFactory doesn't include test spark home
+    val sparkPythonPath = PythonUtils.mergePythonPaths(
+      Seq(sparkHome, "python", "lib", "pyspark.zip").mkString(File.separator),
+      Seq(sparkHome, "python", "lib", PythonUtils.PY4J_ZIP_NAME).mkString(File.separator))
+
+    SimplePythonFunction(
+      command = Array.emptyByteArray,
+      envVars = mutable.Map("PYTHONPATH" -> sparkPythonPath).asJava,
+      pythonIncludes = sessionHolder.artifactManager.getSparkConnectPythonIncludes.asJava,
+      pythonExec = IntegratedUDFTestUtils.pythonExec,
+      pythonVer = IntegratedUDFTestUtils.pythonVer,
+      broadcastVars = Lists.newArrayList(),
+      accumulator = null)
+  }
+
+  test("python foreachBatch process: process terminates after query is stopped") {
+    val sessionHolder = SessionHolder.forTesting(spark)

Review Comment:
   Found out a new way of testing this. Please see the update





[GitHub] [spark] LuciferYang commented on a diff in pull request #42779: [SPARK-45056][PYTHON][SS][CONNECT] Termination tests for streamingQueryListener and foreachBatch

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #42779:
URL: https://github.com/apache/spark/pull/42779#discussion_r1328447379


##########
connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHodlerSuite.scala:
##########
@@ -79,4 +92,196 @@ class SparkConnectSessionHolderSuite extends SharedSparkSession {
       sessionHolder.getDataFrameOrThrow(key1)
     }
   }
+
+  private def streamingForeachBatchFunction(pysparkPythonPath: String): Array[Byte] = {
+    var binaryFunc: Array[Byte] = null
+    withTempPath { path =>
+      Process(
+        Seq(
+          IntegratedUDFTestUtils.pythonExec,
+          "-c",
+          "from pyspark.serializers import CloudPickleSerializer; " +
+            s"f = open('$path', 'wb');" +
+            "f.write(CloudPickleSerializer().dumps((" +
+            "lambda df, batchId: batchId)))"),
+        None,
+        "PYTHONPATH" -> pysparkPythonPath).!!
+      binaryFunc = Files.readAllBytes(path.toPath)
+    }
+    assert(binaryFunc != null)
+    binaryFunc
+  }
+
+  private def streamingQueryListenerFunction(pysparkPythonPath: String): Array[Byte] = {
+    var binaryFunc: Array[Byte] = null
+    val pythonScript =
+      """
+        |from pyspark.sql.streaming.listener import StreamingQueryListener
+        |
+        |class MyListener(StreamingQueryListener):
+        |    def onQueryStarted(e):
+        |        pass
+        |
+        |    def onQueryIdle(e):
+        |        pass
+        |
+        |    def onQueryProgress(e):
+        |        pass
+        |
+        |    def onQueryTerminated(e):
+        |        pass
+        |
+        |listener = MyListener()
+      """.stripMargin
+    withTempPath { codePath =>
+      Files.write(codePath.toPath, pythonScript.getBytes(StandardCharsets.UTF_8))
+      withTempPath { path =>
+        Process(
+          Seq(
+            IntegratedUDFTestUtils.pythonExec,
+            "-c",
+            "from pyspark.serializers import CloudPickleSerializer; " +
+              s"f = open('$path', 'wb');" +
+              s"exec(open('$codePath', 'r').read());" +
+              "f.write(CloudPickleSerializer().dumps(listener))"),
+          None,
+          "PYTHONPATH" -> pysparkPythonPath).!!
+        binaryFunc = Files.readAllBytes(path.toPath)
+      }
+    }
+    assert(binaryFunc != null)
+    binaryFunc
+  }
+
+  private def dummyPythonFunction(sessionHolder: SessionHolder)(
+      fcn: String => Array[Byte]): SimplePythonFunction = {
+    val sparkPythonPath =
+      s"${IntegratedUDFTestUtils.pysparkPythonPath}:${IntegratedUDFTestUtils.pythonPath}"
+
+    SimplePythonFunction(
+      command = fcn(sparkPythonPath),
+      envVars = mutable.Map("PYTHONPATH" -> sparkPythonPath).asJava,
+      pythonIncludes = sessionHolder.artifactManager.getSparkConnectPythonIncludes.asJava,
+      pythonExec = IntegratedUDFTestUtils.pythonExec,
+      pythonVer = IntegratedUDFTestUtils.pythonVer,
+      broadcastVars = Lists.newArrayList(),
+      accumulator = null)
+  }
+
+  test("python foreachBatch process: process terminates after query is stopped") {
+    // scalastyle:off assume
+    assume(IntegratedUDFTestUtils.shouldTestPythonUDFs)
+    // scalastyle:on assume
+
+    val sessionHolder = SessionHolder.forTesting(spark)
+    try {
+      SparkConnectService.start(spark.sparkContext)
+
+      val pythonFn = dummyPythonFunction(sessionHolder)(streamingForeachBatchFunction)
+      val (fn1, cleaner1) =
+        StreamingForeachBatchHelper.pythonForeachBatchWrapper(pythonFn, sessionHolder)
+      val (fn2, cleaner2) =
+        StreamingForeachBatchHelper.pythonForeachBatchWrapper(pythonFn, sessionHolder)
+
+      val query1 = spark.readStream
+        .format("rate")
+        .load()
+        .writeStream
+        .format("memory")
+        .queryName("foreachBatch_termination_test_q1")
+        .foreachBatch(fn1)
+        .start()
+
+      val query2 = spark.readStream
+        .format("rate")
+        .load()
+        .writeStream
+        .format("memory")
+        .queryName("foreachBatch_termination_test_q2")
+        .foreachBatch(fn2)
+        .start()
+
+      sessionHolder.streamingForeachBatchRunnerCleanerCache
+        .registerCleanerForQuery(query1, cleaner1)
+      sessionHolder.streamingForeachBatchRunnerCleanerCache
+        .registerCleanerForQuery(query2, cleaner2)
+
+      val (runner1, runner2) = (cleaner1.runner, cleaner2.runner)
+
+      // assert both python processes are running
+      assert(!runner1.isWorkerStopped().get)
+      assert(!runner2.isWorkerStopped().get)
+      // stop query1
+      query1.stop()
+      // assert query1's python process is not running
+      eventually(timeout(30.seconds)) {
+        assert(runner1.isWorkerStopped().get)
+        assert(!runner2.isWorkerStopped().get)
+      }
+
+      // stop query2
+      query2.stop()
+      eventually(timeout(30.seconds)) {
+        // assert query2's python process is not running
+        assert(runner2.isWorkerStopped().get)
+      }
+
+      assert(spark.streams.active.isEmpty) // no running query
+      assert(spark.streams.listListeners().length == 1) // only process termination listener
+    } finally {
+      SparkConnectService.stop()
+      // remove process termination listener
+      spark.streams.removeListener(spark.streams.listListeners()(0))
+    }
+  }
+
+  test("python listener process: process terminates after listener is removed") {

Review Comment:
   @WweiL The newly added test case caused the daily Maven test run to fail. Could you help fix it? Thanks
   
   





[GitHub] [spark] HyukjinKwon commented on pull request #42779: [SPARK-45056][PYTHON][SS][CONNECT] Termination tests for streamingQueryListener and foreachBatch

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #42779:
URL: https://github.com/apache/spark/pull/42779#issuecomment-1718988397

   Let me make a quick follow-up or revert.




[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42779: [SPARK-45056][PYTHON][SS][CONNECT] Termination tests for streamingQueryListener and foreachBatch

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #42779:
URL: https://github.com/apache/spark/pull/42779#discussion_r1325584119


##########
connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHodlerSuite.scala:
##########
@@ -79,4 +92,196 @@ class SparkConnectSessionHolderSuite extends SharedSparkSession {
       sessionHolder.getDataFrameOrThrow(key1)
     }
   }
+
+  private def streamingForeachBatchFunction(pysparkPythonPath: String): Array[Byte] = {
+    var binaryFunc: Array[Byte] = null
+    withTempPath { path =>
+      Process(
+        Seq(
+          IntegratedUDFTestUtils.pythonExec,
+          "-c",
+          "from pyspark.serializers import CloudPickleSerializer; " +
+            s"f = open('$path', 'wb');" +
+            "f.write(CloudPickleSerializer().dumps((" +
+            "lambda df, batchId: batchId)))"),
+        None,
+        "PYTHONPATH" -> pysparkPythonPath).!!
+      binaryFunc = Files.readAllBytes(path.toPath)
+    }
+    assert(binaryFunc != null)
+    binaryFunc
+  }
+
+  private def streamingQueryListenerFunction(pysparkPythonPath: String): Array[Byte] = {
+    var binaryFunc: Array[Byte] = null
+    val pythonScript =
+      """
+        |from pyspark.sql.streaming.listener import StreamingQueryListener
+        |
+        |class MyListener(StreamingQueryListener):
+        |    def onQueryStarted(e):
+        |        pass
+        |
+        |    def onQueryIdle(e):
+        |        pass
+        |
+        |    def onQueryProgress(e):
+        |        pass
+        |
+        |    def onQueryTerminated(e):
+        |        pass
+        |
+        |listener = MyListener()
+      """.stripMargin
+    withTempPath { codePath =>
+      Files.write(codePath.toPath, pythonScript.getBytes(StandardCharsets.UTF_8))
+      withTempPath { path =>
+        Process(
+          Seq(
+            IntegratedUDFTestUtils.pythonExec,
+            "-c",
+            "from pyspark.serializers import CloudPickleSerializer; " +
+              s"f = open('$path', 'wb');" +
+              s"exec(open('$codePath', 'r').read());" +
+              "f.write(CloudPickleSerializer().dumps(listener))"),
+          None,
+          "PYTHONPATH" -> pysparkPythonPath).!!
+        binaryFunc = Files.readAllBytes(path.toPath)
+      }
+    }
+    assert(binaryFunc != null)
+    binaryFunc
+  }
+
+  private def dummyPythonFunction(sessionHolder: SessionHolder)(
+      fcn: String => Array[Byte]): SimplePythonFunction = {
+    val sparkPythonPath =
+      s"${IntegratedUDFTestUtils.pysparkPythonPath}:${IntegratedUDFTestUtils.pythonPath}"
+
+    SimplePythonFunction(
+      command = fcn(sparkPythonPath),
+      envVars = mutable.Map("PYTHONPATH" -> sparkPythonPath).asJava,
+      pythonIncludes = sessionHolder.artifactManager.getSparkConnectPythonIncludes.asJava,
+      pythonExec = IntegratedUDFTestUtils.pythonExec,
+      pythonVer = IntegratedUDFTestUtils.pythonVer,
+      broadcastVars = Lists.newArrayList(),
+      accumulator = null)
+  }
+
+  test("python foreachBatch process: process terminates after query is stopped") {
+    // scalastyle:off assume
+    assume(IntegratedUDFTestUtils.shouldTestPythonUDFs)
+    // scalastyle:on assume
+
+    val sessionHolder = SessionHolder.forTesting(spark)
+    try {
+      SparkConnectService.start(spark.sparkContext)
+
+      val pythonFn = dummyPythonFunction(sessionHolder)(streamingForeachBatchFunction)
+      val (fn1, cleaner1) =
+        StreamingForeachBatchHelper.pythonForeachBatchWrapper(pythonFn, sessionHolder)
+      val (fn2, cleaner2) =
+        StreamingForeachBatchHelper.pythonForeachBatchWrapper(pythonFn, sessionHolder)
+
+      val query1 = spark.readStream
+        .format("rate")
+        .load()
+        .writeStream
+        .format("memory")
+        .queryName("foreachBatch_termination_test_q1")
+        .foreachBatch(fn1)
+        .start()
+
+      val query2 = spark.readStream
+        .format("rate")
+        .load()
+        .writeStream
+        .format("memory")
+        .queryName("foreachBatch_termination_test_q2")
+        .foreachBatch(fn2)
+        .start()
+
+      sessionHolder.streamingForeachBatchRunnerCleanerCache
+        .registerCleanerForQuery(query1, cleaner1)
+      sessionHolder.streamingForeachBatchRunnerCleanerCache
+        .registerCleanerForQuery(query2, cleaner2)
+
+      val (runner1, runner2) = (cleaner1.runner, cleaner2.runner)
+
+      // assert both python processes are running
+      assert(!runner1.isWorkerStopped().get)
+      assert(!runner2.isWorkerStopped().get)
+      // stop query1
+      query1.stop()
+      // assert query1's python process is not running
+      eventually(timeout(30.seconds)) {
+        assert(runner1.isWorkerStopped().get)
+        assert(!runner2.isWorkerStopped().get)
+      }
+
+      // stop query2
+      query2.stop()
+      eventually(timeout(30.seconds)) {
+        // assert query2's python process is not running
+        assert(runner2.isWorkerStopped().get)
+      }
+
+      assert(spark.streams.active.isEmpty) // no running query
+      assert(spark.streams.listListeners().length == 1) // only process termination listener
+    } finally {
+      SparkConnectService.stop()
+      // remove process termination listener
+      spark.streams.removeListener(spark.streams.listListeners()(0))
+    }
+  }
+
+  test("python listener process: process terminates after listener is removed") {
+    // scalastyle:off assume
+    assume(IntegratedUDFTestUtils.shouldTestPythonUDFs)
+    // scalastyle:on assume
+
+    val sessionHolder = SessionHolder.forTesting(spark)
+    try {
+      SparkConnectService.start(spark.sparkContext)
+
+      val pythonFn = dummyPythonFunction(sessionHolder)(streamingQueryListenerFunction)
+
+      val id1 = "listener_removeListener_test_1"
+      val id2 = "listener_removeListener_test_2"
+      val listener1 = new PythonStreamingQueryListener(pythonFn, sessionHolder)

Review Comment:
   I think this was flakiness.





[GitHub] [spark] WweiL commented on pull request #42779: [SPARK-45056][PYTHON][SS][CONNECT] Termination tests for streamingQueryListener and foreachBatch

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on PR #42779:
URL: https://github.com/apache/spark/pull/42779#issuecomment-1703263828

   CC @bogao007 @rangadi @ueshin Can you guys take a look? Thanks in advance!




[GitHub] [spark] WweiL commented on a diff in pull request #42779: [SPARK-45056][PYTHON][SS][CONNECT] Termination tests for streamingQueryListener and foreachBatch

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #42779:
URL: https://github.com/apache/spark/pull/42779#discussion_r1317574980


##########
python/pyspark/sql/tests/connect/streaming/worker_for_testing.py:
##########
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A worker for streaming foreachBatch in Spark Connect.
+Usually this is run on the driver side of the Spark Connect Server.

Review Comment:
   Ah, thanks for pointing that out; this comment needs to change.





[GitHub] [spark] rangadi commented on a diff in pull request #42779: [SPARK-45056][PYTHON][SS][CONNECT] Termination tests for streamingQueryListener and foreachBatch

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #42779:
URL: https://github.com/apache/spark/pull/42779#discussion_r1326514203


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingQueryListenerHelper.scala:
##########
@@ -26,15 +26,13 @@ import org.apache.spark.sql.streaming.StreamingQueryListener
  * instance of this class starts a python process, inside which has the python handling logic.
  * When a new event is received, it is serialized to json, and passed to the python process.
  */
-class PythonStreamingQueryListener(
-    listener: SimplePythonFunction,
-    sessionHolder: SessionHolder,
-    pythonExec: String)
+class PythonStreamingQueryListener(listener: SimplePythonFunction, sessionHolder: SessionHolder)
     extends StreamingQueryListener {
 
   private val port = SparkConnectService.localPort
   private val connectUrl = s"sc://localhost:$port/;user_id=${sessionHolder.userId}"
-  private val runner = StreamingPythonRunner(
+  // Scoped for testing

Review Comment:
   Nit: _// Visible for testing._



##########
core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala:
##########
@@ -108,10 +108,30 @@ private[spark] class StreamingPythonRunner(
    * Stops the Python worker.
    */
   def stop(): Unit = {
-    pythonWorker.foreach { worker =>
+    logInfo(s"Stopping streaming runner for sessionId: $sessionId, module: $workerModule.")
+
+    try {
       pythonWorkerFactory.foreach { factory =>
-        factory.stopWorker(worker)
-        factory.stop()
+        pythonWorker.foreach { worker =>
+          factory.stopWorker(worker)
+          factory.stop()
+        }
+      }
+    } catch {
+      case e: Exception =>
+        logError("Exception when trying to kill worker", e)
+    }
+  }
+
+  /**
+   * Returns whether the Python worker has been stopped.
+   * @return Some(true) if the Python worker has been stopped.

Review Comment:
   Simpler: return `Boolean` and throw if no worker was started. This is only used in tests.
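   
   A minimal sketch of that shape (hypothetical; `workerStopped` stands in for whatever state the runner actually tracks):
   
   ```scala
   // Test-only accessor: plain Boolean instead of Option[Boolean]; fail fast if
   // the worker was never started.
   def isWorkerStopped(): Boolean = pythonWorker match {
     case Some(_) => workerStopped
     case None => throw new IllegalStateException("Python worker was never started")
   }
   ```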
   



##########
connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHodlerSuite.scala:
##########
@@ -79,4 +92,196 @@ class SparkConnectSessionHolderSuite extends SharedSparkSession {
       sessionHolder.getDataFrameOrThrow(key1)
     }
   }
+
+  private def streamingForeachBatchFunction(pysparkPythonPath: String): Array[Byte] = {

Review Comment:
   Add a brief comment about what this does (and/or its purpose).



##########
connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHodlerSuite.scala:
##########
@@ -79,4 +92,196 @@ class SparkConnectSessionHolderSuite extends SharedSparkSession {
       sessionHolder.getDataFrameOrThrow(key1)
     }
   }
+
+  private def streamingForeachBatchFunction(pysparkPythonPath: String): Array[Byte] = {
+    var binaryFunc: Array[Byte] = null
+    withTempPath { path =>
+      Process(
+        Seq(
+          IntegratedUDFTestUtils.pythonExec,
+          "-c",
+          "from pyspark.serializers import CloudPickleSerializer; " +
+            s"f = open('$path', 'wb');" +
+            "f.write(CloudPickleSerializer().dumps((" +
+            "lambda df, batchId: batchId)))"),
+        None,
+        "PYTHONPATH" -> pysparkPythonPath).!!
+      binaryFunc = Files.readAllBytes(path.toPath)
+    }
+    assert(binaryFunc != null)
+    binaryFunc
+  }
+
+  private def streamingQueryListenerFunction(pysparkPythonPath: String): Array[Byte] = {
+    var binaryFunc: Array[Byte] = null
+    val pythonScript =
+      """
+        |from pyspark.sql.streaming.listener import StreamingQueryListener
+        |
+        |class MyListener(StreamingQueryListener):
+        |    def onQueryStarted(e):
+        |        pass
+        |
+        |    def onQueryIdle(e):
+        |        pass
+        |
+        |    def onQueryProgress(e):
+        |        pass
+        |
+        |    def onQueryTerminated(e):
+        |        pass
+        |
+        |listener = MyListener()
+      """.stripMargin
+    withTempPath { codePath =>
+      Files.write(codePath.toPath, pythonScript.getBytes(StandardCharsets.UTF_8))
+      withTempPath { path =>
+        Process(
+          Seq(
+            IntegratedUDFTestUtils.pythonExec,
+            "-c",
+            "from pyspark.serializers import CloudPickleSerializer; " +
+              s"f = open('$path', 'wb');" +
+              s"exec(open('$codePath', 'r').read());" +
+              "f.write(CloudPickleSerializer().dumps(listener))"),
+          None,
+          "PYTHONPATH" -> pysparkPythonPath).!!
+        binaryFunc = Files.readAllBytes(path.toPath)
+      }
+    }
+    assert(binaryFunc != null)
+    binaryFunc
+  }
+
+  private def dummyPythonFunction(sessionHolder: SessionHolder)(
+      fcn: String => Array[Byte]): SimplePythonFunction = {
+    val sparkPythonPath =
+      s"${IntegratedUDFTestUtils.pysparkPythonPath}:${IntegratedUDFTestUtils.pythonPath}"
+
+    SimplePythonFunction(
+      command = fcn(sparkPythonPath),
+      envVars = mutable.Map("PYTHONPATH" -> sparkPythonPath).asJava,
+      pythonIncludes = sessionHolder.artifactManager.getSparkConnectPythonIncludes.asJava,
+      pythonExec = IntegratedUDFTestUtils.pythonExec,
+      pythonVer = IntegratedUDFTestUtils.pythonVer,
+      broadcastVars = Lists.newArrayList(),
+      accumulator = null)
+  }
+
+  test("python foreachBatch process: process terminates after query is stopped") {
+    // scalastyle:off assume
+    assume(IntegratedUDFTestUtils.shouldTestPythonUDFs)
+    // scalastyle:on assume
+
+    val sessionHolder = SessionHolder.forTesting(spark)
+    try {
+      SparkConnectService.start(spark.sparkContext)
+
+      val pythonFn = dummyPythonFunction(sessionHolder)(streamingForeachBatchFunction)
+      val (fn1, cleaner1) =
+        StreamingForeachBatchHelper.pythonForeachBatchWrapper(pythonFn, sessionHolder)
+      val (fn2, cleaner2) =
+        StreamingForeachBatchHelper.pythonForeachBatchWrapper(pythonFn, sessionHolder)
+
+      val query1 = spark.readStream
+        .format("rate")
+        .load()
+        .writeStream
+        .format("memory")
+        .queryName("foreachBatch_termination_test_q1")
+        .foreachBatch(fn1)
+        .start()
+
+      val query2 = spark.readStream
+        .format("rate")
+        .load()
+        .writeStream
+        .format("memory")
+        .queryName("foreachBatch_termination_test_q2")
+        .foreachBatch(fn2)
+        .start()
+
+      sessionHolder.streamingForeachBatchRunnerCleanerCache

Review Comment:
   Add a comment about why we are accessing the internal cleaner cache here.
   It wouldn't have been clear to me if you hadn't explained it in VC. Refer to how this mimics the Python foreachBatch API from Scala.
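   
   A sketch of the kind of comment that would help (hypothetical wording):
   
   ```scala
   // In production the Connect planner registers this cleaner when it plans a
   // client's foreachBatch query; the test calls the internal cache directly to
   // mimic that client path from Scala.
   sessionHolder.streamingForeachBatchRunnerCleanerCache
     .registerCleanerForQuery(query1, cleaner1)
   ```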





[GitHub] [spark] HyukjinKwon commented on pull request #42779: [SPARK-45056][PYTHON][SS][CONNECT] Termination tests for streamingQueryListener and foreachBatch

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #42779:
URL: https://github.com/apache/spark/pull/42779#issuecomment-1720305034

   @WweiL mind creating a followup to fix them?




[GitHub] [spark] WweiL commented on pull request #42779: [SPARK-45056][PYTHON][SS][CONNECT] Termination tests for streamingQueryListener and foreachBatch

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on PR #42779:
URL: https://github.com/apache/spark/pull/42779#issuecomment-1715000070

   Now it fails with `No module named 'grpc_status'`; let me look into how to resolve this.
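   
   A quick environment check (a sketch assuming the usual packaging, where the grpc_status module is provided by the grpcio-status pip package):
   ```python
   # Sanity-check that the Spark Connect Python dependencies are importable.
   try:
       import grpc_status  # ships in the grpcio-status package
       print("grpc_status is available")
   except ImportError:
       # typically resolved with: pip install grpcio-status
       raise
   ```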




[GitHub] [spark] yaooqinn commented on a diff in pull request #42779: [SPARK-45056][PYTHON][SS][CONNECT] Termination tests for streamingQueryListener and foreachBatch

Posted by "yaooqinn (via GitHub)" <gi...@apache.org>.
yaooqinn commented on code in PR #42779:
URL: https://github.com/apache/spark/pull/42779#discussion_r1325554786


##########
connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHodlerSuite.scala:
##########
@@ -79,4 +92,196 @@ class SparkConnectSessionHolderSuite extends SharedSparkSession {
       sessionHolder.getDataFrameOrThrow(key1)
     }
   }
+
+  private def streamingForeachBatchFunction(pysparkPythonPath: String): Array[Byte] = {
+    var binaryFunc: Array[Byte] = null
+    withTempPath { path =>
+      Process(
+        Seq(
+          IntegratedUDFTestUtils.pythonExec,
+          "-c",
+          "from pyspark.serializers import CloudPickleSerializer; " +
+            s"f = open('$path', 'wb');" +
+            "f.write(CloudPickleSerializer().dumps((" +
+            "lambda df, batchId: batchId)))"),
+        None,
+        "PYTHONPATH" -> pysparkPythonPath).!!
+      binaryFunc = Files.readAllBytes(path.toPath)
+    }
+    assert(binaryFunc != null)
+    binaryFunc
+  }
+
+  private def streamingQueryListenerFunction(pysparkPythonPath: String): Array[Byte] = {
+    var binaryFunc: Array[Byte] = null
+    val pythonScript =
+      """
+        |from pyspark.sql.streaming.listener import StreamingQueryListener
+        |
+        |class MyListener(StreamingQueryListener):
+        |    def onQueryStarted(self, e):
+        |        pass
+        |
+        |    def onQueryIdle(self, e):
+        |        pass
+        |
+        |    def onQueryProgress(self, e):
+        |        pass
+        |
+        |    def onQueryTerminated(self, e):
+        |        pass
+        |
+        |listener = MyListener()
+      """.stripMargin
+    withTempPath { codePath =>
+      Files.write(codePath.toPath, pythonScript.getBytes(StandardCharsets.UTF_8))
+      withTempPath { path =>
+        Process(
+          Seq(
+            IntegratedUDFTestUtils.pythonExec,
+            "-c",
+            "from pyspark.serializers import CloudPickleSerializer; " +
+              s"f = open('$path', 'wb');" +
+              s"exec(open('$codePath', 'r').read());" +
+              "f.write(CloudPickleSerializer().dumps(listener))"),
+          None,
+          "PYTHONPATH" -> pysparkPythonPath).!!
+        binaryFunc = Files.readAllBytes(path.toPath)
+      }
+    }
+    assert(binaryFunc != null)
+    binaryFunc
+  }
+
+  private def dummyPythonFunction(sessionHolder: SessionHolder)(
+      fcn: String => Array[Byte]): SimplePythonFunction = {
+    val sparkPythonPath =
+      s"${IntegratedUDFTestUtils.pysparkPythonPath}:${IntegratedUDFTestUtils.pythonPath}"
+
+    SimplePythonFunction(
+      command = fcn(sparkPythonPath),
+      envVars = mutable.Map("PYTHONPATH" -> sparkPythonPath).asJava,
+      pythonIncludes = sessionHolder.artifactManager.getSparkConnectPythonIncludes.asJava,
+      pythonExec = IntegratedUDFTestUtils.pythonExec,
+      pythonVer = IntegratedUDFTestUtils.pythonVer,
+      broadcastVars = Lists.newArrayList(),
+      accumulator = null)
+  }
+
+  test("python foreachBatch process: process terminates after query is stopped") {
+    // scalastyle:off assume
+    assume(IntegratedUDFTestUtils.shouldTestPythonUDFs)
+    // scalastyle:on assume
+
+    val sessionHolder = SessionHolder.forTesting(spark)
+    try {
+      SparkConnectService.start(spark.sparkContext)
+
+      val pythonFn = dummyPythonFunction(sessionHolder)(streamingForeachBatchFunction)
+      val (fn1, cleaner1) =
+        StreamingForeachBatchHelper.pythonForeachBatchWrapper(pythonFn, sessionHolder)
+      val (fn2, cleaner2) =
+        StreamingForeachBatchHelper.pythonForeachBatchWrapper(pythonFn, sessionHolder)
+
+      val query1 = spark.readStream
+        .format("rate")
+        .load()
+        .writeStream
+        .format("memory")
+        .queryName("foreachBatch_termination_test_q1")
+        .foreachBatch(fn1)
+        .start()
+
+      val query2 = spark.readStream
+        .format("rate")
+        .load()
+        .writeStream
+        .format("memory")
+        .queryName("foreachBatch_termination_test_q2")
+        .foreachBatch(fn2)
+        .start()
+
+      sessionHolder.streamingForeachBatchRunnerCleanerCache
+        .registerCleanerForQuery(query1, cleaner1)
+      sessionHolder.streamingForeachBatchRunnerCleanerCache
+        .registerCleanerForQuery(query2, cleaner2)
+
+      val (runner1, runner2) = (cleaner1.runner, cleaner2.runner)
+
+      // assert both python processes are running
+      assert(!runner1.isWorkerStopped().get)
+      assert(!runner2.isWorkerStopped().get)
+      // stop query1
+      query1.stop()
+      // assert query1's python process is not running
+      eventually(timeout(30.seconds)) {
+        assert(runner1.isWorkerStopped().get)
+        assert(!runner2.isWorkerStopped().get)
+      }
+
+      // stop query2
+      query2.stop()
+      eventually(timeout(30.seconds)) {
+        // assert query2's python process is not running
+        assert(runner2.isWorkerStopped().get)
+      }
+
+      assert(spark.streams.active.isEmpty) // no running query
+      assert(spark.streams.listListeners().length == 1) // only process termination listener
+    } finally {
+      SparkConnectService.stop()
+      // remove process termination listener
+      spark.streams.removeListener(spark.streams.listListeners()(0))
+    }
+  }
+
+  test("python listener process: process terminates after listener is removed") {
+    // scalastyle:off assume
+    assume(IntegratedUDFTestUtils.shouldTestPythonUDFs)
+    // scalastyle:on assume
+
+    val sessionHolder = SessionHolder.forTesting(spark)
+    try {
+      SparkConnectService.start(spark.sparkContext)
+
+      val pythonFn = dummyPythonFunction(sessionHolder)(streamingQueryListenerFunction)
+
+      val id1 = "listener_removeListener_test_1"
+      val id2 = "listener_removeListener_test_2"
+      val listener1 = new PythonStreamingQueryListener(pythonFn, sessionHolder)

Review Comment:
   this test also crashed at this line
   ```
   [info] - python listener process: process terminates after listener is removed *** FAILED *** (427 milliseconds)
   [info]   java.io.EOFException:
   [info]   at java.io.DataInputStream.readInt(DataInputStream.java:392)
   [info]   at org.apache.spark.api.python.StreamingPythonRunner.init(StreamingPythonRunner.scala:101)
   [info]   at org.apache.spark.sql.connect.planner.PythonStreamingQueryListener.<init>(StreamingQueryListenerHelper.scala:41)
   [info]   at org.apache.spark.sql.connect.service.SparkConnectSessionHolderSuite.$anonfun$new$11(SparkConnectSessionHodlerSuite.scala:251)
   [info]   at org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
   [info]   at org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
   [info]   at org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
   [info]   at org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
   ```
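   
   To rule out pickling as the cause (a hedged local check; an EOFException in StreamingPythonRunner.init generally means the worker process exited during startup, e.g. because an import failed):
   ```python
   from pyspark.serializers import CloudPickleSerializer
   from pyspark.sql.streaming.listener import StreamingQueryListener

   class MyListener(StreamingQueryListener):
       def onQueryStarted(self, e): pass
       def onQueryIdle(self, e): pass
       def onQueryProgress(self, e): pass
       def onQueryTerminated(self, e): pass

   # If this succeeds, the payload handed to the runner is well-formed, which
   # points at the worker environment rather than serialization.
   payload = CloudPickleSerializer().dumps(MyListener())
   assert isinstance(payload, bytes) and len(payload) > 0
   ```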





[GitHub] [spark] LuciferYang commented on a diff in pull request #42779: [SPARK-45056][PYTHON][SS][CONNECT] Termination tests for streamingQueryListener and foreachBatch

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #42779:
URL: https://github.com/apache/spark/pull/42779#discussion_r1328447379


##########
connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHodlerSuite.scala:
##########
(same diff hunk as quoted above)
+  test("python listener process: process terminates after listener is removed") {

Review Comment:
   @WweiL The newly added test case caused Maven's daily testing to fail. Could you help fix it? Thanks
   
   https://github.com/apache/spark/actions/runs/6213746551/job/16865056986
   
   ```
   ImportError: Pandas >= 1.4.4 must be installed; however, it was not found.
   - python listener process: process terminates after listener is removed *** FAILED ***
     java.io.EOFException:
     at java.io.DataInputStream.readInt(DataInputStream.java:392)
     at org.apache.spark.api.python.StreamingPythonRunner.init(StreamingPythonRunner.scala:101)
     at org.apache.spark.sql.connect.planner.PythonStreamingQueryListener.<init>(StreamingQueryListenerHelper.scala:41)
     at org.apache.spark.sql.connect.service.SparkConnectSessionHolderSuite.$anonfun$new$12(SparkConnectSessionHodlerSuite.scala:251)
     at org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
     at org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
     at org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
     at org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
     at org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:69)
     at org.apache.spark.SparkFunSuite.$anonfun$test$2(SparkFunSuite.scala:155)
     ...
   ```
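   
   To reproduce the dependency failure locally (a sketch assuming the ImportError above comes from PySpark's standard pandas dependency check, which the Connect worker hits on startup):
   ```python
   # Raises ImportError("Pandas >= 1.4.4 must be installed; ...") when pandas
   # is missing or too old, mirroring what the listener worker hits at startup.
   from pyspark.sql.pandas.utils import require_minimum_pandas_version

   require_minimum_pandas_version()
   print("pandas requirement satisfied")
   ```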





[GitHub] [spark] LuciferYang commented on a diff in pull request #42779: [SPARK-45056][PYTHON][SS][CONNECT] Termination tests for streamingQueryListener and foreachBatch

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #42779:
URL: https://github.com/apache/spark/pull/42779#discussion_r1328475255


##########
connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHodlerSuite.scala:
##########
(same diff hunk as quoted above)
+  test("python listener process: process terminates after listener is removed") {

Review Comment:
   The assume condition is changed in https://github.com/apache/spark/pull/42976. Afterwards, we should install the Python test dependencies for Maven's daily test.
   
   


