You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by ja...@apache.org on 2022/07/26 06:29:07 UTC
[ozone] branch master updated: HDDS-6980. support block composite-crc checksum (#3581)
This is an automated email from the ASF dual-hosted git repository.
jacksonyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 72ca001178 HDDS-6980. support block composite-crc checksum (#3581)
72ca001178 is described below
commit 72ca001178c8b6f658da7de8fff34a1f956ae404
Author: Jie Yao <ja...@tencent.com>
AuthorDate: Tue Jul 26 14:29:01 2022 +0800
HDDS-6980. support block composite-crc checksum (#3581)
---
.../checksum/AbstractBlockChecksumComputer.java | 5 +-
.../checksum/ReplicatedBlockChecksumComputer.java | 82 ++++++++++++++++++++--
.../checksum/ReplicatedFileChecksumHelper.java | 16 +++--
.../TestReplicatedBlockChecksumComputer.java | 56 +++++++++++----
4 files changed, 136 insertions(+), 23 deletions(-)
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/AbstractBlockChecksumComputer.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/AbstractBlockChecksumComputer.java
index 4be13e6331..a3a1ca177b 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/AbstractBlockChecksumComputer.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/AbstractBlockChecksumComputer.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.ozone.client.checksum;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -31,7 +33,8 @@ public abstract class AbstractBlockChecksumComputer {
* Compute block checksum. The result can be obtained by getOutBytes().
* @throws IOException
*/
- public abstract void compute() throws IOException;
+ public abstract void compute(
+ OzoneClientConfig.ChecksumCombineMode combineMode) throws IOException;
public ByteBuffer getOutByteBuffer() {
return outByteBuffer;
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ReplicatedBlockChecksumComputer.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ReplicatedBlockChecksumComputer.java
index 2d0e198dbc..3efb57acc0 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ReplicatedBlockChecksumComputer.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ReplicatedBlockChecksumComputer.java
@@ -17,8 +17,13 @@
*/
package org.apache.hadoop.ozone.client.checksum;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.util.CrcComposer;
+import org.apache.hadoop.util.CrcUtil;
+import org.apache.hadoop.util.DataChecksum;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,14 +44,23 @@ public class ReplicatedBlockChecksumComputer extends
private List<ContainerProtos.ChunkInfo> chunkInfoList;
public ReplicatedBlockChecksumComputer(
- List<ContainerProtos.ChunkInfo> chunkInfoList)
- throws IOException {
+ List<ContainerProtos.ChunkInfo> chunkInfoList) {
this.chunkInfoList = chunkInfoList;
}
@Override
- public void compute() throws IOException {
- computeMd5Crc();
+ public void compute(OzoneClientConfig.ChecksumCombineMode combineMode)
+ throws IOException {
+ switch (combineMode) {
+ case MD5MD5CRC:
+ computeMd5Crc();
+ return;
+ case COMPOSITE_CRC:
+ computeCompositeCrc();
+ return;
+ default:
+ throw new IllegalArgumentException("unsupported combine mode");
+ }
}
// compute the block checksum, which is the md5 of chunk checksums
@@ -69,4 +83,64 @@ public class ReplicatedBlockChecksumComputer extends
LOG.debug("number of chunks={}, md5out={}",
chunkInfoList.size(), fileMD5);
}
+
+ // Computes the composite-CRC block checksum: each chunk's stored CRCs are
+ // combined into one chunk CRC, and the chunk CRCs into the block digest.
+ private void computeCompositeCrc() throws IOException {
+ DataChecksum.Type dataChecksumType;
+ long bytesPerCrc;
+ long chunkSize;
+ Preconditions.checkArgument(chunkInfoList.size() > 0);
+ // type, length and bytes-per-checksum are all read from the first chunk
+ final ContainerProtos.ChunkInfo firstChunkInfo = chunkInfoList.get(0);
+ switch (firstChunkInfo.getChecksumData().getType()) {
+ case CRC32C:
+ dataChecksumType = DataChecksum.Type.CRC32C;
+ break;
+ case CRC32:
+ dataChecksumType = DataChecksum.Type.CRC32;
+ break;
+ default:
+ throw new IllegalArgumentException("unsupported checksum type: " +
+ firstChunkInfo.getChecksumData().getType());
+ }
+ chunkSize = firstChunkInfo.getLen();
+ bytesPerCrc = firstChunkInfo.getChecksumData().getBytesPerChecksum();
+ // chunkSize is handed to the block-level composer here, bytesPerCrc to
+ // each chunk-level composer created inside the loop below
+ CrcComposer blockCrcComposer =
+ CrcComposer.newCrcComposer(dataChecksumType, chunkSize);
+
+ for (ContainerProtos.ChunkInfo chunkInfo : chunkInfoList) {
+ ContainerProtos.ChecksumData checksumData =
+ chunkInfo.getChecksumData();
+ List<ByteString> checksums = checksumData.getChecksumsList();
+ CrcComposer chunkCrcComposer =
+ CrcComposer.newCrcComposer(dataChecksumType, bytesPerCrc);
+ // fold each stored CRC into the chunk composer (Math.min caps the length)
+ // NOTE(review): bound below uses chunkSize, not bytesPerCrc - confirm
+ long remainingChunkSize = chunkInfo.getLen();
+ Preconditions.checkArgument(remainingChunkSize <=
+ checksums.size() * chunkSize);
+ for (ByteString checksum : checksums) {
+ int checksumDataCrc = CrcUtil.readInt(checksum.toByteArray(), 0);
+ chunkCrcComposer.update(checksumDataCrc,
+ Math.min(bytesPerCrc, remainingChunkSize));
+ remainingChunkSize -= bytesPerCrc;
+ }
+ // digest() returns the chunk-level CRC as bytes; read it back as an int
+ int chunkChecksumCrc = CrcUtil.readInt(chunkCrcComposer.digest(), 0);
+
+ // feed the chunk CRC and the chunk length into the block composer
+ blockCrcComposer.update(chunkChecksumCrc, chunkInfo.getLen());
+ }
+
+ // the block-level digest becomes this computer's output bytes
+ byte[] compositeCrcChunkChecksum = blockCrcComposer.digest();
+ setOutBytes(compositeCrcChunkChecksum);
+
+ LOG.debug("number of chunks = {}, chunk checksum type is {}, " +
+ "composite checksum = {}", chunkInfoList.size(), dataChecksumType,
+ compositeCrcChunkChecksum);
+ }
}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ReplicatedFileChecksumHelper.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ReplicatedFileChecksumHelper.java
index 0364906bf9..84a44515f0 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ReplicatedFileChecksumHelper.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ReplicatedFileChecksumHelper.java
@@ -78,7 +78,8 @@ public class ReplicatedFileChecksumHelper extends BaseFileChecksumHelper {
if (!checksumBlock(keyLocationInfo)) {
throw new PathIOException(getSrc(),
- "Fail to get block MD5 for " + keyLocationInfo);
+ "Fail to get block checksum for " + keyLocationInfo
+ + ", checksum combine mode : " + getCombineMode());
}
}
}
@@ -89,6 +90,12 @@ public class ReplicatedFileChecksumHelper extends BaseFileChecksumHelper {
*/
private boolean checksumBlock(OmKeyLocationInfo keyLocationInfo)
throws IOException {
+ // for each block, send request
+ List<ContainerProtos.ChunkInfo> chunkInfos =
+ getChunkInfos(keyLocationInfo);
+ if (chunkInfos.size() == 0) {
+ return false;
+ }
long blockNumBytes = keyLocationInfo.getLength();
@@ -96,9 +103,7 @@ public class ReplicatedFileChecksumHelper extends BaseFileChecksumHelper {
blockNumBytes = getRemaining();
}
setRemaining(getRemaining() - blockNumBytes);
- // for each block, send request
- List<ContainerProtos.ChunkInfo> chunkInfos =
- getChunkInfos(keyLocationInfo);
+
ContainerProtos.ChecksumData checksumData =
chunkInfos.get(0).getChecksumData();
int bytesPerChecksum = checksumData.getBytesPerChecksum();
@@ -171,8 +176,7 @@ public class ReplicatedFileChecksumHelper extends BaseFileChecksumHelper {
throws IOException {
AbstractBlockChecksumComputer blockChecksumComputer =
new ReplicatedBlockChecksumComputer(chunkInfoList);
- // TODO: support composite CRC
- blockChecksumComputer.compute();
+ blockChecksumComputer.compute(getCombineMode());
return blockChecksumComputer.getOutByteBuffer();
}
diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/checksum/TestReplicatedBlockChecksumComputer.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/checksum/TestReplicatedBlockChecksumComputer.java
index fba0773d15..7e896f327e 100644
--- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/checksum/TestReplicatedBlockChecksumComputer.java
+++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/checksum/TestReplicatedBlockChecksumComputer.java
@@ -19,6 +19,9 @@ package org.apache.hadoop.ozone.client.checksum;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.util.CrcComposer;
+import org.apache.hadoop.util.CrcUtil;
+import org.apache.hadoop.util.DataChecksum;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.junit.Test;
@@ -26,7 +29,10 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
+import java.util.Random;
+import static org.apache.hadoop.hdds.scm.OzoneClientConfig.ChecksumCombineMode.COMPOSITE_CRC;
+import static org.apache.hadoop.hdds.scm.OzoneClientConfig.ChecksumCombineMode.MD5MD5CRC;
import static org.junit.Assert.assertArrayEquals;
/**
@@ -35,34 +41,60 @@ import static org.junit.Assert.assertArrayEquals;
public class TestReplicatedBlockChecksumComputer {
@Test
public void testComputeMd5Crc() throws IOException {
- final int lenOfZeroBytes = 32;
- byte[] emptyChunkChecksum = new byte[lenOfZeroBytes];
- MD5Hash emptyBlockMD5 = MD5Hash.digest(emptyChunkChecksum);
+ final int lenOfBytes = 32;
+ byte[] randomChunkChecksum = new byte[lenOfBytes];
+ Random r = new Random();
+ r.nextBytes(randomChunkChecksum);
+ MD5Hash emptyBlockMD5 = MD5Hash.digest(randomChunkChecksum);
byte[] emptyBlockMD5Hash = emptyBlockMD5.getDigest();
+ AbstractBlockChecksumComputer computer =
+ buildBlockChecksumComputer(randomChunkChecksum,
+ lenOfBytes, ContainerProtos.ChecksumType.CRC32);
+ computer.compute(MD5MD5CRC);
+ ByteBuffer output = computer.getOutByteBuffer();
+ assertArrayEquals(emptyBlockMD5Hash, output.array());
+ }
+
+ @Test
+ public void testComputeCompositeCrc() throws IOException {
+ final int lenOfBytes = 32;
+ byte[] randomChunkChecksum = new byte[lenOfBytes];
+ Random r = new Random();
+ r.nextBytes(randomChunkChecksum);
+ // expected: a CRC32C composer fed the int read at offset 0, with length 4
+ CrcComposer crcComposer =
+ CrcComposer.newCrcComposer(DataChecksum.Type.CRC32C, 4);
+ int chunkCrc = CrcUtil.readInt(randomChunkChecksum, 0);
+ crcComposer.update(chunkCrc, 4);
+ byte[] blockCompositeCRC = crcComposer.digest();
+ AbstractBlockChecksumComputer computer =
+ buildBlockChecksumComputer(randomChunkChecksum,
+ lenOfBytes, ContainerProtos.ChecksumType.CRC32C);
+ computer.compute(COMPOSITE_CRC);
+ ByteBuffer output = computer.getOutByteBuffer();
+ assertArrayEquals(blockCompositeCRC, output.array());
+ }
- ByteString checkSum = ByteString.copyFrom(emptyChunkChecksum);
+ private AbstractBlockChecksumComputer buildBlockChecksumComputer(
+ byte[] checksum, int len, ContainerProtos.ChecksumType checksumType) {
+ ByteString checkSum = ByteString.copyFrom(checksum);
ContainerProtos.ChecksumData checksumData =
ContainerProtos.ChecksumData.newBuilder()
.addChecksums(checkSum)
.setBytesPerChecksum(4)
- .setType(ContainerProtos.ChecksumType.CRC32)
+ .setType(checksumType)
.build();
ContainerProtos.ChunkInfo chunkInfo =
ContainerProtos.ChunkInfo.newBuilder()
.setChecksumData(checksumData)
.setChunkName("dummy_chunk")
.setOffset(0)
- .setLen(lenOfZeroBytes)
+ .setLen(len)
.build();
List<ContainerProtos.ChunkInfo> chunkInfoList =
Collections.singletonList(chunkInfo);
- AbstractBlockChecksumComputer computer =
- new ReplicatedBlockChecksumComputer(chunkInfoList);
- computer.compute();
-
- ByteBuffer output = computer.getOutByteBuffer();
- assertArrayEquals(emptyBlockMD5Hash, output.array());
+ return new ReplicatedBlockChecksumComputer(chunkInfoList);
}
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org