You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by ja...@apache.org on 2022/07/26 06:29:07 UTC

[ozone] branch master updated: HDDS-6980. support block composite-crc checksum (#3581)

This is an automated email from the ASF dual-hosted git repository.

jacksonyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 72ca001178 HDDS-6980. support block composite-crc checksum (#3581)
72ca001178 is described below

commit 72ca001178c8b6f658da7de8fff34a1f956ae404
Author: Jie Yao <ja...@tencent.com>
AuthorDate: Tue Jul 26 14:29:01 2022 +0800

    HDDS-6980. support block composite-crc checksum (#3581)
---
 .../checksum/AbstractBlockChecksumComputer.java    |  5 +-
 .../checksum/ReplicatedBlockChecksumComputer.java  | 82 ++++++++++++++++++++--
 .../checksum/ReplicatedFileChecksumHelper.java     | 16 +++--
 .../TestReplicatedBlockChecksumComputer.java       | 56 +++++++++++----
 4 files changed, 136 insertions(+), 23 deletions(-)

diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/AbstractBlockChecksumComputer.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/AbstractBlockChecksumComputer.java
index 4be13e6331..a3a1ca177b 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/AbstractBlockChecksumComputer.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/AbstractBlockChecksumComputer.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.ozone.client.checksum;
 
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
@@ -31,7 +33,8 @@ public abstract class AbstractBlockChecksumComputer {
    * Compute block checksum. The result can be obtained by getOutBytes().
    * @throws IOException
    */
-  public abstract void compute() throws IOException;
+  public abstract void compute(
+      OzoneClientConfig.ChecksumCombineMode combineMode) throws IOException;
 
   public ByteBuffer getOutByteBuffer() {
     return outByteBuffer;
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ReplicatedBlockChecksumComputer.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ReplicatedBlockChecksumComputer.java
index 2d0e198dbc..3efb57acc0 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ReplicatedBlockChecksumComputer.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ReplicatedBlockChecksumComputer.java
@@ -17,8 +17,13 @@
  */
 package org.apache.hadoop.ozone.client.checksum;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.util.CrcComposer;
+import org.apache.hadoop.util.CrcUtil;
+import org.apache.hadoop.util.DataChecksum;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,14 +44,23 @@ public class ReplicatedBlockChecksumComputer extends
   private List<ContainerProtos.ChunkInfo> chunkInfoList;
 
   public ReplicatedBlockChecksumComputer(
-      List<ContainerProtos.ChunkInfo> chunkInfoList)
-      throws IOException {
+      List<ContainerProtos.ChunkInfo> chunkInfoList) {
     this.chunkInfoList = chunkInfoList;
   }
 
   @Override
-  public void compute() throws IOException {
-    computeMd5Crc();
+  public void compute(OzoneClientConfig.ChecksumCombineMode combineMode)
+      throws IOException {
+    switch (combineMode) {
+    case MD5MD5CRC:
+      computeMd5Crc();
+      return;
+    case COMPOSITE_CRC:
+      computeCompositeCrc();
+      return;
+    default:
+      throw new IllegalArgumentException("unsupported combine mode");
+    }
   }
 
   // compute the block checksum, which is the md5 of chunk checksums
@@ -69,4 +83,64 @@ public class ReplicatedBlockChecksumComputer extends
     LOG.debug("number of chunks={}, md5out={}",
         chunkInfoList.size(), fileMD5);
   }
+
+  // Computes the composite-CRC block checksum: each chunk's checksums are
+  // combined into one chunk-level CRC, then those are combined per block.
+  private void computeCompositeCrc() throws IOException {
+    DataChecksum.Type dataChecksumType;
+    long bytesPerCrc;
+    long chunkSize;
+    Preconditions.checkArgument(chunkInfoList.size() > 0);
+
+    final ContainerProtos.ChunkInfo firstChunkInfo = chunkInfoList.get(0);
+    switch (firstChunkInfo.getChecksumData().getType()) {
+    case CRC32C:
+      dataChecksumType = DataChecksum.Type.CRC32C;
+      break;
+    case CRC32:
+      dataChecksumType = DataChecksum.Type.CRC32;
+      break;
+    default:
+      throw new IllegalArgumentException("unsupported checksum type: " +
+          firstChunkInfo.getChecksumData().getType());
+    }
+    chunkSize = firstChunkInfo.getLen();
+    bytesPerCrc = firstChunkInfo.getChecksumData().getBytesPerChecksum();
+
+    // type, chunkSize and bytesPerCrc are all taken from the first chunk only
+    CrcComposer blockCrcComposer =
+        CrcComposer.newCrcComposer(dataChecksumType, chunkSize);
+
+    for (ContainerProtos.ChunkInfo chunkInfo : chunkInfoList) {
+      ContainerProtos.ChecksumData checksumData =
+          chunkInfo.getChecksumData();
+      List<ByteString> checksums = checksumData.getChecksumsList();
+      CrcComposer chunkCrcComposer =
+          CrcComposer.newCrcComposer(dataChecksumType, bytesPerCrc);
+      // fold this chunk's checksums, in list order, into one chunk-level CRC
+      // NOTE(review): bound below scales by chunkSize, not bytesPerCrc - confirm
+      long remainingChunkSize = chunkInfo.getLen();
+      Preconditions.checkArgument(remainingChunkSize <=
+          checksums.size() * chunkSize);
+      for (ByteString checksum : checksums) {
+        int checksumDataCrc = CrcUtil.readInt(checksum.toByteArray(), 0);
+        chunkCrcComposer.update(checksumDataCrc,
+            Math.min(bytesPerCrc, remainingChunkSize));
+        remainingChunkSize -= bytesPerCrc;
+      }
+      // digest the chunk composer to obtain the CRC of the whole chunk
+      int chunkChecksumCrc = CrcUtil.readInt(chunkCrcComposer.digest(), 0);
+
+      // append the chunk CRC to the block composer with the chunk's length
+      blockCrcComposer.update(chunkChecksumCrc, chunkInfo.getLen());
+    }
+
+    // the block composer's digest is the result published via setOutBytes
+    byte[] compositeCrcChunkChecksum = blockCrcComposer.digest();
+    setOutBytes(compositeCrcChunkChecksum);
+
+    LOG.debug("number of chunks = {}, chunk checksum type is {}, " +
+            "composite checksum = {}", chunkInfoList.size(), dataChecksumType,
+        compositeCrcChunkChecksum);
+  }
 }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ReplicatedFileChecksumHelper.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ReplicatedFileChecksumHelper.java
index 0364906bf9..84a44515f0 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ReplicatedFileChecksumHelper.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ReplicatedFileChecksumHelper.java
@@ -78,7 +78,8 @@ public class ReplicatedFileChecksumHelper extends BaseFileChecksumHelper {
 
       if (!checksumBlock(keyLocationInfo)) {
         throw new PathIOException(getSrc(),
-            "Fail to get block MD5 for " + keyLocationInfo);
+            "Fail to get block checksum for " + keyLocationInfo
+                + ", checksum combine mode : " + getCombineMode());
       }
     }
   }
@@ -89,6 +90,12 @@ public class ReplicatedFileChecksumHelper extends BaseFileChecksumHelper {
    */
   private boolean checksumBlock(OmKeyLocationInfo keyLocationInfo)
       throws IOException {
+    // for each block, send request
+    List<ContainerProtos.ChunkInfo> chunkInfos =
+        getChunkInfos(keyLocationInfo);
+    if (chunkInfos.size() == 0) {
+      return false;
+    }
 
     long blockNumBytes = keyLocationInfo.getLength();
 
@@ -96,9 +103,7 @@ public class ReplicatedFileChecksumHelper extends BaseFileChecksumHelper {
       blockNumBytes = getRemaining();
     }
     setRemaining(getRemaining() - blockNumBytes);
-    // for each block, send request
-    List<ContainerProtos.ChunkInfo> chunkInfos =
-        getChunkInfos(keyLocationInfo);
+
     ContainerProtos.ChecksumData checksumData =
         chunkInfos.get(0).getChecksumData();
     int bytesPerChecksum = checksumData.getBytesPerChecksum();
@@ -171,8 +176,7 @@ public class ReplicatedFileChecksumHelper extends BaseFileChecksumHelper {
       throws IOException {
     AbstractBlockChecksumComputer blockChecksumComputer =
         new ReplicatedBlockChecksumComputer(chunkInfoList);
-    // TODO: support composite CRC
-    blockChecksumComputer.compute();
+    blockChecksumComputer.compute(getCombineMode());
 
     return blockChecksumComputer.getOutByteBuffer();
   }
diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/checksum/TestReplicatedBlockChecksumComputer.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/checksum/TestReplicatedBlockChecksumComputer.java
index fba0773d15..7e896f327e 100644
--- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/checksum/TestReplicatedBlockChecksumComputer.java
+++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/checksum/TestReplicatedBlockChecksumComputer.java
@@ -19,6 +19,9 @@ package org.apache.hadoop.ozone.client.checksum;
 
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.util.CrcComposer;
+import org.apache.hadoop.util.CrcUtil;
+import org.apache.hadoop.util.DataChecksum;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.junit.Test;
 
@@ -26,7 +29,10 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
+import java.util.Random;
 
+import static org.apache.hadoop.hdds.scm.OzoneClientConfig.ChecksumCombineMode.COMPOSITE_CRC;
+import static org.apache.hadoop.hdds.scm.OzoneClientConfig.ChecksumCombineMode.MD5MD5CRC;
 import static org.junit.Assert.assertArrayEquals;
 
 /**
@@ -35,34 +41,60 @@ import static org.junit.Assert.assertArrayEquals;
 public class TestReplicatedBlockChecksumComputer {
   @Test
   public void testComputeMd5Crc() throws IOException {
-    final int lenOfZeroBytes = 32;
-    byte[] emptyChunkChecksum = new byte[lenOfZeroBytes];
-    MD5Hash emptyBlockMD5 = MD5Hash.digest(emptyChunkChecksum);
+    final int lenOfBytes = 32;
+    byte[] randomChunkChecksum = new byte[lenOfBytes];
+    Random r = new Random();
+    r.nextBytes(randomChunkChecksum);
+    MD5Hash emptyBlockMD5 = MD5Hash.digest(randomChunkChecksum);
     byte[] emptyBlockMD5Hash = emptyBlockMD5.getDigest();
+    AbstractBlockChecksumComputer computer =
+        buildBlockChecksumComputer(randomChunkChecksum,
+            lenOfBytes, ContainerProtos.ChecksumType.CRC32);
+    computer.compute(MD5MD5CRC);
+    ByteBuffer output = computer.getOutByteBuffer();
+    assertArrayEquals(emptyBlockMD5Hash, output.array());
+  }
+
+  @Test
+  public void testComputeCompositeCrc() throws IOException {
+    final int lenOfBytes = 32;
+    byte[] randomChunkChecksum = new byte[lenOfBytes];
+    Random r = new Random();
+    r.nextBytes(randomChunkChecksum);
+    // expected value: a CRC32C composer fed only the int read at offset 0
+    CrcComposer crcComposer =
+        CrcComposer.newCrcComposer(DataChecksum.Type.CRC32C, 4);
+    int chunkCrc = CrcUtil.readInt(randomChunkChecksum, 0);
+    crcComposer.update(chunkCrc, 4);
+    byte[] blockCompositeCRC = crcComposer.digest();
+    AbstractBlockChecksumComputer computer =
+        buildBlockChecksumComputer(randomChunkChecksum,
+            lenOfBytes, ContainerProtos.ChecksumType.CRC32C);
+    computer.compute(COMPOSITE_CRC);
+    ByteBuffer output = computer.getOutByteBuffer();
+    assertArrayEquals(blockCompositeCRC, output.array());
+  }
 
-    ByteString checkSum = ByteString.copyFrom(emptyChunkChecksum);
+  private AbstractBlockChecksumComputer buildBlockChecksumComputer(
+      byte[] checksum, int len, ContainerProtos.ChecksumType checksumType) {
+    ByteString checkSum = ByteString.copyFrom(checksum);
 
     ContainerProtos.ChecksumData checksumData =
         ContainerProtos.ChecksumData.newBuilder()
             .addChecksums(checkSum)
             .setBytesPerChecksum(4)
-            .setType(ContainerProtos.ChecksumType.CRC32)
+            .setType(checksumType)
             .build();
     ContainerProtos.ChunkInfo chunkInfo =
         ContainerProtos.ChunkInfo.newBuilder()
             .setChecksumData(checksumData)
             .setChunkName("dummy_chunk")
             .setOffset(0)
-            .setLen(lenOfZeroBytes)
+            .setLen(len)
             .build();
     List<ContainerProtos.ChunkInfo> chunkInfoList =
         Collections.singletonList(chunkInfo);
-    AbstractBlockChecksumComputer computer =
-        new ReplicatedBlockChecksumComputer(chunkInfoList);
 
-    computer.compute();
-
-    ByteBuffer output = computer.getOutByteBuffer();
-    assertArrayEquals(emptyBlockMD5Hash, output.array());
+    return new ReplicatedBlockChecksumComputer(chunkInfoList);
   }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org