You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2022/08/29 01:42:54 UTC

[spark] branch master updated: [SPARK-40039][SS] Introducing a streaming checkpoint file manager based on Hadoop's Abortable interface

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b89d077d85b [SPARK-40039][SS] Introducing a streaming checkpoint file manager based on Hadoop's Abortable interface
b89d077d85b is described below

commit b89d077d85b4d0628beafd2bc8fa096a8c27c8a6
Author: attilapiros <pi...@gmail.com>
AuthorDate: Mon Aug 29 10:42:38 2022 +0900

    [SPARK-40039][SS] Introducing a streaming checkpoint file manager based on Hadoop's Abortable interface
    
    ### What changes were proposed in this pull request?
    
    Currently on S3 the checkpoint file manager (called `FileContextBasedCheckpointFileManager`) is available which is based on the rename operation. So when a file is opened for an atomic stream a temporary file will be used behind the scenes and when the stream is committed the file is renamed to its final location.
    
    But on S3 the rename operation will be a file copy so it has some serious performance implication.
    
    On Hadoop 3 there is new interface introduce called [Abortable](https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/fs/Abortable.html) and S3AFileSystem has this capability. When the file is small (<= the block size) this will be a single PUT as commit and no operation if it is aborted. When the file is bigger then S3's multipart upload is used: so when the file is committed [a POST is sent](https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html) and  [...]
    
    This avoids the file copying altogether.
    
    ### Why are the changes needed?
    
    For improving streaming performance.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    
    #### Unit test
    
    I have refactored the existing `CheckpointFileManagerTests` and run against a test filesystem which supports the `Abortable` interface (see `AbortableFileSystem` which is based on `RawLocalFileSystem`).
    This way we have a unit test.
    
    #### Integration test
    
    Moreover the same test can be run against AWS S3 by using an integration test (see `AwsS3AbortableStreamBasedCheckpointFileManagerSuite`):
    
    ```
    -> S3_PATH=<..> AWS_ACCESS_KEY_ID=<..> AWS_SECRET_ACCESS_KEY=<..> AWS_SESSION_TOKEN=<..>  ./build/mvn install -pl hadoop-cloud  -Phadoop-cloud,hadoop-3,integration-test
    
    Discovery starting.
    Discovery completed in 346 milliseconds.
    Run starting. Expected test count is: 1
    AwsS3AbortableStreamBasedCheckpointFileManagerSuite:
    - mkdirs, list, createAtomic, open, delete, exists
    CommitterBindingSuite:
    AbortableStreamBasedCheckpointFileManagerSuite:
    Run completed in 14 seconds, 407 milliseconds.
    Total number of tests run: 1
    Suites: completed 4, aborted 0
    Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0
    All tests passed.
    ```
    
    #### Performance test
    
    I have run a [small performance app](https://github.com/attilapiros/spark-ss-perf/blob/ab4c6004caffc38a218fa81fd5482a6cc07ca14f/src/main/scala/perf.scala) which uses a rate stream and foreach sink with an empty body. The results:
    
    ```
    ➜  spark git:(SPARK-40039) ✗ ./bin/spark-submit ../spark-ss-perf/target/scala-2.12/performance-spark-ss_2.12-0.1.jar s3a://mybucket org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager  2>&1 | grep "walCommit took" | awk '{print $7}' |  datamash max 1 min 1 mean 1 median 1 perc:90 1 perc:95 1 perc:99 1
    4143    3286    3528.6  3500    3742.8  3840    4076.04
    
    ➜  spark git:(SPARK-40039) ✗ ./bin/spark-submit ../spark-ss-perf/target/scala-2.12/performance-spark-ss_2.12-0.1.jar s3a://mybucket org.apache.spark.internal.io.cloud.AbortableStreamBasedCheckpointFileManager  2>&1 | grep "walCommit took" | awk '{print $7}' |  datamash max 1 min 1 mean 1 median 1 perc:90 1 perc:95 1 perc:99 1
    3765    1447    2187.0217391304 1844.5  2867    2976.5  3437.85
    ```
    
    Closes #37687 from attilapiros/SPARK-40039-II.
    
    Authored-by: attilapiros <pi...@gmail.com>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 docs/cloud-integration.md                          |  12 ++-
 hadoop-cloud/README.md                             |  20 ++++
 hadoop-cloud/pom.xml                               |  63 ++++++++++--
 ...AbortableStreamBasedCheckpointFileManager.scala |  96 +++++++++++++++++
 .../io/cloud/abortable/AbortableFileSystem.java    | 113 +++++++++++++++++++++
 .../abortable/AbstractAbortableFileSystem.java     |  44 ++++++++
 .../src/hadoop-3/test/resources/log4j2.properties  |  40 ++++++++
 ...ableStreamBasedCheckpointFileManagerSuite.scala |  83 +++++++++++++++
 .../internal/io/cloud/IntegrationTestSuite.java    |  29 ++++++
 project/SparkBuild.scala                           |   3 +-
 .../streaming/CheckpointFileManager.scala          |  57 ++++++-----
 .../streaming/CheckpointFileManagerSuite.scala     |  94 ++++++++++++-----
 12 files changed, 589 insertions(+), 65 deletions(-)

diff --git a/docs/cloud-integration.md b/docs/cloud-integration.md
index d65616ed0b8..7630f3c0a0d 100644
--- a/docs/cloud-integration.md
+++ b/docs/cloud-integration.md
@@ -231,9 +231,15 @@ The size of the window needs to be set to handle this.
 is no need for a workflow of write-then-rename to ensure that files aren't picked up
 while they are still being written. Applications can write straight to the monitored directory.
 
-1. Streams should only be checkpointed to a store implementing a fast and
-atomic `rename()` operation.
-Otherwise the checkpointing may be slow and potentially unreliable.
+1. In case of the default checkpoint file manager called `FileContextBasedCheckpointFileManager`
+streams should only be checkpointed to a store implementing a fast and
+atomic `rename()` operation. Otherwise the checkpointing may be slow and potentially unreliable.
+On AWS S3 with Hadoop 3.3.1 or later using the S3A connector the abortable stream based checkpoint
+file manager can be used (by setting the `spark.sql.streaming.checkpointFileManagerClass`
+configuration to `org.apache.spark.internal.io.cloud.AbortableStreamBasedCheckpointFileManager`)
+which eliminates the slow rename. In this case users must be extra careful to avoid the reuse of
+the checkpoint location among multiple queries running parallelly as that could lead to corruption
+of the checkpointing data.
 
 ## Committing work into cloud storage safely and fast.
 
diff --git a/hadoop-cloud/README.md b/hadoop-cloud/README.md
new file mode 100644
index 00000000000..0be167e6ef8
--- /dev/null
+++ b/hadoop-cloud/README.md
@@ -0,0 +1,20 @@
+---
+layout: global
+title: Spark Hadoop3 Integration Tests
+---
+
+# Running the Integration Tests
+
+As mocking of an external systems (like AWS S3) is not always perfect the unit testing should be
+extended with integration testing. This is why the build profile `integration-test` has been
+introduced here. When it is given (`-Pintegration-test`) for testing then only those tests are
+executed where the `org.apache.spark.internal.io.cloud.IntegrationTestSuite` tag is used.
+
+One example is `AwsS3AbortableStreamBasedCheckpointFileManagerSuite`.
+
+Integration tests will have some extra configurations for example selecting the external system to
+run the test against. Those configs are passed as environment variables and the existence of these
+variables must be checked by the test.
+Like for `AwsS3AbortableStreamBasedCheckpointFileManagerSuite` the S3 bucket used for testing
+is passed in the `S3A_PATH` and the credetinals to access AWS S3 are AWS_ACCESS_KEY_ID and
+AWS_SECRET_ACCESS_KEY (in addition you can define an optional AWS_SESSION_TOKEN too).
diff --git a/hadoop-cloud/pom.xml b/hadoop-cloud/pom.xml
index 0a6a81ed7cf..1cded760571 100644
--- a/hadoop-cloud/pom.xml
+++ b/hadoop-cloud/pom.xml
@@ -49,6 +49,13 @@
       <version>${project.version}</version>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_${scala.binary.version}</artifactId>
@@ -206,37 +213,57 @@
         <activeByDefault>true</activeByDefault>
       </activation>
       <properties>
-        <extra.source.dir>src/hadoop-3/main/scala</extra.source.dir>
-        <extra.testsource.dir>src/hadoop-3/test/scala</extra.testsource.dir>
+        <extra.java.source.dir>src/hadoop-3/main/java</extra.java.source.dir>
+        <extra.java.testsource.dir>src/hadoop-3/test/java</extra.java.testsource.dir>
+        <extra.scala.source.dir>src/hadoop-3/main/scala</extra.scala.source.dir>
+        <extra.scala.testsource.dir>src/hadoop-3/test/scala</extra.scala.testsource.dir>
       </properties>
 
       <build>
         <plugins>
+          <plugin>
+            <groupId>org.scalatest</groupId>
+            <artifactId>scalatest-maven-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>test</id>
+                <phase>test</phase>
+                <goals>
+                  <goal>test</goal>
+                </goals>
+                <configuration>
+                  <tagsToExclude>org.apache.spark.internal.io.cloud.IntegrationTestSuite</tagsToExclude>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
           <plugin>
             <groupId>org.codehaus.mojo</groupId>
             <artifactId>build-helper-maven-plugin</artifactId>
             <executions>
               <execution>
-                <id>add-scala-sources</id>
+                <id>add-extra-sources</id>
                 <phase>generate-sources</phase>
                 <goals>
                   <goal>add-source</goal>
                 </goals>
                 <configuration>
                   <sources>
-                    <source>${extra.source.dir}</source>
+                    <source>${extra.java.source.dir}</source>
+                    <source>${extra.scala.source.dir}</source>
                   </sources>
                 </configuration>
               </execution>
               <execution>
-                <id>add-scala-test-sources</id>
+                <id>add-extra-test-sources</id>
                 <phase>generate-test-sources</phase>
                 <goals>
                   <goal>add-test-source</goal>
                 </goals>
                 <configuration>
                   <sources>
-                    <source>${extra.testsource.dir}</source>
+                    <source>${extra.java.testsource.dir}</source>
+                    <source>${extra.scala.testsource.dir}</source>
                   </sources>
                 </configuration>
               </execution>
@@ -297,6 +324,30 @@
       </dependencies>
     </profile>
 
+    <profile>
+      <id>integration-test</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.scalatest</groupId>
+            <artifactId>scalatest-maven-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>test</id>
+                <phase>test</phase>
+                <goals>
+                  <goal>test</goal>
+                </goals>
+                <configuration>
+                  <tagsToExclude>None</tagsToExclude>
+                  <tagsToInclude>org.apache.spark.internal.io.cloud.IntegrationTestSuite</tagsToInclude>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
   </profiles>
 
 </project>
diff --git a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManager.scala b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManager.scala
new file mode 100644
index 00000000000..2afab01ec7b
--- /dev/null
+++ b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManager.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.internal.io.cloud
+
+import java.nio.file.FileAlreadyExistsException
+import java.util.EnumSet
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.streaming.AbstractFileContextBasedCheckpointFileManager
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
+
+class AbortableStreamBasedCheckpointFileManager(path: Path, hadoopConf: Configuration)
+  extends AbstractFileContextBasedCheckpointFileManager(path, hadoopConf) with Logging {
+
+  if (!fc.hasPathCapability(path, CommonPathCapabilities.ABORTABLE_STREAM)) {
+    throw new UnsupportedFileSystemException("AbortableStreamBasedCheckpointFileManager requires" +
+      s" an fs (path: $path) with abortable stream support")
+  }
+
+  logInfo(s"Writing atomically to $path based on abortable stream")
+
+  class AbortableStreamBasedFSDataOutputStream(
+      fsDataOutputStream: FSDataOutputStream,
+      fc: FileContext,
+      path: Path,
+      overwriteIfPossible: Boolean) extends CancellableFSDataOutputStream(fsDataOutputStream) {
+
+    @volatile private var terminated = false
+
+    override def cancel(): Unit = synchronized {
+      if (terminated) return
+      try {
+        fsDataOutputStream.abort()
+        fsDataOutputStream.close()
+      } catch {
+          case NonFatal(e) =>
+            logWarning(s"Error cancelling write to $path (stream: $fsDataOutputStream)", e)
+      } finally {
+        terminated = true
+      }
+    }
+
+    override def close(): Unit = synchronized {
+      if (terminated) return
+      try {
+        if (!overwriteIfPossible && fc.util().exists(path)) {
+          fsDataOutputStream.abort()
+          throw new FileAlreadyExistsException(
+            s"Failed to close atomic stream $path (stream: " +
+            s"$fsDataOutputStream) as destination already exists")
+        }
+        fsDataOutputStream.close()
+      } catch {
+          case NonFatal(e) =>
+            logWarning(s"Error closing $path (stream: $fsDataOutputStream)", e)
+      } finally {
+        terminated = true
+      }
+    }
+
+    override def toString(): String = {
+      fsDataOutputStream.toString
+    }
+  }
+
+  override def createAtomic(
+      path: Path, overwriteIfPossible: Boolean): CancellableFSDataOutputStream = {
+    import CreateFlag._
+    val createFlag = if (overwriteIfPossible) {
+      EnumSet.of(CREATE, OVERWRITE)
+    } else {
+      EnumSet.of(CREATE)
+    }
+    new AbortableStreamBasedFSDataOutputStream(
+      fc.create(path, createFlag), fc, path, overwriteIfPossible)
+  }
+}
diff --git a/hadoop-cloud/src/hadoop-3/test/java/org/apache/spark/internal/io/cloud/abortable/AbortableFileSystem.java b/hadoop-cloud/src/hadoop-3/test/java/org/apache/spark/internal/io/cloud/abortable/AbortableFileSystem.java
new file mode 100644
index 00000000000..5c7f68f4378
--- /dev/null
+++ b/hadoop-cloud/src/hadoop-3/test/java/org/apache/spark/internal/io/cloud/abortable/AbortableFileSystem.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.internal.io.cloud.abortable;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+
+public class AbortableFileSystem extends RawLocalFileSystem {
+
+  public static String ABORTABLE_FS_SCHEME = "abortable";
+
+  @Override
+  public URI getUri() {
+    return URI.create(ABORTABLE_FS_SCHEME + ":///");
+  }
+
+  public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite,
+    int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
+    FSDataOutputStream out = this.create(f, overwrite, bufferSize, replication, blockSize,
+      progress, permission);
+    return out;
+  }
+
+  private FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication,
+    long blockSize, Progressable progress, FsPermission permission) throws IOException {
+    if (this.exists(f) && !overwrite) {
+      throw new FileAlreadyExistsException("File already exists: " + f);
+    } else {
+      Path parent = f.getParent();
+      if (parent != null && !this.mkdirs(parent)) {
+        throw new IOException("Mkdirs failed to create " + parent.toString());
+      } else {
+        return new FSDataOutputStream(this.createOutputStreamWithMode(f, false, permission), null);
+      }
+    }
+  }
+
+  @Override
+  protected OutputStream createOutputStreamWithMode(Path f, boolean append,
+      FsPermission permission) throws IOException {
+    return new AbortableOutputStream(f, append, permission);
+  }
+
+  class AbortableOutputStream extends ByteArrayOutputStream
+      implements Abortable, StreamCapabilities {
+
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+
+    private Path f;
+
+    private boolean append;
+
+    private FsPermission permission;
+
+    AbortableOutputStream(Path f, boolean append, FsPermission permission) {
+      this.f = f;
+      this.append = append;
+      this.permission = permission;
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (closed.getAndSet(true)) {
+        return;
+      }
+
+      OutputStream output =
+        AbortableFileSystem.super.createOutputStreamWithMode(f, append, permission);
+      writeTo(output);
+      output.close();
+    }
+
+    @Override
+    public AbortableResult abort() {
+      final boolean isAlreadyClosed = closed.getAndSet(true);
+      return new AbortableResult() {
+        public boolean alreadyClosed() {
+          return isAlreadyClosed;
+        }
+
+        public IOException anyCleanupException() {
+          return null;
+        }
+      };
+    }
+
+    @Override
+    public boolean hasCapability(String capability) {
+      return capability == CommonPathCapabilities.ABORTABLE_STREAM;
+    }
+  }
+}
diff --git a/hadoop-cloud/src/hadoop-3/test/java/org/apache/spark/internal/io/cloud/abortable/AbstractAbortableFileSystem.java b/hadoop-cloud/src/hadoop-3/test/java/org/apache/spark/internal/io/cloud/abortable/AbstractAbortableFileSystem.java
new file mode 100644
index 00000000000..57ede38a23b
--- /dev/null
+++ b/hadoop-cloud/src/hadoop-3/test/java/org/apache/spark/internal/io/cloud/abortable/AbstractAbortableFileSystem.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.internal.io.cloud.abortable;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonPathCapabilities;
+import org.apache.hadoop.fs.DelegateToFileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class AbstractAbortableFileSystem extends DelegateToFileSystem {
+
+  public AbstractAbortableFileSystem(
+      URI theUri,
+      Configuration conf) throws IOException, URISyntaxException {
+    super(theUri, new AbortableFileSystem(), conf, AbortableFileSystem.ABORTABLE_FS_SCHEME, false);
+  }
+
+  @Override
+  public boolean hasPathCapability(Path path, String capability) throws IOException {
+    if (capability == CommonPathCapabilities.ABORTABLE_STREAM) {
+      return true;
+    } else {
+      return super.hasPathCapability(path, capability);
+    }
+  }
+}
diff --git a/hadoop-cloud/src/hadoop-3/test/resources/log4j2.properties b/hadoop-cloud/src/hadoop-3/test/resources/log4j2.properties
new file mode 100644
index 00000000000..01a9cafafa8
--- /dev/null
+++ b/hadoop-cloud/src/hadoop-3/test/resources/log4j2.properties
@@ -0,0 +1,40 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file target/unit-tests.log
+rootLogger.level = info
+rootLogger.appenderRef.file.ref = ${sys:test.appender:-File}
+
+appender.file.type = File
+appender.file.name = File
+appender.file.fileName = target/unit-tests.log
+appender.file.append = true
+appender.file.layout.type = PatternLayout
+appender.file.layout.pattern = %d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n%ex
+
+# Tests that launch java subprocesses can set the "test.appender" system property to
+# "console" to avoid having the child process's logs overwrite the unit test's
+# log file.
+appender.console.type = Console
+appender.console.name = STDERR
+appender.console.target = SYSTEM_ERR
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %t: %m%n%ex
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+logger.jetty.name = org.spark_project.jetty
+logger.jetty.level = warn
diff --git a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManagerSuite.scala b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManagerSuite.scala
new file mode 100644
index 00000000000..0dbc650fc8c
--- /dev/null
+++ b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManagerSuite.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.internal.io.cloud
+
+import java.io.File
+
+import scala.util.Properties
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+import org.apache.hadoop.fs.permission.FsPermission
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.io.cloud.abortable.AbortableFileSystem
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager
+import org.apache.spark.sql.execution.streaming.CheckpointFileManagerTests
+
+class AbortableStreamBasedCheckpointFileManagerSuite
+  extends CheckpointFileManagerTests with Logging {
+
+  override def withTempHadoopPath(p: Path => Unit): Unit = {
+    withTempDir { f: File =>
+      val basePath = new Path(AbortableFileSystem.ABORTABLE_FS_SCHEME, null, f.getAbsolutePath)
+      p(basePath)
+    }
+  }
+
+  override def checkLeakingCrcFiles(path: Path): Unit = { }
+
+  override def createManager(path: Path): CheckpointFileManager = {
+    val conf = new Configuration()
+    conf.set(s"fs.AbstractFileSystem.${AbortableFileSystem.ABORTABLE_FS_SCHEME}.impl",
+      "org.apache.spark.internal.io.cloud.abortable.AbstractAbortableFileSystem")
+    new AbortableStreamBasedCheckpointFileManager(path, conf)
+  }
+}
+
+@IntegrationTestSuite
+class AwsS3AbortableStreamBasedCheckpointFileManagerSuite
+    extends AbortableStreamBasedCheckpointFileManagerSuite with BeforeAndAfter {
+
+  val s3aPath = Properties.envOrNone("S3A_PATH")
+
+  val hadoopConf = new Configuration()
+
+  var cleanup: () => Unit = () => {}
+
+  override protected def beforeAll(): Unit = {
+    assert(s3aPath.isDefined, "S3A_PATH must be defined!")
+    val path = new Path(s3aPath.get)
+    val fc = FileContext.getFileContext(path.toUri, hadoopConf)
+    assert(!fc.util.exists(path), s"S3A_PATH ($path) should not exists!")
+    fc.mkdir(path, FsPermission.getDirDefault, true)
+    cleanup = () => fc.delete(path, true)
+  }
+
+  override protected def afterAll(): Unit = {
+    cleanup()
+  }
+
+  override def withTempHadoopPath(p: Path => Unit): Unit = {
+    p(new Path(s3aPath.get))
+  }
+
+  override def createManager(path: Path): CheckpointFileManager = {
+    new AbortableStreamBasedCheckpointFileManager(path, hadoopConf)
+  }
+}
diff --git a/hadoop-cloud/src/test/java/org/apache/spark/internal/io/cloud/IntegrationTestSuite.java b/hadoop-cloud/src/test/java/org/apache/spark/internal/io/cloud/IntegrationTestSuite.java
new file mode 100644
index 00000000000..d1c5f07cceb
--- /dev/null
+++ b/hadoop-cloud/src/test/java/org/apache/spark/internal/io/cloud/IntegrationTestSuite.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.internal.io.cloud;
+
+import org.scalatest.TagAnnotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@TagAnnotation
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.METHOD, ElementType.TYPE})
+public @interface IntegrationTestSuite {}
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 21ab6f9f636..145440d43ed 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -1136,7 +1136,8 @@ object CopyDependencies {
 
 object TestSettings {
   import BuildCommons._
-  private val defaultExcludedTags = Seq("org.apache.spark.tags.ChromeUITest")
+  private val defaultExcludedTags = Seq("org.apache.spark.tags.ChromeUITest",
+    "org.apache.spark.internal.io.cloud.IntegrationTestSuite")
 
   lazy val settings = Seq (
     // Fork new JVMs for tests and set Java options for those
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
index 48bf88f3116..cf5d54fd20a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
@@ -300,13 +300,10 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration
 }
 
 
-/** An implementation of [[CheckpointFileManager]] using Hadoop's [[FileContext]] API. */
-class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuration)
-  extends CheckpointFileManager with RenameHelperMethods with Logging {
-
-  import CheckpointFileManager._
+abstract class AbstractFileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuration)
+  extends CheckpointFileManager with Logging {
 
-  private val fc = if (path.toUri.getScheme == null) {
+  protected val fc = if (path.toUri.getScheme == null) {
     FileContext.getFileContext(hadoopConf)
   } else {
     FileContext.getFileContext(path.toUri, hadoopConf)
@@ -320,19 +317,6 @@ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuratio
     fc.mkdir(path, FsPermission.getDirDefault, true)
   }
 
-  override def createTempFile(path: Path): FSDataOutputStream = {
-    import CreateFlag._
-    import Options._
-    fc.create(
-      path, EnumSet.of(CREATE, OVERWRITE), CreateOpts.checksumParam(ChecksumOpt.createDisabled()))
-  }
-
-  override def createAtomic(
-      path: Path,
-      overwriteIfPossible: Boolean): CancellableFSDataOutputStream = {
-    new RenameBasedFSDataOutputStream(this, path, overwriteIfPossible)
-  }
-
   override def open(path: Path): FSDataInputStream = {
     fc.open(path)
   }
@@ -341,14 +325,6 @@ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuratio
     fc.util.exists(path)
   }
 
-  override def renameTempFile(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = {
-    import Options.Rename._
-    fc.rename(srcPath, dstPath, if (overwriteIfPossible) OVERWRITE else NONE)
-    // TODO: this is a workaround of HADOOP-16255 - remove this when HADOOP-16255 is resolved
-    mayRemoveCrcFile(srcPath)
-  }
-
-
   override def delete(path: Path): Unit = {
     try {
       fc.delete(path, true)
@@ -368,6 +344,33 @@ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuratio
     fc.mkdir(qualifiedPath, FsPermission.getDirDefault, true)
     qualifiedPath
   }
+}
+
+class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuration)
+  extends AbstractFileContextBasedCheckpointFileManager(path, hadoopConf)
+  with RenameHelperMethods {
+
+  import CheckpointFileManager._
+
+  override def createTempFile(path: Path): FSDataOutputStream = {
+    import CreateFlag._
+    import Options._
+    fc.create(
+      path, EnumSet.of(CREATE, OVERWRITE), CreateOpts.checksumParam(ChecksumOpt.createDisabled()))
+  }
+
+  override def createAtomic(
+      path: Path,
+      overwriteIfPossible: Boolean): CancellableFSDataOutputStream = {
+    new RenameBasedFSDataOutputStream(this, path, overwriteIfPossible)
+  }
+
+  override def renameTempFile(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = {
+    import Options.Rename._
+    fc.rename(srcPath, dstPath, if (overwriteIfPossible) OVERWRITE else NONE)
+    // TODO: this is a workaround of HADOOP-16255 - remove this when HADOOP-16255 is resolved
+    mayRemoveCrcFile(srcPath)
+  }
 
   private def mayRemoveCrcFile(path: Path): Unit = {
     try {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala
index 79bcd490a24..cbcb4a4062d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala
@@ -27,16 +27,35 @@ import org.apache.hadoop.fs._
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.plans.SQLHelper
 import org.apache.spark.sql.catalyst.util.quietly
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 
-abstract class CheckpointFileManagerTests extends SparkFunSuite with SQLHelper {
+abstract class CheckpointFileManagerTests extends SparkFunSuite {
 
-  def createManager(path: Path): CheckpointFileManager
+  protected def withTempHadoopPath(p: Path => Unit): Unit
+
+  protected def checkLeakingCrcFiles(path: Path): Unit
+
+  protected def createManager(path: Path): CheckpointFileManager
+
+  private implicit class RichCancellableStream(stream: CancellableFSDataOutputStream) {
+    def writeContent(i: Int): CancellableFSDataOutputStream = {
+      stream.writeInt(i)
+      stream
+    }
+  }
+
+  private implicit class RichFSDataInputStream(stream: FSDataInputStream) {
+    def readContent(): Int = {
+      val res = stream.readInt()
+      stream.close()
+      res
+    }
+  }
 
   test("mkdirs, list, createAtomic, open, delete, exists") {
-    withTempPath { p =>
-      val basePath = new Path(p.getAbsolutePath)
+    withTempHadoopPath { case basePath =>
       val fm = createManager(basePath)
       // Mkdirs
       val dir = new Path(s"$basePath/dir/subdir/subsubdir")
@@ -58,42 +77,32 @@ abstract class CheckpointFileManagerTests extends SparkFunSuite with SQLHelper {
       // Create atomic without overwrite
       var path = new Path(s"$dir/file")
       assert(!fm.exists(path))
-      fm.createAtomic(path, overwriteIfPossible = false).cancel()
+      fm.createAtomic(path, overwriteIfPossible = false).writeContent(1).cancel()
       assert(!fm.exists(path))
-      fm.createAtomic(path, overwriteIfPossible = false).close()
+      fm.createAtomic(path, overwriteIfPossible = false).writeContent(2).close()
       assert(fm.exists(path))
+      assert(fm.open(path).readContent() == 2)
       quietly {
         intercept[IOException] {
           // should throw exception since file exists and overwrite is false
-          fm.createAtomic(path, overwriteIfPossible = false).close()
+          fm.createAtomic(path, overwriteIfPossible = false).writeContent(3).close()
         }
       }
+      assert(fm.open(path).readContent() == 2)
 
       // Create atomic with overwrite if possible
       path = new Path(s"$dir/file2")
       assert(!fm.exists(path))
-      fm.createAtomic(path, overwriteIfPossible = true).cancel()
+      fm.createAtomic(path, overwriteIfPossible = true).writeContent(4).cancel()
       assert(!fm.exists(path))
-      fm.createAtomic(path, overwriteIfPossible = true).close()
+      fm.createAtomic(path, overwriteIfPossible = true).writeContent(5).close()
       assert(fm.exists(path))
-      fm.createAtomic(path, overwriteIfPossible = true).close()  // should not throw exception
-
-      // crc file should not be leaked when origin file doesn't exist.
-      // The implementation of Hadoop filesystem may filter out checksum file, so
-      // listing files from local filesystem.
-      val fileNames = new File(path.getParent.toString).listFiles().toSeq
-        .filter(p => p.isFile).map(p => p.getName)
-      val crcFiles = fileNames.filter(n => n.startsWith(".") && n.endsWith(".crc"))
-      val originFileNamesForExistingCrcFiles = crcFiles.map { name =>
-        // remove first "." and last ".crc"
-        name.substring(1, name.length - 4)
-      }
-
-      // Check all origin files exist for all crc files.
-      assert(originFileNamesForExistingCrcFiles.toSet.subsetOf(fileNames.toSet),
-        s"Some of origin files for crc files don't exist - crc files: $crcFiles / " +
-          s"expected origin files: $originFileNamesForExistingCrcFiles / actual files: $fileNames")
+      assert(fm.open(path).readContent() == 5)
+      // should not throw exception
+      fm.createAtomic(path, overwriteIfPossible = true).writeContent(6).close()
+      assert(fm.open(path).readContent() == 6)
 
+      checkLeakingCrcFiles(dir)
       // Open and delete
       fm.open(path).close()
       fm.delete(path)
@@ -138,13 +147,42 @@ class CheckpointFileManagerSuite extends SharedSparkSession {
   }
 }
 
-class FileContextBasedCheckpointFileManagerSuite extends CheckpointFileManagerTests {
+abstract class CheckpointFileManagerTestsOnLocalFs
+  extends CheckpointFileManagerTests with SQLHelper {
+
+  protected def withTempHadoopPath(p: Path => Unit): Unit = {
+    withTempDir { f: File =>
+      val basePath = new Path(f.getAbsolutePath)
+      p(basePath)
+    }
+  }
+
+  protected def checkLeakingCrcFiles(path: Path): Unit = {
+    // crc file should not be leaked when origin file doesn't exist.
+    // The implementation of Hadoop filesystem may filter out checksum file, so
+    // listing files from local filesystem.
+    val fileNames = new File(path.toString).listFiles().toSeq
+      .filter(p => p.isFile).map(p => p.getName)
+    val crcFiles = fileNames.filter(n => n.startsWith(".") && n.endsWith(".crc"))
+    val originFileNamesForExistingCrcFiles = crcFiles.map { name =>
+      // remove first "." and last ".crc"
+      name.substring(1, name.length - 4)
+    }
+
+    // Check all origin files exist for all crc files.
+    assert(originFileNamesForExistingCrcFiles.toSet.subsetOf(fileNames.toSet),
+      s"Some of origin files for crc files don't exist - crc files: $crcFiles / " +
+        s"expected origin files: $originFileNamesForExistingCrcFiles / actual files: $fileNames")
+  }
+}
+
+class FileContextBasedCheckpointFileManagerSuite extends CheckpointFileManagerTestsOnLocalFs {
   override def createManager(path: Path): CheckpointFileManager = {
     new FileContextBasedCheckpointFileManager(path, new Configuration())
   }
 }
 
-class FileSystemBasedCheckpointFileManagerSuite extends CheckpointFileManagerTests {
+class FileSystemBasedCheckpointFileManagerSuite extends CheckpointFileManagerTestsOnLocalFs {
   override def createManager(path: Path): CheckpointFileManager = {
     new FileSystemBasedCheckpointFileManager(path, new Configuration())
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org