You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Huanli Wang (Jira)" <ji...@apache.org> on 2023/03/14 22:08:00 UTC

[jira] [Updated] (SPARK-42794) Increase the lockAcquireTimeoutMs for acquiring the RocksDB state store in Structure Streaming

     [ https://issues.apache.org/jira/browse/SPARK-42794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Huanli Wang updated SPARK-42794:
--------------------------------
    Description: 
We are seeing query failure which is caused by RocksDB acquisition failure for the retry tasks.
 *  at t1, we shrink the cluster to only have one executor

{code:java}
23/03/05 22:47:21 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20230305224215-0000/2 is now DECOMMISSIONED (worker decommissioned because of kill request from HTTP endpoint (data migration disabled))
23/03/05 22:47:21 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20230305224215-0000/3 is now DECOMMISSIONED (worker decommissioned because of kill request from HTTP endpoint (data migration disabled))
{code}
 
 * at t1+2min, task 7 at its first attempt (i.e. task 7.0) is scheduled to the alive executor

{code:java}
23/03/05 22:49:58 INFO TaskSetManager: Starting task 7.0 in stage 133.0 (TID 685) (10.166.225.249, executor 0, partition 7, ANY, {code}
 

It seems that task 7.0 is able to pass *{{dataRDD.iterator(partition, ctxt)}}* and acquires the rocksdb lock as we are seeing
{code:java}
23/03/05 22:51:59 WARN TaskSetManager: Lost task 4.1 in stage 133.1 (TID 700) (10.166.225.249 executor 0): java.lang.IllegalStateException: StateStoreId(opId=0,partId=7,name=default): RocksDB instance could not be acquired by [ThreadId: Some(50), task: partition 7.1 in stage 133.1, TID 700] as it was not released by [ThreadId: Some(449), task: partition 7.0 in stage 133.0, TID 685] after 60003 ms.
23/03/05 22:52:59 WARN TaskSetManager: Lost task 4.2 in stage 133.1 (TID 702) (10.166.225.249 executor 0): java.lang.IllegalStateException: StateStoreId(opId=0,partId=7,name=default): RocksDB instance could not be acquired by [ThreadId: Some(1495), task: partition 7.2 in stage 133.1, TID 702] as it was not released by [ThreadId: Some(449), task: partition 7.0 in stage 133.0, TID 685] after 60006 ms.
23/03/05 22:53:59 WARN TaskSetManager: Lost task 4.3 in stage 133.1 (TID 704) (10.166.225.249 executor 0): java.lang.IllegalStateException: StateStoreId(opId=0,partId=7,name=default): RocksDB instance could not be acquired by [ThreadId: Some(46), task: partition 7.3 in stage 133.1, TID 704] as it was not released by [ThreadId: Some(449), task: partition 7.0 in stage 133.0, TID 685] after 60003 ms.
{code}
 
Increasing the *lockAcquireTimeoutMs* to 2 minutes such that 4 task retries will give us 8 minutes to acquire the lock and it is larger than connectionTimeout with retries (3 * 120s).

  was:
We are seeing query failure which is caused by RocksDB acquisition failure for the retry tasks.
 *  at t1, we shrink the cluster to only have one executor

{code:java}
23/03/05 22:47:21 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20230305224215-0000/2 is now DECOMMISSIONED (worker decommissioned because of kill request from HTTP endpoint (data migration disabled))
23/03/05 22:47:21 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20230305224215-0000/3 is now DECOMMISSIONED (worker decommissioned because of kill request from HTTP endpoint (data migration disabled))
{code}
 
 * at t1+2min, task 7 at its first attempt (i.e. task 7.0) is scheduled to the alive executor

{code:java}
23/03/05 22:49:58 INFO TaskSetManager: Starting task 7.0 in stage 133.0 (TID 685) (10.166.225.249, executor 0, partition 7, ANY, {code}
 

It seems that task 7.0 is able to pass *{{dataRDD.iterator(partition, ctxt)}}* and acquires the rocksdb lock as we are seeing
{code:java}
23/03/05 22:51:59 WARN TaskSetManager: Lost task 4.1 in stage 133.1 (TID 700) (10.166.225.249 executor 0): java.lang.IllegalStateException: StateStoreId(opId=0,partId=7,name=default): RocksDB instance could not be acquired by [ThreadId: Some(50), task: partition 7.1 in stage 133.1, TID 700] as it was not released by [ThreadId: Some(449), task: partition 7.0 in stage 133.0, TID 685] after 60003 ms.
23/03/05 22:52:59 WARN TaskSetManager: Lost task 4.2 in stage 133.1 (TID 702) (10.166.225.249 executor 0): java.lang.IllegalStateException: StateStoreId(opId=0,partId=7,name=default): RocksDB instance could not be acquired by [ThreadId: Some(1495), task: partition 7.2 in stage 133.1, TID 702] as it was not released by [ThreadId: Some(449), task: partition 7.0 in stage 133.0, TID 685] after 60006 ms.
23/03/05 22:53:59 WARN TaskSetManager: Lost task 4.3 in stage 133.1 (TID 704) (10.166.225.249 executor 0): java.lang.IllegalStateException: StateStoreId(opId=0,partId=7,name=default): RocksDB instance could not be acquired by [ThreadId: Some(46), task: partition 7.3 in stage 133.1, TID 704] as it was not released by [ThreadId: Some(449), task: partition 7.0 in stage 133.0, TID 685] after 60003 ms.
{code}
 
Increasing the [lockAcquireTimeoutMs|https://src.dev.databricks.com/databricks/runtime/-/blob/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala?L927:3] to 2 minutes such that 4 task retries will give us 8 minutes to acquire the lock and it is larger than connectionTimeout with retries (3 * 120s).


> Increase the lockAcquireTimeoutMs for acquiring the RocksDB state store in Structure Streaming
> ----------------------------------------------------------------------------------------------
>
>                 Key: SPARK-42794
>                 URL: https://issues.apache.org/jira/browse/SPARK-42794
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 3.5.0
>            Reporter: Huanli Wang
>            Priority: Major
>
> We are seeing query failure which is caused by RocksDB acquisition failure for the retry tasks.
>  *  at t1, we shrink the cluster to only have one executor
> {code:java}
> 23/03/05 22:47:21 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20230305224215-0000/2 is now DECOMMISSIONED (worker decommissioned because of kill request from HTTP endpoint (data migration disabled))
> 23/03/05 22:47:21 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20230305224215-0000/3 is now DECOMMISSIONED (worker decommissioned because of kill request from HTTP endpoint (data migration disabled))
> {code}
>  
>  * at t1+2min, task 7 at its first attempt (i.e. task 7.0) is scheduled to the alive executor
> {code:java}
> 23/03/05 22:49:58 INFO TaskSetManager: Starting task 7.0 in stage 133.0 (TID 685) (10.166.225.249, executor 0, partition 7, ANY, {code}
>  
> It seems that task 7.0 is able to pass *{{dataRDD.iterator(partition, ctxt)}}* and acquires the rocksdb lock as we are seeing
> {code:java}
> 23/03/05 22:51:59 WARN TaskSetManager: Lost task 4.1 in stage 133.1 (TID 700) (10.166.225.249 executor 0): java.lang.IllegalStateException: StateStoreId(opId=0,partId=7,name=default): RocksDB instance could not be acquired by [ThreadId: Some(50), task: partition 7.1 in stage 133.1, TID 700] as it was not released by [ThreadId: Some(449), task: partition 7.0 in stage 133.0, TID 685] after 60003 ms.
> 23/03/05 22:52:59 WARN TaskSetManager: Lost task 4.2 in stage 133.1 (TID 702) (10.166.225.249 executor 0): java.lang.IllegalStateException: StateStoreId(opId=0,partId=7,name=default): RocksDB instance could not be acquired by [ThreadId: Some(1495), task: partition 7.2 in stage 133.1, TID 702] as it was not released by [ThreadId: Some(449), task: partition 7.0 in stage 133.0, TID 685] after 60006 ms.
> 23/03/05 22:53:59 WARN TaskSetManager: Lost task 4.3 in stage 133.1 (TID 704) (10.166.225.249 executor 0): java.lang.IllegalStateException: StateStoreId(opId=0,partId=7,name=default): RocksDB instance could not be acquired by [ThreadId: Some(46), task: partition 7.3 in stage 133.1, TID 704] as it was not released by [ThreadId: Some(449), task: partition 7.0 in stage 133.0, TID 685] after 60003 ms.
> {code}
>  
> Increasing the *lockAcquireTimeoutMs* to 2 minutes such that 4 task retries will give us 8 minutes to acquire the lock and it is larger than connectionTimeout with retries (3 * 120s).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org