You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/06/30 15:02:57 UTC

[GitHub] [spark] xuanyuanking commented on a change in pull request #28904: [SPARK-30462][SS] Streamline the logic on file stream source and sink to avoid memory issue

xuanyuanking commented on a change in pull request #28904:
URL: https://github.com/apache/spark/pull/28904#discussion_r447677532



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
##########
@@ -173,37 +177,67 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
   override def purge(thresholdBatchId: Long): Unit = throw new UnsupportedOperationException(
     s"Cannot purge as it might break internal state.")
 
+  /**
+   * Apply function on all entries in the specific batch. The method will throw
+   * FileNotFouncException if `throwOnNonExist` is true. If false, the method will just return.

Review comment:
       typo: FileNotFoundException

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
##########
@@ -165,6 +165,63 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
     }
   }
 
+  /**
+   * Apply provided function to each entry in the specific batch metadata log.
+   *
+   * Unlike get which will materialize all entries into memory, this method streamlines the process
+   * via READ-AND-PROCESS. This helps to avoid the memory issue on huge metadata log file.
+   *
+   * NOTE: This no longer fails early on corruption. The caller should handle the exception
+   * properly and make sure the logic is not affected by failing in the middle.
+   */
+  def applyFnToBatchByStream[RET](batchId: Long)(fn: InputStream => RET): RET = {

Review comment:
       nit: duplicated code with `get`.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
##########
@@ -173,37 +177,67 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
   override def purge(thresholdBatchId: Long): Unit = throw new UnsupportedOperationException(
     s"Cannot purge as it might break internal state.")
 
+  /**
+   * Apply function on all entries in the specific batch. The method will throw
+   * FileNotFouncException if `throwOnNonExist` is true. If false, the method will just return.
+   */
+  def foreachInBatch(batchId: Long, throwOnNonExist: Boolean)(fn: T => Unit): Unit = {

Review comment:
       What kind of scenario do we need `throwOnNonExist = false`?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
##########
@@ -165,6 +165,63 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
     }
   }
 
+  /**
+   * Apply provided function to each entry in the specific batch metadata log.
+   *
+   * Unlike get which will materialize all entries into memory, this method streamlines the process
+   * via READ-AND-PROCESS. This helps to avoid the memory issue on huge metadata log file.
+   *
+   * NOTE: This no longer fails early on corruption. The caller should handle the exception
+   * properly and make sure the logic is not affected by failing in the middle.
+   */
+  def applyFnToBatchByStream[RET](batchId: Long)(fn: InputStream => RET): RET = {
+    val batchMetadataFile = batchIdToPath(batchId)
+    if (fileManager.exists(batchMetadataFile)) {
+      val input = fileManager.open(batchMetadataFile)
+      try {
+        fn(input)
+      } catch {
+        case ise: IllegalStateException =>
+          // re-throw the exception with the log file path added
+          throw new IllegalStateException(
+            s"Failed to read log file $batchMetadataFile. ${ise.getMessage}", ise)
+      } finally {
+        IOUtils.closeQuietly(input)
+      }
+    } else {
+      throw new FileNotFoundException(s"Unable to find batch $batchMetadataFile")
+    }
+  }
+
+  /**
+   * Store the metadata for the specified batchId and return `true` if successful. This method
+   * fills the content of metadata via executing function. If the function throws exception,
+   * writing will be automatically cancelled and this method will propagate the exception.
+   *
+   * If the batchId's metadata has already been stored, this method will return `false`.
+   */
+  def addNewBatchByStream(batchId: Long)(fn: OutputStream => Unit): Boolean = {

Review comment:
       ditto, duplicate code with writeBatchToFile

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
##########
@@ -106,10 +106,8 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
     interval
   }
 
-  /**
-   * Filter out the obsolete logs.
-   */
-  def compactLogs(logs: Seq[T]): Seq[T]
+  /** Determine whether the log should be retained or not. */
+  def shouldRetain(log: T): Boolean

Review comment:
       Little confused about this change. Seems the refactor of compactLogs and `DELETE_ACTION` didn't relate to the memory issue fixing? 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org