You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by da...@apache.org on 2020/05/27 20:51:57 UTC
[hadoop] branch trunk updated: HADOOP-16852: Report read-ahead
error back
This is an automated email from the ASF dual-hosted git repository.
dazhou pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 53b993e HADOOP-16852: Report read-ahead error back
53b993e is described below
commit 53b993e6048ffaaf98e460690211fc08efb20cf2
Author: Sneha Vijayarajan <sn...@gmail.com>
AuthorDate: Wed May 27 13:51:42 2020 -0700
HADOOP-16852: Report read-ahead error back
Contributed by Sneha Vijayarajan
---
.../fs/azurebfs/services/AbfsInputStream.java | 12 +
.../hadoop/fs/azurebfs/services/ReadBuffer.java | 16 +
.../fs/azurebfs/services/ReadBufferManager.java | 90 ++++-
.../fs/azurebfs/services/ReadBufferWorker.java | 12 +-
.../fs/azurebfs/services/TestAbfsInputStream.java | 450 +++++++++++++++++++++
.../fs/azurebfs/utils/TestCachedSASToken.java | 34 ++
6 files changed, 604 insertions(+), 10 deletions(-)
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
index 422fa3b..50380c9 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
@@ -24,6 +24,10 @@ import java.io.IOException;
import java.net.HttpURLConnection;
import com.google.common.base.Preconditions;
+import com.google.common.annotations.VisibleForTesting;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.FSExceptionMessages;
@@ -41,6 +45,7 @@ import static org.apache.hadoop.util.StringUtils.toLowerCase;
*/
public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
StreamCapabilities {
+ private static final Logger LOG = LoggerFactory.getLogger(AbfsInputStream.class);
private final AbfsClient client;
private final Statistics statistics;
@@ -239,6 +244,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
final AbfsRestOperation op;
AbfsPerfTracker tracker = client.getAbfsPerfTracker();
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "readRemote", "read")) {
+ LOG.trace("Trigger client.read for path={} position={} offset={} length={}", path, position, offset, length);
op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag, cachedSasToken.get());
cachedSasToken.update(op.getSasToken());
perfInfo.registerResult(op.getResult()).registerSuccess(true);
@@ -431,4 +437,10 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
byte[] getBuffer() {
return buffer;
}
+
+ @VisibleForTesting
+ protected void setCachedSasToken(final CachedSASToken cachedSasToken) {
+ this.cachedSasToken = cachedSasToken;
+ }
+
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java
index 00e4f00..5d55726 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java
@@ -18,10 +18,13 @@
package org.apache.hadoop.fs.azurebfs.services;
+import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
+import static org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus.READ_FAILED;
+
class ReadBuffer {
private AbfsInputStream stream;
@@ -40,6 +43,8 @@ class ReadBuffer {
private boolean isLastByteConsumed = false;
private boolean isAnyByteConsumed = false;
+ private IOException errException = null;
+
public AbfsInputStream getStream() {
return stream;
}
@@ -88,12 +93,23 @@ class ReadBuffer {
this.bufferindex = bufferindex;
}
+ public IOException getErrException() {
+ return errException;
+ }
+
+ public void setErrException(final IOException errException) {
+ this.errException = errException;
+ }
+
public ReadBufferStatus getStatus() {
return status;
}
public void setStatus(ReadBufferStatus status) {
this.status = status;
+ if (status == READ_FAILED) {
+ bufferindex = -1;
+ }
}
public CountDownLatch getLatch() {
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
index 5b71cf0..73c23b0 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
@@ -21,12 +21,15 @@ import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Stack;
import java.util.concurrent.CountDownLatch;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* The Read Buffer Manager for Rest AbfsClient.
*/
@@ -36,8 +39,9 @@ final class ReadBufferManager {
private static final int NUM_BUFFERS = 16;
private static final int BLOCK_SIZE = 4 * 1024 * 1024;
private static final int NUM_THREADS = 8;
- private static final int THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold
+ private static final int DEFAULT_THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold
+ private static int thresholdAgeMilliseconds = DEFAULT_THRESHOLD_AGE_MILLISECONDS;
private Thread[] threads = new Thread[NUM_THREADS];
private byte[][] buffers; // array of byte[] buffers, to hold the data that is read
private Stack<Integer> freeList = new Stack<>(); // indices in buffers[] array that are available
@@ -141,7 +145,8 @@ final class ReadBufferManager {
* @param buffer the buffer to read data into. Note that the buffer will be written into from offset 0.
* @return the number of bytes read
*/
- int getBlock(final AbfsInputStream stream, final long position, final int length, final byte[] buffer) {
+ int getBlock(final AbfsInputStream stream, final long position, final int length, final byte[] buffer)
+ throws IOException {
// not synchronized, so have to be careful with locking
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("getBlock for file {} position {} thread {}",
@@ -244,7 +249,7 @@ final class ReadBufferManager {
earliestBirthday = buf.getTimeStamp();
}
}
- if ((currentTimeMillis() - earliestBirthday > THRESHOLD_AGE_MILLISECONDS) && (nodeToEvict != null)) {
+ if ((currentTimeMillis() - earliestBirthday > thresholdAgeMilliseconds) && (nodeToEvict != null)) {
return evict(nodeToEvict);
}
@@ -253,7 +258,12 @@ final class ReadBufferManager {
}
private boolean evict(final ReadBuffer buf) {
- freeList.push(buf.getBufferindex());
+ // As failed ReadBuffers (bufferindex = -1) are saved in completedReadList,
+ // avoid adding them to freeList.
+ if (buf.getBufferindex() != -1) {
+ freeList.push(buf.getBufferindex());
+ }
+
completedReadList.remove(buf);
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} length {}",
@@ -289,6 +299,27 @@ final class ReadBufferManager {
return null;
}
+ /**
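+ * Returns the completed buffer, failed or successful, that covers the requested offset.
+ * @param stream the stream whose completed buffers are searched
+ * @param requestedOffset the file offset being requested
+ * @return the first matching buffer in completedReadList, or null if none matches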
+ */
+ private ReadBuffer getBufferFromCompletedQueue(final AbfsInputStream stream, final long requestedOffset) {
+ for (ReadBuffer buffer : completedReadList) {
+ // Buffer is returned if the requestedOffset is at or above the buffer's
+ // offset but below offset + length or offset + requestedLength
+ if ((buffer.getStream() == stream)
+ && (requestedOffset >= buffer.getOffset())
+ && ((requestedOffset < buffer.getOffset() + buffer.getLength())
+ || (requestedOffset < buffer.getOffset() + buffer.getRequestedLength()))) {
+ return buffer;
+ }
+ }
+
+ return null;
+ }
+
private void clearFromReadAheadQueue(final AbfsInputStream stream, final long requestedOffset) {
ReadBuffer buffer = getFromList(readAheadQueue, stream, requestedOffset);
if (buffer != null) {
@@ -299,11 +330,28 @@ final class ReadBufferManager {
}
private int getBlockFromCompletedQueue(final AbfsInputStream stream, final long position, final int length,
- final byte[] buffer) {
- ReadBuffer buf = getFromList(completedReadList, stream, position);
- if (buf == null || position >= buf.getOffset() + buf.getLength()) {
+ final byte[] buffer) throws IOException {
+ ReadBuffer buf = getBufferFromCompletedQueue(stream, position);
+
+ if (buf == null) {
return 0;
}
+
+ if (buf.getStatus() == ReadBufferStatus.READ_FAILED) {
+ // To prevent new read requests from failing due to old read-ahead attempts,
+ // return exception only from buffers that failed within the last thresholdAgeMilliseconds
+ if ((currentTimeMillis() - (buf.getTimeStamp()) < thresholdAgeMilliseconds)) {
+ throw buf.getErrException();
+ } else {
+ return 0;
+ }
+ }
+
+ if ((buf.getStatus() != ReadBufferStatus.AVAILABLE)
+ || (position >= buf.getOffset() + buf.getLength())) {
+ return 0;
+ }
+
int cursor = (int) (position - buf.getOffset());
int availableLengthInBuffer = buf.getLength() - cursor;
int lengthToCopy = Math.min(length, availableLengthInBuffer);
@@ -368,14 +416,18 @@ final class ReadBufferManager {
inProgressList.remove(buffer);
if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
buffer.setStatus(ReadBufferStatus.AVAILABLE);
- buffer.setTimeStamp(currentTimeMillis());
buffer.setLength(bytesActuallyRead);
completedReadList.add(buffer);
} else {
freeList.push(buffer.getBufferindex());
- // buffer should go out of scope after the end of the calling method in ReadBufferWorker, and eligible for GC
+ // buffer will be deleted as per the eviction policy.
}
+
+ buffer.setStatus(result);
+ buffer.setTimeStamp(currentTimeMillis());
+ completedReadList.add(buffer);
}
+
//outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results
buffer.getLatch().countDown(); // wake up waiting threads (if any)
}
@@ -392,4 +444,24 @@ final class ReadBufferManager {
private long currentTimeMillis() {
return System.nanoTime() / 1000 / 1000;
}
+
+ @VisibleForTesting
+ int getThresholdAgeMilliseconds() {
+ return thresholdAgeMilliseconds;
+ }
+
+ @VisibleForTesting
+ static void setThresholdAgeMilliseconds(int thresholdAgeMs) {
+ thresholdAgeMilliseconds = thresholdAgeMs;
+ }
+
+ @VisibleForTesting
+ int getCompletedReadListSize() {
+ return completedReadList.size();
+ }
+
+ @VisibleForTesting
+ void callTryEvict() {
+ tryEvict();
+ }
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java
index af69de0..41acd7e 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.fs.azurebfs.services;
+import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
@@ -61,9 +62,18 @@ class ReadBufferWorker implements Runnable {
if (buffer != null) {
try {
// do the actual read, from the file.
- int bytesRead = buffer.getStream().readRemote(buffer.getOffset(), buffer.getBuffer(), 0, buffer.getRequestedLength());
+ int bytesRead = buffer.getStream().readRemote(
+ buffer.getOffset(),
+ buffer.getBuffer(),
+ 0,
+ // If AbfsInputStream was created with bigger buffer size than
+ // read-ahead buffer size, make sure a valid length is passed
+ // for remote read
+ Math.min(buffer.getRequestedLength(), buffer.getBuffer().length));
+
bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead); // post result back to ReadBufferManager
} catch (Exception ex) {
+ buffer.setErrException(new IOException(ex));
bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0);
}
}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
new file mode 100644
index 0000000..c9dacd6
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
@@ -0,0 +1,450 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException;
+import org.apache.hadoop.fs.azurebfs.utils.TestCachedSASToken;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FORWARD_SLASH;
+
+/**
+ * Unit test AbfsInputStream.
+ */
+public class TestAbfsInputStream extends
+ AbstractAbfsIntegrationTest {
+
+ private static final int ONE_KB = 1 * 1024;
+ private static final int TWO_KB = 2 * 1024;
+ private static final int THREE_KB = 3 * 1024;
+ private static final int REDUCED_READ_BUFFER_AGE_THRESHOLD = 3000; // 3 sec
+
+ private AbfsRestOperation getMockRestOp() {
+ AbfsRestOperation op = mock(AbfsRestOperation.class);
+ AbfsHttpOperation httpOp = mock(AbfsHttpOperation.class);
+ when(httpOp.getBytesReceived()).thenReturn(1024L);
+ when(op.getResult()).thenReturn(httpOp);
+ when(op.getSasToken()).thenReturn(TestCachedSASToken.getTestCachedSASTokenInstance().get());
+ return op;
+ }
+
+ private AbfsClient getMockAbfsClient() {
+ // Mock AbfsClient backed by a real AbfsPerfTracker; read() is stubbed by each test
+ AbfsClient client = mock(AbfsClient.class);
+ AbfsPerfTracker tracker = new AbfsPerfTracker(
+ "test",
+ this.getAccountName(),
+ this.getConfiguration());
+ when(client.getAbfsPerfTracker()).thenReturn(tracker);
+
+ return client;
+ }
+
+ private AbfsInputStream getAbfsInputStream(AbfsClient mockAbfsClient, String fileName) {
+ AbfsInputStreamContext inputStreamContext = new AbfsInputStreamContext(-1);
+ // Create AbfsInputStream with the client instance
+ AbfsInputStream inputStream = new AbfsInputStream(
+ mockAbfsClient,
+ null,
+ FORWARD_SLASH + fileName,
+ THREE_KB,
+ inputStreamContext.withReadBufferSize(ONE_KB).withReadAheadQueueDepth(10),
+ "eTag");
+
+ inputStream.setCachedSasToken(
+ TestCachedSASToken.getTestCachedSASTokenInstance());
+
+ return inputStream;
+ }
+
+ private void queueReadAheads(AbfsInputStream inputStream) {
+ // Mimic AbfsInputStream readAhead queue requests
+ ReadBufferManager.getBufferManager()
+ .queueReadAhead(inputStream, 0, ONE_KB);
+ ReadBufferManager.getBufferManager()
+ .queueReadAhead(inputStream, ONE_KB, ONE_KB);
+ ReadBufferManager.getBufferManager()
+ .queueReadAhead(inputStream, TWO_KB, TWO_KB);
+ }
+
+ private void verifyReadCallCount(AbfsClient client, int count) throws
+ AzureBlobFileSystemException, InterruptedException {
+ // ReadAhead threads are triggered asynchronously.
+ // Wait a second before verifying the number of total calls.
+ Thread.sleep(1000);
+ verify(client, times(count)).read(any(String.class), any(Long.class),
+ any(byte[].class), any(Integer.class), any(Integer.class),
+ any(String.class), any(String.class));
+ }
+
+ private void checkEvictedStatus(AbfsInputStream inputStream, int position, boolean expectedToThrowException)
+ throws Exception {
+ // Sleep for the eviction threshold time
+ Thread.sleep(ReadBufferManager.getBufferManager().getThresholdAgeMilliseconds() + 1000);
+
+ // Eviction is done only when AbfsInputStream tries to queue new items.
+ // 1 tryEvict will remove 1 eligible item. To ensure that the current test buffer
+ // will get evicted (considering there could be other tests running in parallel),
+ // call tryEvict for the number of items that are there in completedReadList.
+ int numOfCompletedReadListItems = ReadBufferManager.getBufferManager().getCompletedReadListSize();
+ while (numOfCompletedReadListItems > 0) {
+ ReadBufferManager.getBufferManager().callTryEvict();
+ numOfCompletedReadListItems--;
+ }
+
+ if (expectedToThrowException) {
+ intercept(IOException.class,
+ () -> inputStream.read(position, new byte[ONE_KB], 0, ONE_KB));
+ } else {
+ inputStream.read(position, new byte[ONE_KB], 0, ONE_KB);
+ }
+ }
+
+ public TestAbfsInputStream() throws Exception {
+ super();
+ // Set thresholdAgeMilliseconds to 3 sec (the same value as the default) for the tests
+ ReadBufferManager.getBufferManager().setThresholdAgeMilliseconds(REDUCED_READ_BUFFER_AGE_THRESHOLD);
+ }
+
+ /**
+ * This test expects AbfsInputStream to throw the exception that readAhead
+ * thread received on read. The readAhead thread must be initiated from the
+ * active read request itself.
+ * Also checks that the ReadBuffers are evicted as per the ReadBufferManager
+ * threshold criteria.
+ * @throws Exception
+ */
+ @Test
+ public void testFailedReadAhead() throws Exception {
+ AbfsClient client = getMockAbfsClient();
+ AbfsRestOperation successOp = getMockRestOp();
+
+ // Stub :
+ // Read request leads to 3 readahead calls: Fail all 3 readahead-client.read()
+ // Actual read request fails with the failure in readahead thread
+ doThrow(new TimeoutException("Internal Server error for RAH-Thread-X"))
+ .doThrow(new TimeoutException("Internal Server error for RAH-Thread-Y"))
+ .doThrow(new TimeoutException("Internal Server error RAH-Thread-Z"))
+ .doReturn(successOp) // Any extra calls to read, pass it.
+ .when(client)
+ .read(any(String.class), any(Long.class), any(byte[].class),
+ any(Integer.class), any(Integer.class), any(String.class),
+ any(String.class));
+
+ AbfsInputStream inputStream = getAbfsInputStream(client, "testFailedReadAhead.txt");
+
+ // Scenario: ReadAhead triggered from current active read call failed
+ // Before the change to return exception from readahead buffer,
+ // AbfsInputStream would have triggered an extra readremote on noticing
+ // data absent in readahead buffers
+ // In this test, a read should trigger 3 client.read() calls as file is 3 KB
+ // and readahead buffer size set in AbfsInputStream is 1 KB
+ // There should only be a total of 3 client.read() in this test.
+ intercept(IOException.class,
+ () -> inputStream.read(new byte[ONE_KB]));
+
+ // Only the 3 readAhead threads should have triggered client.read
+ verifyReadCallCount(client, 3);
+
+ // Stub returns success for the 4th read request, if ReadBuffers still
+ // persisted, ReadAheadManager getBlock would have returned exception.
+ checkEvictedStatus(inputStream, 0, false);
+ }
+
+ /**
+ * The test expects AbfsInputStream to initiate a remote read request for
+ * the request offset and length when previous read ahead on the offset had failed.
+ * Also checks that the ReadBuffers are evicted as per the ReadBufferManager
+ * threshold criteria.
+ * @throws Exception
+ */
+ @Test
+ public void testOlderReadAheadFailure() throws Exception {
+ AbfsClient client = getMockAbfsClient();
+ AbfsRestOperation successOp = getMockRestOp();
+
+ // Stub :
+ // First Read request leads to 3 readahead calls: Fail all 3 readahead-client.read()
+ // A second read request will see that readahead had failed for data in
+ // the requested offset range and also that it is an older readahead request.
+ // So attempt a new read only for the requested range.
+ doThrow(new TimeoutException("Internal Server error for RAH-X"))
+ .doThrow(new TimeoutException("Internal Server error for RAH-Y"))
+ .doThrow(new TimeoutException("Internal Server error for RAH-Z"))
+ .doReturn(successOp) // pass the read for second read request
+ .doReturn(successOp) // pass success for post eviction test
+ .when(client)
+ .read(any(String.class), any(Long.class), any(byte[].class),
+ any(Integer.class), any(Integer.class), any(String.class),
+ any(String.class));
+
+ AbfsInputStream inputStream = getAbfsInputStream(client, "testOlderReadAheadFailure.txt");
+
+ // First read request that fails as the readahead triggered from this request failed.
+ intercept(IOException.class,
+ () -> inputStream.read(new byte[ONE_KB]));
+
+ // Only the 3 readAhead threads should have triggered client.read
+ verifyReadCallCount(client, 3);
+
+ // Sleep for thresholdAgeMs so that the read ahead buffer qualifies for being old.
+ Thread.sleep(ReadBufferManager.getBufferManager().getThresholdAgeMilliseconds());
+
+ // Second read request should retry the read (and not issue any new readaheads)
+ inputStream.read(ONE_KB, new byte[ONE_KB], 0, ONE_KB);
+
+ // Once created, mock will remember all interactions. So total number of read
+ // calls will be one more from earlier (there is a reset mock which will reset the
+ // count, but the mock stub is erased as well which needs AbfsInputStream to be recreated,
+ // which defeats the purpose)
+ verifyReadCallCount(client, 4);
+
+ // Stub returns success for the 5th read request, if ReadBuffers still
+ // persisted request would have failed for position 0.
+ checkEvictedStatus(inputStream, 0, false);
+ }
+
+ /**
+ * The test expects AbfsInputStream to utilize any data read ahead for
+ * requested offset and length.
+ * @throws Exception
+ */
+ @Test
+ public void testSuccessfulReadAhead() throws Exception {
+ // Mock AbfsClient; read() outcomes are stubbed below
+ AbfsClient client = getMockAbfsClient();
+
+ // Success operation mock
+ AbfsRestOperation op = getMockRestOp();
+
+ // Stub :
+ // Pass all readAheads and fail the post eviction request to
+ // prove ReadAhead buffer is used
+ // for post eviction check, fail all read aheads
+ doReturn(op)
+ .doReturn(op)
+ .doReturn(op)
+ .doThrow(new TimeoutException("Internal Server error for RAH-X"))
+ .doThrow(new TimeoutException("Internal Server error for RAH-Y"))
+ .doThrow(new TimeoutException("Internal Server error for RAH-Z"))
+ .when(client)
+ .read(any(String.class), any(Long.class), any(byte[].class),
+ any(Integer.class), any(Integer.class), any(String.class),
+ any(String.class));
+
+ AbfsInputStream inputStream = getAbfsInputStream(client, "testSuccessfulReadAhead.txt");
+
+ // First read request that triggers readAheads.
+ inputStream.read(new byte[ONE_KB]);
+
+ // Only the 3 readAhead threads should have triggered client.read
+ verifyReadCallCount(client, 3);
+
+ // Another read request whose requested data is already read ahead.
+ inputStream.read(ONE_KB, new byte[ONE_KB], 0, ONE_KB);
+
+ // Once created, mock will remember all interactions.
+ // As the above read should not have triggered any server calls, total
+ // number of read calls made at this point will be same as last.
+ verifyReadCallCount(client, 3);
+
+ // Stub will throw exception for client.read() for 4th and later calls
+ // if not using the read-ahead buffer exception will be thrown on read
+ checkEvictedStatus(inputStream, 0, true);
+ }
+
+ /**
+ * This test expects ReadBufferManager to throw exception if the read ahead
+ * thread had failed within the last thresholdAgeMilliseconds.
+ * Also checks that the ReadBuffers are evicted as per the ReadBufferManager
+ * threshold criteria.
+ * @throws Exception
+ */
+ @Test
+ public void testReadAheadManagerForFailedReadAhead() throws Exception {
+ AbfsClient client = getMockAbfsClient();
+ AbfsRestOperation successOp = getMockRestOp();
+
+ // Stub :
+ // Read request leads to 3 readahead calls: Fail all 3 readahead-client.read()
+ // Actual read request fails with the failure in readahead thread
+ doThrow(new TimeoutException("Internal Server error for RAH-Thread-X"))
+ .doThrow(new TimeoutException("Internal Server error for RAH-Thread-Y"))
+ .doThrow(new TimeoutException("Internal Server error RAH-Thread-Z"))
+ .doReturn(successOp) // Any extra calls to read, pass it.
+ .when(client)
+ .read(any(String.class), any(Long.class), any(byte[].class),
+ any(Integer.class), any(Integer.class), any(String.class),
+ any(String.class));
+
+ AbfsInputStream inputStream = getAbfsInputStream(client, "testReadAheadManagerForFailedReadAhead.txt");
+
+ queueReadAheads(inputStream);
+
+ // AbfsInputStream Read would have waited for the read-ahead for the requested offset
+ // as we are testing from ReadAheadManager directly, sleep for a sec to
+ // get the read ahead threads to complete
+ Thread.sleep(1000);
+
+ // if readAhead failed for specific offset, getBlock should
+ // throw exception from the ReadBuffer that failed within the last thresholdAgeMilliseconds
+ intercept(IOException.class,
+ () -> ReadBufferManager.getBufferManager().getBlock(
+ inputStream,
+ 0,
+ ONE_KB,
+ new byte[ONE_KB]));
+
+ // Only the 3 readAhead threads should have triggered client.read
+ verifyReadCallCount(client, 3);
+
+ // Stub returns success for the 4th read request, if ReadBuffers still
+ // persisted, ReadAheadManager getBlock would have returned exception.
+ checkEvictedStatus(inputStream, 0, false);
+ }
+
+ /**
+ * The test expects ReadBufferManager to return 0 bytes when an older
+ * read ahead on the offset had failed, instead of throwing the exception received then.
+ * Also checks that the ReadBuffers are evicted as per the ReadBufferManager
+ * threshold criteria.
+ * @throws Exception
+ */
+ @Test
+ public void testReadAheadManagerForOlderReadAheadFailure() throws Exception {
+ AbfsClient client = getMockAbfsClient();
+ AbfsRestOperation successOp = getMockRestOp();
+
+ // Stub :
+ // First Read request leads to 3 readahead calls: Fail all 3 readahead-client.read()
+ // A second read request will see that readahead had failed for data in
+ // the requested offset range but also that it is an older readahead request.
+ // System issue could have resolved by now, so attempt a new read only for the requested range.
+ doThrow(new TimeoutException("Internal Server error for RAH-X"))
+ .doThrow(new TimeoutException("Internal Server error for RAH-X"))
+ .doThrow(new TimeoutException("Internal Server error for RAH-X"))
+ .doReturn(successOp) // pass the read for second read request
+ .doReturn(successOp) // pass success for post eviction test
+ .when(client)
+ .read(any(String.class), any(Long.class), any(byte[].class),
+ any(Integer.class), any(Integer.class), any(String.class),
+ any(String.class));
+
+ AbfsInputStream inputStream = getAbfsInputStream(client, "testReadAheadManagerForOlderReadAheadFailure.txt");
+
+ queueReadAheads(inputStream);
+
+ // AbfsInputStream Read would have waited for the read-ahead for the requested offset
+ // as we are testing from ReadAheadManager directly, sleep for thresholdAgeMilliseconds so that
+ // read buffer qualifies as an old buffer
+ Thread.sleep(ReadBufferManager.getBufferManager().getThresholdAgeMilliseconds());
+
+ // Only the 3 readAhead threads should have triggered client.read
+ verifyReadCallCount(client, 3);
+
+ // getBlock from a new read request should return 0 if the read-ahead
+ // for the respective offset failed more than thresholdAgeMilliseconds ago.
+ int bytesRead = ReadBufferManager.getBufferManager().getBlock(
+ inputStream,
+ ONE_KB,
+ ONE_KB,
+ new byte[ONE_KB]);
+ Assert.assertEquals("bytesRead should be zero when previously read "
+ + "ahead buffer had failed", 0, bytesRead);
+
+ // Stub returns success for the 5th read request, if ReadBuffers still
+ // persisted request would have failed for position 0.
+ checkEvictedStatus(inputStream, 0, false);
+ }
+
+ /**
+ * The test expects ReadAheadManager to return data from previously read
+ * ahead data of same offset.
+ * @throws Exception
+ */
+ @Test
+ public void testReadAheadManagerForSuccessfulReadAhead() throws Exception {
+ // Mock AbfsClient; read() outcomes are stubbed below
+ AbfsClient client = getMockAbfsClient();
+
+ // Success operation mock
+ AbfsRestOperation op = getMockRestOp();
+
+ // Stub :
+ // Pass all readAheads and fail the post eviction request to
+ // prove ReadAhead buffer is used
+ doReturn(op)
+ .doReturn(op)
+ .doReturn(op)
+ .doThrow(new TimeoutException("Internal Server error for RAH-X")) // for post eviction request
+ .doThrow(new TimeoutException("Internal Server error for RAH-Y"))
+ .doThrow(new TimeoutException("Internal Server error for RAH-Z"))
+ .when(client)
+ .read(any(String.class), any(Long.class), any(byte[].class),
+ any(Integer.class), any(Integer.class), any(String.class),
+ any(String.class));
+
+ AbfsInputStream inputStream = getAbfsInputStream(client, "testSuccessfulReadAhead.txt");
+
+ queueReadAheads(inputStream);
+
+ // AbfsInputStream Read would have waited for the read-ahead for the requested offset
+ // as we are testing from ReadAheadManager directly, sleep for a sec to
+ // get the read ahead threads to complete
+ Thread.sleep(1000);
+
+ // Only the 3 readAhead threads should have triggered client.read
+ verifyReadCallCount(client, 3);
+
+ // getBlock for a new read should return the buffer read-ahead
+ int bytesRead = ReadBufferManager.getBufferManager().getBlock(
+ inputStream,
+ ONE_KB,
+ ONE_KB,
+ new byte[ONE_KB]);
+
+ Assert.assertTrue("bytesRead should be non-zero from the "
+ + "buffer that was read-ahead", bytesRead > 0);
+
+ // Once created, mock will remember all interactions.
+ // As the above read should not have triggered any server calls, total
+ // number of read calls made at this point will be same as last.
+ verifyReadCallCount(client, 3);
+
+ // Stub will throw exception for client.read() for 4th and later calls
+ // if not using the read-ahead buffer exception will be thrown on read
+ checkEvictedStatus(inputStream, 0, true);
+ }
+
+}
\ No newline at end of file
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TestCachedSASToken.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TestCachedSASToken.java
index 1016d4b..cbba808 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TestCachedSASToken.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TestCachedSASToken.java
@@ -22,12 +22,14 @@ import java.io.IOException;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
+import java.util.UUID;
import org.junit.Assert;
import org.junit.Test;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS;
import static java.time.temporal.ChronoUnit.SECONDS;
+import static java.time.temporal.ChronoUnit.DAYS;
/**
* Test CachedSASToken.
@@ -159,4 +161,36 @@ public final class TestCachedSASToken {
cachedToken = cachedSasToken.get();
Assert.assertNull(cachedToken);
}
+
+ public static CachedSASToken getTestCachedSASTokenInstance() {
+ String expiryPostADay = OffsetDateTime.now(ZoneOffset.UTC)
+ .plus(1, DAYS)
+ .format(DateTimeFormatter.ISO_DATE_TIME);
+ String version = "2020-20-20";
+
+ StringBuilder sb = new StringBuilder();
+ sb.append("skoid=");
+ sb.append(UUID.randomUUID().toString());
+ sb.append("&sktid=");
+ sb.append(UUID.randomUUID().toString());
+ sb.append("&skt=");
+ sb.append(OffsetDateTime.now(ZoneOffset.UTC)
+ .minus(1, DAYS)
+ .format(DateTimeFormatter.ISO_DATE_TIME));
+ sb.append("&ske=");
+ sb.append(expiryPostADay);
+ sb.append("&sks=b");
+ sb.append("&skv=");
+ sb.append(version);
+ sb.append("&sp=rw");
+ sb.append("&sr=b");
+ sb.append("&se=");
+ sb.append(expiryPostADay);
+ sb.append("&sv=2");
+ sb.append(version);
+
+ CachedSASToken cachedSASToken = new CachedSASToken();
+ cachedSASToken.update(sb.toString());
+ return cachedSASToken;
+ }
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org