Posted to common-commits@hadoop.apache.org by st...@apache.org on 2019/03/12 11:36:38 UTC

[hadoop] branch branch-3.0 updated: HADOOP-16109. Parquet reading S3AFileSystem causes EOF (#589)

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 7f70a78  HADOOP-16109. Parquet reading S3AFileSystem causes EOF (#589)
7f70a78 is described below

commit 7f70a78ce3200b75660ef1e5cded901d0a6eac20
Author: Steve Loughran <st...@apache.org>
AuthorDate: Tue Mar 12 11:36:32 2019 +0000

    HADOOP-16109. Parquet reading S3AFileSystem causes EOF (#589)
    
    Nobody gets seek right. No matter how many times they think they have.
    
    Reproducible test from: Dave Christianson
    Fixed seek() logic: Steve Loughran
    
    Change-Id: I39b87f3d5daa98f65de2c0a44e348821a4930573
---
 .../fs/contract/AbstractContractSeekTest.java      |   4 +-
 .../org/apache/hadoop/fs/s3a/S3AInputStream.java   |   4 +-
 .../fs/contract/s3a/ITestS3AContractSeek.java      | 275 +++++++++++++++++++++
 .../org/apache/hadoop/fs/s3a/S3ATestConstants.java |   6 +
 .../org/apache/hadoop/fs/s3a/S3ATestUtils.java     |  69 ++++++
 5 files changed, 355 insertions(+), 3 deletions(-)
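
For context on the one-character fix to S3AInputStream below: on a forward
seek the stream may drain bytes from the current ranged GET rather than
closing and re-opening it, provided the target is close enough. The sketch
below is a minimal, standalone restatement of that decision, using
illustrative names rather than the actual Hadoop internals; the boundary
case where diff equals the limit, previously accepted by the "<="
comparison, is the one the fix now routes to a close/re-open instead.

    // Minimal sketch (not the Hadoop code itself) of the forward-seek
    // decision changed in S3AInputStream below.
    static boolean shouldSkipForward(long diff,
        long remainingInCurrentRequest,
        long forwardSeekRange) {
      long forwardSeekLimit = Math.min(remainingInCurrentRequest,
          forwardSeekRange);
      // Before the fix this used "<=": a seek landing exactly on the
      // limit drained the current request to its very last byte. With
      // "<" that boundary case falls through to a close/re-open.
      return remainingInCurrentRequest > 0 && diff < forwardSeekLimit;
    }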

diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java
index 7af3cb0..957f86a 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java
@@ -272,7 +272,7 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
     describe("Seek round a large file and verify the bytes are what is expected");
     Path testSeekFile = path("bigseekfile.txt");
     byte[] block = dataset(100 * 1024, 0, 255);
-    createFile(getFileSystem(), testSeekFile, false, block);
+    createFile(getFileSystem(), testSeekFile, true, block);
     instream = getFileSystem().open(testSeekFile);
     assertEquals(0, instream.getPos());
     //expect that seek to 0 works
@@ -309,7 +309,7 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
     assumeSupportsPositionedReadable();
     Path testSeekFile = path("bigseekfile.txt");
     byte[] block = dataset(65536, 0, 255);
-    createFile(getFileSystem(), testSeekFile, false, block);
+    createFile(getFileSystem(), testSeekFile, true, block);
     instream = getFileSystem().open(testSeekFile);
     instream.seek(39999);
     assertTrue(-1 != instream.read());
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index 1172a31..f2e49ee 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -245,7 +245,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
       long forwardSeekLimit = Math.min(remainingInCurrentRequest,
           forwardSeekRange);
       boolean skipForward = remainingInCurrentRequest > 0
-          && diff <= forwardSeekLimit;
+          && diff < forwardSeekLimit;
       if (skipForward) {
         // the forward seek range is within the limits
         LOG.debug("Forward seek on {}, of {} bytes", uri, diff);
@@ -259,6 +259,8 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
 
         if (pos == targetPos) {
           // all is well
+          LOG.debug("Now at {}: bytes remaining in current request: {}",
+              pos, remainingInCurrentRequest());
           return;
         } else {
           // log a warning; continue to attempt to re-open
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractSeek.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractSeek.java
index 379ace8..787e32a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractSeek.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractSeek.java
@@ -18,19 +18,82 @@
 
 package org.apache.hadoop.fs.contract.s3a;
 
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.AbstractContractSeekTest;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3AInputPolicy;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
+import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADV_NORMAL;
+import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADV_RANDOM;
+import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADV_SEQUENTIAL;
+import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
+import static org.apache.hadoop.fs.s3a.S3ATestConstants.FS_S3A_IMPL_DISABLE_CACHE;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeEnableS3Guard;
 
 /**
  * S3A contract tests covering file seek.
  */
+@RunWith(Parameterized.class)
 public class ITestS3AContractSeek extends AbstractContractSeekTest {
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestS3AContractSeek.class);
+
+  protected static final int READAHEAD = 1024;
+
+  private final String seekPolicy;
+
+  public static final int DATASET_LEN = READAHEAD * 2;
+
+  public static final byte[] DATASET = ContractTestUtils.dataset(DATASET_LEN, 'a', 32);
+
+  /**
+   * This test suite is parameterized for the different seek policies
+   * which S3A supports.
+   * @return a list of seek policies to test.
+   */
+  @Parameterized.Parameters
+  public static Collection<Object[]> params() {
+    return Arrays.asList(new Object[][]{
+        {INPUT_FADV_RANDOM},
+        {INPUT_FADV_NORMAL},
+        {INPUT_FADV_SEQUENTIAL},
+    });
+  }
+
+  /**
+   * Run the test with a chosen seek policy.
+   * @param seekPolicy fadvise policy to use.
+   */
+  public ITestS3AContractSeek(final String seekPolicy) {
+    this.seekPolicy = seekPolicy;
+  }
+
   /**
    * Create a configuration, possibly patching in S3Guard options.
+   * The FS is set to be uncached and the readahead and seek policies
+   * of the bucket itself are removed, so as to guarantee that the
+   * parameterized and test settings are used.
    * @return a configuration
    */
   @Override
@@ -38,6 +101,19 @@ public class ITestS3AContractSeek extends AbstractContractSeekTest {
     Configuration conf = super.createConfiguration();
     // patch in S3Guard options
     maybeEnableS3Guard(conf);
+    // purge any per-bucket overrides.
+    try {
+      URI bucketURI = new URI(checkNotNull(conf.get("fs.contract.test.fs.s3a")));
+      S3ATestUtils.removeBucketOverrides(bucketURI.getHost(), conf,
+          READAHEAD_RANGE,
+          INPUT_FADVISE);
+    } catch (URISyntaxException e) {
+      throw new RuntimeException(e);
+    }
+    // the FS is uncached, so will need clearing in test teardowns.
+    S3ATestUtils.disableFilesystemCaching(conf);
+    conf.setInt(READAHEAD_RANGE, READAHEAD);
+    conf.set(INPUT_FADVISE, seekPolicy);
     return conf;
   }
 
@@ -45,4 +121,203 @@ public class ITestS3AContractSeek extends AbstractContractSeekTest {
   protected AbstractFSContract createContract(Configuration conf) {
     return new S3AContract(conf);
   }
+
+  @Override
+  public void teardown() throws Exception {
+    S3AFileSystem fs = getFileSystem();
+    if (fs.getConf().getBoolean(FS_S3A_IMPL_DISABLE_CACHE, false)) {
+      fs.close();
+    }
+    super.teardown();
+  }
+
+  /**
+   * This override of the {@code path(path)} operation appends the seek
+   * policy to the path, to guarantee uniqueness across the parameterized
+   * runs of the same method.
+   *
+   * {@inheritDoc}
+   */
+  @Override
+  protected Path path(final String filepath) throws IOException {
+    return super.path(filepath + "-" + seekPolicy);
+  }
+
+  /**
+   * Go to end, read, then seek back to the previous position to force the
+   * normal seek policy to switch to random IO.
+   * This calls readByte() to trigger the second GET.
+   * @param in input stream
+   * @return the byte read
+   * @throws IOException failure.
+   */
+  private byte readAtEndAndReturn(final FSDataInputStream in)
+      throws IOException {
+    long pos = in.getPos();
+    in.seek(DATASET_LEN - 1);
+    in.readByte();
+    // go back to start and force a new GET
+    in.seek(pos);
+    return in.readByte();
+  }
+
+  /**
+   * Assert that the data read matches the dataset at the given offset.
+   * This helps verify that the seek process is moving the read pointer
+   * to the correct location in the file.
+   * @param readOffset the offset in the file where the read began.
+   * @param operation operation name for the assertion.
+   * @param data data read in.
+   * @param length length of data to check.
+   */
+  private void assertDatasetEquals(
+      final int readOffset, final String operation,
+      final byte[] data,
+      int length) {
+    for (int i = 0; i < length; i++) {
+      int o = readOffset + i;
+      assertEquals(operation + " with seek policy " + seekPolicy
+          + "and read offset " + readOffset
+          + ": data[" + i + "] != DATASET[" + o + "]",
+          DATASET[o], data[i]);
+    }
+  }
+
+  @Override
+  public S3AFileSystem getFileSystem() {
+    return (S3AFileSystem) super.getFileSystem();
+  }
+
+  @Test
+  public void testReadPolicyInFS() throws Throwable {
+    describe("Verify the read policy is being consistently set");
+    S3AFileSystem fs = getFileSystem();
+    assertEquals(S3AInputPolicy.getPolicy(seekPolicy), fs.getInputPolicy());
+  }
+
+  /**
+   * Test for HADOOP-16109: Parquet reading S3AFileSystem causes EOF.
+   * This sets up a read which will span the active readahead and,
+   * in random IO mode, a subsequent GET.
+   */
+  @Test
+  public void testReadAcrossReadahead() throws Throwable {
+    describe("Sets up a read which will span the active readahead"
+        + " and the rest of the file.");
+    Path path = path("testReadAcrossReadahead");
+    writeTestDataset(path);
+    FileSystem fs = getFileSystem();
+    // forward seek reading across readahead boundary
+    try (FSDataInputStream in = fs.open(path)) {
+      final byte[] temp = new byte[5];
+      in.readByte();
+      int offset = READAHEAD - 1;
+      in.readFully(offset, temp); // <-- works
+      assertDatasetEquals(offset, "read spanning boundary", temp, temp.length);
+    }
+    // Read exactly on the boundary
+    try (FSDataInputStream in = fs.open(path)) {
+      final byte[] temp = new byte[5];
+      readAtEndAndReturn(in);
+      assertEquals("current position", 1, (int)(in.getPos()));
+      in.readFully(READAHEAD, temp);
+      assertDatasetEquals(READAHEAD, "read exactly on boundary",
+          temp, temp.length);
+    }
+  }
+
+  /**
+   * Read across the end of the read buffer using the readByte call,
+   * which will read a single byte only.
+   */
+  @Test
+  public void testReadSingleByteAcrossReadahead() throws Throwable {
+    describe("Read over boundary using read()/readByte() calls.");
+    Path path = path("testReadSingleByteAcrossReadahead");
+    writeTestDataset(path);
+    FileSystem fs = getFileSystem();
+    try (FSDataInputStream in = fs.open(path)) {
+      final byte[] b0 = new byte[1];
+      readAtEndAndReturn(in);
+      in.seek(READAHEAD - 1);
+      b0[0] = in.readByte();
+      assertDatasetEquals(READAHEAD - 1, "read before end of boundary", b0,
+          b0.length);
+      b0[0] = in.readByte();
+      assertDatasetEquals(READAHEAD, "read at end of boundary", b0, b0.length);
+      b0[0] = in.readByte();
+      assertDatasetEquals(READAHEAD + 1, "read after end of boundary", b0,
+          b0.length);
+    }
+  }
+
+  @Test
+  public void testSeekToReadaheadAndRead() throws Throwable {
+    describe("Seek to just before readahead limit and call"
+        + " InputStream.read(byte[])");
+    Path path = path("testSeekToReadaheadAndRead");
+    FileSystem fs = getFileSystem();
+    writeTestDataset(path);
+    try (FSDataInputStream in = fs.open(path)) {
+      readAtEndAndReturn(in);
+      final byte[] temp = new byte[5];
+      int offset = READAHEAD - 1;
+      in.seek(offset);
+      // expect to read at least one byte.
+      int l = in.read(temp);
+      assertTrue("Reading in temp data", l > 0);
+      LOG.info("Read of byte array at offset {} returned {} bytes", offset, l);
+      assertDatasetEquals(offset, "read at end of boundary", temp, l);
+    }
+  }
+
+  @Test
+  public void testSeekToReadaheadExactlyAndRead() throws Throwable {
+    describe("Seek to exactly the readahead limit and call"
+        + " InputStream.read(byte[])");
+    Path path = path("testSeekToReadaheadExactlyAndRead");
+    FileSystem fs = getFileSystem();
+    writeTestDataset(path);
+    try (FSDataInputStream in = fs.open(path)) {
+      readAtEndAndReturn(in);
+      final byte[] temp = new byte[5];
+      int offset = READAHEAD;
+      in.seek(offset);
+      // expect to read at least one byte.
+      int l = in.read(temp);
+      LOG.info("Read of byte array at offset {} returned {} bytes", offset, l);
+      assertTrue("Reading in temp data", l > 0);
+      assertDatasetEquals(offset, "read at end of boundary", temp, l);
+    }
+  }
+
+  @Test
+  public void testSeekToReadaheadExactlyAndReadByte() throws Throwable {
+    describe("Seek to exactly the readahead limit and call"
+        + " readByte()");
+    Path path = path("testSeekToReadaheadExactlyAndReadByte");
+    FileSystem fs = getFileSystem();
+    writeTestDataset(path);
+    try (FSDataInputStream in = fs.open(path)) {
+      readAtEndAndReturn(in);
+      final byte[] temp = new byte[1];
+      int offset = READAHEAD;
+      in.seek(offset);
+      // expect to read a byte successfully.
+      temp[0] = in.readByte();
+      assertDatasetEquals(READAHEAD, "read at end of boundary", temp, 1);
+      LOG.info("Read of byte at offset {} returned expected value", offset);
+    }
+  }
+
+  /**
+   * Write the standard {@link #DATASET} dataset to the given path.
+   * @param path path to write to.
+   * @throws IOException failure
+   */
+  private void writeTestDataset(final Path path) throws IOException {
+    ContractTestUtils.writeDataset(getFileSystem(), path,
+        DATASET, DATASET_LEN, READAHEAD, true);
+  }
+
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
index 2c4f009..7f7802d 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
@@ -163,4 +163,10 @@ public interface S3ATestConstants {
    */
   String CONFIGURATION_TEST_ENDPOINT =
       "test.fs.s3a.endpoint";
+
+  /**
+   * Property to set to disable caching.
+   */
+  String FS_S3A_IMPL_DISABLE_CACHE
+      = "fs.s3a.impl.disable.cache";
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index b302e72..ad5523f 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -36,15 +36,18 @@ import org.junit.internal.AssumptionViolatedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.List;
+import java.util.concurrent.Callable;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
 import static org.apache.hadoop.fs.s3a.S3ATestConstants.*;
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.S3AUtils.propagateBucketOptions;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 import static org.junit.Assert.*;
 
 /**
@@ -445,6 +448,72 @@ public final class S3ATestUtils {
   }
 
   /**
+   * Variant of {@code LambdaTestUtils#intercept()} which closes the Closeable
+   * returned by the invoked operation, using its toString() value
+   * for exception messages.
+   * @param clazz class of exception; the raised exception must be this class
+   * <i>or a subclass</i>.
+   * @param contained string which must be in the {@code toString()} value
+   * of the exception
+   * @param eval expression to eval
+   * @param <T> return type of expression
+   * @param <E> exception class
+   * @return the caught exception if it was of the expected type and contents
+   */
+  public static <E extends Throwable, T extends Closeable> E interceptClosing(
+      Class<E> clazz,
+      String contained,
+      Callable<T> eval)
+      throws Exception {
+
+    return intercept(clazz, contained,
+        () -> {
+          try (Closeable c = eval.call()) {
+            return c.toString();
+          }
+        });
+  }
+
+  /**
+   * Remove any bucket-specific overrides of the given options.
+   * @param bucket bucket whose overrides are to be removed. Can be null/empty
+   * @param conf config
+   * @param options list of fs.s3a options to remove
+   */
+  public static void removeBucketOverrides(final String bucket,
+      final Configuration conf,
+      final String... options) {
+
+    if (StringUtils.isEmpty(bucket)) {
+      return;
+    }
+    final String bucketPrefix = FS_S3A_BUCKET_PREFIX + bucket + '.';
+    for (String option : options) {
+      final String stripped = option.substring("fs.s3a.".length());
+      String target = bucketPrefix + stripped;
+      if (conf.get(target) != null) {
+        LOG.debug("Removing option {}", target);
+        conf.unset(target);
+      }
+    }
+  }
+
+  /**
+   * Remove the given options from the base configuration and any
+   * bucket-specific overrides of them.
+   * @param bucket bucket whose overrides are to be removed. Can be null/empty.
+   * @param conf config
+   * @param options list of fs.s3a options to remove
+   */
+  public static void removeBaseAndBucketOverrides(final String bucket,
+      final Configuration conf,
+      final String... options) {
+    for (String option : options) {
+      conf.unset(option);
+    }
+    removeBucketOverrides(bucket, conf, options);
+  }
+
+  /**
    * Helper class to do diffs of metrics.
    */
   public static final class MetricDiff {
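
A hypothetical usage sketch of the new interceptClosing() helper added to
S3ATestUtils above (the path and expected message text here are made up for
illustration): it asserts that an operation returning a Closeable fails with
the expected exception, and closes the returned resource if the call
unexpectedly succeeds.

    // Hypothetical: expect opening a missing object to fail; if open()
    // somehow succeeds, the returned stream is closed automatically.
    FileNotFoundException e = S3ATestUtils.interceptClosing(
        FileNotFoundException.class, "no-such-file",
        () -> getFileSystem().open(new Path("/no-such-file")));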

