You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-issues@hadoop.apache.org by GitBox <gi...@apache.org> on 2019/03/04 18:03:05 UTC

[GitHub] [hadoop] hadoop-yetus commented on a change in pull request #539: HADOOP-16109. Parquet reading S3AFileSystem causes EOF

hadoop-yetus commented on a change in pull request #539: HADOOP-16109. Parquet reading S3AFileSystem causes EOF
URL: https://github.com/apache/hadoop/pull/539#discussion_r262176368
 
 

 ##########
 File path: hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractSeek.java
 ##########
 @@ -18,31 +18,282 @@
 
 package org.apache.hadoop.fs.contract.s3a;
 
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.AbstractContractSeekTest;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3AInputPolicy;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
+import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADV_NORMAL;
+import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADV_RANDOM;
+import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADV_SEQUENTIAL;
+import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
+import static org.apache.hadoop.fs.s3a.S3ATestConstants.FS_S3A_IMPL_DISABLE_CACHE;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeEnableS3Guard;
 
 /**
  * S3A contract tests covering file seek.
  */
+@RunWith(Parameterized.class)
 public class ITestS3AContractSeek extends AbstractContractSeekTest {
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestS3AContractSeek.class);
+
+  /** Readahead range which the test configuration is patched with. */
+  protected static final int READAHEAD = 1024;
+
+  /** Length of the test dataset: twice the readahead range. */
+  public static final int DATASET_LEN = READAHEAD * 2;
+
+  /** Test dataset of {@link #DATASET_LEN} bytes, generated once. */
+  public static final byte[] DATASET =
+      ContractTestUtils.dataset(DATASET_LEN, 'a', 32);
+
+  /** Seek policy handed to this instance by the parameterized runner. */
+  private final String seekPolicy;
+
+  /**
+   * Parameters for the parameterized test runs: one single-element
+   * tuple for each of the random, normal and sequential input policies.
+   * @return a list of parameter tuples.
+   */
+  @Parameterized.Parameters
+  public static Collection<Object[]> params() {
+    final Object[][] policies = {
+        {INPUT_FADV_RANDOM},
+        {INPUT_FADV_NORMAL},
+        {INPUT_FADV_SEQUENTIAL},
+    };
+    return Arrays.asList(policies);
+  }
+
+  /**
+   * Create a test instance for one parameterized run.
+   * @param seekPolicy value which the test configuration sets for the
+   * input fadvise option.
+   */
+  public ITestS3AContractSeek(final String seekPolicy) {
+    this.seekPolicy = seekPolicy;
+  }
+
   /**
    * Create a configuration, possibly patching in S3Guard options.
+   * The FS is set to be uncached and the readahead and seek policies
+   * of the bucket itself are removed, so as to guarantee that the
+   * parameterized and test settings are the ones actually used.
    * @return a configuration
    */
   @Override
   protected Configuration createConfiguration() {
     Configuration conf = super.createConfiguration();
     // patch in S3Guard options
     maybeEnableS3Guard(conf);
+    // purge any per-bucket overrides.
+    try {
+      URI bucketURI = new URI(checkNotNull(conf.get("fs.contract.test.fs.s3a")));
+      S3ATestUtils.removeBucketOverrides(bucketURI.getHost(), conf,
+          READAHEAD_RANGE,
+          INPUT_FADVISE);
+    } catch (URISyntaxException e) {
+      throw new RuntimeException(e);
+    }
+    // the FS is uncached, so will need clearing in test teardowns.
+    S3ATestUtils.disableFilesystemCaching(conf);
+    // apply the test readahead range and the parameterized seek policy.
+    conf.setInt(READAHEAD_RANGE, READAHEAD);
+    conf.set(INPUT_FADVISE, seekPolicy);
     return conf;
   }
 
   /**
    * Create the S3A contract which these tests run against.
    * @param conf configuration to build the contract from
    * @return a new S3A contract
    */
   @Override
   protected AbstractFSContract createContract(Configuration conf) {
     final AbstractFSContract contract = new S3AContract(conf);
     return contract;
   }
+
+  /**
+   * Tear down the test, then close the filesystem if its configuration
+   * has caching disabled.
+   * The superclass teardown runs first because it may still need a live
+   * filesystem for its own cleanup; closing the filesystem beforehand
+   * risks breaking that cleanup.
+   * @throws Exception failure of the superclass teardown or of the close.
+   */
+  @Override
+  public void teardown() throws Exception {
+    try {
+      super.teardown();
+    } finally {
+      S3AFileSystem fs = getFileSystem();
+      // guard against a null FS, e.g. if setup failed before creating it
+      if (fs != null
+          && fs.getConf().getBoolean(FS_S3A_IMPL_DISABLE_CACHE, false)) {
+        fs.close();
+      }
+    }
+  }
+
+  /**
+   * This subclass of the {@code path(path)} operation adds the seek policy
+   * to the end to guarantee uniqueness across different calls of the same method.
+   * @param filepath path string in
+   * @return
+   * @throws IOException
+   */
+  @Override
+  protected Path path(final String filepath) throws IOException {
+    return super.path(filepath + "-" + seekPolicy);
+  }
+
+  /**
+   * Go to end, read then seek back to the previous position to force normal
+   * seek policy to switch to random IO.
+   * This will call readByte to trigger the second GET
+   * @param in input stream
+   * @return the byte read
+   * @throws IOException failure.
+   */
+  private byte readAtEndAndReturn(final FSDataInputStream in)
+      throws IOException {
+    // 
 
 Review comment:
   whitespace:end of line
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-issues-help@hadoop.apache.org