Posted to common-commits@hadoop.apache.org by st...@apache.org on 2020/09/08 13:02:48 UTC

[hadoop] branch branch-3.3 updated (7970710 -> ccceec8)

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a change to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git.


    from 7970710  HADOOP-17229. No update of bytes received counter value after response failure occurs in ABFS (#2264)
     new 28f1ded  HADOOP-17113. Adding ReadAhead Counters in ABFS (#2154)
     new ccceec8  HADOOP-17158. Test timeout for ITestAbfsInputStreamStatistics#testReadAheadCounters (#2272)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../fs/azurebfs/services/AbfsInputStream.java      | 30 ++++++++
 .../services/AbfsInputStreamStatistics.java        | 12 ++++
 .../services/AbfsInputStreamStatisticsImpl.java    | 36 ++++++++++
 .../azurebfs/ITestAbfsInputStreamStatistics.java   | 81 ++++++++++++++++++++++
 4 files changed, 159 insertions(+)




[hadoop] 01/02: HADOOP-17113. Adding ReadAhead Counters in ABFS (#2154)

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 28f1ded9fedd0973e88a80337d7c6251946a0672
Author: Mehakmeet Singh <me...@gmail.com>
AuthorDate: Wed Jul 22 22:52:30 2020 +0530

    HADOOP-17113. Adding ReadAhead Counters in ABFS (#2154)
    
    Contributed by Mehakmeet Singh
    
    Change-Id: I6bbd8165385a9267ed64831bb1efa18b6554feb1
---
 .../fs/azurebfs/services/AbfsInputStream.java      |  6 ++
 .../services/AbfsInputStreamStatistics.java        | 12 +++
 .../services/AbfsInputStreamStatisticsImpl.java    | 36 ++++++++
 .../azurebfs/ITestAbfsInputStreamStatistics.java   | 95 ++++++++++++++++++++++
 4 files changed, 149 insertions(+)

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
index a809bde..926c23d 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
@@ -238,6 +238,9 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
       if (receivedBytes > 0) {
         incrementReadOps();
         LOG.debug("Received data from read ahead, not doing remote read");
+        if (streamStatistics != null) {
+          streamStatistics.readAheadBytesRead(receivedBytes);
+        }
         return receivedBytes;
       }
 
@@ -292,6 +295,9 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
       throw new IOException(ex);
     }
     long bytesRead = op.getResult().getBytesReceived();
+    if (streamStatistics != null) {
+      streamStatistics.remoteBytesRead(bytesRead);
+    }
     if (bytesRead > Integer.MAX_VALUE) {
       throw new IOException("Unexpected Content-Length");
     }
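
Both hooks check streamStatistics for null before updating it, so streams built without statistics add nothing to the read path. A minimal self-contained sketch of the same idiom, with illustrative names (SampleStats and SampleStream are not Hadoop classes):

    // Minimal sketch of the null-guarded counter idiom used above.
    // SampleStats and SampleStream are illustrative, not Hadoop classes.
    class SampleStats {
      private long bytesRead;

      void incrementBytesRead(long n) {
        if (n > 0) {        // ignore EOF (-1) and zero-length reads
          bytesRead += n;
        }
      }

      long getBytesRead() {
        return bytesRead;
      }
    }

    class SampleStream {
      private final SampleStats stats;  // null when statistics are disabled

      SampleStream(SampleStats stats) {
        this.stats = stats;
      }

      void onBytesReceived(int receivedBytes) {
        if (stats != null) {            // the disabled case costs one branch
          stats.incrementBytesRead(receivedBytes);
        }
      }
    }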
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java
index 2603394..c910a1f 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java
@@ -85,6 +85,18 @@ public interface AbfsInputStreamStatistics {
   void remoteReadOperation();
 
   /**
+   * Records the bytes read from the readAhead buffer.
+   * @param bytes number of bytes to add to the counter.
+   */
+  void readAheadBytesRead(long bytes);
+
+  /**
+   * Records the bytes read remotely after a readAhead buffer miss.
+   * @param bytes number of bytes to add to the counter.
+   */
+  void remoteBytesRead(long bytes);
+
+  /**
    * Makes the string of all the AbfsInputStream statistics.
    * @return the string with all the statistics.
    */
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java
index fd18910..12cc407 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java
@@ -33,6 +33,8 @@ public class AbfsInputStreamStatisticsImpl
   private long readOperations;
   private long bytesReadFromBuffer;
   private long remoteReadOperations;
+  private long readAheadBytesRead;
+  private long remoteBytesRead;
 
   /**
    * Seek backwards, incrementing the seek and backward seek counters.
@@ -129,6 +131,30 @@ public class AbfsInputStreamStatisticsImpl
   }
 
   /**
+   * Total bytes read from the readAhead buffer during a read operation.
+   *
+   * @param bytes number of bytes to add to the counter.
+   */
+  @Override
+  public void readAheadBytesRead(long bytes) {
+    if (bytes > 0) {
+      readAheadBytesRead += bytes;
+    }
+  }
+
+  /**
+   * Total bytes read remotely after nothing was read from the readAhead buffer.
+   *
+   * @param bytes number of bytes to add to the counter.
+   */
+  @Override
+  public void remoteBytesRead(long bytes) {
+    if (bytes > 0) {
+      remoteBytesRead += bytes;
+    }
+  }
+
+  /**
    * {@inheritDoc}
    *
    * Increment the counter when a remote read operation occurs.
@@ -178,6 +204,14 @@ public class AbfsInputStreamStatisticsImpl
     return remoteReadOperations;
   }
 
+  public long getReadAheadBytesRead() {
+    return readAheadBytesRead;
+  }
+
+  public long getRemoteBytesRead() {
+    return remoteBytesRead;
+  }
+
   /**
    * String operator describes all the current statistics.
    * <b>Important: there are no guarantees as to the stability
@@ -199,6 +233,8 @@ public class AbfsInputStreamStatisticsImpl
     sb.append(", ReadOperations=").append(readOperations);
     sb.append(", bytesReadFromBuffer=").append(bytesReadFromBuffer);
     sb.append(", remoteReadOperations=").append(remoteReadOperations);
+    sb.append(", readAheadBytesRead=").append(readAheadBytesRead);
+    sb.append(", remoteBytesRead=").append(remoteBytesRead);
     sb.append('}');
     return sb.toString();
   }
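
With both counters wired into the implementation, a test can observe them through the typed getters or through the toString() dump. A short usage sketch against the class above; the byte values are illustrative:

    AbfsInputStreamStatisticsImpl stats = new AbfsInputStreamStatisticsImpl();
    stats.readAheadBytesRead(4 * 1024);   // a 4KB block served from readAhead
    stats.remoteBytesRead(32 * 1024);     // a 32KB remote fill
    stats.remoteBytesRead(-1);            // ignored: only positive counts accumulate

    assert stats.getReadAheadBytesRead() == 4 * 1024;
    assert stats.getRemoteBytesRead() == 32 * 1024;
    System.out.println(stats);            // dumps every counter via toString()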
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java
index 7a62eca..8385099 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.azurebfs;
 
 import java.io.IOException;
 
+import org.assertj.core.api.Assertions;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,6 +40,10 @@ public class ITestAbfsInputStreamStatistics
       LoggerFactory.getLogger(ITestAbfsInputStreamStatistics.class);
   private static final int ONE_MB = 1024 * 1024;
   private static final int ONE_KB = 1024;
+  private static final int CUSTOM_BLOCK_BUFFER_SIZE = 4 * 1024;
+  private static final int CUSTOM_READ_AHEAD_BUFFER_SIZE = 8 * CUSTOM_BLOCK_BUFFER_SIZE;
+  private static final int THREAD_SLEEP_10_SECONDS = 10;
+  private static final int TIMEOUT_30_SECONDS = 30000;
   private byte[] defBuffer = new byte[ONE_MB];
 
   public ITestAbfsInputStreamStatistics() throws Exception {
@@ -75,6 +80,8 @@ public class ITestAbfsInputStreamStatistics
       checkInitValue(stats.getReadOperations(), "readOps");
       checkInitValue(stats.getBytesReadFromBuffer(), "bytesReadFromBuffer");
       checkInitValue(stats.getRemoteReadOperations(), "remoteReadOps");
+      checkInitValue(stats.getReadAheadBytesRead(), "readAheadBytesRead");
+      checkInitValue(stats.getRemoteBytesRead(), "readAheadRemoteBytesRead");
 
     } finally {
       IOUtils.cleanupWithLogger(LOG, outputStream, inputStream);
@@ -286,6 +293,94 @@ public class ITestAbfsInputStreamStatistics
   }
 
   /**
+   * Testing readAhead counters in AbfsInputStream with a 30-second timeout.
+   */
+  @Test(timeout = TIMEOUT_30_SECONDS)
+  public void testReadAheadCounters() throws IOException, InterruptedException {
+    describe("Test to check correct values for readAhead counters in "
+        + "AbfsInputStream");
+
+    AzureBlobFileSystem fs = getFileSystem();
+    AzureBlobFileSystemStore abfss = fs.getAbfsStore();
+    Path readAheadCountersPath = path(getMethodName());
+
+    /*
+     * Setting the readAhead block size to 4KB.
+     */
+    abfss.getAbfsConfiguration().setReadBufferSize(CUSTOM_BLOCK_BUFFER_SIZE);
+
+    AbfsOutputStream out = null;
+    AbfsInputStream in = null;
+
+    try {
+
+      /*
+       * Creating a file of 1MB size.
+       */
+      out = createAbfsOutputStreamWithFlushEnabled(fs, readAheadCountersPath);
+      out.write(defBuffer);
+      out.close();
+
+      in = abfss.openFileForRead(readAheadCountersPath, fs.getFsStatistics());
+
+      /*
+       * Reading 1KB from each position i * 1KB. Hence the reads cover 0
+       * to 1KB, 1KB to 2KB, and so on, for 5 operations.
+       */
+      for (int i = 0; i < 5; i++) {
+        in.seek(ONE_KB * i);
+        in.read(defBuffer, ONE_KB * i, ONE_KB);
+      }
+      AbfsInputStreamStatisticsImpl stats =
+          (AbfsInputStreamStatisticsImpl) in.getStreamStatistics();
+
+      /*
+       * readAhead is done in background threads, so sometimes those threads
+       * haven't finished yet when we check, which could result in
+       * inaccurate results. So, we wait till we have the accurate values,
+       * with a limit of 30 seconds as that's when the test times out.
+       *
+       */
+      while (stats.getRemoteBytesRead() < CUSTOM_READ_AHEAD_BUFFER_SIZE
+          || stats.getReadAheadBytesRead() < CUSTOM_BLOCK_BUFFER_SIZE) {
+        Thread.sleep(THREAD_SLEEP_10_SECONDS);
+      }
+
+      /*
+       * Verifying the counter values of readAheadBytesRead and remoteBytesRead.
+       *
+       * readAheadBytesRead : Since we read 1KB five times, we go from 0
+       * to 5KB in the file. The bufferSize is set to 4KB and the
+       * readAheadQueueDepth is 8, so we have 8 readAhead blocks of 4KB
+       * each. Our read goes up to 5KB, hence readAhead would ideally read 2
+       * blocks of 4KB, which is equal to 8KB. But sometimes, to get more than
+       * one block from the readAhead buffer, we would have to wait for the background
+       * threads to fill the buffer, and hence we might do a remote read, which
+       * would be faster. Therefore, readAheadBytesRead would be equal to or
+       * greater than 4KB.
+       *
+       * remoteBytesRead : Since the bufferSize is set to 4KB and the number
+       * of blocks, i.e. the readAheadQueueDepth, is 8, we would read 8 * 4
+       * KB of buffer on the first read, which is equal to 32KB. But if we are not
+       * able to read some bytes that were in the buffer after doing the
+       * readAhead, we might use a remote read again. Thus, the bytes read
+       * remotely could also be greater than 32KB.
+       *
+       */
+      Assertions.assertThat(stats.getReadAheadBytesRead()).describedAs(
+          "Mismatch in readAheadBytesRead counter value")
+          .isGreaterThanOrEqualTo(CUSTOM_BLOCK_BUFFER_SIZE);
+
+      Assertions.assertThat(stats.getRemoteBytesRead()).describedAs(
+          "Mismatch in remoteBytesRead counter value")
+          .isGreaterThanOrEqualTo(CUSTOM_READ_AHEAD_BUFFER_SIZE);
+
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, out, in);
+    }
+  }
+
+  /**
    * Method to assert the initial values of the statistics.
    *
    * @param actualValue the actual value of the statistics.

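The wait loop above is the fragile part of this test: despite its name, THREAD_SLEEP_10_SECONDS is the value 10 handed to Thread.sleep(), i.e. a 10ms poll, and the only bound on the loop is the JUnit timeout. That is what HADOOP-17158 below reworks. A hedged sketch, mirroring the loop above but with its own deadline so a hang fails with a diagnostic instead of a bare timeout:

    // Hedged sketch, not committed code: bound the polling loop explicitly.
    long deadline = System.currentTimeMillis() + 30_000;
    while (stats.getRemoteBytesRead() < CUSTOM_READ_AHEAD_BUFFER_SIZE
        || stats.getReadAheadBytesRead() < CUSTOM_BLOCK_BUFFER_SIZE) {
      if (System.currentTimeMillis() > deadline) {
        throw new AssertionError("readAhead counters never reached the "
            + "expected values: " + stats);
      }
      Thread.sleep(10);   // poll every 10ms while background readAhead runs
    }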



[hadoop] 02/02: HADOOP-17158. Test timeout for ITestAbfsInputStreamStatistics#testReadAheadCounters (#2272)

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit ccceec8af0ce989b20f967eb50228a31255e7109
Author: Mehakmeet Singh <me...@gmail.com>
AuthorDate: Tue Sep 8 14:41:06 2020 +0530

    HADOOP-17158. Test timeout for ITestAbfsInputStreamStatistics#testReadAheadCounters (#2272)
    
    Contributed by: Mehakmeet Singh.
    
    Change-Id: I7ebfa5cd1b5d25f7a750f0c645d7d93c81e89240
---
 .../fs/azurebfs/services/AbfsInputStream.java      | 24 +++++++++++++++
 .../azurebfs/ITestAbfsInputStreamStatistics.java   | 34 +++++++---------------
 2 files changed, 34 insertions(+), 24 deletions(-)

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
index 926c23d..ff3bd63 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
@@ -70,6 +70,8 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
 
   /** Stream statistics. */
   private final AbfsInputStreamStatistics streamStatistics;
+  private long bytesFromReadAhead; // bytes read from readAhead; for testing
+  private long bytesFromRemoteRead; // bytes read remotely; for testing
 
   public AbfsInputStream(
           final AbfsClient client,
@@ -235,6 +237,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
 
       // try reading from buffers first
       receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b);
+      bytesFromReadAhead += receivedBytes;
       if (receivedBytes > 0) {
         incrementReadOps();
         LOG.debug("Received data from read ahead, not doing remote read");
@@ -302,6 +305,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
       throw new IOException("Unexpected Content-Length");
     }
     LOG.debug("HTTP request read bytes = {}", bytesRead);
+    bytesFromRemoteRead += bytesRead;
     return (int) bytesRead;
   }
 
@@ -504,6 +508,26 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
   }
 
   /**
+   * Getter for the bytes read from the asynchronously-filled readAhead buffer.
+   *
+   * @return value of the counter, as a long.
+   */
+  @VisibleForTesting
+  public long getBytesFromReadAhead() {
+    return bytesFromReadAhead;
+  }
+
+  /**
+   * Getter for bytes read remotely from the data store.
+   *
+   * @return value of the counter, as a long.
+   */
+  @VisibleForTesting
+  public long getBytesFromRemoteRead() {
+    return bytesFromRemoteRead;
+  }
+
+  /**
    * Get the statistics of the stream.
    * @return a string value.
    */
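
Unlike the streamStatistics hooks from HADOOP-17113, these two fields are plain longs updated unconditionally on both read paths, so they record what the stream itself transferred even when statistics are disabled. A hedged sketch of reading them in a test, where "in" is an open AbfsInputStream as in the test below:

    // Hedged sketch: the @VisibleForTesting getters expose stream-side totals.
    byte[] buf = new byte[1024];
    in.read(buf, 0, buf.length);

    long fromReadAhead = in.getBytesFromReadAhead();  // served from the buffer
    long fromRemote = in.getBytesFromRemoteRead();    // fetched over the wire
    LOG.info("bytesFromReadAhead={} bytesFromRemoteRead={}",
        fromReadAhead, fromRemote);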
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java
index 8385099..52dfdf2 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java
@@ -41,9 +41,6 @@ public class ITestAbfsInputStreamStatistics
   private static final int ONE_MB = 1024 * 1024;
   private static final int ONE_KB = 1024;
   private static final int CUSTOM_BLOCK_BUFFER_SIZE = 4 * 1024;
-  private static final int CUSTOM_READ_AHEAD_BUFFER_SIZE = 8 * CUSTOM_BLOCK_BUFFER_SIZE;
-  private static final int THREAD_SLEEP_10_SECONDS = 10;
-  private static final int TIMEOUT_30_SECONDS = 30000;
   private byte[] defBuffer = new byte[ONE_MB];
 
   public ITestAbfsInputStreamStatistics() throws Exception {
@@ -295,8 +292,8 @@ public class ITestAbfsInputStreamStatistics
   /**
   * Testing readAhead counters in AbfsInputStream with a 30-second timeout.
    */
-  @Test(timeout = TIMEOUT_30_SECONDS)
-  public void testReadAheadCounters() throws IOException, InterruptedException {
+  @Test
+  public void testReadAheadCounters() throws IOException {
     describe("Test to check correct values for readAhead counters in "
         + "AbfsInputStream");
 
@@ -335,45 +332,34 @@ public class ITestAbfsInputStreamStatistics
           (AbfsInputStreamStatisticsImpl) in.getStreamStatistics();
 
       /*
-       * readAhead is done in background threads, so sometimes those threads
-       * haven't finished yet when we check, which could result in
-       * inaccurate results. So, we wait till we have the accurate values,
-       * with a limit of 30 seconds as that's when the test times out.
-       *
-       */
-      while (stats.getRemoteBytesRead() < CUSTOM_READ_AHEAD_BUFFER_SIZE
-          || stats.getReadAheadBytesRead() < CUSTOM_BLOCK_BUFFER_SIZE) {
-        Thread.sleep(THREAD_SLEEP_10_SECONDS);
-      }
-
-      /*
        * Verifying the counter values of readAheadBytesRead and remoteBytesRead.
        *
       * readAheadBytesRead : Since we read 1KB five times, we go from 0
       * to 5KB in the file. The bufferSize is set to 4KB and the
       * readAheadQueueDepth is 8, so we have 8 readAhead blocks of 4KB
       * each. Our read goes up to 5KB, hence readAhead would ideally read 2
-       * blocks of 4KB, which is equal to 8KB. But sometimes, to get more than
-       * one block from the readAhead buffer, we would have to wait for the background
+       * blocks of 4KB, which is equal to 8KB. But, sometimes to get blocks
+       * from the readAhead buffer, we would have to wait for the background
       * threads to fill the buffer, and hence we might do a remote read, which
-       * would be faster. Therefore, readAheadBytesRead would be equal to or
-       * greater than 4KB.
+       * would be faster. Therefore, readAheadBytesRead would be greater than
+       * or equal to the value of bytesFromReadAhead at the point we measure it.
        *
       * remoteBytesRead : Since the bufferSize is set to 4KB and the number
       * of blocks, i.e. the readAheadQueueDepth, is 8, we would read 8 * 4
       * KB of buffer on the first read, which is equal to 32KB. But if we are not
       * able to read some bytes that were in the buffer after doing the
       * readAhead, we might use a remote read again. Thus, the bytes read
-       * remotely could also be greater than 32KB.
+       * remotely would be greater than or equal to the bytesFromRemoteRead
+       * value that we measure at some point of the operation.
        *
        */
       Assertions.assertThat(stats.getReadAheadBytesRead()).describedAs(
           "Mismatch in readAheadBytesRead counter value")
-          .isGreaterThanOrEqualTo(CUSTOM_BLOCK_BUFFER_SIZE);
+          .isGreaterThanOrEqualTo(in.getBytesFromReadAhead());
 
       Assertions.assertThat(stats.getRemoteBytesRead()).describedAs(
           "Mismatch in remoteBytesRead counter value")
-          .isGreaterThanOrEqualTo(CUSTOM_READ_AHEAD_BUFFER_SIZE);
+          .isGreaterThanOrEqualTo(in.getBytesFromRemoteRead());
 
     } finally {
       IOUtils.cleanupWithLogger(LOG, out, in);

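The reworked test drops the sleep loop entirely: the statistics counters are asserted against the stream-internal totals measured over the same operations, so no settling time is needed. If a future change did need to wait for background readAhead, a bounded poll via hadoop-common's test utilities would be the usual shape; a hedged sketch follows, with the GenericTestUtils.waitFor signature recalled from memory rather than taken from this patch:

    // Hedged sketch: bounded polling instead of an open-ended sleep loop.
    // Uses org.apache.hadoop.test.GenericTestUtils from hadoop-common's
    // test artifact; waitFor(check, checkEveryMillis, waitForMillis) is
    // recalled from memory, so verify the signature before relying on it.
    GenericTestUtils.waitFor(
        () -> stats.getReadAheadBytesRead() >= CUSTOM_BLOCK_BUFFER_SIZE,
        10,       // re-check every 10ms
        30_000);  // fail with TimeoutException after 30s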

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org