Posted to common-commits@hadoop.apache.org by st...@apache.org on 2021/02/17 11:29:32 UTC

[hadoop] branch branch-3.3 updated: HADOOP-16906. Abortable (#2684)

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 4423a7e  HADOOP-16906. Abortable (#2684)
4423a7e is described below

commit 4423a7e736dce40728160b796774b1bc2ceda7eb
Author: Steve Loughran <st...@cloudera.com>
AuthorDate: Thu Feb 11 17:37:20 2021 +0000

    HADOOP-16906. Abortable (#2684)
    
    Adds an Abortable.abort() interface which output streams can implement
    so that an in-progress write can be terminated; it is implemented by
    the S3A connector's output stream. This allows commit protocols to
    commit/abort work by writing to the final destination and using the
    abort() call to cancel any write which is not intended to be committed.
    Consult the specification document for information about the interface
    and its use.
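
    A minimal sketch of the intended use, where close() commits the work
    and writeTaskOutput() stands in for the application's own write logic
    (both names here are illustrative, not part of this commit):

        FSDataOutputStream out = fs.create(dest, true);
        try {
          writeTaskOutput(out);   // illustrative application write logic
          out.close();            // commit: the output becomes visible
        } catch (Exception e) {
          out.abort();            // cancel: the write never becomes visible
          throw e;
        }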
    
    Contributed by Jungtaek Lim and Steve Loughran.
    
    Change-Id: I7fcc25e9dd8c10ce6c29f383529f3a2642a201ae
---
 .../main/java/org/apache/hadoop/fs/Abortable.java  |  67 ++++++++
 .../apache/hadoop/fs/CommonPathCapabilities.java   |   7 +
 .../org/apache/hadoop/fs/FSDataOutputStream.java   |  19 ++-
 .../org/apache/hadoop/fs/FSExceptionMessages.java  |   6 +
 .../org/apache/hadoop/fs/StreamCapabilities.java   |   7 +
 .../hadoop/fs/statistics/StoreStatisticNames.java  |   3 +
 .../src/site/markdown/filesystem/abortable.md      | 186 +++++++++++++++++++++
 .../src/site/markdown/filesystem/index.md          |   1 +
 .../src/site/markdown/filesystem/outputstream.md   |   6 +-
 .../hadoop/fs/contract/ContractTestUtils.java      |   2 +-
 .../apache/hadoop/fs/s3a/S3ABlockOutputStream.java | 153 ++++++++++++++---
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java    |   1 +
 .../apache/hadoop/fs/s3a/S3AInstrumentation.java   |   6 +-
 .../java/org/apache/hadoop/fs/s3a/Statistic.java   |   5 +
 .../apache/hadoop/fs/s3a/WriteOperationHelper.java |  29 +++-
 .../org/apache/hadoop/fs/s3a/WriteOperations.java  |   3 +-
 .../apache/hadoop/fs/s3a/s3guard/S3GuardTool.java  |   2 +-
 .../hadoop/fs/s3a/ITestS3ABlockOutputArray.java    |  51 ++++++
 .../apache/hadoop/fs/s3a/MultipartTestUtils.java   |   2 +-
 .../hadoop/fs/s3a/TestS3ABlockOutputStream.java    |  26 +++
 .../scale/ITestS3AMultipartUploadSizeLimits.java   |  93 +++++++++++
 .../apache/hadoop/fs/s3a/test/ExtraAssertions.java |  29 ++++
 22 files changed, 665 insertions(+), 39 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Abortable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Abortable.java
new file mode 100644
index 0000000..d2fd174
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Abortable.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Abort data being written to a stream, so that close() does
+ * not write the data. It is implemented by output streams in
+ * some object stores, and passed through {@link FSDataOutputStream}.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface Abortable {
+
+  /**
+   * Abort the active operation without the output becoming visible.
+   *
+   * This is to provide the ability to cancel the write on a stream; once
+   * a stream is aborted, the write MUST NOT become visible.
+   *
+   * @throws UnsupportedOperationException if the operation is not supported.
+   * @return the result.
+   */
+  AbortableResult abort();
+
+  /**
+   * Interface for the result of aborts; allows subclasses to extend
+   * (IOStatistics etc) or for future enhancements if ever needed.
+   */
+  interface AbortableResult {
+
+    /**
+     * Was the stream already closed/aborted?
+     * @return true if a close/abort operation had already
+     * taken place.
+     */
+    boolean alreadyClosed();
+
+    /**
+     * Any exception caught during cleanup operations,
+     * exceptions whose raising/catching does not change
+     * the semantics of the abort.
+     * @return an exception or null.
+     */
+    IOException anyCleanupException();
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonPathCapabilities.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonPathCapabilities.java
index 539b3e2..df932df 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonPathCapabilities.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonPathCapabilities.java
@@ -139,4 +139,11 @@ public final class CommonPathCapabilities {
   public static final String FS_MULTIPART_UPLOADER =
       "fs.capability.multipart.uploader";
 
+
+  /**
+   * Stream abort() capability implemented by {@link Abortable#abort()}.
+   * Value: {@value}.
+   */
+  public static final String ABORTABLE_STREAM =
+      "fs.capability.outputstream.abortable";
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java
index add5d08..94c56b7 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.fs.statistics.IOStatisticsSupport;
 @InterfaceStability.Stable
 public class FSDataOutputStream extends DataOutputStream
     implements Syncable, CanSetDropBehind, StreamCapabilities,
-      IOStatisticsSource {
+      IOStatisticsSource, Abortable {
   private final OutputStream wrappedStream;
 
   private static class PositionCache extends FilterOutputStream {
@@ -168,4 +168,21 @@ public class FSDataOutputStream extends DataOutputStream
   public IOStatistics getIOStatistics() {
     return IOStatisticsSupport.retrieveIOStatistics(wrappedStream);
   }
+
+  /**
+   * Invoke {@code abort()} on the wrapped stream if it
+   * is Abortable, otherwise raise an
+   * {@code UnsupportedOperationException}.
+   * @throws UnsupportedOperationException if not available.
+   * @return the result.
+   */
+  @Override
+  public AbortableResult abort() {
+    if (wrappedStream instanceof Abortable) {
+      return ((Abortable) wrappedStream).abort();
+    } else {
+      throw new UnsupportedOperationException(
+          FSExceptionMessages.ABORTABLE_UNSUPPORTED);
+    }
+  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSExceptionMessages.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSExceptionMessages.java
index a8e7b71..f4616f1 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSExceptionMessages.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSExceptionMessages.java
@@ -51,4 +51,10 @@ public class FSExceptionMessages {
 
   public static final String PERMISSION_DENIED_BY_STICKY_BIT =
       "Permission denied by sticky bit";
+
+  /**
+   * A call was made to abort(), but it is not supported.
+   */
+  public static final String ABORTABLE_UNSUPPORTED =
+      "Abortable.abort() is not supported";
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java
index 29af862..8611780 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java
@@ -81,6 +81,13 @@ public interface StreamCapabilities {
   String IOSTATISTICS = "iostatistics";
 
   /**
+   * Stream abort() capability implemented by {@link Abortable#abort()}.
+   * This matches the Path Capability
+   * {@link CommonPathCapabilities#ABORTABLE_STREAM}.
+   */
+  String ABORTABLE_STREAM = CommonPathCapabilities.ABORTABLE_STREAM;
+
+  /**
    * Capabilities that a stream can support and be queried for.
    */
   @Deprecated
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java
index b6d2a91..ef04fec 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java
@@ -38,6 +38,9 @@ import org.apache.hadoop.classification.InterfaceStability;
 public final class StoreStatisticNames {
 
   /** {@value}. */
+  public static final String OP_ABORT = "op_abort";
+
+  /** {@value}. */
   public static final String OP_APPEND = "op_append";
 
   /** {@value}. */
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/abortable.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/abortable.md
new file mode 100644
index 0000000..7e6ea01
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/abortable.md
@@ -0,0 +1,186 @@
+ <!---
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+
+<!--  ============================================================= -->
+<!--  CLASS: FileSystem -->
+<!--  ============================================================= -->
+
+# interface `org.apache.hadoop.fs.Abortable`
+
+<!-- MACRO{toc|fromDepth=1|toDepth=2} -->
+
+Abort the active operation such that the output does not become
+manifest.
+
+Specifically, if supported on an [output stream](outputstream.html),
+a successful `abort()` MUST guarantee that the output will not be made visible by the
+`close()` operation.
+
+```java
+
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface Abortable {
+
+  /**
+   * Abort the active operation without the output becoming visible.
+   *
+   * This is to provide the ability to cancel the write on a stream; once
+   * a stream is aborted, the write MUST NOT become visible.
+   *
+   * @throws UnsupportedOperationException if the operation is not supported.
+   * @return the result.
+   */
+  AbortableResult abort();
+
+  /**
+   * Interface for the result of aborts; allows subclasses to extend
+   * (IOStatistics etc) or for future enhancements if ever needed.
+   */
+  interface AbortableResult {
+
+    /**
+     * Was the stream already closed/aborted?
+     * @return true if a close/abort operation had already
+     * taken place.
+     */
+    boolean alreadyClosed();
+
+    /**
+     * Any exception caught during cleanup operations,
+     * exceptions whose raising/catching does not change
+     * the semantics of the abort.
+     * @return an exception or null.
+     */
+    IOException anyCleanupException();
+  }
+}
+```
+
+## Method `abort()`
+
+Aborts the ongoing operation such that no output SHALL become visible
+when the operation is completed.
+
+Unless and until other File System classes implement `Abortable`, the
+interface is specified purely for output streams.
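+
+As a minimal sketch of use (the path and `data` are illustrative), a caller
+writes to the stream and then aborts it rather than closing it:
+
+```java
+FSDataOutputStream out = fs.create(new Path("s3a://bucket/dest"), true);
+out.write(data);
+// cancel the write; no object appears at the destination
+Abortable.AbortableResult result = out.abort();
+```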
+
+## Method `abort()` on an output stream
+
+`Abortable.abort()` MUST only be supported on output streams
+whose output is only made visible when `close()` is called,
+for example, output streams returned by the S3A FileSystem.
+
+## Preconditions
+
+The stream MUST implement `Abortable` and `StreamCapabilities`.
+
+```python
+if unsupported:
+  throw UnsupportedException
+
+if not isOpen(stream):
+  no-op
+
+StreamCapabilities.hasCapability("fs.capability.outputstream.abortable") == True
+
+```
+
+
+## Postconditions
+
+After `abort()` returns, the filesystem MUST be unchanged:
+
+```
+FS' = FS
+```
+
+A successful `abort()` operation MUST guarantee that
+when the stream's `close()` is invoked no output shall be manifest.
+
+* The stream MUST retry any remote calls needed to force the abort outcome.
+* If any file was present at the destination path, it MUST remain unchanged.
+
+Strictly then:
+
+> if `Abortable.abort()` does not raise `UnsupportedOperationException`
+> and then returns, it guarantees that the write SHALL NOT become visible
+> and that any existing data in the filesystem at the destination path SHALL
+> continue to be available.
+
+
+1. Calls to `write()` methods MUST fail.
+1. Calls to `flush()` MUST be no-ops (applications sometimes call this on closed streams).
+1. Subsequent calls to `abort()` MUST be no-ops.
+1. `close()` MUST NOT manifest the file, and MUST NOT raise an exception.
+
+That is, the postconditions of `close()` become:
+
+```
+FS' = FS
+```
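+
+A sketch of the required behaviour after a successful abort (stream
+creation elided; `data` is illustrative):
+
+```java
+out.write(data);
+out.abort();            // the write is cancelled
+out.flush();            // no-op
+out.abort();            // no-op
+out.close();            // MUST NOT manifest the file or raise an exception
+```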
+
+### Cleanup
+
+* If temporary data is stored in the local filesystem or in the store's upload
+  infrastructure then this MAY be cleaned up; best-effort is expected here.
+
+* The stream SHOULD NOT retry cleanup operations; any failure there MUST be
+  caught and added to the `AbortableResult`.
+
+#### Returned `AbortableResult`
+
+The `AbortableResult` value returned is primarily for testing and logging.
+
+`alreadyClosed()`: MUST return `true` if the write had already been aborted or closed.
+
+`anyCleanupException()`: SHOULD return any IOException raised during any optional
+cleanup operations.
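+
+For example (a sketch; `LOG` is an assumed slf4j logger and `out` an open
+stream), a caller interested in cleanup failures can examine the result:
+
+```java
+Abortable.AbortableResult result = out.abort();
+if (!result.alreadyClosed() && result.anyCleanupException() != null) {
+  LOG.warn("cleanup failure during abort", result.anyCleanupException());
+}
+```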
+
+
+### Thread safety and atomicity
+
+Output streams themselves aren't formally required to be thread safe,
+but as applications do sometimes assume they are, this call MUST be thread safe.
+
+## Path/Stream capability "fs.capability.outputstream.abortable"
+
+
+An application MUST be able to verify that a stream supports the `Abortable.abort()`
+operation without actually calling it. This is done through the `StreamCapabilities`
+interface.
+
+1. If a stream instance supports `Abortable` then it MUST return `true`
+in the probe `hasCapability("fs.capability.outputstream.abortable")`.
+
+1. If a stream instance does not support `Abortable` then it MUST return `false`
+in the probe `hasCapability("fs.capability.outputstream.abortable")`.
+
+That is: if a stream declares its support for the feature, a call to `abort()`
+SHALL meet the defined semantics of the operation.
+
+FileSystem/FileContext implementations SHOULD declare support similarly, to
+allow applications to probe for the feature in the destination directory/path.
+
+If a filesystem supports `Abortable` under a path `P` then it SHOULD return `true` to
+`PathCapabilities.hasPathCapability(path, "fs.capability.outputstream.abortable")`.
+This is to allow applications to verify that the store supports the feature.
+
+If a filesystem does not support `Abortable` under a path `P` then it MUST
+return `false` to
+`PathCapabilities.hasPathCapability(path, "fs.capability.outputstream.abortable")`.
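+
+A sketch of both probes, assuming `fs`, `path` and an open stream `out`:
+
+```java
+// does the store support Abortable under this path?
+boolean storeSupport = fs.hasPathCapability(path,
+    "fs.capability.outputstream.abortable");
+// does this specific stream support abort()?
+if (out.hasCapability("fs.capability.outputstream.abortable")) {
+  out.abort();
+}
+```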
+
+
+
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md
index aba0a44..a4aa136 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md
@@ -33,6 +33,7 @@ HDFS as these are commonly expected by Hadoop client applications.
 1. [Model](model.html)
 1. [FileSystem class](filesystem.html)
 1. [OutputStream, Syncable and `StreamCapabilities`](outputstream.html)
+1. [Abortable](abortable.html)
 1. [FSDataInputStream class](fsdatainputstream.html)
 1. [PathCapabilities interface](pathcapabilities.html)
 1. [FSDataOutputStreamBuilder class](fsdataoutputstreambuilder.html)
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md
index 33d9648..8d0d4c4 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md
@@ -893,7 +893,7 @@ Object store streams MAY buffer the entire stream's output
 until the final `close()` operation triggers a single `PUT` of the data
 and materialization of the final output.
 
-This significantly change's their behaviour compared to that of
+This significantly changes their behaviour compared to that of
 POSIX filesystems and that specified in this document.
 
 #### Visibility of newly created objects
@@ -961,6 +961,10 @@ is present: the act of instantiating the object, while potentially exhibiting
 create inconsistency, is atomic. Applications may be able to use that fact
 to their advantage.
 
+The [Abortable](abortable.html) interface exposes this ability to abort an output
+stream before its data is made visible, so it can be used for checkpointing and
+similar operations.
+
 ## <a name="implementors"></a> Implementors notes.
 
 ### Always implement `Syncable` -even if just to throw `UnsupportedOperationException`
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
index c8cf197..35193fa 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
@@ -233,8 +233,8 @@ public class ContractTestUtils extends Assert {
   public static void verifyFileContents(FileSystem fs,
                                         Path path,
                                         byte[] original) throws IOException {
-    assertIsFile(fs, path);
     FileStatus stat = fs.getFileStatus(path);
+    assertIsFile(path, stat);
     String statText = stat.toString();
     assertEquals("wrong length " + statText, original.length, stat.getLen());
     byte[] bytes = readDataset(fs, path, original.length);
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
index 9a1a940..4f06981 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
@@ -25,6 +25,7 @@ import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Locale;
+import java.util.StringJoiner;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -38,7 +39,6 @@ import com.amazonaws.services.s3.model.PutObjectRequest;
 import com.amazonaws.services.s3.model.PutObjectResult;
 import com.amazonaws.services.s3.model.UploadPartRequest;
 
-import org.apache.hadoop.fs.Syncable;
 import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
@@ -49,11 +49,14 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.Abortable;
 import org.apache.hadoop.fs.PathIOException;
 import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.fs.Syncable;
 import org.apache.hadoop.fs.s3a.commit.CommitConstants;
 import org.apache.hadoop.fs.s3a.commit.PutTracker;
 import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
+import org.apache.hadoop.fs.statistics.DurationTracker;
 import org.apache.hadoop.fs.statistics.IOStatistics;
 import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
 import org.apache.hadoop.fs.statistics.IOStatisticsSource;
@@ -61,7 +64,9 @@ import org.apache.hadoop.util.Progressable;
 
 import static org.apache.hadoop.fs.s3a.S3AUtils.*;
 import static org.apache.hadoop.fs.s3a.Statistic.*;
+import static org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext.EMPTY_BLOCK_OUTPUT_STREAM_STATISTICS;
 import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.emptyStatistics;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation;
 import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
 
 /**
@@ -79,7 +84,7 @@ import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 class S3ABlockOutputStream extends OutputStream implements
-    StreamCapabilities, IOStatisticsSource, Syncable {
+    StreamCapabilities, IOStatisticsSource, Syncable, Abortable {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(S3ABlockOutputStream.class);
@@ -171,7 +176,9 @@ class S3ABlockOutputStream extends OutputStream implements
     this.key = key;
     this.blockFactory = blockFactory;
     this.blockSize = (int) blockSize;
-    this.statistics = statistics;
+    this.statistics = statistics != null
+        ? statistics
+        : EMPTY_BLOCK_OUTPUT_STREAM_STATISTICS;
     // test instantiations may not provide statistics;
     this.iostatistics = statistics != null
         ? statistics.getIOStatistics()
@@ -421,22 +428,110 @@ class S3ABlockOutputStream extends OutputStream implements
       // if this happened during a multipart upload, abort the
       // operation, so as to not leave (billable) data
       // pending on the bucket
-      if (multiPartUpload != null) {
-        multiPartUpload.abort();
-      }
+      maybeAbortMultipart();
       writeOperationHelper.writeFailed(ioe);
       throw ioe;
     } finally {
-      cleanupWithLogger(LOG, block, blockFactory);
-      LOG.debug("Statistics: {}", statistics);
-      cleanupWithLogger(LOG, statistics);
-      clearActiveBlock();
+      cleanupOnClose();
     }
     // Note end of write. This does not change the state of the remote FS.
     writeOperationHelper.writeSuccessful(bytes);
   }
 
   /**
+   * Final operations in close/abort of stream.
+   * Shuts down block factory, closes any active block,
+   * and pushes out statistics.
+   */
+  private synchronized void cleanupOnClose() {
+    cleanupWithLogger(LOG, getActiveBlock(), blockFactory);
+    LOG.debug("Statistics: {}", statistics);
+    cleanupWithLogger(LOG, statistics);
+    clearActiveBlock();
+  }
+
+  /**
+   * Best effort abort of the multipart upload; sets
+   * the field to null afterwards.
+   * @return any exception caught during the operation.
+   */
+  private synchronized IOException maybeAbortMultipart() {
+    if (multiPartUpload != null) {
+      final IOException ioe = multiPartUpload.abort();
+      multiPartUpload = null;
+      return ioe;
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Abort any active uploads, enter closed state.
+   * @return the outcome
+   */
+  @Override
+  public AbortableResult abort() {
+    if (closed.getAndSet(true)) {
+      // already closed
+      LOG.debug("Ignoring abort() as stream is already closed");
+      return new AbortableResultImpl(true, null);
+    }
+    try (DurationTracker d =
+             statistics.trackDuration(INVOCATION_ABORT.getSymbol())) {
+      return new AbortableResultImpl(false, maybeAbortMultipart());
+    } finally {
+      cleanupOnClose();
+    }
+  }
+
+  /**
+   * Abortable result.
+   */
+  private static final class AbortableResultImpl implements AbortableResult {
+
+    /**
+     * Had the stream already been closed/aborted?
+     */
+    private final boolean alreadyClosed;
+
+    /**
+     * Was any exception raised during non-essential
+     * cleanup actions (i.e. MPU abort)?
+     */
+    private final IOException anyCleanupException;
+
+    /**
+     * Constructor.
+     * @param alreadyClosed Had the stream already been closed/aborted?
+     * @param anyCleanupException Was any exception raised during cleanup?
+     */
+    private AbortableResultImpl(final boolean alreadyClosed,
+        final IOException anyCleanupException) {
+      this.alreadyClosed = alreadyClosed;
+      this.anyCleanupException = anyCleanupException;
+    }
+
+    @Override
+    public boolean alreadyClosed() {
+      return alreadyClosed;
+    }
+
+    @Override
+    public IOException anyCleanupException() {
+      return anyCleanupException;
+    }
+
+    @Override
+    public String toString() {
+      return new StringJoiner(", ",
+          AbortableResultImpl.class.getSimpleName() + "[", "]")
+          .add("alreadyClosed=" + alreadyClosed)
+          .add("anyCleanupException=" + anyCleanupException)
+          .toString();
+    }
+  }
+
+  /**
    * Upload the current block as a single PUT request; if the buffer
    * is empty a 0-byte PUT will be invoked, as it is needed to create an
    * entry at the far end.
@@ -548,6 +643,10 @@ class S3ABlockOutputStream extends OutputStream implements
     case StreamCapabilities.IOSTATISTICS:
       return true;
 
+    // S3A supports abort.
+    case StreamCapabilities.ABORTABLE_STREAM:
+      return true;
+
     default:
       return false;
     }
@@ -730,7 +829,7 @@ class S3ABlockOutputStream extends OutputStream implements
         //abort multipartupload
         this.abort();
         throw extractException("Multi-part upload with id '" + uploadId
-                + "' to " + key, key, ee);
+            + "' to " + key, key, ee);
       }
     }
 
@@ -756,35 +855,43 @@ class S3ABlockOutputStream extends OutputStream implements
       maybeRethrowUploadFailure();
       AtomicInteger errorCount = new AtomicInteger(0);
       try {
-        writeOperationHelper.completeMPUwithRetries(key,
-            uploadId,
-            partETags,
-            bytesSubmitted,
-            errorCount);
+        trackDurationOfInvocation(statistics,
+            MULTIPART_UPLOAD_COMPLETED.getSymbol(), () -> {
+              writeOperationHelper.completeMPUwithRetries(key,
+                  uploadId,
+                  partETags,
+                  bytesSubmitted,
+                  errorCount);
+            });
       } finally {
         statistics.exceptionInMultipartComplete(errorCount.get());
       }
     }
 
     /**
-     * Abort a multi-part upload. Retries are attempted on failures.
+     * Abort a multi-part upload. Retries are not attempted on failures.
      * IOExceptions are caught; this is expected to be run as a cleanup process.
+     * @return any caught exception.
      */
-    public void abort() {
+    private IOException abort() {
       LOG.debug("Aborting upload");
-      fs.incrementStatistic(OBJECT_MULTIPART_UPLOAD_ABORTED);
-      cancelAllActiveFutures();
       try {
-        writeOperationHelper.abortMultipartUpload(key, uploadId,
-            (text, e, r, i) -> statistics.exceptionInMultipartAbort());
+        trackDurationOfInvocation(statistics,
+            OBJECT_MULTIPART_UPLOAD_ABORTED.getSymbol(), () -> {
+              cancelAllActiveFutures();
+              writeOperationHelper.abortMultipartUpload(key, uploadId,
+                  false, null);
+            });
+        return null;
       } catch (IOException e) {
         // this point is only reached if the operation failed more than
         // the allowed retry count
         LOG.warn("Unable to abort multipart upload,"
             + " you may need to purge uploaded parts", e);
+        statistics.exceptionInMultipartAbort();
+        return e;
       }
     }
-
   }
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index e6fb8c0..4be35de 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -4724,6 +4724,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       return getConf().getBoolean(ETAG_CHECKSUM_ENABLED,
           ETAG_CHECKSUM_ENABLED_DEFAULT);
 
+    case CommonPathCapabilities.ABORTABLE_STREAM:
     case CommonPathCapabilities.FS_MULTIPART_UPLOADER:
       return true;
 
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
index 5fcc157..dd28f3e 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
@@ -1350,7 +1350,11 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
           .withGauges(
               STREAM_WRITE_BLOCK_UPLOADS_PENDING.getSymbol(),
               STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING.getSymbol())
-          .withDurationTracking(ACTION_EXECUTOR_ACQUIRED)
+          .withDurationTracking(
+              ACTION_EXECUTOR_ACQUIRED,
+              INVOCATION_ABORT.getSymbol(),
+              OBJECT_MULTIPART_UPLOAD_ABORTED.getSymbol(),
+              MULTIPART_UPLOAD_COMPLETED.getSymbol())
           .build();
       setIOStatistics(st);
       // these are extracted to avoid lookups on heavily used counters.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
index f5d6053..1a53f0d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
@@ -88,6 +88,11 @@ public enum Statistic {
       TYPE_COUNTER),
   IGNORED_ERRORS("ignored_errors", "Errors caught and ignored",
       TYPE_COUNTER),
+
+  INVOCATION_ABORT(
+      StoreStatisticNames.OP_ABORT,
+      "Calls of abort()",
+      TYPE_DURATION),
   INVOCATION_COPY_FROM_LOCAL_FILE(
       StoreStatisticNames.OP_COPY_FROM_LOCAL_FILE,
       "Calls of copyFromLocalFile()",
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
index 49a5eb2..9bdf61c 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
@@ -336,21 +336,32 @@ public class WriteOperationHelper implements WriteOperations {
    * Abort a multipart upload operation.
    * @param destKey destination key of the upload
    * @param uploadId multipart operation Id
+   * @param shouldRetry should failures trigger a retry?
    * @param retrying callback invoked on every retry
    * @throws IOException failure to abort
    * @throws FileNotFoundException if the abort ID is unknown
    */
   @Retries.RetryTranslated
   public void abortMultipartUpload(String destKey, String uploadId,
-      Retried retrying)
+      boolean shouldRetry, Retried retrying)
       throws IOException {
-    invoker.retry("Aborting multipart upload ID " + uploadId,
-        destKey,
-        true,
-        retrying,
-        () -> owner.abortMultipartUpload(
-            destKey,
-            uploadId));
+    if (shouldRetry) {
+      // retrying option
+      invoker.retry("Aborting multipart upload ID " + uploadId,
+          destKey,
+          true,
+          retrying,
+          () -> owner.abortMultipartUpload(
+              destKey,
+              uploadId));
+    } else {
+      // single pass attempt.
+      once("Aborting multipart upload ID " + uploadId,
+          destKey,
+          () -> owner.abortMultipartUpload(
+              destKey,
+              uploadId));
+    }
   }
 
   /**
@@ -401,7 +412,7 @@ public class WriteOperationHelper implements WriteOperations {
   @Retries.RetryTranslated
   public void abortMultipartCommit(String destKey, String uploadId)
       throws IOException {
-    abortMultipartUpload(destKey, uploadId, invoker.getRetryCallback());
+    abortMultipartUpload(destKey, uploadId, true, invoker.getRetryCallback());
   }
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java
index 2636ed7..09b9cc9 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java
@@ -154,13 +154,14 @@ public interface WriteOperations {
    * Abort a multipart upload operation.
    * @param destKey destination key of the upload
    * @param uploadId multipart operation Id
+   * @param shouldRetry should failures trigger a retry?
    * @param retrying callback invoked on every retry
    * @throws IOException failure to abort
    * @throws FileNotFoundException if the abort ID is unknown
    */
   @Retries.RetryTranslated
   void abortMultipartUpload(String destKey, String uploadId,
-      Invoker.Retried retrying)
+      boolean shouldRetry, Invoker.Retried retrying)
       throws IOException;
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
index 7096d53..2acae76 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
@@ -1622,7 +1622,7 @@ public abstract class S3GuardTool extends Configured implements Tool,
         if (mode == Mode.ABORT) {
           getFilesystem().getWriteOperationHelper()
               .abortMultipartUpload(upload.getKey(), upload.getUploadId(),
-                  LOG_EVENT);
+                  true, LOG_EVENT);
         }
       }
       if (mode != Mode.EXPECT || verbose) {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputArray.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputArray.java
index 88e0cef..53fa0d8 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputArray.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputArray.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.s3a;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
@@ -32,7 +33,10 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
 
+import static org.apache.hadoop.fs.StreamCapabilities.ABORTABLE_STREAM;
 import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.test.ExtraAssertions.assertCompleteAbort;
+import static org.apache.hadoop.fs.s3a.test.ExtraAssertions.assertNoopAbort;
 
 /**
  * Tests small file upload functionality for
@@ -155,4 +159,51 @@ public class ITestS3ABlockOutputArray extends AbstractS3ATestBase {
     markAndResetDatablock(createFactory(getFileSystem()));
   }
 
+  @Test
+  public void testAbortAfterWrite() throws Throwable {
+    describe("Verify abort after a write does not create a file");
+    Path dest = path(getMethodName());
+    FileSystem fs = getFileSystem();
+    ContractTestUtils.assertHasPathCapabilities(fs, dest, ABORTABLE_STREAM);
+    FSDataOutputStream stream = fs.create(dest, true);
+    byte[] data = ContractTestUtils.dataset(16, 'a', 26);
+    try {
+      ContractTestUtils.assertCapabilities(stream,
+          new String[]{ABORTABLE_STREAM},
+          null);
+      stream.write(data);
+      assertCompleteAbort(stream.abort());
+      // second attempt is harmless
+      assertNoopAbort(stream.abort());
+
+      // the path should not exist
+      ContractTestUtils.assertPathsDoNotExist(fs, "aborted file", dest);
+    } finally {
+      IOUtils.closeStream(stream);
+      // check the path doesn't exist "after" closing stream
+      ContractTestUtils.assertPathsDoNotExist(fs, "aborted file", dest);
+    }
+    // and it can be called on the stream after being closed.
+    assertNoopAbort(stream.abort());
+  }
+
+  /**
+   * Calling abort() on a stream which has already been close()d after a
+   * successful write will return a result indicating that nothing
+   * happened.
+  @Test
+  public void testAbortAfterCloseIsHarmless() throws Throwable {
+    describe("Verify abort on a closed stream is harmless "
+        + "and that the result indicates that nothing happened");
+    Path dest = path(getMethodName());
+    FileSystem fs = getFileSystem();
+    byte[] data = ContractTestUtils.dataset(16, 'a', 26);
+    try (FSDataOutputStream stream = fs.create(dest, true)) {
+      stream.write(data);
+      assertCompleteAbort(stream.abort());
+      stream.close();
+      assertNoopAbort(stream.abort());
+    }
+  }
+
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MultipartTestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MultipartTestUtils.java
index 8be3ff7..8618242 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MultipartTestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MultipartTestUtils.java
@@ -88,7 +88,7 @@ public final class MultipartTestUtils {
       while (uploads.hasNext()) {
         MultipartUpload upload = uploads.next();
         fs.getWriteOperationHelper().abortMultipartUpload(upload.getKey(),
-            upload.getUploadId(), LOG_EVENT);
+            upload.getUploadId(), true, LOG_EVENT);
         LOG.debug("Cleaning up upload: {} {}", upload.getKey(),
             truncatedUploadId(upload.getUploadId()));
       }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockOutputStream.java
index 284718b..baa4a54 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockOutputStream.java
@@ -82,4 +82,30 @@ public class TestS3ABlockOutputStream extends AbstractS3AMockTest {
         () -> woh.newUploadPartRequest(key,
             "uploadId", 50000, 1024, inputStream, null, 0L));
   }
+
+  static class StreamClosedException extends IOException {}
+
+  @Test
+  public void testStreamClosedAfterAbort() throws Exception {
+    stream.abort();
+
+    // This verification replaces testing various operations after calling
+    // abort: after calling abort, stream is closed like calling close().
+    intercept(IOException.class, () -> stream.checkOpen());
+
+    // check that calling write() will call checkOpen() and throws exception
+    doThrow(new StreamClosedException()).when(stream).checkOpen();
+
+    intercept(StreamClosedException.class,
+        () -> stream.write(new byte[] {'a', 'b', 'c'}));
+  }
+
+  @Test
+  public void testCallingCloseAfterCallingAbort() throws Exception {
+    stream.abort();
+
+    // This shouldn't throw IOException like calling close() multiple times.
+    // This will ensure abort() can be called with try-with-resource.
+    stream.close();
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AMultipartUploadSizeLimits.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AMultipartUploadSizeLimits.java
index 4a348be..231cfd8 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AMultipartUploadSizeLimits.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AMultipartUploadSizeLimits.java
@@ -25,19 +25,33 @@ import org.junit.Test;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.statistics.IOStatistics;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.S3AInstrumentation;
 import org.apache.hadoop.fs.s3a.Statistic;
 import org.apache.hadoop.fs.s3a.auth.ProgressCounter;
 import org.apache.hadoop.fs.s3a.commit.CommitOperations;
 
+import static org.apache.hadoop.fs.StreamCapabilities.ABORTABLE_STREAM;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyFileContents;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
 import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.Statistic.INVOCATION_ABORT;
+import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_MULTIPART_UPLOAD_ABORTED;
 import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT;
+import static org.apache.hadoop.fs.s3a.test.ExtraAssertions.assertCompleteAbort;
+import static org.apache.hadoop.fs.s3a.test.ExtraAssertions.assertNoopAbort;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
 /**
@@ -118,4 +132,83 @@ public class ITestS3AMultipartUploadSizeLimits extends S3AScaleTestBase {
         describedAs("commit abort count")
         .isEqualTo(initial + 1);
   }
+
+  @Test
+  public void testAbortAfterTwoPartUpload() throws Throwable {
+    Path file = path(getMethodName());
+
+    byte[] data = dataset(6 * _1MB, 'a', 'z' - 'a');
+
+    S3AFileSystem fs = getFileSystem();
+    FSDataOutputStream stream = fs.create(file, true);
+    try {
+      stream.write(data);
+
+      // From testTwoPartUpload() we know that closing the stream will
+      // finalize the upload and materialize the path. Here we call abort()
+      // to abort the upload, and ensure the path is NOT available.
+
+      assertCompleteAbort(stream.abort());
+
+      // the path should not exist
+      assertPathDoesNotExist("upload must not have completed", file);
+    } finally {
+      IOUtils.closeStream(stream);
+      // check the path doesn't exist "after" closing stream
+      assertPathDoesNotExist("upload must not have completed", file);
+    }
+    verifyStreamWasAborted(fs, stream);
+    // a second abort is a no-op
+    assertNoopAbort(stream.abort());
+  }
+
+
+  @Test
+  public void testAbortWhenOverwritingAFile() throws Throwable {
+    Path file = path(getMethodName());
+
+    S3AFileSystem fs = getFileSystem();
+    // write the original data
+    byte[] smallData = writeTextFile(fs, file, "original", true);
+
+    // now attempt a multipart upload
+    byte[] data = dataset(6 * _1MB, 'a', 'z' - 'a');
+    FSDataOutputStream stream = fs.create(file, true);
+    try {
+      ContractTestUtils.assertCapabilities(stream,
+          new String[]{ABORTABLE_STREAM},
+          null);
+      stream.write(data);
+      assertCompleteAbort(stream.abort());
+
+      verifyFileContents(fs, file, smallData);
+    } finally {
+      IOUtils.closeStream(stream);
+    }
+    verifyFileContents(fs, file, smallData);
+    verifyStreamWasAborted(fs, stream);
+  }
+
+  /**
+   * Check up on the IOStatistics of the FS and stream to verify that
+   * a stream was aborted: both that abort() was invoked and that the
+   * multipart upload itself was aborted.
+   * @param fs filesystem
+   * @param stream stream
+   */
+  private void verifyStreamWasAborted(final S3AFileSystem fs,
+      final FSDataOutputStream stream) {
+    // check the stream
+    final IOStatistics iostats = stream.getIOStatistics();
+    final String sstr = ioStatisticsToPrettyString(iostats);
+    LOG.info("IOStatistics for stream: {}", sstr);
+    verifyStatisticCounterValue(iostats, INVOCATION_ABORT.getSymbol(), 1);
+    verifyStatisticCounterValue(iostats,
+        OBJECT_MULTIPART_UPLOAD_ABORTED.getSymbol(), 1);
+
+    // now the FS.
+    final IOStatistics fsIostats = fs.getIOStatistics();
+    assertThatStatisticCounter(fsIostats, INVOCATION_ABORT.getSymbol())
+        .isGreaterThanOrEqualTo(1);
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/ExtraAssertions.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/ExtraAssertions.java
index 28b3432..77c7736 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/ExtraAssertions.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/ExtraAssertions.java
@@ -23,11 +23,13 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import org.assertj.core.api.Assertions;
 import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.Abortable;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
@@ -149,4 +151,31 @@ public final class ExtraAssertions {
       throw e;
     }
   }
+
+
+  /**
+   * Assert that an abort was completely successful in that it
+   * was not a no-op and no exception was raised during
+   * cleanup.
+   * @param result result to assert over
+   */
+  public static void assertCompleteAbort(
+      Abortable.AbortableResult result) {
+    Assertions.assertThat(result)
+        .describedAs("Abort operation result %s", result)
+        .matches(r -> !r.alreadyClosed())
+        .matches(r -> r.anyCleanupException() == null);
+  }
+
+  /**
+   * Assert that an abort was a no-op as the
+   * stream had already closed/aborted.
+   * @param result result to assert over
+   */
+  public static void assertNoopAbort(
+      Abortable.AbortableResult result) {
+    Assertions.assertThat(result)
+        .describedAs("Abort operation result %s", result)
+        .matches(r -> r.alreadyClosed());
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org