You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2021/04/24 17:33:08 UTC

[hadoop] branch branch-3.3 updated: HADOOP-17597. Optionally downgrade on S3A Syncable calls (#2801)

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new fb71e6c  HADOOP-17597. Optionally downgrade on S3A Syncable calls (#2801)
fb71e6c is described below

commit fb71e6c91ed88f26244d50abeb0fcf1c843e2a2a
Author: Steve Loughran <st...@cloudera.com>
AuthorDate: Fri Apr 23 18:44:41 2021 +0100

    HADOOP-17597. Optionally downgrade on S3A Syncable calls (#2801)
    
    Followup to HADOOP-13327, which changed S3A output stream hsync/hflush calls
    to raise an exception.
    
    Adds a new option fs.s3a.downgrade.syncable.exceptions
    
    When true, calls to Syncable hsync/hflush on S3A output streams will
    log once at warn (for entire process life, not just the stream), then
    increment IOStats with the relevant operation counter
    
    With the downgrade option false (default)
    * IOStats are incremented
    * The UnsupportedOperationException currently raised includes a link to the
      JIRA.
    
    Contributed by Steve Loughran.
    
    Change-Id: I967e077eda1d1a1a3795b4d22e003fe7997b6679
---
 .../hadoop/fs/statistics/StoreStatisticNames.java  |   6 +
 .../java/org/apache/hadoop/fs/s3a/Constants.java   |  22 +-
 .../apache/hadoop/fs/s3a/S3ABlockOutputStream.java | 258 +++++++++++++++++----
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java    |  27 ++-
 .../apache/hadoop/fs/s3a/S3AInstrumentation.java   |  14 +-
 .../java/org/apache/hadoop/fs/s3a/Statistic.java   |   8 +
 .../apache/hadoop/fs/s3a/WriteOperationHelper.java |   5 +
 .../org/apache/hadoop/fs/s3a/WriteOperations.java  |   6 +
 .../statistics/BlockOutputStreamStatistics.java    |  10 +
 .../statistics/impl/EmptyS3AStatisticsContext.java |   8 +
 .../tools/hadoop-aws/troubleshooting_s3a.md        |  37 ++-
 .../hadoop/fs/s3a/ITestDowngradeSyncable.java      | 114 +++++++++
 .../hadoop/fs/s3a/TestS3ABlockOutputStream.java    |  55 ++++-
 13 files changed, 499 insertions(+), 71 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java
index b6b08fe..9514439 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java
@@ -85,6 +85,12 @@ public final class StoreStatisticNames {
   public static final String OP_IS_FILE = "op_is_file";
 
   /** {@value}. */
+  public static final String OP_HFLUSH = "op_hflush";
+
+  /** {@value}. */
+  public static final String OP_HSYNC = "op_hsync";
+
+  /** {@value}. */
   public static final String OP_IS_DIRECTORY = "op_is_directory";
 
   /** {@value}. */
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index c4b8f6e..f6900cb 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -329,7 +329,6 @@ public final class Constants {
    * Default is {@link #FAST_UPLOAD_BUFFER_DISK}
    * Value: {@value}
    */
-  @InterfaceStability.Unstable
   public static final String FAST_UPLOAD_BUFFER =
       "fs.s3a.fast.upload.buffer";
 
@@ -338,26 +337,22 @@ public final class Constants {
    * Capacity is limited to available disk space.
    */
 
-  @InterfaceStability.Unstable
   public static final String FAST_UPLOAD_BUFFER_DISK = "disk";
 
   /**
    * Use an in-memory array. Fast but will run of heap rapidly: {@value}.
    */
-  @InterfaceStability.Unstable
   public static final String FAST_UPLOAD_BUFFER_ARRAY = "array";
 
   /**
    * Use a byte buffer. May be more memory efficient than the
    * {@link #FAST_UPLOAD_BUFFER_ARRAY}: {@value}.
    */
-  @InterfaceStability.Unstable
   public static final String FAST_UPLOAD_BYTEBUFFER = "bytebuffer";
 
   /**
    * Default buffer option: {@value}.
    */
-  @InterfaceStability.Unstable
   public static final String DEFAULT_FAST_UPLOAD_BUFFER =
       FAST_UPLOAD_BUFFER_DISK;
 
@@ -370,7 +365,6 @@ public final class Constants {
    * <p>
    * Default is {@link #DEFAULT_FAST_UPLOAD_ACTIVE_BLOCKS}
    */
-  @InterfaceStability.Unstable
   public static final String FAST_UPLOAD_ACTIVE_BLOCKS =
       "fs.s3a.fast.upload.active.blocks";
 
@@ -378,10 +372,24 @@ public final class Constants {
    * Limit of queued block upload operations before writes
    * block. Value: {@value}
    */
-  @InterfaceStability.Unstable
   public static final int DEFAULT_FAST_UPLOAD_ACTIVE_BLOCKS = 4;
 
   /**
+   * Rather than raise an exception when an attempt is
+   * made to call the Syncable APIs, warn and downgrade.
+   * Value: {@value}.
+   */
+  public static final String DOWNGRADE_SYNCABLE_EXCEPTIONS =
+      "fs.s3a.downgrade.syncable.exceptions";
+
+  /**
+   * Default value of {@link #DOWNGRADE_SYNCABLE_EXCEPTIONS}.
+   * Value: {@value}.
+   */
+  public static final boolean DOWNGRADE_SYNCABLE_EXCEPTIONS_DEFAULT =
+      false;
+
+  /**
    * The capacity of executor queues for operations other than block
    * upload, where {@link #FAST_UPLOAD_ACTIVE_BLOCKS} is used instead.
    * This should be less than {@link #MAX_THREADS} for fair
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
index 4f06981..65b9535 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.fs.Syncable;
 import org.apache.hadoop.fs.s3a.commit.CommitConstants;
 import org.apache.hadoop.fs.s3a.commit.PutTracker;
+import org.apache.hadoop.fs.s3a.impl.LogExactlyOnce;
 import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
 import org.apache.hadoop.fs.statistics.DurationTracker;
 import org.apache.hadoop.fs.statistics.IOStatistics;
@@ -62,10 +63,10 @@ import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
 import org.apache.hadoop.fs.statistics.IOStatisticsSource;
 import org.apache.hadoop.util.Progressable;
 
+import static java.util.Objects.requireNonNull;
 import static org.apache.hadoop.fs.s3a.S3AUtils.*;
 import static org.apache.hadoop.fs.s3a.Statistic.*;
 import static org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext.EMPTY_BLOCK_OUTPUT_STREAM_STATISTICS;
-import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.emptyStatistics;
 import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation;
 import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
 
@@ -89,10 +90,8 @@ class S3ABlockOutputStream extends OutputStream implements
   private static final Logger LOG =
       LoggerFactory.getLogger(S3ABlockOutputStream.class);
 
-  private static final String E_NOT_SYNCABLE = "S3A streams are not Syncable";
-
-  /** Owner FileSystem. */
-  private final S3AFileSystem fs;
+  private static final String E_NOT_SYNCABLE =
+      "S3A streams are not Syncable. See HADOOP-17597.";
 
   /** Object being uploaded. */
   private final String key;
@@ -136,62 +135,48 @@ class S3ABlockOutputStream extends OutputStream implements
   /**
    * Write operation helper; encapsulation of the filesystem operations.
    */
-  private final WriteOperationHelper writeOperationHelper;
+  private final WriteOperations writeOperationHelper;
 
   /**
    * Track multipart put operation.
    */
   private final PutTracker putTracker;
 
+  /** Should Syncable calls be downgraded? */
+  private final boolean downgradeSyncableExceptions;
+
+  /**
+   * Downgraded syncable API calls are only logged at warn
+   * once across the entire process.
+   */
+  private static final LogExactlyOnce WARN_ON_SYNCABLE =
+      new LogExactlyOnce(LOG);
+
   /**
    * An S3A output stream which uploads partitions in a separate pool of
    * threads; different {@link S3ADataBlocks.BlockFactory}
    * instances can control where data is buffered.
-   *
-   * @param fs S3AFilesystem
-   * @param key S3 object to work on.
-   * @param executorService the executor service to use to schedule work
-   * @param progress report progress in order to prevent timeouts. If
-   * this object implements {@code ProgressListener} then it will be
-   * directly wired up to the AWS client, so receive detailed progress
-   * information.
-   * @param blockSize size of a single block.
-   * @param blockFactory factory for creating stream destinations
-   * @param statistics stats for this stream
-   * @param writeOperationHelper state of the write operation.
-   * @param putTracker put tracking for commit support
    * @throws IOException on any problem
    */
-  S3ABlockOutputStream(S3AFileSystem fs,
-      String key,
-      ExecutorService executorService,
-      Progressable progress,
-      long blockSize,
-      S3ADataBlocks.BlockFactory blockFactory,
-      BlockOutputStreamStatistics statistics,
-      WriteOperationHelper writeOperationHelper,
-      PutTracker putTracker)
+  S3ABlockOutputStream(BlockOutputStreamBuilder builder)
       throws IOException {
-    this.fs = fs;
-    this.key = key;
-    this.blockFactory = blockFactory;
-    this.blockSize = (int) blockSize;
-    this.statistics = statistics != null
-        ? statistics
-        : EMPTY_BLOCK_OUTPUT_STREAM_STATISTICS;
+    builder.validate();
+    this.key = builder.key;
+    this.blockFactory = builder.blockFactory;
+    this.blockSize = (int) builder.blockSize;
+    this.statistics = builder.statistics;
     // test instantiations may not provide statistics;
-    this.iostatistics = statistics != null
-        ? statistics.getIOStatistics()
-        : emptyStatistics();
-    this.writeOperationHelper = writeOperationHelper;
-    this.putTracker = putTracker;
-    Preconditions.checkArgument(blockSize >= Constants.MULTIPART_MIN_SIZE,
-        "Block size is too small: %d", blockSize);
-    this.executorService = MoreExecutors.listeningDecorator(executorService);
+    this.iostatistics = statistics.getIOStatistics();
+    this.writeOperationHelper = builder.writeOperations;
+    this.putTracker = builder.putTracker;
+    this.executorService = MoreExecutors.listeningDecorator(
+        builder.executorService);
     this.multiPartUpload = null;
+    final Progressable progress = builder.progress;
     this.progressListener = (progress instanceof ProgressListener) ?
         (ProgressListener) progress
         : new ProgressableListener(progress);
+    downgradeSyncableExceptions = builder.downgradeSyncableExceptions;
     // create that first block. This guarantees that an open + close sequence
     // writes a 0-byte entry.
     createBlockIfNeeded();
@@ -597,7 +582,7 @@ class S3ABlockOutputStream extends OutputStream implements
   }
 
   private void incrementWriteOperations() {
-    fs.incrementWriteOperations();
+    writeOperationHelper.incrementWriteOperations();
   }
 
   /**
@@ -654,12 +639,31 @@ class S3ABlockOutputStream extends OutputStream implements
 
   @Override
   public void hflush() throws IOException {
-    throw new UnsupportedOperationException(E_NOT_SYNCABLE);
+    statistics.hflushInvoked();
+    handleSyncableInvocation();
   }
 
   @Override
   public void hsync() throws IOException {
-    throw new UnsupportedOperationException(E_NOT_SYNCABLE);
+    statistics.hsyncInvoked();
+    handleSyncableInvocation();
+  }
+
+  /**
+   * Shared processing of Syncable operation reporting/downgrade.
+   */
+  private void handleSyncableInvocation() {
+    final UnsupportedOperationException ex
+        = new UnsupportedOperationException(E_NOT_SYNCABLE);
+    if (!downgradeSyncableExceptions) {
+      throw ex;
+    }
+    // downgrading.
+    WARN_ON_SYNCABLE.warn("Application invoked the Syncable API against"
+        + " stream writing to {}. This is unsupported",
+        key);
+    // and log at debug
+    LOG.debug("Downgrading Syncable call", ex);
   }
 
   @Override
@@ -982,4 +986,166 @@ class S3ABlockOutputStream extends OutputStream implements
     }
   }
 
+  /**
+   * Create a builder.
+   * @return a new builder instance.
+   */
+  public static BlockOutputStreamBuilder builder() {
+    return new BlockOutputStreamBuilder();
+  }
+
+  /**
+   * Builder class for constructing an output stream.
+   */
+  public static final class BlockOutputStreamBuilder {
+
+    /** S3 object to work on. */
+    private String key;
+
+    /** The executor service to use to schedule work. */
+    private ExecutorService executorService;
+
+    /**
+     * Report progress in order to prevent timeouts. If
+     * this object implements {@code ProgressListener} then it will be
+     * directly wired up to the AWS client, so receive detailed progress
+     * information.
+     */
+    private Progressable progress;
+
+    /** The size of a single block. */
+    private long blockSize;
+
+    /** The factory for creating stream destinations. */
+    private S3ADataBlocks.BlockFactory blockFactory;
+
+    /** The output statistics for the stream. */
+    private BlockOutputStreamStatistics statistics =
+        EMPTY_BLOCK_OUTPUT_STREAM_STATISTICS;
+
+    /** Operations to write data. */
+    private WriteOperations writeOperations;
+
+    /** put tracking for commit support. */
+    private PutTracker putTracker;
+
+    /** Should Syncable calls be downgraded? */
+    private boolean downgradeSyncableExceptions;
+
+    private BlockOutputStreamBuilder() {
+    }
+
+    /**
+     * Validate the arguments.
+     */
+    public void validate() {
+      requireNonNull(key, "null key");
+      requireNonNull(executorService, "null executorService");
+      requireNonNull(blockFactory, "null blockFactory");
+      requireNonNull(statistics, "null statistics");
+      requireNonNull(writeOperations, "null writeOperationHelper");
+      requireNonNull(putTracker, "null putTracker");
+      Preconditions.checkArgument(blockSize >= Constants.MULTIPART_MIN_SIZE,
+          "Block size is too small: %s", blockSize);
+    }
+
+    /**
+     * Set builder value.
+     * @param value new value
+     * @return the builder
+     */
+    public BlockOutputStreamBuilder withKey(
+        final String value) {
+      key = value;
+      return this;
+    }
+
+    /**
+     * Set builder value.
+     * @param value new value
+     * @return the builder
+     */
+    public BlockOutputStreamBuilder withExecutorService(
+        final ExecutorService value) {
+      executorService = value;
+      return this;
+    }
+
+    /**
+     * Set builder value.
+     * @param value new value
+     * @return the builder
+     */
+    public BlockOutputStreamBuilder withProgress(
+        final Progressable value) {
+      progress = value;
+      return this;
+    }
+
+    /**
+     * Set builder value.
+     * @param value new value
+     * @return the builder
+     */
+    public BlockOutputStreamBuilder withBlockSize(
+        final long value) {
+      blockSize = value;
+      return this;
+    }
+
+    /**
+     * Set builder value.
+     * @param value new value
+     * @return the builder
+     */
+    public BlockOutputStreamBuilder withBlockFactory(
+        final S3ADataBlocks.BlockFactory value) {
+      blockFactory = value;
+      return this;
+    }
+
+    /**
+     * Set builder value.
+     * @param value new value
+     * @return the builder
+     */
+    public BlockOutputStreamBuilder withStatistics(
+        final BlockOutputStreamStatistics value) {
+      statistics = value;
+      return this;
+    }
+
+    /**
+     * Set builder value.
+     * @param value new value
+     * @return the builder
+     */
+    public BlockOutputStreamBuilder withWriteOperations(
+        final WriteOperationHelper value) {
+      writeOperations = value;
+      return this;
+    }
+
+    /**
+     * Set builder value.
+     * @param value new value
+     * @return the builder
+     */
+    public BlockOutputStreamBuilder withPutTracker(
+        final PutTracker value) {
+      putTracker = value;
+      return this;
+    }
+
+    /**
+     * Set builder value.
+     * @param value new value
+     * @return the builder
+     */
+    public BlockOutputStreamBuilder withDowngradeSyncableExceptions(
+        final boolean value) {
+      downgradeSyncableExceptions = value;
+      return this;
+    }
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index ca9bdf2..22a1281 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -1348,19 +1348,26 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
     String destKey = putTracker.getDestKey();
     final BlockOutputStreamStatistics outputStreamStatistics
         = statisticsContext.newOutputStreamStatistics();
-    return new FSDataOutputStream(
-        new S3ABlockOutputStream(this,
-            destKey,
+    final S3ABlockOutputStream.BlockOutputStreamBuilder builder =
+        S3ABlockOutputStream.builder()
+        .withKey(destKey)
+        .withBlockFactory(blockFactory)
+        .withBlockSize(partSize)
+        .withStatistics(outputStreamStatistics)
+        .withProgress(progress)
+        .withPutTracker(putTracker)
+        .withWriteOperations(getWriteOperationHelper())
+        .withExecutorService(
             new SemaphoredDelegatingExecutor(
                 boundedThreadPool,
                 blockOutputActiveBlocks,
-                true),
-            progress,
-            partSize,
-            blockFactory,
-            outputStreamStatistics,
-            getWriteOperationHelper(),
-            putTracker),
+                true))
+        .withDowngradeSyncableExceptions(
+            getConf().getBoolean(
+                DOWNGRADE_SYNCABLE_EXCEPTIONS,
+                DOWNGRADE_SYNCABLE_EXCEPTIONS_DEFAULT));
+    return new FSDataOutputStream(
+        new S3ABlockOutputStream(builder),
         null);
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
index dd28f3e..169a74a 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
@@ -1346,7 +1346,9 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
               STREAM_WRITE_EXCEPTIONS_COMPLETING_UPLOADS.getSymbol(),
               STREAM_WRITE_QUEUE_DURATION.getSymbol(),
               STREAM_WRITE_TOTAL_DATA.getSymbol(),
-              STREAM_WRITE_TOTAL_TIME.getSymbol())
+              STREAM_WRITE_TOTAL_TIME.getSymbol(),
+              INVOCATION_HFLUSH.getSymbol(),
+              INVOCATION_HSYNC.getSymbol())
           .withGauges(
               STREAM_WRITE_BLOCK_UPLOADS_PENDING.getSymbol(),
               STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING.getSymbol())
@@ -1490,6 +1492,16 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
     }
 
     @Override
+    public void hflushInvoked() {
+      incCounter(INVOCATION_HFLUSH.getSymbol(), 1);
+    }
+
+    @Override
+    public void hsyncInvoked() {
+      incCounter(INVOCATION_HSYNC.getSymbol(), 1);
+    }
+
+    @Override
     public void close() {
       if (getBytesPendingUpload() > 0) {
         LOG.warn("Closing output stream statistics while data is still marked" +
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
index 1a53f0d..c613c06 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
@@ -137,6 +137,14 @@ public enum Statistic {
       StoreStatisticNames.OP_IS_FILE,
       "Calls of isFile()",
       TYPE_COUNTER),
+  INVOCATION_HFLUSH(
+      StoreStatisticNames.OP_HFLUSH,
+      "Calls of hflush()",
+      TYPE_COUNTER),
+  INVOCATION_HSYNC(
+      StoreStatisticNames.OP_HSYNC,
+      "Calls of hsync()",
+      TYPE_COUNTER),
   INVOCATION_LIST_FILES(
       StoreStatisticNames.OP_LIST_FILES,
       "Calls of listFiles()",
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
index 9bdf61c..8b71fc3 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
@@ -693,4 +693,9 @@ public class WriteOperationHelper implements WriteOperations {
           }
         });
   }
+
+  @Override
+  public void incrementWriteOperations() {
+    owner.incrementWriteOperations();
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java
index 09b9cc9..0a8150c 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java
@@ -338,4 +338,10 @@ public interface WriteOperations {
       SelectObjectContentRequest request,
       String action)
       throws IOException;
+
+  /**
+   * Increment the write operation counter
+   * of the filesystem.
+   */
+  void incrementWriteOperations();
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/BlockOutputStreamStatistics.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/BlockOutputStreamStatistics.java
index b1cee71..772b965 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/BlockOutputStreamStatistics.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/BlockOutputStreamStatistics.java
@@ -134,4 +134,14 @@ public interface BlockOutputStreamStatistics extends Closeable,
    * @return the value or null if no matching gauge was found.
    */
   Long lookupGaugeValue(String name);
+
+  /**
+   * Syncable.hflush() has been invoked.
+   */
+  void hflushInvoked();
+
+  /**
+   * Syncable.hsync() has been invoked.
+   */
+  void hsyncInvoked();
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
index c8cd805..3a65102 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
@@ -483,6 +483,14 @@ public final class EmptyS3AStatisticsContext implements S3AStatisticsContext {
     }
 
     @Override
+    public void hflushInvoked() {
+    }
+
+    @Override
+    public void hsyncInvoked() {
+    }
+
+    @Override
     public void close() throws IOException {
     }
 
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
index 416793b..661dd2f 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
@@ -22,7 +22,7 @@ Common problems working with S3 are
 
 1. Classpath setup
 1. Authentication
-1. S3 Inconsistency side-effects
+1. Incorrect configuration
 
 
 Troubleshooting IAM Assumed Roles is covered in its
@@ -1027,7 +1027,7 @@ at the end of a write operation. If a process terminated unexpectedly, or failed
 to call the `close()` method on an output stream, the pending data will have
 been lost.
 
-### File `flush()`, `hsync` and `hflush()` calls do not save data to S3
+### File `flush()` calls do not save data to S3
 
 Again, this is due to the fact that the data is cached locally until the
 `close()` operation. The S3A filesystem cannot be used as a store of data
@@ -1036,6 +1036,39 @@ if it is required that the data is persisted durably after every
 This includes resilient logging, HBase-style journaling
 and the like. The standard strategy here is to save to HDFS and then copy to S3.
 
+### <a name="syncable"></a> `UnsupportedOperationException` "S3A streams are not Syncable. See HADOOP-17597."
+
+The application has tried to call either the `Syncable.hsync()` or `Syncable.hflush()`
+methods on an S3A output stream. This has been rejected because the
+connector isn't saving any data at all. The `Syncable` API, especially the
+`hsync()` call, is critical for applications such as HBase to safely
+persist data.
+
+The S3A connector throws an `UnsupportedOperationException` when these API calls
+are made, because the guarantees absolutely cannot be met: nothing is being flushed
+or saved.
+
+* Applications which intend to invoke the Syncable APIs should call `hasCapability("hsync")` on
+  the stream to see if they are supported.
+* Or catch and downgrade `UnsupportedOperationException`.
+
+These recommendations _apply to all filesystems_. 
+
+To downgrade the S3A connector to simply warning of the use of
+`hsync()` or `hflush()` calls, set the option
+`fs.s3a.downgrade.syncable.exceptions` to true.
+
+```xml
+<property>
+  <name>fs.s3a.downgrade.syncable.exceptions</name>
+  <value>true</value>
+</property>
+```
+
+The counts of invocations of the two APIs are collected
+in the S3A filesystem Statistics/IOStatistics and so
+their use can be monitored.
+
 ### `RemoteFileChangedException` and read-during-overwrite
 
 ```
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestDowngradeSyncable.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestDowngradeSyncable.java
new file mode 100644
index 0000000..0bcb11a
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestDowngradeSyncable.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+
+import static org.apache.hadoop.fs.s3a.Constants.DOWNGRADE_SYNCABLE_EXCEPTIONS;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBucketOverrides;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
+import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToString;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_HFLUSH;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_HSYNC;
+
+
+public class ITestDowngradeSyncable extends AbstractS3ACostTest {
+
+  protected static final Logger LOG =
+      LoggerFactory.getLogger(ITestDowngradeSyncable.class);
+
+
+  public ITestDowngradeSyncable() {
+    super(false, true, false);
+  }
+
+  @Override
+  public Configuration createConfiguration() {
+    final Configuration conf = super.createConfiguration();
+    String bucketName = getTestBucketName(conf);
+    removeBucketOverrides(bucketName, conf,
+        DOWNGRADE_SYNCABLE_EXCEPTIONS);
+    conf.setBoolean(DOWNGRADE_SYNCABLE_EXCEPTIONS, true);
+    return conf;
+  }
+
+  @Test
+  public void testHFlushDowngrade() throws Throwable {
+    describe("Verify that hflush() calls can be downgraded from fail"
+        + " to ignore; the relevant counter is updated");
+    Path path = methodPath();
+    S3AFileSystem fs = getFileSystem();
+    final IOStatistics fsIoStats = fs.getIOStatistics();
+    assertThatStatisticCounter(fsIoStats, OP_HFLUSH)
+        .isEqualTo(0);
+
+    try (FSDataOutputStream out = fs.create(path, true)) {
+      out.write('1');
+      // must succeed
+      out.hflush();
+      // stats counter records the downgrade
+      IOStatistics iostats = out.getIOStatistics();
+      LOG.info("IOStats {}", ioStatisticsToString(iostats));
+      assertThatStatisticCounter(iostats, OP_HFLUSH)
+          .isEqualTo(1);
+      assertThatStatisticCounter(iostats, OP_HSYNC)
+          .isEqualTo(0);
+    }
+    // once closed, the FS will have its stats merged.
+    assertThatStatisticCounter(fsIoStats, OP_HFLUSH)
+        .isEqualTo(1);
+  }
+
+  @Test
+  public void testHSyncDowngrade() throws Throwable {
+    describe("Verify that hsync() calls can be downgraded from fail"
+        + " to ignore; the relevant counter is updated");
+    Path path = methodPath();
+    S3AFileSystem fs = getFileSystem();
+    final IOStatistics fsIoStats = fs.getIOStatistics();
+    assertThatStatisticCounter(fsIoStats, OP_HSYNC)
+        .isEqualTo(0);
+
+    try (FSDataOutputStream out = fs.create(path, true)) {
+      out.write('1');
+      // must succeed
+      out.hsync();
+      // stats counter records the downgrade
+      IOStatistics iostats = out.getIOStatistics();
+      LOG.info("IOStats {}", ioStatisticsToString(iostats));
+      assertThatStatisticCounter(iostats, OP_HFLUSH)
+          .isEqualTo(0);
+      assertThatStatisticCounter(iostats, OP_HSYNC)
+          .isEqualTo(1);
+    }
+    // once closed, the FS will have its stats merged.
+    assertThatStatisticCounter(fsIoStats, OP_HSYNC)
+        .isEqualTo(1);
+  }
+
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockOutputStream.java
index baa4a54..de27411 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockOutputStream.java
@@ -43,8 +43,11 @@ public class TestS3ABlockOutputStream extends AbstractS3AMockTest {
 
   private S3ABlockOutputStream stream;
 
-  @Before
-  public void setUp() throws Exception {
+  /**
+   * Create an S3A Builder all mocked up from component pieces.
+   * @return stream builder.
+   */
+  private S3ABlockOutputStream.BlockOutputStreamBuilder mockS3ABuilder() {
     ExecutorService executorService = mock(ExecutorService.class);
     Progressable progressable = mock(Progressable.class);
     S3ADataBlocks.BlockFactory blockFactory =
@@ -52,11 +55,26 @@ public class TestS3ABlockOutputStream extends AbstractS3AMockTest {
     long blockSize = Constants.DEFAULT_MULTIPART_SIZE;
     WriteOperationHelper oHelper = mock(WriteOperationHelper.class);
     PutTracker putTracker = mock(PutTracker.class);
-    stream = spy(new S3ABlockOutputStream(fs, "", executorService,
-      progressable, blockSize, blockFactory, null, oHelper,
-      putTracker));
+    final S3ABlockOutputStream.BlockOutputStreamBuilder builder =
+        S3ABlockOutputStream.builder()
+            .withBlockFactory(blockFactory)
+            .withBlockSize(blockSize)
+            .withExecutorService(executorService)
+            .withKey("")
+            .withProgress(progressable)
+            .withPutTracker(putTracker)
+            .withWriteOperations(oHelper);
+    return builder;
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    final S3ABlockOutputStream.BlockOutputStreamBuilder
+        builder = mockS3ABuilder();
+    stream = spy(new S3ABlockOutputStream(builder));
   }
 
+
   @Test
   public void testFlushNoOpWhenStreamClosed() throws Exception {
     doThrow(new IOException()).when(stream).checkOpen();
@@ -108,4 +126,31 @@ public class TestS3ABlockOutputStream extends AbstractS3AMockTest {
     // This will ensure abort() can be called with try-with-resource.
     stream.close();
   }
+
+
+  /**
+   * Unless configured to downgrade, the stream will raise exceptions on
+   * Syncable API calls.
+   */
+  @Test
+  public void testSyncableUnsupported() throws Exception {
+    intercept(UnsupportedOperationException.class, () -> stream.hflush());
+    intercept(UnsupportedOperationException.class, () -> stream.hsync());
+  }
+
+  /**
+   * When configured to downgrade, the stream downgrades on
+   * Syncable API calls.
+   */
+  @Test
+  public void testSyncableDowngrade() throws Exception {
+    final S3ABlockOutputStream.BlockOutputStreamBuilder
+        builder = mockS3ABuilder();
+    builder.withDowngradeSyncableExceptions(true);
+    stream = spy(new S3ABlockOutputStream(builder));
+
+    stream.hflush();
+    stream.hsync();
+  }
+
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org