Posted to reviews@spark.apache.org by HeartSaVioR <gi...@git.apache.org> on 2018/06/07 14:56:40 UTC

[GitHub] spark pull request #21506: [SPARK-24485][SS] Measure and log elapsed time fo...

GitHub user HeartSaVioR opened a pull request:

    https://github.com/apache/spark/pull/21506

    [SPARK-24485][SS] Measure and log elapsed time for filesystem operations in HDFSBackedStateStoreProvider

    ## What changes were proposed in this pull request?
    
    This patch measures and logs the elapsed time of each operation that communicates with the file system (mostly remote HDFS in production) in HDFSBackedStateStoreProvider, to help investigate latency issues.
    
    ## How was this patch tested?
    
    Manually tested.
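
The approach described above, timing a block of code and logging the result, can be sketched roughly as follows. This is a minimal, self-contained illustration and not the code from the patch: the diff quoted later in this thread calls `Utils.timeTakenMs`, but its implementation is not shown here, so the helper below is only an assumed shape (a by-name block whose result is returned together with the elapsed milliseconds). `readFakeSnapshot` is a hypothetical stand-in for a filesystem call.

```scala
import java.util.concurrent.TimeUnit

object TimingSketch {
  // Assumed shape of a timing helper: evaluates `body` exactly once and
  // returns its result paired with the elapsed wall-clock time in ms.
  def timeTakenMs[T](body: => T): (T, Long) = {
    val startNs = System.nanoTime()
    val result = body
    val elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs)
    (result, elapsedMs)
  }

  // Hypothetical stand-in for a (possibly remote) filesystem read.
  def readFakeSnapshot(version: Long): Map[String, Int] = {
    Thread.sleep(20) // simulate I/O latency
    Map("version" -> version.toInt)
  }

  def main(args: Array[String]): Unit = {
    val (state, elapsedMs) = timeTakenMs { readFakeSnapshot(3L) }
    println(s"Loaded ${state.size} entries in $elapsedMs ms")
  }
}
```

Because the block is passed by name, existing code can be wrapped without restructuring it, which matches the reviewers' later observation that most of the change is existing code moved inside the timing call.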

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/HeartSaVioR/spark SPARK-24485

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21506.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21506
    
----
commit d84f98fc978262f4165f78b3b223b8bb3151f735
Author: Jungtaek Lim <ka...@...>
Date:   2018-06-07T14:14:46Z

    [SPARK-24485][SS] Measure and log elapsed time for filesystem operations in HDFSBackedStateStoreProvider

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21506: [SPARK-24485][SS] Measure and log elapsed time fo...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21506#discussion_r194266029
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
    @@ -280,38 +278,49 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
         if (loadedCurrentVersionMap.isDefined) {
           return loadedCurrentVersionMap.get
         }
    -    val snapshotCurrentVersionMap = readSnapshotFile(version)
    -    if (snapshotCurrentVersionMap.isDefined) {
    -      synchronized { loadedMaps.put(version, snapshotCurrentVersionMap.get) }
    -      return snapshotCurrentVersionMap.get
    -    }
     
    -    // Find the most recent map before this version that we can.
    -    // [SPARK-22305] This must be done iteratively to avoid stack overflow.
    -    var lastAvailableVersion = version
    -    var lastAvailableMap: Option[MapType] = None
    -    while (lastAvailableMap.isEmpty) {
    -      lastAvailableVersion -= 1
    +    logWarning(s"The state for version $version doesn't exist in loadedMaps. " +
    +      "Reading snapshot file and delta files if needed..." +
    +      "Note that this is normal for the first batch of starting query.")
     
    -      if (lastAvailableVersion <= 0) {
    -        // Use an empty map for versions 0 or less.
    -        lastAvailableMap = Some(new MapType)
    -      } else {
    -        lastAvailableMap =
    -          synchronized { loadedMaps.get(lastAvailableVersion) }
    -            .orElse(readSnapshotFile(lastAvailableVersion))
    +    val (result, elapsedMs) = Utils.timeTakenMs {
    --- End diff --
    
    Github has an... interesting idea of how to display this diff. The only change was the existing code moving inside timeTakenMs, and adding the logWarning statements, correct?


---



[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21506
  
    **[Test build #91627 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91627/testReport)** for PR 21506 at commit [`d84f98f`](https://github.com/apache/spark/commit/d84f98fc978262f4165f78b3b223b8bb3151f735).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/spark/pull/21506
  
    There are plenty of other debug messages that might bury the log messages added by this patch. Should we log them at INFO instead of DEBUG?


---



[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21506
  
    **[Test build #91626 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91626/testReport)** for PR 21506 at commit [`d84f98f`](https://github.com/apache/spark/commit/d84f98fc978262f4165f78b3b223b8bb3151f735).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21506
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #21506: [SPARK-24485][SS] Measure and log elapsed time fo...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21506#discussion_r194266060
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
    @@ -280,38 +278,49 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
         if (loadedCurrentVersionMap.isDefined) {
           return loadedCurrentVersionMap.get
         }
    -    val snapshotCurrentVersionMap = readSnapshotFile(version)
    -    if (snapshotCurrentVersionMap.isDefined) {
    -      synchronized { loadedMaps.put(version, snapshotCurrentVersionMap.get) }
    -      return snapshotCurrentVersionMap.get
    -    }
     
    -    // Find the most recent map before this version that we can.
    -    // [SPARK-22305] This must be done iteratively to avoid stack overflow.
    -    var lastAvailableVersion = version
    -    var lastAvailableMap: Option[MapType] = None
    -    while (lastAvailableMap.isEmpty) {
    -      lastAvailableVersion -= 1
    +    logWarning(s"The state for version $version doesn't exist in loadedMaps. " +
    +      "Reading snapshot file and delta files if needed..." +
    +      "Note that this is normal for the first batch of starting query.")
     
    -      if (lastAvailableVersion <= 0) {
    -        // Use an empty map for versions 0 or less.
    -        lastAvailableMap = Some(new MapType)
    -      } else {
    -        lastAvailableMap =
    -          synchronized { loadedMaps.get(lastAvailableVersion) }
    -            .orElse(readSnapshotFile(lastAvailableVersion))
    +    val (result, elapsedMs) = Utils.timeTakenMs {
    +      val snapshotCurrentVersionMap = readSnapshotFile(version)
    +      if (snapshotCurrentVersionMap.isDefined) {
    +        synchronized { loadedMaps.put(version, snapshotCurrentVersionMap.get) }
    +        return snapshotCurrentVersionMap.get
    +      }
    +
    +      // Find the most recent map before this version that we can.
    +      // [SPARK-22305] This must be done iteratively to avoid stack overflow.
    +      var lastAvailableVersion = version
    +      var lastAvailableMap: Option[MapType] = None
    +      while (lastAvailableMap.isEmpty) {
    +        lastAvailableVersion -= 1
    +
    +        if (lastAvailableVersion <= 0) {
    +          // Use an empty map for versions 0 or less.
    +          lastAvailableMap = Some(new MapType)
    +        } else {
    +          lastAvailableMap =
    +            synchronized { loadedMaps.get(lastAvailableVersion) }
    +              .orElse(readSnapshotFile(lastAvailableVersion))
    +        }
    +      }
    +
    +      // Load all the deltas from the version after the last available one up to the target version.
    +      // The last available version is the one with a full snapshot, so it doesn't need deltas.
    +      val resultMap = new MapType(lastAvailableMap.get)
    +      for (deltaVersion <- lastAvailableVersion + 1 to version) {
    +        updateFromDeltaFile(deltaVersion, resultMap)
           }
    -    }
     
    -    // Load all the deltas from the version after the last available one up to the target version.
    -    // The last available version is the one with a full snapshot, so it doesn't need deltas.
    -    val resultMap = new MapType(lastAvailableMap.get)
    -    for (deltaVersion <- lastAvailableVersion + 1 to version) {
    -      updateFromDeltaFile(deltaVersion, resultMap)
    +      synchronized { loadedMaps.put(version, resultMap) }
    +      resultMap
         }
     
    -    synchronized { loadedMaps.put(version, resultMap) }
    -    resultMap
    +    logWarning(s"Loading state for $version takes $elapsedMs ms.")
    --- End diff --
    
    I'm not sure this should be a warning.
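
For readers following the quoted diff, the loading logic it wraps (find the most recent snapshot at or before the requested version, then replay the deltas up to that version) can be sketched as below. This is a simplified illustration under stated assumptions, not the provider's code: the in-memory `snapshots` and `deltas` maps are hypothetical stand-ins for snapshot and delta files, the encoding of a delta as `Some(value)` for an upsert and `None` for a removal is an assumption made for the example, and the `loadedMaps` cache lookup from the diff is omitted.

```scala
object StateLoadSketch {
  type State = Map[String, Int]

  // Hypothetical stand-ins for snapshot files: version -> full state.
  val snapshots: Map[Long, State] = Map(2L -> Map("a" -> 1, "b" -> 2))

  // Hypothetical stand-ins for delta files: version -> changes.
  // Assumed encoding: Some(value) is an upsert, None is a removal.
  val deltas: Map[Long, Map[String, Option[Int]]] = Map(
    3L -> Map("a" -> Some(10)),
    4L -> Map("b" -> None, "c" -> Some(3))
  )

  def loadState(version: Long): State = {
    // Walk backwards iteratively (not recursively) to the newest snapshot
    // at or before `version`; versions <= 0 start from an empty state.
    var base = version
    var baseState: Option[State] = snapshots.get(version)
    while (baseState.isEmpty) {
      base -= 1
      baseState = if (base <= 0) Some(Map.empty) else snapshots.get(base)
    }

    // Replay every delta from base + 1 up to the requested version.
    ((base + 1) to version).foldLeft(baseState.get) { (state, v) =>
      deltas.getOrElse(v, Map.empty[String, Option[Int]]).foldLeft(state) {
        case (s, (k, Some(value))) => s.updated(k, value)
        case (s, (k, None)) => s - k
      }
    }
  }
}
```

In this sketch, loading version 4 starts from the snapshot at version 2 and applies the deltas for versions 3 and 4. The number of deltas replayed grows with the distance to the last snapshot, which is one reason the elapsed time of this path is worth measuring.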


---



[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21506
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91627/
    Test PASSed.


---



[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on the issue:

    https://github.com/apache/spark/pull/21506
  
    lgtm


---



[GitHub] spark pull request #21506: [SPARK-24485][SS] Measure and log elapsed time fo...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21506#discussion_r194293251
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
    @@ -280,38 +278,49 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
         if (loadedCurrentVersionMap.isDefined) {
           return loadedCurrentVersionMap.get
         }
    -    val snapshotCurrentVersionMap = readSnapshotFile(version)
    -    if (snapshotCurrentVersionMap.isDefined) {
    -      synchronized { loadedMaps.put(version, snapshotCurrentVersionMap.get) }
    -      return snapshotCurrentVersionMap.get
    -    }
     
    -    // Find the most recent map before this version that we can.
    -    // [SPARK-22305] This must be done iteratively to avoid stack overflow.
    -    var lastAvailableVersion = version
    -    var lastAvailableMap: Option[MapType] = None
    -    while (lastAvailableMap.isEmpty) {
    -      lastAvailableVersion -= 1
    +    logWarning(s"The state for version $version doesn't exist in loadedMaps. " +
    +      "Reading snapshot file and delta files if needed..." +
    +      "Note that this is normal for the first batch of starting query.")
     
    -      if (lastAvailableVersion <= 0) {
    -        // Use an empty map for versions 0 or less.
    -        lastAvailableMap = Some(new MapType)
    -      } else {
    -        lastAvailableMap =
    -          synchronized { loadedMaps.get(lastAvailableVersion) }
    -            .orElse(readSnapshotFile(lastAvailableVersion))
    +    val (result, elapsedMs) = Utils.timeTakenMs {
    --- End diff --
    
    Yup, right. Most of the code change is just wrapping existing code in timeTakenMs.


---



[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21506
  
    **[Test build #91649 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91649/testReport)** for PR 21506 at commit [`3d0e23f`](https://github.com/apache/spark/commit/3d0e23f7460976a33d6f86178d04f04e488bfaa8).


---



[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21506
  
    **[Test build #91627 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91627/testReport)** for PR 21506 at commit [`d84f98f`](https://github.com/apache/spark/commit/d84f98fc978262f4165f78b3b223b8bb3151f735).


---



[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/21506
  
    Merged to master.


---



[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21506
  
    **[Test build #91651 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91651/testReport)** for PR 21506 at commit [`3d0e23f`](https://github.com/apache/spark/commit/3d0e23f7460976a33d6f86178d04f04e488bfaa8).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21506
  
    **[Test build #91525 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91525/testReport)** for PR 21506 at commit [`d84f98f`](https://github.com/apache/spark/commit/d84f98fc978262f4165f78b3b223b8bb3151f735).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21506
  
    **[Test build #91651 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91651/testReport)** for PR 21506 at commit [`3d0e23f`](https://github.com/apache/spark/commit/3d0e23f7460976a33d6f86178d04f04e488bfaa8).


---



[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21506
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21506
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91525/
    Test PASSed.


---



[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21506
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91651/
    Test PASSed.


---



[GitHub] spark pull request #21506: [SPARK-24485][SS] Measure and log elapsed time fo...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/21506


---



[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21506
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91649/
    Test FAILed.


---



[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21506
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91626/
    Test FAILed.


---



[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21506
  
    **[Test build #91626 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91626/testReport)** for PR 21506 at commit [`d84f98f`](https://github.com/apache/spark/commit/d84f98fc978262f4165f78b3b223b8bb3151f735).


---



[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21506
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/spark/pull/21506
  
    Kindly pinging @tdas again.


---



[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/spark/pull/21506
  
    cc. @tdas @jose-torres @jerryshao @arunmahadevan @HyukjinKwon 


---



[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21506
  
    **[Test build #91649 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91649/testReport)** for PR 21506 at commit [`3d0e23f`](https://github.com/apache/spark/commit/3d0e23f7460976a33d6f86178d04f04e488bfaa8).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/21506
  
    ok to test


---



[GitHub] spark pull request #21506: [SPARK-24485][SS] Measure and log elapsed time fo...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21506#discussion_r194295068
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
    @@ -280,38 +278,49 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
         if (loadedCurrentVersionMap.isDefined) {
           return loadedCurrentVersionMap.get
         }
    -    val snapshotCurrentVersionMap = readSnapshotFile(version)
    -    if (snapshotCurrentVersionMap.isDefined) {
    -      synchronized { loadedMaps.put(version, snapshotCurrentVersionMap.get) }
    -      return snapshotCurrentVersionMap.get
    -    }
     
    -    // Find the most recent map before this version that we can.
    -    // [SPARK-22305] This must be done iteratively to avoid stack overflow.
    -    var lastAvailableVersion = version
    -    var lastAvailableMap: Option[MapType] = None
    -    while (lastAvailableMap.isEmpty) {
    -      lastAvailableVersion -= 1
    +    logWarning(s"The state for version $version doesn't exist in loadedMaps. " +
    +      "Reading snapshot file and delta files if needed..." +
    +      "Note that this is normal for the first batch of starting query.")
     
    -      if (lastAvailableVersion <= 0) {
    -        // Use an empty map for versions 0 or less.
    -        lastAvailableMap = Some(new MapType)
    -      } else {
    -        lastAvailableMap =
    -          synchronized { loadedMaps.get(lastAvailableVersion) }
    -            .orElse(readSnapshotFile(lastAvailableVersion))
    +    val (result, elapsedMs) = Utils.timeTakenMs {
    +      val snapshotCurrentVersionMap = readSnapshotFile(version)
    +      if (snapshotCurrentVersionMap.isDefined) {
    +        synchronized { loadedMaps.put(version, snapshotCurrentVersionMap.get) }
    +        return snapshotCurrentVersionMap.get
    +      }
    +
    +      // Find the most recent map before this version that we can.
    +      // [SPARK-22305] This must be done iteratively to avoid stack overflow.
    +      var lastAvailableVersion = version
    +      var lastAvailableMap: Option[MapType] = None
    +      while (lastAvailableMap.isEmpty) {
    +        lastAvailableVersion -= 1
    +
    +        if (lastAvailableVersion <= 0) {
    +          // Use an empty map for versions 0 or less.
    +          lastAvailableMap = Some(new MapType)
    +        } else {
    +          lastAvailableMap =
    +            synchronized { loadedMaps.get(lastAvailableVersion) }
    +              .orElse(readSnapshotFile(lastAvailableVersion))
    +        }
    +      }
    +
    +      // Load all the deltas from the version after the last available one up to the target version.
    +      // The last available version is the one with a full snapshot, so it doesn't need deltas.
    +      val resultMap = new MapType(lastAvailableMap.get)
    +      for (deltaVersion <- lastAvailableVersion + 1 to version) {
    +        updateFromDeltaFile(deltaVersion, resultMap)
           }
    -    }
     
    -    // Load all the deltas from the version after the last available one up to the target version.
    -    // The last available version is the one with a full snapshot, so it doesn't need deltas.
    -    val resultMap = new MapType(lastAvailableMap.get)
    -    for (deltaVersion <- lastAvailableVersion + 1 to version) {
    -      updateFromDeltaFile(deltaVersion, resultMap)
    +      synchronized { loadedMaps.put(version, resultMap) }
    +      resultMap
         }
     
    -    synchronized { loadedMaps.put(version, resultMap) }
    -    resultMap
    +    logWarning(s"Loading state for $version takes $elapsedMs ms.")
    --- End diff --
    
    Changed log level to DEBUG.
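
The practical effect of the chosen log level can be illustrated with a small sketch. It uses `java.util.logging` from the JDK purely as a self-contained stand-in: Spark has its own logging helpers (`logWarning` appears in the diff above), and `FINE` is used here only as a rough analogue of DEBUG. The handler class, the message texts, and the `42` ms value are all hypothetical.

```scala
import java.util.logging.{Handler, Level, LogRecord, Logger}
import scala.collection.mutable.ArrayBuffer

object LogLevelSketch {
  // Collects messages in memory so the effect of the logger level is visible.
  class CapturingHandler extends Handler {
    val messages: ArrayBuffer[String] = ArrayBuffer.empty[String]
    override def publish(record: LogRecord): Unit =
      messages += s"${record.getLevel}: ${record.getMessage}"
    override def flush(): Unit = ()
    override def close(): Unit = ()
  }

  // Emits one timing message at FINE and one problem report at WARNING,
  // and returns whatever passed the logger's level check.
  def messagesAt(loggerLevel: Level): List[String] = {
    val handler = new CapturingHandler
    handler.setLevel(Level.ALL)
    val logger = Logger.getAnonymousLogger()
    logger.setUseParentHandlers(false) // keep output out of the console
    logger.addHandler(handler)
    logger.setLevel(loggerLevel)

    val elapsedMs = 42L // hypothetical measurement
    logger.fine(s"Loading state for version 5 took $elapsedMs ms.")
    logger.warning("Snapshot file is missing.")
    handler.messages.toList
  }
}
```

With the logger at `Level.INFO`, only the warning is captured; at `Level.FINE`, both messages are. That is the trade-off discussed in this thread: timing messages at a debug-like level stay out of normal logs, but they are also easy to miss among other debug output once that level is enabled.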


---



[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/spark/pull/21506
  
    retest this please


---



[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/21506
  
    add to whitelist


---



[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21506
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21506
  
    Can one of the admins verify this patch?


---



[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21506
  
    Can one of the admins verify this patch?


---



[GitHub] spark pull request #21506: [SPARK-24485][SS] Measure and log elapsed time fo...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21506#discussion_r194293481
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
    @@ -280,38 +278,49 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
         if (loadedCurrentVersionMap.isDefined) {
           return loadedCurrentVersionMap.get
         }
    -    val snapshotCurrentVersionMap = readSnapshotFile(version)
    -    if (snapshotCurrentVersionMap.isDefined) {
    -      synchronized { loadedMaps.put(version, snapshotCurrentVersionMap.get) }
    -      return snapshotCurrentVersionMap.get
    -    }
     
    -    // Find the most recent map before this version that we can.
    -    // [SPARK-22305] This must be done iteratively to avoid stack overflow.
    -    var lastAvailableVersion = version
    -    var lastAvailableMap: Option[MapType] = None
    -    while (lastAvailableMap.isEmpty) {
    -      lastAvailableVersion -= 1
    +    logWarning(s"The state for version $version doesn't exist in loadedMaps. " +
    +      "Reading snapshot file and delta files if needed..." +
    +      "Note that this is normal for the first batch of starting query.")
     
    -      if (lastAvailableVersion <= 0) {
    -        // Use an empty map for versions 0 or less.
    -        lastAvailableMap = Some(new MapType)
    -      } else {
    -        lastAvailableMap =
    -          synchronized { loadedMaps.get(lastAvailableVersion) }
    -            .orElse(readSnapshotFile(lastAvailableVersion))
    +    val (result, elapsedMs) = Utils.timeTakenMs {
    +      val snapshotCurrentVersionMap = readSnapshotFile(version)
    +      if (snapshotCurrentVersionMap.isDefined) {
    +        synchronized { loadedMaps.put(version, snapshotCurrentVersionMap.get) }
    +        return snapshotCurrentVersionMap.get
    +      }
    +
    +      // Find the most recent map before this version that we can.
    +      // [SPARK-22305] This must be done iteratively to avoid stack overflow.
    +      var lastAvailableVersion = version
    +      var lastAvailableMap: Option[MapType] = None
    +      while (lastAvailableMap.isEmpty) {
    +        lastAvailableVersion -= 1
    +
    +        if (lastAvailableVersion <= 0) {
    +          // Use an empty map for versions 0 or less.
    +          lastAvailableMap = Some(new MapType)
    +        } else {
    +          lastAvailableMap =
    +            synchronized { loadedMaps.get(lastAvailableVersion) }
    +              .orElse(readSnapshotFile(lastAvailableVersion))
    +        }
    +      }
    +
    +      // Load all the deltas from the version after the last available one up to the target version.
    +      // The last available version is the one with a full snapshot, so it doesn't need deltas.
    +      val resultMap = new MapType(lastAvailableMap.get)
    +      for (deltaVersion <- lastAvailableVersion + 1 to version) {
    +        updateFromDeltaFile(deltaVersion, resultMap)
           }
    -    }
     
    -    // Load all the deltas from the version after the last available one up to the target version.
    -    // The last available version is the one with a full snapshot, so it doesn't need deltas.
    -    val resultMap = new MapType(lastAvailableMap.get)
    -    for (deltaVersion <- lastAvailableVersion + 1 to version) {
    -      updateFromDeltaFile(deltaVersion, resultMap)
    +      synchronized { loadedMaps.put(version, resultMap) }
    +      resultMap
         }
     
    -    synchronized { loadedMaps.put(version, resultMap) }
    -    resultMap
    +    logWarning(s"Loading state for $version takes $elapsedMs ms.")
    --- End diff --
    
    I logged this at WARN only to pair it with the warning message above. But since we are guiding end users to turn on DEBUG level to see information about additional latencies, changing this to DEBUG would also be OK.
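For illustration, here is a minimal sketch of the measure-then-log pattern discussed in this comment. It is not the code in the PR: `TimedLoad`, `loadState`, the `logDebug` callback, and the placeholder map contents are hypothetical, and `timeTakenMs` is a local stand-in with the same shape as the `Utils.timeTakenMs` call in the diff (it returns the result together with the elapsed milliseconds). The sketch keeps `return` out of the timed block, because in Scala a `return` inside the block exits the enclosing method, so a log line placed after the block would not run on that path.

```scala
import java.util.concurrent.TimeUnit
import scala.collection.mutable

object TimedLoad {
  // Local stand-in (an assumption, not Spark's implementation): runs `body`
  // and returns its result paired with the elapsed wall-clock time in ms.
  def timeTakenMs[T](body: => T): (T, Long) = {
    val startNs = System.nanoTime()
    val result = body
    val elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs)
    (result, elapsedMs)
  }

  // Hypothetical loader: returns the cached state if present; otherwise it
  // "loads" a placeholder map, caches it, and logs the elapsed time once.
  def loadState(
      version: Long,
      cache: mutable.Map[Long, Map[String, Long]],
      logDebug: String => Unit): Map[String, Long] = {
    cache.get(version) match {
      case Some(loaded) => loaded
      case None =>
        val (result, elapsedMs) = timeTakenMs {
          // The block ends in an expression instead of using `return`,
          // so the log call below is reached on every load path.
          val loaded = Map("version" -> version)
          cache.put(version, loaded)
          loaded
        }
        logDebug(s"Loading state for $version takes $elapsedMs ms.")
        result
    }
  }
}
```

Passing the logger as a function keeps the log level a caller decision, so switching between WARN and DEBUG as discussed above would not touch the timing code.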




[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21506
  
    **[Test build #91525 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91525/testReport)** for PR 21506 at commit [`d84f98f`](https://github.com/apache/spark/commit/d84f98fc978262f4165f78b3b223b8bb3151f735).




[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21506
  
    Merged build finished. Test PASSed.

