Posted to reviews@spark.apache.org by "anishshri-db (via GitHub)" <gi...@apache.org> on 2023/10/18 01:17:18 UTC

[PR] [SPARK-45582] Ensure that store instance is not used after calling commit within output mode streaming aggregation [spark]

anishshri-db opened a new pull request, #43413:
URL: https://github.com/apache/spark/pull/43413

   ### What changes were proposed in this pull request?
   Ensure that the store instance is not used after commit has been called within output mode streaming aggregation.
   
   ### Why are the changes needed?
   Without these changes, we were still accessing the store instance to retrieve the iterator after commit had been called. Calling commit releases the DB instance lock, so a task retry can acquire the lock and close the DB instance. When the original thread then tries to access the DB, it can run into a NullPointerException like the one in the trace below. This change fixes the issue.
   
   ```
           org.apache.spark.sql.streaming.StreamingQueryException: Job aborted due to stage failure: Task 0 in stage 5.0 failed 1 times, most recent failure: Lost task 0.0 in stage 5.0 (TID 492) (ip-10-110-25-116.us-west-2.compute.internal executor driver): java.lang.NullPointerException
   	at org.apache.spark.sql.execution.streaming.state.RocksDB.iterator(RocksDB.scala:337)
   	at org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider$RocksDBStateStore.iterator(RocksDBStateStoreProvider.scala:79)
   	at org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV1.values(StreamingAggregationStateManager.scala:130)
   	at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.$anonfun$doExecute$5(statefulOperators.scala:543)
   	at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps.$anonfun$mapPartitionsWithStateStore$1(package.scala:63)
   	at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:131)
   	at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:407)
   	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:404)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:371)
   ```
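   
   The ordering problem can be illustrated with a minimal sketch. It assumes a toy in-memory store: `ToyStore`, `closeFromRetry`, and `UseAfterCommitSketch` are hypothetical names made up for illustration and are not Spark's RocksDB or StateStore API. The task retry is simulated by an explicit call rather than a real second thread, and reading before commit is only one possible way to avoid the problem, not necessarily the exact change made in this PR.
   
   ```scala
   // Toy model only: ToyStore is hypothetical and is not Spark's RocksDB or StateStore API.
   final class ToyStore(data: Map[String, Int]) {
     private var closed = false
   
     // In the scenario described above, commit releases the instance lock.
     // This sketch has no real lock, so commit is a no-op that only marks
     // the point after which another task may touch the store.
     def commit(): Unit = ()
   
     // Stands in for a retried task that acquired the lock and closed the DB.
     def closeFromRetry(): Unit = { closed = true }
   
     def iterator: Iterator[(String, Int)] = {
       if (closed) throw new IllegalStateException("store instance already closed")
       data.iterator
     }
   }
   
   object UseAfterCommitSketch {
     // Problematic order: commit first, then read from the store.
     def readAfterCommit(store: ToyStore): Seq[(String, Int)] = {
       store.commit()
       store.closeFromRetry()  // simulated interleaving with a task retry
       store.iterator.toVector // throws: the instance has been closed
     }
   
     // Safer order: finish all reads, then commit.
     def readBeforeCommit(store: ToyStore): Seq[(String, Int)] = {
       val rows = store.iterator.toVector // read while the store is still ours
       store.commit()
       store.closeFromRetry()  // the same interleaving is now harmless
       rows
     }
   }
   ```
   
   In this sketch the rows are copied into a `Vector` before commit, which trades memory for simplicity. A real operator would more likely keep the iterator lazy and defer the commit until the iterator has been fully consumed.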
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   RocksDBStateStoreStreamingAggregationSuite
   ```
   18:12:00.242 WARN org.apache.spark.sql.streaming.RocksDBStateStoreStreamingAggregationSuite:
   
   ===== POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.streaming.RocksDBStateStoreStreamingAggregationSuite, threads: ForkJoinPool.commonPool-worker-6 (daemon=true), ForkJoinPool.commonPool-worker-4 (daemon=true), ForkJoinPool.commonPool-worker-7 (daemon=true), ForkJoinPool.commonPool-worker-5 (daemon=true), ForkJoinPool.commonPool-worker-3 (daemon=true), state-store-maintenance-task (daemon=true), rpc-boss-3-1 (daemon=true), ForkJoinPool.commonPool-worker-8 (daemon=true), shuffle-boss-6-1 (daemon=true), ForkJoinP...
   [info] Run completed in 5 minutes, 8 seconds.
   [info] Total number of tests run: 80
   [info] Suites: completed 1, aborted 0
   [info] Tests: succeeded 80, failed 0, canceled 0, ignored 0, pending 0
   [info] All tests passed.
   ```
   
   StreamingSessionWindowSuite
   ```
   ===== POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.streaming.StreamingSessionWindowSuite, threads: ForkJoinPool.commonPool-worker-6 (daemon=true), state-store-maintenance-thread-0 (daemon=true), ForkJoinPool.commonPool-worker-4 (daemon=true), state-store-maintenance-thread-1 (daemon=true), ForkJoinPool.commonPool-worker-7 (daemon=true), ForkJoinPool.commonPool-worker-5 (daemon=true), ForkJoinPool.commonPool-worker-3 (daemon=true), state-store-maintenance-task (daemon=true), rpc-boss-3-1 (daemon=true), ForkJoin...
   [info] Run completed in 3 minutes, 38 seconds.
   [info] Total number of tests run: 48
   [info] Suites: completed 1, aborted 0
   [info] Tests: succeeded 48, failed 0, canceled 0, ignored 0, pending 0
   [info] All tests passed.
   ```
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45582][SS] Ensure that store instance is not used after calling commit within output mode streaming aggregation [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #43413:
URL: https://github.com/apache/spark/pull/43413#issuecomment-1769286752

   cc. @rangadi 
   https://github.com/anishshri-db/spark/actions/runs/6556849347/job/17830698193
   It looks like the Protobuf test is flaky. Would you mind looking into it? I can file a JIRA ticket if you prefer. Thanks!


Re: [PR] [SPARK-45582][SS] Ensure that store instance is not used after calling commit within output mode streaming aggregation [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on PR #43413:
URL: https://github.com/apache/spark/pull/43413#issuecomment-1767700066

   @HeartSaVioR - the failure seems to be the same as the Kafka one from a couple of days back. I will retry running the action.


Re: [PR] [SPARK-45582][SS] Ensure that store instance is not used after calling commit within output mode streaming aggregation [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on PR #43413:
URL: https://github.com/apache/spark/pull/43413#issuecomment-1769156128

   @HeartSaVioR - I don't think the failure is related to my change
   
   ```
   2023-10-18T18:50:15.1625340Z [info] - single StructType(StructField(bytes_type,BinaryType,true)) with seed 399 *** FAILED *** (14 milliseconds)
   2023-10-18T18:50:15.1647561Z [info]   (Eval check with Java class name) Incorrect evaluation (codegen off): from_protobuf(to_protobuf([[B@3ac95708], org.apache.spark.sql.protobuf.protos.CatalystTypes$BytesMsg, None), org.apache.spark.sql.protobuf.protos.CatalystTypes$BytesMsg, None), actual: [null], expected: [[B@3061d6e0] (ExpressionEvalHelper.scala:254)
   2023-10-18T18:50:15.1651529Z [info]   org.scalatest.exceptions.TestFailedException:
   2023-10-18T18:50:15.1654278Z [info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
   2023-10-18T18:50:15.1658438Z [info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
   2023-10-18T18:50:15.1661293Z [info]   at org.scalatest.funsuite.AnyFunSuite.newAssertionFailedException(AnyFunSuite.scala:1564)
   2023-10-18T18:50:15.1663356Z [info]   at org.scalatest.Assertions.fail(Assertions.scala:933)
   2023-10-18T18:50:15.1664990Z [info]   at org.scalatest.Assertions.fail$(Assertions.scala:929)
   2023-10-18T18:50:15.1672419Z [info]   at org.scalatest.funsuite.AnyFunSuite.fail(AnyFunSuite.scala:1564)
   2023-10-18T18:50:15.1680257Z [info]   at org.apache.spark.sql.catalyst.expressions.ExpressionEvalHelper.checkEvaluationWithoutCodegen(ExpressionEvalHelper.scala:254)
   2023-10-18T18:50:15.1689021Z [info]   at org.apache.spark.sql.catalyst.expressions.ExpressionEvalHelper.checkEvaluationWithoutCodegen$(ExpressionEvalHelper.scala:244)
   2023-10-18T18:50:15.1700429Z [info]   at org.apache.spark.sql.protobuf.ProtobufCatalystDataConversionSuite.checkEvaluationWithoutCodegen(ProtobufCatalystDataConversionSuite.scala:34)
   2023-10-18T18:50:15.1709843Z [info]   at org.apache.spark.sql.catalyst.expressions.ExpressionEvalHelper.checkEvaluation(ExpressionEvalHelper.scala:87)
   ```
   
   I ran the tests locally and they seem fine
   ```
   [info] ProtobufCatalystDataConversionSuite:
   [info] Passed: Total 0, Failed 0, Errors 0, Passed 0
   [info] No tests to run for sql / Test / testOnly
   [info] - single StructType(StructField(int32_type,IntegerType,true)) with seed 305 (916 milliseconds)
   [info] - single StructType(StructField(double_type,DoubleType,true)) with seed 710 (145 milliseconds)
   [info] - single StructType(StructField(float_type,FloatType,true)) with seed 900 (146 milliseconds)
   [info] - single StructType(StructField(bytes_type,BinaryType,true)) with seed 128 (143 milliseconds)
   [info] - single StructType(StructField(string_type,StringType,true)) with seed 856 (139 milliseconds)
   [info] - Handle unsupported input of message type (369 milliseconds)
   [info] - filter push-down to Protobuf deserializer (176 milliseconds)
   [info] - ProtobufDeserializer with binary type (3 milliseconds)
   [info] - Full names for message using descriptor file (2 milliseconds)
   [warn] 22 warnings found
   [info] Run completed in 4 seconds, 546 milliseconds.
   [info] Total number of tests run: 9
   [info] Suites: completed 1, aborted 0
   [info] Tests: succeeded 9, failed 0, canceled 0, ignored 0, pending 0
   [info] All tests passed.
   ```
   
   So this looks like flakiness in this test.


Re: [PR] [SPARK-45582][SS] Ensure that store instance is not used after calling commit within output mode streaming aggregation [spark]

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on PR #43413:
URL: https://github.com/apache/spark/pull/43413#issuecomment-1769310242

   @HeartSaVioR thanks. I hadn't seen the flake with the Protobuf tests. Will take a look.


Re: [PR] [SPARK-45582][SS] Ensure that store instance is not used after calling commit within output mode streaming aggregation [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR closed pull request #43413: [SPARK-45582][SS] Ensure that store instance is not used after calling commit within output mode streaming aggregation
URL: https://github.com/apache/spark/pull/43413


Re: [PR] [SPARK-45582][SS] Ensure that store instance is not used after calling commit within output mode streaming aggregation [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #43413:
URL: https://github.com/apache/spark/pull/43413#issuecomment-1768339596

   @anishshri-db We weren't lucky. Could you push an empty commit to retrigger CI?


Re: [PR] [SPARK-45582][SS] Ensure that store instance is not used after calling commit within output mode streaming aggregation [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #43413:
URL: https://github.com/apache/spark/pull/43413#issuecomment-1769287545

   I'm going to merge this as the test failure is not related. Thanks! Merging to master.


Re: [PR] [SPARK-45582][SS] Ensure that store instance is not used after calling commit within output mode streaming aggregation [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on PR #43413:
URL: https://github.com/apache/spark/pull/43413#issuecomment-1767447102

   @HeartSaVioR - PTAL, thx!

