You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2019/02/19 19:08:19 UTC

[hadoop] branch trunk updated: HDDS-1121. Key read failure when data is written parallel in to Ozone. Contributed by Bharat Viswanadham.

This is an automated email from the ASF dual-hosted git repository.

aengineer pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 02d04bd  HDDS-1121. Key read failure when data is written parallel in to Ozone. Contributed by Bharat Viswanadham.
02d04bd is described below

commit 02d04bd1073c2ae89fd0b208eba239a469559f90
Author: Anu Engineer <ae...@apache.org>
AuthorDate: Tue Feb 19 10:55:33 2019 -0800

    HDDS-1121. Key read failure when data is written parallel in to Ozone.
    Contributed by Bharat Viswanadham.
---
 .../hadoop/hdds/scm/storage/BlockOutputStream.java | 14 +++--
 .../org/apache/hadoop/ozone/common/Checksum.java   |  2 +
 .../ozone/client/io/BlockOutputStreamEntry.java    | 33 +++++++-----
 .../hadoop/ozone/client/io/KeyOutputStream.java    | 37 +++++++++----
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  | 20 +++----
 .../client/rpc/TestOzoneRpcClientAbstract.java     | 63 ++++++++++++++++++++++
 .../web/storage/DistributedStorageHandler.java     | 22 ++++----
 7 files changed, 144 insertions(+), 47 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index 4c6cd7b..f3ba656 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -29,6 +29,7 @@ import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
@@ -76,7 +77,8 @@ public class BlockOutputStream extends OutputStream {
   private final BlockData.Builder containerBlockData;
   private XceiverClientManager xceiverClientManager;
   private XceiverClientSpi xceiverClient;
-  private final Checksum checksum;
+  private final ContainerProtos.ChecksumType checksumType;
+  private final int bytesPerChecksum;
   private final String streamId;
   private int chunkIndex;
   private int chunkSize;
@@ -121,14 +123,16 @@ public class BlockOutputStream extends OutputStream {
    * @param streamBufferFlushSize flush size
    * @param streamBufferMaxSize   max size of the currentBuffer
    * @param watchTimeout          watch timeout
-   * @param checksum              checksum
+   * @param checksumType          checksum type
+   * @param bytesPerChecksum      Bytes per checksum
    */
   @SuppressWarnings("parameternumber")
   public BlockOutputStream(BlockID blockID, String key,
       XceiverClientManager xceiverClientManager, Pipeline pipeline,
       String traceID, int chunkSize, long streamBufferFlushSize,
       long streamBufferMaxSize, long watchTimeout, List<ByteBuffer> bufferList,
-      Checksum checksum) throws IOException {
+      ChecksumType checksumType, int bytesPerChecksum)
+      throws IOException {
     this.blockID = blockID;
     this.key = key;
     this.traceID = traceID;
@@ -146,7 +150,8 @@ public class BlockOutputStream extends OutputStream {
     this.streamBufferMaxSize = streamBufferMaxSize;
     this.watchTimeout = watchTimeout;
     this.bufferList = bufferList;
-    this.checksum = checksum;
+    this.checksumType = checksumType;
+    this.bytesPerChecksum = bytesPerChecksum;
 
     // A single thread executor handle the responses of async requests
     responseExecutor = Executors.newSingleThreadExecutor();
@@ -585,6 +590,7 @@ public class BlockOutputStream extends OutputStream {
   private void writeChunkToContainer(ByteBuffer chunk) throws IOException {
     int effectiveChunkSize = chunk.remaining();
     ByteString data = ByteString.copyFrom(chunk);
+    Checksum checksum = new Checksum(checksumType, bytesPerChecksum);
     ChecksumData checksumData = checksum.computeChecksum(data);
     ChunkInfo chunkInfo = ChunkInfo.newBuilder()
         .setChunkName(DigestUtils.md5Hex(key) + "_stream_" + streamId +
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java
index 56ad71b..2777535 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java
@@ -38,6 +38,8 @@ import org.slf4j.LoggerFactory;
 
 /**
  * Class to compute and verify checksums for chunks.
+ *
+ * This class is not thread safe.
  */
 public class Checksum {
 
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
index 7ba1766..5ae7e8b 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
@@ -23,11 +23,12 @@ import java.nio.ByteBuffer;
 import java.util.List;
 
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ChecksumType;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
-import org.apache.hadoop.ozone.common.Checksum;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 
@@ -41,7 +42,8 @@ public final class BlockOutputStreamEntry extends OutputStream {
   private final String key;
   private final XceiverClientManager xceiverClientManager;
   private final Pipeline pipeline;
-  private final Checksum checksum;
+  private final ChecksumType checksumType;
+  private final int bytesPerChecksum;
   private final String requestId;
   private final int chunkSize;
   // total number of bytes that should be written to this stream
@@ -60,7 +62,8 @@ public final class BlockOutputStreamEntry extends OutputStream {
       XceiverClientManager xceiverClientManager,
       Pipeline pipeline, String requestId, int chunkSize,
       long length, long streamBufferFlushSize, long streamBufferMaxSize,
-      long watchTimeout, List<ByteBuffer> bufferList, Checksum checksum,
+      long watchTimeout, List<ByteBuffer> bufferList,
+      ChecksumType checksumType, int bytesPerChecksum,
       Token<OzoneBlockTokenIdentifier> token) {
     this.outputStream = null;
     this.blockID = blockID;
@@ -76,7 +79,8 @@ public final class BlockOutputStreamEntry extends OutputStream {
     this.streamBufferMaxSize = streamBufferMaxSize;
     this.watchTimeout = watchTimeout;
     this.bufferList = bufferList;
-    this.checksum = checksum;
+    this.checksumType = checksumType;
+    this.bytesPerChecksum = bytesPerChecksum;
   }
 
   long getLength() {
@@ -105,7 +109,8 @@ public final class BlockOutputStreamEntry extends OutputStream {
       this.outputStream =
           new BlockOutputStream(blockID, key, xceiverClientManager,
               pipeline, requestId, chunkSize, streamBufferFlushSize,
-              streamBufferMaxSize, watchTimeout, bufferList, checksum);
+              streamBufferMaxSize, watchTimeout, bufferList, checksumType,
+              bytesPerChecksum);
     }
   }
 
@@ -198,10 +203,16 @@ public final class BlockOutputStreamEntry extends OutputStream {
     private long watchTimeout;
     private List<ByteBuffer> bufferList;
     private Token<OzoneBlockTokenIdentifier> token;
-    private Checksum checksum;
+    private ChecksumType checksumType;
+    private int bytesPerChecksum;
 
-    public Builder setChecksum(Checksum cs) {
-      this.checksum = cs;
+    public Builder setChecksumType(ChecksumType type) {
+      this.checksumType = type;
+      return this;
+    }
+
+    public Builder setBytesPerChecksum(int bytes) {
+      this.bytesPerChecksum = bytes;
       return this;
     }
 
@@ -270,7 +281,7 @@ public final class BlockOutputStreamEntry extends OutputStream {
       return new BlockOutputStreamEntry(blockID, key,
           xceiverClientManager, pipeline, requestId, chunkSize,
           length, streamBufferFlushSize, streamBufferMaxSize, watchTimeout,
-          bufferList, checksum, token);
+          bufferList, checksumType, bytesPerChecksum, token);
     }
   }
 
@@ -294,10 +305,6 @@ public final class BlockOutputStreamEntry extends OutputStream {
     return pipeline;
   }
 
-  public Checksum getChecksum() {
-    return checksum;
-  }
-
   public String getRequestId() {
     return requestId;
   }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index 44972ae..d74a240 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -21,10 +21,12 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ChecksumType;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.om.helpers.*;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
@@ -79,7 +81,8 @@ public class KeyOutputStream extends OutputStream {
   private final long streamBufferMaxSize;
   private final long watchTimeout;
   private final long blockSize;
-  private final Checksum checksum;
+  private final int bytesPerChecksum;
+  private final ChecksumType checksumType;
   private List<ByteBuffer> bufferList;
   private OmMultipartCommitUploadPartInfo commitUploadPartInfo;
   private FileEncryptionInfo feInfo;
@@ -106,7 +109,10 @@ public class KeyOutputStream extends OutputStream {
     bufferList.add(buffer);
     watchTimeout = 0;
     blockSize = 0;
-    this.checksum = new Checksum();
+    this.checksumType = ChecksumType.valueOf(
+        OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT);
+    this.bytesPerChecksum = OzoneConfigKeys
+        .OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT_BYTES; // Default is 1MB
   }
 
   @VisibleForTesting
@@ -142,7 +148,8 @@ public class KeyOutputStream extends OutputStream {
       OzoneManagerProtocolClientSideTranslatorPB omClient, int chunkSize,
       String requestId, ReplicationFactor factor, ReplicationType type,
       long bufferFlushSize, long bufferMaxSize, long size, long watchTimeout,
-      Checksum checksum, String uploadID, int partNumber, boolean isMultipart) {
+      ChecksumType checksumType, int bytesPerChecksum,
+      String uploadID, int partNumber, boolean isMultipart) {
     this.streamEntries = new ArrayList<>();
     this.currentStreamIndex = 0;
     this.omClient = omClient;
@@ -165,7 +172,8 @@ public class KeyOutputStream extends OutputStream {
     this.streamBufferMaxSize = bufferMaxSize;
     this.blockSize = size;
     this.watchTimeout = watchTimeout;
-    this.checksum = checksum;
+    this.bytesPerChecksum = bytesPerChecksum;
+    this.checksumType = checksumType;
 
     Preconditions.checkState(chunkSize > 0);
     Preconditions.checkState(streamBufferFlushSize > 0);
@@ -220,7 +228,8 @@ public class KeyOutputStream extends OutputStream {
             .setStreamBufferMaxSize(streamBufferMaxSize)
             .setWatchTimeout(watchTimeout)
             .setBufferList(bufferList)
-            .setChecksum(checksum)
+            .setChecksumType(checksumType)
+            .setBytesPerChecksum(bytesPerChecksum)
             .setToken(subKeyInfo.getToken());
     streamEntries.add(builder.build());
   }
@@ -573,7 +582,8 @@ public class KeyOutputStream extends OutputStream {
     private long streamBufferMaxSize;
     private long blockSize;
     private long watchTimeout;
-    private Checksum checksum;
+    private ChecksumType checksumType;
+    private int bytesPerChecksum;
     private String multipartUploadID;
     private int multipartNumber;
     private boolean isMultipartKey;
@@ -651,8 +661,13 @@ public class KeyOutputStream extends OutputStream {
       return this;
     }
 
-    public Builder setChecksum(Checksum checksumObj){
-      this.checksum = checksumObj;
+    public Builder setChecksumType(ChecksumType cType){
+      this.checksumType = cType;
+      return this;
+    }
+
+    public Builder setBytesPerChecksum(int bytes){
+      this.bytesPerChecksum = bytes;
       return this;
     }
 
@@ -664,8 +679,8 @@ public class KeyOutputStream extends OutputStream {
     public KeyOutputStream build() throws IOException {
       return new KeyOutputStream(openHandler, xceiverManager, scmClient,
           omClient, chunkSize, requestID, factor, type, streamBufferFlushSize,
-          streamBufferMaxSize, blockSize, watchTimeout, checksum,
-          multipartUploadID, multipartNumber, isMultipartKey);
+          streamBufferMaxSize, blockSize, watchTimeout, checksumType,
+          bytesPerChecksum, multipartUploadID, multipartNumber, isMultipartKey);
     }
   }
 
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index eb27b7b..fec0530 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -48,7 +48,6 @@ import org.apache.hadoop.ozone.client.io.LengthInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
-import org.apache.hadoop.ozone.common.Checksum;
 import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
@@ -110,7 +109,8 @@ public class RpcClient implements ClientProtocol {
       ozoneManagerClient;
   private final XceiverClientManager xceiverClientManager;
   private final int chunkSize;
-  private final Checksum checksum;
+  private final ChecksumType checksumType;
+  private final int bytesPerChecksum;
   private final UserGroupInformation ugi;
   private final OzoneAcl.OzoneACLRights userRights;
   private final OzoneAcl.OzoneACLRights groupRights;
@@ -189,22 +189,22 @@ public class RpcClient implements ClientProtocol {
         OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM,
         OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT,
         StorageUnit.BYTES);
-    int checksumSize;
     if(configuredChecksumSize <
         OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE) {
       LOG.warn("The checksum size ({}) is not allowed to be less than the " +
               "minimum size ({}), resetting to the minimum size.",
           configuredChecksumSize,
           OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE);
-      checksumSize = OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE;
+      bytesPerChecksum =
+          OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE;
     } else {
-      checksumSize = configuredChecksumSize;
+      bytesPerChecksum = configuredChecksumSize;
     }
+
     String checksumTypeStr = conf.get(
         OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE,
         OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT);
-    ChecksumType checksumType = ChecksumType.valueOf(checksumTypeStr);
-    this.checksum = new Checksum(checksumType, checksumSize);
+    checksumType = ChecksumType.valueOf(checksumTypeStr);
   }
 
   private InetSocketAddress getScmAddressForClient() throws IOException {
@@ -602,7 +602,8 @@ public class RpcClient implements ClientProtocol {
             .setStreamBufferMaxSize(streamBufferMaxSize)
             .setWatchTimeout(watchTimeout)
             .setBlockSize(blockSize)
-            .setChecksum(checksum)
+            .setChecksumType(checksumType)
+            .setBytesPerChecksum(bytesPerChecksum)
             .build();
     groupOutputStream.addPreallocateBlocks(
         openKey.getKeyInfo().getLatestVersionLocations(),
@@ -863,7 +864,8 @@ public class RpcClient implements ClientProtocol {
             .setStreamBufferMaxSize(streamBufferMaxSize)
             .setWatchTimeout(watchTimeout)
             .setBlockSize(blockSize)
-            .setChecksum(checksum)
+            .setBytesPerChecksum(bytesPerChecksum)
+            .setChecksumType(checksumType)
             .setMultipartNumber(partNumber)
             .setMultipartUploadID(uploadID)
             .setIsMultipartKey(true)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
index 4e77bfd..73a7963 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
@@ -27,6 +27,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.hdds.client.OzoneQuota;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
@@ -681,6 +684,66 @@ public abstract class TestOzoneRpcClientAbstract {
     }
   }
 
+
+  /**
+   * Writes and reads back five keys from each of two threads sharing one
+   * bucket; fails if a thread hits any error or does not finish in time.
+   */
+  @Test
+  public void testPutKeyRatisThreeNodesParallel() throws IOException,
+      InterruptedException {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    long currentTime = Time.now();
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+    CountDownLatch latch = new CountDownLatch(2);
+    AtomicInteger failCount = new AtomicInteger(0);
+
+    Runnable r = () -> {
+      try {
+        for (int i = 0; i < 5; i++) {
+          String keyName = UUID.randomUUID().toString();
+          String data = generateData(5 * 1024 * 1024,
+              (byte) RandomUtils.nextLong()).toString();
+          byte[] dataBytes = data.getBytes();
+          OzoneOutputStream out = bucket.createKey(keyName,
+              dataBytes.length, ReplicationType.RATIS,
+              ReplicationFactor.THREE, new HashMap<>());
+          out.write(dataBytes);
+          out.close();
+          OzoneKey key = bucket.getKey(keyName);
+          Assert.assertEquals(keyName, key.getName());
+          OzoneInputStream is = bucket.readKey(keyName);
+          byte[] fileContent = new byte[dataBytes.length];
+          is.read(fileContent);
+          is.close();
+          Assert.assertTrue(verifyRatisReplication(volumeName, bucketName,
+              keyName, ReplicationType.RATIS, ReplicationFactor.THREE));
+          Assert.assertEquals(data, new String(fileContent));
+          Assert.assertTrue(key.getCreationTime() >= currentTime);
+          Assert.assertTrue(key.getModificationTime() >= currentTime);
+        }
+      } catch (Throwable t) {
+        // Assertion failures are Errors, not IOExceptions; count them too.
+        failCount.incrementAndGet();
+      } finally {
+        latch.countDown();
+      }
+    };
+
+    Thread thread1 = new Thread(r);
+    Thread thread2 = new Thread(r);
+    thread1.start();
+    thread2.start();
+    // A timeout means a writer never finished, which is also a failure.
+    if (!latch.await(600, TimeUnit.SECONDS) || failCount.get() > 0) {
+      fail("testPutKeyRatisThreeNodesParallel failed");
+    }
+  }
+
   private void readKey(OzoneBucket bucket, String keyName, String data)
       throws IOException {
     OzoneKey key = bucket.getKey(keyName);
diff --git a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
index b73f297..8197ce8 100644
--- a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
+++ b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
@@ -24,11 +24,11 @@ import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ChecksumType;
 import org.apache.hadoop.ozone.client.io.KeyInputStream;
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.LengthInputStream;
-import org.apache.hadoop.ozone.common.Checksum;
 import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
@@ -86,7 +86,8 @@ public final class DistributedStorageHandler implements StorageHandler {
   private final long streamBufferMaxSize;
   private final long watchTimeout;
   private final long blockSize;
-  private final Checksum checksum;
+  private final ChecksumType checksumType;
+  private final int bytesPerChecksum;
 
   /**
    * Creates a new DistributedStorageHandler.
@@ -136,23 +137,23 @@ public final class DistributedStorageHandler implements StorageHandler {
         OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM,
         OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT,
         StorageUnit.BYTES);
-    int checksumSize;
+
     if(configuredChecksumSize <
         OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE) {
       LOG.warn("The checksum size ({}) is not allowed to be less than the " +
               "minimum size ({}), resetting to the minimum size.",
           configuredChecksumSize,
           OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE);
-      checksumSize = OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE;
+      bytesPerChecksum =
+          OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE;
     } else {
-      checksumSize = configuredChecksumSize;
+      bytesPerChecksum = configuredChecksumSize;
     }
     String checksumTypeStr = conf.get(
         OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE,
         OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT);
-    ContainerProtos.ChecksumType checksumType = ContainerProtos.ChecksumType
-        .valueOf(checksumTypeStr);
-    this.checksum = new Checksum(checksumType, checksumSize);
+    this.checksumType = ChecksumType.valueOf(checksumTypeStr);
+
   }
 
   @Override
@@ -451,7 +452,8 @@ public final class DistributedStorageHandler implements StorageHandler {
             .setStreamBufferMaxSize(streamBufferMaxSize)
             .setBlockSize(blockSize)
             .setWatchTimeout(watchTimeout)
-            .setChecksum(checksum)
+            .setChecksumType(checksumType)
+            .setBytesPerChecksum(bytesPerChecksum)
             .build();
     groupOutputStream.addPreallocateBlocks(
         openKey.getKeyInfo().getLatestVersionLocations(),


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org