Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/10/28 17:25:49 UTC

[GitHub] [spark] jerrypeng opened a new pull request, #38430: [SPARK-40957] Add in memory cache in HDFSMetadataLog

jerrypeng opened a new pull request, #38430:
URL: https://github.com/apache/spark/pull/38430

   
   ### What changes were proposed in this pull request?
   
   
   ### Why are the changes needed?
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   ### How was this patch tested?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38430: [SPARK-40957] Add in memory cache in HDFSMetadataLog

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38430:
URL: https://github.com/apache/spark/pull/38430#discussion_r1008968288


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala:
##########
@@ -64,6 +67,17 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
     fileManager.mkdirs(metadataPath)
   }
 
+  protected val metadataCacheEnabled: Boolean
+  = sparkSession.sessionState.conf.getConf(SQLConf.STREAMING_METADATA_CACHE_ENABLED)
+
+  /**
+   * Cache the latest two batches. [[StreamExecution]] usually just accesses the latest two batches
+   * when committing offsets, this cache will save some file system operations.
+   */
+  protected[sql] val batchCache = Collections.synchronizedMap(new LinkedHashMap[Long, T](2) {

Review Comment:
   The previous implementation didn't use Guava cache. We don't want to add more coupling to Guava unless implementing this ourselves is really cumbersome.
   
   I provided the diff for caching offset seq; see https://github.com/apache/spark/pull/31495
   This PR generalizes that cache from offset seq to HDFSMetadataLog, which was actually requested in my PR as well. As of now, the cache is only effective for offset seq, but we plan to propose features where caching the commit log would be helpful.
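
As a rough illustration of the approach discussed here: the diff hunk above is cut off at the opening brace, so the body of the anonymous `LinkedHashMap` subclass is not visible in this message. The sketch below shows one common way to get a small bounded cache from `java.util.LinkedHashMap` without Guava, by overriding `removeEldestEntry`. The override, the `BoundedBatchCache` name, and the `maxEntries` parameter are assumptions made for illustration, not code taken from the PR.

```scala
import java.util.{Collections, LinkedHashMap => JLinkedHashMap, Map => JMap}

object BoundedBatchCache {
  // Hypothetical helper: a synchronized map that keeps at most `maxEntries`
  // entries and drops the oldest inserted one once the bound is exceeded.
  def create[V](maxEntries: Int): JMap[Long, V] =
    Collections.synchronizedMap(new JLinkedHashMap[Long, V](maxEntries) {
      // LinkedHashMap calls this after every insertion; returning true
      // removes the eldest (first inserted) entry.
      override def removeEldestEntry(eldest: JMap.Entry[Long, V]): Boolean =
        size() > maxEntries
    })
}
```

With `BoundedBatchCache.create[String](2)`, putting batches 0, 1 and 2 in that order should leave only 1 and 2 in the map, which matches the "latest two batches" wording in the scaladoc above.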
   





[GitHub] [spark] jerrypeng commented on pull request #38430: [SPARK-40957] Add in memory cache in HDFSMetadataLog

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on PR #38430:
URL: https://github.com/apache/spark/pull/38430#issuecomment-1297397528

   @LuciferYang @HeartSaVioR thanks for reviewing! Do you guys have any other comments? Or does this PR look good?




[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #38430: [SPARK-40957] Add in memory cache in HDFSMetadataLog

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on code in PR #38430:
URL: https://github.com/apache/spark/pull/38430#discussion_r1011874757


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala:
##########
@@ -277,10 +295,34 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
     for (batchId <- batchIds if batchId > thresholdBatchId) {
       val path = batchIdToPath(batchId)
       fileManager.delete(path)
+      if (metadataCacheEnabled) batchCache.remove(batchId)
       logTrace(s"Removed metadata log file: $path")
     }
   }
 
+
+  /**
+   * List the available batches on file system. As a workaround for S3 inconsistent list, it also

Review Comment:
   Given https://aws.amazon.com/blogs/aws/amazon-s3-update-strong-read-after-write-consistency/, this looks a little misleading. Could you revise this comment sentence to elaborate on what you mean by `S3 inconsistent list`, please?





[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #38430: [SPARK-40957] Add in memory cache in HDFSMetadataLog

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on code in PR #38430:
URL: https://github.com/apache/spark/pull/38430#discussion_r1011867108


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2007,6 +2007,14 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val STREAMING_METADATA_CACHE_ENABLED =
+    buildConf("spark.sql.streaming.metadataCache.enabled")
+      .internal()
+      .doc("Whether the streaming HDFSMetadataLog caches the metadata of the latest two batches.")
+      .booleanConf
+      .createWithDefault(true)
+
+

Review Comment:
   Redundant empty line?
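
For context on the configuration entry in the diff above, here is a hedged sketch of how the flag could be turned off for a session. It assumes a Spark build that already contains this change is on the classpath; the object name, app name, and `local[1]` master are placeholders. Only the config key comes from the diff.

```scala
import org.apache.spark.sql.SparkSession

object MetadataCacheConfigExample {
  def main(args: Array[String]): Unit = {
    // Assumption: the Spark build in use includes the config added by this PR.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("metadata-cache-config")
      // Disable the HDFSMetadataLog batch cache for this session.
      .config("spark.sql.streaming.metadataCache.enabled", "false")
      .getOrCreate()

    // Read the value back to confirm what the session will use.
    println(spark.conf.get("spark.sql.streaming.metadataCache.enabled"))
    spark.stop()
  }
}
```

Since the `HDFSMetadataLog` diff elsewhere in this thread reads the flag into a `val` at construction time, the setting presumably has to be in place before the streaming query is started.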






[GitHub] [spark] HeartSaVioR closed pull request #38430: [SPARK-40957] Add in memory cache in HDFSMetadataLog

Posted by GitBox <gi...@apache.org>.
HeartSaVioR closed pull request #38430: [SPARK-40957] Add in memory cache in HDFSMetadataLog
URL: https://github.com/apache/spark/pull/38430




[GitHub] [spark] jerrypeng commented on a diff in pull request #38430: [SPARK-40957] Add in memory cache in HDFSMetadataLog

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on code in PR #38430:
URL: https://github.com/apache/spark/pull/38430#discussion_r1011239497


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala:
##########
@@ -64,6 +67,17 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
     fileManager.mkdirs(metadataPath)
   }
 
+  protected val metadataCacheEnabled: Boolean
+  = sparkSession.sessionState.conf.getConf(SQLConf.STREAMING_METADATA_CACHE_ENABLED)
+
+  /**
+   * Cache the latest two batches. [[StreamExecution]] usually just accesses the latest two batches
+   * when committing offsets, this cache will save some file system operations.
+   */
+  protected[sql] val batchCache = Collections.synchronizedMap(new LinkedHashMap[Long, T](2) {

Review Comment:
   We could, but the change is not really going to make much difference. The memory footprint of this is minimal.





[GitHub] [spark] jerrypeng commented on a diff in pull request #38430: [SPARK-40957] Add in memory cache in HDFSMetadataLog

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on code in PR #38430:
URL: https://github.com/apache/spark/pull/38430#discussion_r1014297681


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala:
##########
@@ -277,10 +295,34 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
     for (batchId <- batchIds if batchId > thresholdBatchId) {
       val path = batchIdToPath(batchId)
       fileManager.delete(path)
+      if (metadataCacheEnabled) batchCache.remove(batchId)
       logTrace(s"Removed metadata log file: $path")
     }
   }
 
+
+  /**
+   * List the available batches on file system. As a workaround for S3 inconsistent list, it also

Review Comment:
   I think this comment is out of date. Amazon S3 now delivers strong read-after-write consistency. I will remove it in a subsequent PR.





[GitHub] [spark] LuciferYang commented on a diff in pull request #38430: [SPARK-40957] Add in memory cache in HDFSMetadataLog

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on code in PR #38430:
URL: https://github.com/apache/spark/pull/38430#discussion_r1008810087


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala:
##########
@@ -64,6 +67,17 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
     fileManager.mkdirs(metadataPath)
   }
 
+  protected val metadataCacheEnabled: Boolean
+  = sparkSession.sessionState.conf.getConf(SQLConf.STREAMING_METADATA_CACHE_ENABLED)
+
+  /**
+   * Cache the latest two batches. [[StreamExecution]] usually just accesses the latest two batches
+   * when committing offsets, this cache will save some file system operations.
+   */
+  protected[sql] val batchCache = Collections.synchronizedMap(new LinkedHashMap[Long, T](2) {

Review Comment:
   Why not use Guava Cache like other places? In what scenarios will this change help? How much performance improvement will it bring? I think it's better to have a benchmark to explain.





[GitHub] [spark] LuciferYang commented on a diff in pull request #38430: [SPARK-40957] Add in memory cache in HDFSMetadataLog

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on code in PR #38430:
URL: https://github.com/apache/spark/pull/38430#discussion_r1010063341


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala:
##########
@@ -19,7 +19,9 @@ package org.apache.spark.sql.execution.streaming
 
 import java.io._
 import java.nio.charset.StandardCharsets
+import java.util.{Collections, LinkedHashMap}

Review Comment:
   nit: could rename `LinkedHashMap` to `JLinkedHashMap` or similar to make it clearer
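
A small sketch of the rename being suggested, using Scala's import-renaming syntax. The `ImportRenameExample` object and its fields are hypothetical and exist only to show that the Java and Scala classes can then be used side by side.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap}
import scala.collection.mutable.LinkedHashMap

object ImportRenameExample {
  // The J prefix makes it obvious at the use site which class is meant.
  val javaMap = new JLinkedHashMap[Long, String]()
  val scalaMap = LinkedHashMap.empty[Long, String]

  javaMap.put(1L, "one")
  scalaMap.put(1L, "one")
}
```

Without the rename, importing both classes under the name `LinkedHashMap` in the same file would be ambiguous.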
   
   





[GitHub] [spark] jerrypeng commented on a diff in pull request #38430: [SPARK-40957] Add in memory cache in HDFSMetadataLog

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on code in PR #38430:
URL: https://github.com/apache/spark/pull/38430#discussion_r1010692304


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala:
##########
@@ -277,10 +295,34 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
     for (batchId <- batchIds if batchId > thresholdBatchId) {
       val path = batchIdToPath(batchId)
       fileManager.delete(path)
+      if (metadataCacheEnabled) batchCache.remove(batchId)
       logTrace(s"Removed metadata log file: $path")
     }
   }
 
+
+  /**
+   * List the available batches on file system. As a workaround for S3 inconsistent list, it also
+   * tries to take `batchCache` into consideration to infer a better answer.
+   */
+  protected def listBatches: Array[Long] = {
+    val batchIds = fileManager.list(metadataPath, batchFilesFilter)
+      .map(f => pathToBatchId(f.getPath)) ++
+      // Iterate over keySet is not thread safe. We call `toArray` to make a copy in the lock to
+      // eliminate the race condition.
+      batchCache.synchronized {
+        batchCache.keySet.asScala.toArray

Review Comment:
   We need to return a Scala array, not a Java array.
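
To make the distinction concrete, here is a minimal sketch of the two ways to copy the key set while holding the map's lock. It assumes Scala 2.13+ for `scala.jdk.CollectionConverters` (on 2.12 the equivalent import would be `scala.collection.JavaConverters._`). The object name and the sample entries are made up for illustration.

```scala
import java.util.{Collections, LinkedHashMap => JLinkedHashMap}
import scala.jdk.CollectionConverters._

object KeySnapshotExample {
  val batchCache = Collections.synchronizedMap(new JLinkedHashMap[Long, String]())
  batchCache.put(3L, "batch-3")
  batchCache.put(4L, "batch-4")

  // Java's no-arg toArray() yields Array[AnyRef] (Object[]) of boxed keys.
  val javaKeys: Array[AnyRef] = batchCache.synchronized {
    batchCache.keySet().toArray()
  }

  // Going through asScala yields a primitive Array[Long]. The copy is made
  // inside the lock because iterating the key view is not thread-safe.
  val scalaKeys: Array[Long] = batchCache.synchronized {
    batchCache.keySet().asScala.toArray
  }
}
```

In the diff above the result is concatenated with an array of batch IDs from the file listing and returned as `Array[Long]`, which is presumably why the typed Scala array is needed.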





[GitHub] [spark] LuciferYang commented on a diff in pull request #38430: [SPARK-40957] Add in memory cache in HDFSMetadataLog

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on code in PR #38430:
URL: https://github.com/apache/spark/pull/38430#discussion_r1008989982


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala:
##########
@@ -64,6 +67,17 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
     fileManager.mkdirs(metadataPath)
   }
 
+  protected val metadataCacheEnabled: Boolean
+  = sparkSession.sessionState.conf.getConf(SQLConf.STREAMING_METADATA_CACHE_ENABLED)
+
+  /**
+   * Cache the latest two batches. [[StreamExecution]] usually just accesses the latest two batches
+   * when committing offsets, this cache will save some file system operations.
+   */
+  protected[sql] val batchCache = Collections.synchronizedMap(new LinkedHashMap[Long, T](2) {

Review Comment:
   Very clear, understood.
   
   





[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #38430: [SPARK-40957] Add in memory cache in HDFSMetadataLog

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on code in PR #38430:
URL: https://github.com/apache/spark/pull/38430#discussion_r1011867576


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala:
##########
@@ -64,6 +67,17 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
     fileManager.mkdirs(metadataPath)
   }
 
+  protected val metadataCacheEnabled: Boolean
+  = sparkSession.sessionState.conf.getConf(SQLConf.STREAMING_METADATA_CACHE_ENABLED)

Review Comment:
   nit. Indentation? We need two more spaces.





[GitHub] [spark] HeartSaVioR commented on pull request #38430: [SPARK-40957] Add in memory cache in HDFSMetadataLog

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on PR #38430:
URL: https://github.com/apache/spark/pull/38430#issuecomment-1300401192

   Looks like there have been no major comments in 5 days, so it seems OK to proceed.
   
   Thanks! Merging to master.




[GitHub] [spark] jerrypeng commented on a diff in pull request #38430: [SPARK-40957] Add in memory cache in HDFSMetadataLog

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on code in PR #38430:
URL: https://github.com/apache/spark/pull/38430#discussion_r1011238096


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala:
##########
@@ -277,10 +295,34 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
     for (batchId <- batchIds if batchId > thresholdBatchId) {
       val path = batchIdToPath(batchId)
       fileManager.delete(path)
+      if (metadataCacheEnabled) batchCache.remove(batchId)
       logTrace(s"Removed metadata log file: $path")
     }
   }
 
+
+  /**
+   * List the available batches on file system. As a workaround for S3 inconsistent list, it also
+   * tries to take `batchCache` into consideration to infer a better answer.
+   */
+  protected def listBatches: Array[Long] = {
+    val batchIds = fileManager.list(metadataPath, batchFilesFilter)
+      .map(f => pathToBatchId(f.getPath)) ++
+      // Iterate over keySet is not thread safe. We call `toArray` to make a copy in the lock to
+      // eliminate the race condition.
+      batchCache.synchronized {
+        batchCache.keySet.asScala.toArray
+      }
+    logInfo("BatchIds found from listing: " + batchIds.sorted.mkString(", "))
+
+    if (batchIds.isEmpty) {
+      return Array.empty

Review Comment:
   will fix





[GitHub] [spark] LuciferYang commented on a diff in pull request #38430: [SPARK-40957] Add in memory cache in HDFSMetadataLog

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on code in PR #38430:
URL: https://github.com/apache/spark/pull/38430#discussion_r1010061538


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala:
##########
@@ -277,10 +295,34 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
     for (batchId <- batchIds if batchId > thresholdBatchId) {
       val path = batchIdToPath(batchId)
       fileManager.delete(path)
+      if (metadataCacheEnabled) batchCache.remove(batchId)
       logTrace(s"Removed metadata log file: $path")
     }
   }
 
+
+  /**
+   * List the available batches on file system. As a workaround for S3 inconsistent list, it also
+   * tries to take `batchCache` into consideration to infer a better answer.
+   */
+  protected def listBatches: Array[Long] = {
+    val batchIds = fileManager.list(metadataPath, batchFilesFilter)
+      .map(f => pathToBatchId(f.getPath)) ++
+      // Iterate over keySet is not thread safe. We call `toArray` to make a copy in the lock to
+      // eliminate the race condition.
+      batchCache.synchronized {
+        batchCache.keySet.asScala.toArray

Review Comment:
   ~How about `batchCache.keySet().toArray`?~
   ~And `SynchronizedCollection.toArray` seems thread-safe~







[GitHub] [spark] jerrypeng commented on pull request #38430: [SPARK-40957] Add in memory cache in HDFSMetadataLog

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on PR #38430:
URL: https://github.com/apache/spark/pull/38430#issuecomment-1295283632

   @HeartSaVioR @zsxwing please review




[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38430: [SPARK-40957] Add in memory cache in HDFSMetadataLog

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38430:
URL: https://github.com/apache/spark/pull/38430#discussion_r1011232238


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala:
##########
@@ -19,7 +19,9 @@ package org.apache.spark.sql.execution.streaming
 
 import java.io._
 import java.nio.charset.StandardCharsets
+import java.util.{Collections, LinkedHashMap}

Review Comment:
   Yeah, if there is a class with the same name on the Scala side, it's better to prefix it. Good point.





[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #38430: [SPARK-40957] Add in memory cache in HDFSMetadataLog

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on code in PR #38430:
URL: https://github.com/apache/spark/pull/38430#discussion_r1011869341


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala:
##########
@@ -168,7 +191,13 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
    * valid behavior, we still need to prevent it from destroying the files.
    */
   def addNewBatchByStream(batchId: Long)(fn: OutputStream => Unit): Boolean = {
-    get(batchId).map(_ => false).getOrElse {
+
+    val batchMetadataFile = batchIdToPath(batchId)
+
+    if ((metadataCacheEnabled && batchCache.containsKey(batchId))
+      || fileManager.exists(batchMetadataFile)) {

Review Comment:
   nit. Indentation?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #38430: [SPARK-40957] Add in memory cache in HDFSMetadataLog

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on PR #38430:
URL: https://github.com/apache/spark/pull/38430#issuecomment-1296082590

   Can one of the admins verify this patch?




[GitHub] [spark] LuciferYang commented on a diff in pull request #38430: [SPARK-40957] Add in memory cache in HDFSMetadataLog

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on code in PR #38430:
URL: https://github.com/apache/spark/pull/38430#discussion_r1010061925


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala:
##########
@@ -277,10 +295,34 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
     for (batchId <- batchIds if batchId > thresholdBatchId) {
       val path = batchIdToPath(batchId)
       fileManager.delete(path)
+      if (metadataCacheEnabled) batchCache.remove(batchId)
       logTrace(s"Removed metadata log file: $path")
     }
   }
 
+
+  /**
+   * List the available batches on file system. As a workaround for S3 inconsistent list, it also
+   * tries to take `batchCache` into consideration to infer a better answer.
+   */
+  protected def listBatches: Array[Long] = {
+    val batchIds = fileManager.list(metadataPath, batchFilesFilter)
+      .map(f => pathToBatchId(f.getPath)) ++
+      // Iterate over keySet is not thread safe. We call `toArray` to make a copy in the lock to
+      // elimiate the race condition.
+      batchCache.synchronized {
+        batchCache.keySet.asScala.toArray
+      }
+    logInfo("BatchIds found from listing: " + batchIds.sorted.mkString(", "))
+
+    if (batchIds.isEmpty) {
+      return Array.empty

Review Comment:
   nit: redundant `return`
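
For illustration, a minimal sketch of the `return`-free shape, assuming the rest of the method can move into an `else` branch. `newestTwo` is a hypothetical helper, not the PR's `listBatches`; it only shows that an `if`/`else` is an expression in Scala and that the value of the last expression is the method's result.

```scala
object LatestBatchSketch {
  // Hypothetical helper: returns the two largest batch ids, or an empty array.
  def newestTwo(batchIds: Array[Long]): Array[Long] = {
    if (batchIds.isEmpty) {
      // No `return` needed: this branch's value is the method's value.
      Array.empty[Long]
    } else {
      batchIds.sorted.takeRight(2)
    }
  }
}
```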





[GitHub] [spark] HeartSaVioR commented on pull request #38430: [SPARK-40957] Add in memory cache in HDFSMetadataLog

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on PR #38430:
URL: https://github.com/apache/spark/pull/38430#issuecomment-1301097792

   @jerrypeng Could you please address the post-review comments in a follow-up PR? You don't need a new JIRA ticket.
   
   (Sigh, I forgot to check. Please make sure your PR title has a component prefix, e.g. `[SS]`.)




[GitHub] [spark] LuciferYang commented on pull request #38430: [SPARK-40957] Add in memory cache in HDFSMetadataLog

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on PR #38430:
URL: https://github.com/apache/spark/pull/38430#issuecomment-1300426540

   late lgtm




[GitHub] [spark] jerrypeng commented on a diff in pull request #38430: [SPARK-40957] Add in memory cache in HDFSMetadataLog

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on code in PR #38430:
URL: https://github.com/apache/spark/pull/38430#discussion_r1011241156


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala:
##########
@@ -19,7 +19,9 @@ package org.apache.spark.sql.execution.streaming
 
 import java.io._
 import java.nio.charset.StandardCharsets
+import java.util.{Collections, LinkedHashMap}

Review Comment:
   sure





[GitHub] [spark] LuciferYang commented on a diff in pull request #38430: [SPARK-40957] Add in memory cache in HDFSMetadataLog

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on code in PR #38430:
URL: https://github.com/apache/spark/pull/38430#discussion_r1010061538


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala:
##########
@@ -277,10 +295,34 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
     for (batchId <- batchIds if batchId > thresholdBatchId) {
       val path = batchIdToPath(batchId)
       fileManager.delete(path)
+      if (metadataCacheEnabled) batchCache.remove(batchId)
       logTrace(s"Removed metadata log file: $path")
     }
   }
 
+
+  /**
+   * List the available batches on file system. As a workaround for S3 inconsistent list, it also
+   * tries to take `batchCache` into consideration to infer a better answer.
+   */
+  protected def listBatches: Array[Long] = {
+    val batchIds = fileManager.list(metadataPath, batchFilesFilter)
+      .map(f => pathToBatchId(f.getPath)) ++
+      // Iterate over keySet is not thread safe. We call `toArray` to make a copy in the lock to
+      // elimiate the race condition.
+      batchCache.synchronized {
+        batchCache.keySet.asScala.toArray

Review Comment:
   How about `batchCache.keySet().toArray`? 
   ~And `SynchronizedCollection.toArray` seem threadsafe~
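
For reference, a minimal sketch of the two ways to copy the keys, using a hypothetical `cache` in place of `batchCache`. The JDK documentation for `Collections.synchronizedMap` requires holding the map's lock while iterating any of its collection views, which is what the explicit `synchronized` block covers. That `toArray()` on the view takes the same lock internally matches the OpenJDK implementation as far as I know, so treat the second variant as an assumption to verify.

```scala
import java.util.{Collections, LinkedHashMap}
// Deprecated on Scala 2.13+, where scala.jdk.CollectionConverters replaces it.
import scala.collection.JavaConverters._

object KeySnapshotSketch {
  // Hypothetical stand-in for `batchCache`.
  val cache: java.util.Map[Long, String] =
    Collections.synchronizedMap(new LinkedHashMap[Long, String]())

  // `asScala.toArray` iterates the key view, so hold the map's own lock.
  def keysUnderLock(): Array[Long] = cache.synchronized {
    cache.keySet.asScala.toArray
  }

  // Relies on the synchronized view's `toArray()` to copy the keys (assumption: it locks internally).
  def keysViaToArray(): Array[Long] =
    cache.keySet().toArray().map(_.asInstanceOf[Long])
}
```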





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38430: [SPARK-40957] Add in memory cache in HDFSMetadataLog

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38430:
URL: https://github.com/apache/spark/pull/38430#discussion_r1011237000


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala:
##########
@@ -64,6 +67,17 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
     fileManager.mkdirs(metadataPath)
   }
 
+  protected val metadataCacheEnabled: Boolean
+  = sparkSession.sessionState.conf.getConf(SQLConf.STREAMING_METADATA_CACHE_ENABLED)
+
+  /**
+   * Cache the latest two batches. [[StreamExecution]] usually just accesses the latest two batches
+   * when committing offsets, this cache will save some file system operations.
+   */
+  protected[sql] val batchCache = Collections.synchronizedMap(new LinkedHashMap[Long, T](2) {

Review Comment:
   I was going to say I wouldn't worry much about it, since there aren't many instances of HDFSMetadataLog. Another point is whether we prefer 1) keeping the condition in an `if` or 2) using a higher-level API, e.g. `foreach`. Given that most usages also have another condition in the `if` statement, it doesn't seem to simplify the code.





[GitHub] [spark] jerrypeng commented on pull request #38430: [SPARK-40957] Add in memory cache in HDFSMetadataLog

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on PR #38430:
URL: https://github.com/apache/spark/pull/38430#issuecomment-1299643015

   @HeartSaVioR @LuciferYang Thank you for the review. I have addressed your comments. PTAL. Thanks in advance!




[GitHub] [spark] LuciferYang commented on a diff in pull request #38430: [SPARK-40957] Add in memory cache in HDFSMetadataLog

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on code in PR #38430:
URL: https://github.com/apache/spark/pull/38430#discussion_r1010061538


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala:
##########
@@ -277,10 +295,34 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
     for (batchId <- batchIds if batchId > thresholdBatchId) {
       val path = batchIdToPath(batchId)
       fileManager.delete(path)
+      if (metadataCacheEnabled) batchCache.remove(batchId)
       logTrace(s"Removed metadata log file: $path")
     }
   }
 
+
+  /**
+   * List the available batches on file system. As a workaround for S3 inconsistent list, it also
+   * tries to take `batchCache` into consideration to infer a better answer.
+   */
+  protected def listBatches: Array[Long] = {
+    val batchIds = fileManager.list(metadataPath, batchFilesFilter)
+      .map(f => pathToBatchId(f.getPath)) ++
+      // Iterate over keySet is not thread safe. We call `toArray` to make a copy in the lock to
+      // elimiate the race condition.
+      batchCache.synchronized {
+        batchCache.keySet.asScala.toArray

Review Comment:
   How about `batchCache.keySet().toArray`? `SynchronizedCollection.toArray` seems to be thread-safe.





[GitHub] [spark] LuciferYang commented on a diff in pull request #38430: [SPARK-40957] Add in memory cache in HDFSMetadataLog

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on code in PR #38430:
URL: https://github.com/apache/spark/pull/38430#discussion_r1010062994


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala:
##########
@@ -64,6 +67,17 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
     fileManager.mkdirs(metadataPath)
   }
 
+  protected val metadataCacheEnabled: Boolean
+  = sparkSession.sessionState.conf.getConf(SQLConf.STREAMING_METADATA_CACHE_ENABLED)
+
+  /**
+   * Cache the latest two batches. [[StreamExecution]] usually just accesses the latest two batches
+   * when committing offsets, this cache will save some file system operations.
+   */
+  protected[sql] val batchCache = Collections.synchronizedMap(new LinkedHashMap[Long, T](2) {

Review Comment:
   Could we create the `Map` instance only when `metadataCacheEnabled` is true? Maybe we can make it an `Option`.
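
A rough sketch of the `Option` idea, under stated assumptions: `CachedLog`, `cacheEnabled`, `remember` and `lookup` are hypothetical names, and the `removeEldestEntry` override is my guess at how the two-entry bound would be enforced, since the hunk above does not show the body of the anonymous class.

```scala
import java.util.{Collections, LinkedHashMap}

// Hypothetical miniature, not HDFSMetadataLog itself.
class CachedLog[T <: AnyRef](cacheEnabled: Boolean) {
  // The map is allocated only when caching is enabled.
  val batchCache: Option[java.util.Map[Long, T]] =
    if (cacheEnabled) {
      Some(Collections.synchronizedMap(new LinkedHashMap[Long, T](2) {
        // Assumed eviction rule: keep at most the two most recently inserted batches.
        override def removeEldestEntry(e: java.util.Map.Entry[Long, T]): Boolean = size() > 2
      }))
    } else {
      None
    }

  // With the cache disabled, both calls are no-ops over `None`.
  def remember(batchId: Long, value: T): Unit = batchCache.foreach(_.put(batchId, value))
  def lookup(batchId: Long): Option[T] = batchCache.flatMap(c => Option(c.get(batchId)))
}
```

The trade-off is that call sites combining the flag with another condition become `batchCache.exists(...)` style checks rather than plain `if` tests, which may or may not read better.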
   
   





[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #38430: [SPARK-40957] Add in memory cache in HDFSMetadataLog

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on code in PR #38430:
URL: https://github.com/apache/spark/pull/38430#discussion_r1011874757


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala:
##########
@@ -277,10 +295,34 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
     for (batchId <- batchIds if batchId > thresholdBatchId) {
       val path = batchIdToPath(batchId)
       fileManager.delete(path)
+      if (metadataCacheEnabled) batchCache.remove(batchId)
       logTrace(s"Removed metadata log file: $path")
     }
   }
 
+
+  /**
+   * List the available batches on file system. As a workaround for S3 inconsistent list, it also

Review Comment:
   Given https://aws.amazon.com/blogs/aws/amazon-s3-update-strong-read-after-write-consistency/, this looks a little misleading. Could you elaborate a little more on what you mean by `S3 inconsistent list`, please?


