You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2019/02/19 19:08:19 UTC
[hadoop] branch trunk updated: HDDS-1121. Key read failure when
data is written parallel in to Ozone. Contributed by Bharat Viswanadham.
This is an automated email from the ASF dual-hosted git repository.
aengineer pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 02d04bd HDDS-1121. Key read failure when data is written parallel in to Ozone. Contributed by Bharat Viswanadham.
02d04bd is described below
commit 02d04bd1073c2ae89fd0b208eba239a469559f90
Author: Anu Engineer <ae...@apache.org>
AuthorDate: Tue Feb 19 10:55:33 2019 -0800
HDDS-1121. Key read failure when data is written parallel in to Ozone.
Contributed by Bharat Viswanadham.
---
.../hadoop/hdds/scm/storage/BlockOutputStream.java | 14 +++--
.../org/apache/hadoop/ozone/common/Checksum.java | 2 +
.../ozone/client/io/BlockOutputStreamEntry.java | 33 +++++++-----
.../hadoop/ozone/client/io/KeyOutputStream.java | 37 +++++++++----
.../apache/hadoop/ozone/client/rpc/RpcClient.java | 20 +++----
.../client/rpc/TestOzoneRpcClientAbstract.java | 63 ++++++++++++++++++++++
.../web/storage/DistributedStorageHandler.java | 22 ++++----
7 files changed, 144 insertions(+), 47 deletions(-)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index 4c6cd7b..f3ba656 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -29,6 +29,7 @@ import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
@@ -76,7 +77,8 @@ public class BlockOutputStream extends OutputStream {
private final BlockData.Builder containerBlockData;
private XceiverClientManager xceiverClientManager;
private XceiverClientSpi xceiverClient;
- private final Checksum checksum;
+ private final ContainerProtos.ChecksumType checksumType;
+ private final int bytesPerChecksum;
private final String streamId;
private int chunkIndex;
private int chunkSize;
@@ -121,14 +123,16 @@ public class BlockOutputStream extends OutputStream {
* @param streamBufferFlushSize flush size
* @param streamBufferMaxSize max size of the currentBuffer
* @param watchTimeout watch timeout
- * @param checksum checksum
+ * @param checksumType checksum type
+ * @param bytesPerChecksum Bytes per checksum
*/
@SuppressWarnings("parameternumber")
public BlockOutputStream(BlockID blockID, String key,
XceiverClientManager xceiverClientManager, Pipeline pipeline,
String traceID, int chunkSize, long streamBufferFlushSize,
long streamBufferMaxSize, long watchTimeout, List<ByteBuffer> bufferList,
- Checksum checksum) throws IOException {
+ ChecksumType checksumType, int bytesPerChecksum)
+ throws IOException {
this.blockID = blockID;
this.key = key;
this.traceID = traceID;
@@ -146,7 +150,8 @@ public class BlockOutputStream extends OutputStream {
this.streamBufferMaxSize = streamBufferMaxSize;
this.watchTimeout = watchTimeout;
this.bufferList = bufferList;
- this.checksum = checksum;
+ this.checksumType = checksumType;
+ this.bytesPerChecksum = bytesPerChecksum;
// A single thread executor handle the responses of async requests
responseExecutor = Executors.newSingleThreadExecutor();
@@ -585,6 +590,7 @@ public class BlockOutputStream extends OutputStream {
private void writeChunkToContainer(ByteBuffer chunk) throws IOException {
int effectiveChunkSize = chunk.remaining();
ByteString data = ByteString.copyFrom(chunk);
+ Checksum checksum = new Checksum(checksumType, bytesPerChecksum);
ChecksumData checksumData = checksum.computeChecksum(data);
ChunkInfo chunkInfo = ChunkInfo.newBuilder()
.setChunkName(DigestUtils.md5Hex(key) + "_stream_" + streamId +
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java
index 56ad71b..2777535 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java
@@ -38,6 +38,8 @@ import org.slf4j.LoggerFactory;
/**
* Class to compute and verify checksums for chunks.
+ *
+ * This class is not thread safe.
*/
public class Checksum {
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
index 7ba1766..5ae7e8b 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
@@ -23,11 +23,12 @@ import java.nio.ByteBuffer;
import java.util.List;
import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .ChecksumType;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
-import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
@@ -41,7 +42,8 @@ public final class BlockOutputStreamEntry extends OutputStream {
private final String key;
private final XceiverClientManager xceiverClientManager;
private final Pipeline pipeline;
- private final Checksum checksum;
+ private final ChecksumType checksumType;
+ private final int bytesPerChecksum;
private final String requestId;
private final int chunkSize;
// total number of bytes that should be written to this stream
@@ -60,7 +62,8 @@ public final class BlockOutputStreamEntry extends OutputStream {
XceiverClientManager xceiverClientManager,
Pipeline pipeline, String requestId, int chunkSize,
long length, long streamBufferFlushSize, long streamBufferMaxSize,
- long watchTimeout, List<ByteBuffer> bufferList, Checksum checksum,
+ long watchTimeout, List<ByteBuffer> bufferList,
+ ChecksumType checksumType, int bytesPerChecksum,
Token<OzoneBlockTokenIdentifier> token) {
this.outputStream = null;
this.blockID = blockID;
@@ -76,7 +79,8 @@ public final class BlockOutputStreamEntry extends OutputStream {
this.streamBufferMaxSize = streamBufferMaxSize;
this.watchTimeout = watchTimeout;
this.bufferList = bufferList;
- this.checksum = checksum;
+ this.checksumType = checksumType;
+ this.bytesPerChecksum = bytesPerChecksum;
}
long getLength() {
@@ -105,7 +109,8 @@ public final class BlockOutputStreamEntry extends OutputStream {
this.outputStream =
new BlockOutputStream(blockID, key, xceiverClientManager,
pipeline, requestId, chunkSize, streamBufferFlushSize,
- streamBufferMaxSize, watchTimeout, bufferList, checksum);
+ streamBufferMaxSize, watchTimeout, bufferList, checksumType,
+ bytesPerChecksum);
}
}
@@ -198,10 +203,16 @@ public final class BlockOutputStreamEntry extends OutputStream {
private long watchTimeout;
private List<ByteBuffer> bufferList;
private Token<OzoneBlockTokenIdentifier> token;
- private Checksum checksum;
+ private ChecksumType checksumType;
+ private int bytesPerChecksum;
- public Builder setChecksum(Checksum cs) {
- this.checksum = cs;
+ public Builder setChecksumType(ChecksumType type) {
+ this.checksumType = type;
+ return this;
+ }
+
+ public Builder setBytesPerChecksum(int bytes) {
+ this.bytesPerChecksum = bytes;
return this;
}
@@ -270,7 +281,7 @@ public final class BlockOutputStreamEntry extends OutputStream {
return new BlockOutputStreamEntry(blockID, key,
xceiverClientManager, pipeline, requestId, chunkSize,
length, streamBufferFlushSize, streamBufferMaxSize, watchTimeout,
- bufferList, checksum, token);
+ bufferList, checksumType, bytesPerChecksum, token);
}
}
@@ -294,10 +305,6 @@ public final class BlockOutputStreamEntry extends OutputStream {
return pipeline;
}
- public Checksum getChecksum() {
- return checksum;
- }
-
public String getRequestId() {
return requestId;
}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index 44972ae..d74a240 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -21,10 +21,12 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .ChecksumType;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.om.helpers.*;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
@@ -79,7 +81,8 @@ public class KeyOutputStream extends OutputStream {
private final long streamBufferMaxSize;
private final long watchTimeout;
private final long blockSize;
- private final Checksum checksum;
+ private final int bytesPerChecksum;
+ private final ChecksumType checksumType;
private List<ByteBuffer> bufferList;
private OmMultipartCommitUploadPartInfo commitUploadPartInfo;
private FileEncryptionInfo feInfo;
@@ -106,7 +109,10 @@ public class KeyOutputStream extends OutputStream {
bufferList.add(buffer);
watchTimeout = 0;
blockSize = 0;
- this.checksum = new Checksum();
+ this.checksumType = ChecksumType.valueOf(
+ OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT);
+ this.bytesPerChecksum = OzoneConfigKeys
+ .OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT_BYTES; // Default is 1MB
}
@VisibleForTesting
@@ -142,7 +148,8 @@ public class KeyOutputStream extends OutputStream {
OzoneManagerProtocolClientSideTranslatorPB omClient, int chunkSize,
String requestId, ReplicationFactor factor, ReplicationType type,
long bufferFlushSize, long bufferMaxSize, long size, long watchTimeout,
- Checksum checksum, String uploadID, int partNumber, boolean isMultipart) {
+ ChecksumType checksumType, int bytesPerChecksum,
+ String uploadID, int partNumber, boolean isMultipart) {
this.streamEntries = new ArrayList<>();
this.currentStreamIndex = 0;
this.omClient = omClient;
@@ -165,7 +172,8 @@ public class KeyOutputStream extends OutputStream {
this.streamBufferMaxSize = bufferMaxSize;
this.blockSize = size;
this.watchTimeout = watchTimeout;
- this.checksum = checksum;
+ this.bytesPerChecksum = bytesPerChecksum;
+ this.checksumType = checksumType;
Preconditions.checkState(chunkSize > 0);
Preconditions.checkState(streamBufferFlushSize > 0);
@@ -220,7 +228,8 @@ public class KeyOutputStream extends OutputStream {
.setStreamBufferMaxSize(streamBufferMaxSize)
.setWatchTimeout(watchTimeout)
.setBufferList(bufferList)
- .setChecksum(checksum)
+ .setChecksumType(checksumType)
+ .setBytesPerChecksum(bytesPerChecksum)
.setToken(subKeyInfo.getToken());
streamEntries.add(builder.build());
}
@@ -573,7 +582,8 @@ public class KeyOutputStream extends OutputStream {
private long streamBufferMaxSize;
private long blockSize;
private long watchTimeout;
- private Checksum checksum;
+ private ChecksumType checksumType;
+ private int bytesPerChecksum;
private String multipartUploadID;
private int multipartNumber;
private boolean isMultipartKey;
@@ -651,8 +661,13 @@ public class KeyOutputStream extends OutputStream {
return this;
}
- public Builder setChecksum(Checksum checksumObj){
- this.checksum = checksumObj;
+ public Builder setChecksumType(ChecksumType cType){
+ this.checksumType = cType;
+ return this;
+ }
+
+ public Builder setBytesPerChecksum(int bytes){
+ this.bytesPerChecksum = bytes;
return this;
}
@@ -664,8 +679,8 @@ public class KeyOutputStream extends OutputStream {
public KeyOutputStream build() throws IOException {
return new KeyOutputStream(openHandler, xceiverManager, scmClient,
omClient, chunkSize, requestID, factor, type, streamBufferFlushSize,
- streamBufferMaxSize, blockSize, watchTimeout, checksum,
- multipartUploadID, multipartNumber, isMultipartKey);
+ streamBufferMaxSize, blockSize, watchTimeout, checksumType,
+ bytesPerChecksum, multipartUploadID, multipartNumber, isMultipartKey);
}
}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index eb27b7b..fec0530 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -48,7 +48,6 @@ import org.apache.hadoop.ozone.client.io.LengthInputStream;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
-import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
@@ -110,7 +109,8 @@ public class RpcClient implements ClientProtocol {
ozoneManagerClient;
private final XceiverClientManager xceiverClientManager;
private final int chunkSize;
- private final Checksum checksum;
+ private final ChecksumType checksumType;
+ private final int bytesPerChecksum;
private final UserGroupInformation ugi;
private final OzoneAcl.OzoneACLRights userRights;
private final OzoneAcl.OzoneACLRights groupRights;
@@ -189,22 +189,22 @@ public class RpcClient implements ClientProtocol {
OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM,
OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT,
StorageUnit.BYTES);
- int checksumSize;
if(configuredChecksumSize <
OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE) {
LOG.warn("The checksum size ({}) is not allowed to be less than the " +
"minimum size ({}), resetting to the minimum size.",
configuredChecksumSize,
OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE);
- checksumSize = OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE;
+ bytesPerChecksum =
+ OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE;
} else {
- checksumSize = configuredChecksumSize;
+ bytesPerChecksum = configuredChecksumSize;
}
+
String checksumTypeStr = conf.get(
OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE,
OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT);
- ChecksumType checksumType = ChecksumType.valueOf(checksumTypeStr);
- this.checksum = new Checksum(checksumType, checksumSize);
+ checksumType = ChecksumType.valueOf(checksumTypeStr);
}
private InetSocketAddress getScmAddressForClient() throws IOException {
@@ -602,7 +602,8 @@ public class RpcClient implements ClientProtocol {
.setStreamBufferMaxSize(streamBufferMaxSize)
.setWatchTimeout(watchTimeout)
.setBlockSize(blockSize)
- .setChecksum(checksum)
+ .setChecksumType(checksumType)
+ .setBytesPerChecksum(bytesPerChecksum)
.build();
groupOutputStream.addPreallocateBlocks(
openKey.getKeyInfo().getLatestVersionLocations(),
@@ -863,7 +864,8 @@ public class RpcClient implements ClientProtocol {
.setStreamBufferMaxSize(streamBufferMaxSize)
.setWatchTimeout(watchTimeout)
.setBlockSize(blockSize)
- .setChecksum(checksum)
+ .setBytesPerChecksum(bytesPerChecksum)
+ .setChecksumType(checksumType)
.setMultipartNumber(partNumber)
.setMultipartUploadID(uploadID)
.setIsMultipartKey(true)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
index 4e77bfd..73a7963 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
@@ -27,6 +27,9 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hdds.client.OzoneQuota;
import org.apache.hadoop.hdds.client.ReplicationFactor;
@@ -681,6 +684,66 @@ public abstract class TestOzoneRpcClientAbstract {
}
}
+
+ @Test
+ public void testPutKeyRatisThreeNodesParallel() throws IOException,
+ InterruptedException {
+ String volumeName = UUID.randomUUID().toString();
+ String bucketName = UUID.randomUUID().toString();
+ long currentTime = Time.now();
+ store.createVolume(volumeName);
+ OzoneVolume volume = store.getVolume(volumeName);
+ volume.createBucket(bucketName);
+ OzoneBucket bucket = volume.getBucket(bucketName);
+
+ CountDownLatch latch = new CountDownLatch(2);
+ AtomicInteger failCount = new AtomicInteger(0);
+
+ Runnable r = () -> {
+ try {
+ for (int i = 0; i < 5; i++) {
+ String keyName = UUID.randomUUID().toString();
+ String data = generateData(5 * 1024 * 1024,
+ (byte) RandomUtils.nextLong()).toString();
+ OzoneOutputStream out = bucket.createKey(keyName,
+ data.getBytes().length, ReplicationType.RATIS,
+ ReplicationFactor.THREE, new HashMap<>());
+ out.write(data.getBytes());
+ out.close();
+ OzoneKey key = bucket.getKey(keyName);
+ Assert.assertEquals(keyName, key.getName());
+ OzoneInputStream is = bucket.readKey(keyName);
+ byte[] fileContent = new byte[data.getBytes().length];
+ is.read(fileContent);
+ is.close();
+ Assert.assertTrue(verifyRatisReplication(volumeName, bucketName,
+ keyName, ReplicationType.RATIS,
+ ReplicationFactor.THREE));
+ Assert.assertEquals(data, new String(fileContent));
+ Assert.assertTrue(key.getCreationTime() >= currentTime);
+ Assert.assertTrue(key.getModificationTime() >= currentTime);
+ }
+ latch.countDown();
+ } catch (IOException ex) {
+ latch.countDown();
+ failCount.incrementAndGet();
+ }
+ };
+
+ Thread thread1 = new Thread(r);
+ Thread thread2 = new Thread(r);
+
+ thread1.start();
+ thread2.start();
+
+ latch.await(600, TimeUnit.SECONDS);
+
+ if (failCount.get() > 0) {
+ fail("testPutKeyRatisThreeNodesParallel failed");
+ }
+
+ }
+
private void readKey(OzoneBucket bucket, String keyName, String data)
throws IOException {
OzoneKey key = bucket.getKey(keyName);
diff --git a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
index b73f297..8197ce8 100644
--- a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
+++ b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
@@ -24,11 +24,11 @@ import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .ChecksumType;
import org.apache.hadoop.ozone.client.io.KeyInputStream;
import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.LengthInputStream;
-import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
@@ -86,7 +86,8 @@ public final class DistributedStorageHandler implements StorageHandler {
private final long streamBufferMaxSize;
private final long watchTimeout;
private final long blockSize;
- private final Checksum checksum;
+ private final ChecksumType checksumType;
+ private final int bytesPerChecksum;
/**
* Creates a new DistributedStorageHandler.
@@ -136,23 +137,23 @@ public final class DistributedStorageHandler implements StorageHandler {
OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM,
OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT,
StorageUnit.BYTES);
- int checksumSize;
+
if(configuredChecksumSize <
OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE) {
LOG.warn("The checksum size ({}) is not allowed to be less than the " +
"minimum size ({}), resetting to the minimum size.",
configuredChecksumSize,
OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE);
- checksumSize = OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE;
+ bytesPerChecksum =
+ OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE;
} else {
- checksumSize = configuredChecksumSize;
+ bytesPerChecksum = configuredChecksumSize;
}
String checksumTypeStr = conf.get(
OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE,
OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT);
- ContainerProtos.ChecksumType checksumType = ContainerProtos.ChecksumType
- .valueOf(checksumTypeStr);
- this.checksum = new Checksum(checksumType, checksumSize);
+ this.checksumType = ChecksumType.valueOf(checksumTypeStr);
+
}
@Override
@@ -451,7 +452,8 @@ public final class DistributedStorageHandler implements StorageHandler {
.setStreamBufferMaxSize(streamBufferMaxSize)
.setBlockSize(blockSize)
.setWatchTimeout(watchTimeout)
- .setChecksum(checksum)
+ .setChecksumType(checksumType)
+ .setBytesPerChecksum(bytesPerChecksum)
.build();
groupOutputStream.addPreallocateBlocks(
openKey.getKeyInfo().getLatestVersionLocations(),
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org