Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/08/11 00:58:49 UTC

[GitHub] [spark] attilapiros opened a new pull request, #37474: [SPARK-40039][Streaming][WIP] Introducing a streaming checkpoint file manager based on Hadoop's Abortable interface

attilapiros opened a new pull request, #37474:
URL: https://github.com/apache/spark/pull/37474

   
   ### What changes were proposed in this pull request?
   
   Currently, on S3, the checkpoint file manager (`FileContextBasedCheckpointFileManager`) is based on the rename operation: when a file is opened for an atomic stream, a temporary file is used behind the scenes, and when the stream is committed the file is renamed to its final location.
   
   But on S3 a rename is actually a file copy, so this has serious performance implications.
   
   Hadoop 3 introduced a new interface called [Abortable](https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/fs/Abortable.html), and `S3AFileSystem` has this capability, implemented on top of S3's multipart upload. When the file is committed [a POST is sent](https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html), and when it is aborted [a DELETE is sent](https://docs.aws.amazon.com/AmazonS3/latest/API/API_AbortMultipartUpload.html).
   
   This avoids the file copying altogether.
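   A minimal sketch of the idea, assuming Hadoop 3.3.1+ and an abortable-capable filesystem such as s3a (the path and payload here are illustrative, not from the PR):
   
   ```scala
   import java.util.EnumSet
   
   import org.apache.hadoop.conf.Configuration
   import org.apache.hadoop.fs.{CommonPathCapabilities, CreateFlag, FileContext, Path}
   
   val path = new Path("s3a://bucket/checkpoint/offsets/42")
   val fc = FileContext.getFileContext(path.toUri, new Configuration())
   
   // only filesystems advertising ABORTABLE_STREAM can back this manager
   require(fc.hasPathCapability(path, CommonPathCapabilities.ABORTABLE_STREAM))
   
   val out = fc.create(path, EnumSet.of(CreateFlag.CREATE))
   try {
     out.write("checkpoint data".getBytes("UTF-8"))
     out.close()   // commit: completes the multipart upload (the POST)
   } catch {
     case e: Throwable =>
       out.abort() // abort: drops the uploaded parts (the DELETE)
       throw e
   }
   ```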
   
   
   ### Why are the changes needed?
   
   To improve streaming performance.
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   ### How was this patch tested?
   
   I have refactored the existing `CheckpointFileManagerTests` and run them against a test filesystem which supports the `Abortable` interface (see `AbortableFileSystem`, which is based on `RawLocalFileSystem`).
   This way we have a unit test.
   
   Moreover, the same tests can be run against AWS S3 via an integration test (see `AwsAbortableStreamBasedCheckpointFileManagerSuite`):
   
   ```
   -> S3_PATH=<..> AWS_ACCESS_KEY_ID=<..> AWS_SECRET_ACCESS_KEY=<..> AWS_SESSION_TOKEN=<..>  ./build/mvn install -pl hadoop-cloud  -Phadoop-cloud,hadoop-3,integration-test
   
   Discovery starting.
   Discovery completed in 346 milliseconds.
   Run starting. Expected test count is: 1
   AwsAbortableStreamBasedCheckpointFileManagerSuite:
   - mkdirs, list, createAtomic, open, delete, exists
   CommitterBindingSuite:
   AbortableStreamBasedCheckpointFileManagerSuite:
   Run completed in 14 seconds, 407 milliseconds.
   Total number of tests run: 1
   Suites: completed 4, aborted 0
   Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0
   All tests passed.
   ```




[GitHub] [spark] HeartSaVioR commented on pull request #37474: [SPARK-40039][SS][WIP] Introducing a streaming checkpoint file manager based on Hadoop's Abortable interface

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on PR #37474:
URL: https://github.com/apache/spark/pull/37474#issuecomment-1211461886

   Refer to https://github.com/apache/spark/pull/31495 - I don't have an example query to benchmark here, but you can see a graph there showing how much we benefit from the change.




[GitHub] [spark] HeartSaVioR closed pull request #37474: [SPARK-40039][SS] Introducing a streaming checkpoint file manager based on Hadoop's Abortable interface

Posted by GitBox <gi...@apache.org>.
HeartSaVioR closed pull request #37474: [SPARK-40039][SS] Introducing a streaming checkpoint file manager based on Hadoop's Abortable interface
URL: https://github.com/apache/spark/pull/37474




[GitHub] [spark] steveloughran commented on a diff in pull request #37474: [SPARK-40039][SS] Introducing a streaming checkpoint file manager based on Hadoop's Abortable interface

Posted by GitBox <gi...@apache.org>.
steveloughran commented on code in PR #37474:
URL: https://github.com/apache/spark/pull/37474#discussion_r953592317


##########
docs/cloud-integration.md:
##########
@@ -231,9 +231,15 @@ The size of the window needs to be set to handle this.
 is no need for a workflow of write-then-rename to ensure that files aren't picked up
 while they are still being written. Applications can write straight to the monitored directory.
 
-1. Streams should only be checkpointed to a store implementing a fast and
-atomic `rename()` operation.
-Otherwise the checkpointing may be slow and potentially unreliable.
+1. In case of the default checkpoint file manager called `FileContextBasedCheckpointFileManager`
+streams should only be checkpointed to a store implementing a fast and
+atomic `rename()` operation. Otherwise the checkpointing may be slow and potentially unreliable.
+On AWS S3 with Hadoop 3.3.1 or later the abortable stream based checkpoint file manager

Review Comment:
   Need to be specific about the s3a connector, as EMR doesn't have it (yet).
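   A possible rewording along those lines (a suggestion only, not final doc text):
   
   ```
   On AWS S3 with the S3A connector (Hadoop 3.3.1 or later) the abortable stream
   based checkpoint file manager can be used instead.
   ```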





[GitHub] [spark] attilapiros commented on a diff in pull request #37474: [SPARK-40039][SS][WIP] Introducing a streaming checkpoint file manager based on Hadoop's Abortable interface

Posted by GitBox <gi...@apache.org>.
attilapiros commented on code in PR #37474:
URL: https://github.com/apache/spark/pull/37474#discussion_r943034872


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala:
##########
@@ -58,50 +69,40 @@ abstract class CheckpointFileManagerTests extends SparkFunSuite with SQLHelper {
       // Create atomic without overwrite
       var path = new Path(s"$dir/file")
       assert(!fm.exists(path))
-      fm.createAtomic(path, overwriteIfPossible = false).cancel()
+      fm.createAtomic(path, overwriteIfPossible = false).writeContent(1).cancel()

Review Comment:
   I have extended this test to write some content into those streams and to check their contents, to make sure which operation really wins if two try to write to the same location.





[GitHub] [spark] steveloughran commented on a diff in pull request #37474: [SPARK-40039][SS][WIP] Introducing a streaming checkpoint file manager based on Hadoop's Abortable interface

Posted by GitBox <gi...@apache.org>.
steveloughran commented on code in PR #37474:
URL: https://github.com/apache/spark/pull/37474#discussion_r943267264


##########
hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManager.scala:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.internal.io.cloud
+
+import java.nio.file.FileAlreadyExistsException
+import java.util.EnumSet
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.streaming.AbstractFileContextBasedCheckpointFileManager
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
+
+class AbortableStreamBasedCheckpointFileManager(path: Path, hadoopConf: Configuration)
+  extends AbstractFileContextBasedCheckpointFileManager(path, hadoopConf) with Logging {
+
+  if (!fc.hasPathCapability(path, CommonPathCapabilities.ABORTABLE_STREAM)) {
+    throw new UnsupportedFileSystemException("AbortableStreamBasedCheckpointFileManager requires" +
+      " an fs with abortable stream support")
+  }
+
+  logInfo(s"Writing atomically to $path based on abortable stream")
+
+  class AbortableStreamBasedFSDataOutputStream(
+      fsDataOutputStream: FSDataOutputStream,
+      fc: FileContext,
+      path: Path,
+      overwriteIfPossible: Boolean) extends CancellableFSDataOutputStream(fsDataOutputStream) {
+
+    @volatile private var terminated = false
+
+    override def cancel(): Unit = synchronized {
+      if (terminated) return
+      try {
+        fsDataOutputStream.abort()
+        fsDataOutputStream.close()
+      } catch {
+          case NonFatal(e) =>
+            logWarning(s"Error cancelling write to $path", e)
+      } finally {
+        terminated = true
+      }
+    }
+
+    override def close(): Unit = synchronized {
+      if (terminated) return
+      try {
+        if (!overwriteIfPossible && fc.util().exists(path)) {
+          throw new FileAlreadyExistsException(
+            s"Failed to close atomic stream $path as destination already exists")
+        }
+        fsDataOutputStream.close()
+      } catch {
+          case NonFatal(e) =>
+            logWarning(s"Error closing $path", e)

Review Comment:
   Again: log the output stream.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala:
##########
@@ -58,50 +69,40 @@ abstract class CheckpointFileManagerTests extends SparkFunSuite with SQLHelper {
       // Create atomic without overwrite
       var path = new Path(s"$dir/file")
       assert(!fm.exists(path))
-      fm.createAtomic(path, overwriteIfPossible = false).cancel()
+      fm.createAtomic(path, overwriteIfPossible = false).writeContent(1).cancel()
       assert(!fm.exists(path))
-      fm.createAtomic(path, overwriteIfPossible = false).close()
+      fm.createAtomic(path, overwriteIfPossible = false).writeContent(2).close()
       assert(fm.exists(path))
+      assert(fm.open(path).readInt() == 2)
       quietly {
         intercept[IOException] {
           // should throw exception since file exists and overwrite is false
-          fm.createAtomic(path, overwriteIfPossible = false).close()
+          fm.createAtomic(path, overwriteIfPossible = false).writeContent(3).close()
         }
       }
+      assert(fm.open(path).readInt() == 2)
 
       // Create atomic with overwrite if possible
       path = new Path(s"$dir/file2")
       assert(!fm.exists(path))
-      fm.createAtomic(path, overwriteIfPossible = true).cancel()
+      fm.createAtomic(path, overwriteIfPossible = true).writeContent(4).cancel()
       assert(!fm.exists(path))
-      fm.createAtomic(path, overwriteIfPossible = true).close()
+      fm.createAtomic(path, overwriteIfPossible = true).writeContent(5).close()
       assert(fm.exists(path))
-      fm.createAtomic(path, overwriteIfPossible = true).close()  // should not throw exception
-
-      // crc file should not be leaked when origin file doesn't exist.
-      // The implementation of Hadoop filesystem may filter out checksum file, so
-      // listing files from local filesystem.
-      val fileNames = new File(path.getParent.toString).listFiles().toSeq
-        .filter(p => p.isFile).map(p => p.getName)
-      val crcFiles = fileNames.filter(n => n.startsWith(".") && n.endsWith(".crc"))
-      val originFileNamesForExistingCrcFiles = crcFiles.map { name =>
-        // remove first "." and last ".crc"
-        name.substring(1, name.length - 4)
-      }
-
-      // Check all origin files exist for all crc files.
-      assert(originFileNamesForExistingCrcFiles.toSet.subsetOf(fileNames.toSet),
-        s"Some of origin files for crc files don't exist - crc files: $crcFiles / " +
-          s"expected origin files: $originFileNamesForExistingCrcFiles / actual files: $fileNames")
+      assert(fm.open(path).readInt() == 5)
+      // should not throw exception
+      fm.createAtomic(path, overwriteIfPossible = true).writeContent(6).close()
+      assert(fm.open(path).readInt() == 6)

Review Comment:
   Would be good to close this; it may use up process resources for the rest of the test run.
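   For example, something like the following sketch (using the `fm` and `path` of the test above):
   
   ```scala
   val in = fm.open(path)
   try {
     assert(in.readInt() == 6)
   } finally {
     in.close()  // release the stream promptly rather than leaking it
   }
   ```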



##########
hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManager.scala:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.internal.io.cloud
+
+import java.nio.file.FileAlreadyExistsException
+import java.util.EnumSet
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.streaming.AbstractFileContextBasedCheckpointFileManager
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
+
+class AbortableStreamBasedCheckpointFileManager(path: Path, hadoopConf: Configuration)
+  extends AbstractFileContextBasedCheckpointFileManager(path, hadoopConf) with Logging {
+
+  if (!fc.hasPathCapability(path, CommonPathCapabilities.ABORTABLE_STREAM)) {
+    throw new UnsupportedFileSystemException("AbortableStreamBasedCheckpointFileManager requires" +
+      " an fs with abortable stream support")
+  }
+
+  logInfo(s"Writing atomically to $path based on abortable stream")
+
+  class AbortableStreamBasedFSDataOutputStream(
+      fsDataOutputStream: FSDataOutputStream,
+      fc: FileContext,
+      path: Path,
+      overwriteIfPossible: Boolean) extends CancellableFSDataOutputStream(fsDataOutputStream) {
+
+    @volatile private var terminated = false
+
+    override def cancel(): Unit = synchronized {
+      if (terminated) return
+      try {
+        fsDataOutputStream.abort()
+        fsDataOutputStream.close()
+      } catch {
+          case NonFatal(e) =>
+            logWarning(s"Error cancelling write to $path", e)
+      } finally {
+        terminated = true
+      }
+    }
+
+    override def close(): Unit = synchronized {
+      if (terminated) return
+      try {
+        if (!overwriteIfPossible && fc.util().exists(path)) {
+          throw new FileAlreadyExistsException(

Review Comment:
   Maybe call abort() here for maximum rigor.
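   A sketch of that suggestion against the close() above (error handling omitted; the final code may differ):
   
   ```scala
   override def close(): Unit = synchronized {
     if (terminated) return
     try {
       if (!overwriteIfPossible && fc.util().exists(path)) {
         // abort the in-progress upload before failing, so no parts linger
         fsDataOutputStream.abort()
         throw new FileAlreadyExistsException(
           s"Failed to close atomic stream $path as destination already exists")
       }
       fsDataOutputStream.close()
     } finally {
       terminated = true
     }
   }
   ```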



##########
hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManager.scala:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.internal.io.cloud
+
+import java.nio.file.FileAlreadyExistsException
+import java.util.EnumSet
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.streaming.AbstractFileContextBasedCheckpointFileManager
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
+
+class AbortableStreamBasedCheckpointFileManager(path: Path, hadoopConf: Configuration)
+  extends AbstractFileContextBasedCheckpointFileManager(path, hadoopConf) with Logging {
+
+  if (!fc.hasPathCapability(path, CommonPathCapabilities.ABORTABLE_STREAM)) {
+    throw new UnsupportedFileSystemException("AbortableStreamBasedCheckpointFileManager requires" +
+      " an fs with abortable stream support")
+  }
+
+  logInfo(s"Writing atomically to $path based on abortable stream")
+
+  class AbortableStreamBasedFSDataOutputStream(
+      fsDataOutputStream: FSDataOutputStream,
+      fc: FileContext,
+      path: Path,
+      overwriteIfPossible: Boolean) extends CancellableFSDataOutputStream(fsDataOutputStream) {
+
+    @volatile private var terminated = false
+
+    override def cancel(): Unit = synchronized {
+      if (terminated) return
+      try {
+        fsDataOutputStream.abort()
+        fsDataOutputStream.close()
+      } catch {
+          case NonFatal(e) =>
+            logWarning(s"Error cancelling write to $path", e)

Review Comment:
   Include the FSDataOutputStream in the log to get any info from it.
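   For instance (a sketch; the stream's toString typically carries its upload state):
   
   ```scala
   case NonFatal(e) =>
     logWarning(s"Error cancelling write to $path (stream: $fsDataOutputStream)", e)
   ```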





[GitHub] [spark] attilapiros commented on pull request #37474: [SPARK-40039][SS][WIP] Introducing a streaming checkpoint file manager based on Hadoop's Abortable interface

Posted by GitBox <gi...@apache.org>.
attilapiros commented on PR #37474:
URL: https://github.com/apache/spark/pull/37474#issuecomment-1211458229

   I guess it could be because of the extra REST call (the POST), but that extra REST call applies to the rename as well.




[GitHub] [spark] attilapiros commented on pull request #37474: [SPARK-40039][SS][WIP] Introducing a streaming checkpoint file manager based on Hadoop's Abortable interface

Posted by GitBox <gi...@apache.org>.
attilapiros commented on PR #37474:
URL: https://github.com/apache/spark/pull/37474#issuecomment-1211460105

   @HeartSaVioR Do we have a Structured Streaming benchmark? I ran a quick grep but found nothing.




[GitHub] [spark] attilapiros commented on pull request #37474: [SPARK-40039][SS][WIP] Introducing a streaming checkpoint file manager based on Hadoop's Abortable interface

Posted by GitBox <gi...@apache.org>.
attilapiros commented on PR #37474:
URL: https://github.com/apache/spark/pull/37474#issuecomment-1215630207

   Thanks @steveloughran for the review and for confirming how the abortable stream works for small files! I have updated the PR description accordingly.




[GitHub] [spark] attilapiros commented on a diff in pull request #37474: [SPARK-40039][SS][WIP] Introducing a streaming checkpoint file manager based on Hadoop's Abortable interface

Posted by GitBox <gi...@apache.org>.
attilapiros commented on code in PR #37474:
URL: https://github.com/apache/spark/pull/37474#discussion_r944030204


##########
hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManager.scala:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.internal.io.cloud
+
+import java.nio.file.FileAlreadyExistsException
+import java.util.EnumSet
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.streaming.AbstractFileContextBasedCheckpointFileManager
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
+
+class AbortableStreamBasedCheckpointFileManager(path: Path, hadoopConf: Configuration)
+  extends AbstractFileContextBasedCheckpointFileManager(path, hadoopConf) with Logging {
+
+  if (!fc.hasPathCapability(path, CommonPathCapabilities.ABORTABLE_STREAM)) {
+    throw new UnsupportedFileSystemException("AbortableStreamBasedCheckpointFileManager requires" +
+      " an fs with abortable stream support")
+  }
+
+  logInfo(s"Writing atomically to $path based on abortable stream")
+
+  class AbortableStreamBasedFSDataOutputStream(
+      fsDataOutputStream: FSDataOutputStream,
+      fc: FileContext,
+      path: Path,
+      overwriteIfPossible: Boolean) extends CancellableFSDataOutputStream(fsDataOutputStream) {
+
+    @volatile private var terminated = false
+
+    override def cancel(): Unit = synchronized {
+      if (terminated) return
+      try {
+        fsDataOutputStream.abort()
+        fsDataOutputStream.close()
+      } catch {
+          case NonFatal(e) =>
+            logWarning(s"Error cancelling write to $path", e)
+      } finally {
+        terminated = true
+      }
+    }
+
+    override def close(): Unit = synchronized {
+      if (terminated) return
+      try {
+        if (!overwriteIfPossible && fc.util().exists(path)) {
+          throw new FileAlreadyExistsException(

Review Comment:
   Great catch! 





[GitHub] [spark] steveloughran commented on pull request #37474: [SPARK-40039][SS][WIP] Introducing a streaming checkpoint file manager based on Hadoop's Abortable interface

Posted by GitBox <gi...@apache.org>.
steveloughran commented on PR #37474:
URL: https://github.com/apache/spark/pull/37474#issuecomment-1211704141

   Update: when an MPU (multipart upload) is active, the upload is aborted in the abort() call. Any active uploads in separate threads are interrupted/cancelled.
   
   




[GitHub] [spark] HyukjinKwon commented on pull request #37474: [SPARK-40039][SS] Introducing a streaming checkpoint file manager based on Hadoop's Abortable interface

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on PR #37474:
URL: https://github.com/apache/spark/pull/37474#issuecomment-1229123426

   This actually breaks the Hadoop 2 compilation, and I believe we still release with the Hadoop 2 profile too (cc @tgravescs FYI); see https://github.com/apache/spark/actions/runs/2933948860.
   
   I am reverting this for now to keep the build alive.




[GitHub] [spark] attilapiros commented on pull request #37474: [SPARK-40039][SS][WIP] Introducing a streaming checkpoint file manager based on Hadoop's Abortable interface

Posted by GitBox <gi...@apache.org>.
attilapiros commented on PR #37474:
URL: https://github.com/apache/spark/pull/37474#issuecomment-1211455973

   Oh, we have an outdated description on Spark's contribution site:
   
   > The PR title should be of the form [SPARK-xxxx][COMPONENT] Title, where SPARK-xxxx is the relevant JIRA number, COMPONENT is one of the PR categories shown at [spark-prs.appspot.com](https://spark-prs.appspot.com/) and Title may be the JIRA’s title or a more specific title describing the PR itself.
   
   Or better, spark-prs.appspot.com should be updated!
   
   @HeartSaVioR no, I have not run a performance test yet. Do you happen to know why multipart upload is slower for tiny files? It really surprises me. 




[GitHub] [spark] HeartSaVioR commented on pull request #37474: [SPARK-40039][SS][WIP] Introducing a streaming checkpoint file manager based on Hadoop's Abortable interface

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on PR #37474:
URL: https://github.com/apache/spark/pull/37474#issuecomment-1211458224

   No, I'm just thinking based on physics: multipart uploads require more ping-pong against AWS S3.




[GitHub] [spark] attilapiros commented on a diff in pull request #37474: [SPARK-40039][SS] Introducing a streaming checkpoint file manager based on Hadoop's Abortable interface

Posted by GitBox <gi...@apache.org>.
attilapiros commented on code in PR #37474:
URL: https://github.com/apache/spark/pull/37474#discussion_r950764250


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala:
##########
@@ -58,50 +77,40 @@ abstract class CheckpointFileManagerTests extends SparkFunSuite with SQLHelper {
       // Create atomic without overwrite
       var path = new Path(s"$dir/file")
       assert(!fm.exists(path))
-      fm.createAtomic(path, overwriteIfPossible = false).cancel()
+      fm.createAtomic(path, overwriteIfPossible = false).writeContent(1).cancel()
       assert(!fm.exists(path))
-      fm.createAtomic(path, overwriteIfPossible = false).close()
+      fm.createAtomic(path, overwriteIfPossible = false).writeContent(2).close()
       assert(fm.exists(path))
+      assert(fm.open(path).readContent() == 2)
       quietly {
         intercept[IOException] {
           // should throw exception since file exists and overwrite is false
-          fm.createAtomic(path, overwriteIfPossible = false).close()
+          fm.createAtomic(path, overwriteIfPossible = false).writeContent(3).close()
         }
       }
+      assert(fm.open(path).readContent() == 2)
 
       // Create atomic with overwrite if possible
       path = new Path(s"$dir/file2")
       assert(!fm.exists(path))
-      fm.createAtomic(path, overwriteIfPossible = true).cancel()
+      fm.createAtomic(path, overwriteIfPossible = true).writeContent(4).cancel()
       assert(!fm.exists(path))
-      fm.createAtomic(path, overwriteIfPossible = true).close()
+      fm.createAtomic(path, overwriteIfPossible = true).writeContent(5).close()
       assert(fm.exists(path))
-      fm.createAtomic(path, overwriteIfPossible = true).close()  // should not throw exception
-
-      // crc file should not be leaked when origin file doesn't exist.
-      // The implementation of Hadoop filesystem may filter out checksum file, so
-      // listing files from local filesystem.
-      val fileNames = new File(path.getParent.toString).listFiles().toSeq
-        .filter(p => p.isFile).map(p => p.getName)
-      val crcFiles = fileNames.filter(n => n.startsWith(".") && n.endsWith(".crc"))
-      val originFileNamesForExistingCrcFiles = crcFiles.map { name =>
-        // remove first "." and last ".crc"
-        name.substring(1, name.length - 4)
-      }
-
-      // Check all origin files exist for all crc files.
-      assert(originFileNamesForExistingCrcFiles.toSet.subsetOf(fileNames.toSet),
-        s"Some of origin files for crc files don't exist - crc files: $crcFiles / " +
-          s"expected origin files: $originFileNamesForExistingCrcFiles / actual files: $fileNames")
+      assert(fm.open(path).readContent() == 5)
+      // should not throw exception
+      fm.createAtomic(path, overwriteIfPossible = true).writeContent(6).close()
+      assert(fm.open(path).readContent() == 6)
 
+      checkLeakingCrcFiles(dir)
       // Open and delete
       fm.open(path).close()
       fm.delete(path)
       assert(!fm.exists(path))
       intercept[IOException] {
         fm.open(path)
       }
-      fm.delete(path) // should not throw exception
+      fm.delete(dir) // should not throw exception

Review Comment:
   Yes, we can revert this, as `afterAll()` will clean the dir anyway.





[GitHub] [spark] attilapiros commented on pull request #37474: [SPARK-40039][SS] Introducing a streaming checkpoint file manager based on Hadoop's Abortable interface

Posted by GitBox <gi...@apache.org>.
attilapiros commented on PR #37474:
URL: https://github.com/apache/spark/pull/37474#issuecomment-1229127015

   I see: my test code changes are used for the Hadoop 2 profile. We can fix this easily next week.




[GitHub] [spark] HeartSaVioR commented on pull request #37474: [SPARK-40039][SS] Introducing a streaming checkpoint file manager based on Hadoop's Abortable interface

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on PR #37474:
URL: https://github.com/apache/spark/pull/37474#issuecomment-1229143104

   Looks like all code changes in hadoop-cloud except IntegrationTestSuite should be moved to hadoop-3, and then it would work. Thanks @HyukjinKwon for handling the issue.




[GitHub] [spark] attilapiros commented on pull request #37474: [SPARK-40039][SS] Introducing a streaming checkpoint file manager based on Hadoop's Abortable interface

Posted by GitBox <gi...@apache.org>.
attilapiros commented on PR #37474:
URL: https://github.com/apache/spark/pull/37474#issuecomment-1227645281

   Thanks again for all the effort made on this PR @steveloughran and @HeartSaVioR!
   
   @steveloughran I hope that, now that `AbortableStreamBasedCheckpointFileManager` is available, it will turn out those hiccups were worth it!




[GitHub] [spark] steveloughran commented on a diff in pull request #37474: [SPARK-40039][SS][WIP] Introducing a streaming checkpoint file manager based on Hadoop's Abortable interface

Posted by GitBox <gi...@apache.org>.
steveloughran commented on code in PR #37474:
URL: https://github.com/apache/spark/pull/37474#discussion_r943251576


##########
hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManagerSuite.scala:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.internal.io.cloud
+
+import java.io.File
+
+import scala.util.Properties
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.io.cloud.abortable.AbortableFileSystem
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager
+import org.apache.spark.sql.execution.streaming.CheckpointFileManagerTests
+
+class AbortableStreamBasedCheckpointFileManagerSuite
+  extends CheckpointFileManagerTests with Logging {
+
+  override def withTempHadoopPath(p: Path => Unit): Unit = {
+    withTempDir { f: File =>
+      val basePath = new Path(AbortableFileSystem.ABORTABLE_FS_SCHEME, null, f.getAbsolutePath)
+      p(basePath)
+    }
+  }
+
+  override def checkLeakingCrcFiles(path: Path): Unit = { }
+
+  override def createManager(path: Path): CheckpointFileManager = {
+    val conf = new Configuration()
+    conf.set(s"fs.AbstractFileSystem.${AbortableFileSystem.ABORTABLE_FS_SCHEME}.impl",
+      "org.apache.spark.internal.io.cloud.abortable.AbstractAbortableFileSystem")
+    new AbortableStreamBasedCheckpointFileManager(path, conf)
+  }
+}
+
+@IntegrationTestSuite
+class AwsAbortableStreamBasedCheckpointFileManagerSuite
+    extends AbortableStreamBasedCheckpointFileManagerSuite with BeforeAndAfter {
+
+  val s3aPath = Properties.envOrNone("S3A_PATH")
+
+  override protected def beforeAll(): Unit = {
+    assert(s3aPath.isDefined, "S3A_PATH must be defined!")
+  }
+
+  override def withTempHadoopPath(p: Path => Unit): Unit = {
+    p(new Path(s3aPath.get))
+  }
+
+  override def createManager(path: Path): CheckpointFileManager = {
+    val conf = new Configuration()
+    conf.set("fs.s3a.aws.credentials.provider",

Review Comment:
   This is always set in the default chain. Leaving that list alone will ensure that, when run in EC2, it will also check the IAM role provider and so pick up the credentials the VM/container was granted.
   
   Propose: cut this line.
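   With that cut, the suite's createManager would rely on the default provider chain (a sketch):
   
   ```scala
   override def createManager(path: Path): CheckpointFileManager = {
     val conf = new Configuration()
     // no fs.s3a.aws.credentials.provider override: the default chain already
     // checks env vars and, on EC2, the IAM instance role provider
     new AbortableStreamBasedCheckpointFileManager(path, conf)
   }
   ```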



##########
hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManagerSuite.scala:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.internal.io.cloud
+
+import java.io.File
+
+import scala.util.Properties
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.io.cloud.abortable.AbortableFileSystem
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager
+import org.apache.spark.sql.execution.streaming.CheckpointFileManagerTests
+
+class AbortableStreamBasedCheckpointFileManagerSuite
+  extends CheckpointFileManagerTests with Logging {
+
+  override def withTempHadoopPath(p: Path => Unit): Unit = {
+    withTempDir { f: File =>
+      val basePath = new Path(AbortableFileSystem.ABORTABLE_FS_SCHEME, null, f.getAbsolutePath)
+      p(basePath)
+    }
+  }
+
+  override def checkLeakingCrcFiles(path: Path): Unit = { }
+
+  override def createManager(path: Path): CheckpointFileManager = {
+    val conf = new Configuration()
+    conf.set(s"fs.AbstractFileSystem.${AbortableFileSystem.ABORTABLE_FS_SCHEME}.impl",
+      "org.apache.spark.internal.io.cloud.abortable.AbstractAbortableFileSystem")
+    new AbortableStreamBasedCheckpointFileManager(path, conf)
+  }
+}
+
+@IntegrationTestSuite
+class AwsAbortableStreamBasedCheckpointFileManagerSuite
+    extends AbortableStreamBasedCheckpointFileManagerSuite with BeforeAndAfter {

Review Comment:
   Propose an afterAll() which deletes the path if set and swallows any exceptions. It keeps costs down, as no data is retained in the bucket after the run.
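   A sketch of such a cleanup, reusing the `s3aPath` val of the suite above:
   
   ```scala
   override protected def afterAll(): Unit = {
     try {
       s3aPath.foreach { p =>
         val path = new Path(p)
         // recursive delete; swallow failures so cleanup never fails the build
         path.getFileSystem(new Configuration()).delete(path, true)
       }
     } catch {
       case scala.util.control.NonFatal(e) => logWarning(s"Cleanup of $s3aPath failed", e)
     } finally {
       super.afterAll()
     }
   }
   ```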



##########
hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManagerSuite.scala:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.internal.io.cloud
+
+import java.io.File
+
+import scala.util.Properties
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.io.cloud.abortable.AbortableFileSystem
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager
+import org.apache.spark.sql.execution.streaming.CheckpointFileManagerTests
+
+class AbortableStreamBasedCheckpointFileManagerSuite
+  extends CheckpointFileManagerTests with Logging {
+
+  override def withTempHadoopPath(p: Path => Unit): Unit = {
+    withTempDir { f: File =>
+      val basePath = new Path(AbortableFileSystem.ABORTABLE_FS_SCHEME, null, f.getAbsolutePath)
+      p(basePath)
+    }
+  }
+
+  override def checkLeakingCrcFiles(path: Path): Unit = { }
+
+  override def createManager(path: Path): CheckpointFileManager = {
+    val conf = new Configuration()
+    conf.set(s"fs.AbstractFileSystem.${AbortableFileSystem.ABORTABLE_FS_SCHEME}.impl",
+      "org.apache.spark.internal.io.cloud.abortable.AbstractAbortableFileSystem")
+    new AbortableStreamBasedCheckpointFileManager(path, conf)
+  }
+}
+
+@IntegrationTestSuite
+class AwsAbortableStreamBasedCheckpointFileManagerSuite

Review Comment:
   Maybe use s3 in the title, as it is s3-only.



##########
hadoop-cloud/README.md:
##########
@@ -0,0 +1,20 @@
+---
+layout: global
+title: Spark Hadoop3 Integration Tests
+---
+
+# Running the Integration Tests
+
+As mocking of an external systems (like AWS S3) is not always perfect the unit testing should be
+extended with integration testing. This is why the build profile `integration-test` has been
+introduced here. When it is given (`-pintegration-test`) for testing then only those tests are

Review Comment:
   `-P` rather than `-p`?
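   For reference, the capitalized profile flag as used elsewhere in this PR:
   
   ```
   ./build/mvn install -pl hadoop-cloud -Phadoop-cloud,hadoop-3,integration-test
   ```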





[GitHub] [spark] attilapiros commented on pull request #37474: [SPARK-40039][SS] Introducing a streaming checkpoint file manager based on Hadoop's Abortable interface

Posted by GitBox <gi...@apache.org>.
attilapiros commented on PR #37474:
URL: https://github.com/apache/spark/pull/37474#issuecomment-1221197541

   @HeartSaVioR Sure, please find the "Performance test" section in the description.




[GitHub] [spark] HeartSaVioR commented on pull request #37474: [SPARK-40039][SS][WIP] Introducing a streaming checkpoint file manager based on Hadoop's Abortable interface

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on PR #37474:
URL: https://github.com/apache/spark/pull/37474#issuecomment-1211452718

   Duplicating my comment in JIRA ticket here:
   
   Very interesting one to see! (Disclaimer: Abortable was something I worked on with Steve.)
   
   Have you gone through some benchmarks to figure out how this works with small to big files? One thing I wonder is whether multipart upload performs well with tiny files. We have lots of tiny files in a checkpoint, and all files could be pretty tiny for a stateless query.




[GitHub] [spark] attilapiros commented on a diff in pull request #37474: [SPARK-40039][SS][WIP] Introducing a streaming checkpoint file manager based on Hadoop's Abortable interface

Posted by GitBox <gi...@apache.org>.
attilapiros commented on code in PR #37474:
URL: https://github.com/apache/spark/pull/37474#discussion_r943979385


##########
hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManagerSuite.scala:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.internal.io.cloud
+
+import java.io.File
+
+import scala.util.Properties
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.io.cloud.abortable.AbortableFileSystem
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager
+import org.apache.spark.sql.execution.streaming.CheckpointFileManagerTests
+
+class AbortableStreamBasedCheckpointFileManagerSuite
+  extends CheckpointFileManagerTests with Logging {
+
+  override def withTempHadoopPath(p: Path => Unit): Unit = {
+    withTempDir { f: File =>
+      val basePath = new Path(AbortableFileSystem.ABORTABLE_FS_SCHEME, null, f.getAbsolutePath)
+      p(basePath)
+    }
+  }
+
+  override def checkLeakingCrcFiles(path: Path): Unit = { }
+
+  override def createManager(path: Path): CheckpointFileManager = {
+    val conf = new Configuration()
+    conf.set(s"fs.AbstractFileSystem.${AbortableFileSystem.ABORTABLE_FS_SCHEME}.impl",
+      "org.apache.spark.internal.io.cloud.abortable.AbstractAbortableFileSystem")
+    new AbortableStreamBasedCheckpointFileManager(path, conf)
+  }
+}
+
+@IntegrationTestSuite
+class AwsAbortableStreamBasedCheckpointFileManagerSuite
+    extends AbortableStreamBasedCheckpointFileManagerSuite with BeforeAndAfter {

Review Comment:
   OK. But to make it safe and avoid deleting valuable data, before the tests I first have to check whether S3A_PATH is really empty.
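   Something like this in beforeAll() could guard against that (a sketch):
   
   ```scala
   override protected def beforeAll(): Unit = {
     super.beforeAll()
     assert(s3aPath.isDefined, "S3A_PATH must be defined!")
     val path = new Path(s3aPath.get)
     val fs = path.getFileSystem(new Configuration())
     // refuse to run against a non-empty location which we would later delete
     assert(!fs.exists(path) || fs.listStatus(path).isEmpty,
       s"$path must be empty so it is safe to delete after the run")
   }
   ```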





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #37474: [SPARK-40039][SS] Introducing a streaming checkpoint file manager based on Hadoop's Abortable interface

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #37474:
URL: https://github.com/apache/spark/pull/37474#discussion_r950751083


##########
hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManager.scala:
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.internal.io.cloud
+
+import java.nio.file.FileAlreadyExistsException
+import java.util.EnumSet
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.streaming.AbstractFileContextBasedCheckpointFileManager
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
+
+class AbortableStreamBasedCheckpointFileManager(path: Path, hadoopConf: Configuration)

Review Comment:
   Would be nice to have a classdoc describing the rationale (briefly explaining what this does and how it helps), and the requirement on the path. End users wouldn't know about the capability we define in Hadoop FileSystem, so it is better to just list the schemes directly (e.g. s3a).
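   E.g. a classdoc sketch along those lines:
   
   ```scala
   /**
    * A checkpoint file manager which writes checkpoint files atomically via
    * Hadoop's Abortable stream support: close() commits the file, cancel()
    * aborts the in-progress upload, so no rename (a copy on S3) is needed.
    * The checkpoint path must be on a filesystem whose output streams are
    * abortable, e.g. s3a on Hadoop 3.3.1 or later.
    */
   ```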



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala:
##########
@@ -58,50 +77,40 @@ abstract class CheckpointFileManagerTests extends SparkFunSuite with SQLHelper {
       // Create atomic without overwrite
       var path = new Path(s"$dir/file")
       assert(!fm.exists(path))
-      fm.createAtomic(path, overwriteIfPossible = false).cancel()
+      fm.createAtomic(path, overwriteIfPossible = false).writeContent(1).cancel()
       assert(!fm.exists(path))
-      fm.createAtomic(path, overwriteIfPossible = false).close()
+      fm.createAtomic(path, overwriteIfPossible = false).writeContent(2).close()
       assert(fm.exists(path))
+      assert(fm.open(path).readContent() == 2)
       quietly {
         intercept[IOException] {
           // should throw exception since file exists and overwrite is false
-          fm.createAtomic(path, overwriteIfPossible = false).close()
+          fm.createAtomic(path, overwriteIfPossible = false).writeContent(3).close()
         }
       }
+      assert(fm.open(path).readContent() == 2)
 
       // Create atomic with overwrite if possible
       path = new Path(s"$dir/file2")
       assert(!fm.exists(path))
-      fm.createAtomic(path, overwriteIfPossible = true).cancel()
+      fm.createAtomic(path, overwriteIfPossible = true).writeContent(4).cancel()
       assert(!fm.exists(path))
-      fm.createAtomic(path, overwriteIfPossible = true).close()
+      fm.createAtomic(path, overwriteIfPossible = true).writeContent(5).close()
       assert(fm.exists(path))
-      fm.createAtomic(path, overwriteIfPossible = true).close()  // should not throw exception
-
-      // crc file should not be leaked when origin file doesn't exist.
-      // The implementation of Hadoop filesystem may filter out checksum file, so
-      // listing files from local filesystem.
-      val fileNames = new File(path.getParent.toString).listFiles().toSeq
-        .filter(p => p.isFile).map(p => p.getName)
-      val crcFiles = fileNames.filter(n => n.startsWith(".") && n.endsWith(".crc"))
-      val originFileNamesForExistingCrcFiles = crcFiles.map { name =>
-        // remove first "." and last ".crc"
-        name.substring(1, name.length - 4)
-      }
-
-      // Check all origin files exist for all crc files.
-      assert(originFileNamesForExistingCrcFiles.toSet.subsetOf(fileNames.toSet),
-        s"Some of origin files for crc files don't exist - crc files: $crcFiles / " +
-          s"expected origin files: $originFileNamesForExistingCrcFiles / actual files: $fileNames")
+      assert(fm.open(path).readContent() == 5)
+      // should not throw exception
+      fm.createAtomic(path, overwriteIfPossible = true).writeContent(6).close()
+      assert(fm.open(path).readContent() == 6)
 
+      checkLeakingCrcFiles(dir)
       // Open and delete
       fm.open(path).close()
       fm.delete(path)
       assert(!fm.exists(path))
       intercept[IOException] {
         fm.open(path)
       }
-      fm.delete(path) // should not throw exception
+      fm.delete(dir) // should not throw exception

Review Comment:
   Isn't `path` correct here, as we test against a file rather than a directory?



##########
hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManager.scala:
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.internal.io.cloud
+
+import java.nio.file.FileAlreadyExistsException
+import java.util.EnumSet
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.streaming.AbstractFileContextBasedCheckpointFileManager
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
+
+class AbortableStreamBasedCheckpointFileManager(path: Path, hadoopConf: Configuration)

Review Comment:
   It would also be good to describe the limitation. Assuming the path is on S3, if my understanding is correct, concurrent puts against the same file will all succeed. Although we check the file with exists before closing the stream (the put), there is a race condition which could mess up the checkpoint, so end users are encouraged to take serious care to prevent multiple queries from accessing the same checkpoint.
   
   The default checkpoint file manager, with a file system supporting atomic rename, would not suffer from this concurrency issue.
   
   cc. @steveloughran in case you get a chance to correct me if I'm mistaken.
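   
   To make the race concrete, an illustrative interleaving (`fm` is a checkpoint file manager over an S3 path, as in the test suite above):
   
   ```scala
   // Both writers pass the existence check before either upload completes:
   val a = fm.createAtomic(path, overwriteIfPossible = false) // exists(path) == false
   val b = fm.createAtomic(path, overwriteIfPossible = false) // still false: no upload is visible yet
   a.close() // completes A's multipart upload - the put succeeds
   b.close() // also succeeds on S3, silently replacing A's data
   ```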





[GitHub] [spark] HyukjinKwon commented on pull request #37474: [SPARK-40039][SS] Introducing a streaming checkpoint file manager based on Hadoop's Abortable interface

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on PR #37474:
URL: https://github.com/apache/spark/pull/37474#issuecomment-1229130755

   Thanks for taking a look.
   
   (Just a bit of context: there are multiple other commits that caused other build failures, so I just decided to revert them all together.)




[GitHub] [spark] HeartSaVioR commented on pull request #37474: [SPARK-40039][SS] Introducing a streaming checkpoint file manager based on Hadoop's Abortable interface

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on PR #37474:
URL: https://github.com/apache/spark/pull/37474#issuecomment-1226802090

   Looks like Steve's comment has been addressed. Thanks! Merging to master.




[GitHub] [spark] HeartSaVioR commented on a diff in pull request #37474: [SPARK-40039][SS] Introducing a streaming checkpoint file manager based on Hadoop's Abortable interface

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #37474:
URL: https://github.com/apache/spark/pull/37474#discussion_r950767972


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala:
##########
@@ -58,50 +77,40 @@ abstract class CheckpointFileManagerTests extends SparkFunSuite with SQLHelper {
       // Create atomic without overwrite
       var path = new Path(s"$dir/file")
       assert(!fm.exists(path))
-      fm.createAtomic(path, overwriteIfPossible = false).cancel()
+      fm.createAtomic(path, overwriteIfPossible = false).writeContent(1).cancel()
       assert(!fm.exists(path))
-      fm.createAtomic(path, overwriteIfPossible = false).close()
+      fm.createAtomic(path, overwriteIfPossible = false).writeContent(2).close()
       assert(fm.exists(path))
+      assert(fm.open(path).readContent() == 2)
       quietly {
         intercept[IOException] {
           // should throw exception since file exists and overwrite is false
-          fm.createAtomic(path, overwriteIfPossible = false).close()
+          fm.createAtomic(path, overwriteIfPossible = false).writeContent(3).close()
         }
       }
+      assert(fm.open(path).readContent() == 2)
 
       // Create atomic with overwrite if possible
       path = new Path(s"$dir/file2")
       assert(!fm.exists(path))
-      fm.createAtomic(path, overwriteIfPossible = true).cancel()
+      fm.createAtomic(path, overwriteIfPossible = true).writeContent(4).cancel()
       assert(!fm.exists(path))
-      fm.createAtomic(path, overwriteIfPossible = true).close()
+      fm.createAtomic(path, overwriteIfPossible = true).writeContent(5).close()
       assert(fm.exists(path))
-      fm.createAtomic(path, overwriteIfPossible = true).close()  // should not throw exception
-
-      // crc file should not be leaked when origin file doesn't exist.
-      // The implementation of Hadoop filesystem may filter out checksum file, so
-      // listing files from local filesystem.
-      val fileNames = new File(path.getParent.toString).listFiles().toSeq
-        .filter(p => p.isFile).map(p => p.getName)
-      val crcFiles = fileNames.filter(n => n.startsWith(".") && n.endsWith(".crc"))
-      val originFileNamesForExistingCrcFiles = crcFiles.map { name =>
-        // remove first "." and last ".crc"
-        name.substring(1, name.length - 4)
-      }
-
-      // Check all origin files exist for all crc files.
-      assert(originFileNamesForExistingCrcFiles.toSet.subsetOf(fileNames.toSet),
-        s"Some of origin files for crc files don't exist - crc files: $crcFiles / " +
-          s"expected origin files: $originFileNamesForExistingCrcFiles / actual files: $fileNames")
+      assert(fm.open(path).readContent() == 5)
+      // should not throw exception
+      fm.createAtomic(path, overwriteIfPossible = true).writeContent(6).close()
+      assert(fm.open(path).readContent() == 6)
 
+      checkLeakingCrcFiles(dir)
       // Open and delete
       fm.open(path).close()
       fm.delete(path)
       assert(!fm.exists(path))
       intercept[IOException] {
         fm.open(path)
       }
-      fm.delete(path) // should not throw exception
+      fm.delete(dir) // should not throw exception

Review Comment:
   Yeah, actually this intends to "test" the behavior (triggering delete against a non-existing path does not throw an exception), not to clean up resources.
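   
   In other words (illustrative; `fm` and `path` refer to the manager and file from the test above):
   
   ```scala
   fm.delete(path)           // removes the existing file
   assert(!fm.exists(path))
   fm.delete(path)           // deleting a non-existing path must be a silent no-op
   ```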





[GitHub] [spark] steveloughran commented on pull request #37474: [SPARK-40039][SS] Introducing a streaming checkpoint file manager based on Hadoop's Abortable interface

Posted by GitBox <gi...@apache.org>.
steveloughran commented on PR #37474:
URL: https://github.com/apache/spark/pull/37474#issuecomment-1217004467

   LGTM. The test failure is in the Python streaming tests... could it somehow be related?




[GitHub] [spark] HeartSaVioR commented on pull request #37474: [SPARK-40039][SS][WIP] Introducing a streaming checkpoint file manager based on Hadoop's Abortable interface

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on PR #37474:
URL: https://github.com/apache/spark/pull/37474#issuecomment-1211461085

   No, I guess your best bet is to construct a no-op query that reads from the rate stream, does nothing, and writes to the no-op sink, and to run it in EC2. Looking into the latency of the WAL commit phase would show the difference.
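   
   A sketch of such a benchmark query; the bucket name, rate, and trigger interval are placeholders, and the checkpoint file manager under test would be switched via `spark.sql.streaming.checkpointFileManagerClass`:
   
   ```scala
   import org.apache.spark.sql.SparkSession
   import org.apache.spark.sql.streaming.Trigger
   
   val spark = SparkSession.builder().appName("checkpoint-wal-bench").getOrCreate()
   
   val query = spark.readStream
     .format("rate")                       // synthetic source, no external dependency
     .option("rowsPerSecond", 10)
     .load()
     .writeStream
     .format("noop")                       // discards every row
     .option("checkpointLocation", "s3a://some-bucket/bench-checkpoint")
     .trigger(Trigger.ProcessingTime("1 second"))
     .start()
   
   // While the query runs, query.lastProgress.durationMs exposes a "walCommit"
   // entry; compare it across the two checkpoint file managers.
   query.awaitTermination()
   ```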




[GitHub] [spark] steveloughran commented on pull request #37474: [SPARK-40039][SS][WIP] Introducing a streaming checkpoint file manager based on Hadoop's Abortable interface

Posted by GitBox <gi...@apache.org>.
steveloughran commented on PR #37474:
URL: https://github.com/apache/spark/pull/37474#issuecomment-1211699163

   Hey @HeartSaVioR. Yes, this is exactly what the API we worked on was designed for.
   
   There is no need to initiate an MPU when writing small files; the OutputStream simply doesn't upload the data anymore. You can check this by calling toString() on the stream - all of its IO stats are there. This means that the cost is as normal: one PUT for data <= the block size; beyond that, one POST to initiate, one POST per block, and one POST in close() to finalize. Block uploads are parallelised, though you do need enough HTTPS connections for this.
   
   It's no more expensive than a normal write; upload performance will be the same, except when you call abort(), when it is faster.
   
   That said, let me review the code to confirm this.
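   
   A rough sketch of that check, assuming an s3a path; `getWrappedStream` unwraps the `FSDataOutputStream`, and the exact toString() contents are S3A-specific:
   
   ```scala
   import org.apache.hadoop.conf.Configuration
   import org.apache.hadoop.fs.{FileSystem, Path}
   
   val path = new Path("s3a://some-bucket/checkpoint/probe")   // placeholder path
   val fs = FileSystem.get(path.toUri, new Configuration())
   
   val out = fs.create(path, true)
   out.write("small payload".getBytes("UTF-8"))
   // The S3A block output stream includes its IO statistics in toString(), so
   // this shows whether a multipart upload was initiated for this small write.
   println(out.getWrappedStream.toString)
   out.close()
   ```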
   




[GitHub] [spark] HeartSaVioR commented on pull request #37474: [SPARK-40039][SS][WIP] Introducing a streaming checkpoint file manager based on Hadoop's Abortable interface

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on PR #37474:
URL: https://github.com/apache/spark/pull/37474#issuecomment-1211719349

   Thanks for confirming! That is exactly my understanding as well.
   My other understanding is that this does not provide "fail if exists" semantics, right? So while we achieve an atomic put with much better performance (it was quite inefficient for us to upload the temp file to S3), it still doesn't capture the case where two concurrent streaming queries pick up the same checkpoint and run. It will always overwrite the existing one. Could you please confirm?




[GitHub] [spark] HeartSaVioR commented on pull request #37474: [SPARK-40039][SS][WIP] Introducing a streaming checkpoint file manager based on Hadoop's Abortable interface

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on PR #37474:
URL: https://github.com/apache/spark/pull/37474#issuecomment-1211458861

   Yeah, that's also true. Btw, when I worked on abort with Steve it didn't always go through multipart upload, but my memory could be incorrect. Could you please double-check this?




[GitHub] [spark] steveloughran commented on pull request #37474: [SPARK-40039][SS] Introducing a streaming checkpoint file manager based on Hadoop's Abortable interface

Posted by GitBox <gi...@apache.org>.
steveloughran commented on PR #37474:
URL: https://github.com/apache/spark/pull/37474#issuecomment-1230138117

   It's really time the hadoop-2 branch was laid to rest. If someone wants to run new code on a hadoop-2 cluster they'll need to be on Java 7; most of the stable production Hadoop clusters run on that. Only Hadoop 3.2+ makes any commitment to work with Java 11 (i.e. problems with Java 11 will not be closed as WONTFIX). Time to kill it, cut testing in half, etc.




[GitHub] [spark] steveloughran commented on pull request #37474: [SPARK-40039][SS] Introducing a streaming checkpoint file manager based on Hadoop's Abortable interface

Posted by GitBox <gi...@apache.org>.
steveloughran commented on PR #37474:
URL: https://github.com/apache/spark/pull/37474#issuecomment-1227014957

   Nice to see the feature @HeartSaVioR and I worked on being used. During that work I changed the S3A output stream to fail if anyone called flush(), which was an interesting experiment to see what code expected it to work on S3. Ranger, for example....




[GitHub] [spark] attilapiros commented on pull request #37474: [SPARK-40039][SS][WIP] Introducing a streaming checkpoint file manager based on Hadoop's Abortable interface

Posted by GitBox <gi...@apache.org>.
attilapiros commented on PR #37474:
URL: https://github.com/apache/spark/pull/37474#issuecomment-1215869382

   The PySpark errors are unrelated.




[GitHub] [spark] HeartSaVioR commented on pull request #37474: [SPARK-40039][SS] Introducing a streaming checkpoint file manager based on Hadoop's Abortable interface

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on PR #37474:
URL: https://github.com/apache/spark/pull/37474#issuecomment-1220155962

   @attilapiros Could you please run a performance benchmark and put the observations in the PR description, as I mentioned in previous comments? Thanks in advance.

