You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by op...@apache.org on 2019/03/06 12:22:20 UTC

[hbase] branch HBASE-21879 updated: HBASE-21917 Make the HFileBlock#validateChecksum can accept ByteBuff as an input.

This is an automated email from the ASF dual-hosted git repository.

openinx pushed a commit to branch HBASE-21879
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/HBASE-21879 by this push:
     new 72fe2cc  HBASE-21917 Make the HFileBlock#validateChecksum can accept ByteBuff as an input.
72fe2cc is described below

commit 72fe2cce2148916b2337732cac48f0f43e5fe2d5
Author: huzheng <op...@gmail.com>
AuthorDate: Mon Feb 18 17:12:23 2019 +0800

    HBASE-21917 Make the HFileBlock#validateChecksum can accept ByteBuff as an input.
---
 .../apache/hadoop/hbase/io/hfile/ChecksumUtil.java | 153 ++++++++++++++-------
 .../apache/hadoop/hbase/io/hfile/HFileBlock.java   |  14 +-
 .../apache/hadoop/hbase/io/hfile/TestChecksum.java |  64 ++++++---
 3 files changed, 151 insertions(+), 80 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ChecksumUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ChecksumUtil.java
index 5eb1826..5317f0e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ChecksumUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ChecksumUtil.java
@@ -17,11 +17,12 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,8 +36,7 @@ import org.apache.hadoop.util.DataChecksum;
 public class ChecksumUtil {
   public static final Logger LOG = LoggerFactory.getLogger(ChecksumUtil.class);
 
-  /** This is used to reserve space in a byte buffer */
-  private static byte[] DUMMY_VALUE = new byte[128 * HFileBlock.CHECKSUM_SIZE];
+  public static final int CHECKSUM_BUF_SIZE = 256;
 
   /**
    * This is used by unit tests to make checksum failures throw an
@@ -78,50 +78,118 @@ public class ChecksumUtil {
   }
 
   /**
+   * Like Hadoop's {@link DataChecksum#verifyChunkedSums(ByteBuffer, ByteBuffer, String, long)},
+   * this method verifies the checksum of each chunk in data. The difference is that this method
+   * accepts {@link ByteBuff} arguments; it cannot live in hadoop-common, so it is defined here.
+   * @param dataChecksum used to calculate the checksum.
+   * @param data the input data to verify.
+   * @param checksums the stored checksums to compare against.
+   * @param pathName the file the data was read from.
+   * @return true if the checksums match, false on a mismatch.
+   * @see org.apache.hadoop.util.DataChecksum#verifyChunkedSums(ByteBuffer, ByteBuffer, String,
+   *      long)
+   */
+  private static boolean verifyChunkedSums(DataChecksum dataChecksum, ByteBuff data,
+      ByteBuff checksums, String pathName) {
+    // Almost all HFile blocks are about 64KB, so the data is usually a SingleByteBuff. In that
+    // case use Hadoop's checksum verification directly, because it uses the native checksum,
+    // which needs no extra byte[] allocation or copying. (HBASE-21917)
+    if (data instanceof SingleByteBuff && checksums instanceof SingleByteBuff) {
+      // The checksums ByteBuff must also be a SingleByteBuff because it's duplicated from data.
+      ByteBuffer dataBB = (ByteBuffer) (data.nioByteBuffers()[0]).duplicate()
+          .position(data.position()).limit(data.limit());
+      ByteBuffer checksumBB = (ByteBuffer) (checksums.nioByteBuffers()[0]).duplicate()
+          .position(checksums.position()).limit(checksums.limit());
+      try {
+        dataChecksum.verifyChunkedSums(dataBB, checksumBB, pathName, 0);
+        return true;
+      } catch (ChecksumException e) {
+        return false;
+      }
+    }
+
+    // Only when the data block is larger than 4MB (the default buffer size in BucketCache) will
+    // the block be a MultiByteBuff. We use a small byte[] to update the checksum many times to
+    // reduce GC pressure. This is a rare case.
+    int checksumTypeSize = dataChecksum.getChecksumType().size;
+    if (checksumTypeSize == 0) {
+      return true;
+    }
+    // There are 5 checksum types now: NULL, DEFAULT, MIXED, CRC32, CRC32C. The first three need
+    // 0 bytes, and the other two need 4 bytes.
+    assert checksumTypeSize == 4;
+
+    int bytesPerChecksum = dataChecksum.getBytesPerChecksum();
+    int startDataPos = data.position();
+    data.mark();
+    checksums.mark();
+    try {
+      // Allocate a small buffer to reduce young GC (HBASE-21917), and copy 256 bytes from the
+      // ByteBuff to update the checksum each time. If we upgrade to a future JDK and Hadoop
+      // version that supports DataChecksum#update(ByteBuffer), we won't need to update the
+      // checksum multiple times.
+      byte[] buf = new byte[CHECKSUM_BUF_SIZE];
+      byte[] sum = new byte[checksumTypeSize];
+      while (data.remaining() > 0) {
+        int n = Math.min(data.remaining(), bytesPerChecksum);
+        checksums.get(sum);
+        dataChecksum.reset();
+        for (int remain = n, len; remain > 0; remain -= len) {
+          // Copy 256 bytes from the ByteBuff to update the checksum each time; if fewer than 256
+          // bytes remain, just update with the remaining bytes.
+          len = Math.min(CHECKSUM_BUF_SIZE, remain);
+          data.get(buf, 0, len);
+          dataChecksum.update(buf, 0, len);
+        }
+        int calculated = (int) dataChecksum.getValue();
+        int stored = (sum[0] << 24 & 0xff000000) | (sum[1] << 16 & 0xff0000)
+            | (sum[2] << 8 & 0xff00) | (sum[3] & 0xff);
+        if (calculated != stored) {
+          if (LOG.isTraceEnabled()) {
+            long errPos = data.position() - startDataPos - n;
+            LOG.trace("Checksum error: {} at {} expected: {} got: {}", pathName, errPos, stored,
+              calculated);
+          }
+          return false;
+        }
+      }
+    } finally {
+      data.reset();
+      checksums.reset();
+    }
+    return true;
+  }
+
+  /**
    * Validates that the data in the specified HFileBlock matches the checksum. Generates the
    * checksums for the data and then validate that it matches those stored in the end of the data.
-   * @param buffer Contains the data in following order: HFileBlock header, data, checksums.
+   * @param buf Contains the data in following order: HFileBlock header, data, checksums.
    * @param pathName Path of the HFile to which the {@code data} belongs. Only used for logging.
    * @param offset offset of the data being validated. Only used for logging.
    * @param hdrSize Size of the block header in {@code data}. Only used for logging.
    * @return True if checksum matches, else false.
    */
-  static boolean validateChecksum(ByteBuffer buffer, String pathName, long offset, int hdrSize)
-      throws IOException {
-    // A ChecksumType.NULL indicates that the caller is not interested in validating checksums,
-    // so we always return true.
-    ChecksumType cktype =
-        ChecksumType.codeToType(buffer.get(HFileBlock.Header.CHECKSUM_TYPE_INDEX));
-    if (cktype == ChecksumType.NULL) {
-      return true; // No checksum validations needed for this block.
+  static boolean validateChecksum(ByteBuff buf, String pathName, long offset, int hdrSize) {
+    ChecksumType ctype = ChecksumType.codeToType(buf.get(HFileBlock.Header.CHECKSUM_TYPE_INDEX));
+    if (ctype == ChecksumType.NULL) {
+      return true;// No checksum validations needed for this block.
     }
 
     // read in the stored value of the checksum size from the header.
-    int bytesPerChecksum = buffer.getInt(HFileBlock.Header.BYTES_PER_CHECKSUM_INDEX);
-
-    DataChecksum dataChecksum = DataChecksum.newDataChecksum(
-        cktype.getDataChecksumType(), bytesPerChecksum);
+    int bytesPerChecksum = buf.getInt(HFileBlock.Header.BYTES_PER_CHECKSUM_INDEX);
+    DataChecksum dataChecksum =
+        DataChecksum.newDataChecksum(ctype.getDataChecksumType(), bytesPerChecksum);
     assert dataChecksum != null;
     int onDiskDataSizeWithHeader =
-        buffer.getInt(HFileBlock.Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
+        buf.getInt(HFileBlock.Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
     if (LOG.isTraceEnabled()) {
-      LOG.info("dataLength=" + buffer.capacity()
-          + ", sizeWithHeader=" + onDiskDataSizeWithHeader
-          + ", checksumType=" + cktype.getName()
-          + ", file=" + pathName
-          + ", offset=" + offset
-          + ", headerSize=" + hdrSize
-          + ", bytesPerChecksum=" + bytesPerChecksum);
-    }
-    try {
-      ByteBuffer data = (ByteBuffer) buffer.duplicate().position(0).limit(onDiskDataSizeWithHeader);
-      ByteBuffer checksums = (ByteBuffer) buffer.duplicate().position(onDiskDataSizeWithHeader)
-          .limit(buffer.capacity());
-      dataChecksum.verifyChunkedSums(data, checksums, pathName, 0);
-    } catch (ChecksumException e) {
-      return false;
+      LOG.info("dataLength=" + buf.capacity() + ", sizeWithHeader=" + onDiskDataSizeWithHeader
+          + ", checksumType=" + ctype.getName() + ", file=" + pathName + ", offset=" + offset
+          + ", headerSize=" + hdrSize + ", bytesPerChecksum=" + bytesPerChecksum);
     }
-    return true;  // checksum is valid
+    ByteBuff data = buf.duplicate().position(0).limit(onDiskDataSizeWithHeader);
+    ByteBuff checksums = buf.duplicate().position(onDiskDataSizeWithHeader).limit(buf.limit());
+    return verifyChunkedSums(dataChecksum, data, checksums, pathName);
   }
 
   /**
@@ -151,25 +219,6 @@ public class ChecksumUtil {
   }
 
   /**
-   * Write dummy checksums to the end of the specified bytes array
-   * to reserve space for writing checksums later
-   * @param baos OutputStream to write dummy checkum values
-   * @param numBytes Number of bytes of data for which dummy checksums
-   *                 need to be generated
-   * @param bytesPerChecksum Number of bytes per checksum value
-   */
-  static void reserveSpaceForChecksums(ByteArrayOutputStream baos,
-    int numBytes, int bytesPerChecksum) throws IOException {
-    long numChunks = numChunks(numBytes, bytesPerChecksum);
-    long bytesLeft = numChunks * HFileBlock.CHECKSUM_SIZE;
-    while (bytesLeft > 0) {
-      long count = Math.min(bytesLeft, DUMMY_VALUE.length);
-      baos.write(DUMMY_VALUE, 0, (int)count);
-      bytesLeft -= count;
-    }
-  }
-
-  /**
    * Mechanism to throw an exception in case of hbase checksum
    * failure. This is used by unit tests only.
    * @param value Setting this to true will cause hbase checksum
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index 968a87e..91e63fd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -1784,10 +1784,10 @@ public class HFileBlock implements Cacheable {
       // Do a few checks before we go instantiate HFileBlock.
       assert onDiskSizeWithHeader > this.hdrSize;
       verifyOnDiskSizeMatchesHeader(onDiskSizeWithHeader, headerBuf, offset, checksumSupport);
-      ByteBuffer onDiskBlockByteBuffer = ByteBuffer.wrap(onDiskBlock, 0, onDiskSizeWithHeader);
+      ByteBuff onDiskBlockByteBuff =
+          new SingleByteBuff(ByteBuffer.wrap(onDiskBlock, 0, onDiskSizeWithHeader));
       // Verify checksum of the data before using it for building HFileBlock.
-      if (verifyChecksum &&
-          !validateChecksum(offset, onDiskBlockByteBuffer, hdrSize)) {
+      if (verifyChecksum && !validateChecksum(offset, onDiskBlockByteBuff, hdrSize)) {
         return null;
       }
       long duration = System.currentTimeMillis() - startTime;
@@ -1797,9 +1797,8 @@ public class HFileBlock implements Cacheable {
       // The onDiskBlock will become the headerAndDataBuffer for this block.
       // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already
       // contains the header of next block, so no need to set next block's header in it.
-      HFileBlock hFileBlock =
-          new HFileBlock(new SingleByteBuff(onDiskBlockByteBuffer), checksumSupport,
-              MemoryType.EXCLUSIVE, offset, nextBlockOnDiskSize, fileContext);
+      HFileBlock hFileBlock = new HFileBlock(onDiskBlockByteBuff, checksumSupport,
+          MemoryType.EXCLUSIVE, offset, nextBlockOnDiskSize, fileContext);
       // Run check on uncompressed sizings.
       if (!fileContext.isCompressedOrEncrypted()) {
         hFileBlock.sanityCheckUncompressed();
@@ -1838,8 +1837,7 @@ public class HFileBlock implements Cacheable {
      * If the block doesn't uses checksum, returns false.
      * @return True if checksum matches, else false.
      */
-    private boolean validateChecksum(long offset, ByteBuffer data, int hdrSize)
-        throws IOException {
+    private boolean validateChecksum(long offset, ByteBuff data, int hdrSize) {
       // If this is an older version of the block that does not have checksums, then return false
       // indicating that checksum verification did not succeed. Actually, this method should never
       // be called when the minorVersion is 0, thus this is a defensive check for a cannot-happen
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
index de28422..e93b61e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
@@ -21,16 +21,15 @@ import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ;
 import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.BufferUnderflowException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
+import java.nio.ByteBuffer;
+
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -42,6 +41,8 @@ import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.MultiByteBuff;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.ChecksumType;
@@ -102,22 +103,35 @@ public class TestChecksum {
     assertEquals(b.getChecksumType(), ChecksumType.getDefaultChecksumType().getCode());
   }
 
-  /**
-   * Test all checksum types by writing and reading back blocks.
-   */
+  private void verifyMBBCheckSum(ByteBuff buf) throws IOException {
+    int size = buf.remaining() / 2 + 1;
+    ByteBuff mbb = new MultiByteBuff(ByteBuffer.allocate(size), ByteBuffer.allocate(size))
+          .position(0).limit(buf.remaining());
+    for (int i = buf.position(); i < buf.limit(); i++) {
+      mbb.put(buf.get(i));
+    }
+    mbb.position(0).limit(buf.remaining());
+    assertEquals(mbb.remaining(), buf.remaining());
+    assertTrue(mbb.remaining() > size);
+    ChecksumUtil.validateChecksum(mbb, "test", 0, HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM);
+  }
+
+  private void verifySBBCheckSum(ByteBuff buf) throws IOException {
+    ChecksumUtil.validateChecksum(buf, "test", 0, HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM);
+  }
+
   @Test
-  public void testAllChecksumTypes() throws IOException {
-    List<ChecksumType> cktypes = new ArrayList<>(Arrays.asList(ChecksumType.values()));
-    for (Iterator<ChecksumType> itr = cktypes.iterator(); itr.hasNext(); ) {
-      ChecksumType cktype = itr.next();
-      Path path = new Path(TEST_UTIL.getDataTestDir(), "checksum" + cktype.getName());
+  public void testVerifyCheckSum() throws IOException {
+    int intCount = 10000;
+    for (ChecksumType ckt : ChecksumType.values()) {
+      Path path = new Path(TEST_UTIL.getDataTestDir(), "checksum" + ckt.getName());
       FSDataOutputStream os = fs.create(path);
       HFileContext meta = new HFileContextBuilder()
-          .withChecksumType(cktype)
-          .build();
+            .withChecksumType(ckt)
+            .build();
       HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
       DataOutputStream dos = hbw.startWriting(BlockType.DATA);
-      for (int i = 0; i < 1000; ++i) {
+      for (int i = 0; i < intCount; ++i) {
         dos.writeInt(i);
       }
       hbw.writeHeaderAndData(os);
@@ -130,19 +144,25 @@ public class TestChecksum {
       FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path);
       meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
       HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(
-          is, totalSize, (HFileSystem) fs, path, meta);
+            is, totalSize, (HFileSystem) fs, path, meta);
       HFileBlock b = hbr.readBlockData(0, -1, false, false);
+
+      // verify SingleByteBuff checksum.
+      verifySBBCheckSum(b.getBufferReadOnly());
+
+      // verify MultiByteBuff checksum.
+      verifyMBBCheckSum(b.getBufferReadOnly());
+
       ByteBuff data = b.getBufferWithoutHeader();
-      for (int i = 0; i < 1000; i++) {
+      for (int i = 0; i < intCount; i++) {
         assertEquals(i, data.getInt());
       }
-      boolean exception_thrown = false;
       try {
         data.getInt();
+        fail();
       } catch (BufferUnderflowException e) {
-        exception_thrown = true;
+        // expected failure
       }
-      assertTrue(exception_thrown);
       assertEquals(0, HFile.getAndResetChecksumFailuresCount());
     }
   }
@@ -216,16 +236,19 @@ public class TestChecksum {
         for (int i = 0; i <
              HFileBlock.CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD + 1; i++) {
           b = hbr.readBlockData(0, -1, pread, false);
+          assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
           assertEquals(0, HFile.getAndResetChecksumFailuresCount());
         }
         // The next read should have hbase checksum verification re-enabled;
         // we verify this by asserting that there was a hbase-checksum failure.
         b = hbr.readBlockData(0, -1, pread, false);
+        assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
         assertEquals(1, HFile.getAndResetChecksumFailuresCount());
 
         // Since the above encountered a checksum failure, we switch
         // back to not checking hbase checksums.
         b = hbr.readBlockData(0, -1, pread, false);
+        assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
         assertEquals(0, HFile.getAndResetChecksumFailuresCount());
         is.close();
 
@@ -319,6 +342,7 @@ public class TestChecksum {
         HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(new FSDataInputStreamWrapper(
             is, nochecksum), totalSize, hfs, path, meta);
         HFileBlock b = hbr.readBlockData(0, -1, pread, false);
+        assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
         is.close();
         b.sanityCheck();
         assertEquals(dataSize, b.getUncompressedSizeWithoutHeader());