You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2020/09/08 13:02:49 UTC
[hadoop] 01/02: HADOOP-17113. Adding ReadAhead Counters in ABFS
(#2154)
This is an automated email from the ASF dual-hosted git repository.
stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit 28f1ded9fedd0973e88a80337d7c6251946a0672
Author: Mehakmeet Singh <me...@gmail.com>
AuthorDate: Wed Jul 22 22:52:30 2020 +0530
HADOOP-17113. Adding ReadAhead Counters in ABFS (#2154)
Contributed by Mehakmeet Singh
Change-Id: I6bbd8165385a9267ed64831bb1efa18b6554feb1
---
.../fs/azurebfs/services/AbfsInputStream.java | 6 ++
.../services/AbfsInputStreamStatistics.java | 12 +++
.../services/AbfsInputStreamStatisticsImpl.java | 36 ++++++++
.../azurebfs/ITestAbfsInputStreamStatistics.java | 95 ++++++++++++++++++++++
4 files changed, 149 insertions(+)
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
index a809bde..926c23d 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
@@ -238,6 +238,9 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
if (receivedBytes > 0) {
incrementReadOps();
LOG.debug("Received data from read ahead, not doing remote read");
+ if (streamStatistics != null) {
+ streamStatistics.readAheadBytesRead(receivedBytes);
+ }
return receivedBytes;
}
@@ -292,6 +295,9 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
throw new IOException(ex);
}
long bytesRead = op.getResult().getBytesReceived();
+ if (streamStatistics != null) {
+ streamStatistics.remoteBytesRead(bytesRead);
+ }
if (bytesRead > Integer.MAX_VALUE) {
throw new IOException("Unexpected Content-Length");
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java
index 2603394..c910a1f 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java
@@ -85,6 +85,18 @@ public interface AbfsInputStreamStatistics {
void remoteReadOperation();
/**
+ * Records bytes served to the caller directly from the readAhead buffer.
+ * @param bytes number of bytes to add to the counter.
+ */
+ void readAheadBytesRead(long bytes);
+
+ /**
+ * Records bytes read from the remote store (the readAhead test expects readAhead prefetch reads to be counted here too).
+ * @param bytes number of bytes to add to the counter.
+ */
+ void remoteBytesRead(long bytes);
+
+ /**
* Makes the string of all the AbfsInputStream statistics.
* @return the string with all the statistics.
*/
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java
index fd18910..12cc407 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java
@@ -33,6 +33,8 @@ public class AbfsInputStreamStatisticsImpl
private long readOperations;
private long bytesReadFromBuffer;
private long remoteReadOperations;
+ private long readAheadBytesRead; // bytes served from the readAhead buffer
+ private long remoteBytesRead; // NOTE(review): plain long polled across threads by the readAhead test; consider AtomicLong/volatile — confirm writer threads
/**
* Seek backwards, incrementing the seek and backward seek counters.
@@ -129,6 +131,30 @@ public class AbfsInputStreamStatisticsImpl
}
/**
+ * Adds to the running total of bytes served from the readAhead buffer.
+ * Non-positive values are ignored.
+ * @param bytes number of bytes to add to the counter.
+ */
+ @Override
+ public void readAheadBytesRead(long bytes) {
+ if (bytes > 0) {
+ readAheadBytesRead += bytes;
+ }
+ }
+
+ /**
+ * Adds to the running total of bytes read from the remote store.
+ * Non-positive values are ignored. NOTE(review): the unsynchronized += is not atomic if background readAhead threads call this — confirm callers.
+ * @param bytes number of bytes to add to the counter.
+ */
+ @Override
+ public void remoteBytesRead(long bytes) {
+ if (bytes > 0) {
+ remoteBytesRead += bytes;
+ }
+ }
+
+ /**
* {@inheritDoc}
*
* Increment the counter when a remote read operation occurs.
@@ -178,6 +204,14 @@ public class AbfsInputStreamStatisticsImpl
return remoteReadOperations;
}
+ public long getReadAheadBytesRead() {
+ return readAheadBytesRead; // running total maintained by readAheadBytesRead(long)
+ }
+
+ public long getRemoteBytesRead() {
+ return remoteBytesRead; // running total maintained by remoteBytesRead(long)
+ }
+
/**
* String operator describes all the current statistics.
* <b>Important: there are no guarantees as to the stability
@@ -199,6 +233,8 @@ public class AbfsInputStreamStatisticsImpl
sb.append(", ReadOperations=").append(readOperations);
sb.append(", bytesReadFromBuffer=").append(bytesReadFromBuffer);
sb.append(", remoteReadOperations=").append(remoteReadOperations);
+ sb.append(", readAheadBytesRead=").append(readAheadBytesRead);
+ sb.append(", remoteBytesRead=").append(remoteBytesRead);
sb.append('}');
return sb.toString();
}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java
index 7a62eca..8385099 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.azurebfs;
import java.io.IOException;
+import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,6 +40,10 @@ public class ITestAbfsInputStreamStatistics
LoggerFactory.getLogger(ITestAbfsInputStreamStatistics.class);
private static final int ONE_MB = 1024 * 1024;
private static final int ONE_KB = 1024;
+ private static final int CUSTOM_BLOCK_BUFFER_SIZE = 4 * 1024; // 4KB read buffer size
+ private static final int CUSTOM_READ_AHEAD_BUFFER_SIZE = 8 * CUSTOM_BLOCK_BUFFER_SIZE; // 8 blocks of 4KB = 32KB
+ private static final int THREAD_SLEEP_10_SECONDS = 10; // NOTE(review): passed to Thread.sleep (millis), so this is 10 ms, not 10 seconds
+ private static final int TIMEOUT_30_SECONDS = 30000; // JUnit @Test timeout, in milliseconds
private byte[] defBuffer = new byte[ONE_MB];
public ITestAbfsInputStreamStatistics() throws Exception {
@@ -75,6 +80,8 @@ public class ITestAbfsInputStreamStatistics
checkInitValue(stats.getReadOperations(), "readOps");
checkInitValue(stats.getBytesReadFromBuffer(), "bytesReadFromBuffer");
checkInitValue(stats.getRemoteReadOperations(), "remoteReadOps");
+ checkInitValue(stats.getReadAheadBytesRead(), "readAheadBytesRead");
+ checkInitValue(stats.getRemoteBytesRead(), "readAheadRemoteBytesRead");
} finally {
IOUtils.cleanupWithLogger(LOG, outputStream, inputStream);
@@ -286,6 +293,94 @@ public class ITestAbfsInputStreamStatistics
}
/**
+ * Verifies the readAheadBytesRead and remoteBytesRead counters of AbfsInputStream (30 second timeout).
+ */
+ @Test(timeout = TIMEOUT_30_SECONDS)
+ public void testReadAheadCounters() throws IOException, InterruptedException {
+ describe("Test to check correct values for readAhead counters in "
+ + "AbfsInputStream");
+
+ AzureBlobFileSystem fs = getFileSystem();
+ AzureBlobFileSystemStore abfss = fs.getAbfsStore();
+ Path readAheadCountersPath = path(getMethodName());
+
+ /*
+ * Set the read buffer (readAhead block) size to 4KB.
+ */
+ abfss.getAbfsConfiguration().setReadBufferSize(CUSTOM_BLOCK_BUFFER_SIZE);
+
+ AbfsOutputStream out = null;
+ AbfsInputStream in = null;
+
+ try {
+
+ /*
+ * Create a 1MB file (defBuffer holds ONE_MB bytes).
+ */
+ out = createAbfsOutputStreamWithFlushEnabled(fs, readAheadCountersPath);
+ out.write(defBuffer);
+ out.close();
+
+ in = abfss.openFileForRead(readAheadCountersPath, fs.getFsStatistics());
+
+ /*
+ * Perform five 1KB reads, seeking to i * 1KB before each, so the reads
+ * cover [0, 1KB), [1KB, 2KB), ... [4KB, 5KB). NOTE(review): read()'s return value is not checked.
+ */
+ for (int i = 0; i < 5; i++) {
+ in.seek(ONE_KB * i);
+ in.read(defBuffer, ONE_KB * i, ONE_KB);
+ }
+ AbfsInputStreamStatisticsImpl stats =
+ (AbfsInputStreamStatisticsImpl) in.getStreamStatistics();
+
+ /*
+ * readAhead runs in background threads, which may not have finished
+ * yet, so the counters can still be below their expected values. Poll
+ * until both reach their lower bounds; the 30 second test timeout
+ * bounds the wait. NOTE(review): the sleep is 10 ms, not 10 seconds,
+ * and the counters are plain long fields read across threads.
+ */
+ while (stats.getRemoteBytesRead() < CUSTOM_READ_AHEAD_BUFFER_SIZE
+ || stats.getReadAheadBytesRead() < CUSTOM_BLOCK_BUFFER_SIZE) {
+ Thread.sleep(THREAD_SLEEP_10_SECONDS);
+ }
+
+ /*
+ * Verify the readAheadBytesRead and remoteBytesRead counters.
+ *
+ * readAheadBytesRead: the five 1KB reads cover bytes 0 to 5KB of the
+ * file. With a 4KB buffer size and a readAhead queue depth of 8, the
+ * stream prefetches 8 blocks of 4KB. Reading up to 5KB touches 2 of
+ * those blocks, so ideally 8KB would be served from the readAhead
+ * buffer. However, if a background thread has not yet filled the
+ * second block, the stream may do a remote read instead of waiting,
+ * which can be faster. Only the first block is therefore guaranteed,
+ * so readAheadBytesRead is expected to be greater than or equal to
+ * 4KB.
+ *
+ * remoteBytesRead: with a 4KB buffer size and a readAhead queue depth
+ * of 8, the first read triggers 8 * 4KB = 32KB of remote reads to
+ * fill the readAhead blocks. If some bytes cannot be taken from the
+ * readAhead buffer once it has been filled, another remote read may
+ * be issued, so the bytes read remotely can also exceed 32KB. Hence
+ * remoteBytesRead is expected to be greater than or equal to 32KB.
+ *
+ */
+ Assertions.assertThat(stats.getReadAheadBytesRead()).describedAs(
+ "Mismatch in readAheadBytesRead counter value")
+ .isGreaterThanOrEqualTo(CUSTOM_BLOCK_BUFFER_SIZE);
+
+ Assertions.assertThat(stats.getRemoteBytesRead()).describedAs(
+ "Mismatch in remoteBytesRead counter value")
+ .isGreaterThanOrEqualTo(CUSTOM_READ_AHEAD_BUFFER_SIZE);
+
+ } finally {
+ IOUtils.cleanupWithLogger(LOG, out, in);
+ }
+ }
+
+ /**
* Method to assert the initial values of the statistics.
*
* @param actualValue the actual value of the statistics.
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org