You are viewing a plain-text version of this content. The canonical version is the original message in the Apache mailing-list archive; its hyperlink was lost in this plain-text copy.
Posted to commits@hbase.apache.org by we...@apache.org on 2021/07/26 10:08:04 UTC

[hbase] branch branch-2 updated: HBASE-21946 Use ByteBuffer pread instead of byte[] pread in HFileBlock when applicable (#3434)

This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 37c3f7b  HBASE-21946 Use ByteBuffer pread instead of byte[] pread in HFileBlock when applicable (#3434)
37c3f7b is described below

commit 37c3f7b4467b71ecaad14fa20be6adb5d7b8c31b
Author: Wei-Chiu Chuang <we...@apache.org>
AuthorDate: Mon Jul 26 02:31:39 2021 -0700

    HBASE-21946 Use ByteBuffer pread instead of byte[] pread in HFileBlock when applicable (#3434)
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
    (cherry picked from commit 4a3c7d73b04928ca0d63b7117186ddb9757997f2)
---
 .../apache/hadoop/hbase/io/util/BlockIOUtils.java  | 82 +++++++++++++++++--
 .../hadoop/hbase/io/hfile/TestBlockIOUtils.java    | 94 ++++++++++++++++++++++
 2 files changed, 171 insertions(+), 5 deletions(-)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/BlockIOUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/BlockIOUtils.java
index a98a478..01eda2d 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/BlockIOUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/BlockIOUtils.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.io.util;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.fs.ByteBufferReadable;
@@ -27,15 +29,36 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @InterfaceAudience.Private
 public final class BlockIOUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(BlockIOUtils.class);
+  // Reflective handle on FSDataInputStream#read(long, ByteBuffer); stays null if not found.
+  // TODO: remove the reflection when we update to Hadoop 3.3 or above.
+  private static Method byteBufferPositionedReadMethod;
+  static {
+    // Resolve the reflective handle once, when the class is initialized.
+    initByteBufferPositionReadableMethod();
+  }
 
   // Disallow instantiation
   private BlockIOUtils() {
 
   }
 
+  private static void initByteBufferPositionReadableMethod() {
+    Class<?>[] signature = { long.class, ByteBuffer.class }; // read(long position, ByteBuffer buf)
+    try {
+      byteBufferPositionedReadMethod = FSDataInputStream.class.getMethod("read", signature);
+    } catch (NoSuchMethodException absent) {
+      // The positioned ByteBuffer read API is missing; leave the field null.
+      LOG.debug("Unable to find positioned bytebuffer read API of FSDataInputStream. "
+        + "preadWithExtra() will use a temporary on-heap byte array.");
+    }
+  }
+
   public static boolean isByteBufferReadable(FSDataInputStream is) {
     InputStream cur = is.getWrappedStream();
     for (;;) {
@@ -197,6 +220,10 @@ public final class BlockIOUtils {
    * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses positional read and
    * specifies a number of "extra" bytes that would be desirable but not absolutely necessary to
    * read.
+   *
+   * If the input stream supports ByteBufferPositionedReadable, it reads to the byte buffer
+   * directly, and does not allocate a temporary byte array.
+   *
    * @param buff ByteBuff to read into.
    * @param dis the input stream to read from
    * @param position the position within the stream from which to start reading
@@ -207,6 +234,17 @@ public final class BlockIOUtils {
    */
   public static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long position,
       int necessaryLen, int extraLen) throws IOException {
+    boolean preadbytebuffer = dis.hasCapability("in:preadbytebuffer");
+    // Also require the reflective read(long, ByteBuffer) handle; otherwise use the on-heap path.
+    if (preadbytebuffer && byteBufferPositionedReadMethod != null) {
+      return preadWithExtraDirectly(buff, dis, position, necessaryLen, extraLen);
+    } else {
+      return preadWithExtraOnHeap(buff, dis, position, necessaryLen, extraLen);
+    }
+  }
+
+  private static boolean preadWithExtraOnHeap(ByteBuff buff, FSDataInputStream dis, long position,
+    int necessaryLen, int extraLen) throws IOException {
     int remain = necessaryLen + extraLen;
     byte[] buf = new byte[remain];
     int bytesRead = 0;
@@ -220,15 +258,79 @@ public final class BlockIOUtils {
       bytesRead += ret;
       remain -= ret;
     }
-    // Copy the bytes from on-heap bytes[] to ByteBuffer[] now, and after resolving HDFS-3246, we
-    // will read the bytes to ByteBuffer[] directly without allocating any on-heap byte[].
-    // TODO I keep the bytes copy here, because I want to abstract the ByteBuffer[]
-    // preadWithExtra method for the upper layer, only need to refactor this method if the
-    // ByteBuffer pread is OK.
     copyToByteBuff(buf, 0, bytesRead, buff);
     return (extraLen > 0) && (bytesRead == necessaryLen + extraLen);
   }
 
+  /**
+   * Positionally reads into the NIO buffers backing {@code buff} via the reflective
+   * {@code FSDataInputStream#read(long, ByteBuffer)} API, so no temporary on-heap byte array is
+   * allocated. Reads at least {@code necessaryLen} and at most {@code necessaryLen + extraLen}
+   * bytes starting at {@code position} in the stream.
+   * <p>
+   * The underlying read is expected to advance each backing buffer's position by the bytes it
+   * stores there. A buffer's limit is lowered only for the duration of a single read (so nothing
+   * beyond the requested range is read) and is then restored.
+   * @param buff ByteBuff to read into
+   * @param dis the input stream to read from
+   * @param position the position within the stream from which to start reading
+   * @param necessaryLen the number of bytes that must be read
+   * @param extraLen the number of extra bytes that are desirable but not necessary to read
+   * @return true if and only if extraLen is &gt; 0 and all of the extra bytes were read
+   * @throws IOException if the positioned read API is unavailable, the buffers cannot hold the
+   *           requested bytes, the underlying read fails, or EOF is hit before
+   *           {@code necessaryLen} bytes have been read
+   */
+  private static boolean preadWithExtraDirectly(ByteBuff buff, FSDataInputStream dis, long position,
+    int necessaryLen, int extraLen) throws IOException {
+    if (byteBufferPositionedReadMethod == null) {
+      // Fail with a clear message rather than an NPE from Method.invoke() below.
+      throw new IOException("ByteBuffer positioned read API of FSDataInputStream is unavailable");
+    }
+    int remain = necessaryLen + extraLen, bytesRead = 0, idx = -1;
+    ByteBuffer[] buffers = buff.nioByteBuffers();
+    ByteBuffer cur = null;
+    while (bytesRead < necessaryLen) {
+      // Advance to the next backing buffer that still has room.
+      while (cur == null || !cur.hasRemaining()) {
+        if (++idx >= buffers.length) {
+          throw new IOException(
+            "Not enough ByteBuffers to read the remaining " + remain + " bytes");
+        }
+        cur = buffers[idx];
+      }
+      int originalLimit = cur.limit();
+      // Cap this read at the bytes still wanted; the limit is restored in the finally block.
+      cur.limit(cur.position() + Math.min(remain, cur.remaining()));
+      int ret;
+      try {
+        ret = (Integer) byteBufferPositionedReadMethod.invoke(dis, position + bytesRead, cur);
+      } catch (IllegalAccessException e) {
+        throw new IOException("Unable to invoke ByteBuffer positioned read when trying to read "
+          + remain + " bytes from position " + (position + bytesRead), e);
+      } catch (InvocationTargetException e) {
+        Throwable cause = e.getCause();
+        if (cause instanceof IOException) {
+          // Propagate the stream's own IOException unchanged so callers see the real failure.
+          throw (IOException) cause;
+        }
+        throw new IOException("Encountered an exception when invoking ByteBuffer positioned read"
+          + " when trying to read " + remain + " bytes from position " + (position + bytesRead),
+          cause);
+      } finally {
+        cur.limit(originalLimit);
+      }
+      if (ret < 0) {
+        throw new IOException("Premature EOF from inputStream (positional read returned " + ret
+          + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen
+          + " extra bytes, successfully read " + bytesRead);
+      }
+      bytesRead += ret;
+      remain -= ret;
+    }
+    return (extraLen > 0) && (bytesRead == necessaryLen + extraLen);
+  }
+
   private static int copyToByteBuff(byte[] buf, int offset, int len, ByteBuff out)
       throws IOException {
     if (offset < 0 || len < 0 || offset + len > buf.length) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java
index a386f49..2cd2610 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java
@@ -21,12 +21,15 @@ import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -138,9 +141,11 @@ public class TestBlockIOUtils {
     ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen));
     FSDataInputStream in = mock(FSDataInputStream.class);
     when(in.read(position, buf, bufOffset, totalLen)).thenReturn(totalLen);
+    when(in.hasCapability(anyString())).thenReturn(false);
     boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
     assertFalse("Expect false return when no extra bytes requested", ret);
     verify(in).read(position, buf, bufOffset, totalLen);
+    verify(in).hasCapability(anyString());
     verifyNoMoreInteractions(in);
   }
 
@@ -156,10 +161,12 @@ public class TestBlockIOUtils {
     FSDataInputStream in = mock(FSDataInputStream.class);
     when(in.read(position, buf, bufOffset, totalLen)).thenReturn(5);
     when(in.read(5, buf, 5, 5)).thenReturn(5);
+    when(in.hasCapability(anyString())).thenReturn(false);
     boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
     assertFalse("Expect false return when no extra bytes requested", ret);
     verify(in).read(position, buf, bufOffset, totalLen);
     verify(in).read(5, buf, 5, 5);
+    verify(in).hasCapability(anyString());
     verifyNoMoreInteractions(in);
   }
 
@@ -174,9 +181,11 @@ public class TestBlockIOUtils {
     ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen));
     FSDataInputStream in = mock(FSDataInputStream.class);
     when(in.read(position, buf, bufOffset, totalLen)).thenReturn(totalLen);
+    when(in.hasCapability(anyString())).thenReturn(false);
     boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
     assertTrue("Expect true return when reading extra bytes succeeds", ret);
     verify(in).read(position, buf, bufOffset, totalLen);
+    verify(in).hasCapability(anyString());
     verifyNoMoreInteractions(in);
   }
 
@@ -191,9 +200,11 @@ public class TestBlockIOUtils {
     ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen));
     FSDataInputStream in = mock(FSDataInputStream.class);
     when(in.read(position, buf, bufOffset, totalLen)).thenReturn(necessaryLen);
+    when(in.hasCapability(anyString())).thenReturn(false);
     boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
     assertFalse("Expect false return when reading extra bytes fails", ret);
     verify(in).read(position, buf, bufOffset, totalLen);
+    verify(in).hasCapability(anyString());
     verifyNoMoreInteractions(in);
   }
 
@@ -210,10 +221,12 @@ public class TestBlockIOUtils {
     FSDataInputStream in = mock(FSDataInputStream.class);
     when(in.read(position, buf, bufOffset, totalLen)).thenReturn(5);
     when(in.read(5, buf, 5, 10)).thenReturn(10);
+    when(in.hasCapability(anyString())).thenReturn(false);
     boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
     assertTrue("Expect true return when reading extra bytes succeeds", ret);
     verify(in).read(position, buf, bufOffset, totalLen);
     verify(in).read(5, buf, 5, 10);
+    verify(in).hasCapability(anyString());
     verifyNoMoreInteractions(in);
   }
 
@@ -229,8 +242,89 @@ public class TestBlockIOUtils {
     FSDataInputStream in = mock(FSDataInputStream.class);
     when(in.read(position, buf, bufOffset, totalLen)).thenReturn(9);
     when(in.read(position, buf, bufOffset, totalLen)).thenReturn(-1);
+    when(in.hasCapability(anyString())).thenReturn(false);
     exception.expect(IOException.class);
     exception.expectMessage("EOF");
     BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
   }
+
+  /**
+   * Checks whether FSDataInputStream exposes the ByteBufferPositionedReadable API, i.e. a public
+   * {@code read(long position, ByteBuffer buf)} method.
+   * @return true if FSDataInputStream implements ByteBufferPositionedReadable API.
+   */
+  private boolean isByteBufferPositionedReadable() {
+    boolean available;
+    try {
+      available = FSDataInputStream.class.getMethod("read", long.class, ByteBuffer.class) != null;
+    } catch (NoSuchMethodException e) {
+      available = false;
+    }
+    return available;
+  }
+
+  /** FSDataInputStream subclass that declares {@code read(long, ByteBuffer)} for mocking. */
+  public static class MyFSDataInputStream extends FSDataInputStream {
+    public MyFSDataInputStream(InputStream wrapped) {
+      super(wrapped);
+    }
+
+    // The ByteBufferPositionedReadable API only exists in Hadoop 3.3+, so FSDataInputStream from
+    // older Hadoop does not declare it and mocking it there would not compile (and @Override is
+    // omitted for the same reason). The tests in this class only call it on Mockito mocks.
+    public int read(long pos, ByteBuffer dst) throws IOException {
+      return 0;
+    }
+  }
+
+  @Test
+  public void testByteBufferPositionedReadable() throws IOException {
+    assumeTrue("Skip the test because ByteBufferPositionedReadable is not available",
+      isByteBufferPositionedReadable());
+    // Two short positioned reads (6 + 5 bytes) cover 10 necessary bytes plus 1 extra byte.
+    final long start = 0;
+    final int needed = 10;
+    final int extra = 1;
+    final int firstChunk = 6;
+    final int secondChunk = needed + extra - firstChunk;
+    ByteBuffer target = ByteBuffer.allocate(needed + extra);
+    MyFSDataInputStream stream = mock(MyFSDataInputStream.class);
+
+    when(stream.hasCapability(anyString())).thenReturn(true);
+    when(stream.read(start, target)).thenReturn(firstChunk);
+    when(stream.read(firstChunk, target)).thenReturn(secondChunk);
+    boolean readExtra =
+      BlockIOUtils.preadWithExtra(new SingleByteBuff(target), stream, start, needed, extra);
+    assertTrue("Expect true return when reading extra bytes succeeds", readExtra);
+    verify(stream).read(start, target);
+    verify(stream).read(firstChunk, target);
+    verify(stream).hasCapability(anyString());
+    verifyNoMoreInteractions(stream);
+  }
+
+  @Test
+  public void testByteBufferPositionedReadableEOF() throws IOException {
+    assumeTrue("Skip the test because ByteBufferPositionedReadable is not available",
+      isByteBufferPositionedReadable());
+    long position = 0;
+    int necessaryLen = 10;
+    int extraLen = 0;
+    int firstReadLen = 9;
+    ByteBuffer buf = ByteBuffer.allocate(necessaryLen + extraLen);
+    MyFSDataInputStream in = mock(MyFSDataInputStream.class);
+    when(in.read(position, buf)).thenReturn(firstReadLen);
+    when(in.read(firstReadLen, buf)).thenReturn(-1); // EOF right after the short read
+    when(in.hasCapability(anyString())).thenReturn(true);
+    // Catch explicitly rather than use the ExpectedException rule so the verifications run.
+    try {
+      BlockIOUtils.preadWithExtra(new SingleByteBuff(buf), in, position, necessaryLen, extraLen);
+      fail("Expect an IOException when EOF is hit before the necessary bytes are read");
+    } catch (IOException e) {
+      assertTrue(e.getMessage(), e.getMessage().contains("EOF"));
+    }
+    verify(in).read(position, buf);
+    verify(in).read(firstReadLen, buf);
+    verify(in).hasCapability(anyString());
+    verifyNoMoreInteractions(in);
+  }
 }