You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2020/09/08 13:02:49 UTC

[hadoop] 01/02: HADOOP-17113. Adding ReadAhead Counters in ABFS (#2154)

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 28f1ded9fedd0973e88a80337d7c6251946a0672
Author: Mehakmeet Singh <me...@gmail.com>
AuthorDate: Wed Jul 22 22:52:30 2020 +0530

    HADOOP-17113. Adding ReadAhead Counters in ABFS (#2154)
    
    Contributed by Mehakmeet Singh
    
    Change-Id: I6bbd8165385a9267ed64831bb1efa18b6554feb1
---
 .../fs/azurebfs/services/AbfsInputStream.java      |  6 ++
 .../services/AbfsInputStreamStatistics.java        | 12 +++
 .../services/AbfsInputStreamStatisticsImpl.java    | 36 ++++++++
 .../azurebfs/ITestAbfsInputStreamStatistics.java   | 95 ++++++++++++++++++++++
 4 files changed, 149 insertions(+)

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
index a809bde..926c23d 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
@@ -238,6 +238,9 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
       if (receivedBytes > 0) {
         incrementReadOps();
         LOG.debug("Received data from read ahead, not doing remote read");
+        if (streamStatistics != null) {
+          streamStatistics.readAheadBytesRead(receivedBytes);
+        }
         return receivedBytes;
       }
 
@@ -292,6 +295,9 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
       throw new IOException(ex);
     }
     long bytesRead = op.getResult().getBytesReceived();
+    if (streamStatistics != null) {
+      streamStatistics.remoteBytesRead(bytesRead);
+    }
     if (bytesRead > Integer.MAX_VALUE) {
       throw new IOException("Unexpected Content-Length");
     }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java
index 2603394..c910a1f 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java
@@ -85,6 +85,18 @@ public interface AbfsInputStreamStatistics {
   void remoteReadOperation();
 
   /**
+   * Records the bytes read from the readAhead buffer.
+   * @param bytes the number of bytes to add to the counter.
+   */
+  void readAheadBytesRead(long bytes);
+
+  /**
+   * Records the bytes read remotely after nothing was read from readAhead.
+   * @param bytes the number of bytes to add to the counter.
+   */
+  void remoteBytesRead(long bytes);
+
+  /**
    * Makes the string of all the AbfsInputStream statistics.
    * @return the string with all the statistics.
    */
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java
index fd18910..12cc407 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java
@@ -33,6 +33,8 @@ public class AbfsInputStreamStatisticsImpl
   private long readOperations;
   private long bytesReadFromBuffer;
   private long remoteReadOperations;
+  private long readAheadBytesRead;
+  private long remoteBytesRead;
 
   /**
    * Seek backwards, incrementing the seek and backward seek counters.
@@ -129,6 +131,30 @@ public class AbfsInputStreamStatisticsImpl
   }
 
   /**
+   * Adds to the total bytes read from the readAhead buffer.
+   * Values that are not positive are ignored and leave the counter as is.
+   * @param bytes the number of bytes to add to the counter.
+   */
+  @Override
+  public void readAheadBytesRead(long bytes) {
+    if (bytes > 0) {
+      readAheadBytesRead += bytes;
+    }
+  }
+
+  /**
+   * Adds to the total bytes read remotely.
+   * Values that are not positive are ignored and leave the counter as is.
+   * @param bytes the number of bytes to add to the counter.
+   */
+  @Override
+  public void remoteBytesRead(long bytes) {
+    if (bytes > 0) {
+      remoteBytesRead += bytes;
+    }
+  }
+
+  /**
    * {@inheritDoc}
    *
    * Increment the counter when a remote read operation occurs.
@@ -178,6 +204,14 @@ public class AbfsInputStreamStatisticsImpl
     return remoteReadOperations;
   }
 
+  public long getReadAheadBytesRead() {
+    return readAheadBytesRead;
+  }
+
+  public long getRemoteBytesRead() {
+    return remoteBytesRead;
+  }
+
   /**
    * String operator describes all the current statistics.
    * <b>Important: there are no guarantees as to the stability
@@ -199,6 +233,8 @@ public class AbfsInputStreamStatisticsImpl
     sb.append(", ReadOperations=").append(readOperations);
     sb.append(", bytesReadFromBuffer=").append(bytesReadFromBuffer);
     sb.append(", remoteReadOperations=").append(remoteReadOperations);
+    sb.append(", readAheadBytesRead=").append(readAheadBytesRead);
+    sb.append(", remoteBytesRead=").append(remoteBytesRead);
     sb.append('}');
     return sb.toString();
   }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java
index 7a62eca..8385099 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.azurebfs;
 
 import java.io.IOException;
 
+import org.assertj.core.api.Assertions;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,6 +40,10 @@ public class ITestAbfsInputStreamStatistics
       LoggerFactory.getLogger(ITestAbfsInputStreamStatistics.class);
   private static final int ONE_MB = 1024 * 1024;
   private static final int ONE_KB = 1024;
+  private static final int CUSTOM_BLOCK_BUFFER_SIZE = 4 * 1024;
+  private static final int CUSTOM_READ_AHEAD_BUFFER_SIZE = 8 * CUSTOM_BLOCK_BUFFER_SIZE;
+  private static final int THREAD_SLEEP_10_SECONDS = 10;
+  private static final int TIMEOUT_30_SECONDS = 30000;
   private byte[] defBuffer = new byte[ONE_MB];
 
   public ITestAbfsInputStreamStatistics() throws Exception {
@@ -75,6 +80,8 @@ public class ITestAbfsInputStreamStatistics
       checkInitValue(stats.getReadOperations(), "readOps");
       checkInitValue(stats.getBytesReadFromBuffer(), "bytesReadFromBuffer");
       checkInitValue(stats.getRemoteReadOperations(), "remoteReadOps");
+      checkInitValue(stats.getReadAheadBytesRead(), "readAheadBytesRead");
+      checkInitValue(stats.getRemoteBytesRead(), "readAheadRemoteBytesRead");
 
     } finally {
       IOUtils.cleanupWithLogger(LOG, outputStream, inputStream);
@@ -286,6 +293,94 @@ public class ITestAbfsInputStreamStatistics
   }
 
   /**
+   * Tests readAhead counters in AbfsInputStream with a 30 second timeout.
+   */
+  @Test(timeout = TIMEOUT_30_SECONDS)
+  public void testReadAheadCounters() throws IOException, InterruptedException {
+    describe("Test to check correct values for readAhead counters in "
+        + "AbfsInputStream");
+
+    AzureBlobFileSystem fs = getFileSystem();
+    AzureBlobFileSystemStore abfss = fs.getAbfsStore();
+    Path readAheadCountersPath = path(getMethodName());
+
+    /*
+     * Setting the block size for readAhead to 4KB.
+     */
+    abfss.getAbfsConfiguration().setReadBufferSize(CUSTOM_BLOCK_BUFFER_SIZE);
+
+    AbfsOutputStream out = null;
+    AbfsInputStream in = null;
+
+    try {
+
+      /*
+       * Creating a file of 1MB size.
+       */
+      out = createAbfsOutputStreamWithFlushEnabled(fs, readAheadCountersPath);
+      out.write(defBuffer);
+      out.close();
+
+      in = abfss.openFileForRead(readAheadCountersPath, fs.getFsStatistics());
+
+      /*
+       * Reading 1KB at each i * 1KB position. Hence the reads are from 0
+       * to 1KB, 1KB to 2KB, and so on, for 5 operations.
+       */
+      for (int i = 0; i < 5; i++) {
+        in.seek(ONE_KB * i);
+        in.read(defBuffer, ONE_KB * i, ONE_KB);
+      }
+      AbfsInputStreamStatisticsImpl stats =
+          (AbfsInputStreamStatisticsImpl) in.getStreamStatistics();
+
+      /*
+       * readAhead is done in background threads, so the counters may not
+       * have reached their expected values yet. Poll until they do; the
+       * only upper bound on the wait is the 30 second test timeout. Note
+       * that Thread.sleep() takes milliseconds, so despite the constant's
+       * name each poll sleeps for 10 ms, not 10 seconds.
+       */
+      while (stats.getRemoteBytesRead() < CUSTOM_READ_AHEAD_BUFFER_SIZE
+          || stats.getReadAheadBytesRead() < CUSTOM_BLOCK_BUFFER_SIZE) {
+        Thread.sleep(THREAD_SLEEP_10_SECONDS);
+      }
+
+      /*
+       * Verifying the counter values of readAheadBytesRead and remoteBytesRead.
+       *
+       * readAheadBytesRead: we read 1KB five times, i.e. from 0 to 5KB in
+       * the file. The buffer size is set to 4KB and the readAhead buffer
+       * has 8 blocks, so we would have 8 blocks of 4KB each. Our reads go
+       * up to 5KB, hence readAhead would ideally serve 2 blocks of 4KB,
+       * which is equal to 8KB. But sometimes, to get more than one block
+       * from the readAhead buffer, we might have to wait for background
+       * threads to fill the buffer, and a remote read may be done instead
+       * as it would be faster. Therefore, readAheadBytesRead would be
+       * equal to or greater than 4KB.
+       *
+       * remoteBytesRead: the buffer size is set to 4KB and the number of
+       * blocks (readAheadQueueDepth) is equal to 8, so we would read
+       * 8 * 4KB on the first read, which is equal to 32KB. But, if we are
+       * not able to read some bytes that were in the buffer after doing
+       * readAhead, we might use remote read again. Thus, the bytes read
+       * remotely could also be greater than 32KB.
+       *
+       */
+      Assertions.assertThat(stats.getReadAheadBytesRead()).describedAs(
+          "Mismatch in readAheadBytesRead counter value")
+          .isGreaterThanOrEqualTo(CUSTOM_BLOCK_BUFFER_SIZE);
+
+      Assertions.assertThat(stats.getRemoteBytesRead()).describedAs(
+          "Mismatch in remoteBytesRead counter value")
+          .isGreaterThanOrEqualTo(CUSTOM_READ_AHEAD_BUFFER_SIZE);
+
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, out, in);
+    }
+  }
+
+  /**
    * Method to assert the initial values of the statistics.
    *
    * @param actualValue the actual value of the statistics.


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org