Posted to common-commits@hadoop.apache.org by st...@apache.org on 2019/03/13 20:37:33 UTC

[hadoop] branch trunk updated: HADOOP-15625. S3A input stream to use etags/version number to detect changed source files.

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6fa2298  HADOOP-15625. S3A input stream to use etags/version number to detect changed source files.
6fa2298 is described below

commit 6fa229891e06eea62cb9634efde755f40247e816
Author: Ben Roling <be...@gmail.com>
AuthorDate: Wed Mar 13 20:31:13 2019 +0000

    HADOOP-15625. S3A input stream to use etags/version number to detect changed source files.
    
    Author: Ben Roling <be...@gmail.com>
    
    Initial patch from Brahma Reddy Battula.
---
 .../src/main/resources/core-default.xml            |  42 +++
 .../java/org/apache/hadoop/fs/s3a/Constants.java   |  80 +++++
 .../hadoop/fs/s3a/NoVersionAttributeException.java |  44 +++
 .../hadoop/fs/s3a/RemoteFileChangedException.java  |  49 +++
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java    |  30 +-
 .../org/apache/hadoop/fs/s3a/S3AInputStream.java   |  25 +-
 .../apache/hadoop/fs/s3a/S3AInstrumentation.java   |  14 +
 .../org/apache/hadoop/fs/s3a/S3AReadOpContext.java |  14 +
 .../org/apache/hadoop/fs/s3a/S3ARetryPolicy.java   |   7 +
 .../java/org/apache/hadoop/fs/s3a/Statistic.java   |   4 +-
 .../hadoop/fs/s3a/impl/ChangeDetectionPolicy.java  | 376 +++++++++++++++++++++
 .../apache/hadoop/fs/s3a/impl/ChangeTracker.java   | 196 +++++++++++
 .../apache/hadoop/fs/s3a/impl/LogExactlyOnce.java  |  42 +++
 .../apache/hadoop/fs/s3a/impl/package-info.java    |  30 ++
 .../src/site/markdown/tools/hadoop-aws/index.md    | 125 +++++++
 .../tools/hadoop-aws/troubleshooting_s3a.md        | 113 +++++--
 .../hadoop/fs/s3a/ITestS3AFailureHandling.java     |  67 +---
 .../hadoop/fs/s3a/ITestS3ARemoteFileChanged.java   | 197 +++++++++++
 .../org/apache/hadoop/fs/s3a/S3ATestUtils.java     |  13 +
 .../hadoop/fs/s3a/TestStreamChangeTracker.java     | 255 ++++++++++++++
 20 files changed, 1625 insertions(+), 98 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 1f1ab55..5cb37f0 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -1874,6 +1874,48 @@
   </description>
 </property>
 
+<property>
+  <name>fs.s3a.change.detection.source</name>
+  <value>etag</value>
+  <description>
+    Select which S3 object attribute to use for change detection.
+    Currently supported values are 'etag' for S3 object eTags and 'versionid'
+    for S3 object version IDs.  Use of version IDs requires object versioning to be
+    enabled for each S3 bucket utilized.  Object versioning is disabled on
+    buckets by default. When version ID is used, the buckets utilized should
+    have versioning enabled before any data is written.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.change.detection.mode</name>
+  <value>server</value>
+  <description>
+    Determines how change detection is applied to alert on S3 objects
+    rewritten while being read. Value 'server' indicates to apply the attribute
+    constraint directly on GetObject requests to S3. Value 'client' means to do a
+    client-side comparison of the attribute value returned in the response.  Value
+    'server' would not work with third-party S3 implementations that do not
+    support these constraints on GetObject. Values 'server' and 'client' generate
+    RemoteFileChangedException when a mismatch is detected.  Value 'warn' works
+    like 'client' but generates only a warning.  Value 'none' will ignore change
+    detection completely.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.change.detection.version.required</name>
+  <value>true</value>
+  <description>
+    Determines if the S3 object version attribute defined by
+    fs.s3a.change.detection.source should be treated as required.  If true and the
+    referred attribute is unavailable in an S3 GetObject response,
+    NoVersionAttributeException is thrown.  Setting to 'true' is encouraged to
+    avoid potential for inconsistent reads with third-party S3 implementations or
+    against S3 buckets that have object versioning disabled.
+  </description>
+</property>
+
 <!-- Azure file system properties -->
 <property>
   <name>fs.AbstractFileSystem.wasb.impl</name>
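For anyone trying these settings out, the same options can be set programmatically through the standard Hadoop Configuration API instead of core-site.xml. A minimal sketch, assuming a placeholder bucket URI:

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ChangeDetectionConfigExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // use version IDs rather than eTags; requires versioned buckets
    conf.set("fs.s3a.change.detection.source", "versionid");
    // compare on the client instead of sending a server-side constraint
    conf.set("fs.s3a.change.detection.mode", "client");
    // fail rather than warn when the attribute is missing from a response
    conf.setBoolean("fs.s3a.change.detection.version.required", true);
    // "s3a://example-bucket/" is a placeholder, not part of this patch
    FileSystem fs = FileSystem.get(new URI("s3a://example-bucket/"), conf);
    System.out.println(fs.getFileStatus(new Path("/data.csv")));
  }
}
```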
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index bdd3add..e0b1629 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -641,4 +641,84 @@ public final class Constants {
    */
   public static final boolean ETAG_CHECKSUM_ENABLED_DEFAULT = false;
 
+  /**
+   * Where to get the value to use in change detection, e.g. eTag or
+   * versionId.
+   */
+  public static final String CHANGE_DETECT_SOURCE
+      = "fs.s3a.change.detection.source";
+
+  /**
+   * eTag as the change detection mechanism.
+   */
+  public static final String CHANGE_DETECT_SOURCE_ETAG = "etag";
+
+  /**
+   * Object versionId as the change detection mechanism.
+   */
+  public static final String CHANGE_DETECT_SOURCE_VERSION_ID = "versionid";
+
+  /**
+   * Default change detection mechanism: eTag.
+   */
+  public static final String CHANGE_DETECT_SOURCE_DEFAULT =
+      CHANGE_DETECT_SOURCE_ETAG;
+
+  /**
+   * Mode to run change detection in.  Server side comparison?  Client side
+   * comparison? Client side compare and warn rather than exception?  Don't
+   * bother at all?
+   */
+  public static final String CHANGE_DETECT_MODE =
+      "fs.s3a.change.detection.mode";
+
+  /**
+   * Change is detected on the client side by comparing the returned id with the
+   * expected id.  A difference results in {@link RemoteFileChangedException}.
+   */
+  public static final String CHANGE_DETECT_MODE_CLIENT = "client";
+
+  /**
+   * Change is detected by passing the expected value in the GetObject request.
+   * If the expected value is unavailable, {@link RemoteFileChangedException} is
+   * thrown.
+   */
+  public static final String CHANGE_DETECT_MODE_SERVER = "server";
+
+  /**
+   * Change is detected on the client side by comparing the returned id with the
+   * expected id.  A difference results in a WARN level message being logged.
+   */
+  public static final String CHANGE_DETECT_MODE_WARN = "warn";
+
+  /**
+   * Change detection is turned off.  Readers may see inconsistent results due
+   * to concurrent writes without any exception or warning messages.  May be
+   * useful with third-party S3 API implementations that don't support one of
+   * the change detection modes.
+   */
+  public static final String CHANGE_DETECT_MODE_NONE = "none";
+
+  /**
+   * Default change detection mode: server.
+   */
+  public static final String CHANGE_DETECT_MODE_DEFAULT =
+      CHANGE_DETECT_MODE_SERVER;
+
+  /**
+   * If true, raises a {@link NoVersionAttributeException} when S3
+   * doesn't provide the attribute defined by fs.s3a.change.detection.source.
+   * This can happen, for example, if the source is versionId but object
+   * versioning is not enabled on the bucket, or if the source is eTag and a
+   * third-party S3 implementation that doesn't return eTags is used.
+   * <p>
+   * When false, only a warning message will be logged for this condition.
+   */
+  public static final String CHANGE_DETECT_REQUIRE_VERSION =
+      "fs.s3a.change.detection.version.required";
+
+  /**
+   * Default change detection require version: true.
+   */
+  public static final boolean CHANGE_DETECT_REQUIRE_VERSION_DEFAULT = true;
 }
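A short sketch of how code might resolve these options with their defaults, using the constants above rather than literal strings. This is illustrative only; `ChangeDetectionPolicy.getPolicy()` later in this patch is what actually does this inside S3A:

```java
import org.apache.hadoop.conf.Configuration;

import static org.apache.hadoop.fs.s3a.Constants.*;

public final class ChangeDetectConfigReader {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // resolve each option, falling back to the defaults defined above
    String source = conf.getTrimmed(
        CHANGE_DETECT_SOURCE, CHANGE_DETECT_SOURCE_DEFAULT);
    String mode = conf.getTrimmed(
        CHANGE_DETECT_MODE, CHANGE_DETECT_MODE_DEFAULT);
    boolean requireVersion = conf.getBoolean(
        CHANGE_DETECT_REQUIRE_VERSION, CHANGE_DETECT_REQUIRE_VERSION_DEFAULT);
    System.out.printf("source=%s mode=%s required=%b%n",
        source, mode, requireVersion);
  }
}
```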
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/NoVersionAttributeException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/NoVersionAttributeException.java
new file mode 100644
index 0000000..8cad74a
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/NoVersionAttributeException.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.PathIOException;
+
+/**
+ * Indicates the S3 object does not provide the versioning attribute required
+ * by the configured change detection policy.
+ */
+@SuppressWarnings("serial")
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class NoVersionAttributeException extends PathIOException {
+
+  /**
+   * Constructs a NoVersionAttributeException.
+   *
+   * @param path the path accessed when the condition was detected
+   * @param message a message providing more details about the condition
+   */
+  public NoVersionAttributeException(String path,
+      String message) {
+    super(path, message);
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/RemoteFileChangedException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/RemoteFileChangedException.java
new file mode 100644
index 0000000..cfa5935
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/RemoteFileChangedException.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.PathIOException;
+
+/**
+ * Indicates the S3 object is out of sync with the expected version.  Thrown in
+ * cases such as when the object is updated while an {@link S3AInputStream} is
+ * open.
+ */
+@SuppressWarnings("serial")
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class RemoteFileChangedException extends PathIOException {
+
+  /**
+   * Constructs a RemoteFileChangedException.
+   *
+   * @param path the path accessed when the change was detected
+   * @param operation the operation (e.g. open, re-open) performed when the
+   * change was detected
+   * @param message a message providing more details about the condition
+   */
+  public RemoteFileChangedException(String path,
+      String operation,
+      String message) {
+    super(path, message);
+    setOperation(operation);
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 031a80b..1f560d0 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -88,6 +88,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
 import org.apache.hadoop.fs.s3a.select.InternalSelectConstants;
 import org.apache.hadoop.util.LambdaUtils;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
@@ -214,6 +215,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       createStorageStatistics();
   private long readAhead;
   private S3AInputPolicy inputPolicy;
+  private ChangeDetectionPolicy changeDetectionPolicy;
   private final AtomicBoolean closed = new AtomicBoolean(false);
   private volatile boolean isClosed = false;
   private MetadataStore metadataStore;
@@ -361,6 +363,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       inputPolicy = S3AInputPolicy.getPolicy(
           conf.getTrimmed(INPUT_FADVISE, INPUT_FADV_NORMAL));
       LOG.debug("Input fadvise policy = {}", inputPolicy);
+      changeDetectionPolicy = ChangeDetectionPolicy.getPolicy(conf);
+      LOG.debug("Change detection policy = {}", changeDetectionPolicy);
       boolean magicCommitterEnabled = conf.getBoolean(
           CommitConstants.MAGIC_COMMITTER_ENABLED,
           CommitConstants.DEFAULT_MAGIC_COMMITTER_ENABLED);
@@ -688,6 +692,15 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   }
 
   /**
+   * Get the change detection policy for this FS instance.
+   * @return the change detection policy
+   */
+  @VisibleForTesting
+  ChangeDetectionPolicy getChangeDetectionPolicy() {
+    return changeDetectionPolicy;
+  }
+
+  /**
    * Get the encryption algorithm of this endpoint.
    * @return the encryption algorithm.
    */
@@ -875,9 +888,18 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       S3AInputPolicy policy = S3AInputPolicy.getPolicy(
           o.get(INPUT_FADVISE, inputPolicy.toString()));
       long readAheadRange2 = o.getLong(READAHEAD_RANGE, readAhead);
-      readContext = createReadContext(fileStatus, policy, readAheadRange2);
+      // TODO support change detection policy from options?
+      readContext = createReadContext(
+          fileStatus,
+          policy,
+          changeDetectionPolicy,
+          readAheadRange2);
     } else {
-      readContext = createReadContext(fileStatus, inputPolicy, readAhead);
+      readContext = createReadContext(
+          fileStatus,
+          inputPolicy,
+          changeDetectionPolicy,
+          readAhead);
     }
     LOG.debug("Opening '{}'", readContext);
 
@@ -900,6 +922,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   private S3AReadOpContext createReadContext(
       final FileStatus fileStatus,
       final S3AInputPolicy seekPolicy,
+      final ChangeDetectionPolicy changePolicy,
       final long readAheadRange) {
     return new S3AReadOpContext(fileStatus.getPath(),
         hasMetadataStore(),
@@ -909,6 +932,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
         instrumentation,
         fileStatus,
         seekPolicy,
+        changePolicy,
         readAheadRange);
   }
 
@@ -3676,7 +3700,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
     long ra = options.getLong(READAHEAD_RANGE, readAhead);
     // build and execute the request
     return selectBinding.select(
-        createReadContext(fileStatus, inputPolicy, ra),
+        createReadContext(fileStatus, inputPolicy, changeDetectionPolicy, ra),
         expression,
         options,
         generateSSECustomerKey(),
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index 35dd834..d096601 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -32,6 +32,8 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.CanSetReadahead;
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,6 +68,9 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
   public static final String E_NEGATIVE_READAHEAD_VALUE
       = "Negative readahead value";
 
+  public static final String OPERATION_OPEN = "open";
+  public static final String OPERATION_REOPEN = "re-open";
+
   /**
    * This is the public position; the one set in {@link #seek(long)}
    * and returned in {@link #getPos()}.
@@ -110,6 +115,9 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
    */
   private long contentRangeStart;
 
+  /** change tracker. */
+  private final ChangeTracker changeTracker;
+
   /**
    * Create the stream.
    * This does not attempt to open it; that is only done on the first
@@ -138,6 +146,9 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
     this.serverSideEncryptionAlgorithm =
         s3Attributes.getServerSideEncryptionAlgorithm();
     this.serverSideEncryptionKey = s3Attributes.getServerSideEncryptionKey();
+    this.changeTracker = new ChangeTracker(uri,
+        ctx.getChangeDetectionPolicy(),
+        streamStatistics.getVersionMismatchCounter());
     setInputPolicy(ctx.getInputPolicy());
     setReadahead(ctx.getReadahead());
   }
@@ -182,15 +193,20 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
         StringUtils.isNotBlank(serverSideEncryptionKey)){
       request.setSSECustomerKey(new SSECustomerKey(serverSideEncryptionKey));
     }
-    String text = String.format("Failed to %s %s at %d",
-        (opencount == 0 ? "open" : "re-open"), uri, targetPos);
+    String operation = opencount == 0 ? OPERATION_OPEN : OPERATION_REOPEN;
+    String text = String.format("%s %s at %d",
+        operation, uri, targetPos);
+    changeTracker.maybeApplyConstraint(request);
     S3Object object = Invoker.once(text, uri,
         () -> client.getObject(request));
+
+    changeTracker.processResponse(object, operation,
+        targetPos);
     wrappedStream = object.getObjectContent();
     contentRangeStart = targetPos;
     if (wrappedStream == null) {
-      throw new IOException("Null IO stream from reopen of (" + reason +  ") "
-          + uri);
+      throw new PathIOException(uri,
+          "Null IO stream from " + operation + " of (" + reason +  ") ");
     }
 
     this.pos = targetPos;
@@ -670,6 +686,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
       sb.append(" contentRangeFinish=").append(contentRangeFinish);
       sb.append(" remainingInCurrentRequest=")
           .append(remainingInCurrentRequest());
+      sb.append(changeTracker);
       sb.append('\n').append(s);
       sb.append('}');
       return sb.toString();
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
index 17c5aff..812b9c5 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
@@ -161,6 +161,7 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
       OBJECT_PUT_REQUESTS,
       OBJECT_PUT_REQUESTS_COMPLETED,
       OBJECT_SELECT_REQUESTS,
+      STREAM_READ_VERSION_MISMATCHES,
       STREAM_WRITE_FAILURES,
       STREAM_WRITE_BLOCK_UPLOADS,
       STREAM_WRITE_BLOCK_UPLOADS_COMMITTED,
@@ -594,6 +595,8 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
     streamReadsIncomplete.incr(statistics.readsIncomplete);
     streamBytesReadInClose.incr(statistics.bytesReadInClose);
     streamBytesDiscardedInAbort.incr(statistics.bytesDiscardedInAbort);
+    incrementCounter(STREAM_READ_VERSION_MISMATCHES,
+        statistics.versionMismatches.get());
   }
 
   @Override
@@ -639,6 +642,8 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
     public long bytesDiscardedInAbort;
     public long policySetCount;
     public long inputPolicy;
+    /** This is atomic so that it can be passed as a reference. */
+    private final AtomicLong versionMismatches = new AtomicLong(0);
 
     private InputStreamStatistics() {
     }
@@ -764,6 +769,14 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
     }
 
     /**
+     * Get a reference to the version mismatch counter.
+     * @return a counter which can be incremented.
+     */
+    public AtomicLong getVersionMismatchCounter() {
+      return versionMismatches;
+    }
+
+    /**
      * String operator describes all the current statistics.
      * <b>Important: there are no guarantees as to the stability
      * of this value.</b>
@@ -796,6 +809,7 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
       sb.append(", BytesDiscardedInAbort=").append(bytesDiscardedInAbort);
       sb.append(", InputPolicy=").append(inputPolicy);
       sb.append(", InputPolicySetCount=").append(policySetCount);
+      sb.append(", versionMismatches=").append(versionMismatches.get());
       sb.append('}');
       return sb.toString();
     }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java
index e49a7e9..a7317c9 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.fs.s3a;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
 
 import javax.annotation.Nullable;
 
@@ -44,6 +45,11 @@ public class S3AReadOpContext extends S3AOpContext {
   private final S3AInputPolicy inputPolicy;
 
   /**
+   * How to detect and deal with the object being updated during read.
+   */
+  private final ChangeDetectionPolicy changeDetectionPolicy;
+
+  /**
    * Readahead for GET operations/skip, etc.
    */
   private final long readahead;
@@ -59,6 +65,7 @@ public class S3AReadOpContext extends S3AOpContext {
    * @param dstFileStatus target file status
    * @param inputPolicy the input policy
    * @param readahead readahead for GET operations/skip, etc.
+   * @param changeDetectionPolicy change detection policy.
    */
   public S3AReadOpContext(
       final Path path,
@@ -69,6 +76,7 @@ public class S3AReadOpContext extends S3AOpContext {
       S3AInstrumentation instrumentation,
       FileStatus dstFileStatus,
       S3AInputPolicy inputPolicy,
+      ChangeDetectionPolicy changeDetectionPolicy,
       final long readahead) {
     super(isS3GuardEnabled, invoker, s3guardInvoker, stats, instrumentation,
         dstFileStatus);
@@ -76,6 +84,7 @@ public class S3AReadOpContext extends S3AOpContext {
     Preconditions.checkArgument(readahead >= 0,
         "invalid readahead %d", readahead);
     this.inputPolicy = checkNotNull(inputPolicy);
+    this.changeDetectionPolicy = checkNotNull(changeDetectionPolicy);
     this.readahead = readahead;
   }
 
@@ -110,6 +119,10 @@ public class S3AReadOpContext extends S3AOpContext {
     return inputPolicy;
   }
 
+  public ChangeDetectionPolicy getChangeDetectionPolicy() {
+    return changeDetectionPolicy;
+  }
+
   /**
    * Get the readahead for this operation.
    * @return a value {@literal >=} 0
@@ -125,6 +138,7 @@ public class S3AReadOpContext extends S3AOpContext {
     sb.append("path=").append(path);
     sb.append(", inputPolicy=").append(inputPolicy);
     sb.append(", readahead=").append(readahead);
+    sb.append(", changeDetectionPolicy=").append(changeDetectionPolicy);
     sb.append('}');
     return sb.toString();
   }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java
index 1e475e1..c7b80c9 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java
@@ -172,6 +172,13 @@ public class S3ARetryPolicy implements RetryPolicy {
     policyMap.put(FileNotFoundException.class, fail);
     policyMap.put(InvalidRequestException.class, fail);
 
+    // once the file has changed, trying again is not going to help
+    policyMap.put(RemoteFileChangedException.class, fail);
+
+    // likely only recovered by changing the policy configuration or s3
+    // implementation
+    policyMap.put(NoVersionAttributeException.class, fail);
+
     // should really be handled by resubmitting to new location;
     // that's beyond the scope of this retry policy
     policyMap.put(AWSRedirectException.class, fail);
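Since both exceptions are mapped to fail, S3A's own retries will not mask them; a caller that wants to proceed has to catch and reopen itself. A hedged sketch of one such caller — the helper class and its retry-once behavior are hypothetical, not part of this patch:

```java
import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.RemoteFileChangedException;

/** Hypothetical helper: reopen once if the object changed under the reader. */
public final class ReopenOnChange {
  public static int readAt(FileSystem fs, Path path, long offset)
      throws IOException {
    try (FSDataInputStream in = fs.open(path)) {
      in.seek(offset);
      return in.read();
    } catch (RemoteFileChangedException e) {
      // the old stream was pinned to the old revision; a fresh open picks up
      // the new object (and pins its revision ID in turn)
      try (FSDataInputStream retry = fs.open(path)) {
        retry.seek(offset);
        return retry.read();
      }
    }
  }
}
```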
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
index 6f79286..54a2c60 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
@@ -116,13 +116,15 @@ public enum Statistic {
   STREAM_OPENED("stream_opened",
       "Total count of times an input stream to object store was opened"),
   STREAM_READ_EXCEPTIONS("stream_read_exceptions",
-      "Number of seek operations invoked on input streams"),
+      "Number of exceptions invoked on input streams"),
   STREAM_READ_FULLY_OPERATIONS("stream_read_fully_operations",
       "Count of readFully() operations in streams"),
   STREAM_READ_OPERATIONS("stream_read_operations",
       "Count of read() operations in streams"),
   STREAM_READ_OPERATIONS_INCOMPLETE("stream_read_operations_incomplete",
       "Count of incomplete read() operations in streams"),
+  STREAM_READ_VERSION_MISMATCHES("stream_read_version_mismatches",
+      "Count of version mismatches encountered while reading streams"),
   STREAM_SEEK_BYTES_BACKWARDS("stream_bytes_backwards_on_seek",
       "Count of bytes moved backwards during seek operations"),
   STREAM_SEEK_BYTES_READ("stream_bytes_read",
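The new counter should be observable like the other stream statistics. A sketch of scraping it through the generic `FileSystem.getStorageStatistics()` API; whether this particular counter is forwarded to storage statistics depends on the instrumentation wiring, so treat the name lookup as illustrative:

```java
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageStatistics;
import org.apache.hadoop.fs.StorageStatistics.LongStatistic;

public final class MismatchCounterSketch {
  public static void main(String[] args) throws Exception {
    Path path = new Path("s3a://example-bucket/data.csv"); // placeholder
    FileSystem fs = path.getFileSystem(new Configuration());
    fs.open(path).close();   // some I/O to populate the statistics
    StorageStatistics stats = fs.getStorageStatistics();
    for (Iterator<LongStatistic> it = stats.getLongStatistics();
         it.hasNext();) {
      LongStatistic s = it.next();
      if ("stream_read_version_mismatches".equals(s.getName())) {
        System.out.println("version mismatches: " + s.getValue());
      }
    }
  }
}
```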
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChangeDetectionPolicy.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChangeDetectionPolicy.java
new file mode 100644
index 0000000..f3d8bc2
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChangeDetectionPolicy.java
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.util.Locale;
+
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.RemoteFileChangedException;
+
+import static org.apache.hadoop.fs.s3a.Constants.*;
+
+/**
+ * Object change detection policy.
+ * Determines which attribute is used to detect change and what to do when
+ * change is detected.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public abstract class ChangeDetectionPolicy {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ChangeDetectionPolicy.class);
+
+  @VisibleForTesting
+  public static final String CHANGE_DETECTED = "change detected";
+
+  private final Mode mode;
+  private final boolean requireVersion;
+
+  /**
+   * Version support is only warned about once per S3A instance.
+   * This still means that on a long-lived application which destroys
+   * filesystems it'll appear once-per-query in the logs, but at least
+   * it will not appear once per file read.
+   */
+  private final LogExactlyOnce logNoVersionSupport = new LogExactlyOnce(LOG);
+
+  /**
+   * The S3 object attribute used to detect change.
+   */
+  public enum Source {
+    ETag(CHANGE_DETECT_SOURCE_ETAG),
+    VersionId(CHANGE_DETECT_SOURCE_VERSION_ID),
+    /** you can't ask for this explicitly outside of tests. */
+    None("none");
+
+    private final String source;
+
+    Source(String source) {
+      this.source = source;
+    }
+
+    private static Source fromString(String trimmed) {
+      for (Source value : values()) {
+        if (value.source.equals(trimmed)) {
+          return value;
+        }
+      }
+      LOG.warn("Unrecognized " + CHANGE_DETECT_SOURCE + " value: \"{}\"",
+          trimmed);
+      return fromString(CHANGE_DETECT_SOURCE_DEFAULT);
+    }
+
+    static Source fromConfiguration(Configuration configuration) {
+      String trimmed = configuration.get(CHANGE_DETECT_SOURCE,
+          CHANGE_DETECT_SOURCE_DEFAULT).trim()
+          .toLowerCase(Locale.ENGLISH);
+      return fromString(trimmed);
+    }
+  }
+
+  /**
+   * What to do when change is detected.
+   */
+  public enum Mode {
+    /** Client side validation. */
+    Client(CHANGE_DETECT_MODE_CLIENT),
+    /** Server side validation. */
+    Server(CHANGE_DETECT_MODE_SERVER),
+    /** Warn but continue. */
+    Warn(CHANGE_DETECT_MODE_WARN),
+    /** No checks. */
+    None(CHANGE_DETECT_MODE_NONE);
+
+    private final String mode;
+
+    Mode(String mode) {
+      this.mode = mode;
+    }
+
+    private static Mode fromString(String trimmed) {
+      for (Mode value : values()) {
+        if (value.mode.equals(trimmed)) {
+          return value;
+        }
+      }
+      LOG.warn("Unrecognized " + CHANGE_DETECT_MODE + " value: \"{}\"",
+          trimmed);
+      return fromString(CHANGE_DETECT_MODE_DEFAULT);
+    }
+
+    static Mode fromConfiguration(Configuration configuration) {
+      String trimmed = configuration.get(CHANGE_DETECT_MODE,
+          CHANGE_DETECT_MODE_DEFAULT)
+          .trim()
+          .toLowerCase(Locale.ENGLISH);
+      return fromString(trimmed);
+    }
+  }
+
+  protected ChangeDetectionPolicy(Mode mode, boolean requireVersion) {
+    this.mode = mode;
+    this.requireVersion = requireVersion;
+  }
+
+  public Mode getMode() {
+    return mode;
+  }
+
+  public abstract Source getSource();
+
+  public boolean isRequireVersion() {
+    return requireVersion;
+  }
+
+  public LogExactlyOnce getLogNoVersionSupport() {
+    return logNoVersionSupport;
+  }
+
+  /**
+   * Reads the change detection policy from Configuration.
+   *
+   * @param configuration the configuration
+   * @return the policy
+   */
+  public static ChangeDetectionPolicy getPolicy(Configuration configuration) {
+    Mode mode = Mode.fromConfiguration(configuration);
+    Source source = Source.fromConfiguration(configuration);
+    boolean requireVersion = configuration.getBoolean(
+        CHANGE_DETECT_REQUIRE_VERSION, CHANGE_DETECT_REQUIRE_VERSION_DEFAULT);
+    return createPolicy(mode, source, requireVersion);
+  }
+
+  /**
+   * Create a policy.
+   * @param mode mode of checks
+   * @param source source of change
+   * @param requireVersion throw exception when no version available?
+   * @return the policy
+   */
+  @VisibleForTesting
+  public static ChangeDetectionPolicy createPolicy(final Mode mode,
+      final Source source, final boolean requireVersion) {
+    switch (source) {
+    case ETag:
+      return new ETagChangeDetectionPolicy(mode, requireVersion);
+    case VersionId:
+      return new VersionIdChangeDetectionPolicy(mode, requireVersion);
+    default:
+      return new NoChangeDetection();
+    }
+  }
+
+  /**
+   * Pulls the attribute this policy uses to detect change out of the S3 object
+   * metadata.  The policy generically refers to this attribute as
+   * {@code revisionId}.
+   *
+   * @param objectMetadata the s3 object metadata
+   * @param uri the URI of the object
+   * @return the revisionId string as interpreted by this policy, or potentially
+   * null if the attribute is unavailable (such as when the policy says to use
+   * versionId but object versioning is not enabled for the bucket).
+   */
+  public abstract String getRevisionId(ObjectMetadata objectMetadata,
+      String uri);
+
+  /**
+   * Applies the given {@link #getRevisionId(ObjectMetadata, String) revisionId}
+   * as a server-side qualification on the {@code GetObjectRequest}.
+   *
+   * @param request the request
+   * @param revisionId the revision id
+   */
+  public abstract void applyRevisionConstraint(GetObjectRequest request,
+      String revisionId);
+
+  /**
+   * Takes appropriate action based on {@link #getMode() mode} when a change has
+   * been detected.
+   *
+   * @param revisionId the expected revision id
+   * @param newRevisionId the detected revision id
+   * @param uri the URI of the object being accessed
+   * @param position the position being read in the object
+   * @param operation the operation being performed on the object (e.g. open or
+   * re-open) that triggered the change detection
+   * @param timesAlreadyDetected number of times a change has already been
+   * detected on the current stream
+   * @return a pair of: was a change detected, and any exception to throw.
+   * If the change was detected, this updates a counter in the stream
+   * statistics; If an exception was returned it is thrown after the counter
+   * update.
+   */
+  public ImmutablePair<Boolean, RemoteFileChangedException> onChangeDetected(
+      String revisionId,
+      String newRevisionId,
+      String uri,
+      long position,
+      String operation,
+      long timesAlreadyDetected) {
+    switch (mode) {
+    case None:
+      // something changed; we don't care.
+      return new ImmutablePair<>(false, null);
+    case Warn:
+      if (timesAlreadyDetected == 0) {
+        // only warn on the first detection to avoid a noisy log
+        LOG.warn(
+            String.format("%s change detected on %s %s at %d. Expected %s got %s",
+                getSource(), operation, uri, position, revisionId,
+                newRevisionId));
+        return new ImmutablePair<>(true, null);
+      }
+      return new ImmutablePair<>(false, null);
+    case Client:
+    case Server:
+    default:
+      // mode == Client (or Server, but really won't be called for Server)
+      return new ImmutablePair<>(true,
+          new RemoteFileChangedException(uri,
+              operation,
+              String.format("%s "
+                      + CHANGE_DETECTED
+                      + " while reading at position %s."
+                      + " Expected %s got %s",
+              getSource(), position, revisionId, newRevisionId)));
+    }
+  }
+
+  /**
+   * Change detection policy based on {@link ObjectMetadata#getETag() eTag}.
+   */
+  static class ETagChangeDetectionPolicy extends ChangeDetectionPolicy {
+
+    ETagChangeDetectionPolicy(Mode mode, boolean requireVersion) {
+      super(mode, requireVersion);
+    }
+
+    @Override
+    public String getRevisionId(ObjectMetadata objectMetadata, String uri) {
+      return objectMetadata.getETag();
+    }
+
+    @Override
+    public void applyRevisionConstraint(GetObjectRequest request,
+        String revisionId) {
+      LOG.debug("Restricting request to etag {}", revisionId);
+      request.withMatchingETagConstraint(revisionId);
+    }
+
+    @Override
+    public Source getSource() {
+      return Source.ETag;
+    }
+
+    @Override
+    public String toString() {
+      return "ETagChangeDetectionPolicy mode=" + getMode();
+    }
+
+  }
+
+  /**
+   * Change detection policy based on
+   * {@link ObjectMetadata#getVersionId() versionId}.
+   */
+  static class VersionIdChangeDetectionPolicy extends
+      ChangeDetectionPolicy {
+
+    VersionIdChangeDetectionPolicy(Mode mode, boolean requireVersion) {
+      super(mode, requireVersion);
+    }
+
+    @Override
+    public String getRevisionId(ObjectMetadata objectMetadata, String uri) {
+      String versionId = objectMetadata.getVersionId();
+      if (versionId == null) {
+        // this policy doesn't work if the bucket doesn't have object versioning
+        // enabled (it is disabled by default)
+        getLogNoVersionSupport().warn(
+            CHANGE_DETECT_SOURCE + " set to " + Source.VersionId
+                + " but no versionId available while reading {}. "
+                + "Ensure your bucket has object versioning enabled. "
+                + "You may see inconsistent reads.",
+            uri);
+      }
+      return versionId;
+    }
+
+    @Override
+    public void applyRevisionConstraint(GetObjectRequest request,
+        String revisionId) {
+      LOG.debug("Restricting request to version {}", revisionId);
+      request.withVersionId(revisionId);
+    }
+
+    @Override
+    public Source getSource() {
+      return Source.VersionId;
+    }
+
+    @Override
+    public String toString() {
+      return "VersionIdChangeDetectionPolicy mode=" + getMode();
+    }
+  }
+
+  /**
+   * Don't check for changes.
+   */
+  static class NoChangeDetection extends ChangeDetectionPolicy {
+
+    NoChangeDetection() {
+      super(Mode.None, false);
+    }
+
+    @Override
+    public Source getSource() {
+      return Source.None;
+    }
+
+    @Override
+    public String getRevisionId(final ObjectMetadata objectMetadata,
+        final String uri) {
+      return null;
+    }
+
+    @Override
+    public void applyRevisionConstraint(final GetObjectRequest request,
+        final String revisionId) {
+
+    }
+
+    @Override
+    public String toString() {
+      return "NoChangeDetection";
+    }
+
+  }
+}
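To make the policy lifecycle concrete: a sketch that builds a policy directly with the @VisibleForTesting factory and exercises onChangeDetected the way ChangeTracker does. The URI and revision IDs here are made up:

```java
import org.apache.commons.lang3.tuple.ImmutablePair;

import org.apache.hadoop.fs.s3a.RemoteFileChangedException;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy.Mode;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy.Source;

public final class PolicySketch {
  public static void main(String[] args) {
    // eTag source, client-side comparison, attribute required
    ChangeDetectionPolicy policy =
        ChangeDetectionPolicy.createPolicy(Mode.Client, Source.ETag, true);

    // simulate a mismatch between pinned and freshly returned revision IDs
    ImmutablePair<Boolean, RemoteFileChangedException> result =
        policy.onChangeDetected(
            "etag-1",                          // revision pinned at open()
            "etag-2",                          // revision seen on re-open
            "s3a://example-bucket/data.csv",   // hypothetical URI
            1024L,                             // position being read
            "re-open",                         // triggering operation
            0);                                // prior detections on stream
    // in Client mode: left == true (count it), right != null (throw it)
    System.out.println(result.left + " / " + result.right);
  }
}
```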
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChangeTracker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChangeTracker.java
new file mode 100644
index 0000000..f76602b
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChangeTracker.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.S3Object;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.s3a.NoVersionAttributeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.s3a.RemoteFileChangedException;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Change tracking for input streams: the revision ID/eTag of
+ * the previous request is recorded and, when the next request comes in,
+ * it is compared.
+ * Self-contained for testing and use in different streams.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class ChangeTracker {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ChangeTracker.class);
+
+  public static final String CHANGE_REPORTED_BY_S3 = "reported by S3";
+
+  /** Policy to use. */
+  private final ChangeDetectionPolicy policy;
+
+  /**
+   * URI of file being read.
+   */
+  private final String uri;
+
+  /**
+   * Mismatch counter; expected to be wired up to StreamStatistics except
+   * during testing.
+   */
+  private final AtomicLong versionMismatches;
+
+  /**
+   * Revision identifier (e.g. eTag or versionId, depending on change
+   * detection policy).
+   */
+  private String revisionId;
+
+  /**
+   * Create a change tracker.
+   * @param uri URI of object being tracked
+   * @param policy policy to track.
+   * @param versionMismatches reference to the version mismatch counter
+   */
+  public ChangeTracker(final String uri,
+      final ChangeDetectionPolicy policy,
+      final AtomicLong versionMismatches) {
+    this.policy = checkNotNull(policy);
+    this.uri = uri;
+    this.versionMismatches = versionMismatches;
+  }
+
+  public String getRevisionId() {
+    return revisionId;
+  }
+
+  public ChangeDetectionPolicy.Source getSource() {
+    return policy.getSource();
+  }
+
+  @VisibleForTesting
+  public AtomicLong getVersionMismatches() {
+    return versionMismatches;
+  }
+
+  /**
+   * Apply any revision constraint set by the policy if it is to be
+   * enforced on the server.
+   * @param request request to modify
+   * @return true iff a constraint was added.
+   */
+  public boolean maybeApplyConstraint(
+      final GetObjectRequest request) {
+
+    if (policy.getMode() == ChangeDetectionPolicy.Mode.Server
+        && revisionId != null) {
+      policy.applyRevisionConstraint(request, revisionId);
+      return true;
+    }
+    return false;
+  }
+
+
+  /**
+   * Process the response from the server for validation against the
+   * change policy.
+   * @param object object returned; may be null.
+   * @param operation operation in progress.
+   * @param pos offset of read
+   * @throws PathIOException raised on failure
+   * @throws RemoteFileChangedException if the remote file has changed.
+   */
+  public void processResponse(final S3Object object,
+      final String operation,
+      final long pos) throws PathIOException {
+    if (object == null) {
+      // no object returned. Either mismatch or something odd.
+      if (revisionId != null) {
+        // the requirements of the change detection policy weren't met: the
+        // object was not returned.
+        versionMismatches.incrementAndGet();
+        throw new RemoteFileChangedException(uri, operation,
+            String.format("%s change "
+                    + CHANGE_REPORTED_BY_S3
+                    + " while reading"
+                    + " at position %s."
+                    + " Version %s was unavailable",
+                getSource(),
+                pos,
+                getRevisionId()));
+      } else {
+        throw new PathIOException(uri, "No data returned from GET request");
+      }
+    }
+
+    final ObjectMetadata metadata = object.getObjectMetadata();
+    final String newRevisionId = policy.getRevisionId(metadata, uri);
+    if (newRevisionId == null && policy.isRequireVersion()) {
+      throw new NoVersionAttributeException(uri, String.format(
+          "Change detection policy requires %s",
+          policy.getSource()));
+    }
+    if (revisionId == null) {
+      // revisionId is null on first (re)open. Pin it so change can be detected
+      // if object has been updated
+      LOG.debug("Setting revision ID for object at {}: {}",
+          uri, newRevisionId);
+      revisionId = newRevisionId;
+    } else if (!revisionId.equals(newRevisionId)) {
+      LOG.debug("Revision ID changed from {} to {}",
+          revisionId, newRevisionId);
+      ImmutablePair<Boolean, RemoteFileChangedException> pair =
+          policy.onChangeDetected(
+              revisionId,
+              newRevisionId,
+              uri,
+              pos,
+              operation,
+              versionMismatches.get());
+      if (pair.left) {
+        // a mismatch has occurred: note it.
+        versionMismatches.incrementAndGet();
+      }
+      if (pair.right != null) {
+        // there's an exception to raise: do it
+        throw pair.right;
+      }
+    }
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder(
+        "ChangeTracker{");
+    sb.append("changeDetectionPolicy=").append(policy);
+    sb.append(", revisionId='").append(revisionId).append('\'');
+    sb.append('}');
+    return sb.toString();
+  }
+
+}
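Because the tracker is self-contained, it can be driven without a filesystem. A sketch along the lines of what the new unit tests presumably do, faking a GET response via ObjectMetadata.setHeader; the bucket, key, and eTag values are placeholders:

```java
import java.util.concurrent.atomic.AtomicLong;

import com.amazonaws.services.s3.Headers;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;

import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy.Mode;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy.Source;
import org.apache.hadoop.fs.s3a.impl.ChangeTracker;

public final class ChangeTrackerSketch {
  public static void main(String[] args) throws Exception {
    ChangeTracker tracker = new ChangeTracker(
        "s3a://example-bucket/data.csv",          // hypothetical URI
        ChangeDetectionPolicy.createPolicy(Mode.Server, Source.ETag, true),
        new AtomicLong(0));                        // standalone counter

    // fake a first GET response carrying an eTag
    S3Object object = new S3Object();
    ObjectMetadata md = new ObjectMetadata();
    md.setHeader(Headers.ETAG, "etag-1");
    object.setObjectMetadata(md);
    tracker.processResponse(object, "open", 0);    // pins "etag-1"

    // subsequent requests now carry an If-Match constraint in Server mode
    GetObjectRequest request =
        new GetObjectRequest("example-bucket", "data.csv");
    System.out.println("constraint added: "
        + tracker.maybeApplyConstraint(request));  // true
  }
}
```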
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/LogExactlyOnce.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/LogExactlyOnce.java
new file mode 100644
index 0000000..54a8836
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/LogExactlyOnce.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+
+/**
+ * Log exactly once, even across threads.
+ */
+public class LogExactlyOnce {
+
+  private final AtomicBoolean logged = new AtomicBoolean(false);
+  private final Logger log;
+
+  public LogExactlyOnce(final Logger log) {
+    this.log = log;
+  }
+
+  public void warn(String format, Object...args) {
+    if (!logged.getAndSet(true)) {
+      log.warn(format, args);
+    }
+  }
+}
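Usage is intentionally trivial: hold one instance per message that should appear at most once per process. A quick sketch:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.s3a.impl.LogExactlyOnce;

public final class LogOnceSketch {
  private static final Logger LOG =
      LoggerFactory.getLogger(LogOnceSketch.class);
  // one holder per message that should be logged at most once
  private static final LogExactlyOnce NO_VERSION_WARNING =
      new LogExactlyOnce(LOG);

  public static void main(String[] args) {
    for (int i = 0; i < 3; i++) {
      // only the first call logs; the AtomicBoolean swallows the rest
      NO_VERSION_WARNING.warn("no versionId available while reading {}",
          "s3a://example-bucket/f");
    }
  }
}
```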
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/package-info.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/package-info.java
new file mode 100644
index 0000000..2ef6db8
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/package-info.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+ * Implementation classes private to the S3A store.
+ * Do not use outside of the hadoop-aws module.
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.fs.s3a.impl;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
index 11ff610..1741471 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
@@ -1144,6 +1144,131 @@ the capacity through `hadoop s3guard set-capacity` (and pay more, obviously).
 
 
 
+## Handling Read-During-Overwrite
+
+Read-during-overwrite is the condition where a writer overwrites a file while
+a reader has an open input stream on the file.  Depending on configuration,
+the S3AFileSystem may detect this and throw a `RemoteFileChangedException` in
+conditions where the reader's input stream might otherwise silently switch over
+from reading bytes from the original version of the file to reading bytes from
+the new version.
+
+The configuration items controlling this behavior are:
+
+```xml
+<property>
+  <name>fs.s3a.change.detection.source</name>
+  <value>etag</value>
+  <description>
+    Select which S3 object attribute to use for change detection.
+    Currently supported values are 'etag' for S3 object eTags and 'versionid'
+    for S3 object version IDs.  Use of version IDs requires object versioning to be
+    enabled for each S3 bucket utilized.  Object versioning is disabled on
+    buckets by default. When version ID is used, the buckets utilized should
+    have versioning enabled before any data is written.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.change.detection.mode</name>
+  <value>server</value>
+  <description>
+    Determines how change detection is applied to alert on S3 objects
+    rewritten while being read. Value 'server' indicates to apply the attribute
+    constraint directly on GetObject requests to S3. Value 'client' means to do a
+    client-side comparison of the attribute value returned in the response.  Value
+    'server' would not work with third-party S3 implementations that do not
+    support these constraints on GetObject. Values 'server' and 'client' generate
+    RemoteFileChangedException when a mismatch is detected.  Value 'warn' works
+    like 'client' but generates only a warning.  Value 'none' will ignore change
+    detection completely.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.change.detection.version.required</name>
+  <value>true</value>
+  <description>
+    Determines if the S3 object version attribute defined by
+    fs.s3a.change.detection.source should be treated as required.  If true and the
+    referred attribute is unavailable in an S3 GetObject response,
+    NoVersionAttributeException is thrown.  Setting to 'true' is encouraged to
+    avoid potential for inconsistent reads with third-party S3 implementations or
+    against S3 buckets that have object versioning disabled.
+  </description>
+</property>
+```
+
+In the default configuration, S3 object eTags are used to detect changes.  When
+the filesystem retrieves a file from S3 using
+[Get Object](https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectGET.html),
+it captures the eTag and uses that eTag in an 'If-Match' condition on each
+subsequent request.  If a concurrent writer has overwritten the file, the
+'If-Match' condition will fail and a `RemoteFileChangedException` will be thrown.
+
+Even in this default configuration, a new write may not trigger this exception
+on an open reader.  For example, if the reader only reads forward in the file
+then only a single S3 'Get Object' request is made and the full contents of the
+file are streamed from a single response.  An overwrite of the file after the
+'Get Object' request would not be seen at all by a reader with an input stream
+that had already read the first byte.  Seeks backward, on the other hand, can
+result in new 'Get Object' requests that can trigger the
+`RemoteFileChangedException`.
+
+Additionally, due to the eventual consistency of S3 in a read-after-overwrite
+scenario, visibility of a new write may be delayed, avoiding the
+`RemoteFileChangedException` for some readers.  That said, if a reader does not
+see `RemoteFileChangedException`, it will at least have read a consistent view
+of a single version of the file (the version available when it started
+reading).
+
+### Change Detection with S3 Versions
+
+It is possible to switch to using the
+[S3 object version id](https://docs.aws.amazon.com/AmazonS3/latest/dev/ObjectVersioning.html)
+instead of eTag as the change detection mechanism.  Use of this option requires
+object versioning to be enabled on any S3 buckets used by the filesystem.  The
+benefit of using version ID instead of eTag is a potentially reduced frequency
+of `RemoteFileChangedException`. With object versioning enabled, old versions
+of objects remain available after they have been overwritten.
+This means an open input stream will still be able to seek backwards after a
+concurrent writer has overwritten the file.
+The reader will retain their consistent view of the version of the file from
+which they read the first byte.
+Because the version ID is null for objects written prior to enablement of
+object versioning, **this option should only be used when the S3 buckets
+have object versioning enabled from the beginning.**
+
+Note: when you rename files, the copied files may have different version IDs.
+
+### Change Detection Modes
+
+The change detection mode (`fs.s3a.change.detection.mode`) is the next option.  Different modes are
+available primarily for compatibility with third-party S3 implementations which
+may not support all change detection mechanisms.
+
+* `server`: the version/etag check is performed on the server by adding
+extra headers to the `GET` request. This is the default.
+* `client`: check on the client by comparing the eTag/version ID of a
+reopened file with the previous version.
+This is useful when the implementation doesn't support the `If-Match` header.
+* `warn`: check on the client, but only warn on a mismatch, rather than fail.
+* `none`: do not check. Useful if the implementation doesn't provide eTag
+or version ID support at all or you would like to retain previous behavior
+where the reader's input stream silently switches over to the new object version
+(not recommended).
+
+The final option (`fs.s3a.change.detection.version.required`) is present
+primarily to ensure the filesystem doesn't silently ignore the condition
+where it is configured to use version ID on a bucket that doesn't have
+object versioning enabled, or is configured to use eTag on
+an S3 implementation that doesn't return eTags.
+
+When `true` (default) and 'Get Object' doesn't return an eTag or
+version ID (depending on the configured 'source'), a `NoVersionAttributeException`
+will be thrown.  When `false` and an eTag or version ID is not returned,
+the stream can be read, but without any version checking.
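+
+A sketch of disabling the version requirement for a store which returns
+neither attribute, accepting that reads are then unchecked:
+
+```xml
+<property>
+  <name>fs.s3a.change.detection.version.required</name>
+  <value>false</value>
+</property>
+```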
+
 
 ## <a name="per_bucket_configuration"></a>Configuring different S3 buckets with Per-Bucket Configuration
 
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
index b3c3e38..3123221 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
@@ -715,36 +715,36 @@ org.apache.hadoop.fs.s3a.AWSS3IOException: copyFromLocalFile(file:/tmp/hello.txt
     (Service: Amazon S3; Status Code: 400; Error Code: BadDigest; Request ID: 4018131225),
     S3 Extended Request ID: null
   at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:127)
-	at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:69)
-	at org.apache.hadoop.fs.s3a.S3AFileSystem.copyFromLocalFile(S3AFileSystem.java:1494)
-	at org.apache.hadoop.tools.cloudup.Cloudup.uploadOneFile(Cloudup.java:466)
-	at org.apache.hadoop.tools.cloudup.Cloudup.access$000(Cloudup.java:63)
-	at org.apache.hadoop.tools.cloudup.Cloudup$1.call(Cloudup.java:353)
-	at org.apache.hadoop.tools.cloudup.Cloudup$1.call(Cloudup.java:350)
-	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
-	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
-	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
-	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
-	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
-	at java.lang.Thread.run(Thread.java:748)
+  at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:69)
+  at org.apache.hadoop.fs.s3a.S3AFileSystem.copyFromLocalFile(S3AFileSystem.java:1494)
+  at org.apache.hadoop.tools.cloudup.Cloudup.uploadOneFile(Cloudup.java:466)
+  at org.apache.hadoop.tools.cloudup.Cloudup.access$000(Cloudup.java:63)
+  at org.apache.hadoop.tools.cloudup.Cloudup$1.call(Cloudup.java:353)
+  at org.apache.hadoop.tools.cloudup.Cloudup$1.call(Cloudup.java:350)
+  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
+  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
+  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
+  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
+  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
+  at java.lang.Thread.run(Thread.java:748)
 Caused by: com.amazonaws.services.s3.model.AmazonS3Exception:
     The Content-MD5 you specified did not match what we received.
     (Service: Amazon S3; Status Code: 400; Error Code: BadDigest; Request ID: 4018131225),
     S3 Extended Request ID: null
   at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:1307)
-	at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:894)
-	at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:597)
-	at com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:363)
-	at com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:329)
-	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:308)
-	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3659)
-	at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1422)
-	at com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:131)
-	at com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:123)
-	at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:139)
-	at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:47)
-	at org.apache.hadoop.fs.s3a.BlockingThreadPoolExecutorService$CallableWithPermitRelease.call(BlockingThreadPoolExecutorService.java:239)
-	... 4 more
+  at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:894)
+  at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:597)
+  at com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:363)
+  at com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:329)
+  at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:308)
+  at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3659)
+  at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1422)
+  at com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:131)
+  at com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:123)
+  at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:139)
+  at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:47)
+  at org.apache.hadoop.fs.s3a.BlockingThreadPoolExecutorService$CallableWithPermitRelease.call(BlockingThreadPoolExecutorService.java:239)
+  ... 4 more
 ```
 
 This stack trace was seen when interacting with a third-party S3 store whose
@@ -966,6 +966,69 @@ if it is required that the data is persisted durably after every
 This includes resilient logging, HBase-style journaling
 and the like. The standard strategy here is to save to HDFS and then copy to S3.
 
+### `RemoteFileChangedException` and read-during-overwrite
+
+```
+org.apache.hadoop.fs.s3a.RemoteFileChangedException: re-open `s3a://my-bucket/test/file.txt':
+  ETag change reported by S3 while reading at position 1949.
+  Version f9c186d787d4de9657e99f280ba26555 was unavailable
+  at org.apache.hadoop.fs.s3a.impl.ChangeTracker.processResponse(ChangeTracker.java:137)
+  at org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:200)
+  at org.apache.hadoop.fs.s3a.S3AInputStream.lambda$lazySeek$1(S3AInputStream.java:346)
+  at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$2(Invoker.java:195)
+  at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
+  at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:265)
+  at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:322)
+  at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:261)
+  at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:193)
+  at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:215)
+  at org.apache.hadoop.fs.s3a.S3AInputStream.lazySeek(S3AInputStream.java:339)
+  at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:372)
+```
+
+If an S3 object is updated while an S3A filesystem reader has an open
+`InputStream` on it, the reader may encounter `RemoteFileChangedException`.
+This occurs if the S3A `InputStream` needs to re-open the object (e.g. during
+a `seek()`) and detects the change.
+
+If the change detection mode is configured to `warn`, a warning like the
+following will be seen instead of `RemoteFileChangedException`:
+
+```
+WARN  - ETag change detected on re-open s3a://my-bucket/test/readFileToChange.txt at 1949.
+ Expected f9c186d787d4de9657e99f280ba26555 got 043abff21b7bd068d2d2f27ccca70309
+```
+
+Using a third-party S3 implementation that doesn't support eTags might result in
+the following error.
+
+```
+org.apache.hadoop.fs.s3a.NoVersionAttributeException: `s3a://my-bucket/test/file.txt':
+ Change detection policy requires ETag
+  at org.apache.hadoop.fs.s3a.impl.ChangeTracker.processResponse(ChangeTracker.java:153)
+  at org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:200)
+  at org.apache.hadoop.fs.s3a.S3AInputStream.lambda$lazySeek$1(S3AInputStream.java:346)
+  at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$2(Invoker.java:195)
+  at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
+  at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:265)
+  at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:322)
+  at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:261)
+  at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:193)
+  at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:215)
+  at org.apache.hadoop.fs.s3a.S3AInputStream.lazySeek(S3AInputStream.java:339)
+  at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:372)
+```
+
+If the change policy is `versionid`, there are a number of possible causes:
+
+* The bucket does not have object versioning enabled.
+* The bucket does have versioning enabled, but the object being read was created
+before versioning was enabled.
+* The bucket is on a third-party store which does not support object versioning.
+
+See [Handling Read-During-Overwrite](./index.html#handling_read-during-overwrite)
+for more information.
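+
+If only one bucket is affected, a hedged mitigation sketch using S3A's
+per-bucket configuration (the bucket name here is illustrative) is to fall
+back to eTag checking for that bucket alone:
+
+```xml
+<property>
+  <name>fs.s3a.bucket.my-bucket.change.detection.source</name>
+  <value>etag</value>
+</property>
+```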
+
 ## <a name="encryption"></a> S3 Server Side Encryption
 
 ### `AWSS3IOException` `KMS.NotFoundException` "Invalid arn" when using SSE-KMS
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java
index 5c2b5a3..8f8d860 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java
@@ -20,17 +20,14 @@ package org.apache.hadoop.fs.s3a;
 
 import com.amazonaws.services.s3.model.DeleteObjectsRequest;
 import com.amazonaws.services.s3.model.MultiObjectDeleteException;
+
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.EOFException;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -40,8 +37,7 @@ import static org.apache.hadoop.fs.s3a.S3ATestUtils.getLandsatCSVPath;
 import static org.apache.hadoop.test.LambdaTestUtils.*;
 
 /**
- * Test S3A Failure translation, including a functional test
- * generating errors during stream IO.
+ * Test S3A Failure translation.
  */
 public class ITestS3AFailureHandling extends AbstractS3ATestBase {
   private static final Logger LOG =
@@ -54,65 +50,6 @@ public class ITestS3AFailureHandling extends AbstractS3ATestBase {
     conf.setBoolean(Constants.ENABLE_MULTI_DELETE, true);
     return conf;
   }
-  @Test
-  public void testReadFileChanged() throws Throwable {
-    describe("overwrite a file with a shorter one during a read, seek");
-    final int fullLength = 8192;
-    final byte[] fullDataset = dataset(fullLength, 'a', 32);
-    final int shortLen = 4096;
-    final byte[] shortDataset = dataset(shortLen, 'A', 32);
-    final FileSystem fs = getFileSystem();
-    final Path testpath = path("readFileToChange.txt");
-    // initial write
-    writeDataset(fs, testpath, fullDataset, fullDataset.length, 1024, false);
-    try(FSDataInputStream instream = fs.open(testpath)) {
-      instream.seek(fullLength - 16);
-      assertTrue("no data to read", instream.read() >= 0);
-      // overwrite
-      writeDataset(fs, testpath, shortDataset, shortDataset.length, 1024, true);
-      // here the file length is less. Probe the file to see if this is true,
-      // with a spin and wait
-      eventually(30 * 1000, 1000,
-          () -> {
-            assertEquals(shortLen, fs.getFileStatus(testpath).getLen());
-          });
-
-      // here length is shorter. Assuming it has propagated to all replicas,
-      // the position of the input stream is now beyond the EOF.
-      // An attempt to seek backwards to a position greater than the
-      // short length will raise an exception from AWS S3, which must be
-      // translated into an EOF
-
-      instream.seek(shortLen + 1024);
-      int c = instream.read();
-      assertIsEOF("read()", c);
-
-      byte[] buf = new byte[256];
-
-      assertIsEOF("read(buffer)", instream.read(buf));
-      assertIsEOF("read(offset)",
-          instream.read(instream.getPos(), buf, 0, buf.length));
-
-      // now do a block read fully, again, backwards from the current pos
-      intercept(EOFException.class, "", "readfully",
-          () -> instream.readFully(shortLen + 512, buf));
-
-      assertIsEOF("read(offset)",
-          instream.read(shortLen + 510, buf, 0, buf.length));
-
-      // seek somewhere useful
-      instream.seek(shortLen - 256);
-
-      // delete the file. Reads must fail
-      fs.delete(testpath, false);
-
-      intercept(FileNotFoundException.class, "", "read()",
-          () -> instream.read());
-      intercept(FileNotFoundException.class, "", "readfully",
-          () -> instream.readFully(2048, buf));
-
-    }
-  }
 
   /**
    * Assert that a read operation returned an EOF value.
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java
new file mode 100644
index 0000000..98dd202
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.FileNotFoundException;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.junit.Assume;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy.Source;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
+import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBucketOverrides;
+import static org.apache.hadoop.test.LambdaTestUtils.eventually;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Test S3A remote file change detection.
+ */
+@RunWith(Parameterized.class)
+public class ITestS3ARemoteFileChanged extends AbstractS3ATestBase {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestS3ARemoteFileChanged.class);
+
+  private final String changeDetectionSource;
+  private final String changeDetectionMode;
+  private final boolean expectChangeException;
+  private final boolean expectFileNotFoundException;
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> params() {
+    return Arrays.asList(new Object[][]{
+        // make sure it works with invalid config
+        {"bogus", "bogus", true, true},
+
+        // test with etag
+        {CHANGE_DETECT_SOURCE_ETAG, CHANGE_DETECT_MODE_SERVER, true, true},
+        {CHANGE_DETECT_SOURCE_ETAG, CHANGE_DETECT_MODE_CLIENT, true, true},
+        {CHANGE_DETECT_SOURCE_ETAG, CHANGE_DETECT_MODE_WARN, false, true},
+        {CHANGE_DETECT_SOURCE_ETAG, CHANGE_DETECT_MODE_NONE, false, true},
+
+        // test with versionId
+        // when using server-side versionId, the exceptions shouldn't happen
+        // since the previous version will still be available
+        {CHANGE_DETECT_SOURCE_VERSION_ID, CHANGE_DETECT_MODE_SERVER, false,
+            false},
+
+        // with client-side versionId it will behave similar to client-side eTag
+        {CHANGE_DETECT_SOURCE_VERSION_ID, CHANGE_DETECT_MODE_CLIENT, true,
+            true},
+
+        {CHANGE_DETECT_SOURCE_VERSION_ID, CHANGE_DETECT_MODE_WARN, false, true},
+        {CHANGE_DETECT_SOURCE_VERSION_ID, CHANGE_DETECT_MODE_NONE, false, true}
+    });
+  }
+
+  public ITestS3ARemoteFileChanged(String changeDetectionSource,
+      String changeDetectionMode,
+      boolean expectChangeException,
+      boolean expectFileNotFoundException) {
+    this.changeDetectionSource = changeDetectionSource;
+    this.changeDetectionMode = changeDetectionMode;
+    this.expectChangeException = expectChangeException;
+    this.expectFileNotFoundException = expectFileNotFoundException;
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    String bucketName = getTestBucketName(conf);
+    removeBucketOverrides(bucketName, conf,
+        CHANGE_DETECT_SOURCE,
+        CHANGE_DETECT_MODE);
+    conf.set(CHANGE_DETECT_SOURCE, changeDetectionSource);
+    conf.set(CHANGE_DETECT_MODE, changeDetectionMode);
+    S3ATestUtils.disableFilesystemCaching(conf);
+    return conf;
+  }
+
+  @Test
+  public void testReadFileChanged() throws Throwable {
+    final int originalLength = 8192;
+    final byte[] originalDataset = dataset(originalLength, 'a', 32);
+    final int newLength = originalLength + 1;
+    final byte[] newDataset = dataset(newLength, 'A', 32);
+    final S3AFileSystem fs = getFileSystem();
+    final Path testpath = path("readFileToChange.txt");
+    // initial write
+    writeDataset(fs, testpath, originalDataset, originalDataset.length,
+        1024, false);
+
+    if (fs.getChangeDetectionPolicy().getSource() == Source.VersionId) {
+      // skip versionId tests if the bucket doesn't have object versioning
+      // enabled
+      Assume.assumeTrue(
+          "Target filesystem does not support versioning",
+          fs.getObjectMetadata(fs.pathToKey(testpath)).getVersionId() != null);
+    }
+
+    try (FSDataInputStream instream = fs.open(testpath)) {
+      // seek forward and read successfully
+      instream.seek(1024);
+      assertTrue("no data to read", instream.read() >= 0);
+
+      // overwrite
+      writeDataset(fs, testpath, newDataset, newDataset.length, 1024, true);
+      // here the new file length is larger. Probe the file to see if this is
+      // true, with a spin and wait
+      eventually(30 * 1000, 1000,
+          () -> {
+            assertEquals(newLength, fs.getFileStatus(testpath).getLen());
+          });
+
+      // With the new file version in place, any subsequent S3 read by
+      // eTag/versionId will fail.  A new read by eTag/versionId will occur in
+      // reopen() on read after a seek() backwards.  We verify seek backwards
+      // results in the expected exception and seek() forward works without
+      // issue.
+
+      // first check seek forward
+      instream.seek(2048);
+      assertTrue("no data to read", instream.read() >= 0);
+
+      // now check seek backward
+      instream.seek(instream.getPos() - 100);
+
+      if (expectChangeException) {
+        intercept(RemoteFileChangedException.class, "", "read",
+            () -> instream.read());
+      } else {
+        instream.read();
+      }
+
+      byte[] buf = new byte[256];
+
+      // seek backward
+      instream.seek(0);
+
+      if (expectChangeException) {
+        intercept(RemoteFileChangedException.class, "", "read",
+            () -> instream.read(buf));
+        intercept(RemoteFileChangedException.class, "", "read",
+            () -> instream.read(0, buf, 0, buf.length));
+        intercept(RemoteFileChangedException.class,  "", "readfully",
+            () -> instream.readFully(0, buf));
+      } else {
+        instream.read(buf);
+        instream.read(0, buf, 0, buf.length);
+        instream.readFully(0, buf);
+      }
+
+      // delete the file. Reads must fail
+      fs.delete(testpath, false);
+
+      // seek backward
+      instream.seek(0);
+
+      if (expectFileNotFoundException) {
+        intercept(FileNotFoundException.class, "", "read()",
+            () -> instream.read());
+        intercept(FileNotFoundException.class, "", "readfully",
+            () -> instream.readFully(2048, buf));
+      } else {
+        instream.read();
+        instream.readFully(2048, buf);
+      }
+    }
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index e15c24a..5068524 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -61,6 +61,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
 
+import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CREDENTIAL_PROVIDER_PATH;
 import static org.apache.commons.lang3.StringUtils.isNotEmpty;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
@@ -696,6 +697,18 @@ public final class S3ATestUtils {
   }
 
   /**
+   * Get the name of the test bucket.
+   * @param conf configuration to scan.
+   * @return the bucket name from the config.
+   * @throws NullPointerException if no test bucket is defined.
+   */
+  public static String getTestBucketName(final Configuration conf) {
+    String bucket = checkNotNull(conf.get(TEST_FS_S3A_NAME),
+        "No test bucket");
+    return URI.create(bucket).getHost();
+  }
+
+  /**
    * Remove any values from a bucket.
    * @param bucket bucket whose overrides are to be removed. Can be null/empty
    * @param conf config
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestStreamChangeTracker.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestStreamChangeTracker.java
new file mode 100644
index 0000000..f073c4c
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestStreamChangeTracker.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.amazonaws.services.s3.Headers;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.S3Object;
+import org.apache.hadoop.fs.PathIOException;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
+import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
+import org.apache.hadoop.test.HadoopTestBase;
+
+import static org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy.CHANGE_DETECTED;
+import static org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy.createPolicy;
+import static org.apache.hadoop.fs.s3a.impl.ChangeTracker.CHANGE_REPORTED_BY_S3;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Test {@link ChangeTracker}.
+ */
+public class TestStreamChangeTracker extends HadoopTestBase {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestStreamChangeTracker.class);
+
+  public static final String BUCKET = "bucket";
+
+  public static final String OBJECT = "object";
+
+  public static final String URI = "s3a://" + BUCKET + "/" + OBJECT;
+
+  @Test
+  public void testVersionCheckingHandlingNoVersions() throws Throwable {
+    LOG.info("If an endpoint doesn't return versions, that's OK");
+    ChangeTracker tracker = newTracker(
+        ChangeDetectionPolicy.Mode.Client,
+        ChangeDetectionPolicy.Source.VersionId,
+        false);
+    assertFalse("Tracker should not have applied contraints " + tracker,
+        tracker.maybeApplyConstraint(newGetObjectRequest()));
+    tracker.processResponse(
+        newResponse(null, null),
+        "", 0);
+    assertTrackerMismatchCount(tracker, 0);
+  }
+
+  @Test
+  public void testVersionCheckingHandlingNoVersionsVersionRequired()
+      throws Throwable {
+    LOG.info("If an endpoint doesn't return versions but we are configured to"
+        + "require them");
+    ChangeTracker tracker = newTracker(
+        ChangeDetectionPolicy.Mode.Client,
+        ChangeDetectionPolicy.Source.VersionId,
+        true);
+    expectNoVersionAttributeException(tracker, newResponse(null, null),
+        "policy requires VersionId");
+  }
+
+  @Test
+  public void testEtagCheckingWarn() throws Throwable {
+    LOG.info("If an endpoint doesn't return errors, that's OK");
+    ChangeTracker tracker = newTracker(
+        ChangeDetectionPolicy.Mode.Warn,
+        ChangeDetectionPolicy.Source.ETag,
+        false);
+    assertFalse("Tracker should not have applied constraints " + tracker,
+        tracker.maybeApplyConstraint(newGetObjectRequest()));
+    tracker.processResponse(
+        newResponse("e1", null),
+        "", 0);
+    tracker.processResponse(
+        newResponse("e1", null),
+        "", 0);
+    tracker.processResponse(
+        newResponse("e2", null),
+        "", 0);
+    assertTrackerMismatchCount(tracker, 1);
+    // a subsequent mismatch doesn't trigger another warning
+    tracker.processResponse(
+        newResponse("e2", null),
+        "", 0);
+    assertTrackerMismatchCount(tracker, 1);
+  }
+
+  @Test
+  public void testVersionCheckingOnClient() throws Throwable {
+    LOG.info("Verify the client-side version checker raises exceptions");
+    ChangeTracker tracker = newTracker(
+        ChangeDetectionPolicy.Mode.Client,
+        ChangeDetectionPolicy.Source.VersionId,
+        false);
+    assertFalse("Tracker should not have applied constraints " + tracker,
+        tracker.maybeApplyConstraint(newGetObjectRequest()));
+    tracker.processResponse(
+        newResponse(null, "rev1"),
+        "", 0);
+    assertTrackerMismatchCount(tracker, 0);
+    assertRevisionId(tracker, "rev1");
+    expectChangeException(tracker,
+        newResponse(null, "rev2"), "change detected");
+    // mismatch was noted (so gets to FS stats)
+    assertTrackerMismatchCount(tracker, 1);
+
+    // another read causes another exception
+    expectChangeException(tracker,
+        newResponse(null, "rev2"), "change detected");
+    // mismatch was noted again
+    assertTrackerMismatchCount(tracker, 2);
+  }
+
+  @Test
+  public void testVersionCheckingOnServer() throws Throwable {
+    LOG.info("Verify the client-side version checker handles null-ness");
+    ChangeTracker tracker = newTracker(
+        ChangeDetectionPolicy.Mode.Server,
+        ChangeDetectionPolicy.Source.VersionId,
+        false);
+    assertFalse("Tracker should not have applied contraints " + tracker,
+        tracker.maybeApplyConstraint(newGetObjectRequest()));
+    tracker.processResponse(
+        newResponse(null, "rev1"),
+        "", 0);
+    assertTrackerMismatchCount(tracker, 0);
+    assertRevisionId(tracker, "rev1");
+    GetObjectRequest request = newGetObjectRequest();
+    assertConstraintApplied(tracker, request);
+    // now, the tracker expects a null response
+    expectChangeException(tracker, null, CHANGE_REPORTED_BY_S3);
+    assertTrackerMismatchCount(tracker, 1);
+
+    // now, imagine the server doesn't trigger a failure due to some
+    // bug in its logic
+    // we should still react to the reported value
+    expectChangeException(tracker,
+        newResponse(null, "rev2"),
+        CHANGE_DETECTED);
+  }
+
+  protected void assertConstraintApplied(final ChangeTracker tracker,
+      final GetObjectRequest request) {
+    assertTrue("Tracker should have applied contraints " + tracker,
+        tracker.maybeApplyConstraint(request));
+  }
+
+  protected RemoteFileChangedException expectChangeException(
+      final ChangeTracker tracker,
+      final S3Object response,
+      final String message) throws Exception {
+    return expectException(tracker, response, message,
+        RemoteFileChangedException.class);
+  }
+
+  protected PathIOException expectNoVersionAttributeException(
+      final ChangeTracker tracker,
+      final S3Object response,
+      final String message) throws Exception {
+    return expectException(tracker, response, message,
+        NoVersionAttributeException.class);
+  }
+
+  protected <T extends Exception> T expectException(
+      final ChangeTracker tracker,
+      final S3Object response,
+      final String message,
+      final Class<T> clazz) throws Exception {
+    return intercept(
+        clazz,
+        message,
+        () -> {
+          tracker.processResponse(response, "", 0);
+          return tracker;
+        });
+  }
+
+  protected void assertRevisionId(final ChangeTracker tracker,
+      final String revId) {
+    assertEquals("Wrong revision ID in " + tracker,
+        revId, tracker.getRevisionId());
+  }
+
+  protected void assertTrackerMismatchCount(
+      final ChangeTracker tracker,
+      final int expectedCount) {
+    assertEquals("counter in tracker " + tracker,
+        expectedCount, tracker.getVersionMismatches().get());
+  }
+
+  /**
+   * Create a tracker.
+   * Contains standard assertion(s).
+   * @param mode change detection mode
+   * @param source change detection source
+   * @param requireVersion whether a version attribute is required
+   * @return the tracker.
+   */
+  protected ChangeTracker newTracker(final ChangeDetectionPolicy.Mode mode,
+      final ChangeDetectionPolicy.Source source, boolean requireVersion) {
+    ChangeDetectionPolicy policy = createPolicy(
+        mode,
+        source,
+        requireVersion);
+    ChangeTracker tracker = new ChangeTracker(URI, policy,
+        new AtomicLong(0));
+    assertFalse("Tracker should not have applied constraints " + tracker,
+        tracker.maybeApplyConstraint(newGetObjectRequest()));
+    return tracker;
+  }
+
+  private GetObjectRequest newGetObjectRequest() {
+    return new GetObjectRequest(BUCKET, OBJECT);
+  }
+
+  private S3Object newResponse(String etag, String versionId) {
+    ObjectMetadata md = new ObjectMetadata();
+    if (etag != null) {
+      md.setHeader(Headers.ETAG, etag);
+    }
+    if (versionId != null) {
+      md.setHeader(Headers.S3_VERSION_ID, versionId);
+    }
+    S3Object response = emptyResponse();
+    response.setObjectMetadata(md);
+    return response;
+  }
+
+  private S3Object emptyResponse() {
+    S3Object response = new S3Object();
+    response.setBucketName(BUCKET);
+    response.setKey(OBJECT);
+    return response;
+  }
+}

