You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/06/23 08:30:32 UTC

[GitHub] [spark] HeartSaVioR commented on a change in pull request #28904: [SPARK-30462][SS] Streamline the logic on file stream source and sink to avoid memory issue

HeartSaVioR commented on a change in pull request #28904:
URL: https://github.com/apache/spark/pull/28904#discussion_r444050433



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
##########
@@ -222,21 +256,22 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
         try {
           val logs =
             getAllValidBatches(latestId, compactInterval).flatMap { id =>
-              super.get(id).getOrElse {
+              filterInBatch(id)(shouldRetain).getOrElse {

Review comment:
       This would help when we introduce a new condition on exclusion of entries.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
##########
@@ -97,18 +97,15 @@ class FileStreamSinkLog(
     s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $defaultCompactInterval) " +
       "to a positive value.")
 
-  override def compactLogs(logs: Seq[SinkFileStatus]): Seq[SinkFileStatus] = {
-    val deletedFiles = logs.filter(_.action == FileStreamSinkLog.DELETE_ACTION).map(_.path).toSet
-    if (deletedFiles.isEmpty) {
-      logs
-    } else {
-      logs.filter(f => !deletedFiles.contains(f.path))
-    }
+  override def shouldRetain(log: SinkFileStatus): Boolean = {
+    log.action != FileStreamSinkLog.DELETE_ACTION

Review comment:
       I'm keeping this for now, but I think we should just remove it. As noted in the TODO I left below, it has never been used and exists only hypothetically.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
##########
@@ -212,7 +246,7 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
   /**
    * Returns all files except the deleted ones.
    */
-  def allFiles(): Array[T] = {
+  def allFiles(predicate: T => Boolean = _ => true): Array[T] = {

Review comment:
       We could also have a streamlined version of this method to avoid materializing all entries when initializing FileStreamSource, though I think there's another problem we should solve (the file stream source log should not have a bunch of entries - think about other data sources), and once we fix that issue it won't matter at all.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
##########
@@ -106,10 +106,8 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
     interval
   }
 
-  /**
-   * Filter out the obsolete logs.
-   */
-  def compactLogs(logs: Seq[T]): Seq[T]
+  /** Determine whether the log should be retained or not. */
+  def shouldRetain(log: T): Boolean

Review comment:
       I just retained the exclusion functionality, as we may still want to apply retention on a per-entry basis.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
##########
@@ -222,21 +256,22 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
         try {
           val logs =
             getAllValidBatches(latestId, compactInterval).flatMap { id =>
-              super.get(id).getOrElse {
+              filterInBatch(id)(shouldRetain).getOrElse {
                 throw new IllegalStateException(
                   s"${batchIdToPath(id)} doesn't exist " +
                     s"(latestId: $latestId, compactInterval: $compactInterval)")
               }
             }
-          return compactLogs(logs).toArray
+          return logs.toArray
         } catch {
           case e: IOException =>
             // Another process using `CompactibleFileStreamLog` may delete the batch files when
             // `StreamFileIndex` are reading. However, it only happens when a compaction is
             // deleting old files. If so, let's try the next compaction batch and we should find it.
             // Otherwise, this is a real IO issue and we should throw it.
-            latestId = nextCompactionBatchId(latestId, compactInterval)
-            super.get(latestId).getOrElse {
+            val expectedMinLatestId = nextCompactionBatchId(latestId, compactInterval)

Review comment:
       This new approach avoids reading the next compact log file, which would materialize all entries in that file.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
##########
@@ -97,18 +97,15 @@ class FileStreamSinkLog(
     s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $defaultCompactInterval) " +
       "to a positive value.")
 
-  override def compactLogs(logs: Seq[SinkFileStatus]): Seq[SinkFileStatus] = {
-    val deletedFiles = logs.filter(_.action == FileStreamSinkLog.DELETE_ACTION).map(_.path).toSet
-    if (deletedFiles.isEmpty) {
-      logs
-    } else {
-      logs.filter(f => !deletedFiles.contains(f.path))
-    }
+  override def shouldRetain(log: SinkFileStatus): Boolean = {
+    log.action != FileStreamSinkLog.DELETE_ACTION
   }
 }
 
 object FileStreamSinkLog {
   val VERSION = 1
+  // TODO: This action hasn't been used from the introduction. We should just remove this.
+  // TODO: We can remove the field "action" as well, ignoring "action" in existing metadata log.

Review comment:
       Note that this would also help reduce the size of each entry. It's OK to leave only ADD_ACTION if the JSON serializer/deserializer complains.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org