You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "utkarsh39 (via GitHub)" <gi...@apache.org> on 2023/08/08 01:31:08 UTC

[GitHub] [spark] utkarsh39 opened a new pull request, #42385: [SPARK-44705] Make PythonRunner single-threaded

utkarsh39 opened a new pull request, #42385:
URL: https://github.com/apache/spark/pull/42385

   <!--
   Thanks for sending a pull request!  Here are some tips for you:
     1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html
     2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html
     3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'.
     4. Be sure to keep the PR description updated to reflect all changes.
     5. Please write your PR title to summarize what this PR proposes.
     6. If possible, provide a concise example to reproduce the issue for a faster review.
     7. If you want to add a new configuration, please read the guideline first for naming configurations in
        'core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala'.
     8. If you want to add or modify an error type or message, please read the guideline first in
        'core/src/main/resources/error/README.md'.
   -->
   
   ### What changes were proposed in this pull request?
   PythonRunner, a utility that executes Python UDFs in Spark, uses two threads in a producer-consumer model today. This multi-threading model is problematic and confusing as Spark's execution model within a task is commonly understood to be single-threaded.
   More importantly, this departure of a double-threaded execution resulted in a series of customer issues involving [race conditions](https://issues.apache.org/jira/browse/SPARK-33277) and [deadlocks](https://issues.apache.org/jira/browse/SPARK-38677) between threads as the code was hard to reason about. There have been multiple attempts to reign in these issues, viz., [fix 1](https://issues.apache.org/jira/browse/SPARK-22535), [fix 2](https://github.com/apache/spark/pull/30177), [fix 3](https://github.com/apache/spark/commit/243c321db2f02f6b4d926114bd37a6e74c2be185). Moreover, the fixes have made the code base somewhat abstruse by introducing multiple daemon [monitor threads](https://github.com/apache/spark/blob/a3a32912be04d3760cb34eb4b79d6d481bbec502/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L579) to detect deadlocks. This PR makes PythonRunner single-threaded making it easier to reason about and improving code health.
   
   #### Current Execution Model in Spark for Python UDFs
   For queries containing Python UDFs, the main Java task thread spins up a new writer thread to pipe data from the child Spark plan into the Python worker evaluating the UDF. The writer thread runs in a tight loop: evaluates the child Spark plan, and feeds the resulting output to the Python worker. The main task thread simultaneously consumes the Python UDF’s output and evaluates the parent Spark plan to produce the final result.
   The I/O to/from the Python worker uses blocking Java Sockets necessitating the use of two threads, one responsible for input to the Python worker and the other for output. Without two threads, it is easy to run into a deadlock. For example, the task can block forever waiting for the output from the Python worker. The output will never arrive until the input is supplied to the Python worker, which is not possible as the task thread is blocked while waiting on output.
   
   #### Proposed Fix
   
   The proposed fix is to move to the standard single-threaded execution model within a task, i.e., to do away with the writer thread. In addition to mitigating the crashes, the fix reduces the complexity of the existing code by doing away with many safety checks in place to track deadlocks in the double-threaded execution model.
   
   In the new model, the main task thread alternates between consuming/feeding data to the Python worker using asynchronous I/O through Java’s [SocketChannel](https://docs.oracle.com/javase/7/docs/api/java/nio/channels/SocketChannel.html). See the `read()` method in the code below for approximately how this is achieved.
   
   
   ```
   case class PythonUDFRunner {
   
     private var nextRow: Row = _
     private var endOfStream = false
     private var childHasNext = true
     private var buffer: ByteBuffer = _
   
     def hasNext(): Boolean = nextRow != null || {
        if (!endOfStream) {
          read(buffer)
          nextRow = deserialize(buffer)
          hasNext
        } else {
          false
        }
     }
   
     def next(): Row = {
        if (hasNext) {
          val outputRow = nextRow
          nextRow = null
          outputRow
        } else {
          null
        }
     }
    
     def read(buf: Array[Byte]): Row = {
       var n = 0
       while (n == 0) {
       // Alternate between reading/writing to the Python worker using async I/O
       if (pythonWorker.isReadable) {
         n = pythonWorker.read(buf)
       }
       if (pythonWorker.isWritable) {
         consumeChildPlanAndWriteDataToPythonWorker()
       }
     }
     
     def consumeChildPlanAndWriteDataToPythonWorker(): Unit = {
         // Tracks whether the connection to the Python worker can be written to. 
         var socketAcceptsInput = true
         while (socketAcceptsInput && (childHasNext || buffer.hasRemaining)) {
           if (!buffer.hasRemaining && childHasNext) {
             // Consume data from the child and buffer it.
             writeToBuffer(childPlan.next(), buffer)
             childHasNext = childPlan.hasNext()
             if (!childHasNext) {
               // Exhausted child plan’s output. Write a keyword to the Python worker signaling the end of data input.
               writeToBuffer(endOfStream)
             }
           }
           // Try to write as much buffered data as possible to the Python worker.
           while (buffer.hasRemaining && socketAcceptsInput) {
             val n = writeToPythonWorker(buffer)
             // `writeToPythonWorker()` returns 0 when the socket cannot accept more data right now.
             socketAcceptsInput = n > 0
           }
         }
       }
   }
   
   ```
   ### Why are the changes needed?
   This PR makes PythonRunner single-threaded making it easier to reason about and improving code health.
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   Existing tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on pull request #42385: [SPARK-44705][PYTHON] Make PythonRunner single-threaded

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #42385:
URL: https://github.com/apache/spark/pull/42385#issuecomment-1676854237

   @utkarsh39 
   
   I found that this PR may caused some PySpark test cases to fail in the Java 17 daily tests(pyspark-sql and pyspark-connect module):
   
   - https://github.com/apache/spark/actions/runs/5837423492
   - https://github.com/apache/spark/actions/runs/5843658110
   - https://github.com/apache/spark/actions/runs/5849761680
   
   <img width="1157" alt="image" src="https://github.com/apache/spark/assets/1475305/bcab0032-5d96-4596-9f03-0aa364f91574">
   
   
   To verify this , I conducted some local testing using Java 17
   
   ```
   java -version
   openjdk version "17.0.8" 2023-07-18 LTS
   OpenJDK Runtime Environment Zulu17.44+15-CA (build 17.0.8+7-LTS)
   OpenJDK 64-Bit Server VM Zulu17.44+15-CA (build 17.0.8+7-LTS, mixed mode, sharing)
   ```
   
   1. Revert to the previous PR before SPARK-44705 and run the following commands:
   
   
   ```
   // [SPARK-44765][CONNECT] Simplify retries of ReleaseExecute
   git reset --hard 9bde882fcb39e9fedced0df9702df2a36c1a84e6
   export SKIP_UNIDOC=true
   export SKIP_MIMA=true
   export SKIP_PACKAGING=true
   ./dev/run-tests --parallelism 1 --modules "pyspark-sql"
   ```
   
   ```
   Finished test(python3.9): pyspark.sql.tests.test_udtf (57s) ... 2 tests were skipped
   Tests passed in 59 seconds
   ```
   
   The tests in `pyspark.sql.tests.test_udtf` passed.
   
   
   2. Revert to SPARK-44705 and run the following commands:
   
   ```
   // [SPARK-44705][PYTHON] Make PythonRunner single-threaded
   git reset --hard 8aaff55839493e80e3ce376f928c04aa8f31d18c
   export SKIP_UNIDOC=true
   export SKIP_MIMA=true
   export SKIP_PACKAGING=true
   ./dev/run-tests --parallelism 1 --modules "pyspark-sql"
   ```
   
   
   ```
   ======================================================================
   FAIL: test_udtf_with_analyze_table_argument_adding_columns (pyspark.sql.tests.test_udtf.UDTFTests)
   ----------------------------------------------------------------------
   Traceback (most recent call last):
     File "/Users/yangjie01/SourceCode/git/spark-mine-sbt/python/pyspark/sql/tests/test_udtf.py", line 1340, in test_udtf_with_analyze_table_argument_adding_columns
       assertSchemaEqual(
     File "/Users/yangjie01/SourceCode/git/spark-mine-sbt/python/pyspark/testing/utils.py", line 356, in assertSchemaEqual
       raise PySparkAssertionError(
   pyspark.errors.exceptions.base.PySparkAssertionError: [DIFFERENT_SCHEMA] Schemas do not match.
   --- actual
   +++ expected
   - StructType([StructField('a', LongType(), True)])
   + StructType([StructField('id', LongType(), False), StructField('is_even', BooleanType(), True)])
   
   ======================================================================
   FAIL: test_udtf_with_analyze_table_argument_repeating_rows (pyspark.sql.tests.test_udtf.UDTFTests) (query_no=0)
   ----------------------------------------------------------------------
   Traceback (most recent call last):
     File "/Users/yangjie01/SourceCode/git/spark-mine-sbt/python/pyspark/sql/tests/test_udtf.py", line 1394, in test_udtf_with_analyze_table_argument_repeating_rows
       assertSchemaEqual(df.schema, expected_schema)
     File "/Users/yangjie01/SourceCode/git/spark-mine-sbt/python/pyspark/testing/utils.py", line 356, in assertSchemaEqual
       raise PySparkAssertionError(
   pyspark.errors.exceptions.base.PySparkAssertionError: [DIFFERENT_SCHEMA] Schemas do not match.
   --- actual
   +++ expected
   - StructType([StructField('id', LongType(), False), StructField('is_even', BooleanType(), True)])
   + StructType([StructField('id', LongType(), False)])
   
   ======================================================================
   FAIL: test_udtf_with_analyze_table_argument_repeating_rows (pyspark.sql.tests.test_udtf.UDTFTests)
   ----------------------------------------------------------------------
   Traceback (most recent call last):
     File "/Users/yangjie01/SourceCode/git/spark-mine-sbt/python/pyspark/sql/tests/test_udtf.py", line 1400, in test_udtf_with_analyze_table_argument_repeating_rows
       self.spark.sql(
   AssertionError: AnalysisException not raised
   
   ======================================================================
   FAIL: test_udtf_with_analyze_using_accumulator (pyspark.sql.tests.test_udtf.UDTFTests) (query_no=0)
   ----------------------------------------------------------------------
   Traceback (most recent call last):
     File "/Users/yangjie01/SourceCode/git/spark-mine-sbt/python/pyspark/sql/tests/test_udtf.py", line 1625, in test_udtf_with_analyze_using_accumulator
       assertSchemaEqual(df.schema, StructType().add("col1", IntegerType()))
     File "/Users/yangjie01/SourceCode/git/spark-mine-sbt/python/pyspark/testing/utils.py", line 356, in assertSchemaEqual
       raise PySparkAssertionError(
   pyspark.errors.exceptions.base.PySparkAssertionError: [DIFFERENT_SCHEMA] Schemas do not match.
   --- actual
   +++ expected
   - StructType([StructField('a', IntegerType(), True), StructField('b', IntegerType(), True)])
   + StructType([StructField('col1', IntegerType(), True)])
   
   ======================================================================
   FAIL: test_udtf_with_analyze_using_accumulator (pyspark.sql.tests.test_udtf.UDTFTests)
   ----------------------------------------------------------------------
   Traceback (most recent call last):
     File "/Users/yangjie01/SourceCode/git/spark-mine-sbt/python/pyspark/sql/tests/test_udtf.py", line 1628, in test_udtf_with_analyze_using_accumulator
       self.assertEqual(test_accum.value, 222)
   AssertionError: 111 != 222
   
   ----------------------------------------------------------------------
   Ran 174 tests in 54.619s
   
   FAILED (failures=34, errors=6, skipped=2)
   ```
   
   There are 34 test failures after this one merged.
   
   @utkarsh39 Do you have time to fix these test cases?  For this, I have created SPARK-44797.
   
   Or should we revert this PR to restore the Java 17 daily tests first? @HyukjinKwon @ueshin @dongjoon-hyun 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ueshin commented on a diff in pull request #42385: [SPARK-44705][PYTHON] Make PythonRunner single-threaded

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on code in PR #42385:
URL: https://github.com/apache/spark/pull/42385#discussion_r1287639626


##########
core/src/main/scala/org/apache/spark/ContextAwareIterator.scala:
##########
@@ -30,8 +30,10 @@ import org.apache.spark.annotation.DeveloperApi
  * Thus, we should use [[ContextAwareIterator]] to stop consuming after the task ends.
  *
  * @since 3.1.0
+ * @deprecated since 3.5.0 as its only usage for Python evaluation is now extinct
  */
 @DeveloperApi
+@deprecated("Only usage for Python evaluation is now extinct", "3.5.0")

Review Comment:
   The deprecated version should be `4.0.0`? cc @HyukjinKwon 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on pull request #42385: [SPARK-44705][PYTHON] Make PythonRunner single-threaded

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #42385:
URL: https://github.com/apache/spark/pull/42385#issuecomment-1677616117

   > I merged #42422. Let's see the next daily tests. Thanks.
   
   Thanks ~


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ueshin commented on a diff in pull request #42385: [SPARK-44705][PYTHON] Make PythonRunner single-threaded

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on code in PR #42385:
URL: https://github.com/apache/spark/pull/42385#discussion_r1293994187


##########
core/src/main/scala/org/apache/spark/ContextAwareIterator.scala:
##########
@@ -30,8 +30,10 @@ import org.apache.spark.annotation.DeveloperApi
  * Thus, we should use [[ContextAwareIterator]] to stop consuming after the task ends.
  *
  * @since 3.1.0
+ * @deprecated since 4.0.0 as its only usage for Python evaluation is now extinct
  */
 @DeveloperApi
+@deprecated("Only usage for Python evaluation is now extinct", "3.5.0")

Review Comment:
   @utkarsh39 This should be `4.0.0`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on pull request #42385: [SPARK-44705][PYTHON] Make PythonRunner single-threaded

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #42385:
URL: https://github.com/apache/spark/pull/42385#issuecomment-1678345983

   > @LuciferYang The error is different from the previous one that seems to be fixed. Is it possible to rerun the test? I guess it's just flaky.
   
   re-run the failed ones, there are 26 `[Errno 111] Connection refused`, not sure this is flaky
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] utkarsh39 commented on pull request #42385: [SPARK-44705][PYTHON] Make PythonRunner single-threaded

Posted by "utkarsh39 (via GitHub)" <gi...@apache.org>.
utkarsh39 commented on PR #42385:
URL: https://github.com/apache/spark/pull/42385#issuecomment-1677405782

   > @utkarsh39
   > 
   > I found that this PR may caused some PySpark test cases to fail in the Java 17 daily tests(pyspark-sql and pyspark-connect module):
   > 
   > * https://github.com/apache/spark/actions/runs/5837423492
   > * https://github.com/apache/spark/actions/runs/5843658110
   > * https://github.com/apache/spark/actions/runs/5849761680
   > 
   > <img alt="image" width="1157" src="https://user-images.githubusercontent.com/1475305/260390648-bcab0032-5d96-4596-9f03-0aa364f91574.png">
   > To verify this , I conducted some local testing using Java 17
   > 
   > ```
   > java -version
   > openjdk version "17.0.8" 2023-07-18 LTS
   > OpenJDK Runtime Environment Zulu17.44+15-CA (build 17.0.8+7-LTS)
   > OpenJDK 64-Bit Server VM Zulu17.44+15-CA (build 17.0.8+7-LTS, mixed mode, sharing)
   > ```
   > 
   > 1. Revert to the previous PR before [SPARK-44705](https://issues.apache.org/jira/browse/SPARK-44705) and run the following commands:
   > 
   > ```
   > // [SPARK-44765][CONNECT] Simplify retries of ReleaseExecute
   > git reset --hard 9bde882fcb39e9fedced0df9702df2a36c1a84e6
   > export SKIP_UNIDOC=true
   > export SKIP_MIMA=true
   > export SKIP_PACKAGING=true
   > ./dev/run-tests --parallelism 1 --modules "pyspark-sql"
   > ```
   > 
   > ```
   > Finished test(python3.9): pyspark.sql.tests.test_udtf (57s) ... 2 tests were skipped
   > ```
   > 
   > The tests in `pyspark.sql.tests.test_udtf` passed.
   > 
   > 2. Revert to [SPARK-44705](https://issues.apache.org/jira/browse/SPARK-44705) and run the following commands:
   > 
   > ```
   > // [SPARK-44705][PYTHON] Make PythonRunner single-threaded
   > git reset --hard 8aaff55839493e80e3ce376f928c04aa8f31d18c
   > export SKIP_UNIDOC=true
   > export SKIP_MIMA=true
   > export SKIP_PACKAGING=true
   > ./dev/run-tests --parallelism 1 --modules "pyspark-sql"
   > ```
   > 
   > ```
   > ======================================================================
   > FAIL: test_udtf_with_analyze_table_argument_adding_columns (pyspark.sql.tests.test_udtf.UDTFTests)
   > ----------------------------------------------------------------------
   > Traceback (most recent call last):
   >   File "/Users/yangjie01/SourceCode/git/spark-mine-sbt/python/pyspark/sql/tests/test_udtf.py", line 1340, in test_udtf_with_analyze_table_argument_adding_columns
   >     assertSchemaEqual(
   >   File "/Users/yangjie01/SourceCode/git/spark-mine-sbt/python/pyspark/testing/utils.py", line 356, in assertSchemaEqual
   >     raise PySparkAssertionError(
   > pyspark.errors.exceptions.base.PySparkAssertionError: [DIFFERENT_SCHEMA] Schemas do not match.
   > --- actual
   > +++ expected
   > - StructType([StructField('a', LongType(), True)])
   > + StructType([StructField('id', LongType(), False), StructField('is_even', BooleanType(), True)])
   > 
   > ======================================================================
   > FAIL: test_udtf_with_analyze_table_argument_repeating_rows (pyspark.sql.tests.test_udtf.UDTFTests) (query_no=0)
   > ----------------------------------------------------------------------
   > Traceback (most recent call last):
   >   File "/Users/yangjie01/SourceCode/git/spark-mine-sbt/python/pyspark/sql/tests/test_udtf.py", line 1394, in test_udtf_with_analyze_table_argument_repeating_rows
   >     assertSchemaEqual(df.schema, expected_schema)
   >   File "/Users/yangjie01/SourceCode/git/spark-mine-sbt/python/pyspark/testing/utils.py", line 356, in assertSchemaEqual
   >     raise PySparkAssertionError(
   > pyspark.errors.exceptions.base.PySparkAssertionError: [DIFFERENT_SCHEMA] Schemas do not match.
   > --- actual
   > +++ expected
   > - StructType([StructField('id', LongType(), False), StructField('is_even', BooleanType(), True)])
   > + StructType([StructField('id', LongType(), False)])
   > 
   > ======================================================================
   > FAIL: test_udtf_with_analyze_table_argument_repeating_rows (pyspark.sql.tests.test_udtf.UDTFTests)
   > ----------------------------------------------------------------------
   > Traceback (most recent call last):
   >   File "/Users/yangjie01/SourceCode/git/spark-mine-sbt/python/pyspark/sql/tests/test_udtf.py", line 1400, in test_udtf_with_analyze_table_argument_repeating_rows
   >     self.spark.sql(
   > AssertionError: AnalysisException not raised
   > 
   > ======================================================================
   > FAIL: test_udtf_with_analyze_using_accumulator (pyspark.sql.tests.test_udtf.UDTFTests) (query_no=0)
   > ----------------------------------------------------------------------
   > Traceback (most recent call last):
   >   File "/Users/yangjie01/SourceCode/git/spark-mine-sbt/python/pyspark/sql/tests/test_udtf.py", line 1625, in test_udtf_with_analyze_using_accumulator
   >     assertSchemaEqual(df.schema, StructType().add("col1", IntegerType()))
   >   File "/Users/yangjie01/SourceCode/git/spark-mine-sbt/python/pyspark/testing/utils.py", line 356, in assertSchemaEqual
   >     raise PySparkAssertionError(
   > pyspark.errors.exceptions.base.PySparkAssertionError: [DIFFERENT_SCHEMA] Schemas do not match.
   > --- actual
   > +++ expected
   > - StructType([StructField('a', IntegerType(), True), StructField('b', IntegerType(), True)])
   > + StructType([StructField('col1', IntegerType(), True)])
   > 
   > ======================================================================
   > FAIL: test_udtf_with_analyze_using_accumulator (pyspark.sql.tests.test_udtf.UDTFTests)
   > ----------------------------------------------------------------------
   > Traceback (most recent call last):
   >   File "/Users/yangjie01/SourceCode/git/spark-mine-sbt/python/pyspark/sql/tests/test_udtf.py", line 1628, in test_udtf_with_analyze_using_accumulator
   >     self.assertEqual(test_accum.value, 222)
   > AssertionError: 111 != 222
   > 
   > ----------------------------------------------------------------------
   > Ran 174 tests in 54.619s
   > 
   > FAILED (failures=34, errors=6, skipped=2)
   > ```
   > 
   > There are 34 test failures after this one merged.
   > 
   > @utkarsh39 Do you have time to fix these test cases? For this, I have created [SPARK-44797](https://issues.apache.org/jira/browse/SPARK-44797).
   > 
   > Or should we revert this PR to restore the Java 17 daily tests first? @HyukjinKwon @ueshin @dongjoon-hyun
   
   I will try to get these tests fixed ASAP


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] rangadi commented on a diff in pull request #42385: [SPARK-44705][PYTHON] Make PythonRunner single-threaded

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #42385:
URL: https://github.com/apache/spark/pull/42385#discussion_r1295429496


##########
core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala:
##########
@@ -71,14 +71,17 @@ private[spark] class StreamingPythonRunner(
     val prevConf = conf.get(PYTHON_USE_DAEMON)
     conf.set(PYTHON_USE_DAEMON, false)
     try {
-      val (worker, _) = env.createPythonWorker(
-        pythonExec, workerModule, envVars.asScala.toMap)
+      val workerFactory =
+        new PythonWorkerFactory(pythonExec, workerModule, envVars.asScala.toMap)
+      val (worker: PythonWorker, _) = workerFactory.createSimpleWorker(blockingMode = true)

Review Comment:
   What is this change about? 
   It broke `stop()` method below. 
   cc: @WweiL, @HyukjinKwon 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on pull request #42385: [SPARK-44705][PYTHON] Make PythonRunner single-threaded

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #42385:
URL: https://github.com/apache/spark/pull/42385#issuecomment-1678266553

   > I merged #42422. Let's see the next daily tests. Thanks.
   
   https://github.com/apache/spark/actions/runs/5861115482/job/15890643041
   
   <img width="886" alt="image" src="https://github.com/apache/spark/assets/1475305/9478dc9d-5fdf-49ae-ae64-d9c5f1cf6c8c">
   
   
   Still some failed, can't determine the reason for now, further investigation is needed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] WweiL commented on a diff in pull request #42385: [SPARK-44705][PYTHON] Make PythonRunner single-threaded

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #42385:
URL: https://github.com/apache/spark/pull/42385#discussion_r1296293684


##########
core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala:
##########
@@ -71,14 +71,17 @@ private[spark] class StreamingPythonRunner(
     val prevConf = conf.get(PYTHON_USE_DAEMON)
     conf.set(PYTHON_USE_DAEMON, false)
     try {
-      val (worker, _) = env.createPythonWorker(
-        pythonExec, workerModule, envVars.asScala.toMap)
+      val workerFactory =
+        new PythonWorkerFactory(pythonExec, workerModule, envVars.asScala.toMap)
+      val (worker: PythonWorker, _) = workerFactory.createSimpleWorker(blockingMode = true)

Review Comment:
   Yes it breaks the stop() method below. It should be updated to like this: 
   https://github.com/apache/spark/blob/branch-3.5/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala#L109-L113



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] WweiL commented on a diff in pull request #42385: [SPARK-44705][PYTHON] Make PythonRunner single-threaded

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #42385:
URL: https://github.com/apache/spark/pull/42385#discussion_r1296302212


##########
core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala:
##########
@@ -71,14 +71,17 @@ private[spark] class StreamingPythonRunner(
     val prevConf = conf.get(PYTHON_USE_DAEMON)
     conf.set(PYTHON_USE_DAEMON, false)
     try {
-      val (worker, _) = env.createPythonWorker(
-        pythonExec, workerModule, envVars.asScala.toMap)
+      val workerFactory =
+        new PythonWorkerFactory(pythonExec, workerModule, envVars.asScala.toMap)
+      val (worker: PythonWorker, _) = workerFactory.createSimpleWorker(blockingMode = true)

Review Comment:
   @utkarsh39 we will create a followup ticket to fix this



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] rangadi commented on a diff in pull request #42385: [SPARK-44705][PYTHON] Make PythonRunner single-threaded

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #42385:
URL: https://github.com/apache/spark/pull/42385#discussion_r1296312638


##########
core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala:
##########
@@ -71,14 +71,17 @@ private[spark] class StreamingPythonRunner(
     val prevConf = conf.get(PYTHON_USE_DAEMON)
     conf.set(PYTHON_USE_DAEMON, false)
     try {
-      val (worker, _) = env.createPythonWorker(
-        pythonExec, workerModule, envVars.asScala.toMap)
+      val workerFactory =
+        new PythonWorkerFactory(pythonExec, workerModule, envVars.asScala.toMap)
+      val (worker: PythonWorker, _) = workerFactory.createSimpleWorker(blockingMode = true)

Review Comment:
   Thanks guys.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on pull request #42385: [SPARK-44705] Make PythonRunner single-threaded

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on PR #42385:
URL: https://github.com/apache/spark/pull/42385#issuecomment-1668839667

   cc @HyukjinKwon 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon closed pull request #42385: [SPARK-44705][PYTHON] Make PythonRunner single-threaded

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon closed pull request #42385: [SPARK-44705][PYTHON] Make PythonRunner single-threaded
URL: https://github.com/apache/spark/pull/42385


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on pull request #42385: [SPARK-44705][PYTHON] Make PythonRunner single-threaded

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #42385:
URL: https://github.com/apache/spark/pull/42385#issuecomment-1678406847

   > @LuciferYang The error is different from the previous one that seems to be fixed. Is it possible to rerun the test? I guess it's just flaky.
   
   The test passed after retrying, thanks for your work ~ @ueshin 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on pull request #42385: [SPARK-44705][PYTHON] Make PythonRunner single-threaded

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #42385:
URL: https://github.com/apache/spark/pull/42385#issuecomment-1676690789

   This PR caused the failure of the Scala 2.13 mima check. https://github.com/apache/spark/pull/42479


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] utkarsh39 commented on a diff in pull request #42385: [SPARK-44705][PYTHON] Make PythonRunner single-threaded

Posted by "utkarsh39 (via GitHub)" <gi...@apache.org>.
utkarsh39 commented on code in PR #42385:
URL: https://github.com/apache/spark/pull/42385#discussion_r1294131365


##########
core/src/main/scala/org/apache/spark/ContextAwareIterator.scala:
##########
@@ -30,8 +30,10 @@ import org.apache.spark.annotation.DeveloperApi
  * Thus, we should use [[ContextAwareIterator]] to stop consuming after the task ends.
  *
  * @since 3.1.0
+ * @deprecated since 4.0.0 as its only usage for Python evaluation is now extinct
  */
 @DeveloperApi
+@deprecated("Only usage for Python evaluation is now extinct", "3.5.0")

Review Comment:
   PR to fix it: https://github.com/apache/spark/pull/42494



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ueshin commented on pull request #42385: [SPARK-44705][PYTHON] Make PythonRunner single-threaded

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on PR #42385:
URL: https://github.com/apache/spark/pull/42385#issuecomment-1677582315

   I think #42422 includes the fix. Could you take a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42385: [SPARK-44705][PYTHON] Make PythonRunner single-threaded

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #42385:
URL: https://github.com/apache/spark/pull/42385#discussion_r1287805977


##########
core/src/main/scala/org/apache/spark/ContextAwareIterator.scala:
##########
@@ -30,8 +30,10 @@ import org.apache.spark.annotation.DeveloperApi
  * Thus, we should use [[ContextAwareIterator]] to stop consuming after the task ends.
  *
  * @since 3.1.0
+ * @deprecated since 3.5.0 as its only usage for Python evaluation is now extinct
  */
 @DeveloperApi
+@deprecated("Only usage for Python evaluation is now extinct", "3.5.0")

Review Comment:
   Oops yes



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on pull request #42385: [SPARK-44705][PYTHON] Make PythonRunner single-threaded

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #42385:
URL: https://github.com/apache/spark/pull/42385#issuecomment-1674118399

   Merged to master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ueshin commented on pull request #42385: [SPARK-44705][PYTHON] Make PythonRunner single-threaded

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on PR #42385:
URL: https://github.com/apache/spark/pull/42385#issuecomment-1677612335

   I merged #42422. Let's see the next daily tests. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ueshin commented on pull request #42385: [SPARK-44705][PYTHON] Make PythonRunner single-threaded

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on PR #42385:
URL: https://github.com/apache/spark/pull/42385#issuecomment-1678285030

   @LuciferYang The error is different from the previous one that seems to be fixed.
   Is it possible to rerun the test? I guess it's just flaky.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] utkarsh39 commented on a diff in pull request #42385: [SPARK-44705][PYTHON] Make PythonRunner single-threaded

Posted by "utkarsh39 (via GitHub)" <gi...@apache.org>.
utkarsh39 commented on code in PR #42385:
URL: https://github.com/apache/spark/pull/42385#discussion_r1296303115


##########
core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala:
##########
@@ -71,14 +71,17 @@ private[spark] class StreamingPythonRunner(
     val prevConf = conf.get(PYTHON_USE_DAEMON)
     conf.set(PYTHON_USE_DAEMON, false)
     try {
-      val (worker, _) = env.createPythonWorker(
-        pythonExec, workerModule, envVars.asScala.toMap)
+      val workerFactory =
+        new PythonWorkerFactory(pythonExec, workerModule, envVars.asScala.toMap)
+      val (worker: PythonWorker, _) = workerFactory.createSimpleWorker(blockingMode = true)

Review Comment:
   Thanks @WweiL 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org