You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by el...@apache.org on 2019/10/13 06:51:18 UTC

[hadoop-ozone] 01/01: HDDS-2204. Avoid buffer copying in checksum verification.

This is an automated email from the ASF dual-hosted git repository.

elek pushed a commit to branch HDDS-2204
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git

commit a76bdf71fd6ceac7edc9f688722c59c4771bf188
Author: Tsz Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Fri Oct 4 17:58:37 2019 +0800

    HDDS-2204. Avoid buffer copying in checksum verification.
---
 .../hdds/scm/storage/ContainerProtocolCalls.java   |   2 +-
 .../org/apache/hadoop/ozone/common/Checksum.java   | 218 ++++++++++-----------
 .../apache/hadoop/ozone/common/ChecksumData.java   |   4 +-
 .../apache/hadoop/ozone/common/TestChecksum.java   |   6 +-
 4 files changed, 108 insertions(+), 122 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index d0ba60d..e549eec 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -337,7 +337,7 @@ public final class ContainerProtocolCalls  {
         KeyValue.newBuilder().setKey("OverWriteRequested").setValue("true")
             .build();
     Checksum checksum = new Checksum();
-    ChecksumData checksumData = checksum.computeChecksum(data, 0, data.length);
+    final ChecksumData checksumData = checksum.computeChecksum(data);
     ChunkInfo chunk =
         ChunkInfo.newBuilder()
             .setChunkName(blockID.getLocalID() + "_chunk")
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java
index 0e70515..7db1d2b 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java
@@ -18,22 +18,21 @@
 package org.apache.hadoop.ozone.common;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.primitives.Longs;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.function.Function;
+import java.util.function.Supplier;
 
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ChecksumType;
-import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.util.PureJavaCrc32;
-import org.apache.hadoop.util.PureJavaCrc32C;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,16 +43,71 @@ import org.slf4j.LoggerFactory;
  * This class is not thread safe.
  */
 public class Checksum {
-
   public static final Logger LOG = LoggerFactory.getLogger(Checksum.class);
 
+  private static Function<ByteBuffer, ByteString> newMessageDigestFunction(
+      String algorithm) {
+    final MessageDigest md;
+    try {
+      md = MessageDigest.getInstance(algorithm);
+    } catch (NoSuchAlgorithmException e) {
+      throw new IllegalStateException(
+          "Failed to get MessageDigest for " + algorithm,  e);
+    }
+    return data -> {
+      md.reset();
+      md.update(data);
+      return ByteString.copyFrom(md.digest());
+    };
+  }
+
+  private static ByteString int2ByteString(int n) {
+    final ByteString.Output out = ByteString.newOutput();
+    try(DataOutputStream dataOut = new DataOutputStream(out)) {
+      dataOut.writeInt(n);
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          "Failed to write integer n = " + n + " to a ByteString", e);
+    }
+    return out.toByteString();
+  }
+
+  private static Function<ByteBuffer, ByteString> newChecksumByteBufferFunction(
+      Supplier<ChecksumByteBuffer> constructor) {
+    final ChecksumByteBuffer algorithm = constructor.get();
+    return  data -> {
+      algorithm.reset();
+      algorithm.update(data);
+      return int2ByteString((int)algorithm.getValue());
+    };
+  }
+
+  /** The algorithms for {@link ChecksumType}. */
+  enum Algorithm {
+    NONE(() -> data -> ByteString.EMPTY),
+    CRC32(() -> newChecksumByteBufferFunction(PureJavaCrc32ByteBuffer::new)),
+    CRC32C(() -> newChecksumByteBufferFunction(PureJavaCrc32CByteBuffer::new)),
+    SHA256(() -> newMessageDigestFunction("SHA-256")),
+    MD5(() -> newMessageDigestFunction("MD5"));
+
+    private final Supplier<Function<ByteBuffer, ByteString>> constructor;
+
+    static Algorithm valueOf(ChecksumType type) {
+      return valueOf(type.name());
+    }
+
+    Algorithm(Supplier<Function<ByteBuffer, ByteString>> constructor) {
+      this.constructor = constructor;
+    }
+
+    Function<ByteBuffer, ByteString> newChecksumFunction() {
+      return constructor.get();
+    }
+  }
+
   private final ChecksumType checksumType;
   private final int bytesPerChecksum;
 
-  private PureJavaCrc32 crc32Checksum;
-  private PureJavaCrc32C crc32cChecksum;
-  private MessageDigest sha;
-
   /**
    * Constructs a Checksum object.
    * @param type type of Checksum
@@ -78,13 +132,12 @@ public class Checksum {
 
   /**
    * Computes checksum for give data.
-   * @param byteBuffer input data in the form of ByteString.
+   * @param data input data.
    * @return ChecksumData computed for input data.
    */
-  public ChecksumData computeChecksum(ByteBuffer byteBuffer)
+  public ChecksumData computeChecksum(byte[] data, int off, int len)
       throws OzoneChecksumException {
-    return computeChecksum(byteBuffer.array(), byteBuffer.position(),
-        byteBuffer.limit());
+    return computeChecksum(ByteBuffer.wrap(data, off, len));
   }
 
   /**
@@ -94,46 +147,38 @@ public class Checksum {
    */
   public ChecksumData computeChecksum(byte[] data)
       throws OzoneChecksumException {
-    return computeChecksum(data, 0, data.length);
+    return computeChecksum(ByteBuffer.wrap(data));
   }
 
   /**
    * Computes checksum for give data.
-   * @param data input data in the form of byte array.
+   * @param data input data.
    * @return ChecksumData computed for input data.
+   * @throws OzoneChecksumException thrown when ChecksumType is not recognized
    */
-  public ChecksumData computeChecksum(byte[] data, int offset, int len)
+  public ChecksumData computeChecksum(ByteBuffer data)
       throws OzoneChecksumException {
-    ChecksumData checksumData = new ChecksumData(this.checksumType, this
-        .bytesPerChecksum);
+    if (!data.isReadOnly()) {
+      data = data.asReadOnlyBuffer();
+    }
+
+    final ChecksumData checksumData = new ChecksumData(
+        checksumType, bytesPerChecksum);
     if (checksumType == ChecksumType.NONE) {
       // Since type is set to NONE, we do not need to compute the checksums
       return checksumData;
     }
 
-    switch (checksumType) {
-    case CRC32:
-      crc32Checksum = new PureJavaCrc32();
-      break;
-    case CRC32C:
-      crc32cChecksum = new PureJavaCrc32C();
-      break;
-    case SHA256:
-      try {
-        sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
-      } catch (NoSuchAlgorithmException e) {
-        throw new OzoneChecksumException(OzoneConsts.FILE_HASH, e);
-      }
-      break;
-    case MD5:
-      break;
-    default:
+    final Function<ByteBuffer, ByteString> function;
+    try {
+      function = Algorithm.valueOf(checksumType).newChecksumFunction();
+    } catch (Exception e) {
       throw new OzoneChecksumException(checksumType);
     }
 
     // Compute number of checksums needs for given data length based on bytes
     // per checksum.
-    int dataSize = len - offset;
+    final int dataSize = data.remaining();
     int numChecksums = (dataSize + bytesPerChecksum - 1) / bytesPerChecksum;
 
     // Checksum is computed for each bytesPerChecksum number of bytes of data
@@ -141,7 +186,7 @@ public class Checksum {
     // remaining data with length less than bytesPerChecksum.
     List<ByteString> checksumList = new ArrayList<>(numChecksums);
     for (int index = 0; index < numChecksums; index++) {
-      checksumList.add(computeChecksumAtIndex(data, index, offset, len));
+      checksumList.add(computeChecksum(data, function, bytesPerChecksum));
     }
     checksumData.setChecksums(checksumList);
 
@@ -149,78 +194,24 @@ public class Checksum {
   }
 
   /**
-   * Computes checksum based on checksumType for a data block at given index
-   * and a max length of bytesPerChecksum.
+   * Compute checksum using the algorithm for the data up to the max length.
    * @param data input data
-   * @param index index to compute the offset from where data must be read
-   * @param start start pos of the array where the computation has to start
-   * @length length of array till which checksum needs to be computed
+   * @param function the checksum function
+   * @param maxLength the max length of data
    * @return computed checksum ByteString
-   * @throws OzoneChecksumException thrown when ChecksumType is not recognized
    */
-  private ByteString computeChecksumAtIndex(byte[] data, int index, int start,
-      int length)
-      throws OzoneChecksumException {
-    int offset = start + index * bytesPerChecksum;
-    int dataLength = length - start;
-    int len = bytesPerChecksum;
-    if ((offset + len) > dataLength) {
-      len = dataLength - offset;
-    }
-    byte[] checksumBytes = null;
-    switch (checksumType) {
-    case CRC32:
-      checksumBytes = computeCRC32Checksum(data, offset, len);
-      break;
-    case CRC32C:
-      checksumBytes = computeCRC32CChecksum(data, offset, len);
-      break;
-    case SHA256:
-      checksumBytes = computeSHA256Checksum(data, offset, len);
-      break;
-    case MD5:
-      checksumBytes = computeMD5Checksum(data, offset, len);
-      break;
-    default:
-      throw new OzoneChecksumException(checksumType);
+  private static ByteString computeChecksum(ByteBuffer data,
+      Function<ByteBuffer, ByteString> function, int maxLength) {
+    final int limit = data.limit();
+    try {
+      final int maxIndex = data.position() + maxLength;
+      if (limit > maxIndex) {
+        data.limit(maxIndex);
+      }
+      return function.apply(data);
+    } finally {
+      data.limit(limit);
     }
-
-    return ByteString.copyFrom(checksumBytes);
-  }
-
-  /**
-   * Computes CRC32 checksum.
-   */
-  private byte[] computeCRC32Checksum(byte[] data, int offset, int len) {
-    crc32Checksum.reset();
-    crc32Checksum.update(data, offset, len);
-    return Longs.toByteArray(crc32Checksum.getValue());
-  }
-
-  /**
-   * Computes CRC32C checksum.
-   */
-  private byte[] computeCRC32CChecksum(byte[] data, int offset, int len) {
-    crc32cChecksum.reset();
-    crc32cChecksum.update(data, offset, len);
-    return Longs.toByteArray(crc32cChecksum.getValue());
-  }
-
-  /**
-   * Computes SHA-256 checksum.
-   */
-  private byte[] computeSHA256Checksum(byte[] data, int offset, int len) {
-    sha.reset();
-    sha.update(data, offset, len);
-    return sha.digest();
-  }
-
-  /**
-   * Computes MD5 checksum.
-   */
-  private byte[] computeMD5Checksum(byte[] data, int offset, int len) {
-    MD5Hash md5out = MD5Hash.digest(data, offset, len);
-    return md5out.getDigest();
   }
 
   /**
@@ -235,7 +226,8 @@ public class Checksum {
    */
   public static boolean verifyChecksum(ByteString byteString,
       ChecksumData checksumData, int startIndex) throws OzoneChecksumException {
-    return verifyChecksum(byteString.toByteArray(), checksumData, startIndex);
+    final ByteBuffer buffer = byteString.asReadOnlyByteBuffer();
+    return verifyChecksum(buffer, checksumData, startIndex);
   }
 
   /**
@@ -247,7 +239,7 @@ public class Checksum {
    */
   public static boolean verifyChecksum(byte[] data, ChecksumData checksumData)
       throws OzoneChecksumException {
-    return verifyChecksum(data, checksumData, 0);
+    return verifyChecksum(ByteBuffer.wrap(data), checksumData, 0);
   }
 
   /**
@@ -259,7 +251,8 @@ public class Checksum {
    *                   data's computed checksum.
    * @throws OzoneChecksumException is thrown if checksums do not match
    */
-  public static boolean verifyChecksum(byte[] data, ChecksumData checksumData,
+  private static boolean verifyChecksum(ByteBuffer data,
+      ChecksumData checksumData,
       int startIndex) throws OzoneChecksumException {
     ChecksumType checksumType = checksumData.getChecksumType();
     if (checksumType == ChecksumType.NONE) {
@@ -269,11 +262,8 @@ public class Checksum {
 
     int bytesPerChecksum = checksumData.getBytesPerChecksum();
     Checksum checksum = new Checksum(checksumType, bytesPerChecksum);
-    ChecksumData computedChecksumData =
-        checksum.computeChecksum(data, 0, data.length);
-
-    return checksumData.verifyChecksumDataMatches(computedChecksumData,
-        startIndex);
+    final ChecksumData computed = checksum.computeChecksum(data);
+    return checksumData.verifyChecksumDataMatches(computed, startIndex);
   }
 
   /**
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChecksumData.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChecksumData.java
index 4a927fb..3658b40 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChecksumData.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChecksumData.java
@@ -36,8 +36,8 @@ public class ChecksumData {
   private ChecksumType type;
   // Checksum will be computed for every bytesPerChecksum number of bytes and
   // stored sequentially in checksumList
-  private int bytesPerChecksum;
-  private List<ByteString> checksums;
+  private final int bytesPerChecksum;
+  private final List<ByteString> checksums;
 
   public ChecksumData(ChecksumType checksumType, int bytesPerChecksum) {
     this(checksumType, bytesPerChecksum, Lists.newArrayList());
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChecksum.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChecksum.java
index 819c29f..a924321 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChecksum.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChecksum.java
@@ -82,16 +82,12 @@ public class TestChecksum {
    * not match.
    */
   @Test
-  public void testChecksumMismatchForDifferentChecksumTypes() throws Exception {
-    byte[] data = RandomStringUtils.randomAlphabetic(55).getBytes();
-
+  public void testChecksumMismatchForDifferentChecksumTypes() {
     // Checksum1 of type SHA-256
     Checksum checksum1 = getChecksum(null);
-    ChecksumData checksumData1 = checksum1.computeChecksum(data);
 
     // Checksum2 of type CRC32
     Checksum checksum2 = getChecksum(ContainerProtos.ChecksumType.CRC32);
-    ChecksumData checksumData2 = checksum2.computeChecksum(data);
 
     // The two checksums should not match as they have different types
     Assert.assertNotEquals(


---------------------------------------------------------------------
To unsubscribe, e-mail: hdfs-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: hdfs-commits-help@hadoop.apache.org