Posted to commits@spark.apache.org by ka...@apache.org on 2023/02/25 09:41:06 UTC

[spark] branch master updated: [SPARK-42567][SS][SQL] Track load time for state store provider and log warning if it exceeds threshold

This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b71f71e196 [SPARK-42567][SS][SQL] Track load time for state store provider and log warning if it exceeds threshold
6b71f71e196 is described below

commit 6b71f71e196718474a0e204a6b29aec2d2f8530d
Author: Anish Shrigondekar <an...@databricks.com>
AuthorDate: Sat Feb 25 18:40:52 2023 +0900

    [SPARK-42567][SS][SQL] Track load time for state store provider and log warning if it exceeds threshold
    
    ### What changes were proposed in this pull request?
    Track load time for state store provider and log warning if it exceeds threshold
    
    ### Why are the changes needed?
    We have seen that the initial state store provider load can be blocked by external factors such as filesystem initialization. Tracking the load time lets us identify cases where this load takes too long; a warning is logged whenever it does.
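    
    The change wraps the provider load in Spark's `Utils.timeTakenMs` and logs a warning when the load crosses a fixed 2s threshold. A minimal standalone sketch of that pattern (the timer and provider below are illustrative stand-ins, not the actual Spark code; see the diff for the real change):
    ```scala
    // Local stand-in for Spark's Utils.timeTakenMs plus the warn-on-slow-load check.
    // Only the 2000 ms threshold mirrors the actual change; everything else is a mock.
    object LoadTimeSketch {
      def timeTakenMs[T](body: => T): (T, Long) = {
        val startNs = System.nanoTime()
        val result = body
        (result, (System.nanoTime() - startNs) / 1000000L)
      }
    
      // Stand-in for provider initialization, which may block on filesystem setup.
      def loadProvider(): String = { Thread.sleep(2100); "provider" }
    
      def main(args: Array[String]): Unit = {
        val (provider, loadTimeMs) = timeTakenMs { loadProvider() }
        if (loadTimeMs > 2000L) {
          // The real code calls logWarning and includes storeId and queryRunId.
          println(s"WARN Loaded state store provider in loadTimeMs=$loadTimeMs")
        }
      }
    }
    ```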
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Augmented some of the tests to verify the logging is working as expected.
    Sample logs:
    ```
    14:58:51.784 WARN org.apache.spark.sql.execution.streaming.state.StateStore: Loaded state store provider in loadTimeMs=2049 for storeId=StateStoreId[ checkpointRootLocation=file:/Users/anish.shrigondekar/spark/spark/target/tmp/streaming.metadata-1f2ff296-1ece-4a0c-b4b4-48aa0e909b49/state, operatorId=0, partitionId=2, storeName=default ] and queryRunId=a4063603-3929-4340-9920-eca206ebec36
    14:58:53.838 WARN org.apache.spark.sql.execution.streaming.state.StateStore: Loaded state store provider in loadTimeMs=2046 for storeId=StateStoreId[ checkpointRootLocation=file:/Users/anish.shrigondekar/spark/spark/target/tmp/streaming.metadata-1f2ff296-1ece-4a0c-b4b4-48aa0e909b49/state, operatorId=0, partitionId=3, storeName=default ] and queryRunId=a4063603-3929-4340-9920-eca206ebec36
    14:58:55.885 WARN org.apache.spark.sql.execution.streaming.state.StateStore: Loaded state store provider in loadTimeMs=2044 for storeId=StateStoreId[ checkpointRootLocation=file:/Users/anish.shrigondekar/spark/spark/target/tmp/streaming.metadata-1f2ff296-1ece-4a0c-b4b4-48aa0e909b49/state, operatorId=0, partitionId=4, storeName=default ] and queryRunId=a4063603-3929-4340-9920-eca206ebec36
    ```
    
    Closes #40163 from anishshri-db/task/SPARK-42567.
    
    Authored-by: Anish Shrigondekar <an...@databricks.com>
    Signed-off-by: Jungtaek Lim <ka...@gmail.com>
---
 .../sql/execution/streaming/state/StateStore.scala | 27 ++++++++++++++++++----
 1 file changed, 22 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 787f4e390e5..beb6500fe3a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -398,6 +398,12 @@ case class StateStoreId(
       new Path(checkpointRootLocation, s"$operatorId/$partitionId/$storeName")
     }
   }
+
+  override def toString: String = {
+    s"""StateStoreId[ checkpointRootLocation=$checkpointRootLocation, operatorId=$operatorId,
+       | partitionId=$partitionId, storeName=$storeName ]
+       |""".stripMargin.replaceAll("\n", "")
+  }
 }
 
 object StateStoreId {
@@ -533,11 +539,22 @@ object StateStore extends Logging {
         }
       }
 
-      val provider = loadedProviders.getOrElseUpdate(
-        storeProviderId,
-        StateStoreProvider.createAndInit(
-          storeProviderId, keySchema, valueSchema, numColsPrefixKey, storeConf, hadoopConf)
-      )
+      // SPARK-42567 - Track load time for state store provider and log warning if takes longer
+      // than 2s.
+      val (provider, loadTimeMs) = Utils.timeTakenMs {
+        loadedProviders.getOrElseUpdate(
+          storeProviderId,
+          StateStoreProvider.createAndInit(
+            storeProviderId, keySchema, valueSchema, numColsPrefixKey, storeConf, hadoopConf)
+        )
+      }
+
+      if (loadTimeMs > 2000L) {
+        logWarning(s"Loaded state store provider in loadTimeMs=$loadTimeMs " +
+          s"for storeId=${storeProviderId.storeId.toString} and " +
+          s"queryRunId=${storeProviderId.queryRunId}")
+      }
+
       val otherProviderIds = loadedProviders.keys.filter(_ != storeProviderId).toSeq
       val providerIdsToUnload = reportActiveStoreInstance(storeProviderId, otherProviderIds)
       providerIdsToUnload.foreach(unload(_))
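
For reference, a standalone sketch of how the new toString collapses the multi-line string literal into the single-line form seen in the sample warnings above (MockStoreId and ToStringSketch are illustrative names, not the actual Spark classes):

```scala
// Mock of the formatting trick in the new StateStoreId.toString: stripMargin
// removes the leading "| " margins and replaceAll("\n", "") joins the lines,
// producing a single-line representation.
case class MockStoreId(
    checkpointRootLocation: String,
    operatorId: Long,
    partitionId: Int,
    storeName: String) {
  override def toString: String = {
    s"""StateStoreId[ checkpointRootLocation=$checkpointRootLocation, operatorId=$operatorId,
       | partitionId=$partitionId, storeName=$storeName ]
       |""".stripMargin.replaceAll("\n", "")
  }
}

object ToStringSketch {
  def main(args: Array[String]): Unit = {
    // Prints:
    // StateStoreId[ checkpointRootLocation=file:/tmp/state, operatorId=0, partitionId=2, storeName=default ]
    println(MockStoreId("file:/tmp/state", operatorId = 0L, partitionId = 2, storeName = "default"))
  }
}
```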


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org