You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@ozone.apache.org by GitBox <gi...@apache.org> on 2022/10/26 04:34:10 UTC

[GitHub] [ozone] umamaheswararao commented on a diff in pull request #3790: HDDS-6971. EC: Add EC block checksum computer.

umamaheswararao commented on code in PR #3790:
URL: https://github.com/apache/ozone/pull/3790#discussion_r1000827053


##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java:
##########
@@ -88,6 +92,62 @@ public CompletableFuture<ContainerProtos.ContainerCommandResponseProto> write(
     return writeChunkToContainer(ChunkBuffer.wrap(buff));
   }
 
+  public CompletableFuture<ContainerProtos.
+      ContainerCommandResponseProto> executePutBlock(boolean close,
+      boolean force, long blockGroupLength, BlockData[] blockData)
+      throws IOException {
+
+    BlockData checksumBlockData = null;
+    //Reverse Traversal as all parity will have checksumBytes
+    for (int i = blockData.length - 1; i >= 0; i--) {
+      BlockData bd = blockData[i];
+      if (bd == null) {
+        continue;
+      }
+      List<ChunkInfo> chunks = bd.getChunks();
+      if (chunks != null && chunks.get(0).getMetadataCount() > 0) {
+        checksumBlockData = bd;
+        break;
+      }
+    }
+
+    if (checksumBlockData != null) {
+      List<ChunkInfo> currentChunks = getContainerBlockData().getChunksList();
+      List<ChunkInfo> checksumBlockDataChunks = checksumBlockData.getChunks();
+
+      Preconditions.checkArgument(
+          currentChunks.size() == checksumBlockDataChunks.size());
+      List<ChunkInfo> newChunkList = new ArrayList<>();
+
+      for (int i = 0; i < currentChunks.size(); i++) {
+        ChunkInfo chunkInfo = currentChunks.get(i);
+        ChunkInfo checksumChunk = checksumBlockDataChunks.get(i);
+
+        ChunkInfo newInfo = ChunkInfo.newBuilder(chunkInfo)
+            .addAllMetadata(checksumChunk.getMetadataList()).build();
+        newChunkList.add(newInfo);
+      }
+
+      getContainerBlockData().clearChunks();
+      getContainerBlockData().addAllChunks(newChunkList);
+    } else {
+      throw new IOException("None of the block data have checksum " +
+          "which means parity+1 blocks are not present");

Review Comment:
   Nit: instead of string parity+1, is there a way we get the actual count?



##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java:
##########
@@ -108,6 +168,20 @@ private void updateBlockGroupLengthInPutBlockMeta(final long blockGroupLen) {
     getContainerBlockData().addAllMetadata(metadataList); // Add updated meta.
   }
 
+  private void updateChecksum(String checksum) {
+    List<ChunkInfo> chunkInfos = getContainerBlockData().getChunksList();
+    ContainerProtos.KeyValue keyValue = ContainerProtos.KeyValue.newBuilder()
+        .setKey(STRIPE_CHECKSUM).setValue(checksum).build();
+    List<ChunkInfo> chunks = new ArrayList<>();

Review Comment:
   Nit: you can name chunks -> updatedChunks
   chunkInfos -> existingChunkInfos



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECContainerOperationClient.java:
##########
@@ -165,6 +167,17 @@ public void createRecoveringContainer(long containerID, DatanodeDetails dn,
     }
   }
 
+  Pipeline singleNodePipeline(DatanodeDetails dn,
+      ECReplicationConfig repConfig, int replicaIndex) {
+
+    Map<DatanodeDetails, Integer> dnIndexMap = new HashMap<>();

Review Comment:
   You may want to use this? ImmutableMap.of(dn, replicaIndex);



##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java:
##########
@@ -88,6 +92,62 @@ public CompletableFuture<ContainerProtos.ContainerCommandResponseProto> write(
     return writeChunkToContainer(ChunkBuffer.wrap(buff));
   }
 
+  public CompletableFuture<ContainerProtos.
+      ContainerCommandResponseProto> executePutBlock(boolean close,
+      boolean force, long blockGroupLength, BlockData[] blockData)
+      throws IOException {
+
+    BlockData checksumBlockData = null;
+    //Reverse Traversal as all parity will have checksumBytes
+    for (int i = blockData.length - 1; i >= 0; i--) {
+      BlockData bd = blockData[i];
+      if (bd == null) {
+        continue;
+      }
+      List<ChunkInfo> chunks = bd.getChunks();
+      if (chunks != null && chunks.get(0).getMetadataCount() > 0) {
+        checksumBlockData = bd;
+        break;
+      }
+    }
+
+    if (checksumBlockData != null) {
+      List<ChunkInfo> currentChunks = getContainerBlockData().getChunksList();
+      List<ChunkInfo> checksumBlockDataChunks = checksumBlockData.getChunks();
+
+      Preconditions.checkArgument(
+          currentChunks.size() == checksumBlockDataChunks.size());
+      List<ChunkInfo> newChunkList = new ArrayList<>();
+
+      for (int i = 0; i < currentChunks.size(); i++) {
+        ChunkInfo chunkInfo = currentChunks.get(i);
+        ChunkInfo checksumChunk = checksumBlockDataChunks.get(i);
+
+        ChunkInfo newInfo = ChunkInfo.newBuilder(chunkInfo)
+            .addAllMetadata(checksumChunk.getMetadataList()).build();
+        newChunkList.add(newInfo);
+      }
+
+      getContainerBlockData().clearChunks();
+      getContainerBlockData().addAllChunks(newChunkList);
+    } else {

Review Comment:
   getXceiverClient().getPipeline() should give you the replicationConfig. So, why don't we print the active number instead of parity+1 string?



##########
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ECFileChecksumHelper.java:
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.checksum;
+
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.CrcUtil;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The helper class to compute file checksum for EC files.
+ */
+public class ECFileChecksumHelper extends BaseFileChecksumHelper {
+  private int blockIdx;
+
+  public ECFileChecksumHelper(OzoneVolume volume, OzoneBucket bucket,
+      String keyName, long length, OzoneClientConfig.ChecksumCombineMode
+      checksumCombineMode, ClientProtocol rpcClient, OmKeyInfo keyInfo)
+      throws IOException {
+    super(volume, bucket, keyName, length, checksumCombineMode, rpcClient,
+        keyInfo);
+  }
+
+  @Override
+  protected void checksumBlocks() throws IOException {
+    long currentLength = 0;
+    for (blockIdx = 0;
+         blockIdx < getKeyLocationInfoList().size() && getRemaining() >= 0;
+         blockIdx++) {
+      OmKeyLocationInfo keyLocationInfo =
+          getKeyLocationInfoList().get(blockIdx);
+
+      if (currentLength > getLength()) {
+        return;
+      }
+
+      if (!checksumBlock(keyLocationInfo)) {
+        throw new PathIOException(getSrc(),
+            "Fail to get block checksum for " + keyLocationInfo
+                + ", checksum combine mode : {}" + getCombineMode());
+      }
+
+      currentLength += keyLocationInfo.getLength();
+    }
+  }
+
+  private boolean checksumBlock(OmKeyLocationInfo keyLocationInfo)
+      throws IOException {
+    // for each block, send request
+    List<ContainerProtos.ChunkInfo> chunkInfos =
+        getChunkInfos(keyLocationInfo);
+    if (chunkInfos.size() == 0) {
+      return false;
+    }
+
+    long blockNumBytes = keyLocationInfo.getLength();
+
+    if (getRemaining() < blockNumBytes) {
+      blockNumBytes = getRemaining();
+    }
+    setRemaining(getRemaining() - blockNumBytes);
+
+    ContainerProtos.ChecksumData checksumData =
+        chunkInfos.get(0).getChecksumData();
+    setChecksumType(checksumData.getType());
+    int bytesPerChecksum = checksumData.getBytesPerChecksum();
+    setBytesPerCRC(bytesPerChecksum);
+
+    ByteBuffer blockChecksumByteBuffer =
+        getBlockChecksumFromChunkChecksums(chunkInfos);
+    String blockChecksumForDebug =
+        populateBlockChecksumBuf(blockChecksumByteBuffer);
+
+    LOG.debug("Got reply from EC pipeline {} for block {}: blockChecksum={}, " +
+            "blockChecksumType={}",
+        keyLocationInfo.getPipeline(), keyLocationInfo.getBlockID(),
+        blockChecksumForDebug, checksumData.getType());
+    return true;
+  }
+
+  private String populateBlockChecksumBuf(
+      ByteBuffer blockChecksumByteBuffer) throws IOException {
+    String blockChecksumForDebug = null;
+    switch (getCombineMode()) {
+    case MD5MD5CRC:
+      final MD5Hash md5 = new MD5Hash(blockChecksumByteBuffer.array());
+      md5.write(getBlockChecksumBuf());
+      if (LOG.isDebugEnabled()) {
+        blockChecksumForDebug = md5.toString();
+      }
+      break;
+    case COMPOSITE_CRC:
+      byte[] crcBytes = blockChecksumByteBuffer.array();
+      if (LOG.isDebugEnabled()) {
+        blockChecksumForDebug = CrcUtil.toSingleCrcString(crcBytes);
+      }
+      getBlockChecksumBuf().write(crcBytes);
+      break;
+    default:
+      throw new IOException(
+          "Unknown combine mode: " + getCombineMode());
+    }
+
+    return blockChecksumForDebug;
+  }
+
+  private ByteBuffer getBlockChecksumFromChunkChecksums(
+      List<ContainerProtos.ChunkInfo> chunkInfos) throws IOException {
+
+    AbstractBlockChecksumComputer blockChecksumComputer =
+        new ECBlockChecksumComputer(chunkInfos, getKeyInfo());
+    blockChecksumComputer.compute(getCombineMode());
+
+    return blockChecksumComputer.getOutByteBuffer();
+  }
+
+  private List<ContainerProtos.ChunkInfo> getChunkInfos(OmKeyLocationInfo
+      keyLocationInfo) throws IOException {
+    // To read an EC block, we create a STANDALONE pipeline that contains the
+    // single location for the block index we want to read. The EC blocks are
+    // indexed from 1 to N, however the data locations are stored in the
+    // dataLocations array indexed from zero.
+    Token<OzoneBlockTokenIdentifier> token = keyLocationInfo.getToken();
+    BlockID blockID = keyLocationInfo.getBlockID();
+
+    Pipeline pipeline = keyLocationInfo.getPipeline();
+
+    List<DatanodeDetails> nodes = pipeline.getNodes();
+    List<DatanodeDetails> newNodes = new ArrayList<>();
+    ECReplicationConfig repConfig = (ECReplicationConfig)
+        pipeline.getReplicationConfig();
+    int totalNodes = repConfig.getRequiredNodes();
+    int parity = repConfig.getParity();
+
+    // Filtering the nodes that has the checksumBytes
+    for (int i = 0; i < nodes.size(); i++) {
+      if (i > 0 && i < totalNodes - parity) {
+        continue;
+      }
+      newNodes.add(nodes.get(i));
+    }
+
+    pipeline = Pipeline.newBuilder(pipeline)
+        .setReplicationConfig(StandaloneReplicationConfig
+            .getInstance(HddsProtos.ReplicationFactor.THREE))
+        .setNodes(newNodes)
+        .build();
+
+    boolean success = false;
+    List<ContainerProtos.ChunkInfo> chunks;
+    XceiverClientSpi xceiverClientSpi = null;
+    try {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Initializing BlockInputStream for get key to access {}",
+            blockID.getContainerID());
+      }
+      xceiverClientSpi =
+          getXceiverClientFactory().acquireClientForReadData(pipeline);
+
+      ContainerProtos.DatanodeBlockID datanodeBlockID = blockID
+          .getDatanodeBlockIDProtobuf();
+      ContainerProtos.GetBlockResponseProto response = ContainerProtocolCalls
+          .getBlock(xceiverClientSpi, datanodeBlockID, token);
+
+      chunks = response.getBlockData().getChunksList();
+      success = true;
+    } finally {
+      if (!success && xceiverClientSpi != null) {

Review Comment:
   If it's success, we don't want to release client?



##########
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java:
##########
@@ -261,16 +264,29 @@ Pipeline createSingleECBlockPipeline(Pipeline ecPipeline,
         .build();
   }
 
-  void executePutBlock(boolean isClose, long blockGroupLength) {
+  void executePutBlock(boolean isClose, long blockGroupLength,
+      String checksum) {
     if (!isInitialized()) {
       return;
     }
-    for (ECBlockOutputStream stream : blockOutputStreams) {
+
+    ECReplicationConfig repConfig = (ECReplicationConfig)
+        getPipeline().getReplicationConfig();
+    int totalNodes = repConfig.getRequiredNodes();
+    int parity = repConfig.getParity();
+
+    for (int idx = 0; idx < blockOutputStreams.length; idx++) {
+      ECBlockOutputStream stream = blockOutputStreams[idx];
       if (stream == null) {
         continue;
       }
       try {
-        stream.executePutBlock(isClose, true, blockGroupLength);
+        // Set checksum only for 1st node and parity nodes

Review Comment:
   same as other comment



##########
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ECBlockChecksumComputer.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.checksum;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.util.CrcComposer;
+import org.apache.hadoop.util.CrcUtil;
+import org.apache.hadoop.util.DataChecksum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+
+/**
+ * The implementation of AbstractBlockChecksumComputer for EC blocks.
+ */
+public class ECBlockChecksumComputer extends AbstractBlockChecksumComputer {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ECBlockChecksumComputer.class);
+
+  private List<ContainerProtos.ChunkInfo> chunkInfoList;
+  private OmKeyInfo keyInfo;
+
+
+  public ECBlockChecksumComputer(
+      List<ContainerProtos.ChunkInfo> chunkInfoList, OmKeyInfo keyInfo) {
+    this.chunkInfoList = chunkInfoList;
+    this.keyInfo = keyInfo;
+  }
+
+  @Override
+  public void compute(OzoneClientConfig.ChecksumCombineMode combineMode)
+      throws IOException {
+    switch (combineMode) {
+    case MD5MD5CRC:
+      computeMd5Crc();
+      return;
+    case COMPOSITE_CRC:
+      computeCompositeCrc();
+      return;
+    default:
+      throw new IllegalArgumentException("unsupported combine mode");
+    }
+
+  }
+
+  private void computeMd5Crc() throws IOException {
+    Preconditions.checkArgument(chunkInfoList.size() > 0);
+
+    final ContainerProtos.ChunkInfo firstChunkInfo = chunkInfoList.get(0);
+    long chunkSize = firstChunkInfo.getLen();
+    long bytesPerCrc = firstChunkInfo.getChecksumData().getBytesPerChecksum();
+    // Total parity checksum bytes per stripe to remove
+    int parityBytes = getParityBytes(chunkSize, bytesPerCrc);
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+
+    for (ContainerProtos.ChunkInfo chunkInfo : chunkInfoList) {
+      ContainerProtos.KeyValue checksumKeyValue =
+          chunkInfo.getMetadata(0);
+
+      Preconditions.checkNotNull(checksumKeyValue);
+      byte[] checksumBytes =
+          checksumKeyValue.getValue().getBytes(StandardCharsets.ISO_8859_1);
+
+      Preconditions.checkArgument(checksumBytes.length % 4 == 0,
+          "Checksum Bytes size does not match");
+
+      ByteBuffer byteWrap = ByteBuffer
+          .wrap(checksumBytes, 0, checksumBytes.length - parityBytes);
+      byte[] currentChecksum = new byte[4];
+
+      while (byteWrap.hasRemaining()) {
+        byteWrap.get(currentChecksum);
+        out.write(currentChecksum);
+      }
+    }
+
+    MD5Hash fileMD5 = MD5Hash.digest(out.toByteArray());
+    setOutBytes(fileMD5.getDigest());
+
+    LOG.debug("number of chunks={}, md5out={}",
+        chunkInfoList.size(), fileMD5);
+  }
+
+  private void computeCompositeCrc() throws IOException {
+    DataChecksum.Type dataChecksumType;
+    Preconditions.checkArgument(chunkInfoList.size() > 0);
+
+    final ContainerProtos.ChunkInfo firstChunkInfo = chunkInfoList.get(0);
+    switch (firstChunkInfo.getChecksumData().getType()) {
+    case CRC32C:
+      dataChecksumType = DataChecksum.Type.CRC32C;
+      break;
+    case CRC32:
+      dataChecksumType = DataChecksum.Type.CRC32;
+      break;
+    default:
+      throw new IllegalArgumentException("unsupported checksum type: " +
+          firstChunkInfo.getChecksumData().getType());
+    }
+
+    // Bytes required to create a CRC
+    long bytesPerCrc = firstChunkInfo.getChecksumData().getBytesPerChecksum();
+    ECReplicationConfig replicationConfig =
+        (ECReplicationConfig) keyInfo.getReplicationConfig();
+    long chunkSize = replicationConfig.getEcChunkSize();
+
+    //When EC chunk size is not a multiple of ozone.client.bytes.per.checksum
+    // (default = 1MB) the last checksum in an EC chunk is only generated for
+    // offset.
+    long bytesPerCrcOffset = chunkSize % bytesPerCrc;
+
+    long keySize = keyInfo.getDataSize();
+    // Total parity checksum bytes per stripe to remove
+    int parityBytes = getParityBytes(chunkSize, bytesPerCrc);
+
+    // Number of checksum per chunk, Eg: 2MB EC chunk will
+    // have 2 checksum per chunk.
+    int numChecksumPerChunk = (int)
+        (Math.ceil((double) chunkSize / bytesPerCrc));
+
+    CrcComposer blockCrcComposer =
+        CrcComposer.newCrcComposer(dataChecksumType, bytesPerCrc);
+
+    for (ContainerProtos.ChunkInfo chunkInfo : chunkInfoList) {
+      ContainerProtos.KeyValue checksumKeyValue =
+          chunkInfo.getMetadata(0);
+
+      Preconditions.checkNotNull(checksumKeyValue);
+      byte[] checksumBytes =
+          checksumKeyValue.getValue().getBytes(StandardCharsets.ISO_8859_1);
+
+      Preconditions.checkArgument(checksumBytes.length % 4 == 0,
+          "Checksum Bytes size does not match");
+      CrcComposer chunkCrcComposer =
+          CrcComposer.newCrcComposer(dataChecksumType, bytesPerCrc);
+
+      // Limit parity bytes as they do not contribute to fileChecksum
+      ByteBuffer byteWrap = ByteBuffer
+          .wrap(checksumBytes, 0, checksumBytes.length - parityBytes);
+      byte[] currentChecksum = new byte[4];
+
+      long chunkOffsetIndex = 1;
+      while (byteWrap.hasRemaining()) {
+
+        /*
+        When chunk size is not a multiple of bytes.per.crc we get an offset.
+        For eg, RS-3-2-1524k is not a multiple of 1MB. So two checksums are
+        generated 1st checksum for 1024k bytes and 2nd checksum for 500k bytes.
+        When we reach the 2nd Checksum we need to modify the bytesPerCrc as in
+        this case 500k is the bytes for which the checksum is generated.
+        */
+        long currentChunkOffset = Long.MAX_VALUE;
+        if ((chunkOffsetIndex % numChecksumPerChunk == 0)
+            && (bytesPerCrcOffset > 0)) {
+          currentChunkOffset = bytesPerCrcOffset;
+        }
+
+        byteWrap.get(currentChecksum);
+        int checksumDataCrc = CrcUtil.readInt(currentChecksum, 0);
+        //To handle last chunk when it size is lower than 1524K in the case
+        // of rs-3-2-1524k.
+        long chunkSizePerChecksum = Math.min(Math.min(keySize, bytesPerCrc),
+            currentChunkOffset);
+        chunkCrcComposer.update(checksumDataCrc, chunkSizePerChecksum);
+
+        int chunkChecksumCrc = CrcUtil.readInt(chunkCrcComposer.digest(), 0);
+        blockCrcComposer.update(chunkChecksumCrc, chunkSizePerChecksum);
+        keySize -= Math.min(bytesPerCrc, currentChunkOffset);
+        ++chunkOffsetIndex;
+      }
+    }
+
+    //compute the composite-crc checksum of the whole block
+    byte[] compositeCrcChunkChecksum = blockCrcComposer.digest();
+    setOutBytes(compositeCrcChunkChecksum);
+

Review Comment:
   Nit: number --> Number



##########
hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientUtils.java:
##########
@@ -212,15 +213,15 @@ public static FileChecksum getFileChecksumWithCombineMode(OzoneVolume volume,
         .setRefreshPipeline(true).setSortDatanodesInPipeline(true)
         .setLatestVersionLocation(true).build();
     OmKeyInfo keyInfo = rpcClient.getOzoneManagerClient().lookupKey(keyArgs);
-
-    // TODO: return null util ECFileChecksum is implemented.
+    BaseFileChecksumHelper helper;
     if (keyInfo.getReplicationConfig()
         .getReplicationType() == HddsProtos.ReplicationType.EC) {
-      return null;

Review Comment:
   You could also create small checkSumHelper factory and create the respective instance based on replicationType.
   Ex: BaseFileChecksumHelper helper = checksumHelperFactory.getHelper(keyInfo.getReplicationConfig()
           .getReplicationType())



##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystem.java:
##########
@@ -1750,6 +1753,67 @@ public void testUnbuffer() throws IOException {
 
   }
 
+  /**
+   *  Test EC checksum with Replicated checksum.
+   */
+  @Test
+  public void testFileChecksum() throws IOException {

Review Comment:
   Can we make this test to have parameterized and run with different lengths of files? ex: partial stripes and multiple blocks, single chunk, multichunk with partial stripe and padding etc?



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java:
##########
@@ -268,10 +272,20 @@ void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo,
         length -= readLen;
       }
 
+      int totalNodes = repConfig.getRequiredNodes();
+      int parity = repConfig.getParity();
+
       try {
         for (ECBlockOutputStream targetStream : targetBlockStreams) {
-          targetStream
-              .executePutBlock(true, true, blockLocationInfo.getLength());
+          long replicaIndex = targetStream.getReplicationIndex();
+          //Write checksum only to parity and 1st Replica.

Review Comment:
   Since we can get rpelica index details at stream, can we push this logic inside EC specific stream? Meaning, just call 
   targetStream.executePutBlock(true, true,
                   blockLocationInfo.getLength(), blockDataGroup);
                   
   inside executePutBlock can take case whether to write checksums or not?



##########
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ECBlockChecksumComputer.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.checksum;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.util.CrcComposer;
+import org.apache.hadoop.util.CrcUtil;
+import org.apache.hadoop.util.DataChecksum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+
+/**
+ * The implementation of AbstractBlockChecksumComputer for EC blocks.
+ */
+public class ECBlockChecksumComputer extends AbstractBlockChecksumComputer {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ECBlockChecksumComputer.class);
+
+  private List<ContainerProtos.ChunkInfo> chunkInfoList;
+  private OmKeyInfo keyInfo;
+
+
+  public ECBlockChecksumComputer(
+      List<ContainerProtos.ChunkInfo> chunkInfoList, OmKeyInfo keyInfo) {
+    this.chunkInfoList = chunkInfoList;
+    this.keyInfo = keyInfo;
+  }
+
+  @Override
+  public void compute(OzoneClientConfig.ChecksumCombineMode combineMode)
+      throws IOException {
+    switch (combineMode) {
+    case MD5MD5CRC:
+      computeMd5Crc();
+      return;
+    case COMPOSITE_CRC:
+      computeCompositeCrc();
+      return;
+    default:
+      throw new IllegalArgumentException("unsupported combine mode");
+    }
+
+  }
+
+  private void computeMd5Crc() throws IOException {
+    Preconditions.checkArgument(chunkInfoList.size() > 0);
+
+    final ContainerProtos.ChunkInfo firstChunkInfo = chunkInfoList.get(0);
+    long chunkSize = firstChunkInfo.getLen();
+    long bytesPerCrc = firstChunkInfo.getChecksumData().getBytesPerChecksum();
+    // Total parity checksum bytes per stripe to remove
+    int parityBytes = getParityBytes(chunkSize, bytesPerCrc);
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+
+    for (ContainerProtos.ChunkInfo chunkInfo : chunkInfoList) {
+      ContainerProtos.KeyValue checksumKeyValue =
+          chunkInfo.getMetadata(0);
+
+      Preconditions.checkNotNull(checksumKeyValue);
+      byte[] checksumBytes =
+          checksumKeyValue.getValue().getBytes(StandardCharsets.ISO_8859_1);
+
+      Preconditions.checkArgument(checksumBytes.length % 4 == 0,
+          "Checksum Bytes size does not match");
+
+      ByteBuffer byteWrap = ByteBuffer
+          .wrap(checksumBytes, 0, checksumBytes.length - parityBytes);
+      byte[] currentChecksum = new byte[4];
+
+      while (byteWrap.hasRemaining()) {
+        byteWrap.get(currentChecksum);
+        out.write(currentChecksum);
+      }
+    }
+
+    MD5Hash fileMD5 = MD5Hash.digest(out.toByteArray());
+    setOutBytes(fileMD5.getDigest());
+
+    LOG.debug("number of chunks={}, md5out={}",
+        chunkInfoList.size(), fileMD5);
+  }
+
+  private void computeCompositeCrc() throws IOException {
+    DataChecksum.Type dataChecksumType;
+    Preconditions.checkArgument(chunkInfoList.size() > 0);
+
+    final ContainerProtos.ChunkInfo firstChunkInfo = chunkInfoList.get(0);
+    switch (firstChunkInfo.getChecksumData().getType()) {
+    case CRC32C:
+      dataChecksumType = DataChecksum.Type.CRC32C;
+      break;
+    case CRC32:
+      dataChecksumType = DataChecksum.Type.CRC32;
+      break;
+    default:

Review Comment:
   Nit: unsupported -> Unsupported



##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java:
##########
@@ -88,6 +92,62 @@ public CompletableFuture<ContainerProtos.ContainerCommandResponseProto> write(
     return writeChunkToContainer(ChunkBuffer.wrap(buff));
   }
 
+  public CompletableFuture<ContainerProtos.
+      ContainerCommandResponseProto> executePutBlock(boolean close,
+      boolean force, long blockGroupLength, BlockData[] blockData)
+      throws IOException {
+
+    BlockData checksumBlockData = null;
+    //Reverse Traversal as all parity will have checksumBytes
+    for (int i = blockData.length - 1; i >= 0; i--) {

Review Comment:
   We don;t want to read from >1 and <parity chunks right? is that optimization covered in this?



##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystem.java:
##########
@@ -1750,6 +1753,67 @@ public void testUnbuffer() throws IOException {
 
   }
 
+  /**
+   *  Test EC checksum with Replicated checksum.
+   */
+  @Test
+  public void testFileChecksum() throws IOException {
+    int dataLen = 1024 * 1024 * 5;
+    byte[] data = RandomStringUtils.randomAlphabetic(dataLen)
+        .getBytes(UTF_8);
+
+    BucketArgs.Builder builder = BucketArgs.newBuilder();
+    builder.setStorageType(StorageType.DISK);
+    builder.setBucketLayout(BucketLayout.LEGACY);
+    builder.setDefaultReplicationConfig(
+        new DefaultReplicationConfig(ReplicationType.EC,
+            new ECReplicationConfig("RS-3-2-1024k")));
+    BucketArgs omBucketArgs = builder.build();
+
+    String vol = UUID.randomUUID().toString();
+    String ecBucket = UUID.randomUUID().toString();
+    final OzoneBucket bucket101 = TestDataUtil
+        .createVolumeAndBucket(cluster, vol, ecBucket, BucketLayout.LEGACY,
+            omBucketArgs);
+
+    Assert.assertEquals(ReplicationType.EC.name(),
+        bucket101.getReplicationConfig().getReplicationType().name());
+
+    try (OzoneFSOutputStream file = adapter
+        .createFile(vol + "/" + ecBucket + "/test", (short) 3, true, false)) {
+      file.write(data);
+    }
+
+    Path parent = new Path("/" + vol + "/" + ecBucket + "/");
+    Path ecKey = new Path(parent, "test");
+    FileChecksum ecChecksum = fs.getFileChecksum(ecKey);
+    String ecChecksumString = StringUtils.byteToHexString(
+        ecChecksum.getBytes(), 0, ecChecksum.getLength());
+

Review Comment:
   Nit: Below: you can create in single statement.
   BucketArgs omBucketArgs = BucketArgs.newBuilder().setStorageType(StorageType.DISK).setBucketLayout(BucketLayout.LEGACY).build();
   ? 



##########
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java:
##########
@@ -384,6 +400,35 @@ private Stream<ECBlockOutputStream> dataStreams() {
         .filter(Objects::nonNull);
   }
 
+  public String calculateChecksum() throws IOException {
+    if (blockOutputStreams == null) {
+      throw new IOException("Block Output Stream is null");
+    }
+
+    List<ContainerProtos.ChunkInfo> chunkInfos = new ArrayList<>();
+    // First chunk should always have the additional chunks in a partial stripe.
+    int currentIdx = blockOutputStreams[0]
+        .getContainerBlockData().getChunksCount();
+    for (ECBlockOutputStream stream: blockOutputStreams) {
+      List<ContainerProtos.ChunkInfo> chunks =
+          stream.getContainerBlockData().getChunksList();
+      if (chunks.size() > currentIdx - 1) {
+        chunkInfos.add(chunks.get(currentIdx - 1));
+      }
+    }
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    for (ContainerProtos.ChunkInfo info : chunkInfos) {
+      List<ByteString> byteStrings = new ArrayList<>(info.getChecksumData()
+          .getChecksumsList());
+      for (ByteString byteString : byteStrings) {
+        out.write(byteString.toByteArray());
+      }
+    }
+

Review Comment:
   Why don't we use UTF_8 ?



##########
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ECBlockChecksumComputer.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.checksum;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.util.CrcComposer;
+import org.apache.hadoop.util.CrcUtil;
+import org.apache.hadoop.util.DataChecksum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+
+/**
+ * The implementation of AbstractBlockChecksumComputer for EC blocks.
+ */
+public class ECBlockChecksumComputer extends AbstractBlockChecksumComputer {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ECBlockChecksumComputer.class);
+
+  private List<ContainerProtos.ChunkInfo> chunkInfoList;
+  private OmKeyInfo keyInfo;
+
+
+  public ECBlockChecksumComputer(
+      List<ContainerProtos.ChunkInfo> chunkInfoList, OmKeyInfo keyInfo) {
+    this.chunkInfoList = chunkInfoList;
+    this.keyInfo = keyInfo;
+  }
+
+  @Override
+  public void compute(OzoneClientConfig.ChecksumCombineMode combineMode)
+      throws IOException {
+    switch (combineMode) {
+    case MD5MD5CRC:
+      computeMd5Crc();
+      return;
+    case COMPOSITE_CRC:
+      computeCompositeCrc();
+      return;
+    default:
+      throw new IllegalArgumentException("unsupported combine mode");
+    }
+
+  }
+
+  private void computeMd5Crc() throws IOException {
+    Preconditions.checkArgument(chunkInfoList.size() > 0);
+
+    final ContainerProtos.ChunkInfo firstChunkInfo = chunkInfoList.get(0);
+    long chunkSize = firstChunkInfo.getLen();
+    long bytesPerCrc = firstChunkInfo.getChecksumData().getBytesPerChecksum();
+    // Total parity checksum bytes per stripe to remove
+    int parityBytes = getParityBytes(chunkSize, bytesPerCrc);
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+
+    for (ContainerProtos.ChunkInfo chunkInfo : chunkInfoList) {
+      ContainerProtos.KeyValue checksumKeyValue =
+          chunkInfo.getMetadata(0);
+
+      Preconditions.checkNotNull(checksumKeyValue);
+      byte[] checksumBytes =
+          checksumKeyValue.getValue().getBytes(StandardCharsets.ISO_8859_1);
+
+      Preconditions.checkArgument(checksumBytes.length % 4 == 0,
+          "Checksum Bytes size does not match");
+
+      ByteBuffer byteWrap = ByteBuffer
+          .wrap(checksumBytes, 0, checksumBytes.length - parityBytes);
+      byte[] currentChecksum = new byte[4];
+
+      while (byteWrap.hasRemaining()) {
+        byteWrap.get(currentChecksum);
+        out.write(currentChecksum);
+      }
+    }
+
+    MD5Hash fileMD5 = MD5Hash.digest(out.toByteArray());
+    setOutBytes(fileMD5.getDigest());
+
+    LOG.debug("number of chunks={}, md5out={}",

Review Comment:
   Nit: number --> Number
   md5out -> md5hash



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org