You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2021/10/22 06:16:15 UTC

[hadoop] branch branch-3.3 updated: HADOOP-17770 WASB : Support disabling buffered reads in positional reads (#3233)

This is an automated email from the ASF dual-hosted git repository.

surendralilhore pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 913d06a  HADOOP-17770 WASB : Support disabling buffered reads in positional reads (#3233)
913d06a is described below

commit 913d06ad4d43f4943e89f834658275bf60309284
Author: Anoop Sam John <an...@gmail.com>
AuthorDate: Fri Oct 22 11:45:42 2021 +0530

    HADOOP-17770 WASB : Support disabling buffered reads in positional reads (#3233)
---
 .../hadoop-azure/dev-support/findbugs-exclude.xml  | 13 +++++
 .../fs/azure/AzureNativeFileSystemStore.java       | 31 +++++++++--
 .../hadoop/fs/azure/BlockBlobInputStream.java      | 41 +++++++++++++-
 .../hadoop/fs/azure/NativeAzureFileSystem.java     | 64 +++++++++++++++++++++-
 .../hadoop/fs/azure/NativeFileSystemStore.java     |  4 ++
 .../hadoop-azure/src/site/markdown/index.md        | 11 ++++
 .../hadoop/fs/azure/ITestBlockBlobInputStream.java | 56 +++++++++++++++++++
 7 files changed, 212 insertions(+), 8 deletions(-)

diff --git a/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml
index b750b8b..fa6085f 100644
--- a/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml
+++ b/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml
@@ -83,4 +83,17 @@
     <Bug pattern="IS2_INCONSISTENT_SYNC" />
   </Match>
 
+  <!-- This field is an instance of BlockBlobInputStream, and read(long, byte[], int, int)
+  calls its superclass method when 'fs.azure.block.blob.buffered.pread.disable'
+  is configured false. The superclass FSInputStream's implementation has
+  proper synchronization.
+  When 'fs.azure.block.blob.buffered.pread.disable' is true, we want a lock-free
+  implementation of blob read. Here we don't use any of the InputStream's
+  shared resources (buffer) and also don't change any cursor position etc.
+  So it's safe to go with an unsynchronized read. -->
+  <Match>
+    <Class name="org.apache.hadoop.fs.azure.NativeAzureFileSystem$NativeAzureFsInputStream" />
+    <Field name="in" />
+    <Bug pattern="IS2_INCONSISTENT_SYNC" />
+  </Match>
  </FindBugsFilter>
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
index c613468..368283a 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
@@ -40,6 +40,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 
 import org.apache.commons.lang3.StringUtils;
@@ -236,6 +237,16 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
   public static final String KEY_ENABLE_FLAT_LISTING = "fs.azure.flatlist.enable";
 
   /**
+   * Optional config to enable a lock-free pread which bypasses the buffer in
+   * BlockBlobInputStream.
+   * This is not a config which can be set at cluster level. It can be used as
+   * an option on FutureDataInputStreamBuilder.
+   * @see FileSystem#openFile(org.apache.hadoop.fs.Path)
+   */
+  public static final String FS_AZURE_BLOCK_BLOB_BUFFERED_PREAD_DISABLE =
+      "fs.azure.block.blob.buffered.pread.disable";
+
+  /**
    * The set of directories where we should apply atomic folder rename
    * synchronized with createNonRecursive.
    */
@@ -1577,8 +1588,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
    * Opens a new input stream for the given blob (page or block blob)
    * to read its data.
    */
-  private InputStream openInputStream(CloudBlobWrapper blob)
-      throws StorageException, IOException {
+  private InputStream openInputStream(CloudBlobWrapper blob,
+      Optional<Configuration> options) throws StorageException, IOException {
     if (blob instanceof CloudBlockBlobWrapper) {
       LOG.debug("Using stream seek algorithm {}", inputStreamVersion);
       switch(inputStreamVersion) {
@@ -1586,9 +1597,13 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
         return blob.openInputStream(getDownloadOptions(),
             getInstrumentedContext(isConcurrentOOBAppendAllowed()));
       case 2:
+        boolean bufferedPreadDisabled = options.map(c -> c
+            .getBoolean(FS_AZURE_BLOCK_BLOB_BUFFERED_PREAD_DISABLE, false))
+            .orElse(false);
         return new BlockBlobInputStream((CloudBlockBlobWrapper) blob,
             getDownloadOptions(),
-            getInstrumentedContext(isConcurrentOOBAppendAllowed()));
+            getInstrumentedContext(isConcurrentOOBAppendAllowed()),
+            bufferedPreadDisabled);
       default:
         throw new IOException("Unknown seek algorithm: " + inputStreamVersion);
       }
@@ -2262,6 +2277,12 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
   @Override
   public InputStream retrieve(String key, long startByteOffset)
       throws AzureException, IOException {
+    return retrieve(key, startByteOffset, Optional.empty());
+  }
+
+  @Override
+  public InputStream retrieve(String key, long startByteOffset,
+      Optional<Configuration> options) throws AzureException, IOException {
       try {
         // Check if a session exists, if not create a session with the
         // Azure storage server.
@@ -2273,7 +2294,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
         }
         checkContainer(ContainerAccessType.PureRead);
 
-        InputStream inputStream = openInputStream(getBlobReference(key));
+        InputStream inputStream = openInputStream(getBlobReference(key), options);
         if (startByteOffset > 0) {
           // Skip bytes and ignore return value. This is okay
           // because if you try to skip too far you will be positioned
@@ -2824,7 +2845,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
         OutputStream opStream = null;
         try {
           if (srcBlob.getProperties().getBlobType() == BlobType.PAGE_BLOB){
-            ipStream = openInputStream(srcBlob);
+            ipStream = openInputStream(srcBlob, Optional.empty());
             opStream = openOutputStream(dstBlob);
             byte[] buffer = new byte[PageBlobFormatHelpers.PAGE_SIZE];
             int len;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java
index c37b2be..00e84ad 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java
@@ -28,6 +28,7 @@ import com.microsoft.azure.storage.StorageException;
 import com.microsoft.azure.storage.blob.BlobRequestOptions;
 
 import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.fs.azure.StorageInterface.CloudBlockBlobWrapper;
 
@@ -36,10 +37,11 @@ import org.apache.hadoop.fs.azure.StorageInterface.CloudBlockBlobWrapper;
  * random access and seek. Random access performance is improved by several
  * orders of magnitude.
  */
-final class BlockBlobInputStream extends InputStream implements Seekable {
+final class BlockBlobInputStream extends FSInputStream {
   private final CloudBlockBlobWrapper blob;
   private final BlobRequestOptions options;
   private final OperationContext opContext;
+  private final boolean bufferedPreadDisabled;
   private InputStream blobInputStream = null;
   private int minimumReadSizeInBytes = 0;
   private long streamPositionAfterLastRead = -1;
@@ -64,10 +66,12 @@ final class BlockBlobInputStream extends InputStream implements Seekable {
    */
   BlockBlobInputStream(CloudBlockBlobWrapper blob,
       BlobRequestOptions options,
-      OperationContext opContext) throws IOException {
+      OperationContext opContext, boolean bufferedPreadDisabled)
+      throws IOException {
     this.blob = blob;
     this.options = options;
     this.opContext = opContext;
+    this.bufferedPreadDisabled = bufferedPreadDisabled;
 
     this.minimumReadSizeInBytes = blob.getStreamMinimumReadSizeInBytes();
 
@@ -263,6 +267,39 @@ final class BlockBlobInputStream extends InputStream implements Seekable {
     }
   }
 
+  @Override
+  public int read(long position, byte[] buffer, int offset, int length)
+      throws IOException {
+    synchronized (this) {
+      checkState();
+    }
+    if (!bufferedPreadDisabled) {
+      // This will do a seek + read in which the streamBuffer will get used.
+      return super.read(position, buffer, offset, length);
+    }
+    validatePositionedReadArgs(position, buffer, offset, length);
+    if (length == 0) {
+      return 0;
+    }
+    if (position >= streamLength) {
+      throw new EOFException("position is beyond stream capacity");
+    }
+    MemoryOutputStream os = new MemoryOutputStream(buffer, offset, length);
+    long bytesToRead = Math.min(minimumReadSizeInBytes,
+        Math.min(os.capacity(), streamLength - position));
+    try {
+      blob.downloadRange(position, bytesToRead, os, options, opContext);
+    } catch (StorageException e) {
+      throw new IOException(e);
+    }
+    int bytesRead = os.size();
+    if (bytesRead == 0) {
+      // This may happen if the blob was modified after the length was obtained.
+      throw new EOFException("End of stream reached unexpectedly.");
+    }
+    return bytesRead;
+  }
+
   /**
    * Reads up to <code>len</code> bytes of data from the input stream into an
    * array of bytes.
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
index 48ef495..e9f0e78 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
@@ -33,11 +33,14 @@ import java.util.Date;
 import java.util.EnumSet;
 import java.util.TimeZone;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.Stack;
 import java.util.HashMap;
 
@@ -61,6 +64,7 @@ import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PositionedReadable;
 import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.fs.Syncable;
@@ -70,6 +74,8 @@ import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem;
 import org.apache.hadoop.fs.azure.security.Constants;
 import org.apache.hadoop.fs.azure.security.RemoteWasbDelegationTokenManager;
 import org.apache.hadoop.fs.azure.security.WasbDelegationTokenManager;
+import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl;
+import org.apache.hadoop.fs.impl.OpenFileParameters;
 import org.apache.hadoop.fs.impl.StoreImplementationUtils;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -79,6 +85,7 @@ import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
+import org.apache.hadoop.util.LambdaUtils;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.Time;
 
@@ -916,6 +923,43 @@ public class NativeAzureFileSystem extends FileSystem {
     }
 
     @Override
+    public int read(long position, byte[] buffer, int offset, int length)
+        throws IOException {
+      // SpotBugs reports bug type IS2_INCONSISTENT_SYNC here.
+      // This report is not valid here.
+      // 'this.in' is an instance of BlockBlobInputStream, and read(long, byte[], int, int)
+      // calls its superclass method when 'fs.azure.block.blob.buffered.pread.disable'
+      // is configured false. The superclass FSInputStream's implementation has
+      // proper synchronization.
+      // When 'fs.azure.block.blob.buffered.pread.disable' is true, we want a lock-free
+      // implementation of blob read. Here we don't use any of the InputStream's
+      // shared resources (buffer) and also don't change any cursor position etc.
+      // So it's safe to go with an unsynchronized read.
+      if (in instanceof PositionedReadable) {
+        try {
+          int result = ((PositionedReadable) this.in).read(position, buffer,
+              offset, length);
+          if (null != statistics && result > 0) {
+            statistics.incrementBytesRead(result);
+          }
+          return result;
+        } catch (IOException e) {
+          Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
+          if (innerException instanceof StorageException) {
+            LOG.error("Encountered Storage Exception for read on Blob : {}"
+                + " Exception details: {} Error Code : {}",
+                key, e, ((StorageException) innerException).getErrorCode());
+            if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
+              throw new FileNotFoundException(String.format("%s is not found", key));
+            }
+          }
+          throw e;
+        }
+      }
+      return super.read(position, buffer, offset, length);
+    }
+
+    @Override
     public synchronized void close() throws IOException {
       if (!closed) {
         closed = true;
@@ -3043,6 +3087,12 @@ public class NativeAzureFileSystem extends FileSystem {
 
   @Override
   public FSDataInputStream open(Path f, int bufferSize) throws FileNotFoundException, IOException {
+    return open(f, bufferSize, Optional.empty());
+  }
+
+  private FSDataInputStream open(Path f, int bufferSize,
+      Optional<Configuration> options)
+      throws FileNotFoundException, IOException {
 
     LOG.debug("Opening file: {}", f.toString());
 
@@ -3077,7 +3127,7 @@ public class NativeAzureFileSystem extends FileSystem {
 
     InputStream inputStream;
     try {
-      inputStream = store.retrieve(key);
+      inputStream = store.retrieve(key, 0, options);
     } catch(Exception ex) {
       Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
 
@@ -3095,6 +3145,18 @@ public class NativeAzureFileSystem extends FileSystem {
   }
 
   @Override
+  protected CompletableFuture<FSDataInputStream> openFileWithOptions(Path path,
+      OpenFileParameters parameters) throws IOException {
+    AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
+        parameters.getMandatoryKeys(),
+        Collections.emptySet(),
+        "for " + path);
+    return LambdaUtils.eval(
+        new CompletableFuture<>(), () ->
+            open(path, parameters.getBufferSize(), Optional.of(parameters.getOptions())));
+  }
+
+  @Override
   public boolean rename(Path src, Path dst) throws FileNotFoundException, IOException {
 
     FolderRenamePending renamePending = null;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
index 0944b1b..91aad99 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
 import java.util.Date;
+import java.util.Optional;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -50,6 +51,9 @@ interface NativeFileSystemStore {
 
   InputStream retrieve(String key, long byteRangeStart) throws IOException;
 
+  InputStream retrieve(String key, long byteRangeStart,
+      Optional<Configuration> options) throws IOException;
+
   DataOutputStream storefile(String keyEncoded,
       PermissionStatus permissionStatus,
       String key) throws AzureException;
diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/index.md b/hadoop-tools/hadoop-azure/src/site/markdown/index.md
index 11d0a18..2af6b49 100644
--- a/hadoop-tools/hadoop-azure/src/site/markdown/index.md
+++ b/hadoop-tools/hadoop-azure/src/site/markdown/index.md
@@ -545,6 +545,17 @@ The maximum number of entries that that cache can hold can be customized using t
     </property>
 ```
 
+### Performance optimization configurations
+
+`fs.azure.block.blob.buffered.pread.disable`: By default, the positional read API does a
+seek and read on the input stream. This read fills the buffer cache in
+BlockBlobInputStream. If this configuration is true, it skips the buffer and makes a
+lock-free call to read from the blob. This optimization is very helpful for HBase-style
+short random reads over a shared InputStream instance.
+Note: This is not a config which can be set at cluster level. It can be used as
+an option on FutureDataInputStreamBuilder.
+See FileSystem#openFile(Path path).
+
 ## Further Reading
 
 * [Testing the Azure WASB client](testing_azure.html).
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlockBlobInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlockBlobInputStream.java
index 07a13df..cea11c0 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlockBlobInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestBlockBlobInputStream.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azure.integration.AbstractAzureScaleTest;
 import org.apache.hadoop.fs.azure.integration.AzureTestUtils;
@@ -306,6 +307,61 @@ public class ITestBlockBlobInputStream extends AbstractAzureScaleTest {
     assertArrayEquals("Mismatch in read data", bufferV1, bufferV2);
   }
 
+  @Test
+  public void test_202_PosReadTest() throws Exception {
+    assumeHugeFileExists();
+    FutureDataInputStreamBuilder builder = accountUsingInputStreamV2
+        .getFileSystem().openFile(TEST_FILE_PATH);
+    builder.opt(AzureNativeFileSystemStore.FS_AZURE_BLOCK_BLOB_BUFFERED_PREAD_DISABLE, true);
+    try (
+        FSDataInputStream inputStreamV1
+            = accountUsingInputStreamV1.getFileSystem().open(TEST_FILE_PATH);
+        FSDataInputStream inputStreamV2
+            = accountUsingInputStreamV2.getFileSystem().open(TEST_FILE_PATH);
+        FSDataInputStream inputStreamV2NoBuffer = builder.build().get();
+    ) {
+      final int bufferSize = 4 * KILOBYTE;
+      byte[] bufferV1 = new byte[bufferSize];
+      byte[] bufferV2 = new byte[bufferSize];
+      byte[] bufferV2NoBuffer = new byte[bufferSize];
+
+      verifyConsistentReads(inputStreamV1, inputStreamV2, inputStreamV2NoBuffer, 0,
+          bufferV1, bufferV2, bufferV2NoBuffer);
+
+      int pos = 2 * KILOBYTE;
+      verifyConsistentReads(inputStreamV1, inputStreamV2, inputStreamV2NoBuffer, pos,
+          bufferV1, bufferV2, bufferV2NoBuffer);
+
+      pos = 10 * KILOBYTE;
+      verifyConsistentReads(inputStreamV1, inputStreamV2, inputStreamV2NoBuffer, pos,
+          bufferV1, bufferV2, bufferV2NoBuffer);
+
+      pos = 4100 * KILOBYTE;
+      verifyConsistentReads(inputStreamV1, inputStreamV2, inputStreamV2NoBuffer, pos,
+          bufferV1, bufferV2, bufferV2NoBuffer);
+    }
+  }
+
+  private void verifyConsistentReads(FSDataInputStream inputStreamV1,
+      FSDataInputStream inputStreamV2, FSDataInputStream inputStreamV2NoBuffer,
+      int pos, byte[] bufferV1, byte[] bufferV2, byte[] bufferV2NoBuffer)
+      throws IOException {
+    int size = bufferV1.length;
+    int numBytesReadV1 = inputStreamV1.read(pos, bufferV1, 0, size);
+    assertEquals("Bytes read from V1 stream", size, numBytesReadV1);
+
+    int numBytesReadV2 = inputStreamV2.read(pos, bufferV2, 0, size);
+    assertEquals("Bytes read from V2 stream", size, numBytesReadV2);
+
+    int numBytesReadV2NoBuffer = inputStreamV2NoBuffer.read(pos,
+        bufferV2NoBuffer, 0, size);
+    assertEquals("Bytes read from V2 stream (buffered pread disabled)", size,
+        numBytesReadV2NoBuffer);
+
+    assertArrayEquals("Mismatch in read data", bufferV1, bufferV2);
+    assertArrayEquals("Mismatch in read data", bufferV2, bufferV2NoBuffer);
+  }
+
   /**
    * Validates the implementation of InputStream.markSupported.
    * @throws IOException

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org