Posted to common-commits@hadoop.apache.org by da...@apache.org on 2020/05/27 20:51:57 UTC

[hadoop] branch trunk updated: HADOOP-16852: Report read-ahead error back

This is an automated email from the ASF dual-hosted git repository.

dazhou pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 53b993e  HADOOP-16852: Report read-ahead error back
53b993e is described below

commit 53b993e6048ffaaf98e460690211fc08efb20cf2
Author: Sneha Vijayarajan <sn...@gmail.com>
AuthorDate: Wed May 27 13:51:42 2020 -0700

    HADOOP-16852: Report read-ahead error back
    
    Contributed by Sneha Vijayarajan
---
 .../fs/azurebfs/services/AbfsInputStream.java      |  12 +
 .../hadoop/fs/azurebfs/services/ReadBuffer.java    |  16 +
 .../fs/azurebfs/services/ReadBufferManager.java    |  90 ++++-
 .../fs/azurebfs/services/ReadBufferWorker.java     |  12 +-
 .../fs/azurebfs/services/TestAbfsInputStream.java  | 450 +++++++++++++++++++++
 .../fs/azurebfs/utils/TestCachedSASToken.java      |  34 ++
 6 files changed, 604 insertions(+), 10 deletions(-)
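
Before the per-file diffs, a minimal, self-contained sketch of the flow this
patch establishes (NOT part of the commit; the types are simplified stand-ins,
though the field names and the 3000 ms threshold mirror the real code): the
read-ahead worker records a failed remote read on its buffer instead of
dropping it, and a later getBlock() rethrows that failure while it is recent,
or ignores it once it is older than the age threshold.

    import java.io.IOException;

    public class ReadAheadErrorFlowSketch {
      static final long THRESHOLD_AGE_MS = 3000;

      // Simplified stand-in for ReadBuffer.
      static class Buffer {
        IOException errException;  // failure captured by the worker thread
        boolean readFailed;
        long timeStampMs;          // set when the read completed or failed
      }

      // Worker side (cf. ReadBufferWorker below): record the failure.
      static void workerRead(Buffer buf) {
        try {
          throw new RuntimeException("simulated remote read failure");
        } catch (Exception ex) {
          buf.errException = new IOException(ex);
          buf.readFailed = true;
          buf.timeStampMs = System.currentTimeMillis();
        }
      }

      // Reader side (cf. ReadBufferManager#getBlockFromCompletedQueue below):
      // a recent failure is rethrown; a stale one yields 0 bytes so the
      // caller falls back to a fresh remote read.
      static int getBlock(Buffer buf) throws IOException {
        if (buf.readFailed) {
          if (System.currentTimeMillis() - buf.timeStampMs < THRESHOLD_AGE_MS) {
            throw buf.errException;
          }
          return 0;
        }
        return 1024; // pretend the buffered bytes were copied out
      }

      public static void main(String[] args) throws Exception {
        Buffer buf = new Buffer();
        workerRead(buf);
        try {
          getBlock(buf);
        } catch (IOException expected) {
          System.out.println("recent failure surfaced: " + expected.getCause());
        }
        Thread.sleep(THRESHOLD_AGE_MS + 100);
        System.out.println("stale failure ignored, bytes=" + getBlock(buf));
      }
    }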

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
index 422fa3b..50380c9 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
@@ -24,6 +24,10 @@ import java.io.IOException;
 import java.net.HttpURLConnection;
 
 import com.google.common.base.Preconditions;
+import com.google.common.annotations.VisibleForTesting;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.fs.CanUnbuffer;
 import org.apache.hadoop.fs.FSExceptionMessages;
@@ -41,6 +45,7 @@ import static org.apache.hadoop.util.StringUtils.toLowerCase;
  */
 public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
         StreamCapabilities {
+  private static final Logger LOG = LoggerFactory.getLogger(AbfsInputStream.class);
 
   private final AbfsClient client;
   private final Statistics statistics;
@@ -239,6 +244,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
     final AbfsRestOperation op;
     AbfsPerfTracker tracker = client.getAbfsPerfTracker();
     try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "readRemote", "read")) {
+      LOG.trace("Trigger client.read for path={} position={} offset={} length={}", path, position, offset, length);
       op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag, cachedSasToken.get());
       cachedSasToken.update(op.getSasToken());
       perfInfo.registerResult(op.getResult()).registerSuccess(true);
@@ -431,4 +437,10 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
   byte[] getBuffer() {
     return buffer;
   }
+
+  @VisibleForTesting
+  protected void setCachedSasToken(final CachedSASToken cachedSasToken) {
+    this.cachedSasToken = cachedSasToken;
+  }
+
 }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java
index 00e4f00..5d55726 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java
@@ -18,10 +18,13 @@
 
 package org.apache.hadoop.fs.azurebfs.services;
 
+import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
 
 import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
 
+import static org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus.READ_FAILED;
+
 class ReadBuffer {
 
   private AbfsInputStream stream;
@@ -40,6 +43,8 @@ class ReadBuffer {
   private boolean isLastByteConsumed = false;
   private boolean isAnyByteConsumed = false;
 
+  private IOException errException = null;
+
   public AbfsInputStream getStream() {
     return stream;
   }
@@ -88,12 +93,23 @@ class ReadBuffer {
     this.bufferindex = bufferindex;
   }
 
+  public IOException getErrException() {
+    return errException;
+  }
+
+  public void setErrException(final IOException errException) {
+    this.errException = errException;
+  }
+
   public ReadBufferStatus getStatus() {
     return status;
   }
 
   public void setStatus(ReadBufferStatus status) {
     this.status = status;
+    if (status == READ_FAILED) {
+      bufferindex = -1;
+    }
   }
 
   public CountDownLatch getLatch() {
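
A note on the setStatus() change above (illustrative snippet, NOT in the
commit, assuming it runs in the same package as the package-private ReadBuffer):
marking a buffer READ_FAILED drops its slot index, so the failed entry can stay
in completedReadList carrying its exception without claiming one of the pooled
byte[] slots, and eviction can recognize it by the -1 index.

    import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;

    class ReadBufferFailureDemo {
      static void demo() {
        ReadBuffer buf = new ReadBuffer();
        buf.setBufferindex(7);                        // buffer owns pool slot 7
        buf.setStatus(ReadBufferStatus.READ_FAILED);  // worker reported a failure
        // bufferindex is now -1: doneReading() already returned slot 7 to
        // freeList, and eviction must not push -1 onto freeList again.
        assert buf.getBufferindex() == -1;
      }
    }
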
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
index 5b71cf0..73c23b0 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
@@ -21,12 +21,15 @@ import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.Queue;
 import java.util.Stack;
 import java.util.concurrent.CountDownLatch;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * The Read Buffer Manager for Rest AbfsClient.
  */
@@ -36,8 +39,9 @@ final class ReadBufferManager {
   private static final int NUM_BUFFERS = 16;
   private static final int BLOCK_SIZE = 4 * 1024 * 1024;
   private static final int NUM_THREADS = 8;
-  private static final int THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold
+  private static final int DEFAULT_THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold
 
+  private static int thresholdAgeMilliseconds = DEFAULT_THRESHOLD_AGE_MILLISECONDS;
   private Thread[] threads = new Thread[NUM_THREADS];
   private byte[][] buffers;    // array of byte[] buffers, to hold the data that is read
   private Stack<Integer> freeList = new Stack<>();   // indices in buffers[] array that are available
@@ -141,7 +145,8 @@ final class ReadBufferManager {
    * @param buffer   the buffer to read data into. Note that the buffer will be written into from offset 0.
    * @return the number of bytes read
    */
-  int getBlock(final AbfsInputStream stream, final long position, final int length, final byte[] buffer) {
+  int getBlock(final AbfsInputStream stream, final long position, final int length, final byte[] buffer)
+      throws IOException {
     // not synchronized, so have to be careful with locking
     if (LOGGER.isTraceEnabled()) {
       LOGGER.trace("getBlock for file {}  position {}  thread {}",
@@ -244,7 +249,7 @@ final class ReadBufferManager {
         earliestBirthday = buf.getTimeStamp();
       }
     }
-    if ((currentTimeMillis() - earliestBirthday > THRESHOLD_AGE_MILLISECONDS) && (nodeToEvict != null)) {
+    if ((currentTimeMillis() - earliestBirthday > thresholdAgeMilliseconds) && (nodeToEvict != null)) {
       return evict(nodeToEvict);
     }
 
@@ -253,7 +258,12 @@ final class ReadBufferManager {
   }
 
   private boolean evict(final ReadBuffer buf) {
-    freeList.push(buf.getBufferindex());
+    // As failed ReadBuffers (bufferindex = -1) are kept in completedReadList,
+    // avoid adding them to freeList.
+    if (buf.getBufferindex() != -1) {
+      freeList.push(buf.getBufferindex());
+    }
+
     completedReadList.remove(buf);
     if (LOGGER.isTraceEnabled()) {
       LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} length {}",
@@ -289,6 +299,27 @@ final class ReadBufferManager {
     return null;
   }
 
+  /**
+   * Returns a buffer from the completed queue for the given stream and
+   * offset, whether its read-ahead succeeded or failed.
+   * @param stream the stream whose read-ahead result is being looked up
+   * @param requestedOffset the file offset the caller wants to read from
+   * @return the matching ReadBuffer, or null if none covers the offset
+   */
+  private ReadBuffer getBufferFromCompletedQueue(final AbfsInputStream stream, final long requestedOffset) {
+    for (ReadBuffer buffer : completedReadList) {
+      // Buffer is returned if the requestedOffset is at or above the buffer's
+      // offset and below the end of either its actual length or its requestedLength
+      if ((buffer.getStream() == stream)
+          && (requestedOffset >= buffer.getOffset())
+          && ((requestedOffset < buffer.getOffset() + buffer.getLength())
+          || (requestedOffset < buffer.getOffset() + buffer.getRequestedLength()))) {
+          return buffer;
+        }
+      }
+
+    return null;
+  }
+
   private void clearFromReadAheadQueue(final AbfsInputStream stream, final long requestedOffset) {
     ReadBuffer buffer = getFromList(readAheadQueue, stream, requestedOffset);
     if (buffer != null) {
@@ -299,11 +330,28 @@ final class ReadBufferManager {
   }
 
   private int getBlockFromCompletedQueue(final AbfsInputStream stream, final long position, final int length,
-                                         final byte[] buffer) {
-    ReadBuffer buf = getFromList(completedReadList, stream, position);
-    if (buf == null || position >= buf.getOffset() + buf.getLength()) {
+                                         final byte[] buffer) throws IOException {
+    ReadBuffer buf = getBufferFromCompletedQueue(stream, position);
+
+    if (buf == null) {
       return 0;
     }
+
+    if (buf.getStatus() == ReadBufferStatus.READ_FAILED) {
+      // To prevent new read requests from failing due to old read-ahead attempts,
+      // throw the exception only from buffers that failed within the last thresholdAgeMilliseconds
+      if ((currentTimeMillis() - (buf.getTimeStamp()) < thresholdAgeMilliseconds)) {
+        throw buf.getErrException();
+      } else {
+        return 0;
+      }
+    }
+
+    if ((buf.getStatus() != ReadBufferStatus.AVAILABLE)
+        || (position >= buf.getOffset() + buf.getLength())) {
+      return 0;
+    }
+
     int cursor = (int) (position - buf.getOffset());
     int availableLengthInBuffer = buf.getLength() - cursor;
     int lengthToCopy = Math.min(length, availableLengthInBuffer);
@@ -368,14 +416,18 @@ final class ReadBufferManager {
       inProgressList.remove(buffer);
       if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
         buffer.setStatus(ReadBufferStatus.AVAILABLE);
-        buffer.setTimeStamp(currentTimeMillis());
         buffer.setLength(bytesActuallyRead);
         completedReadList.add(buffer);
       } else {
         freeList.push(buffer.getBufferindex());
-        // buffer should go out of scope after the end of the calling method in ReadBufferWorker, and eligible for GC
+        // buffer will be deleted as per the eviction policy.
       }
+
+      buffer.setStatus(result);
+      buffer.setTimeStamp(currentTimeMillis());
+      completedReadList.add(buffer);
     }
+
     //outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results
     buffer.getLatch().countDown(); // wake up waiting threads (if any)
   }
@@ -392,4 +444,24 @@ final class ReadBufferManager {
   private long currentTimeMillis() {
     return System.nanoTime() / 1000 / 1000;
   }
+
+  @VisibleForTesting
+  int getThresholdAgeMilliseconds() {
+    return thresholdAgeMilliseconds;
+  }
+
+  @VisibleForTesting
+  static void setThresholdAgeMilliseconds(int thresholdAgeMs) {
+    thresholdAgeMilliseconds = thresholdAgeMs;
+  }
+
+  @VisibleForTesting
+  int getCompletedReadListSize() {
+    return completedReadList.size();
+  }
+
+  @VisibleForTesting
+  void callTryEvict() {
+    tryEvict();
+  }
 }
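
To make the two-part range check in getBufferFromCompletedQueue() concrete:
a failed read-ahead stores no bytes (its length stays 0, per the behavior
shown above), so only the requestedLength term can match it. A worked example
with hypothetical values:

    public class CompletedQueueMatchDemo {
      public static void main(String[] args) {
        long requestedOffset = 1500;   // offset the caller asks for
        long bufOffset = 1024;         // where the failed read-ahead started
        int bufLength = 0;             // READ_FAILED: nothing was stored
        int bufRequestedLength = 1024; // range the read-ahead attempted

        boolean matches = requestedOffset >= bufOffset
            && (requestedOffset < bufOffset + bufLength               // 1500 < 1024: false
                || requestedOffset < bufOffset + bufRequestedLength); // 1500 < 2048: true
        // matches == true, so the failed buffer is found and its stored
        // IOException can be rethrown (if it failed recently enough).
        System.out.println("matches = " + matches);
      }
    }
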
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java
index af69de0..41acd7e 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.fs.azurebfs.services;
 
+import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
 
 import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
@@ -61,9 +62,18 @@ class ReadBufferWorker implements Runnable {
       if (buffer != null) {
         try {
           // do the actual read, from the file.
-          int bytesRead = buffer.getStream().readRemote(buffer.getOffset(), buffer.getBuffer(), 0, buffer.getRequestedLength());
+          int bytesRead = buffer.getStream().readRemote(
+              buffer.getOffset(),
+              buffer.getBuffer(),
+              0,
+              // If AbfsInputStream was created with a bigger buffer size than
+              // the read-ahead buffer size, make sure a valid length is passed
+              // for the remote read
+              Math.min(buffer.getRequestedLength(), buffer.getBuffer().length));
+
           bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead);  // post result back to ReadBufferManager
         } catch (Exception ex) {
+          buffer.setErrException(new IOException(ex));
           bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0);
         }
       }
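
The Math.min clamp above matters when the stream was created with a read
buffer bigger than the pooled read-ahead slots (BLOCK_SIZE = 4 MB in
ReadBufferManager). A small illustration with hypothetical sizes:

    public class ReadLengthClampDemo {
      public static void main(String[] args) {
        int requestedLength = 8 * 1024 * 1024; // buffer.getRequestedLength()
        int slotLength = 4 * 1024 * 1024;      // buffer.getBuffer().length
        // Without the clamp, readRemote() would be asked to fill 8 MB into
        // a 4 MB array; with it, the read stays within the pooled slot.
        int lengthToRead = Math.min(requestedLength, slotLength);
        System.out.println("lengthToRead = " + lengthToRead); // 4194304
      }
    }
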
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
new file mode 100644
index 0000000..c9dacd6
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
@@ -0,0 +1,450 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException;
+import org.apache.hadoop.fs.azurebfs.utils.TestCachedSASToken;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FORWARD_SLASH;
+
+/**
+ * Unit test AbfsInputStream.
+ */
+public class TestAbfsInputStream extends
+    AbstractAbfsIntegrationTest {
+
+  private static final int ONE_KB = 1 * 1024;
+  private static final int TWO_KB = 2 * 1024;
+  private static final int THREE_KB = 3 * 1024;
+  private static final int REDUCED_READ_BUFFER_AGE_THRESHOLD = 3000; // 3 sec
+
+  private AbfsRestOperation getMockRestOp() {
+    AbfsRestOperation op = mock(AbfsRestOperation.class);
+    AbfsHttpOperation httpOp = mock(AbfsHttpOperation.class);
+    when(httpOp.getBytesReceived()).thenReturn(1024L);
+    when(op.getResult()).thenReturn(httpOp);
+    when(op.getSasToken()).thenReturn(TestCachedSASToken.getTestCachedSASTokenInstance().get());
+    return op;
+  }
+
+  private AbfsClient getMockAbfsClient() {
+    // Mock AbfsClient; individual tests stub the client.read() behavior
+    AbfsClient client = mock(AbfsClient.class);
+    AbfsPerfTracker tracker = new AbfsPerfTracker(
+        "test",
+        this.getAccountName(),
+        this.getConfiguration());
+    when(client.getAbfsPerfTracker()).thenReturn(tracker);
+
+    return client;
+  }
+
+  private AbfsInputStream getAbfsInputStream(AbfsClient mockAbfsClient, String fileName) {
+    AbfsInputStreamContext inputStreamContext = new AbfsInputStreamContext(-1);
+    // Create AbfsInputStream with the client instance
+    AbfsInputStream inputStream = new AbfsInputStream(
+        mockAbfsClient,
+        null,
+        FORWARD_SLASH + fileName,
+        THREE_KB,
+        inputStreamContext.withReadBufferSize(ONE_KB).withReadAheadQueueDepth(10),
+        "eTag");
+
+    inputStream.setCachedSasToken(
+        TestCachedSASToken.getTestCachedSASTokenInstance());
+
+    return inputStream;
+  }
+
+  private void queueReadAheads(AbfsInputStream inputStream) {
+    // Mimic AbfsInputStream readAhead queue requests
+    ReadBufferManager.getBufferManager()
+        .queueReadAhead(inputStream, 0, ONE_KB);
+    ReadBufferManager.getBufferManager()
+        .queueReadAhead(inputStream, ONE_KB, ONE_KB);
+    ReadBufferManager.getBufferManager()
+        .queueReadAhead(inputStream, TWO_KB, TWO_KB);
+  }
+
+  private void verifyReadCallCount(AbfsClient client, int count) throws
+      AzureBlobFileSystemException, InterruptedException {
+    // ReadAhead threads are triggered asynchronously.
+    // Wait a second before verifying the number of total calls.
+    Thread.sleep(1000);
+    verify(client, times(count)).read(any(String.class), any(Long.class),
+        any(byte[].class), any(Integer.class), any(Integer.class),
+        any(String.class), any(String.class));
+  }
+
+  private void checkEvictedStatus(AbfsInputStream inputStream, int position, boolean expectedToThrowException)
+      throws Exception {
+    // Sleep for the eviction threshold time
+    Thread.sleep(ReadBufferManager.getBufferManager().getThresholdAgeMilliseconds() + 1000);
+
+    // Eviction is done only when AbfsInputStream tries to queue new items.
+    // 1 tryEvict will remove 1 eligible item. To ensure that the current test buffer
+    // will get evicted (considering there could be other tests running in parallel),
+    // call tryEvict for the number of items that are there in completedReadList.
+    int numOfCompletedReadListItems = ReadBufferManager.getBufferManager().getCompletedReadListSize();
+    while (numOfCompletedReadListItems > 0) {
+      ReadBufferManager.getBufferManager().callTryEvict();
+      numOfCompletedReadListItems--;
+    }
+
+    if (expectedToThrowException) {
+      intercept(IOException.class,
+          () -> inputStream.read(position, new byte[ONE_KB], 0, ONE_KB));
+    } else {
+      inputStream.read(position, new byte[ONE_KB], 0, ONE_KB);
+    }
+  }
+
+  public TestAbfsInputStream() throws Exception {
+    super();
+    // Reduce thresholdAgeMilliseconds to 3 sec for the tests
+    ReadBufferManager.getBufferManager().setThresholdAgeMilliseconds(REDUCED_READ_BUFFER_AGE_THRESHOLD);
+  }
+
+  /**
+   * This test expects AbfsInputStream to throw the exception that the readAhead
+   * thread received on read. The readAhead thread must be initiated from the
+   * active read request itself.
+   * Also checks that the ReadBuffers are evicted as per the ReadBufferManager
+   * threshold criteria.
+   * @throws Exception
+   */
+  @Test
+  public void testFailedReadAhead() throws Exception {
+    AbfsClient client = getMockAbfsClient();
+    AbfsRestOperation successOp = getMockRestOp();
+
+    // Stub :
+    // Read request leads to 3 readahead calls: Fail all 3 readahead-client.read()
+    // Actual read request fails with the failure in readahead thread
+    doThrow(new TimeoutException("Internal Server error for RAH-Thread-X"))
+        .doThrow(new TimeoutException("Internal Server error for RAH-Thread-Y"))
+        .doThrow(new TimeoutException("Internal Server error RAH-Thread-Z"))
+        .doReturn(successOp) // Any extra calls to read, pass it.
+        .when(client)
+        .read(any(String.class), any(Long.class), any(byte[].class),
+            any(Integer.class), any(Integer.class), any(String.class),
+            any(String.class));
+
+    AbfsInputStream inputStream = getAbfsInputStream(client, "testFailedReadAhead.txt");
+
+    // Scenario: the readAhead triggered from the current active read call failed.
+    // Before the change to return the exception from the readahead buffer,
+    // AbfsInputStream would have triggered an extra readRemote on noticing
+    // data absent in the readahead buffers.
+    // In this test, a read should trigger 3 client.read() calls as the file is 3 KB
+    // and the readahead buffer size set in AbfsInputStream is 1 KB.
+    // There should be a total of only 3 client.read() calls in this test.
+    intercept(IOException.class,
+        () -> inputStream.read(new byte[ONE_KB]));
+
+    // Only the 3 readAhead threads should have triggered client.read
+    verifyReadCallCount(client, 3);
+
+    // Stub returns success for the 4th read request; if ReadBuffers still
+    // persisted, ReadAheadManager getBlock would have thrown an exception.
+    checkEvictedStatus(inputStream, 0, false);
+  }
+
+  /**
+   * The test expects AbfsInputStream to initiate a remote read request for
+   * the requested offset and length when a previous read ahead on the offset had failed.
+   * Also checks that the ReadBuffers are evicted as per the ReadBufferManager
+   * threshold criteria.
+   * @throws Exception
+   */
+  @Test
+  public void testOlderReadAheadFailure() throws Exception {
+    AbfsClient client = getMockAbfsClient();
+    AbfsRestOperation successOp = getMockRestOp();
+
+    // Stub :
+    // First Read request leads to 3 readahead calls: Fail all 3 readahead-client.read()
+    // A second read request will see that readahead had failed for data in
+    // the requested offset range and also that it is an older readahead request.
+    // So attempt a new read only for the requested range.
+    doThrow(new TimeoutException("Internal Server error for RAH-X"))
+        .doThrow(new TimeoutException("Internal Server error for RAH-Y"))
+        .doThrow(new TimeoutException("Internal Server error for RAH-Z"))
+        .doReturn(successOp) // pass the read for second read request
+        .doReturn(successOp) // pass success for post eviction test
+        .when(client)
+        .read(any(String.class), any(Long.class), any(byte[].class),
+            any(Integer.class), any(Integer.class), any(String.class),
+            any(String.class));
+
+    AbfsInputStream inputStream = getAbfsInputStream(client, "testOlderReadAheadFailure.txt");
+
+    // First read request that fails as the readahead triggered from this request failed.
+    intercept(IOException.class,
+        () -> inputStream.read(new byte[ONE_KB]));
+
+    // Only the 3 readAhead threads should have triggered client.read
+    verifyReadCallCount(client, 3);
+
+    // Sleep for thresholdAgeMs so that the read ahead buffer qualifies as old.
+    Thread.sleep(ReadBufferManager.getBufferManager().getThresholdAgeMilliseconds());
+
+    // Second read request should retry the read (and not issue any new readaheads)
+    inputStream.read(ONE_KB, new byte[ONE_KB], 0, ONE_KB);
+
+    // Once created, the mock will remember all interactions, so the total number of
+    // read calls will be one more than earlier. (Resetting the mock would reset the
+    // count, but it erases the stubbing as well and needs the AbfsInputStream to be
+    // recreated, which defeats the purpose.)
+    verifyReadCallCount(client, 4);
+
+    // Stub returns success for the 5th read request; if ReadBuffers still
+    // persisted, the request would have failed for position 0.
+    checkEvictedStatus(inputStream, 0, false);
+  }
+
+  /**
+   * The test expects AbfsInputStream to utilize any data read ahead for
+   * the requested offset and length.
+   * @throws Exception
+   */
+  @Test
+  public void testSuccessfulReadAhead() throws Exception {
+    // Get the mock AbfsClient
+    AbfsClient client = getMockAbfsClient();
+
+    // Success operation mock
+    AbfsRestOperation op = getMockRestOp();
+
+    // Stub :
+    // Pass all readAheads, and fail everything after them: the failing
+    // post-eviction request proves that the earlier reads were served
+    // from the readAhead buffer
+    doReturn(op)
+        .doReturn(op)
+        .doReturn(op)
+        .doThrow(new TimeoutException("Internal Server error for RAH-X"))
+        .doThrow(new TimeoutException("Internal Server error for RAH-Y"))
+        .doThrow(new TimeoutException("Internal Server error for RAH-Z"))
+        .when(client)
+        .read(any(String.class), any(Long.class), any(byte[].class),
+            any(Integer.class), any(Integer.class), any(String.class),
+            any(String.class));
+
+    AbfsInputStream inputStream = getAbfsInputStream(client, "testSuccessfulReadAhead.txt");
+
+    // First read request that triggers readAheads.
+    inputStream.read(new byte[ONE_KB]);
+
+    // Only the 3 readAhead threads should have triggered client.read
+    verifyReadCallCount(client, 3);
+
+    // Another read request whose requested data is already read ahead.
+    inputStream.read(ONE_KB, new byte[ONE_KB], 0, ONE_KB);
+
+    // Once created, the mock will remember all interactions.
+    // As the above read should not have triggered any server calls, the total
+    // number of read calls made at this point will be the same as before.
+    verifyReadCallCount(client, 3);
+
+    // Stub will throw an exception for client.read() for the 4th and later calls;
+    // if the read-ahead buffer is not used, an exception will be thrown on read
+    checkEvictedStatus(inputStream, 0, true);
+  }
+
+  /**
+   * This test expects ReadAheadManager to throw an exception if the read ahead
+   * thread failed within the last thresholdAgeMilliseconds.
+   * Also checks that the ReadBuffers are evicted as per the ReadBufferManager
+   * threshold criteria.
+   * @throws Exception
+   */
+  @Test
+  public void testReadAheadManagerForFailedReadAhead() throws Exception {
+    AbfsClient client = getMockAbfsClient();
+    AbfsRestOperation successOp = getMockRestOp();
+
+    // Stub :
+    // Read request leads to 3 readahead calls: Fail all 3 readahead-client.read()
+    // Actual read request fails with the failure in readahead thread
+    doThrow(new TimeoutException("Internal Server error for RAH-Thread-X"))
+        .doThrow(new TimeoutException("Internal Server error for RAH-Thread-Y"))
+        .doThrow(new TimeoutException("Internal Server error RAH-Thread-Z"))
+        .doReturn(successOp) // Any extra calls to read, pass it.
+        .when(client)
+        .read(any(String.class), any(Long.class), any(byte[].class),
+            any(Integer.class), any(Integer.class), any(String.class),
+            any(String.class));
+
+    AbfsInputStream inputStream = getAbfsInputStream(client, "testReadAheadManagerForFailedReadAhead.txt");
+
+    queueReadAheads(inputStream);
+
+    // AbfsInputStream read would have waited for the read-ahead of the requested offset.
+    // As we are testing ReadAheadManager directly, sleep for a second to
+    // let the read ahead threads complete
+    Thread.sleep(1000);
+
+    // if readAhead failed for the specific offset, getBlock should
+    // throw the exception from the ReadBuffer that failed within the last thresholdAgeMilliseconds
+    intercept(IOException.class,
+        () -> ReadBufferManager.getBufferManager().getBlock(
+            inputStream,
+            0,
+            ONE_KB,
+            new byte[ONE_KB]));
+
+    // Only the 3 readAhead threads should have triggered client.read
+    verifyReadCallCount(client, 3);
+
+    // Stub returns success for the 4th read request; if ReadBuffers still
+    // persisted, ReadAheadManager getBlock would have thrown an exception.
+    checkEvictedStatus(inputStream, 0, false);
+  }
+
+  /**
+   * The test expects ReadAheadManager to return 0 receivedBytes when a previous
+   * read ahead on the offset had failed, rather than throw the exception received then.
+   * Also checks that the ReadBuffers are evicted as per the ReadBufferManager
+   * threshold criteria.
+   * @throws Exception
+   */
+  @Test
+  public void testReadAheadManagerForOlderReadAheadFailure() throws Exception {
+    AbfsClient client = getMockAbfsClient();
+    AbfsRestOperation successOp = getMockRestOp();
+
+    // Stub :
+    // First Read request leads to 3 readahead calls: Fail all 3 readahead-client.read()
+    // A second read request will see that readahead had failed for data in
+    // the requested offset range but also that it is an older readahead request.
+    // The system issue could have been resolved by now, so attempt a new read only for the requested range.
+    doThrow(new TimeoutException("Internal Server error for RAH-X"))
+        .doThrow(new TimeoutException("Internal Server error for RAH-X"))
+        .doThrow(new TimeoutException("Internal Server error for RAH-X"))
+        .doReturn(successOp) // pass the read for second read request
+        .doReturn(successOp) // pass success for post eviction test
+        .when(client)
+        .read(any(String.class), any(Long.class), any(byte[].class),
+            any(Integer.class), any(Integer.class), any(String.class),
+            any(String.class));
+
+    AbfsInputStream inputStream = getAbfsInputStream(client, "testReadAheadManagerForOlderReadAheadFailure.txt");
+
+    queueReadAheads(inputStream);
+
+    // AbfsInputStream read would have waited for the read-ahead of the requested offset.
+    // As we are testing ReadAheadManager directly, sleep for thresholdAgeMilliseconds so that
+    // the read buffer qualifies as an old buffer
+    Thread.sleep(ReadBufferManager.getBufferManager().getThresholdAgeMilliseconds());
+
+    // Only the 3 readAhead threads should have triggered client.read
+    verifyReadCallCount(client, 3);
+
+    // getBlock from a new read request should return 0 if the read ahead
+    // buffer for the respective offset failed longer than thresholdAgeMilliseconds ago.
+    int bytesRead = ReadBufferManager.getBufferManager().getBlock(
+        inputStream,
+        ONE_KB,
+        ONE_KB,
+        new byte[ONE_KB]);
+    Assert.assertEquals("bytesRead should be zero when previously read "
+        + "ahead buffer had failed", 0, bytesRead);
+
+    // Stub returns success for the 5th read request; if ReadBuffers still
+    // persisted, the request would have failed for position 0.
+    checkEvictedStatus(inputStream, 0, false);
+  }
+
+  /**
+   * The test expects ReadAheadManager to return the previously read-ahead
+   * data for the same offset.
+   * @throws Exception
+   */
+  @Test
+  public void testReadAheadManagerForSuccessfulReadAhead() throws Exception {
+    // Get the mock AbfsClient
+    AbfsClient client = getMockAbfsClient();
+
+    // Success operation mock
+    AbfsRestOperation op = getMockRestOp();
+
+    // Stub :
+    // Pass all readAheads and fail the post eviction request to
+    // prove ReadAhead buffer is used
+    doReturn(op)
+        .doReturn(op)
+        .doReturn(op)
+        .doThrow(new TimeoutException("Internal Server error for RAH-X")) // for post eviction request
+        .doThrow(new TimeoutException("Internal Server error for RAH-Y"))
+        .doThrow(new TimeoutException("Internal Server error for RAH-Z"))
+        .when(client)
+        .read(any(String.class), any(Long.class), any(byte[].class),
+            any(Integer.class), any(Integer.class), any(String.class),
+            any(String.class));
+
+    AbfsInputStream inputStream = getAbfsInputStream(client, "testSuccessfulReadAhead.txt");
+
+    queueReadAheads(inputStream);
+
+    // AbfsInputStream read would have waited for the read-ahead of the requested offset.
+    // As we are testing ReadAheadManager directly, sleep for a second to
+    // let the read ahead threads complete
+    Thread.sleep(1000);
+
+    // Only the 3 readAhead threads should have triggered client.read
+    verifyReadCallCount(client, 3);
+
+    // getBlock for a new read should return the buffer read-ahead
+    int bytesRead = ReadBufferManager.getBufferManager().getBlock(
+        inputStream,
+        ONE_KB,
+        ONE_KB,
+        new byte[ONE_KB]);
+
+    Assert.assertTrue("bytesRead should be non-zero from the "
+        + "buffer that was read-ahead", bytesRead > 0);
+
+    // Once created, the mock will remember all interactions.
+    // As the above read should not have triggered any server calls, the total
+    // number of read calls made at this point will be the same as before.
+    verifyReadCallCount(client, 3);
+
+    // Stub will throw an exception for client.read() for the 4th and later calls;
+    // if the read-ahead buffer is not used, an exception will be thrown on read
+    checkEvictedStatus(inputStream, 0, true);
+  }
+
+}
\ No newline at end of file
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TestCachedSASToken.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TestCachedSASToken.java
index 1016d4b..cbba808 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TestCachedSASToken.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TestCachedSASToken.java
@@ -22,12 +22,14 @@ import java.io.IOException;
 import java.time.OffsetDateTime;
 import java.time.ZoneOffset;
 import java.time.format.DateTimeFormatter;
+import java.util.UUID;
 
 import org.junit.Assert;
 import org.junit.Test;
 
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS;
 import static java.time.temporal.ChronoUnit.SECONDS;
+import static java.time.temporal.ChronoUnit.DAYS;
 
 /**
  * Test CachedSASToken.
@@ -159,4 +161,36 @@ public final class TestCachedSASToken {
     cachedToken = cachedSasToken.get();
     Assert.assertNull(cachedToken);
   }
+
+  public static CachedSASToken getTestCachedSASTokenInstance() {
+    String expiryPostADay = OffsetDateTime.now(ZoneOffset.UTC)
+        .plus(1, DAYS)
+        .format(DateTimeFormatter.ISO_DATE_TIME);
+    String version = "2020-20-20";
+
+    StringBuilder sb = new StringBuilder();
+    sb.append("skoid=");
+    sb.append(UUID.randomUUID().toString());
+    sb.append("&sktid=");
+    sb.append(UUID.randomUUID().toString());
+    sb.append("&skt=");
+    sb.append(OffsetDateTime.now(ZoneOffset.UTC)
+        .minus(1, DAYS)
+        .format(DateTimeFormatter.ISO_DATE_TIME));
+    sb.append("&ske=");
+    sb.append(expiryPostADay);
+    sb.append("&sks=b");
+    sb.append("&skv=");
+    sb.append(version);
+    sb.append("&sp=rw");
+    sb.append("&sr=b");
+    sb.append("&se=");
+    sb.append(expiryPostADay);
+    sb.append("&sv=2");
+    sb.append(version);
+
+    CachedSASToken cachedSASToken = new CachedSASToken();
+    cachedSASToken.update(sb.toString());
+    return cachedSASToken;
+  }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org