You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2018/05/07 20:35:57 UTC

[32/35] hadoop git commit: HADOOP-15446. WASB: PageBlobInputStream.skip breaks HBASE replication. Contributed by Thomas Marquardt

HADOOP-15446. WASB: PageBlobInputStream.skip breaks HBASE replication.
Contributed by Thomas Marquardt


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5b11b9fd
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5b11b9fd
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5b11b9fd

Branch: refs/heads/HDDS-4
Commit: 5b11b9fd413470e134ecdc7c50468f8c7b39fa50
Parents: 67f239c
Author: Steve Loughran <st...@apache.org>
Authored: Mon May 7 11:54:08 2018 +0100
Committer: Steve Loughran <st...@apache.org>
Committed: Mon May 7 11:54:08 2018 +0100

----------------------------------------------------------------------
 .../hadoop/fs/azure/PageBlobInputStream.java    | 123 +++--
 .../fs/azure/ITestPageBlobInputStream.java      | 527 +++++++++++++++++++
 2 files changed, 605 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5b11b9fd/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobInputStream.java
index aaac490..40bf6f4 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobInputStream.java
@@ -25,12 +25,14 @@ import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.toShort;
 import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.withMD5Checking;
 
 import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.azure.StorageInterface.CloudPageBlobWrapper;
 
 import com.microsoft.azure.storage.OperationContext;
@@ -58,7 +60,9 @@ final class PageBlobInputStream extends InputStream {
   // The buffer holding the current data we last read from the server.
   private byte[] currentBuffer;
   // The current byte offset we're at in the buffer.
-  private int currentOffsetInBuffer;
+  private int currentBufferOffset;
+  // The current buffer length
+  private int currentBufferLength;
   // Maximum number of pages to get per any one request.
   private static final int MAX_PAGES_PER_DOWNLOAD =
       4 * 1024 * 1024 / PAGE_SIZE;
@@ -174,7 +178,7 @@ final class PageBlobInputStream extends InputStream {
 
   private boolean dataAvailableInBuffer() {
     return currentBuffer != null 
-        && currentOffsetInBuffer < currentBuffer.length;
+        && currentBufferOffset < currentBufferLength;
   }
 
   /**
@@ -194,6 +198,8 @@ final class PageBlobInputStream extends InputStream {
       return true;
     }
     currentBuffer = null;
+    currentBufferOffset = 0;
+    currentBufferLength = 0;
     if (numberOfPagesRemaining == 0) {
       // No more data to read.
       return false;
@@ -209,43 +215,48 @@ final class PageBlobInputStream extends InputStream {
       ByteArrayOutputStream baos = new ByteArrayOutputStream(bufferSize);
       blob.downloadRange(currentOffsetInBlob, bufferSize, baos,
           withMD5Checking(), opContext);
-      currentBuffer = baos.toByteArray();
+      validateDataIntegrity(baos.toByteArray());
     } catch (StorageException e) {
       throw new IOException(e);
     }
     numberOfPagesRemaining -= pagesToRead;
     currentOffsetInBlob += bufferSize;
-    currentOffsetInBuffer = PAGE_HEADER_SIZE;
-
-    // Since we just downloaded a new buffer, validate its consistency.
-    validateCurrentBufferConsistency();
 
     return true;
   }
 
-  private void validateCurrentBufferConsistency()
+  private void validateDataIntegrity(byte[] buffer)
       throws IOException {
-    if (currentBuffer.length % PAGE_SIZE != 0) {
+
+    if (buffer.length % PAGE_SIZE != 0) {
       throw new AssertionError("Unexpected buffer size: " 
-      + currentBuffer.length);
+      + buffer.length);
     }
-    int numberOfPages = currentBuffer.length / PAGE_SIZE;
+
+    int bufferLength = 0;
+    int numberOfPages = buffer.length / PAGE_SIZE;
+    long totalPagesAfterCurrent = numberOfPagesRemaining;
+
     for (int page = 0; page < numberOfPages; page++) {
-      short currentPageSize = getPageSize(blob, currentBuffer,
-          page * PAGE_SIZE);
-      // Calculate the number of pages that exist after this one
-      // in the blob.
-      long totalPagesAfterCurrent =
-          (numberOfPages - page - 1) + numberOfPagesRemaining;
-      // Only the last page is allowed to be not filled completely.
-      if (currentPageSize < PAGE_DATA_SIZE 
+      // Calculate the number of pages that exist in the blob after this one
+      totalPagesAfterCurrent--;
+
+      short currentPageSize = getPageSize(blob, buffer, page * PAGE_SIZE);
+
+      // Only the last page can be partially filled.
+      if (currentPageSize < PAGE_DATA_SIZE
           && totalPagesAfterCurrent > 0) {
         throw fileCorruptException(blob, String.format(
-            "Page with partial data found in the middle (%d pages from the" 
-            + " end) that only has %d bytes of data.",
-            totalPagesAfterCurrent, currentPageSize));
+            "Page with partial data found in the middle (%d pages from the"
+             + " end) that only has %d bytes of data.",
+             totalPagesAfterCurrent, currentPageSize));
       }
+      bufferLength += currentPageSize + PAGE_HEADER_SIZE;
     }
+
+    currentBufferOffset = PAGE_HEADER_SIZE;
+    currentBufferLength = bufferLength;
+    currentBuffer = buffer;
   }
 
   // Reads the page size from the page header at the given offset.
@@ -275,7 +286,7 @@ final class PageBlobInputStream extends InputStream {
       }
       int bytesRemainingInCurrentPage = getBytesRemainingInCurrentPage();
       int numBytesToRead = Math.min(len, bytesRemainingInCurrentPage);
-      System.arraycopy(currentBuffer, currentOffsetInBuffer, outputBuffer,
+      System.arraycopy(currentBuffer, currentBufferOffset, outputBuffer,
           offset, numBytesToRead);
       numberOfBytesRead += numBytesToRead;
       offset += numBytesToRead;
@@ -284,7 +295,7 @@ final class PageBlobInputStream extends InputStream {
         // We've finished this page, move on to the next.
         advancePagesInBuffer(1);
       } else {
-        currentOffsetInBuffer += numBytesToRead;
+        currentBufferOffset += numBytesToRead;
       }
     }
 
@@ -309,9 +320,26 @@ final class PageBlobInputStream extends InputStream {
   }
 
   /**
-   * Skips over and discards n bytes of data from this input stream.
-   * @param n the number of bytes to be skipped.
-   * @return the actual number of bytes skipped.
+   * Skips over and discards <code>n</code> bytes of data from this input
+   * stream. The <code>skip</code> method may, for a variety of reasons, end
+   * up skipping over some smaller number of bytes, possibly <code>0</code>.
+   * This may result from any of a number of conditions; reaching end of file
+   * before <code>n</code> bytes have been skipped is only one possibility.
+   * The actual number of bytes skipped is returned. If {@code n} is
+   * negative, the {@code skip} method for class {@code InputStream} always
+   * returns 0, and no bytes are skipped. Subclasses may handle the negative
+   * value differently.
+   *
+   * <p> This implementation first skips within the data already buffered.
+   * It then advances past whole pages by adjusting the blob offset rather
+   * than reading them; the last page is never skipped this way because it
+   * may be only partially filled. An <code>EOFException</code> is thrown
+   * when bytes remain to be skipped but no pages remain in the blob.
+   *
+   * @param      n   the number of bytes to be skipped.
+   * @return     the actual number of bytes skipped.
+   * @exception  IOException  if the stream does not support seek,
+   *                          or if some other I/O error occurs.
    */
   @Override
   public synchronized long skip(long n) throws IOException {
@@ -338,18 +366,23 @@ final class PageBlobInputStream extends InputStream {
     n -= skippedWithinBuffer;
     long skipped = skippedWithinBuffer;
 
-    // Empty the current buffer, we're going beyond it.
-    currentBuffer = null;
+    if (n == 0) {
+      return skipped;
+    }
+
+    if (numberOfPagesRemaining == 0) {
+      throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
+    } else if (numberOfPagesRemaining > 1) {
+      // skip over as many pages as we can, but we must read the last
+      // page as it may not be full
+      long pagesToSkipOver = Math.min(n / PAGE_DATA_SIZE,
+          numberOfPagesRemaining - 1);
+      numberOfPagesRemaining -= pagesToSkipOver;
+      currentOffsetInBlob += pagesToSkipOver * PAGE_SIZE;
+      skipped += pagesToSkipOver * PAGE_DATA_SIZE;
+      n -= pagesToSkipOver * PAGE_DATA_SIZE;
+    }
 
-    // Skip over whole pages as necessary without retrieving them from the
-    // server.
-    long pagesToSkipOver = Math.max(0, Math.min(
-        n / PAGE_DATA_SIZE,
-        numberOfPagesRemaining - 1));
-    numberOfPagesRemaining -= pagesToSkipOver;
-    currentOffsetInBlob += pagesToSkipOver * PAGE_SIZE;
-    skipped += pagesToSkipOver * PAGE_DATA_SIZE;
-    n -= pagesToSkipOver * PAGE_DATA_SIZE;
     if (n == 0) {
       return skipped;
     }
@@ -387,14 +420,14 @@ final class PageBlobInputStream extends InputStream {
 
     // Calculate how many whole pages (pages before the possibly partially
     // filled last page) remain.
-    int currentPageIndex = currentOffsetInBuffer / PAGE_SIZE;
+    int currentPageIndex = currentBufferOffset / PAGE_SIZE;
     int numberOfPagesInBuffer = currentBuffer.length / PAGE_SIZE;
     int wholePagesRemaining = numberOfPagesInBuffer - currentPageIndex - 1;
 
     if (n < (PAGE_DATA_SIZE * wholePagesRemaining)) {
       // I'm within one of the whole pages remaining, skip in there.
       advancePagesInBuffer((int) (n / PAGE_DATA_SIZE));
-      currentOffsetInBuffer += n % PAGE_DATA_SIZE;
+      currentBufferOffset += n % PAGE_DATA_SIZE;
       return n + skipped;
     }
 
@@ -417,8 +450,8 @@ final class PageBlobInputStream extends InputStream {
    */
   private long skipWithinCurrentPage(long n) throws IOException {
     int remainingBytesInCurrentPage = getBytesRemainingInCurrentPage();
-    if (n < remainingBytesInCurrentPage) {
-      currentOffsetInBuffer += n;
+    if (n <= remainingBytesInCurrentPage) {
+      currentBufferOffset += n;
       return n;
     } else {
       advancePagesInBuffer(1);
@@ -438,7 +471,7 @@ final class PageBlobInputStream extends InputStream {
     // Calculate our current position relative to the start of the current
     // page.
     int currentDataOffsetInPage =
-        (currentOffsetInBuffer % PAGE_SIZE) - PAGE_HEADER_SIZE;
+        (currentBufferOffset % PAGE_SIZE) - PAGE_HEADER_SIZE;
     int pageBoundary = getCurrentPageStartInBuffer();
     // Get the data size of the current page from the header.
     short sizeOfCurrentPage = getPageSize(blob, currentBuffer, pageBoundary);
@@ -454,14 +487,14 @@ final class PageBlobInputStream extends InputStream {
   }
 
   private void advancePagesInBuffer(int numberOfPages) {
-    currentOffsetInBuffer =
+    currentBufferOffset =
         getCurrentPageStartInBuffer() 
         + (numberOfPages * PAGE_SIZE) 
         + PAGE_HEADER_SIZE;
   }
 
   private int getCurrentPageStartInBuffer() {
-    return PAGE_SIZE * (currentOffsetInBuffer / PAGE_SIZE);
+    return PAGE_SIZE * (currentBufferOffset / PAGE_SIZE);
   }
 
   private static IOException fileCorruptException(CloudPageBlobWrapper blob,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5b11b9fd/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestPageBlobInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestPageBlobInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestPageBlobInputStream.java
new file mode 100644
index 0000000..8c939fc
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestPageBlobInputStream.java
@@ -0,0 +1,527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.concurrent.Callable;
+
+import org.junit.FixMethodOrder;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runners.MethodSorters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Test semantics of the page blob input stream.
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+
+public class ITestPageBlobInputStream extends AbstractWasbTestBase {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      ITestPageBlobInputStream.class);
+  private static final int KILOBYTE = 1024;
+  private static final int MEGABYTE = KILOBYTE * KILOBYTE;
+  private static final int TEST_FILE_SIZE = 6 * MEGABYTE;
+  private static final Path TEST_FILE_PATH = new Path(
+      "TestPageBlobInputStream.txt");
+
+  private long testFileLength;
+
+  /**
+   * Long test timeout.
+   */
+  @Rule
+  public Timeout testTimeout = new Timeout(10 * 60 * 1000);
+  private FileStatus testFileStatus;
+  private Path hugefile;
+
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    createTestAccount();
+
+    hugefile = fs.makeQualified(TEST_FILE_PATH);
+    try {
+      testFileStatus = fs.getFileStatus(TEST_FILE_PATH);
+      testFileLength = testFileStatus.getLen();
+    } catch (FileNotFoundException e) {
+      // file doesn't exist
+      testFileLength = 0;
+    }
+  }
+
+  @Override
+  protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
+    Configuration conf = new Configuration();
+
+    // Configure the page blob directories key so every file created is a page blob.
+    conf.set(AzureNativeFileSystemStore.KEY_PAGE_BLOB_DIRECTORIES, "/");
+
+    return AzureBlobStorageTestAccount.create(
+        "testpageblobinputstream",
+        EnumSet.of(AzureBlobStorageTestAccount.CreateOptions.CreateContainer),
+        conf,
+        true);
+  }
+
+  /**
+   * Create a test file by repeating the byte values 0 through 255.
+   * @throws IOException
+   */
+  private void createTestFileAndSetLength() throws IOException {
+    // To reduce test run time, the test file can be reused.
+    if (fs.exists(TEST_FILE_PATH)) {
+      testFileStatus = fs.getFileStatus(TEST_FILE_PATH);
+      testFileLength = testFileStatus.getLen();
+      LOG.info("Reusing test file: {}", testFileStatus);
+      return;
+    }
+
+    byte[] buffer = new byte[256];
+    for (int i = 0; i < buffer.length; i++) {
+      buffer[i] = (byte) i;
+    }
+
+    LOG.info("Creating test file {} of size: {}", TEST_FILE_PATH,
+        TEST_FILE_SIZE);
+
+    try(FSDataOutputStream outputStream = fs.create(TEST_FILE_PATH)) {
+      int bytesWritten = 0;
+      while (bytesWritten < TEST_FILE_SIZE) {
+        outputStream.write(buffer);
+        bytesWritten += buffer.length;
+      }
+      LOG.info("Closing stream {}", outputStream);
+      outputStream.close();
+    }
+    testFileLength = fs.getFileStatus(TEST_FILE_PATH).getLen();
+  }
+
+  void assumeHugeFileExists() throws IOException {
+    ContractTestUtils.assertPathExists(fs, "huge file not created", hugefile);
+    FileStatus status = fs.getFileStatus(hugefile);
+    ContractTestUtils.assertIsFile(hugefile, status);
+    assertTrue("File " + hugefile + " is empty", status.getLen() > 0);
+  }
+
+  @Test
+  public void test_0100_CreateHugeFile() throws IOException {
+    createTestFileAndSetLength();
+  }
+
+  @Test
+  public void test_0200_BasicReadTest() throws Exception {
+    assumeHugeFileExists();
+
+    try (
+        FSDataInputStream inputStream = fs.open(TEST_FILE_PATH);
+    ) {
+      byte[] buffer = new byte[3 * MEGABYTE];
+
+      // v1 forward seek and read a kilobyte into first kilobyte of buffer
+      long position = 5 * MEGABYTE;
+      inputStream.seek(position);
+      int numBytesRead = inputStream.read(buffer, 0, KILOBYTE);
+      assertEquals(KILOBYTE, numBytesRead);
+
+      byte[] expected = new byte[3 * MEGABYTE];
+
+      for (int i = 0; i < KILOBYTE; i++) {
+        expected[i] = (byte) ((i + position) % 256);
+      }
+
+      assertArrayEquals(expected, buffer);
+
+      int len = MEGABYTE;
+      int offset = buffer.length - len;
+
+      // v1 reverse seek and read a megabyte into last megabyte of buffer
+      position = 3 * MEGABYTE;
+      inputStream.seek(position);
+      numBytesRead = inputStream.read(buffer, offset, len);
+      assertEquals(len, numBytesRead);
+
+      for (int i = offset; i < offset + len; i++) {
+        expected[i] = (byte) ((i + position) % 256);
+      }
+
+      assertArrayEquals(expected, buffer);
+    }
+  }
+
+  @Test
+  public void test_0201_RandomReadTest() throws Exception {
+    assumeHugeFileExists();
+
+    try (
+        FSDataInputStream inputStream = fs.open(TEST_FILE_PATH);
+    ) {
+      final int bufferSize = 4 * KILOBYTE;
+      byte[] buffer = new byte[bufferSize];
+      long position = 0;
+
+      verifyConsistentReads(inputStream, buffer, position);
+
+      inputStream.seek(0);
+
+      verifyConsistentReads(inputStream, buffer, position);
+
+      int seekPosition = 2 * KILOBYTE;
+      inputStream.seek(seekPosition);
+      position = seekPosition;
+      verifyConsistentReads(inputStream, buffer, position);
+
+      inputStream.seek(0);
+      position = 0;
+      verifyConsistentReads(inputStream, buffer, position);
+
+      seekPosition = 5 * KILOBYTE;
+      inputStream.seek(seekPosition);
+      position = seekPosition;
+      verifyConsistentReads(inputStream, buffer, position);
+
+      seekPosition = 10 * KILOBYTE;
+      inputStream.seek(seekPosition);
+      position = seekPosition;
+      verifyConsistentReads(inputStream, buffer, position);
+
+      seekPosition = 4100 * KILOBYTE;
+      inputStream.seek(seekPosition);
+      position = seekPosition;
+      verifyConsistentReads(inputStream, buffer, position);
+
+      for (int i = 4 * 1024 * 1023; i < 5000; i++) {
+        seekPosition = i;
+        inputStream.seek(seekPosition);
+        position = seekPosition;
+        verifyConsistentReads(inputStream, buffer, position);
+      }
+
+      inputStream.seek(0);
+      position = 0;
+      buffer = new byte[1];
+
+      for (int i = 0; i < 5000; i++) {
+        assertEquals(1, inputStream.skip(1));
+        position++;
+        verifyConsistentReads(inputStream, buffer, position);
+        position++;
+      }
+    }
+  }
+
+  private void verifyConsistentReads(FSDataInputStream inputStream,
+                                     byte[] buffer,
+                                     long position) throws IOException {
+    int size = buffer.length;
+    final int numBytesRead = inputStream.read(buffer, 0, size);
+    assertEquals("Bytes read from stream", size, numBytesRead);
+
+    byte[] expected = new byte[size];
+    for (int i = 0; i < expected.length; i++) {
+      expected[i] = (byte) ((position + i) % 256);
+    }
+
+    assertArrayEquals("Mismatch", expected, buffer);
+  }
+
+  /**
+   * Validates the implementation of InputStream.markSupported.
+   * @throws IOException
+   */
+  @Test
+  public void test_0301_MarkSupported() throws IOException {
+    assumeHugeFileExists();
+    try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
+      assertTrue("mark is not supported", inputStream.markSupported());
+    }
+  }
+
+  /**
+   * Validates the implementation of InputStream.mark and reset
+   * for the page blob input stream.
+   * @throws Exception
+   */
+  @Test
+  public void test_0303_MarkAndResetV1() throws Exception {
+    assumeHugeFileExists();
+    try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
+      inputStream.mark(KILOBYTE - 1);
+
+      byte[] buffer = new byte[KILOBYTE];
+      int bytesRead = inputStream.read(buffer);
+      assertEquals(buffer.length, bytesRead);
+
+      inputStream.reset();
+      assertEquals("rest -> pos 0", 0, inputStream.getPos());
+
+      inputStream.mark(8 * KILOBYTE - 1);
+
+      buffer = new byte[8 * KILOBYTE];
+      bytesRead = inputStream.read(buffer);
+      assertEquals(buffer.length, bytesRead);
+
+      intercept(IOException.class,
+          "Resetting to invalid mark",
+          new Callable<FSDataInputStream>() {
+            @Override
+            public FSDataInputStream call() throws Exception {
+              inputStream.reset();
+              return inputStream;
+            }
+          }
+      );
+    }
+  }
+
+  /**
+   * Validates the implementation of Seekable.seekToNewSource, which should
+   * return false for the page blob input stream.
+   * @throws IOException
+   */
+  @Test
+  public void test_0305_SeekToNewSourceV1() throws IOException {
+    assumeHugeFileExists();
+    try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
+      assertFalse(inputStream.seekToNewSource(0));
+    }
+  }
+
+  /**
+   * Validates the implementation of InputStream.skip for negative, zero,
+   * full-length, and past-end-of-file arguments on a page blob stream.
+   * @throws Exception
+   */
+  @Test
+  public void test_0307_SkipBounds() throws Exception {
+    assumeHugeFileExists();
+    try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
+      long skipped = inputStream.skip(-1);
+      assertEquals(0, skipped);
+
+      skipped = inputStream.skip(0);
+      assertEquals(0, skipped);
+
+      assertTrue(testFileLength > 0);
+
+      skipped = inputStream.skip(testFileLength);
+      assertEquals(testFileLength, skipped);
+
+      intercept(EOFException.class,
+          new Callable<Long>() {
+            @Override
+            public Long call() throws Exception {
+              return inputStream.skip(1);
+            }
+          }
+      );
+    }
+  }
+
+  /**
+   * Validates the implementation of Seekable.seek at the bounds of the file:
+   * position zero, a negative position, end of file, and past end of file.
+   * @throws Exception
+   */
+  @Test
+  public void test_0309_SeekBounds() throws Exception {
+    assumeHugeFileExists();
+    try (
+        FSDataInputStream inputStream = fs.open(TEST_FILE_PATH);
+    ) {
+      inputStream.seek(0);
+      assertEquals(0, inputStream.getPos());
+
+      intercept(EOFException.class,
+          FSExceptionMessages.NEGATIVE_SEEK,
+          new Callable<FSDataInputStream>() {
+            @Override
+            public FSDataInputStream call() throws Exception {
+              inputStream.seek(-1);
+              return inputStream;
+            }
+          }
+      );
+
+      assertTrue("Test file length only " + testFileLength, testFileLength > 0);
+      inputStream.seek(testFileLength);
+      assertEquals(testFileLength, inputStream.getPos());
+
+      intercept(EOFException.class,
+          FSExceptionMessages.CANNOT_SEEK_PAST_EOF,
+          new Callable<FSDataInputStream>() {
+            @Override
+            public FSDataInputStream call() throws Exception {
+              inputStream.seek(testFileLength + 1);
+              return inputStream;
+            }
+          }
+      );
+    }
+  }
+
+  /**
+   * Validates the implementation of Seekable.seek, Seekable.getPos,
+   * and InputStream.available.
+   * @throws Exception
+   */
+  @Test
+  public void test_0311_SeekAndAvailableAndPosition() throws Exception {
+    assumeHugeFileExists();
+    try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
+      byte[] expected1 = {0, 1, 2};
+      byte[] expected2 = {3, 4, 5};
+      byte[] expected3 = {1, 2, 3};
+      byte[] expected4 = {6, 7, 8};
+      byte[] buffer = new byte[3];
+
+      int bytesRead = inputStream.read(buffer);
+      assertEquals(buffer.length, bytesRead);
+      assertArrayEquals(expected1, buffer);
+      assertEquals(buffer.length, inputStream.getPos());
+      assertEquals(testFileLength - inputStream.getPos(),
+          inputStream.available());
+
+      bytesRead = inputStream.read(buffer);
+      assertEquals(buffer.length, bytesRead);
+      assertArrayEquals(expected2, buffer);
+      assertEquals(2 * buffer.length, inputStream.getPos());
+      assertEquals(testFileLength - inputStream.getPos(),
+          inputStream.available());
+
+      // reverse seek
+      int seekPos = 0;
+      inputStream.seek(seekPos);
+
+      bytesRead = inputStream.read(buffer);
+      assertEquals(buffer.length, bytesRead);
+      assertArrayEquals(expected1, buffer);
+      assertEquals(buffer.length + seekPos, inputStream.getPos());
+      assertEquals(testFileLength - inputStream.getPos(),
+          inputStream.available());
+
+      // reverse seek
+      seekPos = 1;
+      inputStream.seek(seekPos);
+
+      bytesRead = inputStream.read(buffer);
+      assertEquals(buffer.length, bytesRead);
+      assertArrayEquals(expected3, buffer);
+      assertEquals(buffer.length + seekPos, inputStream.getPos());
+      assertEquals(testFileLength - inputStream.getPos(),
+          inputStream.available());
+
+      // forward seek
+      seekPos = 6;
+      inputStream.seek(seekPos);
+
+      bytesRead = inputStream.read(buffer);
+      assertEquals(buffer.length, bytesRead);
+      assertArrayEquals(expected4, buffer);
+      assertEquals(buffer.length + seekPos, inputStream.getPos());
+      assertEquals(testFileLength - inputStream.getPos(),
+          inputStream.available());
+    }
+  }
+
+  /**
+   * Validates the implementation of InputStream.skip, Seekable.getPos,
+   * and InputStream.available.
+   * @throws IOException
+   */
+  @Test
+  public void test_0313_SkipAndAvailableAndPosition() throws IOException {
+    assumeHugeFileExists();
+    try (
+        FSDataInputStream inputStream = fs.open(TEST_FILE_PATH);
+    ) {
+      byte[] expected1 = {0, 1, 2};
+      byte[] expected2 = {3, 4, 5};
+      byte[] expected3 = {1, 2, 3};
+      byte[] expected4 = {6, 7, 8};
+      assertEquals(testFileLength, inputStream.available());
+      assertEquals(0, inputStream.getPos());
+
+      int n = 3;
+      long skipped = inputStream.skip(n);
+
+      assertEquals(skipped, inputStream.getPos());
+      assertEquals(testFileLength - inputStream.getPos(),
+          inputStream.available());
+      assertEquals(skipped, n);
+
+      byte[] buffer = new byte[3];
+      int bytesRead = inputStream.read(buffer);
+      assertEquals(buffer.length, bytesRead);
+      assertArrayEquals(expected2, buffer);
+      assertEquals(buffer.length + skipped, inputStream.getPos());
+      assertEquals(testFileLength - inputStream.getPos(),
+          inputStream.available());
+
+      // does skip still work after seek?
+      int seekPos = 1;
+      inputStream.seek(seekPos);
+
+      bytesRead = inputStream.read(buffer);
+      assertEquals(buffer.length, bytesRead);
+      assertArrayEquals(expected3, buffer);
+      assertEquals(buffer.length + seekPos, inputStream.getPos());
+      assertEquals(testFileLength - inputStream.getPos(),
+          inputStream.available());
+
+      long currentPosition = inputStream.getPos();
+      n = 2;
+      skipped = inputStream.skip(n);
+
+      assertEquals(currentPosition + skipped, inputStream.getPos());
+      assertEquals(testFileLength - inputStream.getPos(),
+          inputStream.available());
+      assertEquals(skipped, n);
+
+      bytesRead = inputStream.read(buffer);
+      assertEquals(buffer.length, bytesRead);
+      assertArrayEquals(expected4, buffer);
+      assertEquals(buffer.length + skipped + currentPosition,
+          inputStream.getPos());
+      assertEquals(testFileLength - inputStream.getPos(),
+          inputStream.available());
+    }
+  }
+
+  @Test
+  public void test_999_DeleteHugeFiles() throws IOException {
+    fs.delete(TEST_FILE_PATH, false);
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org