Posted to commits@spark.apache.org by gu...@apache.org on 2022/08/29 01:42:54 UTC
[spark] branch master updated: [SPARK-40039][SS] Introducing a streaming checkpoint file manager based on Hadoop's Abortable interface
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new b89d077d85b [SPARK-40039][SS] Introducing a streaming checkpoint file manager based on Hadoop's Abortable interface
b89d077d85b is described below
commit b89d077d85b4d0628beafd2bc8fa096a8c27c8a6
Author: attilapiros <pi...@gmail.com>
AuthorDate: Mon Aug 29 10:42:38 2022 +0900
[SPARK-40039][SS] Introducing a streaming checkpoint file manager based on Hadoop's Abortable interface
### What changes were proposed in this pull request?
Currently the checkpoint file manager used on S3 (`FileContextBasedCheckpointFileManager`) is based on the rename operation. So when a file is opened for an atomic stream, a temporary file is used behind the scenes, and when the stream is committed the file is renamed to its final location.
But on S3 the rename operation is implemented as a file copy, so it has serious performance implications.
On Hadoop 3 there is a new interface called [Abortable](https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/fs/Abortable.html) and `S3AFileSystem` has this capability. When the file is small (<= the block size) the commit is a single PUT and an abort is a no-op. When the file is bigger, S3's multipart upload is used: when the file is committed [a POST is sent](https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html) and [...]
This avoids the file copy altogether.
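To opt in, set `spark.sql.streaming.checkpointFileManagerClass` to the new manager. A minimal sketch (the config key and class name are taken from the docs change in this commit; the app name and builder settings are illustrative):
```
import org.apache.spark.sql.SparkSession

// Sketch: route streaming checkpoint I/O through the abortable stream based manager.
// Assumes Hadoop 3.3.1+ with the S3A connector and the spark-hadoop-cloud module on
// the classpath.
val spark = SparkSession.builder()
  .appName("abortable-checkpoint-demo")
  .config("spark.sql.streaming.checkpointFileManagerClass",
    "org.apache.spark.internal.io.cloud.AbortableStreamBasedCheckpointFileManager")
  .getOrCreate()
```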
### Why are the changes needed?
For improving streaming checkpointing performance on S3.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
#### Unit test
I have refactored the existing `CheckpointFileManagerTests` and run them against a test filesystem which supports the `Abortable` interface (see `AbortableFileSystem`, which is based on `RawLocalFileSystem`).
This way we have a unit test.
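For reference, wiring the test filesystem's scheme into a Hadoop `Configuration` looks roughly like this (mirroring `AbortableStreamBasedCheckpointFileManagerSuite` in the diff below, where `ABORTABLE_FS_SCHEME` is `"abortable"`):
```
import org.apache.hadoop.conf.Configuration

// Sketch: register the AbstractFileSystem implementation for the "abortable"
// scheme so that FileContext can resolve abortable:/// paths in the test.
val conf = new Configuration()
conf.set("fs.AbstractFileSystem.abortable.impl",
  "org.apache.spark.internal.io.cloud.abortable.AbstractAbortableFileSystem")
```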
#### Integration test
Moreover, the same test can be run against AWS S3 via an integration test (see `AwsS3AbortableStreamBasedCheckpointFileManagerSuite`):
```
-> S3A_PATH=<..> AWS_ACCESS_KEY_ID=<..> AWS_SECRET_ACCESS_KEY=<..> AWS_SESSION_TOKEN=<..> ./build/mvn install -pl hadoop-cloud -Phadoop-cloud,hadoop-3,integration-test
Discovery starting.
Discovery completed in 346 milliseconds.
Run starting. Expected test count is: 1
AwsS3AbortableStreamBasedCheckpointFileManagerSuite:
- mkdirs, list, createAtomic, open, delete, exists
CommitterBindingSuite:
AbortableStreamBasedCheckpointFileManagerSuite:
Run completed in 14 seconds, 407 milliseconds.
Total number of tests run: 1
Suites: completed 4, aborted 0
Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
```
#### Performance test
I have run a [small performance app](https://github.com/attilapiros/spark-ss-perf/blob/ab4c6004caffc38a218fa81fd5482a6cc07ca14f/src/main/scala/perf.scala) which uses a rate stream and a foreach sink with an empty body. The results (columns are the max, min, mean, median, p90, p95 and p99 of the `walCommit took` durations):
```
➜ spark git:(SPARK-40039) ✗ ./bin/spark-submit ../spark-ss-perf/target/scala-2.12/performance-spark-ss_2.12-0.1.jar s3a://mybucket org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager 2>&1 | grep "walCommit took" | awk '{print $7}' | datamash max 1 min 1 mean 1 median 1 perc:90 1 perc:95 1 perc:99 1
4143 3286 3528.6 3500 3742.8 3840 4076.04
➜ spark git:(SPARK-40039) ✗ ./bin/spark-submit ../spark-ss-perf/target/scala-2.12/performance-spark-ss_2.12-0.1.jar s3a://mybucket org.apache.spark.internal.io.cloud.AbortableStreamBasedCheckpointFileManager 2>&1 | grep "walCommit took" | awk '{print $7}' | datamash max 1 min 1 mean 1 median 1 perc:90 1 perc:95 1 perc:99 1
3765 1447 2187.0217391304 1844.5 2867 2976.5 3437.85
```
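All reported statistics are lower with the abortable stream based manager. The linked app is the source of truth for the measurement; a minimal sketch of that kind of workload (a rate source into a foreach sink with an empty body; the bucket name and rate are illustrative) would look like:
```
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}

val spark = SparkSession.builder().appName("ss-perf-sketch").getOrCreate()

val query = spark.readStream
  .format("rate")                       // built-in rate source
  .option("rowsPerSecond", 100)
  .load()
  .writeStream
  .option("checkpointLocation", "s3a://mybucket/checkpoint")  // illustrative bucket
  .foreach(new ForeachWriter[Row] {
    override def open(partitionId: Long, epochId: Long): Boolean = true
    override def process(value: Row): Unit = {}  // empty body: isolates checkpoint cost
    override def close(errorOrNull: Throwable): Unit = {}
  })
  .start()

query.awaitTermination()
```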
Closes #37687 from attilapiros/SPARK-40039-II.
Authored-by: attilapiros <pi...@gmail.com>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
docs/cloud-integration.md | 12 ++-
hadoop-cloud/README.md | 20 ++++
hadoop-cloud/pom.xml | 63 ++++++++++--
...AbortableStreamBasedCheckpointFileManager.scala | 96 +++++++++++++++++
.../io/cloud/abortable/AbortableFileSystem.java | 113 +++++++++++++++++++++
.../abortable/AbstractAbortableFileSystem.java | 44 ++++++++
.../src/hadoop-3/test/resources/log4j2.properties | 40 ++++++++
...ableStreamBasedCheckpointFileManagerSuite.scala | 83 +++++++++++++++
.../internal/io/cloud/IntegrationTestSuite.java | 29 ++++++
project/SparkBuild.scala | 3 +-
.../streaming/CheckpointFileManager.scala | 57 ++++++-----
.../streaming/CheckpointFileManagerSuite.scala | 94 ++++++++++++-----
12 files changed, 589 insertions(+), 65 deletions(-)
diff --git a/docs/cloud-integration.md b/docs/cloud-integration.md
index d65616ed0b8..7630f3c0a0d 100644
--- a/docs/cloud-integration.md
+++ b/docs/cloud-integration.md
@@ -231,9 +231,15 @@ The size of the window needs to be set to handle this.
is no need for a workflow of write-then-rename to ensure that files aren't picked up
while they are still being written. Applications can write straight to the monitored directory.
-1. Streams should only be checkpointed to a store implementing a fast and
-atomic `rename()` operation.
-Otherwise the checkpointing may be slow and potentially unreliable.
+1. With the default checkpoint file manager, `FileContextBasedCheckpointFileManager`,
+streams should only be checkpointed to a store implementing a fast and
+atomic `rename()` operation. Otherwise the checkpointing may be slow and potentially unreliable.
+On AWS S3 with Hadoop 3.3.1 or later using the S3A connector, the abortable stream based
+checkpoint file manager can be used (by setting the `spark.sql.streaming.checkpointFileManagerClass`
+configuration to `org.apache.spark.internal.io.cloud.AbortableStreamBasedCheckpointFileManager`),
+which eliminates the slow rename. In this case users must be extra careful to avoid reusing
+the checkpoint location among multiple queries running in parallel, as that could lead to
+corruption of the checkpoint data.
## Committing work into cloud storage safely and fast.
diff --git a/hadoop-cloud/README.md b/hadoop-cloud/README.md
new file mode 100644
index 00000000000..0be167e6ef8
--- /dev/null
+++ b/hadoop-cloud/README.md
@@ -0,0 +1,20 @@
+---
+layout: global
+title: Spark Hadoop3 Integration Tests
+---
+
+# Running the Integration Tests
+
+As mocking of external systems (like AWS S3) is not always perfect, unit testing should be
+extended with integration testing. This is why the build profile `integration-test` has been
+introduced here. When it is given (`-Pintegration-test`), only those tests are executed that
+are tagged with `org.apache.spark.internal.io.cloud.IntegrationTestSuite`.
+
+One example is `AwsS3AbortableStreamBasedCheckpointFileManagerSuite`.
+
+Integration tests may need some extra configuration, for example selecting the external system
+to run the test against. Those configs are passed as environment variables, and the existence
+of these variables must be checked by the test.
+For `AwsS3AbortableStreamBasedCheckpointFileManagerSuite`, the S3 bucket used for testing
+is passed in `S3A_PATH`, and the credentials to access AWS S3 are `AWS_ACCESS_KEY_ID` and
+`AWS_SECRET_ACCESS_KEY` (in addition, an optional `AWS_SESSION_TOKEN` can be defined too).
diff --git a/hadoop-cloud/pom.xml b/hadoop-cloud/pom.xml
index 0a6a81ed7cf..1cded760571 100644
--- a/hadoop-cloud/pom.xml
+++ b/hadoop-cloud/pom.xml
@@ -49,6 +49,13 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
@@ -206,37 +213,57 @@
<activeByDefault>true</activeByDefault>
</activation>
<properties>
- <extra.source.dir>src/hadoop-3/main/scala</extra.source.dir>
- <extra.testsource.dir>src/hadoop-3/test/scala</extra.testsource.dir>
+ <extra.java.source.dir>src/hadoop-3/main/java</extra.java.source.dir>
+ <extra.java.testsource.dir>src/hadoop-3/test/java</extra.java.testsource.dir>
+ <extra.scala.source.dir>src/hadoop-3/main/scala</extra.scala.source.dir>
+ <extra.scala.testsource.dir>src/hadoop-3/test/scala</extra.scala.testsource.dir>
</properties>
<build>
<plugins>
+ <plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>test</id>
+ <phase>test</phase>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ <configuration>
+ <tagsToExclude>org.apache.spark.internal.io.cloud.IntegrationTestSuite</tagsToExclude>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
- <id>add-scala-sources</id>
+ <id>add-extra-sources</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
- <source>${extra.source.dir}</source>
+ <source>${extra.java.source.dir}</source>
+ <source>${extra.scala.source.dir}</source>
</sources>
</configuration>
</execution>
<execution>
- <id>add-scala-test-sources</id>
+ <id>add-extra-test-sources</id>
<phase>generate-test-sources</phase>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
- <source>${extra.testsource.dir}</source>
+ <source>${extra.java.testsource.dir}</source>
+ <source>${extra.scala.testsource.dir}</source>
</sources>
</configuration>
</execution>
@@ -297,6 +324,30 @@
</dependencies>
</profile>
+ <profile>
+ <id>integration-test</id>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>test</id>
+ <phase>test</phase>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ <configuration>
+ <tagsToExclude>None</tagsToExclude>
+ <tagsToInclude>org.apache.spark.internal.io.cloud.IntegrationTestSuite</tagsToInclude>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
</profiles>
</project>
diff --git a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManager.scala b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManager.scala
new file mode 100644
index 00000000000..2afab01ec7b
--- /dev/null
+++ b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManager.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.internal.io.cloud
+
+import java.nio.file.FileAlreadyExistsException
+import java.util.EnumSet
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.streaming.AbstractFileContextBasedCheckpointFileManager
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
+
+class AbortableStreamBasedCheckpointFileManager(path: Path, hadoopConf: Configuration)
+ extends AbstractFileContextBasedCheckpointFileManager(path, hadoopConf) with Logging {
+
+ if (!fc.hasPathCapability(path, CommonPathCapabilities.ABORTABLE_STREAM)) {
+ throw new UnsupportedFileSystemException("AbortableStreamBasedCheckpointFileManager requires" +
+ s" an fs (path: $path) with abortable stream support")
+ }
+
+ logInfo(s"Writing atomically to $path based on abortable stream")
+
+ class AbortableStreamBasedFSDataOutputStream(
+ fsDataOutputStream: FSDataOutputStream,
+ fc: FileContext,
+ path: Path,
+ overwriteIfPossible: Boolean) extends CancellableFSDataOutputStream(fsDataOutputStream) {
+
+ @volatile private var terminated = false
+
+ override def cancel(): Unit = synchronized {
+ if (terminated) return
+ try {
+ fsDataOutputStream.abort()
+ fsDataOutputStream.close()
+ } catch {
+ case NonFatal(e) =>
+ logWarning(s"Error cancelling write to $path (stream: $fsDataOutputStream)", e)
+ } finally {
+ terminated = true
+ }
+ }
+
+ override def close(): Unit = synchronized {
+ if (terminated) return
+ try {
+ if (!overwriteIfPossible && fc.util().exists(path)) {
+ fsDataOutputStream.abort()
+ throw new FileAlreadyExistsException(
+ s"Failed to close atomic stream $path (stream: " +
+ s"$fsDataOutputStream) as destination already exists")
+ }
+ fsDataOutputStream.close()
+ } catch {
+ case NonFatal(e) =>
+ logWarning(s"Error closing $path (stream: $fsDataOutputStream)", e)
+ } finally {
+ terminated = true
+ }
+ }
+
+ override def toString(): String = {
+ fsDataOutputStream.toString
+ }
+ }
+
+ override def createAtomic(
+ path: Path, overwriteIfPossible: Boolean): CancellableFSDataOutputStream = {
+ import CreateFlag._
+ val createFlag = if (overwriteIfPossible) {
+ EnumSet.of(CREATE, OVERWRITE)
+ } else {
+ EnumSet.of(CREATE)
+ }
+ new AbortableStreamBasedFSDataOutputStream(
+ fc.create(path, createFlag), fc, path, overwriteIfPossible)
+ }
+}
diff --git a/hadoop-cloud/src/hadoop-3/test/java/org/apache/spark/internal/io/cloud/abortable/AbortableFileSystem.java b/hadoop-cloud/src/hadoop-3/test/java/org/apache/spark/internal/io/cloud/abortable/AbortableFileSystem.java
new file mode 100644
index 00000000000..5c7f68f4378
--- /dev/null
+++ b/hadoop-cloud/src/hadoop-3/test/java/org/apache/spark/internal/io/cloud/abortable/AbortableFileSystem.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.internal.io.cloud.abortable;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+
+public class AbortableFileSystem extends RawLocalFileSystem {
+
+ public static final String ABORTABLE_FS_SCHEME = "abortable";
+
+ @Override
+ public URI getUri() {
+ return URI.create(ABORTABLE_FS_SCHEME + ":///");
+ }
+
+ public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite,
+ int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
+ FSDataOutputStream out = this.create(f, overwrite, bufferSize, replication, blockSize,
+ progress, permission);
+ return out;
+ }
+
+ private FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication,
+ long blockSize, Progressable progress, FsPermission permission) throws IOException {
+ if (this.exists(f) && !overwrite) {
+ throw new FileAlreadyExistsException("File already exists: " + f);
+ } else {
+ Path parent = f.getParent();
+ if (parent != null && !this.mkdirs(parent)) {
+ throw new IOException("Mkdirs failed to create " + parent.toString());
+ } else {
+ return new FSDataOutputStream(this.createOutputStreamWithMode(f, false, permission), null);
+ }
+ }
+ }
+
+ @Override
+ protected OutputStream createOutputStreamWithMode(Path f, boolean append,
+ FsPermission permission) throws IOException {
+ return new AbortableOutputStream(f, append, permission);
+ }
+
+ class AbortableOutputStream extends ByteArrayOutputStream
+ implements Abortable, StreamCapabilities {
+
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
+ private Path f;
+
+ private boolean append;
+
+ private FsPermission permission;
+
+ AbortableOutputStream(Path f, boolean append, FsPermission permission) {
+ this.f = f;
+ this.append = append;
+ this.permission = permission;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (closed.getAndSet(true)) {
+ return;
+ }
+
+ OutputStream output =
+ AbortableFileSystem.super.createOutputStreamWithMode(f, append, permission);
+ writeTo(output);
+ output.close();
+ }
+
+ @Override
+ public AbortableResult abort() {
+ final boolean isAlreadyClosed = closed.getAndSet(true);
+ return new AbortableResult() {
+ public boolean alreadyClosed() {
+ return isAlreadyClosed;
+ }
+
+ public IOException anyCleanupException() {
+ return null;
+ }
+ };
+ }
+
+ @Override
+ public boolean hasCapability(String capability) {
+ return CommonPathCapabilities.ABORTABLE_STREAM.equals(capability);
+ }
+ }
+}
diff --git a/hadoop-cloud/src/hadoop-3/test/java/org/apache/spark/internal/io/cloud/abortable/AbstractAbortableFileSystem.java b/hadoop-cloud/src/hadoop-3/test/java/org/apache/spark/internal/io/cloud/abortable/AbstractAbortableFileSystem.java
new file mode 100644
index 00000000000..57ede38a23b
--- /dev/null
+++ b/hadoop-cloud/src/hadoop-3/test/java/org/apache/spark/internal/io/cloud/abortable/AbstractAbortableFileSystem.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.internal.io.cloud.abortable;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonPathCapabilities;
+import org.apache.hadoop.fs.DelegateToFileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class AbstractAbortableFileSystem extends DelegateToFileSystem {
+
+ public AbstractAbortableFileSystem(
+ URI theUri,
+ Configuration conf) throws IOException, URISyntaxException {
+ super(theUri, new AbortableFileSystem(), conf, AbortableFileSystem.ABORTABLE_FS_SCHEME, false);
+ }
+
+ @Override
+ public boolean hasPathCapability(Path path, String capability) throws IOException {
+ if (CommonPathCapabilities.ABORTABLE_STREAM.equals(capability)) {
+ return true;
+ } else {
+ return super.hasPathCapability(path, capability);
+ }
+ }
+}
diff --git a/hadoop-cloud/src/hadoop-3/test/resources/log4j2.properties b/hadoop-cloud/src/hadoop-3/test/resources/log4j2.properties
new file mode 100644
index 00000000000..01a9cafafa8
--- /dev/null
+++ b/hadoop-cloud/src/hadoop-3/test/resources/log4j2.properties
@@ -0,0 +1,40 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file target/unit-tests.log
+rootLogger.level = info
+rootLogger.appenderRef.file.ref = ${sys:test.appender:-File}
+
+appender.file.type = File
+appender.file.name = File
+appender.file.fileName = target/unit-tests.log
+appender.file.append = true
+appender.file.layout.type = PatternLayout
+appender.file.layout.pattern = %d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n%ex
+
+# Tests that launch java subprocesses can set the "test.appender" system property to
+# "console" to avoid having the child process's logs overwrite the unit test's
+# log file.
+appender.console.type = Console
+appender.console.name = STDERR
+appender.console.target = SYSTEM_ERR
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %t: %m%n%ex
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+logger.jetty.name = org.spark_project.jetty
+logger.jetty.level = warn
diff --git a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManagerSuite.scala b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManagerSuite.scala
new file mode 100644
index 00000000000..0dbc650fc8c
--- /dev/null
+++ b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManagerSuite.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.internal.io.cloud
+
+import java.io.File
+
+import scala.util.Properties
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+import org.apache.hadoop.fs.permission.FsPermission
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.io.cloud.abortable.AbortableFileSystem
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager
+import org.apache.spark.sql.execution.streaming.CheckpointFileManagerTests
+
+class AbortableStreamBasedCheckpointFileManagerSuite
+ extends CheckpointFileManagerTests with Logging {
+
+ override def withTempHadoopPath(p: Path => Unit): Unit = {
+ withTempDir { f: File =>
+ val basePath = new Path(AbortableFileSystem.ABORTABLE_FS_SCHEME, null, f.getAbsolutePath)
+ p(basePath)
+ }
+ }
+
+ override def checkLeakingCrcFiles(path: Path): Unit = { }
+
+ override def createManager(path: Path): CheckpointFileManager = {
+ val conf = new Configuration()
+ conf.set(s"fs.AbstractFileSystem.${AbortableFileSystem.ABORTABLE_FS_SCHEME}.impl",
+ "org.apache.spark.internal.io.cloud.abortable.AbstractAbortableFileSystem")
+ new AbortableStreamBasedCheckpointFileManager(path, conf)
+ }
+}
+
+@IntegrationTestSuite
+class AwsS3AbortableStreamBasedCheckpointFileManagerSuite
+ extends AbortableStreamBasedCheckpointFileManagerSuite with BeforeAndAfter {
+
+ val s3aPath = Properties.envOrNone("S3A_PATH")
+
+ val hadoopConf = new Configuration()
+
+ var cleanup: () => Unit = () => {}
+
+ override protected def beforeAll(): Unit = {
+ assert(s3aPath.isDefined, "S3A_PATH must be defined!")
+ val path = new Path(s3aPath.get)
+ val fc = FileContext.getFileContext(path.toUri, hadoopConf)
+ assert(!fc.util.exists(path), s"S3A_PATH ($path) should not exist!")
+ fc.mkdir(path, FsPermission.getDirDefault, true)
+ cleanup = () => fc.delete(path, true)
+ }
+
+ override protected def afterAll(): Unit = {
+ cleanup()
+ }
+
+ override def withTempHadoopPath(p: Path => Unit): Unit = {
+ p(new Path(s3aPath.get))
+ }
+
+ override def createManager(path: Path): CheckpointFileManager = {
+ new AbortableStreamBasedCheckpointFileManager(path, hadoopConf)
+ }
+}
diff --git a/hadoop-cloud/src/test/java/org/apache/spark/internal/io/cloud/IntegrationTestSuite.java b/hadoop-cloud/src/test/java/org/apache/spark/internal/io/cloud/IntegrationTestSuite.java
new file mode 100644
index 00000000000..d1c5f07cceb
--- /dev/null
+++ b/hadoop-cloud/src/test/java/org/apache/spark/internal/io/cloud/IntegrationTestSuite.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.internal.io.cloud;
+
+import org.scalatest.TagAnnotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@TagAnnotation
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.METHOD, ElementType.TYPE})
+public @interface IntegrationTestSuite {}
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 21ab6f9f636..145440d43ed 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -1136,7 +1136,8 @@ object CopyDependencies {
object TestSettings {
import BuildCommons._
- private val defaultExcludedTags = Seq("org.apache.spark.tags.ChromeUITest")
+ private val defaultExcludedTags = Seq("org.apache.spark.tags.ChromeUITest",
+ "org.apache.spark.internal.io.cloud.IntegrationTestSuite")
lazy val settings = Seq (
// Fork new JVMs for tests and set Java options for those
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
index 48bf88f3116..cf5d54fd20a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
@@ -300,13 +300,10 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration
}
-/** An implementation of [[CheckpointFileManager]] using Hadoop's [[FileContext]] API. */
-class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuration)
- extends CheckpointFileManager with RenameHelperMethods with Logging {
-
- import CheckpointFileManager._
+abstract class AbstractFileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuration)
+ extends CheckpointFileManager with Logging {
- private val fc = if (path.toUri.getScheme == null) {
+ protected val fc = if (path.toUri.getScheme == null) {
FileContext.getFileContext(hadoopConf)
} else {
FileContext.getFileContext(path.toUri, hadoopConf)
@@ -320,19 +317,6 @@ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuratio
fc.mkdir(path, FsPermission.getDirDefault, true)
}
- override def createTempFile(path: Path): FSDataOutputStream = {
- import CreateFlag._
- import Options._
- fc.create(
- path, EnumSet.of(CREATE, OVERWRITE), CreateOpts.checksumParam(ChecksumOpt.createDisabled()))
- }
-
- override def createAtomic(
- path: Path,
- overwriteIfPossible: Boolean): CancellableFSDataOutputStream = {
- new RenameBasedFSDataOutputStream(this, path, overwriteIfPossible)
- }
-
override def open(path: Path): FSDataInputStream = {
fc.open(path)
}
@@ -341,14 +325,6 @@ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuratio
fc.util.exists(path)
}
- override def renameTempFile(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = {
- import Options.Rename._
- fc.rename(srcPath, dstPath, if (overwriteIfPossible) OVERWRITE else NONE)
- // TODO: this is a workaround of HADOOP-16255 - remove this when HADOOP-16255 is resolved
- mayRemoveCrcFile(srcPath)
- }
-
-
override def delete(path: Path): Unit = {
try {
fc.delete(path, true)
@@ -368,6 +344,33 @@ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuratio
fc.mkdir(qualifiedPath, FsPermission.getDirDefault, true)
qualifiedPath
}
+}
+
+class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuration)
+ extends AbstractFileContextBasedCheckpointFileManager(path, hadoopConf)
+ with RenameHelperMethods {
+
+ import CheckpointFileManager._
+
+ override def createTempFile(path: Path): FSDataOutputStream = {
+ import CreateFlag._
+ import Options._
+ fc.create(
+ path, EnumSet.of(CREATE, OVERWRITE), CreateOpts.checksumParam(ChecksumOpt.createDisabled()))
+ }
+
+ override def createAtomic(
+ path: Path,
+ overwriteIfPossible: Boolean): CancellableFSDataOutputStream = {
+ new RenameBasedFSDataOutputStream(this, path, overwriteIfPossible)
+ }
+
+ override def renameTempFile(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = {
+ import Options.Rename._
+ fc.rename(srcPath, dstPath, if (overwriteIfPossible) OVERWRITE else NONE)
+ // TODO: this is a workaround of HADOOP-16255 - remove this when HADOOP-16255 is resolved
+ mayRemoveCrcFile(srcPath)
+ }
private def mayRemoveCrcFile(path: Path): Unit = {
try {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala
index 79bcd490a24..cbcb4a4062d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala
@@ -27,16 +27,35 @@ import org.apache.hadoop.fs._
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.plans.SQLHelper
import org.apache.spark.sql.catalyst.util.quietly
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
-abstract class CheckpointFileManagerTests extends SparkFunSuite with SQLHelper {
+abstract class CheckpointFileManagerTests extends SparkFunSuite {
- def createManager(path: Path): CheckpointFileManager
+ protected def withTempHadoopPath(p: Path => Unit): Unit
+
+ protected def checkLeakingCrcFiles(path: Path): Unit
+
+ protected def createManager(path: Path): CheckpointFileManager
+
+ private implicit class RichCancellableStream(stream: CancellableFSDataOutputStream) {
+ def writeContent(i: Int): CancellableFSDataOutputStream = {
+ stream.writeInt(i)
+ stream
+ }
+ }
+
+ private implicit class RichFSDataInputStream(stream: FSDataInputStream) {
+ def readContent(): Int = {
+ val res = stream.readInt()
+ stream.close()
+ res
+ }
+ }
test("mkdirs, list, createAtomic, open, delete, exists") {
- withTempPath { p =>
- val basePath = new Path(p.getAbsolutePath)
+ withTempHadoopPath { case basePath =>
val fm = createManager(basePath)
// Mkdirs
val dir = new Path(s"$basePath/dir/subdir/subsubdir")
@@ -58,42 +77,32 @@ abstract class CheckpointFileManagerTests extends SparkFunSuite with SQLHelper {
// Create atomic without overwrite
var path = new Path(s"$dir/file")
assert(!fm.exists(path))
- fm.createAtomic(path, overwriteIfPossible = false).cancel()
+ fm.createAtomic(path, overwriteIfPossible = false).writeContent(1).cancel()
assert(!fm.exists(path))
- fm.createAtomic(path, overwriteIfPossible = false).close()
+ fm.createAtomic(path, overwriteIfPossible = false).writeContent(2).close()
assert(fm.exists(path))
+ assert(fm.open(path).readContent() == 2)
quietly {
intercept[IOException] {
// should throw exception since file exists and overwrite is false
- fm.createAtomic(path, overwriteIfPossible = false).close()
+ fm.createAtomic(path, overwriteIfPossible = false).writeContent(3).close()
}
}
+ assert(fm.open(path).readContent() == 2)
// Create atomic with overwrite if possible
path = new Path(s"$dir/file2")
assert(!fm.exists(path))
- fm.createAtomic(path, overwriteIfPossible = true).cancel()
+ fm.createAtomic(path, overwriteIfPossible = true).writeContent(4).cancel()
assert(!fm.exists(path))
- fm.createAtomic(path, overwriteIfPossible = true).close()
+ fm.createAtomic(path, overwriteIfPossible = true).writeContent(5).close()
assert(fm.exists(path))
- fm.createAtomic(path, overwriteIfPossible = true).close() // should not throw exception
-
- // crc file should not be leaked when origin file doesn't exist.
- // The implementation of Hadoop filesystem may filter out checksum file, so
- // listing files from local filesystem.
- val fileNames = new File(path.getParent.toString).listFiles().toSeq
- .filter(p => p.isFile).map(p => p.getName)
- val crcFiles = fileNames.filter(n => n.startsWith(".") && n.endsWith(".crc"))
- val originFileNamesForExistingCrcFiles = crcFiles.map { name =>
- // remove first "." and last ".crc"
- name.substring(1, name.length - 4)
- }
-
- // Check all origin files exist for all crc files.
- assert(originFileNamesForExistingCrcFiles.toSet.subsetOf(fileNames.toSet),
- s"Some of origin files for crc files don't exist - crc files: $crcFiles / " +
- s"expected origin files: $originFileNamesForExistingCrcFiles / actual files: $fileNames")
+ assert(fm.open(path).readContent() == 5)
+ // should not throw exception
+ fm.createAtomic(path, overwriteIfPossible = true).writeContent(6).close()
+ assert(fm.open(path).readContent() == 6)
+ checkLeakingCrcFiles(dir)
// Open and delete
fm.open(path).close()
fm.delete(path)
@@ -138,13 +147,42 @@ class CheckpointFileManagerSuite extends SharedSparkSession {
}
}
-class FileContextBasedCheckpointFileManagerSuite extends CheckpointFileManagerTests {
+abstract class CheckpointFileManagerTestsOnLocalFs
+ extends CheckpointFileManagerTests with SQLHelper {
+
+ protected def withTempHadoopPath(p: Path => Unit): Unit = {
+ withTempDir { f: File =>
+ val basePath = new Path(f.getAbsolutePath)
+ p(basePath)
+ }
+ }
+
+ protected def checkLeakingCrcFiles(path: Path): Unit = {
+ // crc file should not be leaked when origin file doesn't exist.
+ // The implementation of Hadoop filesystem may filter out checksum file, so
+ // listing files from local filesystem.
+ val fileNames = new File(path.toString).listFiles().toSeq
+ .filter(p => p.isFile).map(p => p.getName)
+ val crcFiles = fileNames.filter(n => n.startsWith(".") && n.endsWith(".crc"))
+ val originFileNamesForExistingCrcFiles = crcFiles.map { name =>
+ // remove first "." and last ".crc"
+ name.substring(1, name.length - 4)
+ }
+
+ // Check all origin files exist for all crc files.
+ assert(originFileNamesForExistingCrcFiles.toSet.subsetOf(fileNames.toSet),
+ s"Some of origin files for crc files don't exist - crc files: $crcFiles / " +
+ s"expected origin files: $originFileNamesForExistingCrcFiles / actual files: $fileNames")
+ }
+}
+
+class FileContextBasedCheckpointFileManagerSuite extends CheckpointFileManagerTestsOnLocalFs {
override def createManager(path: Path): CheckpointFileManager = {
new FileContextBasedCheckpointFileManager(path, new Configuration())
}
}
-class FileSystemBasedCheckpointFileManagerSuite extends CheckpointFileManagerTests {
+class FileSystemBasedCheckpointFileManagerSuite extends CheckpointFileManagerTestsOnLocalFs {
override def createManager(path: Path): CheckpointFileManager = {
new FileSystemBasedCheckpointFileManager(path, new Configuration())
}