Posted to commits@spark.apache.org by gu...@apache.org on 2022/08/27 04:55:30 UTC

[spark] branch master updated (1178bcecc83 -> fb4dba1413d)

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


    from 1178bcecc83 [SPARK-40211][CORE][SQL] Allow customize initial partitions number in take() behavior
     new 8fb853218be Revert "[SPARK-40039][SS][FOLLOWUP] Fixes scala style check issue"
     new fb4dba1413d Revert "[SPARK-40039][SS] Introducing a streaming checkpoint file manager based on Hadoop's Abortable interface"

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/cloud-integration.md                          |  12 +--
 hadoop-cloud/README.md                             |  20 ----
 hadoop-cloud/pom.xml                               |  47 ---------
 ...AbortableStreamBasedCheckpointFileManager.scala |  96 -----------------
 ...ableStreamBasedCheckpointFileManagerSuite.scala |  83 ---------------
 .../internal/io/cloud/IntegrationTestSuite.java    |  29 ------
 .../io/cloud/abortable/AbortableFileSystem.java    | 113 ---------------------
 .../abortable/AbstractAbortableFileSystem.java     |  44 --------
 project/SparkBuild.scala                           |   3 +-
 .../streaming/CheckpointFileManager.scala          |  57 +++++------
 .../streaming/CheckpointFileManagerSuite.scala     |  94 +++++------------
 11 files changed, 59 insertions(+), 539 deletions(-)
 delete mode 100644 hadoop-cloud/README.md
 delete mode 100644 hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManager.scala
 delete mode 100644 hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManagerSuite.scala
 delete mode 100644 hadoop-cloud/src/test/java/org/apache/spark/internal/io/cloud/IntegrationTestSuite.java
 delete mode 100644 hadoop-cloud/src/test/java/org/apache/spark/internal/io/cloud/abortable/AbortableFileSystem.java
 delete mode 100644 hadoop-cloud/src/test/java/org/apache/spark/internal/io/cloud/abortable/AbstractAbortableFileSystem.java



[spark] 01/02: Revert "[SPARK-40039][SS][FOLLOWUP] Fixes scala style check issue"

Posted by gu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

commit 8fb853218bed792b3f66c8401691dd16262e1f9e
Author: Hyukjin Kwon <gu...@apache.org>
AuthorDate: Sat Aug 27 13:55:04 2022 +0900

    Revert "[SPARK-40039][SS][FOLLOWUP] Fixes scala style check issue"
    
    This reverts commit 4f4a080a78c3979961e8483aace663bdc45f489d.
---
 .../internal/io/cloud/AbortableStreamBasedCheckpointFileManager.scala  | 3 +--
 .../apache/spark/internal/io/cloud/abortable/AbortableFileSystem.java  | 1 +
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManager.scala b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManager.scala
index 2afab01ec7b..71a2c1d896e 100644
--- a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManager.scala
+++ b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManager.scala
@@ -65,8 +65,7 @@ class AbortableStreamBasedCheckpointFileManager(path: Path, hadoopConf: Configur
         if (!overwriteIfPossible && fc.util().exists(path)) {
           fsDataOutputStream.abort()
           throw new FileAlreadyExistsException(
-            s"Failed to close atomic stream $path (stream: " +
-            s"$fsDataOutputStream) as destination already exists")
+            s"Failed to close atomic stream $path (stream: $fsDataOutputStream) as destination already exists")
         }
         fsDataOutputStream.close()
       } catch {
diff --git a/hadoop-cloud/src/test/java/org/apache/spark/internal/io/cloud/abortable/AbortableFileSystem.java b/hadoop-cloud/src/test/java/org/apache/spark/internal/io/cloud/abortable/AbortableFileSystem.java
index 5c7f68f4378..65a735e7dae 100644
--- a/hadoop-cloud/src/test/java/org/apache/spark/internal/io/cloud/abortable/AbortableFileSystem.java
+++ b/hadoop-cloud/src/test/java/org/apache/spark/internal/io/cloud/abortable/AbortableFileSystem.java
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.statistics.BufferedIOStatisticsOutputStream;
 import org.apache.hadoop.util.Progressable;
 
 public class AbortableFileSystem extends RawLocalFileSystem {



[spark] 02/02: Revert "[SPARK-40039][SS] Introducing a streaming checkpoint file manager based on Hadoop's Abortable interface"

Posted by gu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

commit fb4dba1413dac860c9bec5bae422b2f78d212948
Author: Hyukjin Kwon <gu...@apache.org>
AuthorDate: Sat Aug 27 13:55:09 2022 +0900

    Revert "[SPARK-40039][SS] Introducing a streaming checkpoint file manager based on Hadoop's Abortable interface"
    
    This reverts commit 7e4064c02de46b5fe33bdb0b9c3991d41abfbf41.
---
 docs/cloud-integration.md                          |  12 +--
 hadoop-cloud/README.md                             |  20 ----
 hadoop-cloud/pom.xml                               |  47 ---------
 ...AbortableStreamBasedCheckpointFileManager.scala |  95 -----------------
 ...ableStreamBasedCheckpointFileManagerSuite.scala |  83 ---------------
 .../internal/io/cloud/IntegrationTestSuite.java    |  29 ------
 .../io/cloud/abortable/AbortableFileSystem.java    | 114 ---------------------
 .../abortable/AbstractAbortableFileSystem.java     |  44 --------
 project/SparkBuild.scala                           |   3 +-
 .../streaming/CheckpointFileManager.scala          |  57 +++++------
 .../streaming/CheckpointFileManagerSuite.scala     |  94 +++++------------
 11 files changed, 59 insertions(+), 539 deletions(-)

diff --git a/docs/cloud-integration.md b/docs/cloud-integration.md
index 7630f3c0a0d..d65616ed0b8 100644
--- a/docs/cloud-integration.md
+++ b/docs/cloud-integration.md
@@ -231,15 +231,9 @@ The size of the window needs to be set to handle this.
 is no need for a workflow of write-then-rename to ensure that files aren't picked up
 while they are still being written. Applications can write straight to the monitored directory.
 
-1. In case of the default checkpoint file manager, `FileContextBasedCheckpointFileManager`,
-streams should only be checkpointed to a store implementing a fast and
-atomic `rename()` operation. Otherwise the checkpointing may be slow and potentially unreliable.
-On AWS S3 with Hadoop 3.3.1 or later using the S3A connector, the abortable-stream-based checkpoint
-file manager can be used (by setting the `spark.sql.streaming.checkpointFileManagerClass`
-configuration to `org.apache.spark.internal.io.cloud.AbortableStreamBasedCheckpointFileManager`),
-which eliminates the slow rename. In this case users must be extra careful to avoid reusing
-the checkpoint location among multiple queries running in parallel, as that could lead to
-corruption of the checkpoint data.
+1. Streams should only be checkpointed to a store implementing a fast and
+atomic `rename()` operation.
+Otherwise the checkpointing may be slow and potentially unreliable.
 
 ## Committing work into cloud storage safely and fast.
 
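For reference, the removed doc text above enabled the abortable-stream manager through a
single configuration key. A minimal sketch of that setup as it worked before this revert
(the session name and checkpoint path are hypothetical examples):

    import org.apache.spark.sql.SparkSession

    // Sketch only: AbortableStreamBasedCheckpointFileManager is deleted by this
    // revert, so the setting below no longer resolves after fb4dba1413d.
    val spark = SparkSession.builder().appName("s3a-checkpoint-example").getOrCreate()
    spark.conf.set(
      "spark.sql.streaming.checkpointFileManagerClass",
      "org.apache.spark.internal.io.cloud.AbortableStreamBasedCheckpointFileManager")
    // Streaming queries could then checkpoint straight to S3A, e.g.
    //   .option("checkpointLocation", "s3a://my-bucket/checkpoints/query-1")
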
diff --git a/hadoop-cloud/README.md b/hadoop-cloud/README.md
deleted file mode 100644
index 0be167e6ef8..00000000000
--- a/hadoop-cloud/README.md
+++ /dev/null
@@ -1,20 +0,0 @@
----
-layout: global
-title: Spark Hadoop3 Integration Tests
----
-
-# Running the Integration Tests
-
-As mocking of external systems (like AWS S3) is not always perfect, unit testing should be
-extended with integration testing. This is why the build profile `integration-test` has been
-introduced here. When it is given (`-Pintegration-test`), only those tests are
-executed where the `org.apache.spark.internal.io.cloud.IntegrationTestSuite` tag is used.
-
-One example is `AwsS3AbortableStreamBasedCheckpointFileManagerSuite`.
-
-Integration tests will have some extra configuration, for example selecting the external system to
-run the test against. Those configs are passed as environment variables, and the existence of these
-variables must be checked by the test.
-For `AwsS3AbortableStreamBasedCheckpointFileManagerSuite`, for example, the S3 bucket used for testing
-is passed in `S3A_PATH`, and the credentials to access AWS S3 are `AWS_ACCESS_KEY_ID` and
-`AWS_SECRET_ACCESS_KEY` (an optional `AWS_SESSION_TOKEN` can be defined too).
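
For illustration, the env-var convention this README described maps onto the suite deleted
further below roughly as follows (a minimal sketch; only `S3A_PATH` is read by the test
itself, while the AWS credential variables are consumed by the S3A connector):

    import scala.util.Properties

    // Minimal sketch of the guard the suite applies before running against S3.
    val s3aPath: Option[String] = Properties.envOrNone("S3A_PATH")
    assert(s3aPath.isDefined, "S3A_PATH must be defined!")
    // AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY (and optionally AWS_SESSION_TOKEN)
    // must be set in the environment for the S3A filesystem to authenticate.
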
diff --git a/hadoop-cloud/pom.xml b/hadoop-cloud/pom.xml
index f4e0557ea7a..0a6a81ed7cf 100644
--- a/hadoop-cloud/pom.xml
+++ b/hadoop-cloud/pom.xml
@@ -49,13 +49,6 @@
       <version>${project.version}</version>
       <scope>provided</scope>
     </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-sql_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_${scala.binary.version}</artifactId>
@@ -219,22 +212,6 @@
 
       <build>
         <plugins>
-          <plugin>
-            <groupId>org.scalatest</groupId>
-            <artifactId>scalatest-maven-plugin</artifactId>
-            <executions>
-              <execution>
-                <id>test</id>
-                <phase>test</phase>
-                <goals>
-                  <goal>test</goal>
-                </goals>
-                <configuration>
-                  <tagsToExclude>org.apache.spark.internal.io.cloud.IntegrationTestSuite</tagsToExclude>
-                </configuration>
-              </execution>
-            </executions>
-          </plugin>
           <plugin>
             <groupId>org.codehaus.mojo</groupId>
             <artifactId>build-helper-maven-plugin</artifactId>
@@ -320,30 +297,6 @@
       </dependencies>
     </profile>
 
-    <profile>
-      <id>integration-test</id>
-      <build>
-        <plugins>
-          <plugin>
-            <groupId>org.scalatest</groupId>
-            <artifactId>scalatest-maven-plugin</artifactId>
-            <executions>
-              <execution>
-                <id>test</id>
-                <phase>test</phase>
-                <goals>
-                  <goal>test</goal>
-                </goals>
-                <configuration>
-                  <tagsToExclude>None</tagsToExclude>
-                  <tagsToInclude>org.apache.spark.internal.io.cloud.IntegrationTestSuite</tagsToInclude>
-                </configuration>
-              </execution>
-            </executions>
-          </plugin>
-        </plugins>
-      </build>
-    </profile>
   </profiles>
 
 </project>
diff --git a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManager.scala b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManager.scala
deleted file mode 100644
index 71a2c1d896e..00000000000
--- a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManager.scala
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.internal.io.cloud
-
-import java.nio.file.FileAlreadyExistsException
-import java.util.EnumSet
-
-import scala.util.control.NonFatal
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs._
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.execution.streaming.AbstractFileContextBasedCheckpointFileManager
-import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
-
-class AbortableStreamBasedCheckpointFileManager(path: Path, hadoopConf: Configuration)
-  extends AbstractFileContextBasedCheckpointFileManager(path, hadoopConf) with Logging {
-
-  if (!fc.hasPathCapability(path, CommonPathCapabilities.ABORTABLE_STREAM)) {
-    throw new UnsupportedFileSystemException("AbortableStreamBasedCheckpointFileManager requires" +
-      s" an fs (path: $path) with abortable stream support")
-  }
-
-  logInfo(s"Writing atomically to $path based on abortable stream")
-
-  class AbortableStreamBasedFSDataOutputStream(
-      fsDataOutputStream: FSDataOutputStream,
-      fc: FileContext,
-      path: Path,
-      overwriteIfPossible: Boolean) extends CancellableFSDataOutputStream(fsDataOutputStream) {
-
-    @volatile private var terminated = false
-
-    override def cancel(): Unit = synchronized {
-      if (terminated) return
-      try {
-        fsDataOutputStream.abort()
-        fsDataOutputStream.close()
-      } catch {
-          case NonFatal(e) =>
-            logWarning(s"Error cancelling write to $path (stream: $fsDataOutputStream)", e)
-      } finally {
-        terminated = true
-      }
-    }
-
-    override def close(): Unit = synchronized {
-      if (terminated) return
-      try {
-        if (!overwriteIfPossible && fc.util().exists(path)) {
-          fsDataOutputStream.abort()
-          throw new FileAlreadyExistsException(
-            s"Failed to close atomic stream $path (stream: $fsDataOutputStream) as destination already exists")
-        }
-        fsDataOutputStream.close()
-      } catch {
-          case NonFatal(e) =>
-            logWarning(s"Error closing $path (stream: $fsDataOutputStream)", e)
-      } finally {
-        terminated = true
-      }
-    }
-
-    override def toString(): String = {
-      fsDataOutputStream.toString
-    }
-  }
-
-  override def createAtomic(
-      path: Path, overwriteIfPossible: Boolean): CancellableFSDataOutputStream = {
-    import CreateFlag._
-    val createFlag = if (overwriteIfPossible) {
-      EnumSet.of(CREATE, OVERWRITE)
-    } else {
-      EnumSet.of(CREATE)
-    }
-    new AbortableStreamBasedFSDataOutputStream(
-      fc.create(path, createFlag), fc, path, overwriteIfPossible)
-  }
-}
diff --git a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManagerSuite.scala b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManagerSuite.scala
deleted file mode 100644
index 0dbc650fc8c..00000000000
--- a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/AbortableStreamBasedCheckpointFileManagerSuite.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.internal.io.cloud
-
-import java.io.File
-
-import scala.util.Properties
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs._
-import org.apache.hadoop.fs.permission.FsPermission
-import org.scalatest.BeforeAndAfter
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.internal.io.cloud.abortable.AbortableFileSystem
-import org.apache.spark.sql.execution.streaming.CheckpointFileManager
-import org.apache.spark.sql.execution.streaming.CheckpointFileManagerTests
-
-class AbortableStreamBasedCheckpointFileManagerSuite
-  extends CheckpointFileManagerTests with Logging {
-
-  override def withTempHadoopPath(p: Path => Unit): Unit = {
-    withTempDir { f: File =>
-      val basePath = new Path(AbortableFileSystem.ABORTABLE_FS_SCHEME, null, f.getAbsolutePath)
-      p(basePath)
-    }
-  }
-
-  override def checkLeakingCrcFiles(path: Path): Unit = { }
-
-  override def createManager(path: Path): CheckpointFileManager = {
-    val conf = new Configuration()
-    conf.set(s"fs.AbstractFileSystem.${AbortableFileSystem.ABORTABLE_FS_SCHEME}.impl",
-      "org.apache.spark.internal.io.cloud.abortable.AbstractAbortableFileSystem")
-    new AbortableStreamBasedCheckpointFileManager(path, conf)
-  }
-}
-
-@IntegrationTestSuite
-class AwsS3AbortableStreamBasedCheckpointFileManagerSuite
-    extends AbortableStreamBasedCheckpointFileManagerSuite with BeforeAndAfter {
-
-  val s3aPath = Properties.envOrNone("S3A_PATH")
-
-  val hadoopConf = new Configuration()
-
-  var cleanup: () => Unit = () => {}
-
-  override protected def beforeAll(): Unit = {
-    assert(s3aPath.isDefined, "S3A_PATH must be defined!")
-    val path = new Path(s3aPath.get)
-    val fc = FileContext.getFileContext(path.toUri, hadoopConf)
-    assert(!fc.util.exists(path), s"S3A_PATH ($path) should not exist!")
-    fc.mkdir(path, FsPermission.getDirDefault, true)
-    cleanup = () => fc.delete(path, true)
-  }
-
-  override protected def afterAll(): Unit = {
-    cleanup()
-  }
-
-  override def withTempHadoopPath(p: Path => Unit): Unit = {
-    p(new Path(s3aPath.get))
-  }
-
-  override def createManager(path: Path): CheckpointFileManager = {
-    new AbortableStreamBasedCheckpointFileManager(path, hadoopConf)
-  }
-}
diff --git a/hadoop-cloud/src/test/java/org/apache/spark/internal/io/cloud/IntegrationTestSuite.java b/hadoop-cloud/src/test/java/org/apache/spark/internal/io/cloud/IntegrationTestSuite.java
deleted file mode 100644
index d1c5f07cceb..00000000000
--- a/hadoop-cloud/src/test/java/org/apache/spark/internal/io/cloud/IntegrationTestSuite.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.internal.io.cloud;
-
-import org.scalatest.TagAnnotation;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-@TagAnnotation
-@Retention(RetentionPolicy.RUNTIME)
-@Target({ElementType.METHOD, ElementType.TYPE})
-public @interface IntegrationTestSuite {}
diff --git a/hadoop-cloud/src/test/java/org/apache/spark/internal/io/cloud/abortable/AbortableFileSystem.java b/hadoop-cloud/src/test/java/org/apache/spark/internal/io/cloud/abortable/AbortableFileSystem.java
deleted file mode 100644
index 65a735e7dae..00000000000
--- a/hadoop-cloud/src/test/java/org/apache/spark/internal/io/cloud/abortable/AbortableFileSystem.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.internal.io.cloud.abortable;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.URI;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.fs.statistics.BufferedIOStatisticsOutputStream;
-import org.apache.hadoop.util.Progressable;
-
-public class AbortableFileSystem extends RawLocalFileSystem {
-
-  public static String ABORTABLE_FS_SCHEME = "abortable";
-
-  @Override
-  public URI getUri() {
-    return URI.create(ABORTABLE_FS_SCHEME + ":///");
-  }
-
-  public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite,
-    int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
-    FSDataOutputStream out = this.create(f, overwrite, bufferSize, replication, blockSize,
-      progress, permission);
-    return out;
-  }
-
-  private FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication,
-    long blockSize, Progressable progress, FsPermission permission) throws IOException {
-    if (this.exists(f) && !overwrite) {
-      throw new FileAlreadyExistsException("File already exists: " + f);
-    } else {
-      Path parent = f.getParent();
-      if (parent != null && !this.mkdirs(parent)) {
-        throw new IOException("Mkdirs failed to create " + parent.toString());
-      } else {
-        return new FSDataOutputStream(this.createOutputStreamWithMode(f, false, permission), null);
-      }
-    }
-  }
-
-  @Override
-  protected OutputStream createOutputStreamWithMode(Path f, boolean append,
-      FsPermission permission) throws IOException {
-    return new AbortableOutputStream(f, append, permission);
-  }
-
-  class AbortableOutputStream extends ByteArrayOutputStream
-      implements Abortable, StreamCapabilities {
-
-    private final AtomicBoolean closed = new AtomicBoolean(false);
-
-    private Path f;
-
-    private boolean append;
-
-    private FsPermission permission;
-
-    AbortableOutputStream(Path f, boolean append, FsPermission permission) {
-      this.f = f;
-      this.append = append;
-      this.permission = permission;
-    }
-
-    @Override
-    public void close() throws IOException {
-      if (closed.getAndSet(true)) {
-        return;
-      }
-
-      OutputStream output =
-        AbortableFileSystem.super.createOutputStreamWithMode(f, append, permission);
-      writeTo(output);
-      output.close();
-    }
-
-    @Override
-    public AbortableResult abort() {
-      final boolean isAlreadyClosed = closed.getAndSet(true);
-      return new AbortableResult() {
-        public boolean alreadyClosed() {
-          return isAlreadyClosed;
-        }
-
-        public IOException anyCleanupException() {
-          return null;
-        }
-      };
-    }
-
-    @Override
-    public boolean hasCapability(String capability) {
-      return capability == CommonPathCapabilities.ABORTABLE_STREAM;
-    }
-  }
-}
diff --git a/hadoop-cloud/src/test/java/org/apache/spark/internal/io/cloud/abortable/AbstractAbortableFileSystem.java b/hadoop-cloud/src/test/java/org/apache/spark/internal/io/cloud/abortable/AbstractAbortableFileSystem.java
deleted file mode 100644
index 57ede38a23b..00000000000
--- a/hadoop-cloud/src/test/java/org/apache/spark/internal/io/cloud/abortable/AbstractAbortableFileSystem.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.internal.io.cloud.abortable;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonPathCapabilities;
-import org.apache.hadoop.fs.DelegateToFileSystem;
-import org.apache.hadoop.fs.Path;
-
-public class AbstractAbortableFileSystem extends DelegateToFileSystem {
-
-  public AbstractAbortableFileSystem(
-      URI theUri,
-      Configuration conf) throws IOException, URISyntaxException {
-    super(theUri, new AbortableFileSystem(), conf, AbortableFileSystem.ABORTABLE_FS_SCHEME, false);
-  }
-
-  @Override
-  public boolean hasPathCapability(Path path, String capability) throws IOException {
-    if (capability == CommonPathCapabilities.ABORTABLE_STREAM) {
-      return true;
-    } else {
-      return super.hasPathCapability(path, capability);
-    }
-  }
-}
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 145440d43ed..21ab6f9f636 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -1136,8 +1136,7 @@ object CopyDependencies {
 
 object TestSettings {
   import BuildCommons._
-  private val defaultExcludedTags = Seq("org.apache.spark.tags.ChromeUITest",
-    "org.apache.spark.internal.io.cloud.IntegrationTestSuite")
+  private val defaultExcludedTags = Seq("org.apache.spark.tags.ChromeUITest")
 
   lazy val settings = Seq (
     // Fork new JVMs for tests and set Java options for those
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
index cf5d54fd20a..48bf88f3116 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
@@ -300,10 +300,13 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration
 }
 
 
-abstract class AbstractFileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuration)
-  extends CheckpointFileManager with Logging {
+/** An implementation of [[CheckpointFileManager]] using Hadoop's [[FileContext]] API. */
+class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuration)
+  extends CheckpointFileManager with RenameHelperMethods with Logging {
+
+  import CheckpointFileManager._
 
-  protected val fc = if (path.toUri.getScheme == null) {
+  private val fc = if (path.toUri.getScheme == null) {
     FileContext.getFileContext(hadoopConf)
   } else {
     FileContext.getFileContext(path.toUri, hadoopConf)
@@ -317,6 +320,19 @@ abstract class AbstractFileContextBasedCheckpointFileManager(path: Path, hadoopC
     fc.mkdir(path, FsPermission.getDirDefault, true)
   }
 
+  override def createTempFile(path: Path): FSDataOutputStream = {
+    import CreateFlag._
+    import Options._
+    fc.create(
+      path, EnumSet.of(CREATE, OVERWRITE), CreateOpts.checksumParam(ChecksumOpt.createDisabled()))
+  }
+
+  override def createAtomic(
+      path: Path,
+      overwriteIfPossible: Boolean): CancellableFSDataOutputStream = {
+    new RenameBasedFSDataOutputStream(this, path, overwriteIfPossible)
+  }
+
   override def open(path: Path): FSDataInputStream = {
     fc.open(path)
   }
@@ -325,6 +341,14 @@ abstract class AbstractFileContextBasedCheckpointFileManager(path: Path, hadoopC
     fc.util.exists(path)
   }
 
+  override def renameTempFile(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = {
+    import Options.Rename._
+    fc.rename(srcPath, dstPath, if (overwriteIfPossible) OVERWRITE else NONE)
+    // TODO: this is a workaround of HADOOP-16255 - remove this when HADOOP-16255 is resolved
+    mayRemoveCrcFile(srcPath)
+  }
+
+
   override def delete(path: Path): Unit = {
     try {
       fc.delete(path, true)
@@ -344,33 +368,6 @@ abstract class AbstractFileContextBasedCheckpointFileManager(path: Path, hadoopC
     fc.mkdir(qualifiedPath, FsPermission.getDirDefault, true)
     qualifiedPath
   }
-}
-
-class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuration)
-  extends AbstractFileContextBasedCheckpointFileManager(path, hadoopConf)
-  with RenameHelperMethods {
-
-  import CheckpointFileManager._
-
-  override def createTempFile(path: Path): FSDataOutputStream = {
-    import CreateFlag._
-    import Options._
-    fc.create(
-      path, EnumSet.of(CREATE, OVERWRITE), CreateOpts.checksumParam(ChecksumOpt.createDisabled()))
-  }
-
-  override def createAtomic(
-      path: Path,
-      overwriteIfPossible: Boolean): CancellableFSDataOutputStream = {
-    new RenameBasedFSDataOutputStream(this, path, overwriteIfPossible)
-  }
-
-  override def renameTempFile(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = {
-    import Options.Rename._
-    fc.rename(srcPath, dstPath, if (overwriteIfPossible) OVERWRITE else NONE)
-    // TODO: this is a workaround of HADOOP-16255 - remove this when HADOOP-16255 is resolved
-    mayRemoveCrcFile(srcPath)
-  }
 
   private def mayRemoveCrcFile(path: Path): Unit = {
     try {
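
The hunk above folds `createAtomic` and the rename-based helpers back into
`FileContextBasedCheckpointFileManager`. A minimal usage sketch of that API, matching how
the suite below exercises it (the checkpoint paths here are hypothetical):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager

    val fm = new FileContextBasedCheckpointFileManager(
      new Path("/tmp/checkpoints"), new Configuration())
    val out = fm.createAtomic(new Path("/tmp/checkpoints/offset-0"), overwriteIfPossible = false)
    try {
      out.writeInt(1)
      out.close()    // publishes the temp file by renaming it into place
    } catch {
      case t: Throwable =>
        out.cancel() // discards the temp file; nothing becomes visible
        throw t
    }
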
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala
index cbcb4a4062d..79bcd490a24 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala
@@ -27,35 +27,16 @@ import org.apache.hadoop.fs._
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.plans.SQLHelper
 import org.apache.spark.sql.catalyst.util.quietly
-import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 
-abstract class CheckpointFileManagerTests extends SparkFunSuite {
+abstract class CheckpointFileManagerTests extends SparkFunSuite with SQLHelper {
 
-  protected def withTempHadoopPath(p: Path => Unit): Unit
-
-  protected def checkLeakingCrcFiles(path: Path): Unit
-
-  protected def createManager(path: Path): CheckpointFileManager
-
-  private implicit class RichCancellableStream(stream: CancellableFSDataOutputStream) {
-    def writeContent(i: Int): CancellableFSDataOutputStream = {
-      stream.writeInt(i)
-      stream
-    }
-  }
-
-  private implicit class RichFSDataInputStream(stream: FSDataInputStream) {
-    def readContent(): Int = {
-      val res = stream.readInt()
-      stream.close()
-      res
-    }
-  }
+  def createManager(path: Path): CheckpointFileManager
 
   test("mkdirs, list, createAtomic, open, delete, exists") {
-    withTempHadoopPath { case basePath =>
+    withTempPath { p =>
+      val basePath = new Path(p.getAbsolutePath)
       val fm = createManager(basePath)
       // Mkdirs
       val dir = new Path(s"$basePath/dir/subdir/subsubdir")
@@ -77,32 +58,42 @@ abstract class CheckpointFileManagerTests extends SparkFunSuite {
       // Create atomic without overwrite
       var path = new Path(s"$dir/file")
       assert(!fm.exists(path))
-      fm.createAtomic(path, overwriteIfPossible = false).writeContent(1).cancel()
+      fm.createAtomic(path, overwriteIfPossible = false).cancel()
       assert(!fm.exists(path))
-      fm.createAtomic(path, overwriteIfPossible = false).writeContent(2).close()
+      fm.createAtomic(path, overwriteIfPossible = false).close()
       assert(fm.exists(path))
-      assert(fm.open(path).readContent() == 2)
       quietly {
         intercept[IOException] {
           // should throw exception since file exists and overwrite is false
-          fm.createAtomic(path, overwriteIfPossible = false).writeContent(3).close()
+          fm.createAtomic(path, overwriteIfPossible = false).close()
         }
       }
-      assert(fm.open(path).readContent() == 2)
 
       // Create atomic with overwrite if possible
       path = new Path(s"$dir/file2")
       assert(!fm.exists(path))
-      fm.createAtomic(path, overwriteIfPossible = true).writeContent(4).cancel()
+      fm.createAtomic(path, overwriteIfPossible = true).cancel()
       assert(!fm.exists(path))
-      fm.createAtomic(path, overwriteIfPossible = true).writeContent(5).close()
+      fm.createAtomic(path, overwriteIfPossible = true).close()
       assert(fm.exists(path))
-      assert(fm.open(path).readContent() == 5)
-      // should not throw exception
-      fm.createAtomic(path, overwriteIfPossible = true).writeContent(6).close()
-      assert(fm.open(path).readContent() == 6)
+      fm.createAtomic(path, overwriteIfPossible = true).close()  // should not throw exception
+
+      // A crc file should not be leaked when its origin file doesn't exist.
+      // The Hadoop filesystem implementation may filter out checksum files, so
+      // list files via the local filesystem instead.
+      val fileNames = new File(path.getParent.toString).listFiles().toSeq
+        .filter(p => p.isFile).map(p => p.getName)
+      val crcFiles = fileNames.filter(n => n.startsWith(".") && n.endsWith(".crc"))
+      val originFileNamesForExistingCrcFiles = crcFiles.map { name =>
+        // remove first "." and last ".crc"
+        name.substring(1, name.length - 4)
+      }
+
+      // Check all origin files exist for all crc files.
+      assert(originFileNamesForExistingCrcFiles.toSet.subsetOf(fileNames.toSet),
+        s"Some of origin files for crc files don't exist - crc files: $crcFiles / " +
+          s"expected origin files: $originFileNamesForExistingCrcFiles / actual files: $fileNames")
 
-      checkLeakingCrcFiles(dir)
       // Open and delete
       fm.open(path).close()
       fm.delete(path)
@@ -147,42 +138,13 @@ class CheckpointFileManagerSuite extends SharedSparkSession {
   }
 }
 
-abstract class CheckpointFileManagerTestsOnLocalFs
-  extends CheckpointFileManagerTests with SQLHelper {
-
-  protected def withTempHadoopPath(p: Path => Unit): Unit = {
-    withTempDir { f: File =>
-      val basePath = new Path(f.getAbsolutePath)
-      p(basePath)
-    }
-  }
-
-  protected def checkLeakingCrcFiles(path: Path): Unit = {
-    // A crc file should not be leaked when its origin file doesn't exist.
-    // The Hadoop filesystem implementation may filter out checksum files, so
-    // list files via the local filesystem instead.
-    val fileNames = new File(path.toString).listFiles().toSeq
-      .filter(p => p.isFile).map(p => p.getName)
-    val crcFiles = fileNames.filter(n => n.startsWith(".") && n.endsWith(".crc"))
-    val originFileNamesForExistingCrcFiles = crcFiles.map { name =>
-      // remove first "." and last ".crc"
-      name.substring(1, name.length - 4)
-    }
-
-    // Check all origin files exist for all crc files.
-    assert(originFileNamesForExistingCrcFiles.toSet.subsetOf(fileNames.toSet),
-      s"Some of origin files for crc files don't exist - crc files: $crcFiles / " +
-        s"expected origin files: $originFileNamesForExistingCrcFiles / actual files: $fileNames")
-  }
-}
-
-class FileContextBasedCheckpointFileManagerSuite extends CheckpointFileManagerTestsOnLocalFs {
+class FileContextBasedCheckpointFileManagerSuite extends CheckpointFileManagerTests {
   override def createManager(path: Path): CheckpointFileManager = {
     new FileContextBasedCheckpointFileManager(path, new Configuration())
   }
 }
 
-class FileSystemBasedCheckpointFileManagerSuite extends CheckpointFileManagerTestsOnLocalFs {
+class FileSystemBasedCheckpointFileManagerSuite extends CheckpointFileManagerTests {
   override def createManager(path: Path): CheckpointFileManager = {
     new FileSystemBasedCheckpointFileManager(path, new Configuration())
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org